From: <bsc...@us...> - 2009-06-26 08:00:31
|
Revision: 4813 http://unicore.svn.sourceforge.net/unicore/?rev=4813&view=rev Author: bschuller Date: 2009-06-26 08:00:19 +0000 (Fri, 26 Jun 2009) Log Message: ----------- simplified action queue; simplified legacy TSI state update Modified Paths: -------------- xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/ems/JobRunner.java xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/legacy/Execution.java xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/legacy/LegacyTSI.java xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/legacy/TSIConnectionFactory.java xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/legacy/TSIUtils.java xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/persistence/AbstractActionStore.java xnjs/trunk/xnjs-module-core/src/test/java/de/fzj/unicore/xnjs/functional/PerfFileIOLegacyTSI.java xnjs/trunk/xnjs-module-core/src/test/java/de/fzj/unicore/xnjs/legacy/TestLegacyTSI.java xnjs/trunk/xnjs-module-core/src/test/resources/ems/example4.jsdl Modified: xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/ems/JobRunner.java =================================================================== --- xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/ems/JobRunner.java 2009-06-25 13:50:58 UTC (rev 4812) +++ xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/ems/JobRunner.java 2009-06-26 08:00:19 UTC (rev 4813) @@ -33,6 +33,8 @@ package de.fzj.unicore.xnjs.ems; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.log4j.Logger; import de.fzj.unicore.xnjs.Configuration; @@ -64,15 +66,16 @@ volatile boolean isInterrupted=false; - private static int count = 1; + private static AtomicInteger count = new AtomicInteger(0); public JobRunner(InternalManager mgr, Configuration config) { super(); this.configuration=config; - super.setName("XNJS-JobRunner-"+count); + int n=count.incrementAndGet(); + super.setName("XNJS-JobRunner-"+n); this.mgr=mgr; - logger.debug("Job runner thread "+count+" starting"); - count++; + logger.debug("Job runner thread "+n+" starting"); + } @@ -83,7 +86,7 @@ public void interrupt() { isInterrupted=true; logger.debug(getName()+" stopping"); - count--; + count.decrementAndGet(); super.interrupt(); } Modified: xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/legacy/Execution.java =================================================================== --- xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/legacy/Execution.java 2009-06-25 13:50:58 UTC (rev 4812) +++ xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/legacy/Execution.java 2009-06-26 08:00:19 UTC (rev 4813) @@ -60,11 +60,12 @@ @Dependency(classes={TSIConnectionFactory.class}) public class Execution extends BasicExecution { - //GuardedBy lock - private Map<String,String> bssJobStates; - //GuardedBy lock - private final Map<String,String> jobIDs; - + private final Map<String,BSSInfo> bssInfo=new ConcurrentHashMap<String, BSSInfo>(); + + /** + * this lock protects against a race condition when submitting a job. Need to + * make sure that no status updates are run at the same time + */ private final Object lock=new Object(); private int updateInterval; @@ -97,8 +98,7 @@ public Execution(Configuration config){ super(config); factory=configuration.getComponentInstanceOfType(TSIConnectionFactory.class); - bssJobStates=new ConcurrentHashMap<String,String>(); - jobIDs=new ConcurrentHashMap<String,String>(); + try{ updateInterval=Integer.parseInt(getConfiguration().getProperty(UPDATE_INTERVAL, DEFAULT_UPDATE_INTERVAL)); }catch(Exception e){ @@ -124,7 +124,6 @@ } }; getConfiguration().getScheduledExecutor().scheduleWithFixedDelay(r, updateInterval, updateInterval, TimeUnit.MILLISECONDS); - } /** @@ -135,25 +134,26 @@ protected void doSubmit(POSIXApplicationType appDescription, Action job) throws ExecutionException { TSIConnection conn=null; try{ - String tsiCmd=TSIUtils.makeSubmitCommand(appDescription, job, getConfiguration().getGrounder()); - conn=configuration.getComponentInstanceOfType(TSIConnectionFactory.class).getTSIConnection(job.getClient()); - String res=conn.send(tsiCmd); - job.addLogTrace("Command is: \n"+tsiCmd); - if(res.contains("TSI_FAILED")){ - job.addLogTrace("TSI reply: FAILED."); - throw new ExecutionException("Submission to classic TSI failed. Reply was <"+res+">"); - } - job.addLogTrace("TSI reply: submission OK."); - String bssid=res.trim(); //strip newline - job.setBSID(bssid); - String msg="Submitted to classic TSI as ["+conn.getIdLine()+"] with BSSID="+bssid; - logger.debug(msg); - job.addLogTrace(msg); - conn.done(); synchronized(lock){ - bssJobStates.put(bssid,"UNKNOWN"); - jobIDs.put(bssid,job.getUUID()); + String tsiCmd=TSIUtils.makeSubmitCommand(appDescription, job, getConfiguration().getGrounder()); + conn=configuration.getComponentInstanceOfType(TSIConnectionFactory.class).getTSIConnection(job.getClient()); + String res=conn.send(tsiCmd); + job.addLogTrace("Command is: \n"+tsiCmd); + if(res.contains("TSI_FAILED")){ + job.addLogTrace("TSI reply: FAILED."); + throw new ExecutionException("Submission to classic TSI failed. Reply was <"+res+">"); + } + job.addLogTrace("TSI reply: submission OK."); + String bssid=res.trim(); //strip newline + job.setBSID(bssid); + String msg="Submitted to classic TSI as ["+conn.getIdLine()+"] with BSSID="+bssid; + logger.debug(msg); + job.addLogTrace(msg); + conn.done(); + BSSInfo newJob=new BSSInfo(bssid,job.getUUID(),"UNKNOWN"); + bssInfo.put(bssid, newJob); } + }catch(Exception ex){ logger.error("Error submitting job.",ex); //rethrow @@ -171,8 +171,10 @@ try{ final String bssid=job.getBSID(); String status; - synchronized (lock) { - status=bssJobStates.get(bssid); + //synchronized (lock) { + BSSInfo info=bssInfo.get(bssid); + status=info!=null?info.bssState:null; + if(logger.isTraceEnabled())logger.trace("Action with bssid="+bssid+" is "+status); if("QUEUED".equals(status)){ @@ -192,19 +194,21 @@ i++; job.getProcessingContext().put(GRACE_PERIOD,i); job.setDirty(); + String jobID=job.getUUID(); + String bssID=job.getBSID(); + if(i.intValue()-1<gracePeriod){ - if(logger.isDebugEnabled())logger.debug("No BSS status found for "+job.getUUID()+" BSS id="+job.getBSID()+", will re-check."); - jobIDs.put(job.getBSID(), job.getUUID()); - bssJobStates.put(job.getBSID(), "UNKNOWN"); + if(logger.isDebugEnabled())logger.debug("No BSS status found for "+jobID+" BSS id="+bssID+", will re-check."); + bssInfo.put(bssID,new BSSInfo(bssID, jobID, "UNKNOWN")); } else { - if(logger.isDebugEnabled())logger.debug("No BSS status found for "+job.getUUID()+", assuming it is completed."); + if(logger.isDebugEnabled())logger.debug("No BSS status found for "+jobID+", assuming it is completed."); status="COMPLETED"; } - } + //} if ("COMPLETED".equals(status)){ - jobIDs.remove(job.getBSID()); + bssInfo.remove(job.getBSID()); //check outcome and set status code... if(job.getExecutionContext().getExitCode()==null)getExitCode(job); Integer exitCode=job.getExecutionContext().getExitCode(); @@ -228,12 +232,15 @@ //updates the status hashmap. Called periodically from a scheduler thread private void updateBSSStates() throws Exception { - logger.trace("Updating status on legacy TSI"); - TSIConnection conn=factory.getTSIConnection(factory.getBSSUSer()+" NONE"); - String res=conn.send(TSIUtils.makeStatusCommand()); - conn.done(); - if(logger.isTraceEnabled())logger.trace("BSS Status listing: \n"+res); - TSIUtils.updateStatusListing(bssJobStates,jobIDs,res,configuration.getInternalManager()); + //do not update while job is being submitted! + synchronized (lock) { + logger.trace("Updating status on legacy TSI"); + TSIConnection conn=factory.getTSIConnection(factory.getBSSUSer()+" NONE"); + String res=conn.send(TSIUtils.makeStatusCommand()); + conn.done(); + if(logger.isTraceEnabled())logger.trace("BSS Status listing: \n"+res); + TSIUtils.updateStatusListing(bssInfo,res,configuration.getInternalManager()); + } } @Override @@ -246,10 +253,9 @@ else{ try{ String status=null; - synchronized (lock) { - updateBSSStates(); - status=bssJobStates.get(bssid); - } + updateBSSStates(); + BSSInfo info=bssInfo.get(bssid); + status=info!=null?info.bssState:null; if(status!=null) { logger.debug("Aborting job <"+bssid+"> on legacy TSI"); TSIConnection conn=configuration.getComponentInstanceOfType(TSIConnectionFactory.class).getTSIConnection(job.getClient()); @@ -297,9 +303,25 @@ * returns the number of processes on the BSS (as seen by XNJS) */ public int getNumberOfBSSProcesses() throws ExecutionException{ - synchronized(lock){ - return bssJobStates.size(); + return bssInfo.size(); + } + + public static class BSSInfo{ + String bssID; + String jobID; + String bssState; + + public BSSInfo(){} + + public BSSInfo(String bssID,String jobID, String bssState){ + this.bssID=bssID; + this.jobID=jobID; + this.bssState=bssState; } + + public String toString(){ + return "BSSID="+bssID+" JOBID="+jobID+" STATUS="+bssState; + } } - + } Modified: xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/legacy/LegacyTSI.java =================================================================== --- xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/legacy/LegacyTSI.java 2009-06-25 13:50:58 UTC (rev 4812) +++ xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/legacy/LegacyTSI.java 2009-06-26 08:00:19 UTC (rev 4813) @@ -144,8 +144,8 @@ */ public void chmod(String file, Permissions perm) throws ExecutionException { begin(); - commands.append(CHMOD+" u-rwx "+file+"\n"); - commands.append(CHMOD+" u+"+perm.toString()+" "+file+"\n"); + commands.append(CHMOD+" u-rwx \""+file+"\"\n"); + commands.append(CHMOD+" u+"+perm.toString()+" \""+file+"\"\n"); commit(); } @@ -188,13 +188,13 @@ public void cp(String source, String target) throws ExecutionException { begin(); - commands.append(CP+" "+source+" "+target+"\n"); + commands.append(CP+" \""+source+"\" \""+target+"\"\n"); commit(); } public void rename(String source, String target) throws ExecutionException { begin(); - commands.append(MV+" "+source+" "+target+"\n"); + commands.append(MV+" \""+source+"\" \""+target+"\"\n"); commit(); } @@ -203,7 +203,7 @@ */ public void exec(String what, ExecutionContext ec) throws ExecutionException { begin(); - commands.append(CD+" "+ec.getWorkingDirectory()+"\n"); + commands.append(CD+" \""+ec.getWorkingDirectory()+"\"\n"); commands.append(what+"\n"); this.ec=ec; commit(); @@ -212,7 +212,7 @@ public void execAndWait(String what, ExecutionContext ec) throws ExecutionException { try{ begin(); - commands.append(CD+" "+ec.getWorkingDirectory()+"\n"); + commands.append(CD+" \""+ec.getWorkingDirectory()+"\"\n"); if(ec!=null && ec.getStdout()!=null){ what = what+" > "+ec.getStdout(); } @@ -252,7 +252,7 @@ */ public void mkdir(String dir) throws ExecutionException { begin(); - commands.append(MKDIR+" "+dir+"\n"); + commands.append(MKDIR+" \""+dir+"\"\n"); commit(); } @@ -261,7 +261,7 @@ */ public void mkfifo(String dir) throws ExecutionException { begin(); - commands.append(MKFIFO+" "+dir+"\n"); + commands.append(MKFIFO+" \""+dir+"\"\n"); commit(); } @@ -276,7 +276,7 @@ public String getEnvironment(String name)throws ExecutionException{ try{ TSIConnection conn=factory.getTSIConnection(user+" NONE"); - String cmd="echo $"+name; + String cmd="echo ${"+name+"}"; String tsicmd=TSIUtils.makeExecuteScript(cmd,null,getConfiguration()); String res=conn.send(tsicmd); conn.done(); @@ -305,7 +305,7 @@ */ public void rm(String target) throws ExecutionException { begin(); - commands.append(RM+" "+target+"\n"); + commands.append(RM+" \""+target+"\"\n"); commit(); } @@ -314,7 +314,7 @@ */ public void rmdir(String target) throws ExecutionException { begin(); - commands.append(RMDIR+" "+target+"\n"); + commands.append(RMDIR+" \""+target+"\"\n"); commit(); } @@ -453,9 +453,9 @@ public String doLS(String file,boolean normal, boolean recurse)throws ExecutionException{ String loc=TSI_LS; String cmd=PERL+" "+loc; - if(recurse && normal)cmd+=" R "+file; - else if(normal)cmd+=" N "+file; - else cmd+=" A "+file; + if(recurse && normal)cmd+=" R \""+file+"\""; + else if(normal)cmd+=" N \""+file+"\""; + else cmd+=" A \""+file+"\""; logger.debug("Executing tsi ls() command: "+cmd); String res=""; @@ -500,44 +500,13 @@ return f; } - public InputStream getInputStream(final String file) throws ExecutionException{ - return getInputStream_orig(file); - } - - public InputStream getDirectInputStream(final String file) throws ExecutionException,IOException { - final TSIConnection conn; - final long length=readLength(file); - conn=getReadStream(file, 0, length); - final InputStream tsiData=conn.getDataStream(); - return new InputStream(){ - int read=0; - boolean closed=false; - @Override - public void close() throws IOException { - if(closed)return; - conn.getLine(); //U4 NJS/TSI protocol: extra 'ENDOFMESSAGE' line - conn.done(); - } - - @Override - public int read() throws IOException { - read++; - if(read>length){ - close(); - closed=true; - return -1; - } - return tsiData.read(); - } - }; - } - + /** * read file from TSI */ - private InputStream getInputStream_orig(final String file) throws ExecutionException { + public InputStream getInputStream(final String file) throws ExecutionException { //figure out length of file - logger.debug("Reading from "+file); + logger.debug("Reading from '"+file+"'"); return new InputStream(){ //read buffer byte[] buffer=new byte[bufsize]; @@ -662,46 +631,8 @@ throw new IOException(e); } } + - //read a chunk from the TSI - private TSIConnection getReadStream(String file, long offset, long length)throws IOException{ - String tsicmd=TSIUtils.makeGetFileChunkCommand(file,offset,length); - String role=getRole(); - TSIConnection conn=null; - try{ - conn=factory.getTSIConnection(user+" "+role); - String res=conn.send(tsicmd); - if(!res.contains("TSI_OK")){ - throw new ExecutionException("GetFileChunk on legacy TSI failed. Reply was "+res); - } - else { - if(logger.isTraceEnabled()){ - logger.trace("TSI response: '"+res+"'"); - } - } - int av=0; - BufferedReader br=new BufferedReader(new StringReader(res)); - while(true){ - String l=br.readLine(); - if(l==null) break; - if(l.startsWith("TSI_LENGTH")){ - av= Integer.parseInt(l.split(" ")[1]); - break; - } - } - if(av!=length)throw new IllegalArgumentException("Requested data size does not match file size."); - - return conn; - - }catch(Exception e){ - if(conn!=null){ - try { conn.dead(); } catch (Exception e1) {} - } - throw new IOException(e); - } - } - - public OutputStream getOutputStream(String file) throws ExecutionException { return getOutputStream(file,false); } @@ -745,10 +676,13 @@ //write a blob of data to the named file //see XNJS 4 NJS BatchTargetSystem and TSI PutFiles.pm private void writeChunk(String file, byte[] buf, int numBytes, boolean append)throws IOException,TSIUnavailableException{ - if(logger.isDebugEnabled()) + if(logger.isDebugEnabled()){ logger.debug("Write to "+file+", append="+append+", numBytes="+numBytes); - + } String tsicmd=TSIUtils.makePutFilesCommand(append); + if(logger.isTraceEnabled()){ + logger.trace("TSI command: \n"+tsicmd); + } String role =getRole(); TSIConnection conn=factory.getTSIConnection(user+" "+role); String res=conn.send(tsicmd); Modified: xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/legacy/TSIConnectionFactory.java =================================================================== --- xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/legacy/TSIConnectionFactory.java 2009-06-25 13:50:58 UTC (rev 4812) +++ xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/legacy/TSIConnectionFactory.java 2009-06-26 08:00:19 UTC (rev 4813) @@ -220,7 +220,6 @@ public void start() { try { - machine=getConfiguration().getProperty(TSI_MACHINE); String portS=getConfiguration().getProperty(TSI_PORT); port=Integer.parseInt(portS); @@ -240,9 +239,8 @@ tsiDescription="classic TSI at "+machine+":"+port+", XNJS listens on port "+replyport; } catch(Exception ex){ - //die, crappy config - ex.printStackTrace(); - throw new Error("Config is messed up, cannot setup TSI Connection factory."); + log.error("Cannot setup TSI Connection Factory", ex); + throw new Error("Config is messed up, cannot setup TSI Connection factory.",ex); } } Modified: xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/legacy/TSIUtils.java =================================================================== --- xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/legacy/TSIUtils.java 2009-06-25 13:50:58 UTC (rev 4812) +++ xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/legacy/TSIUtils.java 2009-06-26 08:00:19 UTC (rev 4813) @@ -59,6 +59,7 @@ import de.fzj.unicore.xnjs.ems.event.EventHandler; import de.fzj.unicore.xnjs.jsdl.IGrounder; import de.fzj.unicore.xnjs.jsdl.JSDLUtils; +import de.fzj.unicore.xnjs.legacy.Execution.BSSInfo; import de.fzj.unicore.xnjs.tsi.IReservation; import de.fzj.unicore.xnjs.util.LogUtil; @@ -250,21 +251,16 @@ * </ul> * * @param statesMap - - * a map containing the states, with key=bssid, value=Status, one - * of RUNNING, QUEUED, COMPLETED, SUSPENDED, FROZEN - * @param idMap - - * a map for getting the Action UUID for a bssid, with key=bssid, - * value=UUID - * @param tsiReply - + * a map containing the states, with key=bssid, value=BSSInfo + * @param tsiReply - * the TSI reply to a "get status listing" command * @param handler - * an event handler for receiving change events (can be null) * * @throws Exception */ - public static void updateStatusListing(Map<String, String> statesMap, - Map<String, String> idMap, String tsiReply, EventHandler handler) - throws IOException { + public static void updateStatusListing(Map<String, BSSInfo> statesMap, + String tsiReply, EventHandler handler)throws IOException { /* * From TSI GetStatusListing.pm: First line is "QSTAT", followed by a * line per found job, first word is BSS job identifier and the second @@ -275,6 +271,7 @@ String line = br.readLine(); Set<String> bssIDs = new HashSet<String>(); bssIDs.addAll(statesMap.keySet()); + Set<String> haveStatusForIDs = new HashSet<String>(); if (line == null) throw new IOException("Empty reply from TSI"); line = line.trim(); @@ -286,27 +283,34 @@ break; String[] tok = inner.trim().split(" "); if (tok.length != 2) { - logger - .fatal("Wrong format of QSTAT! Please check the TSI module GetStatusListing.pm!"); + logger.fatal("Wrong format of QSTAT! Please check the TSI module GetStatusListing.pm!"); break; } else { // store in map String bssID = tok[0].trim(); + if(haveStatusForIDs.contains(bssID)){ + //duplicate entry in QSTAT + continue; + } String newValue = tok[1].trim(); - String oldValue = statesMap.get(bssID); - statesMap.put(bssID, newValue); + BSSInfo oldInfo=statesMap.get(bssID); + if(oldInfo==null){ + continue; + } + String oldValue = oldInfo.bssState; + String jobID = oldInfo.jobID; + statesMap.put(bssID,new BSSInfo(bssID, jobID,newValue)); + haveStatusForIDs.add(bssID); if (!newValue.equals(oldValue)) { if (handler != null) { try { - String uuid = idMap.get(bssID); - if (uuid != null) { - if (logger.isDebugEnabled()) { - logger.debug(oldValue + " -> " - + newValue - + ", sending 'continue' for : " - + idMap.get(bssID)); - } - handler.handleEvent(new ContinueProcessingEvent(uuid)); + if (logger.isDebugEnabled()) { + logger.debug(oldValue + " -> " + + newValue + + ", sending 'continue' for : " + + jobID); } + handler.handleEvent(new ContinueProcessingEvent(jobID)); + } catch (Exception ee) { LogUtil.logException("Error sending change event",ee,logger); } @@ -317,21 +321,20 @@ } // now check for entries that do not have a new state -> remove old state for (String s : bssIDs) { - statesMap.remove(s); if (handler != null) { try { - String uuid = idMap.get(s); + String uuid = statesMap.get(s).jobID; if (uuid != null) { if (logger.isDebugEnabled()) { - logger.debug("Sending 'continue' for : " + uuid); + logger.debug("Entry disappeared from QSTAT listing, sending 'continue' for : " + uuid); } handler.handleEvent(new ContinueProcessingEvent(uuid)); } } catch (Exception ee) { - logger.warn("Error sending change event: " - + ee.getMessage()); + LogUtil.logException("Error sending change event",ee,logger); } } + statesMap.remove(s); } } else { // no valid QSTAT listing @@ -339,7 +342,6 @@ "No valid QSTAT listing received. TSI replied: " + line); } } - /** * read a line from a TSI LS result, skipping irrelevant lines * @@ -611,14 +613,14 @@ // total number of processors) if (nodes == -1 || nodes * processors == 1) { commands.append("#TSI_NODES NONE\n"); + commands.append("#TSI_TOTAL_PROCESSORS " + total + "\n"); commands.append("#TSI_PROCESSORS " + processors + "\n"); } else { commands.append("#TSI_NODES " + nodes + "\n"); commands.append("#TSI_PROCESSORS_PER_NODE " + processors + "\n"); commands.append("#TSI_TOTAL_PROCESSORS " + total + "\n"); - commands.append("#for backwards compatibility"); - commands.append("#TSI_PROCESSORS " + total + "\n"); + commands.append("#TSI_PROCESSORS " + processors + "\n"); } commands.append("#TSI_HOST_NAME " + hostname + "\n"); commands.append("#TSI_QUEUE " + queue + "\n"); Modified: xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/persistence/AbstractActionStore.java =================================================================== --- xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/persistence/AbstractActionStore.java 2009-06-25 13:50:58 UTC (rev 4812) +++ xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/persistence/AbstractActionStore.java 2009-06-26 08:00:19 UTC (rev 4813) @@ -66,13 +66,11 @@ protected String id; - protected final Map<String,Long> accessTimes=new ConcurrentHashMap<String,Long>(); - - protected final Queue<String> queue=new ConcurrentLinkedQueue<String>(){ + protected final Queue<QueueEntry> queue=new ConcurrentLinkedQueue<QueueEntry>(){ private static final long serialVersionUID=1l; @Override - public boolean add(String o) { + public boolean add(QueueEntry o) { if(contains(o))return true; return super.add(o); } @@ -120,7 +118,10 @@ //fill initial queue try{ - queue.addAll(getUniqueIDs()); + for(String i: getUniqueIDs()){ + queue.add(new QueueEntry(i)); + } + }catch(PersistenceException pe){ logger.error("Can't read IDs from persistence.", pe); } @@ -157,16 +158,16 @@ */ public synchronized Action getNextActionForProcessing()throws TimeoutException,PersistenceException { long start=System.currentTimeMillis(); - String key=queue.peek(); - if(key==null)return null; + QueueEntry q=queue.peek(); + if(q==null)return null; + String key=q.actionID; Long now=System.currentTimeMillis(); - Long last=accessTimes.get(key); - if(last!=null && last>now-updateInterval){ + long last=q.lastAccessed; + if(last>now-updateInterval){ return null; } Action act=getForUpdate(key); - queue.remove(key); - accessTimes.put(key, now); + queue.remove(q); long duration=System.currentTimeMillis()-start; meanRPT=(meanRPT*n_req + duration)/(n_req+1); n_req++; @@ -215,11 +216,8 @@ try{ doStore(value); states.put(key,value.getStatus()); - if(!process){ - accessTimes.remove(key); - } if(process){ - queue.add(key); + queue.add(new QueueEntry(key)); } }catch(Exception e){ throw new StorageException(e); @@ -233,7 +231,6 @@ public void remove(String key){ try{ - accessTimes.remove(key); states.remove(key); doRemove(key); }catch(Exception e){ @@ -309,5 +306,21 @@ return 0; } + + public static class QueueEntry { + final String actionID; + + final long lastAccessed; + + public QueueEntry(String actionID, long lastAccessed){ + this.actionID=actionID; + this.lastAccessed=lastAccessed; + } + + public QueueEntry(String actionID){ + this(actionID,System.currentTimeMillis()); + } + + } } Modified: xnjs/trunk/xnjs-module-core/src/test/java/de/fzj/unicore/xnjs/functional/PerfFileIOLegacyTSI.java =================================================================== --- xnjs/trunk/xnjs-module-core/src/test/java/de/fzj/unicore/xnjs/functional/PerfFileIOLegacyTSI.java 2009-06-25 13:50:58 UTC (rev 4812) +++ xnjs/trunk/xnjs-module-core/src/test/java/de/fzj/unicore/xnjs/functional/PerfFileIOLegacyTSI.java 2009-06-26 08:00:19 UTC (rev 4813) @@ -41,7 +41,6 @@ import java.io.OutputStream; import java.io.OutputStreamWriter; -import de.fzj.unicore.xnjs.ems.ExecutionContext; import de.fzj.unicore.xnjs.legacy.LegacyTSI; import de.fzj.unicore.xnjs.tsi.TSI; @@ -94,58 +93,6 @@ f.delete(); } - public void xtestReadDirect()throws Exception{ - String tmpdir=System.getProperty("java.io.tmpdir"); - LegacyTSI tsi=(LegacyTSI)xnjs.getConfig().getTargetSystemInterface(null); - tsi.begin(); - String tmp="test_"+System.currentTimeMillis()+""; - tsi.mkdir(tmpdir+File.separator+tmp); - tsi.commit(); - String theLine="test123"; - //write some junk to the file - File f=new File(tmpdir+"/"+tmp+"/out"); - FileWriter fw=new FileWriter(f); - for(int i=0;i<size;i++){ - fw.write(theLine+"\n"); - } - fw.close(); - System.out.println("length="+f.length()); - long start=System.currentTimeMillis(); - InputStream is=tsi.getDirectInputStream(tmpdir+"/"+tmp+"/out"); - assertNotNull(is); - BufferedReader br=new BufferedReader(new InputStreamReader(is)); - int c=0; - String line; - while(true){ - line=br.readLine(); - if(line==null)break; - assertEquals(theLine,line); - c++; - } - long duration=System.currentTimeMillis()-start; - System.out.println("new style: "+duration+" ms., "+f.length()/duration+" kb/sec."); - br.close(); - is.close(); - f.delete(); - checkAfterReadDirect(); - } - - private void checkAfterReadDirect()throws Exception{ - String tmpdir=System.getProperty("java.io.tmpdir"); - LegacyTSI tsi=(LegacyTSI)xnjs.getConfig().getTargetSystemInterface(null); - tsi.begin(); - String tmp="test_"+System.currentTimeMillis()+""; - tsi.mkdir(tmpdir+File.separator+tmp); - tsi.exec("echo tsi > out",new ExecutionContext()); - tsi.commit(); - InputStream is=tsi.getInputStream(tmpdir+"/"+tmp+"/out"); - assertNotNull(is); - BufferedReader br=new BufferedReader(new InputStreamReader(is)); - assertTrue(br.readLine().contains("tsi")); - br.close(); - is.close(); - } - public void testWrite()throws Exception{ long total=0; int i=0; Modified: xnjs/trunk/xnjs-module-core/src/test/java/de/fzj/unicore/xnjs/legacy/TestLegacyTSI.java =================================================================== --- xnjs/trunk/xnjs-module-core/src/test/java/de/fzj/unicore/xnjs/legacy/TestLegacyTSI.java 2009-06-25 13:50:58 UTC (rev 4812) +++ xnjs/trunk/xnjs-module-core/src/test/java/de/fzj/unicore/xnjs/legacy/TestLegacyTSI.java 2009-06-26 08:00:19 UTC (rev 4813) @@ -8,6 +8,7 @@ import de.fzj.unicore.xnjs.ems.event.EventHandler; import de.fzj.unicore.xnjs.ems.event.XnjsEvent; import de.fzj.unicore.xnjs.io.XnjsFile; +import de.fzj.unicore.xnjs.legacy.Execution.BSSInfo; public class TestLegacyTSI extends TestCase implements EventHandler{ @@ -35,10 +36,10 @@ public void testParseStatusListing(){ try{ String s1="QSTAT \n 2795100 RUNNING\n 2795100 RUNNING \n"; - Map<String,String>st=new HashMap<String, String>(); - Map<String,String>jobs=new HashMap<String, String>(); - TSIUtils.updateStatusListing(st,jobs,s1,null); - assertEquals("RUNNING",st.get("2795100")); + Map<String,BSSInfo>st=new HashMap<String, BSSInfo>(); + st.put("2795100", new BSSInfo("2795100","j1","UNKNOWN")); + TSIUtils.updateStatusListing(st,s1,null); + assertEquals("RUNNING",st.get("2795100").bssState); }catch(Exception e){ e.printStackTrace(); fail(); @@ -50,15 +51,15 @@ eventsReceived=0; long entries=100000; StringBuilder s1=new StringBuilder("QSTAT \n"); - Map<String,String>jobs=new HashMap<String, String>(); + Map<String,BSSInfo>st=new HashMap<String, BSSInfo>(); + for(int i=1;i<=entries;i++){ s1.append(i+" RUNNING\n"); - jobs.put(String.valueOf(i), "1234"); + st.put(String.valueOf(i), new BSSInfo(String.valueOf(i),"j"+i,"1234")); } - Map<String,String>st=new HashMap<String, String>(); long start=System.currentTimeMillis(); - TSIUtils.updateStatusListing(st,jobs,s1.toString(),this); + TSIUtils.updateStatusListing(st,s1.toString(),this); long end=System.currentTimeMillis(); System.out.println("Parsing qstat "+entries+" entries took "+(end-start)+ " msec."); assertEquals(entries,eventsReceived); @@ -68,63 +69,8 @@ } } - public void testEventGeneration(){ - try{ - eventsReceived=0; - long entries=100000; - StringBuilder s1=new StringBuilder("QSTAT \n"); - Map<String,String>jobs=new HashMap<String, String>(); - for(int i=1;i<=entries;i++){ - s1.append(i+" RUNNING\n"); - jobs.put(String.valueOf(i), "1234"); - } - Map<String,String>st=new HashMap<String, String>(); - for(int i=1;i<=entries;i++){ - st.put(Integer.toString(i),"RUNNING"); - } - long start=System.currentTimeMillis(); - TSIUtils.updateStatusListing(st,jobs,s1.toString(),this); - long end=System.currentTimeMillis(); - System.out.println("Parsing qstat "+entries+" entries took "+(end-start)+ " msec."); - assertEquals(0,eventsReceived); - - //parse second listing where half the entries have disappeared - s1=new StringBuilder("QSTAT \n"); - - for(int i=1;i<=entries/2;i++){ - s1.append(i+" RUNNING\n"); - } - int oldSize=st.size(); - System.out.println("Old Status map contains "+oldSize+" entries."); - start=System.currentTimeMillis(); - TSIUtils.updateStatusListing(st,jobs,s1.toString(),this); - end=System.currentTimeMillis(); - int newSize=st.size(); - System.out.println("Updated Status map contains "+newSize+" entries."); - assertEquals(entries/2, newSize); - System.out.println("Parsing qstat "+entries/2+" entries took "+(end-start)+ " msec."); - System.out.println("Received "+eventsReceived+ " change events."); - assertEquals(entries/2,eventsReceived); - - - //check that event is generated when new entry appears - eventsReceived=0; - for(int i=0;i<10;i++){ - s1.append(entries+i+" RUNNING\n"); - - } - TSIUtils.updateStatusListing(st,jobs,s1.toString(),this); - - System.out.println("Received "+eventsReceived+ " change events."); - assertEquals(1,eventsReceived); - - }catch(Exception e){ - e.printStackTrace(); - fail(); - } - } + - private int eventsReceived=0; public void handleEvent(XnjsEvent xe){ Modified: xnjs/trunk/xnjs-module-core/src/test/resources/ems/example4.jsdl =================================================================== --- xnjs/trunk/xnjs-module-core/src/test/resources/ems/example4.jsdl 2009-06-25 13:50:58 UTC (rev 4812) +++ xnjs/trunk/xnjs-module-core/src/test/resources/ems/example4.jsdl 2009-06-26 08:00:19 UTC (rev 4813) @@ -4,7 +4,7 @@ <jsdl:Application> <jsdl-posix:POSIXApplication> <jsdl-posix:Executable>/bin/sleep</jsdl-posix:Executable> - <jsdl-posix:Argument>2</jsdl-posix:Argument> + <jsdl-posix:Argument>20</jsdl-posix:Argument> </jsdl-posix:POSIXApplication> </jsdl:Application> </jsdl:JobDescription> This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |