Confidence Level: TBD. This article has not been reviewed for accuracy, timeliness, or completeness. Check that this information is valid before acting on it.
HySDS provides a plugin facility to hook arbitrary function calls in before (pre-processor functions) and after (post-processor functions) the execution of a job's docker command. HySDS provides builtin pre-processor functions that run by default for each job (hysds.utils.localize_urls, hysds.utils.mark_localized_datasets, hysds.utils.validate_checksum_files) to localize any inputs specified in the job payload. It also provides a builtin post-processor function that runs by default (hysds.utils.publish_datasets) to search the work directory for any HySDS datasets to publish.
Function Definition
Pre-processor and post-processor functions are required to take 2 arguments: the job dict and the context dict. These functions can do what they need using information stored in the job and context dicts. Each function must return a boolean result, either True or False. In the case of pre-processor functions, the docker command for the job is only executed if all pre-processor functions return True. In the case of post-processor functions, any that return False will be logged but the job will not be considered a failure. In both cases, if a function raises an exception, the job is a failure.
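As a concrete illustration, below is a minimal sketch of a custom pre-processor such as the my.custom.preprocessor_function referenced in the job-spec examples that follow. Only the (job, ctx) signature and the boolean return value are dictated by HySDS; the module path and the check performed are illustrative assumptions:

```python
# my/custom.py -- hypothetical module providing "my.custom.preprocessor_function"

def preprocessor_function(job, ctx):
    """Refuse to run the docker command unless the submitter provided a
    non-empty "uri" value in _context.json (illustrative check only)."""
    if not ctx.get("uri"):
        # Raising marks the job as failed; returning False would also
        # prevent the docker command from executing.
        raise ValueError("missing required input: uri")
    return True
```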
Pre-processor Functions
By default, job-specs have an implicit pre-processor function defined, hysds.utils.localize_urls. Thus the following job-spec:
{ "command":"/home/ops/ariamh/ariaml/extractFeatures.sh", "disk_usage":"1GB", "imported_worker_files": { "/home/ops/.netrc": "/home/ops/.netrc", "/home/ops/.aws": "/home/ops/.aws", "/home/ops/ariamh/conf/settings.conf": "/home/ops/ariamh/conf/settings.conf" }, "params" : [ { "name": "uri", "destination": "positional" }, { "name": "uri", "destination": "localize" } ] }
implicitly provides the "pre" parameter as follows:
{ "command":"/home/ops/ariamh/ariaml/extractFeatures.sh", "disk_usage":"1GB", "imported_worker_files": { "/home/ops/.netrc": "/home/ops/.netrc", "/home/ops/.aws": "/home/ops/.aws", "/home/ops/ariamh/conf/settings.conf": "/home/ops/ariamh/conf/settings.conf" }, "pre": [ "hysds.utils.localize_urls" ], "params" : [ { "name": "uri", "destination": "positional" }, { "name": "uri", "destination": "localize" } ] }
To disable this builtin feature, specify the "disable_pre_builtins" parameter as follows:
{ "command":"/home/ops/ariamh/ariaml/extractFeatures.sh", "disk_usage":"1GB", "imported_worker_files": { "/home/ops/.netrc": "/home/ops/.netrc", "/home/ops/.aws": "/home/ops/.aws", "/home/ops/ariamh/conf/settings.conf": "/home/ops/ariamh/conf/settings.conf" }, "disable_pre_builtins": true, "params" : [ { "name": "uri", "destination": "positional" }, { "name": "uri", "destination": "localize" } ] }
In this case, even though there exists a parameter "uri" with destination "localize", the hysds.utils.localize_urls pre-processor will not run. To specify additional pre-processor functions, define them using the "pre" parameter like so:
{ "command":"/home/ops/ariamh/ariaml/extractFeatures.sh", "disk_usage":"1GB", "imported_worker_files": { "/home/ops/.netrc": "/home/ops/.netrc", "/home/ops/.aws": "/home/ops/.aws", "/home/ops/ariamh/conf/settings.conf": "/home/ops/ariamh/conf/settings.conf" }, "pre": [ "my.custom.preprocessor_function" ], "params" : [ { "name": "uri", "destination": "positional" }, { "name": "uri", "destination": "localize" } ] }
Because the default builtin pre-processor functions aren't disabled, the effect of the above job-spec is this:
{ "command":"/home/ops/ariamh/ariaml/extractFeatures.sh", "disk_usage":"1GB", "imported_worker_files": { "/home/ops/.netrc": "/home/ops/.netrc", "/home/ops/.aws": "/home/ops/.aws", "/home/ops/ariamh/conf/settings.conf": "/home/ops/ariamh/conf/settings.conf" }, "disable_pre_builtins": true, "pre": [ "hysds.utils.localize_urls", "my.custom.preprocessor_function" ], "params" : [ { "name": "uri", "destination": "positional" }, { "name": "uri", "destination": "localize" } ] }
Post-processor Functions
By default, job-specs have an implicit post-processor function defined, hysds.utils.publish_datasets. Thus the following job-spec:
{ "command":"/home/ops/ariamh/ariaml/extractFeatures.sh", "disk_usage":"1GB", "imported_worker_files": { "/home/ops/.netrc": "/home/ops/.netrc", "/home/ops/.aws": "/home/ops/.aws", "/home/ops/ariamh/conf/settings.conf": "/home/ops/ariamh/conf/settings.conf" }, "params" : [ { "name": "uri", "destination": "positional" }, { "name": "uri", "destination": "localize" } ] }
implicitly provides the "post" parameter as follows:
{ "command":"/home/ops/ariamh/ariaml/extractFeatures.sh", "disk_usage":"1GB", "imported_worker_files": { "/home/ops/.netrc": "/home/ops/.netrc", "/home/ops/.aws": "/home/ops/.aws", "/home/ops/ariamh/conf/settings.conf": "/home/ops/ariamh/conf/settings.conf" }, "post": [ "hysds.utils.publish_datasets" ], "params" : [ { "name": "uri", "destination": "positional" }, { "name": "uri", "destination": "localize" } ] }
To disable this builtin feature, specify the "disable_post_builtins" parameter as follows:
{ "command":"/home/ops/ariamh/ariaml/extractFeatures.sh", "disk_usage":"1GB", "imported_worker_files": { "/home/ops/.netrc": "/home/ops/.netrc", "/home/ops/.aws": "/home/ops/.aws", "/home/ops/ariamh/conf/settings.conf": "/home/ops/ariamh/conf/settings.conf" }, "disable_post_builtins": true, "params" : [ { "name": "uri", "destination": "positional" }, { "name": "uri", "destination": "localize" } ] }
In this case, even though the docker command may create a HySDS dataset in the job's working directory, the hysds.utils.publish_datasets post-processor will not run. To specify additional post-processor functions, define them using the "post" parameter like so:
{ "command":"/home/ops/ariamh/ariaml/extractFeatures.sh", "disk_usage":"1GB", "imported_worker_files": { "/home/ops/.netrc": "/home/ops/.netrc", "/home/ops/.aws": "/home/ops/.aws", "/home/ops/ariamh/conf/settings.conf": "/home/ops/ariamh/conf/settings.conf" }, "post": [ "my.custom.postprocessor_function" ], "params" : [ { "name": "uri", "destination": "positional" }, { "name": "uri", "destination": "localize" } ] }
Because the default builtin post-processor functions aren't disabled, the effect of the above job-spec is this:
{ "command":"/home/ops/ariamh/ariaml/extractFeatures.sh", "disk_usage":"1GB", "imported_worker_files": { "/home/ops/.netrc": "/home/ops/.netrc", "/home/ops/.aws": "/home/ops/.aws", "/home/ops/ariamh/conf/settings.conf": "/home/ops/ariamh/conf/settings.conf" }, "disable_pre_builtins": true, "post": [ "hysds.utils.publish_datasets", "my.custom.preprocessor_function" ], "params" : [ { "name": "uri", "destination": "positional" }, { "name": "uri", "destination": "localize" } ] }
publish_datasets
By default, the hysds.utils.publish_datasets post-processor function will search for any HySDS datasets under a job's work directory and run the hysds.dataset_ingest.ingest function with the force flag set to False. Thus, if a dataset was previously published, the ingest will fail with a no-clobber error. This ensures data immutability, so subsequent jobs that utilize that dataset and its associated metadata as input can be assured the data was not modified.
Can we disable this feature so that products can clobber previously ingested products?
In the context of data lineage and provenance, this is a bad idea. The ideal approach would be to create a new product/dataset with an incremented version or product counter and ingest that. However, for testing during HySDS job/PGE development or in I&T, enabling the clobbering of products can make life easier. To force ingest, we leave it up to the HySDS job/PGE developer to implement adding the _force_ingest flag to the _context.json of the job:
with open(ctx_file) as f:
    ctx = json.load(f)

ctx['_force_ingest'] = True

with open(ctx_file, 'w') as f:
    json.dump(ctx, f, sort_keys=True, indent=2)
When this is set in _context.json, the hysds.utils.publish_datasets post-processor function will set the force flag to True when calling the hysds.dataset_ingest.ingest function on all the HySDS datasets found under the job directory.
triage
HySDS provides an additional builtin post-processor function for providing triage of failed jobs: hysds.utils.triage. It is not enabled by default and must be explicitly set:
{ "command":"/home/ops/ariamh/ariaml/extractFeatures.sh", "disk_usage":"1GB", "imported_worker_files": { "/home/ops/.netrc": "/home/ops/.netrc", "/home/ops/.aws": "/home/ops/.aws", "/home/ops/ariamh/conf/settings.conf": "/home/ops/ariamh/conf/settings.conf" }, "post": [ "hysds.utils.triage" ], "params" : [ { "name": "uri", "destination": "positional" }, { "name": "uri", "destination": "localize" } ] }
The above job-spec results in 2 post-processing functions being called after the docker command for the job executes: hysds.utils.publish_datasets and hysds.utils.triage. In both cases, the functions perform the proper checks up front before continuing with their functionality. For example, hysds.utils.publish_datasets checks the exit code of the docker command by inspecting the job dict and only proceeds with searching for HySDS datasets if the exit code was 0. hysds.utils.triage, on the other hand, checks that the exit code was not 0 and, if so, continues with the creation and publishing of the triage dataset. If the job was triaged, a _triaged.json file will be left in the job's work directory containing the JSON result returned by the GRQ ingest REST call.
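The gist of those up-front checks can be sketched as follows. This is illustrative only, not the actual HySDS source, and the key holding the docker command's exit status is an assumption:

```python
# Illustrative sketch of the up-front checks described above.
def should_publish(job, ctx):
    # Assumption: the docker command's exit code is recorded under job_info.
    return job.get("job_info", {}).get("status", 1) == 0


def should_triage(job, ctx):
    return job.get("job_info", {}).get("status", 0) != 0
```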
What gets triaged?
By default, all _* and *.log files in the root of the job work directory are triaged.
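In other words, the default selection is roughly equivalent to the following sketch, where work_dir stands in for the job's work directory:

```python
import glob
import os


def default_triage_files(work_dir):
    """Sketch of the default triage selection: "_*" and "*.log" files
    in the root of the job work directory."""
    files = []
    for pattern in ("_*", "*.log"):
        files.extend(glob.glob(os.path.join(work_dir, pattern)))
    return files
```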
Can we triage other files as well?
The behavior of the triage function can be modified by updating certain fields in the _context.json of the job's work directory. Within the docker command that is called, the process can open up the _context.json and add a top-level parameter named _triage_additional_globs as a list of glob patterns that will be triaged in addition to the default files. For example, a Python script that is called as the docker command for the job can add these files for triage:
with open(ctx_file) as f:
    ctx = json.load(f)

ctx['_triage_additional_globs'] = ['S1-IFG*', 'AOI_*', 'celeryconfig.py', 'datasets.json']

with open(ctx_file, 'w') as f:
    json.dump(ctx, f, sort_keys=True, indent=2)
Triage is enabled in the job-spec. Can we disable triage at runtime?
Yes. Similar to the addition of other files to triage, you can add a top-level parameter named _triage_disabled to disable triage:
with open(ctx_file) as f:
    ctx = json.load(f)

ctx['_triage_additional_globs'] = ['S1-IFG*', 'AOI_*', 'celeryconfig.py', 'datasets.json']

if some_condition:
    ctx['_triage_disabled'] = True

with open(ctx_file, 'w') as f:
    json.dump(ctx, f, sort_keys=True, indent=2)
Can we customize the job ID format for a triage job?
Yes. By default, the job ID format for triage jobs uses the following python format string:
triaged_job-{job[job_info][id]}-{job[task_id]}
This format string is then passed both the job payload (e.g. contents of _job.json) and the context payload (e.g. contents of _context.json) at format time.
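In other words, the triage job ID is produced by Python str.format-style substitution against those two dicts, roughly like this sketch (the job and context values shown are placeholders):

```python
# Sketch of how the triage job ID format string is expanded.
default_fmt = "triaged_job-{job[job_info][id]}-{job[task_id]}"

job = {"job_info": {"id": "some_job_id"}, "task_id": "some-task-uuid"}  # from _job.json
context = {}  # from _context.json

triage_id = default_fmt.format(job=job, context=context)
# -> "triaged_job-some_job_id-some-task-uuid"
```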
Similar to the way you can modify the behavior of triage in the previous 2 questions, you can add a top-level parameter named _triage_id_format to override the default format string:
with open(ctx_file) as f:
    ctx = json.load(f)

ctx['_triage_id_format'] = "my_custom_triage_job-{job[job_info][id]}-{job[job_info][time_start]}"

with open(ctx_file, 'w') as f:
    json.dump(ctx, f, sort_keys=True, indent=2)
Parallelization
By default, PGEs localize and publish datasets one by one using the hysds.localize.localize_urls and hysds.dataset_ingest_bulk.publish_datasets functions. To speed up the process, your PGE can utilize the parallelized versions: hysds.localize.localize_urls_parallel and hysds.dataset_ingest_bulk.publish_datasets_parallel. The parallelized functions use multiprocessing (through celery's billiard library). The number of CPUs used depends on how many CPUs are available on the worker and how many datasets there are to publish:
num_procs = min(max(cpu_count() - 2, 1), len(datasets_list))
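For example, on the 16-core worker in the logs below, cpu_count() - 2 = 14, so any job with at least 14 datasets to publish runs with min(max(14, 1), len(datasets_list)) = 14 processes; this matches the "multiprocessing procs used: 14" line in the verdi log.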
To use the parallelized localization/publishing, the PGE's job-spec would have to be changed:
{ "disable_pre_builtins": true, "disable_post_builtins": true, "pre": [ "hysds.localize.localize_urls_parallel", "hysds.utils.mark_localized_datasets", "hysds.utils.validate_checksum_files" ], "post": [ "hysds.dataset_ingest_bulk.publish_datasets_parallel" ] }
Example: an L0A_TE job (localizing 626 VCID input files and publishing 252 L0A_L_RRST products) on a 16-core worker. Localization logs taken from the PCM log file run_sciflo_L0A_TE.log:
2023-06-21 17:17:45 nisar_pcm:INFO [nisar_pge_job_submitter.py:149] Localizing s3://s3-us-west-2.amazonaws.com:80/#####/products/NEN_L_RRST/2023/055/NISAR_S198_SGX_SG12_M00_P00526_R00_C01_G00_2023_055_23_17_44_838873000.vc29/NISAR_S198_SGX_SG12_M00_P00526_R00_C01_G00_2023_055_23_17_44_838873000.vc29
2023-06-21 17:17:45 nisar_pcm:INFO [nisar_pge_job_submitter.py:149] Localizing s3://s3-us-west-2.amazonaws.com:80/#####/products/NEN_L_RRST/2023/055/NISAR_S198_SGX_SG12_M00_P00526_R00_C01_G00_2023_055_23_17_45_840036000.vc29/NISAR_S198_SGX_SG12_M00_P00526_R00_C01_G00_2023_055_23_17_45_840036000.vc29
. . .
[2023-06-21 17:24:20,133: INFO/isLocked] Checking lock status of s3://s3-us-west-2.amazonaws.com:80/#####/ancillary/LRCLK_UTC/LRCLK_PPAS/NISAR_198900_SCLKSCET_LRCLK_PPAS.00424/NISAR_198900_SCLKSCET_LRCLK_PPAS.00424 in s3://s3-us-west-2.amazonaws.com:80/#####/ancillary/LRCLK_UTC/LRCLK_PPAS/NISAR_198900_SCLKSCET_LRCLK_PPAS.00424/NISAR_198900_SCLKSCET_LRCLK_PPAS.00424.osaka.locked.json
[2023-06-21 17:24:20,182: INFO/transfer] Transferring between s3://s3-us-west-2.amazonaws.com:80/#####/ancillary/LRCLK_UTC/LRCLK_PPAS/NISAR_198900_SCLKSCET_LRCLK_PPAS.00424/NISAR_198900_SCLKSCET_LRCLK_PPAS.00424 and /data/work/jobs/2023/06/21/17/15/SCIFLO_L0A_TE__NSDS-2836-NISAR_S198_SGX_SG12_M00_P00526_R00_C01_G00_2023_055_23_33_10_542229000_vc29_state-config-20230621T171122.625389Z/NISAR_198900_SCLKSCET_LRCLK_PPAS.00424
2023-06-21 17:24:26 nisar_pcm:INFO [nisar_pge_job_submitter.py:210] job_path: <_io.TextIOWrapper name='/data/work/jobs/2023/06/21/17/15/SCIFLO_L0A_TE__NSDS-2836-NISAR_S198_SGX_SG12_M00_P00526_R00_C01_G00_2023_055_23_33_10_542229000_vc29_state-config-20230621T171122.625389Z/_job.json' mode='r+' encoding='UTF-8'>
2023-06-21 17:24:26 nisar_pcm:INFO [nisar_pge_wrapper.py:153] dep_img_name: nisar_pge/l0a_te:R3.3.1
2023-06-21 17:24:26 nisar_pcm:INFO [nisar_pge_wrapper.py:155] Working Directory: /data/work/jobs/2023/06/21/17/15/SCIFLO_L0A_TE__NSDS-2836-NISAR_S198_SGX_SG12_M00_P00526_R00_C01_G00_2023_055_23_33_10_542229000_vc29_state-config-20230621T171122.625389Z
About 7 minutes for all VCID files + anc/aux files.

Publishing (logs taken from the verdi log file nisar-job_worker-sciflo-l0a_te.log):
[2023-06-21 19:24:07,482: INFO/ForkPoolWorker-1] hysds.job_worker.run_job[46c0ff48-bbe3-403c-bb5b-1202d017fcee]: Running post-processor: hysds.dataset_ingest_bulk.publish_datasets_parallel
[2023-06-21 19:24:07,510: INFO/ForkPoolWorker-1] hysds.job_worker.run_job[46c0ff48-bbe3-403c-bb5b-1202d017fcee]: multiprocessing procs used: 14
[2023-06-21 19:24:08,357: INFO/ForkPoolWorker-1] hysds.job_worker.run_job[46c0ff48-bbe3-403c-bb5b-1202d017fcee]: Waiting for dataset publishing tasks to complete...
[2023-06-21 19:24:09,339: INFO/SpawnPoolWorker-1:13] child process 477 calling self.run()
[2023-06-21 19:24:09,339: INFO/SpawnPoolWorker-1:12] child process 476 calling self.run()
[2023-06-21 19:24:09,341: INFO/SpawnPoolWorker-1:16] child process 480 calling self.run()
[2023-06-21 19:24:09,341: INFO/SpawnPoolWorker-1:9] child process 473 calling self.run()
[2023-06-21 19:24:09,341: INFO/SpawnPoolWorker-1:3] child process 467 calling self.run()
[2023-06-21 19:24:09,341: INFO/SpawnPoolWorker-1:10] child process 474 calling self.run()
[2023-06-21 19:24:09,341: INFO/SpawnPoolWorker-1:7] child process 471 calling self.run()
[2023-06-21 19:24:09,341: INFO/SpawnPoolWorker-1:6] child process 470 calling self.run()
[2023-06-21 19:24:09,341: INFO/SpawnPoolWorker-1:15] child process 479 calling self.run()
[2023-06-21 19:24:09,341: INFO/SpawnPoolWorker-1:8] child process 472 calling self.run()
[2023-06-21 19:24:09,341: INFO/SpawnPoolWorker-1:4] child process 468 calling self.run()
[2023-06-21 19:24:09,341: INFO/SpawnPoolWorker-1:14] child process 478 calling self.run()
[2023-06-21 19:24:09,341: INFO/SpawnPoolWorker-1:5] child process 469 calling self.run()
[2023-06-21 19:24:09,341: INFO/SpawnPoolWorker-1:11] child process 475 calling self.run()
[2023-06-21 19:24:09,356: INFO/hysds.log_utils] datasets: /home/ops/verdi/etc/datasets.json
[2023-06-21 19:24:09,356: INFO/hysds.log_utils] prod_path: /data/work/jobs/2023/06/21/17/15/SCIFLO_L0A_TE__NSDS-2836-NISAR_S198_SGX_SG12_M00_P00526_R00_C01_G00_2023_055_23_33_10_542229000_vc29_state-config-20230621T171122.625389Z/output/datasets/NISAR_L0_RRST_VC29_20230127T014408_20230127T014409_D00330_J_002
[2023-06-21 19:24:09,356: INFO/hysds.log_utils] job_path: /data/work/jobs/2023/06/21/17/15/SCIFLO_L0A_TE__NSDS-2836-NISAR_S198_SGX_SG12_M00_P00526_R00_C01_G00_2023_055_23_33_10_542229000_vc29_state-config-20230621T171122.625389Z
. . .
[2023-06-21 19:31:25,035: INFO/hysds.log_utils] Uploading /data/work/jobs/2023/06/21/17/15/SCIFLO_L0A_TE__NSDS-2836-NISAR_S198_SGX_SG12_M00_P00526_R00_C01_G00_2023_055_23_33_10_542229000_vc29_state-config-20230621T171122.625389Z/output/datasets/NISAR_L0_RRST_VC29_20230127T020944_20230127T020946_D00330_J_002/NISAR_L0_RRST_VC29_20230127T020944_20230127T020946_D00330_J_002.context.json to s3://s3-us-west-2.amazonaws.com:80/#####/products/L0A_L_RRST/2023/01/27/NISAR_L0_RRST_VC29_20230127T020944_20230127T020946_D00330_J_002/NISAR_L0_RRST_VC29_20230127T020944_20230127T020946_D00330_J_002.context.json.
[2023-06-21 19:31:25,426: INFO/hysds.log_utils] Browse publish is configured.
[2023-06-21 19:31:26,732: INFO/ForkPoolWorker-1] hysds.job_worker.run_job[46c0ff48-bbe3-403c-bb5b-1202d017fcee]: publishing 352 dataset(s) to Elasticsearch
[2023-06-21 19:31:46,080: INFO/ForkPoolWorker-1] hysds.job_worker.run_job[46c0ff48-bbe3-403c-bb5b-1202d017fcee]: queued 352 dataset(s) to dataset_processed
About 7 minutes for all L0A_L_RRST products, which include h5 files, *.met.json, *.dataset.json, log, runconfig, iso.xml, etc.