CallStage examples

Help
maxdaros
2005-01-07
2013-04-16
  • maxdaros

    maxdaros - 2005-01-07

    Hi all,

    Are there any CallStage examples?

    I need to call different pipelines from a main pipeline with a router stage.

    Thanks a lot in advance.

    Massimo

     
    • vanstalle

      vanstalle - 2005-01-07

      Hi,

      Here is an example. I have a pipeline named "sharedExtract" that contains all my database access stages; these stages are called by other pipelines.

      Jan

      <?xml version="1.0" encoding="iso-8859-1"?>
      <pipeline>
          <documentation>pipeline with stages called by other pipelines</documentation>
          <pipeline-name>sharedExtract</pipeline-name>
          <dynamic>
              <entry-stage>entry</entry-stage>
              <!-- STAGES: Defines the stages -->
              <stage-inst>
                  <stage-name>entry</stage-name>
                  <stage-desc>This does nothing</stage-desc>
                  <stage-type>Null</stage-type>
              </stage-inst>

              <stage-inst>
                  <stage-name>person</stage-name>
                  <stage-desc>the person's raw attributes</stage-desc>
                  <stage-type>SqlEnrich</stage-type>
                  <option>
                      <option-name>resourceName</option-name>
                      <option-value>AdminformsDatasource</option-value>
                  </option>
                  <option>
                      <option-name>sqlScript</option-name>
                      <option-value/>
                      <sub-option>
                          <option-name>person</option-name>
                          <option-value>
                              SELECT whatever...
                          </option-value>
                      </sub-option>
                  </option>
              </stage-inst>

              <connection>
                  <source>entry</source>
                  <sink>person</sink>
              </connection>
             
              <connection>
                  <source>person</source>
                  <sink>null</sink>
              </connection>
          </dynamic>
      </pipeline>

      ---------- calling pipeline stage ----------

              <stage-inst>
                  <stage-name>getPerson</stage-name>
                  <stage-desc>this calls the shared pipeline stage</stage-desc>
                  <stage-type>CallStage</stage-type>
                  <option>
                      <option-name>discardResults</option-name>
                      <option-value>false</option-value>
                  </option>
                  <option>
                      <option-name>callStage</option-name>
                      <option-value>sharedExtract.person</option-value>
                  </option>
              </stage-inst>

       
    • maxdaros

      maxdaros - 2005-01-07

      Hi Jan,

      I'm trying to understand how to make the call to the target pipeline stage.

      Here is the config file of my Dbloader pipeline:

      entryStage                       = decompress

      decompress.stageType             = Decompress
      decompress.compressType          = zip
      decompress.nextStage             = addAttrib
      decompress.errorStage            = errorHandler
      decompress.failOnError           = false
      decompress.tracked               = true

      addAttrib.stageType              = Enrich
      addAttrib.enrichScript.unzipped  = ${document.name.substring(0,7)}.txt
      addAttrib.nextStage              = fWriter0
      addAttrib.errorStage             = errorHandler
      addAttrib.failOnError            = false
      addAttrib.tracked                = true

      fWriter0.stageType               = FileWriter
      fWriter0.nextStage               = router
      fWriter0.outputFile              = ../../Unzipped/${document.get("unzipped")}
      fWriter0.errorStage              = errorHandler
      fWriter0.failOnError             = false
      fWriter0.tracked                 = false

      # continue the dbload only for MTL* files (no pipeline has been defined yet for ATI* and OPC*)
      # In the future, define 3 different pipeline branches with 3 different ifs, and with the else (the nextStage without a test) set to null
      router.stageType                 = Router
      router.nextStage.convert         = #if ("MTL" == (${document.name.substring(0,3)}))true#end
      router.nextStage.callOpcPipeline = #if ("OPC" == (${document.name.substring(0,3)}))true#end
      router.nextStage                 = null
      router.errorStage                = errorHandler
      router.failOnError               = false
      router.tracked                   = false

      callOpcPipeline.stageType        = CallStage
      callOpcPipeline.test             = true
      callOpcPipeline.discardResults   = false
      callOpcPipeline.callStage        = OpcDbloader.convert
      callOpcPipeline.nextStage        = null
      callOpcPipeline.errorStage       = errorHandler
      callOpcPipeline.failOnError      = false
      callOpcPipeline.tracked          = false

      convert.stageType                = FlatToXml
      convert.nextStage                = fWriter
      convert.flatToXmlFile            = ../../Pipelines/Dbloader/conversion/file-convert.xml
      convert.errorStage               = errorHandler
      convert.failOnError              = false
      convert.tracked                  = false

      fWriter.stageType                = FileWriter
      fWriter.nextStage                = transform
      fWriter.outputFile               = ../../Pipelines/Dbloader/outputs/XmlOutput.xml
      fWriter.errorStage               = errorHandler
      fWriter.failOnError              = false
      fWriter.tracked                  = false

      transform.stageType              = XslTransform
      transform.nextStage              = fWriter2
      transform.transformationFile     = ../../Pipelines/Dbloader/conversion/transform.xsl
      transform.errorStage             = errorHandler
      transform.failOnError            = false
      transform.tracked                = false

      fWriter2.stageType               = FileWriter
      fWriter2.nextStage               = transform2
      fWriter2.outputFile              = ../../Pipelines/Dbloader/outputs/XmlOutput2.xml
      fWriter2.errorStage              = errorHandler
      fWriter2.failOnError             = false
      fWriter2.tracked                 = false

      transform2.stageType             = XslTransform
      transform2.nextStage             = fWriter3
      transform2.transformationFile    = ../../Pipelines/Dbloader/conversion/transform2.xsl
      transform2.errorStage            = errorHandler
      transform2.failOnError           = false
      transform2.tracked               = false

      fWriter3.stageType               = FileWriter
      fWriter3.nextStage               = dbwriter
      fWriter3.outputFile              = ../../Pipelines/Dbloader/outputs/SqlInserts.sql
      fWriter3.errorStage              = errorHandler
      fWriter3.failOnError             = false
      fWriter3.tracked                 = false

      dbwriter.stageType               = SqlWriter
      dbwriter.nextStage               = null
      dbwriter.messageTag              = messageTagErrorSql
      dbwriter.resourceName            = jdbcConnection
      dbwriter.sql                     = $document.contents
      dbwriter.failOnFirst             = false
      dbwriter.useBatch                = true
      dbwriter.errorStage              = errorHandler
      dbwriter.failOnError             = false
      dbwriter.tracked                 = true

      errorHandler.stageType           = SmtpWriter
      errorHandler.smtpHost            = smtp.libero.it
      errorHandler.smtpFrom            = DRGdbloader@simbologica.it
      errorHandler.smtpTo              = maxdaros@tiscali.it
      errorHandler.smtpSubject         = Document: Ticket: ${ticket.Value}
      errorHandler.nextStage           = null
      errorHandler.tracked             = true

      At the router stage I want to call another pipeline (OpcDbloader, stage convert) if the file name begins with OPC. Such a file has a different raw format, so it needs a different set of transformation files and a different set of SQL inserts into a different db table.

      When Babeldoc encounters the router stage, it can't find the OpcDbloader pipeline.
      Does this have anything to do with the BABELDOC_USER or CLASSPATH environment variables? Any suggestions?

      The following is the log output showing the error:

      D:\DRG\Scanners\DRGScanner>babeldoc scanner -s ./config.properties
      <2004-11-07 15:35:53,620> INFO  [main] :  Initializing workers:
      <2004-11-07 15:35:53,811> INFO  [main] :  scanner (ftp) configured...
      <2004-11-07 15:35:53,811> INFO  [main] :  Starting workers...
      <2004-11-07 15:35:54,161> INFO  [main] :  scanner started...
      <2004-11-07 15:36:24,285> INFO  [scanner] :  Document OPC1217.zip enqueued for pipeline Dbloader
      <2004-11-07 15:36:24,435> INFO  [Thread-0] :  Processing document...
      <2004-11-07 15:36:24,885> INFO  [Thread-0] :  PipelineStage name: decompress
      <2004-11-07 15:36:24,966> INFO  [Thread-0] :  PipelineStage name: addAttrib
      <2004-11-07 15:36:25,026> INFO  [Thread-0] :  PipelineStage name: fWriter0
      <2004-11-07 15:36:25,066> INFO  [Thread-0] :  PipelineStage name: router
      <2004-11-07 15:36:25,096> INFO  [Thread-0] :  PipelineStage name: callOpcPipeline
      <2004-11-07 15:36:25,116> ERROR [Thread-0] :  [DefaultPipelineStageErrorHandler.handlePipelineStageError] PipelineStage name: callOpcPipeline Error: com.babeldoc.core.pipeline.PipelineException: PipelineStage: ../OpcDbloader/OpcDbloader.convert not found
      <2004-11-07 15:36:25,116> INFO  [Thread-0] :  PipelineStage name: errorHandler
      Exception in thread "Thread-0" java.lang.NullPointerException
              at com.sun.mail.handlers.text_plain.writeTo(text_plain.java:99)
              at javax.activation.ObjectDataContentHandler.writeTo(DataHandler.java:849)
              at javax.activation.DataHandler.writeTo(DataHandler.java:305)
              at javax.mail.internet.MimeUtility.getEncoding(MimeUtility.java:181)
              at javax.mail.internet.MimeBodyPart.updateHeaders(MimeBodyPart.java:1055)
              at javax.mail.internet.MimeMessage.updateHeaders(MimeMessage.java:1841)
              at javax.mail.internet.MimeMessage.saveChanges(MimeMessage.java:1822)
              at javax.mail.Transport.send(Transport.java:80)
              at com.babeldoc.core.pipeline.stage.SmtpWriterPipelineStage.sendSmtpMessage(Unknown Source)
              at com.babeldoc.core.pipeline.stage.SmtpWriterPipelineStage.process(Unknown Source)
              at com.babeldoc.core.pipeline.PipelineStage.processStage(Unknown Source)
              at com.babeldoc.core.pipeline.processor.SyncPipelineStageProcessor.processPipelineStage(Unknown Source)
              at com.babeldoc.core.pipeline.processor.SyncPipelineStageProcessor.process(Unknown Source)
              at com.babeldoc.core.pipeline.processor.SyncPipelineStageProcessor.processPipelineStageResult(Unknown Source)
              at com.babeldoc.core.pipeline.processor.SyncPipelineStageProcessor.processPipelineStageResults(Unknown Source)
              at com.babeldoc.core.pipeline.processor.SyncPipelineStageProcessor.processPipelineStage(Unknown Source)
              at com.babeldoc.core.pipeline.processor.SyncPipelineStageProcessor.process(Unknown Source)
              at com.babeldoc.core.pipeline.processor.SyncPipelineStageProcessor.processPipelineStageResult(Unknown Source)
              at com.babeldoc.core.pipeline.processor.SyncPipelineStageProcessor.processPipelineStageResults(Unknown Source)
              at com.babeldoc.core.pipeline.processor.SyncPipelineStageProcessor.processPipelineStage(Unknown Source)
              at com.babeldoc.core.pipeline.processor.SyncPipelineStageProcessor.process(Unknown Source)
              at com.babeldoc.core.pipeline.processor.SyncPipelineStageProcessor.processPipelineStageResult(Unknown Source)
              at com.babeldoc.core.pipeline.processor.SyncPipelineStageProcessor.processPipelineStageResults(Unknown Source)
              at com.babeldoc.core.pipeline.processor.SyncPipelineStageProcessor.processPipelineStage(Unknown Source)
              at com.babeldoc.core.pipeline.processor.SyncPipelineStageProcessor.process(Unknown Source)
              at com.babeldoc.core.pipeline.processor.SyncPipelineStageProcessor.processPipelineStageResult(Unknown Source)
              at com.babeldoc.core.pipeline.processor.SyncPipelineStageProcessor.processPipelineStageResults(Unknown Source)
              at com.babeldoc.core.pipeline.processor.SyncPipelineStageProcessor.processPipelineStage(Unknown Source)
              at com.babeldoc.core.pipeline.processor.SyncPipelineStageProcessor.process(Unknown Source)
              at com.babeldoc.core.pipeline.processor.SyncPipelineStageProcessor.processPipelineStageResult(Unknown Source)
              at com.babeldoc.core.pipeline.processor.SyncPipelineStageProcessor.processPipelineStageResults(Unknown Source)
              at com.babeldoc.core.pipeline.processor.SyncPipelineStageProcessor.processPipelineStage(Unknown Source)
              at com.babeldoc.core.pipeline.processor.SyncPipelineStageProcessor.process(Unknown Source)
              at com.babeldoc.core.pipeline.PipelineStageFactory.process(Unknown Source)
              at com.babeldoc.core.pipeline.PipelineFactory.process(Unknown Source)
              at com.babeldoc.core.pipeline.PipelineFactoryFactory.process(Unknown Source)
              at com.babeldoc.core.pipeline.feeder.SynchronousFeeder.process(Unknown Source)
              at com.babeldoc.core.pipeline.feeder.AsynchronousFeeder.actuallyProcess(Unknown Source)
              at com.babeldoc.core.pipeline.feeder.AsynchronousFeeder$1.run(Unknown Source)
              at EDU.oswego.cs.dl.util.concurrent.PooledExecutor$Worker.run(Unknown Source)
              at java.lang.Thread.run(Thread.java:595)

      Many thanks.

      Massimo

       
      • Sherman Wood

        Sherman Wood - 2005-01-07

        You cannot call a stage within a pipeline with CallStage.

        Try just:

        callOpcPipeline.callStage = OpcDbloader

        and make the convert stage the first step.

        Sherman

         
        • vanstalle

          vanstalle - 2005-01-07

          Sherman,

          That's exactly what I am doing (calling a stage within another pipeline) and it works perfectly; maybe there are some differences between a simple and an XML pipeline definition?

          I suppose that OpcDbloader.convert is the stage "convert" in the pipeline "OpcDbloader".

          Jan

           
          • Sherman Wood

            Sherman Wood - 2005-01-07

            Let me correct myself.

            I looked at the Babeldoc code and I can see that you can use CallStage either with just a <pipeline name>, which runs the pipeline starting at its entry stage, or with <pipeline name>.<stage name>, which starts running <pipeline name> at <stage name>, just as maxdaros has done.
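
            To make the two forms concrete, here is a minimal sketch in the simple (properties) pipeline format, reusing the stage and pipeline names from maxdaros's post. It assumes that a pipeline named OpcDbloader is registered and that it contains a stage named convert; only one callStage line should be active at a time.

            ```properties
            # Form 1: pipeline name only; the called pipeline starts at its own entryStage
            callOpcPipeline.stageType      = CallStage
            callOpcPipeline.callStage      = OpcDbloader

            # Form 2: pipeline name plus stage name; the called pipeline starts at that stage
            # callOpcPipeline.callStage    = OpcDbloader.convert
            ```

            If the called pipeline's entryStage is already convert, the two forms should behave the same.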

            Sherman

             
    • maxdaros

      maxdaros - 2005-01-07

      Hi all,

      I've performed some tests today, and I've understood this:

      I can call different pipelines from another pipeline by configuring a config.properties that contains the definitions of all the pipelines I wish to call.

      Here is an example of the config.properties, Dbloader.properties, MtlDbloader.properties and OpcDbloader.properties files.

      config.properties defines 3 different pipelines:
      1) Dbloader (decompress --> addAttrib --> router) is the main pipeline; it unzips the received files and calls different pipelines based on the file names.
      2) MtlDbloader and 3) OpcDbloader each perform a set of file transformations whose goal is to load text files into db tables.

      # config.properties:
      Dbloader.type               = simple
      Dbloader.configFile         = pipeline/Dbloader
      Dbloader.processor.type     = threadpool
      Dbloader.processor.poolSize = 10

      MtlDbloader.type            = simple
      MtlDbloader.configFile      = pipeline/MtlDbloader

      OpcDbloader.type            = simple
      OpcDbloader.configFile      = pipeline/OpcDbloader

      .......
      # Dbloader.properties:
      entryStage                       = decompress

      decompress.stageType             = Decompress
      decompress.compressType          = zip
      decompress.nextStage             = addAttrib
      decompress.errorStage            = errorHandler
      decompress.failOnError           = false
      decompress.tracked               = false

      addAttrib.stageType              = Enrich
      addAttrib.enrichScript.unzipped  = ${document.name.substring(0,7)}.txt
      addAttrib.nextStage              = fWriter0
      addAttrib.errorStage             = errorHandler
      addAttrib.failOnError            = false
      addAttrib.tracked                = false

      fWriter0.stageType               = FileWriter
      fWriter0.nextStage               = router
      fWriter0.outputFile              = ../../Unzipped/${document.get("unzipped")}
      fWriter0.errorStage              = errorHandler
      fWriter0.failOnError             = false
      fWriter0.tracked                 = false

      # Forwards processing to three different pipelines based on the name (and structure) of the file
      router.stageType                 = Router
      router.nextStage.callMtlPipeline = #if ("MTL" == (${document.name.substring(0,3)}))true#end
      router.nextStage.callOpcPipeline = #if ("OPC" == (${document.name.substring(0,3)}))true#end
      router.nextStage                 = null
      router.errorStage                = errorHandler
      router.failOnError               = false
      router.tracked                   = false

      callMtlPipeline.stageType        = CallStage
      callMtlPipeline.test             = true
      callMtlPipeline.discardResults   = false
      callMtlPipeline.callStage        = MtlDbloader.convert
      callMtlPipeline.nextStage        = null
      callMtlPipeline.errorStage       = errorHandler
      callMtlPipeline.failOnError      = false
      callMtlPipeline.tracked          = false

      callOpcPipeline.stageType        = CallStage
      callOpcPipeline.test             = true
      callOpcPipeline.discardResults   = false
      callOpcPipeline.callStage        = OpcDbloader.convert
      callOpcPipeline.nextStage        = null
      callOpcPipeline.errorStage       = errorHandler
      callOpcPipeline.failOnError      = false
      callOpcPipeline.tracked          = false

      errorHandler.stageType           = SmtpWriter
      errorHandler.smtpHost            = smtp.libero.it
      errorHandler.smtpFrom            = DRGdbloader@simbologica.it
      errorHandler.smtpTo              = maxdaros@tiscali.it
      errorHandler.smtpSubject         = Document: Ticket: ${ticket.Value}
      errorHandler.nextStage           = null
      errorHandler.tracked             = true

      # MtlDbloader.properties:
      entryStage                       = convert

      convert.stageType                = FlatToXml
      convert.nextStage                = fWriter
      convert.flatToXmlFile            = ../../Pipelines/Dbloader/conversion/MTL-file-convert.xml
      convert.errorStage               = errorHandler
      convert.failOnError              = false
      convert.tracked                  = false

      fWriter.stageType                = FileWriter
      fWriter.ignored                  = true
      fWriter.nextStage                = transform
      fWriter.outputFile               = ../../Pipelines/Dbloader/outputs/MTL-XmlOutput.xml
      fWriter.errorStage               = errorHandler
      fWriter.failOnError              = false
      fWriter.tracked                  = false

      transform.stageType              = XslTransform
      transform.nextStage              = fWriter2
      transform.transformationFile     = ../../Pipelines/Dbloader/conversion/MTL-transform.xsl
      transform.errorStage             = errorHandler
      transform.failOnError            = false
      transform.tracked                = false

      fWriter2.stageType               = FileWriter
      fWriter2.ignored                 = true
      fWriter2.nextStage               = transform2
      fWriter2.outputFile              = ../../Pipelines/Dbloader/outputs/MTL-XmlOutput2.xml
      fWriter2.errorStage              = errorHandler
      fWriter2.failOnError             = false
      fWriter2.tracked                 = false

      transform2.stageType             = XslTransform
      transform2.nextStage             = fWriter3
      transform2.transformationFile    = ../../Pipelines/Dbloader/conversion/MTL-transform2.xsl
      transform2.errorStage            = errorHandler
      transform2.failOnError           = false
      transform2.tracked               = false

      fWriter3.stageType               = FileWriter
      fWriter3.ignored                 = true
      fWriter3.nextStage               = dbwriter
      fWriter3.outputFile              = ../../Pipelines/Dbloader/outputs/MTL-SqlInserts.sql
      fWriter3.errorStage              = errorHandler
      fWriter3.failOnError             = false
      fWriter3.tracked                 = false

      dbwriter.stageType               = SqlWriter
      dbwriter.nextStage               = null
      dbwriter.messageTag              = messageTagErrorSql
      dbwriter.resourceName            = jdbcConnection
      dbwriter.sql                     = $document.contents
      dbwriter.failOnFirst             = false
      dbwriter.useBatch                = true
      dbwriter.errorStage              = errorHandler
      dbwriter.failOnError             = false
      dbwriter.tracked                 = true

      errorHandler.stageType           = SmtpWriter
      errorHandler.smtpHost            = smtp.libero.it
      errorHandler.smtpFrom            = DRGdbloader@simbologica.it
      errorHandler.smtpTo              = maxdaros@tiscali.it
      errorHandler.smtpSubject         = Document: Ticket: ${ticket.Value}
      errorHandler.nextStage           = null
      errorHandler.tracked             = true

      # OpcDbloader.properties:
      entryStage                       = convert

      convert.stageType                = FlatToXml
      convert.nextStage                = fWriter
      convert.flatToXmlFile            = ../../Pipelines/Dbloader/conversion/OPC-file-convert.xml
      convert.errorStage               = errorHandler
      convert.failOnError              = false
      convert.tracked                  = false

      fWriter.stageType                = FileWriter
      fWriter.ignored                  = true
      fWriter.nextStage                = transform
      fWriter.outputFile               = ../../Pipelines/Dbloader/outputs/OPC-XmlOutput.xml
      fWriter.errorStage               = errorHandler
      fWriter.failOnError              = false
      fWriter.tracked                  = false

      transform.stageType              = XslTransform
      transform.nextStage              = fWriter2
      transform.transformationFile     = ../../Pipelines/Dbloader/conversion/OPC-transform.xsl
      transform.errorStage             = errorHandler
      transform.failOnError            = false
      transform.tracked                = false

      fWriter2.stageType               = FileWriter
      fWriter2.ignored                 = true
      fWriter2.nextStage               = transform2
      fWriter2.outputFile              = ../../Pipelines/Dbloader/outputs/OPC-XmlOutput2.xml
      fWriter2.errorStage              = errorHandler
      fWriter2.failOnError             = false
      fWriter2.tracked                 = false

      transform2.stageType             = XslTransform
      transform2.nextStage             = fWriter3
      transform2.transformationFile    = ../../Pipelines/Dbloader/conversion/OPC-transform2.xsl
      transform2.errorStage            = errorHandler
      transform2.failOnError           = false
      transform2.tracked               = false

      fWriter3.stageType               = FileWriter
      fWriter3.ignored                 = true
      fWriter3.nextStage               = dbwriter
      fWriter3.outputFile              = ../../Pipelines/Dbloader/outputs/OPC-SqlInserts.sql
      fWriter3.errorStage              = errorHandler
      fWriter3.failOnError             = false
      fWriter3.tracked                 = false

      dbwriter.stageType               = SqlWriter
      dbwriter.nextStage               = null
      dbwriter.messageTag              = messageTagErrorSql
      dbwriter.resourceName            = jdbcConnection
      dbwriter.sql                     = $document.contents
      dbwriter.failOnFirst             = false
      dbwriter.useBatch                = true
      dbwriter.errorStage              = errorHandler
      dbwriter.failOnError             = false
      dbwriter.tracked                 = true

      errorHandler.stageType           = SmtpWriter
      errorHandler.smtpHost            = smtp.libero.it
      errorHandler.smtpFrom            = DRGdbloader@simbologica.it
      errorHandler.smtpTo              = maxdaros@tiscali.it
      errorHandler.smtpSubject         = Document: Ticket: ${ticket.Value}
      errorHandler.nextStage           = null
      errorHandler.tracked             = true

      This is an example of a pipeline with type=simple.
      I hope it is useful to other readers.

      Massimo

       
