[addressing-devel] FuraSrc/Server/Plugins/ShellPlugin/brokerplugin/src/com/gridsystems/shellbroker/
Brought to you by:
nodens2k
From: Gridsystems C. <gsc...@us...> - 2008-09-03 21:27:53
|
Update of /cvsroot/fura/FuraSrc/Server/Plugins/ShellPlugin/brokerplugin/src/com/gridsystems/shellbroker/internal/api In directory sc8-pr-cvs17.sourceforge.net:/tmp/cvs-serv8372/FuraSrc/Server/Plugins/ShellPlugin/brokerplugin/src/com/gridsystems/shellbroker/internal/api Modified Files: ApiShellBrokerImpl.java ShellBrokerUtils.java ShellExecutorList.java ShellExecutorWatchDog.java Log Message: cvssync-20080903232636 Index: ApiShellBrokerImpl.java =================================================================== RCS file: /cvsroot/fura/FuraSrc/Server/Plugins/ShellPlugin/brokerplugin/src/com/gridsystems/shellbroker/internal/api/ApiShellBrokerImpl.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** ApiShellBrokerImpl.java 23 Jul 2008 09:58:30 -0000 1.2 --- ApiShellBrokerImpl.java 3 Sep 2008 21:27:22 -0000 1.3 *************** *** 32,35 **** --- 32,36 ---- import com.gridsystems.innergrid.kernel.KernelException; import com.gridsystems.innergrid.kernel.server.ApiServerFactory; + import com.gridsystems.module.api.DownloadPolicy; import com.gridsystems.resource.ResourcePlugin; import com.gridsystems.resource.api.ApiResourceManager; *************** *** 229,232 **** --- 230,234 ---- ResourceProperties resProp; if (isNew) { + // New Shell Executor, it was not registered before shellexec = new Resource(); *************** *** 239,242 **** --- 241,247 ---- shellexec.setProperties(resProp); + // Save Resource + apires.addResource(shellexec); + } else { resProp = getResourceProperties(info, executionType); *************** *** 247,258 **** if (shellexec.getProperties().getState() != ResourceProperties.STATE_DISABLED) { resProp.setState(ResourceProperties.STATE_ACTIVE); } resProp.setUploadBandwidth(shellexec.getProperties().getUploadBandwidth()); resProp.setDownloadBandwidth(shellexec.getProperties().getDownloadBandwidth()); shellexec.setProperties(resProp); - } ! // Save Resource ! apires.addResource(shellexec); ShellExecutor shell = ShellExecutorList.getInstance().get(shellName); --- 252,264 ---- if (shellexec.getProperties().getState() != ResourceProperties.STATE_DISABLED) { resProp.setState(ResourceProperties.STATE_ACTIVE); + } else { + resProp.setState(shellexec.getProperties().getState()); } resProp.setUploadBandwidth(shellexec.getProperties().getUploadBandwidth()); resProp.setDownloadBandwidth(shellexec.getProperties().getDownloadBandwidth()); shellexec.setProperties(resProp); ! apires.updateResourceProperties(resId, resProp, true); ! } ShellExecutor shell = ShellExecutorList.getInstance().get(shellName); *************** *** 304,308 **** resProp.setCpus(new CPU[]{info.getCpu()}); resProp.setOss(new OS[] { info.getOs() }); ! resProp.setKeywords(null); break; default: --- 310,314 ---- resProp.setCpus(new CPU[]{info.getCpu()}); resProp.setOss(new OS[] { info.getOs() }); ! resProp.setKeywords(new String[0]); break; default: *************** *** 322,326 **** resProp.setMegaFlops(DEFAULT_MAINTENANCE_FACTOR * info.getMegaFlops()); resProp.setMemory(info.getMemory()); - resProp.setKeywords(new String[0]); resProp.setState(ResourceProperties.STATE_ACTIVE); --- 328,331 ---- *************** *** 329,332 **** --- 334,341 ---- resProp.setVersion(info.getVersion()); resProp.setOnOverloadPolicy(ResourceProperties.ON_OVERLOAD_IGNORE); + + // @todo Case 11073 + resProp.setSupportedResultFilterPolicies(new String[]{DownloadPolicy.POLICY_FINAL}); + return resProp; } Index: ShellExecutorList.java =================================================================== RCS file: /cvsroot/fura/FuraSrc/Server/Plugins/ShellPlugin/brokerplugin/src/com/gridsystems/shellbroker/internal/api/ShellExecutorList.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** ShellExecutorList.java 16 Jul 2008 06:55:51 -0000 1.1 --- ShellExecutorList.java 3 Sep 2008 21:27:22 -0000 1.2 *************** *** 352,356 **** ShellBrokerUtils.markShellExecutorFree(resId); */ ! ShellExecutorWatchDog.awake(); //ShellBrokerUtils.assignWorkToShellExecutor(resId, apiShell); } --- 352,356 ---- ShellBrokerUtils.markShellExecutorFree(resId); */ ! ShellExecutorWatchDog.awake(resId); //ShellBrokerUtils.assignWorkToShellExecutor(resId, apiShell); } *************** *** 377,387 **** apiShell.cancelAssignment(ra); } catch (Exception e) { log.warn("Error while it tries cancel micro-task " + ra.getMicroTask() + " of task " + ra.getTask() + " in Shell Executor " + name, e); ! return; } // Find work for this Shell Executor ShellBrokerUtils.markShellExecutorFree(resId); ! ShellBrokerUtils.assignWorkToShellExecutor(resId, apiShell); } } --- 377,399 ---- apiShell.cancelAssignment(ra); } catch (Exception e) { + boolean doReturn = true; + if (e instanceof KernelException) { + KernelException ke = (KernelException)e; + // SHE002: This shell executor is not processing the microtask + if ("SHE002".equals(ke.getCode())) { + doReturn = false; + } + } log.warn("Error while it tries cancel micro-task " + ra.getMicroTask() + " of task " + ra.getTask() + " in Shell Executor " + name, e); ! if (doReturn) { ! return; ! } } // Find work for this Shell Executor ShellBrokerUtils.markShellExecutorFree(resId); ! //ShellBrokerUtils.assignWorkToShellExecutor(resId, apiShell); ! // To avoid deadlock, better to use this command. ! ShellExecutorWatchDog.awake(resId); } } *************** *** 572,576 **** if (arg0.getNewState() == TaskInfo.STATE_INPROGRESS) { log.debug("Calling from awake: " + arg0.getSource()); ! ShellExecutorWatchDog.awake(); } } --- 584,588 ---- if (arg0.getNewState() == TaskInfo.STATE_INPROGRESS) { log.debug("Calling from awake: " + arg0.getSource()); ! ShellExecutorWatchDog.awake(null); } } Index: ShellExecutorWatchDog.java =================================================================== RCS file: /cvsroot/fura/FuraSrc/Server/Plugins/ShellPlugin/brokerplugin/src/com/gridsystems/shellbroker/internal/api/ShellExecutorWatchDog.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** ShellExecutorWatchDog.java 16 Jul 2008 06:55:51 -0000 1.1 --- ShellExecutorWatchDog.java 3 Sep 2008 21:27:22 -0000 1.2 *************** *** 19,22 **** --- 19,24 ---- import java.rmi.RemoteException; + import java.util.HashSet; + import java.util.Set; import org.apache.log4j.Logger; *************** *** 68,72 **** * The Shell Executor watchdog instance. */ ! private static ShellExecutorWatchDog watchdog = null;; --- 70,79 ---- * The Shell Executor watchdog instance. */ ! private static ShellExecutorWatchDog watchdog = null; ! ! /** ! * Place to store shell resources notifications. ! */ ! private final Set<ResourceId> notificationsList = new HashSet<ResourceId>(); *************** *** 83,90 **** /** * Sets timed out shell executors to <code>STATE_TIMEDOUT</code> state. ! * * @throws RemoteException if error */ ! private void doTimeoutShellExecutors() throws Exception { if (ShellExecutorList.getInstance().getCount() == 0) { return; --- 90,99 ---- /** * Sets timed out shell executors to <code>STATE_TIMEDOUT</code> state. ! * @param shells Shells that have requested work. ! * null if all need check if must work * @throws RemoteException if error */ ! private static void doTimeoutShellExecutors(ResourceId[] shells) ! throws RemoteException { if (ShellExecutorList.getInstance().getCount() == 0) { return; *************** *** 93,97 **** ApiResourceQuery api = ApiServerFactory.newApi(ApiResourceQuery.class); ApiShellExecutor apiShell; ! Resource[] resources = api.getAllResources(ApiShellBroker.RESOURCE_SHELL_FILTER); ApiDispatcher apiDisp = ApiServerFactory.newApi(ApiDispatcher.class); --- 102,107 ---- ApiResourceQuery api = ApiServerFactory.newApi(ApiResourceQuery.class); ApiShellExecutor apiShell; ! String filter = createFilter(shells); ! Resource[] resources = api.getAllResources(filter); ApiDispatcher apiDisp = ApiServerFactory.newApi(ApiDispatcher.class); *************** *** 105,108 **** --- 115,119 ---- log.debug(i + "/" + resources.length + " Checking Shell Executor " + name); } + // 1.- Getting instance of ApiShell ResourceProperties resProp2 = r.getProperties(); int state = resProp2.getState(); *************** *** 110,163 **** apiShell = ShellBrokerUtils.getInstanceOfApiShell(name); } catch (Exception ex) { ! if (state == ResourceProperties.STATE_DISABLED) { ! continue; ! } else { ! state = ResourceProperties.STATE_DISABLED; ! apiShell = null; ! log.warn(ex.getMessage(), ex); } } ! if (apiShell != null) { ! // Update Information of Shell Executor in Resource List. ! try { ! // Update ResourceProperties Bean ! Status status = apiShell.checkStatus(r.getId()); ! updateResourceFromStatusBean(apiMan, r.getId(), status); ! ! // Check if this Shell Executor is Free ! AssignmentStatus[] resAssign = status.getCurrentAssignments(); ! if ((resAssign == null) || (resAssign.length == 0) ! || r.getUsage().isMoreAssignmentsAccepted()) { ! isFree = true; ! } else { ! // Update Assignment Status ! try { ! for (int j = 0; j < resAssign.length; j++) { ! apiDisp.updateAssignmentStatus(resAssign[j]); ! } ! } catch (Exception e) { ! log.warn("Error updating shell executor assignment state", e); ! } ! } ! } catch (NullPointerException npe) { ! log.error("Null pointer exception: ", npe); ! return; ! } catch (Exception e) { ! if (state != ResourceProperties.STATE_DISABLED) { ! log.warn("Cannot connect with Shell Executor " ! + r.getId().getResourceName() ! + ". Set to Disabled state.", e); ! state = ResourceProperties.STATE_DISABLED; ! } } } ! ! if (state == ResourceProperties.STATE_DISABLED) { continue; } ! ! // Call to API to Update Shell state. ! if (state != resProp2.getState()) { ! updateShellExecutorState(apiMan, r.getId(), state); } if (log.isDebugEnabled()) { --- 121,171 ---- apiShell = ShellBrokerUtils.getInstanceOfApiShell(name); } catch (Exception ex) { ! log.error("Unknown error getting instance of ApiShell for " + name, ex); ! continue; ! } ! // At this point, apiShell never can be null. ! // 2.- Check connection with Shell ! Status status = null; ! ResourceId resId = r.getId(); ! try { ! status = apiShell.checkStatus(resId); ! } catch (RemoteException e) { ! if (state != ResourceProperties.STATE_TIMEDOUT) { ! log.warn("Cannot connect with shell " + resId + ":" + e.getMessage(), e); ! // Set state to Timeout ! updateShellExecutorState(apiMan, r.getId(), ResourceProperties.STATE_TIMEDOUT); } + continue; } ! // 3.-Update Information of Shell Executor in Resource List. ! try { ! // Update ResourceProperties Bean ! updateResourceFromStatusBean(apiMan, r, status); ! } catch (RemoteException e) { ! if (state != ResourceProperties.STATE_TIMEDOUT) { ! log.warn("Cannot connect with Shell Executor " ! + r.getId() + ". Set to Timeout state.", e); ! state = ResourceProperties.STATE_TIMEDOUT; } } ! if (r.getProperties().getState() == ResourceProperties.STATE_DISABLED) { continue; } ! // 4.- Check if this Shell Executor is Free ! AssignmentStatus[] resAssign = status.getCurrentAssignments(); ! if ((resAssign == null) || (resAssign.length == 0)) { ! isFree = true; ! } else { ! // Update Assignment Status ! for (int j = 0; j < resAssign.length; j++) { ! try { ! apiDisp.updateAssignmentStatus(resAssign[j]); ! } catch (RemoteException e) { ! log.warn("Error updating shell assignment state: " + resAssign[j], e); ! } ! } ! if (r.getUsage().isMoreAssignmentsAccepted()) { ! isFree = true; ! } } if (log.isDebugEnabled()) { *************** *** 166,170 **** if (isFree) { // Find work for this Shell Executor - ResourceId resId = r.getId(); ShellBrokerUtils.markShellExecutorFree(resId); ShellBrokerUtils.assignWorkToShellExecutor(resId, apiShell); --- 174,177 ---- *************** *** 175,178 **** --- 182,211 ---- /** + * Create filter of shell resources. + * @param shells List of shell to create filter. Null for all shell resources. + * @return Generated filter. + */ + private static String createFilter(ResourceId[] shells) { + String filter; + if (shells == null) { + filter = ApiShellBroker.RESOURCE_SHELL_FILTER; + } else { + StringBuffer filterBuf = new StringBuffer(); + for (int i = 0; i < shells.length; i++) { + ResourceId resId = shells[i]; + if (i > 0) { + filterBuf.append(" OR "); + } + filterBuf.append("( Id.ResourceName='").append(resId.getResourceName()); + filterBuf.append("' AND Id.ResourceType='").append(resId.getResourceType()); + filterBuf.append("' )"); + } + filter = filterBuf.toString(); + } + // @todo Add to filter Active_State + return filter; + } + + /** * Update state of Resource. * @param apiMan ApiResourceManager instance *************** *** 181,185 **** * @throws RemoteException If error */ ! private void updateShellExecutorState(ApiResourceManager apiMan, ResourceId id, int state) throws RemoteException { try { --- 214,218 ---- * @throws RemoteException If error */ ! private static void updateShellExecutorState(ApiResourceManager apiMan, ResourceId id, int state) throws RemoteException { try { *************** *** 198,202 **** * @param api Instance of ApiResourceQuery */ ! private void printAllResources(ApiResourceQuery api) { try { Resource[] res = api.getAllResources(null); --- 231,235 ---- * @param api Instance of ApiResourceQuery */ ! private static void printAllResources(ApiResourceQuery api) { try { Resource[] res = api.getAllResources(null); *************** *** 218,226 **** * Update information of ShellExecutor. * @param apiMan Instance of ApiResourceManager ! * @param id resource Identifier. * @param status Information sent by Shell Executor. */ ! private void updateResourceFromStatusBean(ApiResourceManager apiMan, ! ResourceId id, Status status) throws RemoteException { // Update Resource Properties ResourceProperties resProp = new ResourceProperties(); --- 251,260 ---- * Update information of ShellExecutor. * @param apiMan Instance of ApiResourceManager ! * @param res resource Identifier. * @param status Information sent by Shell Executor. */ ! private static void updateResourceFromStatusBean(ApiResourceManager apiMan, ! Resource res, Status status) throws RemoteException { ! ResourceId id = res.getId(); // Update Resource Properties ResourceProperties resProp = new ResourceProperties(); *************** *** 234,241 **** } CPU[] cpus = resProp.getCpus(); ! for (int j = 0; j < cpus.length; j++) { ! cpus[j].setUsagePercent(status.getCpuUsage()); } - apiMan.updateResourceProperties(id, resProp, true); } --- 268,279 ---- } CPU[] cpus = resProp.getCpus(); ! if (cpus != null) { ! for (int j = 0; j < cpus.length; j++) { ! cpus[j].setUsagePercent(status.getCpuUsage()); ! } ! } ! if (res.getProperties().getState() == ResourceProperties.STATE_TIMEDOUT) { ! resProp.setState(ResourceProperties.STATE_ACTIVE); } apiMan.updateResourceProperties(id, resProp, true); } *************** *** 269,275 **** this.wait(WAIT_TIME); } long id = (System.currentTimeMillis() / THOUSANDS_LONG) % TEN_THOUSANDS_LONG; log.debug("Checking system (" + id + ")..."); ! doTimeoutShellExecutors(); log.debug("System revised (" + id + "). "); --- 307,325 ---- this.wait(WAIT_TIME); } + if (this.isInterrupted()) { + break; + } + ResourceId[] shells = null; + synchronized (this.notificationsList) { + int size = notificationsList.size(); + if (size > 0) { + shells = notificationsList.toArray(new ResourceId[size]); + notificationsList.clear(); + } + } + long id = (System.currentTimeMillis() / THOUSANDS_LONG) % TEN_THOUSANDS_LONG; log.debug("Checking system (" + id + ")..."); ! doTimeoutShellExecutors(shells); log.debug("System revised (" + id + "). "); *************** *** 299,304 **** /** * Forces the thread to wake up. */ ! public synchronized void internalAwake() { this.notifyAll(); } --- 349,360 ---- /** * Forces the thread to wake up. + * @param resId Shell Resource that has produced this event. Null for all shells. */ ! public synchronized void internalAwake(ResourceId resId) { ! if (resId != null) { ! synchronized (this.notificationsList) { ! this.notificationsList.add(resId); ! } ! } this.notifyAll(); } *************** *** 310,314 **** public void interrupt() { super.interrupt(); ! internalAwake(); } --- 366,370 ---- public void interrupt() { super.interrupt(); ! internalAwake(null); } *************** *** 336,343 **** /** * Forces the thread to wake up. */ ! public static void awake() { if (watchdog != null) { ! watchdog.internalAwake(); } } --- 392,400 ---- /** * Forces the thread to wake up. + * @param resId Shell Resource that has produced this event. Null for all shells. */ ! public static void awake(ResourceId resId) { if (watchdog != null) { ! watchdog.internalAwake(resId); } } Index: ShellBrokerUtils.java =================================================================== RCS file: /cvsroot/fura/FuraSrc/Server/Plugins/ShellPlugin/brokerplugin/src/com/gridsystems/shellbroker/internal/api/ShellBrokerUtils.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** ShellBrokerUtils.java 16 Jul 2008 06:55:51 -0000 1.1 --- ShellBrokerUtils.java 3 Sep 2008 21:27:22 -0000 1.2 *************** *** 243,250 **** try { Connection con = ShellBrokerUtils.getConnection(sh); ! apiShell = (ApiShellExecutor) ApiFactory.newApi(ApiShellExecutor.class, con); } catch (Exception e) { throw new Exception("Error while it tries create instance of ApiShellExecutor", e); } return apiShell; } --- 243,254 ---- try { Connection con = ShellBrokerUtils.getConnection(sh); ! apiShell = ApiFactory.newApi(ApiShellExecutor.class, con); } catch (Exception e) { throw new Exception("Error while it tries create instance of ApiShellExecutor", e); } + if (apiShell == null) { + throw new Exception("ApiFactory has produced an ApiShell with value null.", + new Exception()); + } return apiShell; } |