From: <tho...@us...> - 2014-04-07 12:40:21
|
Revision: 8071 http://sourceforge.net/p/bigdata/code/8071 Author: thompsonbry Date: 2014-04-07 12:40:17 +0000 (Mon, 07 Apr 2014) Log Message: ----------- Bug fix for the windows typeperf collector to change how interrupt handling is performed. It now looks for the innercause in the run() method and is quite if the inner cause has a root cause of an interrupt. See #871 (interrupt handling code review). Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/counters/win/TypeperfCollector.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/counters/win/TypeperfCollector.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/counters/win/TypeperfCollector.java 2014-04-07 12:31:32 UTC (rev 8070) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/counters/win/TypeperfCollector.java 2014-04-07 12:40:17 UTC (rev 8071) @@ -31,6 +31,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.LineNumberReader; +import java.nio.channels.ClosedByInterruptException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Arrays; @@ -39,6 +40,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.CancellationException; import org.apache.log4j.Logger; @@ -52,6 +54,7 @@ import com.bigdata.counters.IRequiredHostCounters; import com.bigdata.util.CSVReader; import com.bigdata.util.CSVReader.Header; +import com.bigdata.util.InnerCause; /** * Collects per-host performance counters on a Windows platform using @@ -68,19 +71,19 @@ */ public class TypeperfCollector extends AbstractProcessCollector { - static protected final Logger log = Logger.getLogger(TypeperfCollector.class); + static private final Logger log = Logger.getLogger(TypeperfCollector.class); - /** - * True iff the {@link #log} level is INFO or less. - */ - final protected static boolean INFO = log.isInfoEnabled(); +// /** +// * True iff the {@link #log} level is INFO or less. +// */ +// final protected static boolean INFO = log.isInfoEnabled(); +// +// /** +// * True iff the {@link #log} level is DEBUG or less. +// */ +// final protected static boolean DEBUG = log.isDebugEnabled(); /** - * True iff the {@link #log} level is DEBUG or less. - */ - final protected static boolean DEBUG = log.isDebugEnabled(); - - /** * Updated each time a new row of data is read from the process and reported * as the last modified time for counters based on that process and * defaulted to the time that we begin to collect performance data. @@ -175,6 +178,7 @@ } + @Override public Double getValue() { final Double value = (Double) vals.get(path); @@ -189,6 +193,7 @@ } + @Override public long lastModified() { return lastModified; @@ -199,7 +204,8 @@ * @throws UnsupportedOperationException * always. */ - public void setValue(Double value, long timestamp) { + @Override + public void setValue(final Double value, final long timestamp) { throw new UnsupportedOperationException(); @@ -225,6 +231,7 @@ * * @throws IOException */ + @Override public List<String> getCommand() { // make sure that our counters have been declared. @@ -243,7 +250,7 @@ // counter names need to be double quoted for the command line. command.add("\"" + decl.getCounterNameForWindows() + "\""); - if(INFO) log.info("Will collect: \"" + if(log.isInfoEnabled()) log.info("Will collect: \"" + decl.getCounterNameForWindows() + "\" as " + decl.getPath()); @@ -255,6 +262,7 @@ } + @Override public AbstractProcessReader getProcessReader() { return new ProcessReader(); @@ -290,9 +298,10 @@ } + @Override public void run() { - if(INFO) + if(log.isInfoEnabled()) log.info(""); try { @@ -300,27 +309,34 @@ // run read(); - } catch (InterruptedException e) { + } catch (Exception e) { - // Note: This is a normal exit. - if(INFO) - log.info("Interrupted - will terminate"); + if (InnerCause.isInnerCause(e, InterruptedException.class)|| + InnerCause.isInnerCause(e, ClosedByInterruptException.class)|| + InnerCause.isInnerCause(e, CancellationException.class) + ) { - } catch (Exception e) { + // Note: This is a normal exit. + if (log.isInfoEnabled()) + log.info("Interrupted - will terminate"); - // Unexpected error. - log.fatal(e.getMessage(), e); + } else { + // Unexpected error. + log.fatal(e.getMessage(), e); + + } + } - if(INFO) + if(log.isInfoEnabled()) log.info("Terminated"); } private void read() throws Exception { - if(INFO) + if(log.isInfoEnabled()) log.info(""); long nsamples = 0; @@ -345,33 +361,34 @@ */ csvReader.setTailDelayMillis(100/* ms */); - try { +// try { - // read headers from the file. - csvReader.readHeaders(); + // read headers from the file. + csvReader.readHeaders(); - } catch (IOException ex) { +// } catch (IOException ex) { +// +// /* +// * Note: An IOException thrown out here often indicates an +// * asynchronous close of of the reader. A common and benign +// * cause of that is closing the input stream because the service +// * is shutting down. +// */ +// +// if (!Thread.interrupted()) +// throw ex; +// +// throw new InterruptedException(); +// +// } - /* - * Note: An IOException thrown out here often indicates an - * asynchronous close of of the reader. A common and benign - * cause of that is closing the input stream because the service - * is shutting down. - */ - - if (!Thread.currentThread().isInterrupted()) - throw ex; - - throw new InterruptedException(); - - } - /* * replace the first header definition so that we get clean * timestamps. */ csvReader.setHeader(0, new Header("Timestamp") { - public Object parseValue(String text) { + @Override + public Object parseValue(final String text) { try { return f.parse(text); @@ -390,7 +407,7 @@ */ { - if(INFO) + if(log.isInfoEnabled()) log.info("setting up headers."); int i = 1; @@ -400,7 +417,7 @@ final String path = decl.getPath(); // String path = hostPathPrefix + decl.getPath(); - if (INFO) + if (log.isInfoEnabled()) log.info("setHeader[i=" + i + "]=" + path); csvReader.setHeader(i++, new Header(path)); @@ -409,13 +426,20 @@ } - if(INFO) + if(log.isInfoEnabled()) log.info("starting row reads"); - final Thread t = Thread.currentThread(); +// final Thread t = Thread.currentThread(); - while (!t.isInterrupted() && csvReader.hasNext()) { + while (true) { + if (Thread.interrupted()) + throw new InterruptedException(); + + if (!csvReader.hasNext()) { + break; + } + try { final Map<String, Object> row = csvReader.next(); @@ -455,7 +479,7 @@ } - if(INFO) + if(log.isInfoEnabled()) log.info("done."); } @@ -466,6 +490,7 @@ * Declares the performance counters to be collected from the Windows * platform. */ + @Override public CounterSet getCounters() { // if (root == null) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |