SciFlo Workflow Definition
Confidence Level: High. This article has been formally reviewed and is signed off on by a relevant subject matter expert.
A Workflow Definition document tells SciFlo:
What to run
How to run it
What the inputs and outputs are
The order of the steps
You can find sample workflow definitions here: https://github.com/hysds/chimera/tree/develop/chimera/configs/pge_configs/examples
Workflow Definition Base
When writing the workflow definition document, start with the following template:
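The chimera examples linked above are the best starting point; the skeleton below is a minimal sketch of their layout (the namespace URIs, the name attribute, and the placeholder output/process names job_id_PGE_L2 / PGE_L2 are assumptions to verify against those examples). The Ln references in the walkthrough below correspond approximately to this layout.

<?xml version="1.0"?>
<sf:sciflo xmlns:sf="http://sciflo.jpl.nasa.gov/2006v1/sf"
           xmlns:xs="http://www.w3.org/2001/XMLSchema"
           xmlns:py="http://sciflo.jpl.nasa.gov/2006v1/py">
  <sf:flow id="URGENT_RESPONSE_WORKFLOW"
           name="Urgent Response Pipeline">

    <sf:description>Workflow for Urgent Response. It runs from L0 to L2 PGEs</sf:description>
    <sf:inputs>
      <sf_context>_context.json</sf_context>
    </sf:inputs>
    <sf:outputs>
      <job_id_PGE_L2 from="@#PGE_L2"/>
    </sf:outputs>
    <sf:processes>
      <!-- one <sf:process> block per workflow step, in execution order -->
    </sf:processes>
  </sf:flow>
</sf:sciflo>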
So what are we doing in the template above?
Ln 2-4: Declaring SciFlo namespaces.
Ln 5: Specifying an ID for the workflow, e.g. "URGENT_RESPONSE_WORKFLOW".
Ln 6: Specifying a name for the workflow, e.g. "Urgent Response Pipeline".
Ln 8: Specifying a description for the workflow, e.g. "Workflow for Urgent Response. It runs from L0 to L2 PGEs".
Ln 9-11: Within the <sf:inputs> block, you can list the inputs to the workflow job that should be available to any of the steps of the workflow. These are different from the inputs of the individual workflow steps.
Ln 12-14: Within the <sf:outputs> block, you reference the output of one of the steps (likely the final step) in the workflow as the output of the workflow job.
Ln 15-16: In the sf:processes block, you define the order and definition of every step, known as a process.
Workflow Inputs
<sf:inputs>
<sf_context>_context.json</sf_context>
</sf:inputs>
You can list the inputs to the workflow job in the format <tag_name>input value</tag_name>.
The tag name will be available to use as a variable for all the processes in the workflow.
For example, below is the definition of a process. I want the _context.json to be an input to a process called ORBIT_PGE, so I list the tag name sf_context in its list of inputs as <sf_context/> (Ln 3).
<sf:process id="ORBIT_PGE">
<sf:inputs>
<sf_context/>
<pge_orbit_job_params from="@#previous"/>
<pge_config_file>/path/to/configs/PGE_ORBIT.json</pge_config_file>
<sys_config_file>/path/to/configs/sys.config.json</sys_config_file>
</sf:inputs>
...
</sf:process>
Workflow Process
Let’s look at how to define a process. To do so you will need the following information:
process id
inputs
outputs
operation description
operation command
<sf:process id="input_pp_orbit">
<sf:inputs>
<sf_context/>
<pge_config_file>/path/to/configs/PGE_ORBIT.json</pge_config_file>
<sys_config_file>/path/to/configs/sys.config.json</sys_config_file>
</sf:inputs>
<sf:outputs>
<pge_orbit_job_params/>
</sf:outputs>
<sf:operator>
<sf:description>Pre processing step for Orbit PGE</sf:description>
<sf:op>
<sf:binding>python:/path/to/input_preprocessor.py?input_preprocessor.process</sf:binding>
</sf:op>
</sf:operator>
</sf:process>
Process ID
This is the unique identifier for the workflow process. In the example above, it's input_pp_orbit.
Process Inputs
You specify the list of inputs in the sf:inputs block. We follow the format mentioned earlier of <tag_name>input value</tag_name>.
Your inputs can be:
Previously defined things like the <sf_context/>
A constant string or integer value, e.g.
<threshold_val>0.5</threshold_val>
<purpose>urgent_response</purpose>
The path to a file, e.g.
<pge_config_file>/abspath/to/configs/PGE_ORBIT.json</pge_config_file>
An output of another process, e.g.
You can refer to the output of the previous process, where PGE_L0A_job_params is declared in that process's sf:outputs:
<PGE_L0A_job_params from="@#previous"/>
If you want to refer to the output by explicitly naming the process, it would look like:
<PGE_L0A_job_params from="@#input_pp_L0A"/>
The inputs are treated positionally, i.e. they should be listed in the order the function called in the binding accesses them.
Example:
The function called with these inputs is: process(sf_context, chimera_config_file, pge_config_filepath, settings_file)
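A sketch of the corresponding sf:inputs block; the tag names simply mirror the parameter names in the signature above, and the file paths are placeholders:

<sf:inputs>
  <sf_context/>
  <chimera_config_file>/path/to/configs/chimera.config.json</chimera_config_file>
  <pge_config_filepath>/path/to/configs/PGE_ORBIT.json</pge_config_filepath>
  <settings_file>/path/to/configs/settings.yaml</settings_file>
</sf:inputs>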
Process Operation
In the sf:operator block you need to specify:
Description of the operation
The operation
The operations are declared in SciFlo by specifying a binding.
The binding can be:
A function call, e.g. a Python function
A job submission to Mozart
Python Function
To call a Python function, you need to specify the Python script and the Python function to call in the form python:/path/to/script.py?module.function_name, e.g.
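Reusing the binding from the input_pp_orbit example above:

<sf:binding>python:/path/to/input_preprocessor.py?input_preprocessor.process</sf:binding>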
Job Submission
This submits a standalone job to Mozart, so it will be picked up by a worker which may or may not run on the same node as the SciFlo job. The format of a SciFlo binding for job submission is:
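A sketch of the general shape, matching the concrete binding shown under Process Outputs below:

<sf:binding job_queue="jobs_processed" async="false">parallel:python:?python_script.function_name</sf:binding>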
The job submission binding needs to specify:
job_queue="jobs_processed"
Don't change the job_queue; it's the internal queue used by SciFlo. The job payload constructed within the run_pge_docker script contains the PGE job's queue name, which is determined from a config file.
async mode: specifies whether the job should be run asynchronously or not.
False - the SciFlo process will hold the workflow from moving forward and wait for the result of the job execution.
True - the SciFlo process will submit the job and the workflow will go on to the next process (if any).
binding: parallel:python:?python_script.function_name
parallel specifies that the job will be run in parallel. The function called here should return the job payload to submit.
e.g.
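For instance, the binding used to submit a PGE job via the run_pge_docker script (the same binding referenced under Process Outputs below):

<sf:binding job_queue="jobs_processed" async="false">parallel:python:?run_pge_docker.submit_pge_job</sf:binding>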
Process Outputs
You specify the list of outputs in the sf:outputs block.
Whatever is returned from the operation performed in this process will be stored in your output_var_name.
For example, take the binding:
<sf:binding job_queue="jobs_processed" async="false">parallel:python:?run_pge_docker.submit_pge_job</sf:binding>
The python function’s return statement is:
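A hypothetical sketch of that function; the parameter names, the placeholder job id, and the omitted submission logic are assumptions, not the real run_pge_docker code:

def submit_pge_job(sf_context, job_params, pge_config_file, sys_config_file):
    # ... build the PGE job payload and submit it to Mozart (details omitted) ...
    job_id = "mozart-job-id-for-PGE_L0A"  # placeholder for the submitted job's id
    # Whatever is returned here is what SciFlo stores as the process output
    return job_id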
The output for this process is:
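Assuming this is the PGE_L0A process referenced in the next section, its sf:outputs block would declare a single output tag to hold that returned job id:

<sf:outputs>
  <job_id_PGE_L0A/>
</sf:outputs>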
Workflow Output
Within the sf:outputs block, you can list the output of the workflow job in the format <output_name from="@#process_id"/>.
The output_name should match the name of the output defined in the process that is referenced in from="@#process_id".
Here is an example:
The PGE_L0A process defines its output as job_id_PGE_L0A.
Now we want this to be the final step of the workflow, so we would say the job_id_PGE_L0A from process PGE_L0A is the output of the workflow. It looks like this:
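Using the <output_name from="@#process_id"/> format inside the workflow's sf:outputs block (as sketched in the base template above):

<sf:outputs>
  <job_id_PGE_L0A from="@#PGE_L0A"/>
</sf:outputs>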
Have Questions? Ask a HySDS Developer:
Anyone can join our public Slack channel to learn more about HySDS. JPL employees can join #HySDS-Community
JPLers can also ask HySDS questions at Stack Overflow Enterprise