Preprocessor and Postprocessor Functions
Confidence Level TBD: This article has not been reviewed for accuracy, timeliness, or completeness. Check that this information is valid before acting on it.
HySDS provides a plugin facility to hook in arbitrary function calls before (pre-processor functions) and after (post-processor functions) the execution of a docker command in a job. HySDS provides builtin pre-processor functions that run by default for each job (hysds.utils.localize_urls, hysds.utils.mark_localized_datasets, hysds.utils.validate_checksum_files) to localize any inputs specified in the job payload. It also provides a builtin post-processor function that runs by default (hysds.utils.publish_datasets) to search the work directory for any HySDS datasets to publish.
Function Definition
Pre-processor and post-processor functions are required to take two arguments: the job dict and the context dict. These functions can do whatever they need using the information stored in the job and context dicts. Each function must return a boolean result, either True or False. In the case of pre-processor functions, the docker command for the job is only executed if all pre-processor functions return True. In the case of post-processor functions, any that return False will be logged, but the job will not be considered a failure. In both cases, if the function raises an exception, the job is a failure.
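For example, a minimal custom function might look like the following sketch (the function name, and the check it performs against an assumed job_info key, are purely illustrative):

def my_preprocessor(job, context):
    """Hypothetical pre-/post-processor function.

    Receives the job dict and context dict; returns True to allow
    processing to continue or False otherwise. Raising an exception
    marks the job as a failure.
    """
    # inspect whatever is needed from the job and context dicts,
    # e.g. verify that the job dict carries an ID under job_info
    # (an assumed key, used here only for illustration)
    return bool(job.get("job_info", {}).get("id"))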
Pre-processor Functions
By default, job-specs have an implicit pre-processor function defined, hysds.utils.localize_urls. Thus the following job-spec:
{
  "command": "/home/ops/ariamh/ariaml/extractFeatures.sh",
  "disk_usage": "1GB",
  "imported_worker_files": {
    "/home/ops/.netrc": "/home/ops/.netrc",
    "/home/ops/.aws": "/home/ops/.aws",
    "/home/ops/ariamh/conf/settings.conf": "/home/ops/ariamh/conf/settings.conf"
  },
  "params": [
    {
      "name": "uri",
      "destination": "positional"
    },
    {
      "name": "uri",
      "destination": "localize"
    }
  ]
}
implicitly provides the "pre" parameter as follows:
{
  "command": "/home/ops/ariamh/ariaml/extractFeatures.sh",
  "disk_usage": "1GB",
  "imported_worker_files": {
    "/home/ops/.netrc": "/home/ops/.netrc",
    "/home/ops/.aws": "/home/ops/.aws",
    "/home/ops/ariamh/conf/settings.conf": "/home/ops/ariamh/conf/settings.conf"
  },
  "pre": [ "hysds.utils.localize_urls" ],
  "params": [
    {
      "name": "uri",
      "destination": "positional"
    },
    {
      "name": "uri",
      "destination": "localize"
    }
  ]
}
To disable this builtin feature, specify the "disable_pre_builtins" parameter as follows:
{
  "command": "/home/ops/ariamh/ariaml/extractFeatures.sh",
  "disk_usage": "1GB",
  "imported_worker_files": {
    "/home/ops/.netrc": "/home/ops/.netrc",
    "/home/ops/.aws": "/home/ops/.aws",
    "/home/ops/ariamh/conf/settings.conf": "/home/ops/ariamh/conf/settings.conf"
  },
  "disable_pre_builtins": true,
  "params": [
    {
      "name": "uri",
      "destination": "positional"
    },
    {
      "name": "uri",
      "destination": "localize"
    }
  ]
}
In this case, even though there exists a parameter "uri" with destination "localize", the hysds.utils.localize_urls pre-processor will not run. To specify additional pre-processor functions, define them using the "pre" parameter.
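For example, the job-spec below adds a hypothetical custom pre-processor, mypackage.utils.my_preprocessor (an illustrative name, not a builtin HySDS function):

{
  "command": "/home/ops/ariamh/ariaml/extractFeatures.sh",
  "disk_usage": "1GB",
  "pre": [ "mypackage.utils.my_preprocessor" ],
  "params": [
    {
      "name": "uri",
      "destination": "positional"
    },
    {
      "name": "uri",
      "destination": "localize"
    }
  ]
}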
Because the default builtin pre-processor functions aren't disabled, the effect of the above job-spec is this:
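In effect, the builtin hysds.utils.localize_urls runs along with the hypothetical custom function:

{
  "command": "/home/ops/ariamh/ariaml/extractFeatures.sh",
  "disk_usage": "1GB",
  "pre": [ "hysds.utils.localize_urls", "mypackage.utils.my_preprocessor" ],
  "params": [
    {
      "name": "uri",
      "destination": "positional"
    },
    {
      "name": "uri",
      "destination": "localize"
    }
  ]
}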
Post-processor Functions
By default, job-specs have an implicit post-processor function defined, hysds.utils.publish_datasets. Thus the following job-spec:
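For illustration, a trimmed version of the extractFeatures job-spec used earlier:

{
  "command": "/home/ops/ariamh/ariaml/extractFeatures.sh",
  "disk_usage": "1GB",
  "params": [
    {
      "name": "uri",
      "destination": "positional"
    }
  ]
}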
implicitly provides the "post" parameter as follows:
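Continuing the illustrative example:

{
  "command": "/home/ops/ariamh/ariaml/extractFeatures.sh",
  "disk_usage": "1GB",
  "post": [ "hysds.utils.publish_datasets" ],
  "params": [
    {
      "name": "uri",
      "destination": "positional"
    }
  ]
}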
To disable this builtin feature, specify the "disable_post_builtins" parameter as follows:
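Again with the illustrative job-spec:

{
  "command": "/home/ops/ariamh/ariaml/extractFeatures.sh",
  "disk_usage": "1GB",
  "disable_post_builtins": true,
  "params": [
    {
      "name": "uri",
      "destination": "positional"
    }
  ]
}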
In this case, even though the docker command may create a HySDS dataset in the job's working directory, the hysds.utils.publish_datasets post-processor will not run. To specify additional post-processor functions, define them using the "post" parameter.
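For example, adding a hypothetical custom post-processor, mypackage.utils.my_postprocessor (an illustrative name):

{
  "command": "/home/ops/ariamh/ariaml/extractFeatures.sh",
  "disk_usage": "1GB",
  "post": [ "mypackage.utils.my_postprocessor" ],
  "params": [
    {
      "name": "uri",
      "destination": "positional"
    }
  ]
}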
Because the default builtin post-processor functions aren't disabled, the effect of the above job-spec is this:
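In effect, the builtin hysds.utils.publish_datasets runs along with the hypothetical custom function:

{
  "command": "/home/ops/ariamh/ariaml/extractFeatures.sh",
  "disk_usage": "1GB",
  "post": [ "hysds.utils.publish_datasets", "mypackage.utils.my_postprocessor" ],
  "params": [
    {
      "name": "uri",
      "destination": "positional"
    }
  ]
}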
publish_datasets
By default, the hysds.utils.publish_datasets post-processor function will search for any HySDS datasets under a job's work directory and run the hysds.dataset_ingest.ingest function with the force flag set to False. Thus, if a dataset was previously published, the ingest will fail with a no-clobber error. This ensures data immutability: subsequent jobs that use that dataset and its associated metadata as input can be assured the data was not modified.
Can we disable this feature so that products can clobber previously ingested products?
In the context of data lineage and provenance, this is a bad idea. The ideal approach is to create a new product/dataset with an incremented version or product counter and ingest that. However, when testing during HySDS job/PGE development or in I&T, enabling the clobbering of products can make life easier. To force ingest, it is left up to the HySDS job/PGE developer to add the _force_ingest flag to the _context.json of the job:
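A minimal sketch of what the PGE could do from inside its docker command (it assumes _context.json sits in the current work directory):

import json

# read the job's _context.json from the work directory
with open("_context.json") as f:
    ctx = json.load(f)

# force ingest so that previously published datasets can be clobbered
ctx["_force_ingest"] = True

# write the updated context back out
with open("_context.json", "w") as f:
    json.dump(ctx, f, indent=2)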
When this is set in _context.json, the hysds.utils.publish_datasets post-processor function will set the force flag to True when calling the hysds.dataset_ingest.ingest function on all the HySDS datasets found under the job directory.
triage
HySDS provides an additional builtin post-processor function for providing triage of failed jobs: hysds.utils.triage. It is not enabled by default and must be explicitly set:
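For example, reusing the illustrative job-spec from above (because the builtins are not disabled, hysds.utils.publish_datasets still runs as well):

{
  "command": "/home/ops/ariamh/ariaml/extractFeatures.sh",
  "disk_usage": "1GB",
  "post": [ "hysds.utils.triage" ],
  "params": [
    {
      "name": "uri",
      "destination": "positional"
    }
  ]
}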
The above job-spec results in two post-processing functions being called after the docker command for the job executes: hysds.utils.publish_datasets and hysds.utils.triage. In both cases, the functions perform the proper checks up front before continuing with their functionality. For example, hysds.utils.publish_datasets checks the exit code of the docker command by inspecting the job dict and only proceeds with searching for HySDS datasets if the exit code was 0. hysds.utils.triage checks that the exit code was not 0 and, if so, continues with the creation and publishing of the triage dataset. If the job was triaged, a _triaged.json file is left in the job's work directory containing the JSON result returned by the GRQ ingest REST call.
What gets triaged?
By default, all _* and *.log files in the root of the job work directory are triaged.
Can we triage other files as well?
The behavior of the triage function can be modified by updating certain fields in the _context.json of the job's work directory. Within the docker command that is called, the process can open the _context.json and add a top-level parameter named _triage_additional_globs as a list of glob patterns that will be triaged in addition to the default files. For example, a python script that is called as the docker command for the job can add these files for triage:
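A minimal sketch (the glob patterns are purely illustrative):

import json

# read the job's _context.json from the work directory
with open("_context.json") as f:
    ctx = json.load(f)

# triage these files in addition to the default _* and *.log files
ctx["_triage_additional_globs"] = ["output/*.h5", "*.rc", "intermediate/*.tif"]

# write the updated context back out
with open("_context.json", "w") as f:
    json.dump(ctx, f, indent=2)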
Triage is enabled in the job-spec. Can we disable triage at runtime?
Yes. Similar to the addition of other files to triage, you can add a top-level parameter named _triage_disabled to disable triage:
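For example, from within the docker command (same sketch as above, assuming _context.json is in the current work directory):

import json

with open("_context.json") as f:
    ctx = json.load(f)

# turn off triage for this job even though it is enabled in the job-spec
ctx["_triage_disabled"] = True

with open("_context.json", "w") as f:
    json.dump(ctx, f, indent=2)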
Can we customize the job ID format for a triage job?
Yes. By default, the job ID format for triage jobs uses the following python format string:
triaged_job-{job[job_info][id]}-{job[task_id]}
This format string is then passed both the job payload (e.g., the contents of _job.json) and the context payload (e.g., the contents of _context.json) at format time. Similar to the way you can modify the behavior of triage in the previous two questions, you can add a top-level parameter named _triage_id_format to override the default format string:
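For example (the custom format below is illustrative and only uses fields that appear in the default format string):

import json

with open("_context.json") as f:
    ctx = json.load(f)

# override the default triage job ID format
ctx["_triage_id_format"] = "my_triaged_job-{job[job_info][id]}-{job[task_id]}"

with open("_context.json", "w") as f:
    json.dump(ctx, f, indent=2)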
Parallelization
By default, PGEs will localize and publish datasets one by one using the hysds.localize.localize_urls and hysds.dataset_ingest_bulk.publish_datasets functions. If you want to speed up the process, your PGE can use the parallelized versions: hysds.localize.localize_urls_parallel and hysds.dataset_ingest_bulk.publish_datasets_parallel. The parallelized functions use multiprocessing (through celery's billiard library); the number of CPUs used depends on how many CPUs are available on the worker and how many datasets there are to localize or publish. Parallelization scales with the number of CPUs on the worker: more CPUs means faster localization/publishing. To use the parallelized localization/publish, the PGE's job-spec has to be changed:
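One plausible configuration, sketched under the assumption that the serial builtins are disabled and replaced with the parallel functions (the command and params are illustrative; since disabling the pre builtins also disables hysds.utils.mark_localized_datasets and hysds.utils.validate_checksum_files, they are listed explicitly here):

{
  "command": "/path/to/my_pge.sh",
  "disk_usage": "1GB",
  "disable_pre_builtins": true,
  "disable_post_builtins": true,
  "pre": [
    "hysds.localize.localize_urls_parallel",
    "hysds.utils.mark_localized_datasets",
    "hysds.utils.validate_checksum_files"
  ],
  "post": [ "hysds.dataset_ingest_bulk.publish_datasets_parallel" ],
  "params": [
    {
      "name": "uri",
      "destination": "localize"
    }
  ]
}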
Example: an L0A_TE job (localizing 626 VCID input files and publishing 252 L0A_L_RRST products) on a 16-core worker. Localization (logs taken from the PCM log file run_sciflo_L0A_TE.log): about 7 minutes for all VCID files plus anc/aux files. Publishing (logs taken from the verdi log file nisar-job_worker-sciflo-l0a_te.log): about 7 minutes for all L0A_L_RRST products, which includes h5 files, *.met.json, *.dataset.json, log, runconfig, iso.xml, etc.