Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 8 Next »

Gerald Manipon edited this page on May 16 · 7 revisions

(blue star) Confidence Level TBD  This article has not been reviewed for accuracy, timeliness, or completeness. Check that this information is valid before acting on it.


HySDS provides a plugin facility to hook in arbitrary function calls before (pre-processorfunctions) and after (post-processor functions) the execution of a docker command in a job. HySDS provides builtin pre-processor functions that runs by default for each job (hysds.utils.localize_urlshysds.utils.mark_localized_datasetshysds.utils.validate_checksum_files) to localize any inputs specified in the job payload. It also provides builtin post-processor functions that runs by default as well (hysds.utils.publish_datasets) to search the work directory for any HySDS datasets to publish.

Function Definition

Pre-processor and post-processor functions are required to take 2 arguments: the job dict and context dict. These functions can do what they need using information stored in the job and context dicts. The function must return a boolean result, either True or False. In the case of pre-processor functions, the docker command for the job is only executed if all pre-processor functions return a True. In the case of post-processor functions, any that return a False will be logged but the job will not be considered a failure. In both cases, if the function raises an exception, then the job is a failure.

Pre-processor Functions

By default, job-specs have an implicit pre-processor function defined, hysds.utils.localize_urls. Thus the following job-spec:

{
  "command":"/home/ops/ariamh/ariaml/extractFeatures.sh",
  "disk_usage":"1GB",
  "imported_worker_files": {
    "/home/ops/.netrc": "/home/ops/.netrc",
    "/home/ops/.aws": "/home/ops/.aws",
    "/home/ops/ariamh/conf/settings.conf": "/home/ops/ariamh/conf/settings.conf"
  },
  "params" : [
    {
      "name": "uri",
      "destination": "positional"
    },
    {
      "name": "uri",
      "destination": "localize"
    }
  ]
}

implicitly provides the "pre" parameter as follows:

{
  "command":"/home/ops/ariamh/ariaml/extractFeatures.sh",
  "disk_usage":"1GB",
  "imported_worker_files": {
    "/home/ops/.netrc": "/home/ops/.netrc",
    "/home/ops/.aws": "/home/ops/.aws",
    "/home/ops/ariamh/conf/settings.conf": "/home/ops/ariamh/conf/settings.conf"
  },
  "pre": [ "hysds.utils.localize_urls" ],
  "params" : [
    {
      "name": "uri",
      "destination": "positional"
    },
    {
      "name": "uri",
      "destination": "localize"
    }
  ]
}

To disable this builtin feature, specify the "disable_pre_builtins" parameter as follows:

{
  "command":"/home/ops/ariamh/ariaml/extractFeatures.sh",
  "disk_usage":"1GB",
  "imported_worker_files": {
    "/home/ops/.netrc": "/home/ops/.netrc",
    "/home/ops/.aws": "/home/ops/.aws",
    "/home/ops/ariamh/conf/settings.conf": "/home/ops/ariamh/conf/settings.conf"
  },
  "disable_pre_builtins": true,
  "params" : [
    {
      "name": "uri",
      "destination": "positional"
    },
    {
      "name": "uri",
      "destination": "localize"
    }
  ]
}

In this case, even though there exists a parameter "uri" with destination "localize", the hysds.utils.localize_urls pre-processor will not run. To specify additional pre-processor functions, define them using the "pre" parameter like so:

{
  "command":"/home/ops/ariamh/ariaml/extractFeatures.sh",
  "disk_usage":"1GB",
  "imported_worker_files": {
    "/home/ops/.netrc": "/home/ops/.netrc",
    "/home/ops/.aws": "/home/ops/.aws",
    "/home/ops/ariamh/conf/settings.conf": "/home/ops/ariamh/conf/settings.conf"
  },
  "pre": [ "my.custom.preprocessor_function" ],
  "params" : [
    {
      "name": "uri",
      "destination": "positional"
    },
    {
      "name": "uri",
      "destination": "localize"
    }
  ]
}

Because the default builtin pre-processor functions aren't disabled, the effect of the above job-spec is this:

{
  "command":"/home/ops/ariamh/ariaml/extractFeatures.sh",
  "disk_usage":"1GB",
  "imported_worker_files": {
    "/home/ops/.netrc": "/home/ops/.netrc",
    "/home/ops/.aws": "/home/ops/.aws",
    "/home/ops/ariamh/conf/settings.conf": "/home/ops/ariamh/conf/settings.conf"
  },
  "disable_pre_builtins": true,
  "pre": [ "hysds.utils.localize_urls", "my.custom.preprocessor_function" ],
  "params" : [
    {
      "name": "uri",
      "destination": "positional"
    },
    {
      "name": "uri",
      "destination": "localize"
    }
  ]
}

Post-processor Functions

By default, job-specs have an implicit post-processor function defined, hysds.utils.publish_datasets. Thus the following job-spec:

{
  "command":"/home/ops/ariamh/ariaml/extractFeatures.sh",
  "disk_usage":"1GB",
  "imported_worker_files": {
    "/home/ops/.netrc": "/home/ops/.netrc",
    "/home/ops/.aws": "/home/ops/.aws",
    "/home/ops/ariamh/conf/settings.conf": "/home/ops/ariamh/conf/settings.conf"
  },
  "params" : [
    {
      "name": "uri",
      "destination": "positional"
    },
    {
      "name": "uri",
      "destination": "localize"
    }
  ]
}

implicitly provides the "post" parameter as follows:

{
  "command":"/home/ops/ariamh/ariaml/extractFeatures.sh",
  "disk_usage":"1GB",
  "imported_worker_files": {
    "/home/ops/.netrc": "/home/ops/.netrc",
    "/home/ops/.aws": "/home/ops/.aws",
    "/home/ops/ariamh/conf/settings.conf": "/home/ops/ariamh/conf/settings.conf"
  },
  "post": [ "hysds.utils.publish_datasets" ],
  "params" : [
    {
      "name": "uri",
      "destination": "positional"
    },
    {
      "name": "uri",
      "destination": "localize"
    }
  ]
}

To disable this builtin feature, specify the "disable_post_builtins" parameter as follows:

{
  "command":"/home/ops/ariamh/ariaml/extractFeatures.sh",
  "disk_usage":"1GB",
  "imported_worker_files": {
    "/home/ops/.netrc": "/home/ops/.netrc",
    "/home/ops/.aws": "/home/ops/.aws",
    "/home/ops/ariamh/conf/settings.conf": "/home/ops/ariamh/conf/settings.conf"
  },
  "disable_post_builtins": true,
  "params" : [
    {
      "name": "uri",
      "destination": "positional"
    },
    {
      "name": "uri",
      "destination": "localize"
    }
  ]
}

In this case, even though the docker command may create a HySDS dataset in the job's working directory, the hysds.utils.publish_datasets post-processor will not run. To specify additional post-processor functions, define them using the "post" parameter like so:

{
  "command":"/home/ops/ariamh/ariaml/extractFeatures.sh",
  "disk_usage":"1GB",
  "imported_worker_files": {
    "/home/ops/.netrc": "/home/ops/.netrc",
    "/home/ops/.aws": "/home/ops/.aws",
    "/home/ops/ariamh/conf/settings.conf": "/home/ops/ariamh/conf/settings.conf"
  },
  "post": [ "my.custom.postprocessor_function" ],
  "params" : [
    {
      "name": "uri",
      "destination": "positional"
    },
    {
      "name": "uri",
      "destination": "localize"
    }
  ]
}

Because the default builtin post-processor functions aren't disabled, the effect of the above job-spec is this:

{
  "command":"/home/ops/ariamh/ariaml/extractFeatures.sh",
  "disk_usage":"1GB",
  "imported_worker_files": {
    "/home/ops/.netrc": "/home/ops/.netrc",
    "/home/ops/.aws": "/home/ops/.aws",
    "/home/ops/ariamh/conf/settings.conf": "/home/ops/ariamh/conf/settings.conf"
  },
  "disable_pre_builtins": true,
  "post": [ "hysds.utils.publish_datasets", "my.custom.preprocessor_function" ],
  "params" : [
    {
      "name": "uri",
      "destination": "positional"
    },
    {
      "name": "uri",
      "destination": "localize"
    }
  ]
}

publish_datasets

By default, the hysds.utils.publish_datasets post-processor function will search for any HySDS datasets under a job’s work directory and run the hysds.dataset_ingest.ingest function with the force flag set to False. Thus if a dataset was previously published the ingest will fail with a no-clobber error. This ensures data immutability and subsequent jobs that utilize that dataset and its associated metadata as input can be assured the data was not modified.

Can we disable this feature so that products can clobber previously ingested products?

In the context of data lineage and provenance, this is a bad idea. The ideal approach would be to create a new product/dataset with an incremented version or product counter and ingest that. However, in the case of testing during HySDS job/PGE development or in I&T, enabling the clobbering of products can make life easier. To force ingest, we leave it up to the HySDS job/PGE developer to implement the adding of the _force_ingest flag to the _context.json of the job:

with open(ctx_file) as f:
ctx = json.load(f)

ctx['_force_ingest'] = True

with open(ctx_file, 'w') as f:
    json.dump(ctx, f, sort_keys=True, indent=2)

When this is set in _context.json, the hysds.utils.publish_datasets post-processor function will set the force flag to True when calling thehysds.dataset_ingest.ingest function on all the HySDS datasets found under the job directory.

triage

HySDS provides an additional builtin post-processor function for providing triage of failed jobs: hysds.utils.triage. It is not enabled by default and must be explicitly set:

{
  "command":"/home/ops/ariamh/ariaml/extractFeatures.sh",
  "disk_usage":"1GB",
  "imported_worker_files": {
    "/home/ops/.netrc": "/home/ops/.netrc",
    "/home/ops/.aws": "/home/ops/.aws",
    "/home/ops/ariamh/conf/settings.conf": "/home/ops/ariamh/conf/settings.conf"
  },
  "post": [ "hysds.utils.triage" ],
  "params" : [
    {
      "name": "uri",
      "destination": "positional"
    },
    {
      "name": "uri",
      "destination": "localize"
    }
  ]
}

The above job-spec results in 2 post-processing functions being called after the docker command for the job executes: hysds.utils.publish_datasets and hysds.utils.triage. In both cases, the functions perform the proper checks up-front before continuing on with their functionality. For example, hysds.utils.publish_datasets checks the exit code of the docker command by inspecting the job dict and only proceeds with searching for HySDS datasets if the exit code was 0. For hysds.utils.triage, the function checks that the exit code was not a 0and continues with the creation and publishing of the triage dataset if so. If the job was triaged, there will be a _triaged.json file left in the job's work directory which contains the JSON result returned by the GRQ ingest REST call.

What gets triaged?

By default, all _* and *.log files in the root of the job work directory are triaged.

Can we triage other files as well?

The behavior of the triage function can be modified by updating certain fields in the _context.json of the job's work directory. Within the docker command that is called, the process can open up the _context.json and add a top-level parameter named _triage_additional_globs as a list of glob patterns that will be triaged in addition to the default files. For example, a python script that is called as a docker command for the job can add these files for triage:

with open(ctx_file) as f:
ctx = json.load(f)

ctx['_triage_additional_globs'] = [ 'S1-IFG*', 'AOI_*', 'celeryconfig.py', 'datasets.json' ]

with open(ctx_file, 'w') as f:
    json.dump(ctx, f, sort_keys=True, indent=2)

Triage is enabled in the job-spec. Can we disable triage at runtime?

Yes. Similar to the addition of other files to triage, you can add a top-level parameter named _triage_disabled to disable triage:

with open(ctx_file) as f:
ctx = json.load(f)

ctx['_triage_additional_globs'] = [ 'S1-IFG*', 'AOI_*', 'celeryconfig.py', 'datasets.json' ]

if some_condition:
    ctx['_triage_disabled'] = True

with open(ctx_file, 'w') as f:
    json.dump(ctx, f, sort_keys=True, indent=2)

Can we customize the job ID format for a triage job?

Yes. By default, the job ID format for triage jobs uses the following python format string:

triaged_job-{job[job_info][id]}-{job[task_id]}

This string format is then passed both the job payload (e.g. contents of _job.json) and context payload (e.g. contents of _context.json) at format time.

Similar to the way you can modify the behavior of triage in the previous 2 questions, you can add a top-level parameter named _triage_id_format to override the default format string:

with open(ctx_file) as f:
ctx = json.load(f)

ctx['_triage_id_format'] = "my_custom_triage_job-{job[job_info][id]}-{job[job_info][time_start]}" with open(ctx_file, 'w') as f: json.dump(ctx, f, sort_keys=True, indent=2)

Parallelization

By default PGEs will localize and publish datasets one-by-one using the hysds.localize.localize_urls & hysds.dataset_ingest_bulk.publish_datasets function

if you want to speed up the process, your PGE can utilize the parallelized version; hysds.localize.localize_urls_parallel & hysds.dataset_ingest_bulk.publish_datasets_parallel

  • the parallelized function utilizes multiprocessing (through celery’s billiard library)

  • CPUs used will depend on how many available CPUs on the worker and how many datasets to publish

    • num_procs = min(max(cpu_count() - 2, 1), len(datasets_list))

to use the parallelized localization/publish the PGE’s job spec would have to be changed:

{
  "disable_pre_builtins": true,
  "disable_post_builtins": true,
  "pre": [
    "hysds.localize.localize_urls_parallel",
    "hysds.utils.mark_localized_datasets",
    "hysds.utils.validate_checksum_files"
  ],
  "post": [
    "hysds.dataset_ingest_bulk.publish_datasets_parallel"
  ]
}
  • No labels