Workflow Engine
aka: OIE (OpenGroupware Integration Engine)
The OIE executes CPM, which can be created directly or compiled from a BPMN (Business Process Markup Notation) such as BPML. The default behavior of a Workflow Process is to accept BPML and compile it to CPM (currently the only implemented behavior).
Classes
Actions
OIE actions are command pattern objects descended from the ActionCommand class provided by the Workflow Logic bundle; ActionCommand itself descends from the Command class provided by the Core module. All actions consume the same six named parameters:
| input (Required) | The Message entity the action should use as input. |
| label (Optional) | The label to be applied to the output message of the action. This may be None, which results in an unlabeled message. |
| parameters (Required) | A dictionary containing the parameters provided to the action. |
| process (Required) | The process within which the action is performed. |
| uuid (Required) | The UUID of the action within the process's action sequence. |
| state (Required) | The state dictionary of the current process; actions may store things here to create persistent values. However, this should be used minimally: actions are meant to process messages in a filter (in-out) model. |
An action is executed in the following sequence:
- parse_action_parameters() - Inspect the value of _params and set any required attributes. As far as possible, action parameters should have common defaults.
- verify_action() - Raises an exception if required parameters are not defined.
- do_prepare() - Opens input (_rfile) and output (_wfile) handles. Output is opened to a temporary file.
- do_action() - Perform the action. This method is always implemented by descendants of ActionCommand; the base do_action() performs no action.
- Close input and output handles.
- Import the output file into OIE as a Message, optionally with the specified label. If a Message with the specified label already exists within the process, the contents of that Message are replaced with the output of the current command.
- do_epilogue() - Perform any required clean-up. Remember that at this point input (_rfile) and output (_wfile) handles are already closed.
- Set the return value; the default is (True, True). The two values are _continue and _proceed. If _continue is False the process enters a parked state; otherwise it continues to execute. If _proceed is True the process proceeds to the next action in its sequence; if the current action is the last action, the process ends as completed. A _proceed value of False results in the current action being executed again (until it completes with a _proceed value of True).
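The effect of the two return values can be illustrated with a small driver loop. This is only a sketch: `run_sequence` and `make_poll` are hypothetical names, not part of OIE, and the real executor does considerably more. Only the meaning of `_continue` and `_proceed` is taken from the description above.

```python
def run_sequence(actions, max_steps=100):
    """Run callables that each return (_continue, _proceed).

    Hypothetical driver, not the OIE executor. Returns the final
    status and the list of action indexes that were executed.
    """
    index, trace = 0, []
    while index < len(actions) and len(trace) < max_steps:
        _continue, _proceed = actions[index]()
        trace.append(index)
        if _proceed:
            index += 1  # advance to the next action in the sequence
        if not _continue and index < len(actions):
            return 'parked', trace  # process waits until it is restarted
    if index >= len(actions):
        return 'completed', trace  # the last action proceeded
    return 'step-limit', trace  # safety stop for this sketch only


def make_poll(succeed_on_call):
    """Build a toy action that asks to be re-run until its Nth call."""
    calls = {'count': 0}

    def poll():
        calls['count'] += 1
        # _proceed stays False until the Nth call, so the action repeats.
        return (True, calls['count'] >= succeed_on_call)

    return poll
```

For instance, `run_sequence([make_poll(3), lambda: (True, True)])` executes the first action three times before moving on, and returns `('completed', [0, 0, 0, 1])`.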
Actions are a special use-case of the Command class; actions must not reimplement any methods inherited from Command such as parse_parameters() or run(). Commands intended to be used as actions within an OIE process should only implement: parse_action_parameters(), verify_action(), do_prepare(), do_action(), and/or do_epilogue(). An action will typically only need to implement parse_action_parameters() and do_action().
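As a rough illustration of this division of labor, the sketch below uses a simplified stand-in for ActionCommand. Everything here is an assumption made for the example: the stand-in's constructor, the use of in-memory buffers instead of a temporary file, and the `PrefixLinesAction` class and its `prefix` parameter are all hypothetical. Only the hook names, their order, and the `_params`, `_rfile`, and `_wfile` attributes come from the description above.

```python
import io


class ActionCommand:
    """Simplified stand-in for the real ActionCommand (hypothetical)."""

    def __init__(self, params, input_text):
        self._params = params
        self._input_text = input_text

    # Hooks that an action may implement.
    def parse_action_parameters(self):
        pass

    def verify_action(self):
        pass

    def do_prepare(self):
        # The real class writes output to a temporary file; in-memory
        # buffers keep this sketch self-contained.
        self._rfile = io.StringIO(self._input_text)
        self._wfile = io.StringIO()

    def do_action(self):
        pass

    def do_epilogue(self):
        pass

    def run(self):
        # Fixed sequence owned by the base class; actions do not override it.
        self.parse_action_parameters()
        self.verify_action()
        self.do_prepare()
        self.do_action()
        output = self._wfile.getvalue()
        self._rfile.close()
        self._wfile.close()
        self.do_epilogue()  # handles are already closed at this point
        return output


class PrefixLinesAction(ActionCommand):
    """Toy action: copies input to output, prefixing every line."""

    def parse_action_parameters(self):
        # Fall back to a default when the parameter is not supplied.
        self._prefix = self._params.get('prefix', '> ')

    def do_action(self):
        for line in self._rfile:
            self._wfile.write(self._prefix + line)
```

The toy action implements only parse_action_parameters() and do_action(), and leaves run() and the remaining hooks to the base class.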
| acceptTaskAction | |
| addColumnAction | Add a column (attribute) to a record oriented StandardXML message. |
| assignAction | Assign a value to a message; either a static value or one derived via an XPath query. |
| completeTask | |
| createTaskAction | |
| getEntityAction | Retrieve an entity by id. The entity will be represented in Omphalos XML. |
| ldapSearchAction | Query an LDAP DSA. Results are encoded as DSMLv1 (support for DSMLv2 is planned). |
| listContactsAction | |
| listEnterprisesAction | |
| queueProcessAction | Queue a new Process for execution. |
| readAction | Read a message using a format. Results are encoded as StandardXML. |
| readJSONAction | Convert JSON data to XML. |
| regularExpressionFindAction | Extract data using a regular expression. |
| rejectTask | |
| rowTemplateAction | Create a new message of arbitrary format from a StandardXML row element using a template. |
| sendMailAction | Send an electronic mail message. |
| setValueAction | |
| sqlDeleteAction | Remove records from an SQL RDBMS table using a StandardXML input message. |
| sqlExecuteAction | Execute an arbitrary SQL statement. |
| sqlInsertAction | Perform an SQL insert from a StandardXML input message. |
| sqlSelectAction | Select values from an SQL RDBMS. Results are encoded as StandardXML. |
| sqlUpdateAction | Update records in an SQL RDBMS table from a StandardXML input message. |
| sqlUpsertAction | |
| startAction | This action is always the first action when a process is first started. |
| taskAction | Perform an action (comment, reject, complete, archive) on a Task entity. |
| taskCommentAction | |
| transformAction | Perform an XSLT transform of an XML message. |
| translateAction | TBD |
| writeAction | Create or update a message using a format. |
| xpathAction | Create an XML message derived from selected portions of the input message. |
| xpathTestAction | Outputs a value of YES if the provided XPath matches elements of the input message, otherwise outputs a value of NO. |
Flow Control
OIE supports foreach and switch flow control structures. These are implemented not as actions but as a core part of the Process Service. Actions are not permitted to alter the sequence of workflow except by raising exceptions.
Commands
The commands provided by the Workflow Logic bundle are:
| format::new | Create a new named Format entity. |
| message::new | Create a new Message entity. |
| process::new | Create a new Process entity derived from a specified Route. |
| route::new | Create a new Route entity. |
| format::delete | Delete a Format entity. |
| message::delete | Delete a Message entity. |
| process::delete | Delete a Process entity. This also deletes all the messages contained in the process. |
| route::delete | Delete a Route entity. This does not delete any related Process entities. |
| format::get | Retrieve a Format entity by name or enumerate all the Format entities defined in the server. |
| message::get | Retrieve a Message entity. |
| message::get-text | Retrieve the raw content of a Message entity. |
| message::get-handle | Retrieve a file-like handle to the content of a Message entity. |
| process::get | Retrieve a Process entity. |
| process::get-input-message | Retrieve the input Message entity for the specified Process. |
| process::get-output-message | Retrieve the output Message entity for the specified Process if the process is complete. If the process is not yet complete this command returns None. |
| process::get-schedule | |
| process::get-messages | Retrieve all the Message entities associated with the specified Process. |
| route::get-processes | Retrieve all the Process entities derived from the specified Route. |
| route::get | Retrieve a Route entity. |
| process::schedule | Schedule a process for execution at a specific date and time, at an interval, or at a chronological pattern. |
| process::signal | Post a signal to an existing, potentially waiting, process. |
| process::start | Signal the workflow executor service to start, or restart, the indicated process. This command requires the current Context to be connected to the service bus. |
| process::unschedule | Remove a job from the scheduler. |
| message::set | Update a Message entity. |
| route::set | Update a Route entity. |
Data Sources
All actions that make use of external backend systems use factory objects from Foundation such as LDAPConnectionFactory and SQLConnectionFactory to access those resources.
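The factory idea can be sketched as follows. This is not the Foundation API: `DemoSQLConnectionFactory`, its `register` and `connect` methods, and the `select_scalar` helper are hypothetical names invented for illustration, and SQLite from the Python standard library stands in for whatever RDBMS a site actually uses. The point is only that the caller asks for a connection by data source name and never handles connection details itself.

```python
import sqlite3


class DemoSQLConnectionFactory:
    """Hypothetical stand-in; the real SQLConnectionFactory may differ."""

    _sources = {}  # data source name -> SQLite database path

    @classmethod
    def register(cls, name, database):
        cls._sources[name] = database

    @classmethod
    def connect(cls, name):
        if name not in cls._sources:
            raise LookupError('No data source named "%s"' % name)
        return sqlite3.connect(cls._sources[name])


def select_scalar(source_name, statement):
    """Toy action body: run a statement and return its first column."""
    connection = DemoSQLConnectionFactory.connect(source_name)
    try:
        return connection.execute(statement).fetchone()[0]
    finally:
        connection.close()
```

Keeping connection set-up in one place means an action only needs the name of a data source in its parameters.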
Processes
The workflow executor will spin up a process for each Process for which it receives a start signal; each such process is a Process Service and is treated as a worker. The worker will initialize the process (if required) and commence execution. One iteration of each of the process's actions is executed between checks of the worker's message queue. The worker will shut down when the Process is complete, failed, or enters a parked status. If an exception occurs during action execution the worker will log the exception to the server's Log and commence executing the process's exception context sub-route. While executing the exception context the worker does not check its message queue; once the exception context is complete the worker will shut down.
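The worker behavior described above can be approximated by the loop below. It is a minimal sketch under stated assumptions: `run_worker`, the representation of actions and the exception sub-route as plain callables, and the use of `queue.Queue` as the message queue are all hypothetical, and received messages are merely collected rather than acted upon.

```python
import logging
import queue

log = logging.getLogger('workflow.worker.sketch')


def run_worker(actions, exception_route, inbox):
    """Hypothetical worker loop; returns (status, received messages)."""
    index, received = 0, []
    while index < len(actions):
        # Check the message queue between action iterations.
        try:
            while True:
                received.append(inbox.get_nowait())
        except queue.Empty:
            pass
        try:
            _continue, _proceed = actions[index]()
        except Exception:
            log.exception('action %d failed', index)
            # Run the exception sub-route without checking the queue,
            # then shut the worker down.
            for handler in exception_route:
                handler()
            return 'failed', received
        if _proceed:
            index += 1
        if not _continue and index < len(actions):
            return 'parked', received
    return 'completed', received
```

In this sketch a failing action ends the worker with a `'failed'` status after the exception sub-route has run, which mirrors the shutdown behavior described in the paragraph above.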
Services
The OpenGroupware Integration Engine is composed of a collection of services that communicate via the AMQ bus.
- coils.workflow.executor - Manages workflow workers including starting new workers and reporting to the rest of the system when a worker completes, fails, or parks.
- coils.workflow.queueManager - Manages the list of queued (not running) workflows, primarily informing the coils.workflow.executor service to start up new workers for queued processes when worker capacity is available.
- coils.workflow.archivist - Cleans up after completed processes.
- coils.workflow.scheduler - Schedules processes for execution based on date, interval, or chronology.
These services depend on the availability of the coils.clock service.
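The interplay between the queue manager and the executor can be sketched as a capacity-limited dispatcher. This is an illustration only: `QueueManagerSketch`, its methods, and the `start_worker` callable (standing in for a message sent to coils.workflow.executor over the bus) are hypothetical, and the real services exchange bus messages rather than direct calls.

```python
from collections import deque


class QueueManagerSketch:
    """Hypothetical model of queued processes and worker capacity."""

    def __init__(self, capacity, start_worker):
        self._capacity = capacity
        self._start_worker = start_worker  # stands in for a bus message
        self._queued = deque()
        self._running = set()

    def enqueue(self, process_id):
        self._queued.append(process_id)
        self._dispatch()

    def worker_finished(self, process_id):
        # Called when the executor reports a worker completed, failed, or parked.
        self._running.discard(process_id)
        self._dispatch()

    def _dispatch(self):
        # Start queued processes, oldest first, while capacity remains.
        while self._queued and len(self._running) < self._capacity:
            process_id = self._queued.popleft()
            self._running.add(process_id)
            self._start_worker(process_id)
```

With a capacity of two, a third queued process is started only after one of the first two workers reports that it has finished.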