
Confidence Level: High. This article has been formally reviewed and is signed off on by a relevant subject matter expert.


A Workflow Definition document tells SciFlo:

  • what to run

  • how to run it

  • what the inputs and outputs are

  • the order of the steps

You can find sample workflow definitions here: https://github.com/hysds/chimera/tree/develop/chimera/configs/pge_configs/examples

Workflow Definition Base

When writing the workflow definition document, start with the following template:
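
Below is a minimal sketch of such a template, reconstructed from the sample definitions linked above; the namespace URIs and placeholder values are illustrative, so replace them with your project's own. The "Ln" references in the notes that follow correspond to the lines of this sketch.

Code Block
languagexml
<?xml version="1.0"?>
<sf:sciflo xmlns:sf="http://sciflo.jpl.nasa.gov/2006v1/sf"
           xmlns:xs="http://www.w3.org/2001/XMLSchema"
           xmlns:py="http://sciflo.jpl.nasa.gov/2006v1/py">
  <sf:flow id="WORKFLOW_ID">
    <sf:title>Workflow Name</sf:title>
    <sf:icon>http://path/to/icon.png</sf:icon>
    <sf:description>Workflow description</sf:description>
    <sf:inputs>
      <sf_context>_context.json</sf_context>
    </sf:inputs>
    <sf:outputs>
      <output_name from="@#process_id"/>
    </sf:outputs>
    <sf:processes>
      <!-- one sf:process block per step, in execution order -->
    </sf:processes>
  </sf:flow>
</sf:sciflo>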

So what are we doing in the template above?

  • Ln 2-4: Declaring the SciFlo namespaces

  • Ln 5: Specifying an ID for the workflow, e.g. "URGENT_RESPONSE_WORKFLOW"

  • Ln 6: Specifying a name for the workflow, e.g. "Urgent Response Pipeline"

  • Ln 8: Specifying a description for the workflow, e.g. "Workflow for urgent response. It runs from L0 to L2 PGEs"

  • Ln 9-11: Within the <sf:inputs> block, you can list the inputs to the workflow job that should be
    available to any of the steps of the workflow. These are different from the inputs of the
    individual workflow steps.

  • Ln 12-14: Within the <sf:outputs> block, you reference the output of one of the steps (likely the
    final step) in the workflow as the output of the workflow job.

  • Ln 15-16: Within the <sf:processes> block, you define each step of the workflow, known as a
    process, in execution order.

Workflow Inputs

Code Block
languagexml
<sf:inputs>
  <sf_context>_context.json</sf_context>
</sf:inputs>

You can list the inputs to the workflow job in the format <tag_name>input value</tag_name>.

The tag name will be available to use as a variable in all the processes of the workflow.

For example, below is the definition of a process. Say we want _context.json to be available as an input to a process called ORBIT_PGE, so we list the tag name sf_context in its list of inputs as <sf_context/> (Ln 3).

Code Block
languagexml
<sf:process id="ORBIT_PGE">
  <sf:inputs>
    <sf_context/>
    <pge_orbit_job_params from="@#previous"/>
    <pge_config_file>/path/to/configs/PGE_ORBIT.json</pge_config_file>
    <sys_config_file>/path/to/configs/sys.config.json</sys_config_file>
  </sf:inputs>
  ...
</sf:process>

Workflow Process

Let’s look at how to define a process. To do so, you will need the following information:

  1. process id

  2. inputs

  3. outputs

  4. operation description

  5. operation command

Code Block
languagexml
<sf:process id="input_pp_orbit">
  <sf:inputs>
    <sf_context/>
    <pge_config_file>/path/to/configs/PGE_ORBIT.json</pge_config_file>
    <sys_config_file>/path/to/configs/sys.config.json</sys_config_file>
  </sf:inputs>
  <sf:outputs>
    <pge_orbit_job_params/>
  </sf:outputs>
  <sf:operator>
    <sf:description>Pre processing step for Orbit PGE</sf:description>
    <sf:op>
      <sf:binding>python:/path/to/input_preprocessor.py?input_preprocessor.process</sf:binding>
    </sf:op>
  </sf:operator>
</sf:process>

Process ID

This is the unique identifier for the workflow process. In the example above, it’s input_pp_orbit.

Process Inputs

You specify the list of inputs in the sf:inputs block, following the format mentioned earlier of <tag_name>input value</tag_name>.

Your inputs can be:

  • A previously defined workflow input, e.g. the <sf_context/>

  • A constant string or integer value, e.g.

    <threshold_val>0.5</threshold_val>

    <purpose>urgent_response</purpose>

  • The path to a file, e.g.

    <pge_config_file>/abspath/to/configs/PGE_ORBIT.json</pge_config_file>

  • An output of another process, e.g.

    • You can refer to the output of the immediately preceding process, where PGE_L0A_job_params is
      declared in that process’s sf:outputs:

      <PGE_L0A_job_params from="@#previous"/>

    • If you want to refer to the output by explicitly naming the process, it would look like:

      <PGE_L0A_job_params from="@#input_pp_L0A"/>

The inputs are treated positionally, i.e. they must be listed in the same order as the parameters of the function called in the binding.

Example:

Code Block
<sf:inputs>
  <sf_context/>
  <chimera_config_file>/home/ops/verdi/ops/nisar-pcm/nisar_chimera/configs/chimera_config.yaml</chimera_config_file>
  <pge_config_file>/home/ops/verdi/ops/nisar-pcm/nisar_chimera/configs/pge_configs/PGE_L0A.yaml</pge_config_file>
  <settings_file>/home/ops/verdi/ops/nisar-pcm/conf/settings.yaml</settings_file>
</sf:inputs>

The function called with these inputs is: process(sf_context, chimera_config_file, pge_config_filepath, settings_file). Because the mapping is positional, the parameter names do not need to match the tag names (note pge_config_file vs. pge_config_filepath).

Process Operation

Code Block
languagexml
<sf:operator>
  <sf:description>Pre processing step for Orbit PGE</sf:description>
  <sf:op>
    <sf:binding>python:/path/to/input_preprocessor.py?input_preprocessor.process</sf:binding>
  </sf:op>
</sf:operator>

In the sf:operator block you need to specify:

  • A description of the operation

  • The operation itself

Operations are declared in SciFlo by specifying a binding.

The binding can be:

  • A function call, e.g. a Python function

  • A job submission to Mozart

Python Function

To call a Python function, specify the Python script and the function to call in the following way:

Code Block
<sf:binding>python:/path/to/python_script.py?python_script.function_name</sf:binding>

e.g.

Code Block
languagexml
<sf:binding>python:/home/ops/verdi/ops/iems_chimera/chimera/input_preprocessor.py?input_preprocessor.process</sf:binding>

Job Submission

This submits a standalone job to Mozart, so it will be picked up by a worker that may or may not run on the same node as the SciFlo job. The format of a SciFlo binding for job submission is:

Code Block
languagexml
<sf:binding job_queue="jobs_processed" async="false">parallel:python:?python_script.function_name</sf:binding>

The job submission binding needs to specify:

  • job_queue="jobs_processed"

Note

Don’t change the job_queue; it is the internal queue used by SciFlo. The PGE job’s own queue name is part of the job payload constructed within the run_pge_docker script and is determined from a config file.

  • async mode: specifies whether the job should be run asynchronously or not (a sketch of the
    asynchronous variant follows the example below)

    • false - the SciFlo process holds the workflow and waits for the result of the job execution.

    • true - the SciFlo process submits the job and the workflow moves on to the next process (if any).

  • binding

    • parallel:python:?python_script.function_name

    • parallel specifies that the job will be run in parallel

    • the function called here should return the job payload to submit

e.g.

Code Block
languagexml
<sf:binding job_queue="jobs_processed" async="false">parallel:python:?run_pge_docker.submit_pge_job</sf:binding>
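
For comparison, here is a sketch of the same submission run asynchronously; only the async flag changes, so the workflow continues to the next process without waiting for this job’s result:

Code Block
languagexml
<sf:binding job_queue="jobs_processed" async="true">parallel:python:?run_pge_docker.submit_pge_job</sf:binding>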

Process Outputs

You specify the list of outputs in the sf:outputs block.

Code Block
<sf:outputs>
    <output_var_name/>
</sf:outputs>

Whatever is returned from the operation performed in this process will be stored in your output_var_name.

For example, take the binding <sf:binding job_queue="jobs_processed" async="false">parallel:python:?run_pge_docker.submit_pge_job</sf:binding>

The Python function’s return statement is:

Code Block
def submit_pge_job(sf_context, runconfig, pge_config_file, sys_config_file, wuid=None, job_num=None):
    ...
    return job_payload

The output for this process is:

Code Block
<sf:outputs>
  <mozart_job_payload/>
</sf:outputs>

Workflow Output

Code Block
languagexml
<sf:outputs>
  <output_name from="@#process_id"/>
</sf:outputs>

Within the sf:outputs block, you list the output of the workflow job in the format <output_name from="@#process_id"/>.

The output_name should match the name of the output defined in the process referenced by from="@#process_id".

Here is an example:

The PGE_L0A process defines its output as job_id_PGE_L0A.

Code Block
<sf:process id="PGE_L0A">
  <sf:inputs>
    <sf_context/>
    <PGE_L0A_job_params from="@#previous"/>
    <pge_config_file>/home/ops/verdi/ops/nisar-pcm/nisar_chimera/configs/pge_configs/PGE_L0A.yaml</pge_config_file>
    <settings_file>/home/ops/verdi/ops/nisar-pcm/conf/settings.yaml</settings_file>
    <chimera_config_file>/home/ops/verdi/ops/nisar-pcm/nisar_chimera/configs/chimera_config.yaml</chimera_config_file>
  </sf:inputs>
  <sf:outputs>
    <job_id_PGE_L0A/>
  </sf:outputs>
  <sf:operator>
    <sf:description>Run PGE_L0A on AWS worker</sf:description>
    <sf:op>
      <sf:binding job_queue="jobs_processed" async="false">parallel:python:?run_pge_docker.submit_pge_job</sf:binding>
    </sf:op>
  </sf:operator>
</sf:process>

Now we want this to be the final step of the workflow, so we declare job_id_PGE_L0A from the process PGE_L0A as the output of the workflow. It looks like this:

Code Block
languagexml
  <sf:flow id="PGE_L0A">
    <sf:title>PGE_L0A</sf:title>
    <sf:icon>http://sciflo.jpl.nasa.gov/smap_sciflo/web/thumbnails/merged_data.png</sf:icon>
    <sf:description>Run PGE_L0A.</sf:description>
    <sf:inputs>
      <sf_context>_context.json</sf_context>
    </sf:inputs>
    <sf:outputs>
      <job_id_PGE_L0A from="@#PGE_L0A"/>
    </sf:outputs>
    <sf:processes>
      ...
    </sf:processes>
  </sf:flow>
</sf:sciflo>


Have Questions? Ask a HySDS Developer:

Anyone can join our public Slack channel to learn more about HySDS. JPL employees can join #HySDS-Community.

JPLers can also ask HySDS questions at Stack Overflow Enterprise.
