From: <st...@us...> - 2009-02-06 20:29:45
|
Revision: 7347 http://smartfrog.svn.sourceforge.net/smartfrog/?rev=7347&view=rev Author: steve_l Date: 2009-02-06 20:29:37 +0000 (Fri, 06 Feb 2009) Log Message: ----------- SFOS-788 write component to submit jobs to a Hadoop cluster SFOS-905 Move from direct inherited configuration to a "Cluster" CD that defines the cluster -a HadoopConfiguration can take and extend a Cluster reference; good for defining job options Modified Paths: -------------- trunk/core/components/hadoop/src/org/smartfrog/services/hadoop/components/submitter/SubmitterImpl.java trunk/core/components/hadoop/src/org/smartfrog/services/hadoop/conf/HadoopConfigurationImpl.java trunk/core/components/hadoop/src/org/smartfrog/services/hadoop/conf/ManagedConfiguration.java trunk/core/components/hadoop/test/org/smartfrog/services/hadoop/test/system/conf/components.sf trunk/core/components/hadoop/test/org/smartfrog/services/hadoop/test/system/local/components.sf trunk/core/components/hadoop/test/org/smartfrog/services/hadoop/test/system/local/namenode/components.sf trunk/core/components/hadoop/test/org/smartfrog/services/hadoop/test/system/local/tracker/components.sf Modified: trunk/core/components/hadoop/src/org/smartfrog/services/hadoop/components/submitter/SubmitterImpl.java =================================================================== --- trunk/core/components/hadoop/src/org/smartfrog/services/hadoop/components/submitter/SubmitterImpl.java 2009-02-06 20:15:10 UTC (rev 7346) +++ trunk/core/components/hadoop/src/org/smartfrog/services/hadoop/components/submitter/SubmitterImpl.java 2009-02-06 20:29:37 UTC (rev 7347) @@ -85,7 +85,7 @@ results = sfResolve(ATTR_RESULTS, results, false); //create the job configuration. The cluster reference is optional - jobConf = ManagedConfiguration.createConfiguration(this, false, true, true); + jobConf = ManagedConfiguration.createConfiguration(this, true, false, true); //look for the file boolean fileRequired = sfResolve(ATTR_FILE_REQUIRED, true, true); Modified: trunk/core/components/hadoop/src/org/smartfrog/services/hadoop/conf/HadoopConfigurationImpl.java =================================================================== --- trunk/core/components/hadoop/src/org/smartfrog/services/hadoop/conf/HadoopConfigurationImpl.java 2009-02-06 20:15:10 UTC (rev 7346) +++ trunk/core/components/hadoop/src/org/smartfrog/services/hadoop/conf/HadoopConfigurationImpl.java 2009-02-06 20:29:37 UTC (rev 7347) @@ -24,7 +24,9 @@ import org.smartfrog.services.filesystem.FileSystem; import org.smartfrog.services.hadoop.core.SFHadoopRuntimeException; import org.smartfrog.sfcore.common.SmartFrogException; +import org.smartfrog.sfcore.common.SmartFrogResolutionException; import org.smartfrog.sfcore.prim.PrimImpl; +import org.smartfrog.sfcore.prim.Prim; import org.smartfrog.sfcore.reference.Reference; import org.smartfrog.sfcore.utils.ListUtils; @@ -38,11 +40,12 @@ * ensures the values are there for other components */ -public class HadoopConfigurationImpl extends PrimImpl implements HadoopConfiguration { +public class HadoopConfigurationImpl extends PrimImpl implements HadoopConfiguration, ClusterBound { private ManagedConfiguration managedConf; private boolean readEarly; private static final Reference refRequired = new Reference(ATTR_REQUIRED); + public static final String ERROR_NO_CLUSTER_AND_XML = "Cannot extend an existing cluster and import XML resources or files"; public HadoopConfigurationImpl() throws RemoteException { } @@ -80,24 +83,39 @@ boolean loadDefaults = sfResolve(ATTR_LOAD_DEFAULTS, true, true); List<String> resources = ListUtils.resolveStringList(this, new Reference(ATTR_RESOURCES), true); Vector<String> files = FileSystem.resolveFileList(this, new Reference(ATTR_FILES), null, true, null); - Configuration baseConf = new Configuration(loadDefaults); - //run through all the resources - for (String resource : resources) { - loadXmlResource(baseConf, resource); - } + Prim cluster = sfResolve(ATTR_CLUSTER,(Prim)null,false); + if(cluster!=null) { + //inheriting a cluster + if(!resources.isEmpty() || !files.isEmpty()) { + throw new SmartFrogResolutionException(ERROR_NO_CLUSTER_AND_XML); + } + managedConf = ManagedConfiguration.createConfiguration(this,true,false,loadDefaults); + managedConf.copyPropertiesToPrim(this); - //run through the filenames - for (String filename : files) { - loadXmlFile(baseConf, filename); - } + } else { + //no cluster reference, so create an empty unmanaged configuration and build it up + //then copy its (name,value) pairs into a new ManagedConfiguration that is + //bound to us + Configuration baseConf = new Configuration(loadDefaults); - //this now creates a baseConf which is full of all our values. - //the next step is to override with any in-scope attributes. + //run through all the resources + for (String resource : resources) { + loadXmlResource(baseConf, resource); + } - managedConf = new ManagedConfiguration(this); - managedConf.copyProperties(this, baseConf); + //run through the filenames + for (String filename : files) { + loadXmlFile(baseConf, filename); + } + //this now creates a baseConf which is full of all our values. + //the next step is to override with any in-scope attributes. + + managedConf = new ManagedConfiguration(this); + managedConf.copyProperties(this, baseConf); + } + //dump it to the log at debug level or if the dump attribute is true, in which //case it comes out at INFO level boolean toDump = sfResolve(ATTR_DUMP, true, true); Modified: trunk/core/components/hadoop/src/org/smartfrog/services/hadoop/conf/ManagedConfiguration.java =================================================================== --- trunk/core/components/hadoop/src/org/smartfrog/services/hadoop/conf/ManagedConfiguration.java 2009-02-06 20:15:10 UTC (rev 7346) +++ trunk/core/components/hadoop/src/org/smartfrog/services/hadoop/conf/ManagedConfiguration.java 2009-02-06 20:29:37 UTC (rev 7347) @@ -55,7 +55,7 @@ * the default values. This makes the reload process more complex, as it re-evaluates it from a component */ public final class ManagedConfiguration extends JobConf implements PrimSource, - ConfigurationAttributes { + ConfigurationAttributes, Cloneable{ private Prim source; @@ -477,7 +477,7 @@ public String dump() { StringBuilder builder = new StringBuilder(); Properties p = getProps(); - TreeSet<String> ts=new TreeSet(p.keySet()); + TreeSet<String> ts=(TreeSet<String>) new TreeSet(p.keySet()); for (String key : ts) { builder.append(key); builder.append(" \""); @@ -527,16 +527,45 @@ String value = entry.getValue(); setIfUnset(key, value); value = get(key); - try { - target.sfResolveHere(key); - } catch (SmartFrogResolutionException ignored) { - target.sfReplaceAttribute(key, value); - } + addSFAttributeIfNeeded(target, key, value); } reloadConfiguration(); } /** + * Copy all new properties to the target prim as attributes + * @param target target + * @throws SmartFrogRuntimeException failure to read/write attributes + * @throws RemoteException network trouble + */ + public void copyPropertiesToPrim(Prim target) throws SmartFrogRuntimeException, RemoteException { + for (Map.Entry<String, String> entry : this) { + String key = entry.getKey(); + String value = entry.getValue(); + addSFAttributeIfNeeded(target, key, value); + } + } + + + /** + * Add a SmartFrog attribute if it is needed + * @param target target component + * @param key key to set + * @param value value to set + * @throws SmartFrogRuntimeException failure to set the attribute + * @throws RemoteException network problems + */ + private void addSFAttributeIfNeeded(Prim target, String key, String value) + throws RemoteException, SmartFrogRuntimeException { + try { + target.sfResolveHere(key); + } catch (SmartFrogResolutionException ignored) { + target.sfAddAttribute(key, value); + } + } + + + /** * Creates and returns a copy of this object. A configuration reload is triggered, so that datastructures are not * accidentally shared across instances. * @@ -572,7 +601,7 @@ if (size > 0) { StringBuilder text = new StringBuilder(MISSING_ATTRIBUTE + (size > 1 ? "s" : "") + ":"); for (String attr : missing) { - text.append("\""); + text.append('"'); text.append(attr); text.append("\" "); } @@ -598,7 +627,7 @@ * This creates a configuration from a source prim * * @param source source prim - * @param useClusterReferenceFirst if set, resolve {@link ClusterBound#ATTR_CLUSTER} from the source and use that + * @param useClusterReference if set, resolve {@link ClusterBound#ATTR_CLUSTER} from the source and use that * first. * @param clusterRequired if set, the cluster attribute must resolve. * @param loadDefaults flag to say "load the default values" @@ -607,11 +636,11 @@ * @throws RemoteException network problems. These are always passed up */ public static ManagedConfiguration createConfiguration(Prim source, - boolean useClusterReferenceFirst, + boolean useClusterReference, boolean clusterRequired, boolean loadDefaults) throws SmartFrogException, RemoteException { ManagedConfiguration conf = new ManagedConfiguration(loadDefaults, source); - if (useClusterReferenceFirst) { + if (useClusterReference) { //pull in the cluster if requested Prim clusterSource = source.sfResolve(REF_CLUSTER, (Prim) null, clusterRequired); if (clusterSource != null) { Modified: trunk/core/components/hadoop/test/org/smartfrog/services/hadoop/test/system/conf/components.sf =================================================================== --- trunk/core/components/hadoop/test/org/smartfrog/services/hadoop/test/system/conf/components.sf 2009-02-06 20:15:10 UTC (rev 7346) +++ trunk/core/components/hadoop/test/org/smartfrog/services/hadoop/test/system/conf/components.sf 2009-02-06 20:29:37 UTC (rev 7347) @@ -25,10 +25,12 @@ MISSING_ATTRIBUTE CONSTANT "org.smartfrog.services.hadoop.conf.ManagedConfiguration.MISSING_ATTRIBUTE"; HDFS "org.apache.hadoop.hdfs.DistributedFileSystem"; +CONF_DUMP false; + testDefaultConf extends TestCompound { action extends DefaultHadoopConfiguration { - conf.dump true; + conf.dump CONF_DUMP ; } tests extends Sequence { @@ -86,22 +88,11 @@ } -testEmptyConf extends TestCompound { - action extends EmptyHadoopConfiguration { - conf.dump true; - } +testEmptyConf extends ExpectDeploy { - tests extends Sequence { - - } -} - - -testEmptyConf extends TestCompound { - action extends EmptyHadoopConfiguration { - conf.dump true; + conf.dump CONF_DUMP ; } tests extends Sequence { Modified: trunk/core/components/hadoop/test/org/smartfrog/services/hadoop/test/system/local/components.sf =================================================================== --- trunk/core/components/hadoop/test/org/smartfrog/services/hadoop/test/system/local/components.sf 2009-02-06 20:15:10 UTC (rev 7346) +++ trunk/core/components/hadoop/test/org/smartfrog/services/hadoop/test/system/local/components.sf 2009-02-06 20:29:37 UTC (rev 7347) @@ -360,8 +360,8 @@ job extends TestJob { mapred.mapper.new-api true; - cluster LAZY PARENT:PARENT:cluster; - jobTracker LAZY PARENT:PARENT:jobTracker; + cluster LAZY PARENT:cluster; + jobTracker LAZY PARENT:jobTracker; results LAZY PARENT; name "testsubmission"; mapred.input.dir testDirIn; Modified: trunk/core/components/hadoop/test/org/smartfrog/services/hadoop/test/system/local/namenode/components.sf =================================================================== --- trunk/core/components/hadoop/test/org/smartfrog/services/hadoop/test/system/local/namenode/components.sf 2009-02-06 20:15:10 UTC (rev 7346) +++ trunk/core/components/hadoop/test/org/smartfrog/services/hadoop/test/system/local/namenode/components.sf 2009-02-06 20:29:37 UTC (rev 7347) @@ -94,8 +94,11 @@ } tests extends Sequence { + wait extends WaitForWorkersLive { + service LAZY ATTRIB action:namenode; + minCount 0; + } - } Modified: trunk/core/components/hadoop/test/org/smartfrog/services/hadoop/test/system/local/tracker/components.sf =================================================================== --- trunk/core/components/hadoop/test/org/smartfrog/services/hadoop/test/system/local/tracker/components.sf 2009-02-06 20:15:10 UTC (rev 7346) +++ trunk/core/components/hadoop/test/org/smartfrog/services/hadoop/test/system/local/tracker/components.sf 2009-02-06 20:29:37 UTC (rev 7347) @@ -52,8 +52,8 @@ tests extends MapReduceTestSequence { s extends Sleep; - jobTracker LAZY action:jobTracker; - taskTracker LAZY action:taskTracker; + jobTracker LAZY PARENT:action:jobTracker; + taskTracker LAZY PARENT:action:taskTracker; cluster LAZY PARENT:action:cluster; datanode LAZY PARENT:action:datanode; namenode LAZY PARENT:action:namenode; @@ -70,7 +70,7 @@ /** * Start an orphan task tracker. We expect it to be dead afterwards */ -testOrphanTracker extends ExpectDeploy { +testOrphanTracker extends ExpectDeployFailure { description "bring up an orphan Task Tracker, see what happens."; @@ -86,6 +86,7 @@ } } + exceptions [["java.net.ConnectException","Connection refused"]]; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |