This is the version 1.0 (2019-05-17) Software Design Document (SDD) for the Hybrid Cloud Science Data System (HySDS).


Introduction

Purpose

This document provides the high-level design information needed to understand the architecture and design of the core HySDS framework. It is not intended to replace reading the code for details.

Background

The primary responsibility of a science data system (SDS) is to ingest science data as input and process it into higher-level science data products as output.

...

  • command (required, string): executable path inside the container
  • params (required, array): list of param objects required to run this job (see param Object below)
  • imported_worker_files (optional, object): mapping of host files/directories into the container (see imported_worker_files Object below)
  • dependency_images (optional, array): list of dependency image objects (see dependency_image Object below)
  • recommended-queues (optional, array): list of recommended queues
  • disk_usage (optional, string): minimum free disk space required to run the job, specified as "\d+(GB|MB|KB)", e.g. "100GB", "20MB", "10KB"
  • soft_time_limit (optional, int): soft execution time limit in seconds; the worker sends a catchable exception to the task to allow for cleanup before the job is killed, effectively a sigterm by the worker to the job; when determining the soft time limit of your job type, also include time for verdi operations such as docker image loading (on the first job), input localization, dataset publishing, triage, etc.
  • time_limit (optional, int): hard execution time limit in seconds; the worker sends an uncatchable exception to the task and force-terminates it, effectively a sigkill by the worker to the job; make sure it is at least 60 seconds greater than soft_time_limit, otherwise job states will be orphaned in figaro
  • pre (optional, array): list of strings specifying pre-processor functions to run; behavior depends on disable_pre_builtins; more info below under Preprocessor Functions
  • disable_pre_builtins (optional, boolean): if set to true, the default builtin pre-processors (currently [hysds.utils.localize_urls, hysds.utils.mark_localized_datasets, hysds.utils.validate_checksum_files]) are disabled and would need to be specified in pre to run; if not specified or set to false, the list of pre-processors specified by pre is appended after the default builtins
  • post (optional, array): list of strings specifying post-processor functions to run; behavior depends on disable_post_builtins; more info below under Postprocessor Functions
  • disable_post_builtins (optional, boolean): if set to true, the default builtin post-processors (currently [hysds.utils.publish_datasets]) are disabled and would need to be specified in post to run; if not specified or set to false, the list of post-processors specified by post is appended after the default builtins

...

Each key of the imported_worker_files object is a path to a file or directory on the host (string); the corresponding value takes one of three forms:

  • a path to the file or directory in the container (string)
  • a one-item list containing the path to the file or directory in the container (array)
  • a two-item list containing the path to the file or directory in the container and the mount mode: "ro" for read-only (the default) or "rw" for read-write (array)
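
For example, a hypothetical mapping (all paths here are illustrative, not from an actual adaptation) might look like:

{
  "imported_worker_files": {
    "/home/ops/.netrc": "/home/ops/.netrc",
    "/home/ops/.aws": [ "/home/ops/.aws" ],
    "/data/work/cache": [ "/data/cache", "rw" ]
  }
}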

Syntax

{
  "command": "string",
  "recommended-queues": [ "string" ],
  "disk_usage":"\d+(GB|MB|KB)",
  "imported_worker_files": {
    "string": "string",
    "string": [ "string" ],
    "string": [ "string", "ro" | "rw" ]
  },
  "dependency_images": [
    {
      "container_image_name": "string",
      "container_image_url": "string" | null,
      "container_mappings": {
        "string": "string",
        "string": [ "string" ],
        "string": [ "string", "ro" | "rw" ]
      }
    }
  ],
  "soft_time_limit": int,
  "time_limit": int,
  "disable_pre_builtins": true | false,
  "pre": [ "string" ],
  "disable_post_builtins": true | false,
  "post": [ "string" ],
  "params": [
    {
      "name": "string",
      "destination": "positional" | "localize" | "context"
    }
  ]
}
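
Putting these pieces together, a minimal, hypothetical job-spec might look like the following (the command path, queue name, and parameter names are illustrative):

{
  "command": "/home/ops/pge/hello_world/run_hello_world.sh",
  "recommended-queues": [ "factotum-job_worker-small" ],
  "disk_usage": "10GB",
  "soft_time_limit": 300,
  "time_limit": 360,
  "params": [
    {
      "name": "input_dataset",
      "destination": "localize"
    },
    {
      "name": "crid",
      "destination": "context"
    }
  ]
}

Note that time_limit is 60 seconds greater than soft_time_limit, per the guidance above.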

...

  • component (optional, string): component web interface to display this job type in (tosca or figaro); defaults to tosca
  • label (optional, string): label to be used when this job type is displayed in web interfaces (tosca and figaro); otherwise an automatically generated label is shown, based on the string after the "hysds.io.json." of the hysds-io file
  • submission_type (optional, string): specifies whether the job should be submitted once per product in the query or once per job submission; iteration for a submission of the job for each result in the query, or individual for a single submission; defaults to iteration
  • enable_dedup (optional, boolean): set to true to enable job deduplication, false to disable; defaults to true
  • action-type (optional, string): action type to expose the job as; on-demand, trigger, or both; defaults to both
  • allowed_accounts (optional, array): list of strings specifying account user IDs allowed to run this job type from the web interfaces (tosca and figaro); if not defined, the ops account is the only account that can access this job type; if _all is specified in the list, all users will be able to access this job type
  • params (required, array): list of matching param objects from the job-spec required to run this job (see params Object below)
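
A minimal sketch of a hysds-io definition using these keys (the label and parameter values are illustrative):

{
  "component": "tosca",
  "label": "Hello World Job",
  "submission_type": "iteration",
  "enable_dedup": true,
  "action-type": "both",
  "allowed_accounts": [ "ops" ],
  "params": [
    {
      "name": "crid",
      "from": "submitter",
      "type": "text",
      "default": "D0001"
    }
  ]
}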

...

  • name (required, string): parameter name; should match the corresponding parameter name in the job-spec
  • from (required, string): value for a hard-coded parameter value, submitter for a user-submitted value, or dataset_jpath:<jpath> to extract the value from an ElasticSearch result using a JPath-like notation (see from Specification below)
  • value (required if from is set to value, string): hard-coded parameter value
  • type (optional, string): possible values: text, number, datetime, date, boolean, enum, email, textarea, region, container_version, jobspec_version, hysdsio_version (see Valid Param Types below)
  • default (optional, string): default value to use (must be a string even if it is a number)
  • optional (optional, boolean): parameter is optional and can be left blank
  • placeholder (optional, string): value to use as a hint when displaying the form input
  • enumerables (required if type is enum, array): list of string values to enumerate via a dropdown list in the form input
  • lambda (optional, string): a lambda function to process the value during submission
  • version_regex (required if type is container_version, jobspec_version, or hysdsio_version, string): regex used to filter on the front component of the respective container, job-spec, or hysds-io ID; e.g. if type=jobspec_version, version_regex=job-time-series, and the list of all job-spec IDs in the system is ["job-time-series:release-20171103", "job-hello_world:release-20171103", "job-time-series:master"], the list is filtered down to those whose ID matches the regex job-time-series in the front component (the string before the first ":"); in this example the resulting filtered set of release tags/branches will be listed as ["release-20171103", "master"] in a dropdown box; similar to type=enum with enumerables set to all release tags for a certain job type
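
For example, a hypothetical params list combining the three from sources (parameter names and the jpath are illustrative):

"params": [
  {
    "name": "crid",
    "from": "value",
    "value": "D0001"
  },
  {
    "name": "dataset_id",
    "from": "dataset_jpath:_source.id"
  },
  {
    "name": "processing_mode",
    "from": "submitter",
    "type": "enum",
    "enumerables": [ "forward", "reprocessing" ],
    "default": "forward"
  }
]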

...

  • text: a text string; will be kept as text
  • number: a real number
  • date: a date in ISO8601 format: YYYY-MM-DD; will be treated as a "text" field for passing into the container
  • datetime: a date with time in ISO8601 format: YYYY-MM-DDTHH:mm:SS.SSS; will be treated as a "text" field
  • boolean: true or false in a dropdown
  • enum: one of a set of options in a dropdown; must specify the enumerables field with the list of possible options; these will be "text" types in the enumerables set
  • email: an e-mail address; will be treated as "text"
  • textarea: same as text, but displayed larger with the textarea HTML tag
  • region: auto-populated from the facet view leaflet tool
  • container_version: a version of an existing container registered in the Mozart API; must define the "version_regex" field
  • jobspec_version: a version of an existing job-spec registered in the Mozart API; must define the "version_regex" field
  • hysdsio_version: a version of an existing hysds-io registered in the Mozart API; must define the "version_regex" field
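
Tying this to the version_regex example above, a jobspec_version parameter could be declared as follows (a sketch); given the job-spec IDs listed earlier, the dropdown would offer ["release-20171103", "master"]:

{
  "name": "algorithm_version",
  "from": "submitter",
  "type": "jobspec_version",
  "version_regex": "job-time-series"
}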

...

The HySDS design provides several points of configurability and extension, making it adaptable to specific mission requirements. The goal of HySDS is to provide a scalable and reusable science data system. The reusable core software is managed as an open source project and is available for public use.

SWOT-specific Adaptation

Overview

The SWOT Processing Control & Management (PCM) subsystem extends the functionality provided by HySDS and contains the following functions:

  • Ingest, catalog, and store mission data and ancillary datasets enabling science data processing
  • Ingest mission products retrieved from PO.DAAC, in support of bulk reprocessing
  • Automate science data processing to produce Level 0 through Level 2 KaRIn, AMR (Radiometer), GPS data products, scaling the PCM’s distributed processing system as needed
  • Interface with L0-L2 PGEs and Ancillary Preprocessors
  • Store and catalog L0-L2 science data products and their associated files
  • Monitor data system services, PGE executions, and computing resources by providing operators with a user interface for the PCM system
  • Report data processing and product accounting
  • Make science data products available to the DAAC within the latency requirements
  • Make data products available to CNES within the latency requirements
  • Provide data access to the Science Team

To fulfill the operations need, the PCM is designed to operate automatically 24 hours a day, seven days a week.

The major components of PCM include the following:

  • Resource Manager
  • Workflow Manager
  • Worker Manager
  • Discovery Services
  • Metrics Manager
  • Helper Worker Services
  • Operator Interfaces

Through the adaptation of the PCM core functions based on the HySDS framework, PCM plugs in SWOT-specific functions, including the following:

  • PGE-PCM Interfaces
  • External Interfaces to CNES, GDS, and PO.DAAC
  • Customized Operator Interfaces
  • Timer support for data gap processing
  • GBE catalog
  • Product to dataset conversion
  • SWOT-specific reports & metrics collection

The following functional diagram illustrates the PCM components as listed above:

[Figure: PCM functional components diagram]

The HySDS adaptation of SWOT PCM includes the integration of SWOT PGEs into the following processing flow:

[Figure: SWOT PGE processing flow]

The Resource Manager (RM)

PCM provides the resource management function for the SDS, covering computing power, storage, and their status. This component provisions, manages, and orchestrates computing instances; manages jobs; and hosts user interfaces for viewing job status. The RM stores job status information and persists the job database beyond PCM service shutdowns.

The auto-scaling of computing resources is based on AWS's Auto Scaling feature. This is configured to dynamically allocate and scale computing instances to support the dynamic variation of daily processing loads.

The Workflow Manager (WM)

The Workflow Manager handles the processes required to convert incoming telemetry into the requisite science data products. It manages the complex ordering of processes that must run, as well as the preconditions that must be met to initiate each process. The SWOT SDS data flow in Figure 3-1 displays the relationship and order of processes that the SWOT PCM employs to generate Level 0 through Level 2 data products.

The WM handles the daily forward processing pipelines. This component automatically triggers workflows based on input dataset rules; workflows can also be triggered on time limits to meet latency requirements. There are roughly ~,500 forward (FWD) processing jobs daily, and up to 133K jobs/day during a peak bulk reprocessing campaign, with an equivalent number of data products being generated. The WM also supports on-demand and bulk reprocessing.

The WM, leveraged from HySDS, is based on a distributed task/job queue. Jobs are executed on worker nodes: the WM orchestrates workers that pull PGE jobs from the queue, execute them, and publish products. The WM also handles interruptions to job processing, such as spot terminations or hardware failures. The system ensures that interrupted jobs are re-queued and processed elsewhere, triaging failed jobs after a set number of retries. This allows operators to investigate failures.

The Discovery Services (DS)

File management is a major PCM function. The DS catalogs products and metadata. This component also provides users access to view product metadata and browse products. Facet searches support the generation of reports about products and queries based on defined catalog metadata fields (e.g., product type, granule, temporal, spatial, etc.).

The Metrics Manager (MM)

For monitoring and reporting purposes, PCM collects and displays worker and job metrics. This component hosts the metrics user interface for viewing resource, product, and job metrics. It allows generating reports on SDS processing statistics and product accountability at a defined interval.

The Helper Worker Services

Through the evolution of HySDS, a set of special workers already exists to handle the core processes. These workers serve non-PGE jobs such as rule evaluation and e-mail notification. The key services include the following workers:

  • Ingest Worker
  • Triage Worker

The Ingest Worker automatically ingests files that are routinely and frequently delivered. It detects and retrieves ancillary data from remote sites or the inbound staging area. PCM uses this function to catalog and store incoming data and extract its metadata. This includes integrity verification of the ingested data files.

The Triage Worker detects early termination of compute resources and resubmits jobs. It also captures the artifacts of failed jobs after a set number of processing attempts.

PGE-PCM Interfaces

The set of PGE-PCM interfaces contains 20+ interfaces, one per PGE. PCM and PGE software comply with the interfaces defined in each Interface and Control Flow Specification (ICS). Before PCM runs a PGE, PCM determines the location of required and alternate input files based on the selection rules. These input files include lower-level product data, ancillary data, and engineering data files. PCM also determines and passes the required parameters to the PGE (e.g., pass and tile information, CRID, etc.). If all required conditions are met, PCM composes this information into an interface file that is passed to the PGE at execution time. This file contains parameters, input/output files, working directories, PGE software versions, etc.

When PCM executes a PGE, the following steps occur (a sketch of the interface file follows the steps):

  1. PCM gathers the required information and generates the RunConfig file that the PGE reads
  2. PCM localizes the inputs needed by the PGE
  3. PCM initiates the PGE with the specified RunConfig
  4. PCM executes the PGE and performs the defined post-conditions
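
The exact interface file format is defined per ICS and is not reproduced here; purely as an illustration, the kinds of information it carries can be sketched as follows (all field names are hypothetical):

{
  "pge_name": "string",
  "pge_version": "string",
  "parameters": { "crid": "string", "cycle": "string", "pass": "string", "tile": "string" },
  "input_files": [ "string" ],
  "output_files": [ "string" ],
  "working_directory": "string"
}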

PCM tracks and monitors the status while PGE is processing. Once the PGE exits, PCM handles the exit conditions. There are two exit scenarios.

  • Success
  • Failure

If the PGE exits successfully, PCM handles the post-conditions, including ingesting the product granule, staging the product to the outbound staging areas for CNES and PO.DAAC, updating data accounting, and proceeding to the next processor in the pipeline (if not at the end of the pipeline).

If the PGE fails, PCM updates the processing status, moves the processing artifacts to the triage area, and either retries the job or notifies operators.

PCM Granule Boundaries Estimation (GBE) Database

[Figure: PCM Granule Boundaries Estimation (GBE) overview]

PCM uses the GBE to do pass-based, tile-based, and cycle-based processing. Specifically, the GBE PGE reads the MOE file, the Orbit Revolution File, the Along Track Mapping, the Cross Track Mapping, and the Granule Overlap Amount. The GBE PGE uses the times in the Orbit Revolution File and the pass and tile boundaries in the Along Track Mapping and Cross Track Mapping to estimate when the instrument data will cover the geographical regions of the cycles, passes, and tiles. It also calculates sets of estimates including a percentage of overlap as given in the Granule Overlap Amount, as directed by ADT. The output file contains the estimated beginning and ending times for the cycles, passes, and tiles in the along-track direction, as well as the sets of overlap start and end times for each product. PCM uses these time-position mappings (an illustrative record sketch follows this list) to:

  1. Look up the RangeBeginning and RangeEnding times to give to the L0B PGEs
  2. Find all L0A_Prime files that overlap with the given RangeBeginning and RangeEnding times
  3. Associate the corresponding granule pass or tile with the given RangeBeginning and RangeEnding times for cataloging and file naming
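
A hypothetical GBE record illustrating such a time-position mapping (field names are illustrative, not the actual GBE schema):

{
  "cycle": int,
  "pass": int,
  "tile": "string",
  "range_beginning_time": "YYYY-MM-DDThh:mm:ss.sssZ",
  "range_ending_time": "YYYY-MM-DDThh:mm:ss.sssZ",
  "overlap_start_time": "YYYY-MM-DDThh:mm:ss.sssZ",
  "overlap_end_time": "YYYY-MM-DDThh:mm:ss.sssZ"
}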

External Interface to CNES, GDS, and PO.DAAC

GDS and CNES push the incoming data files to SDS. PCM automatically monitors, detects, and ingests the defined data files at the inbound staging areas. The data files are checked for integrity before they’re ingested into PCM.

In support of the bulk reprocessing, the inputs (e.g., L0 products, ancil/aux data, etc) are retrieved from PO.DAAC. These inputs are deposited in the designated inbound staging area and are subject to the same ingest process.

For archival purposes, PCM makes the generated data products available at the rolling storage and life-of-instance storage locations, notifying PO.DAAC of the products' location. For data exchange with CNES, PCM makes selected data products available at the archive staging location. CNES and PO.DAAC pull the files to their systems. PCM interfaces with PO.DAAC using Cumulus CNM (Cloud Notification Mechanism) send and receive messages. The details of the interface mechanisms are defined below:


[Figure: CNM-based product delivery interface]

For products generated by CNES, PCM delivers these products to the DAAC using the following process (an illustrative CNM-S message follows the steps):

  1. Products are placed in the archive staging location by CNES.
  2. PCM generates a CNM-S message indicating the location of the product. This message is sent to the DAAC using AWS Kinesis.
  3. The DAAC ingests the product and generates a CNM-R message, indicating the success or failure of ingestion. This message is sent to PCM using an SNS topic connected to an AWS Lambda function.
  4. PCM processes and stores CNM-R messages. These messages are cataloged and later queried for activities such as bulk reprocessing, where a large number of previously ingested products must be gathered from the DAAC.
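
An illustrative CNM-S message (this follows the general shape of CNM messages; the exact fields are defined by the SWOT/PO.DAAC interface, and the values here are hypothetical):

{
  "version": "string",
  "provider": "string",
  "submissionTime": "YYYY-MM-DDThh:mm:ss.sssZ",
  "identifier": "string",
  "collection": "string",
  "product": {
    "name": "string",
    "files": [
      {
        "type": "data",
        "uri": "s3://bucket/path/to/product.tar",
        "size": int,
        "checksumType": "md5",
        "checksum": "string"
      }
    ]
  }
}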

[Figure: PCM product delivery to PO.DAAC and CNES]

For products generated by the PCM, messages are delivered to both the DAAC and CNES using the following process (an illustrative CNM-R response follows the steps):

  1. PCM generates products
  2. PCM creates and copies tar'd SDP datasets to the PCM Archive Staging bucket outgoing directory.
  3. PCM creates a CNM-S message for the product
    1. PCM immediately creates the message and sends it to PO.DAAC
      1. PO.DAAC ingests the tar file
      2. PO.DAAC returns a CNM-R message indicating successful/failed ingestion to the PCM SDS via an AWS message service
        1. For success messages, PCM updates the catalog
        2. For failure messages, the PCM SDS troubleshoots
      3. Every X minutes, PCM creates a single CNM-S message covering all the tar files placed in the Archive Bucket outgoing directory and places it in the PCM Msg Bucket CNM-S directory
    2. CNES polls the PCM Msg Bucket CNM-S directory every Y minutes and pulls the message(s).
      1. CNES deletes the message
    3. CNES pulls the tar files identified in a message to CNES
    4. CNES creates one or two CNM-R messages with the success and/or failure of the tar copying activity and places them into the PCM Msg Bucket CNM-R directory.
    5. CNES notifies the distribution center of the product(s)
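
An illustrative CNM-R response (again following the general CNM shape; exact fields are defined by the interface specifications):

{
  "version": "string",
  "identifier": "string",
  "processCompleteTime": "YYYY-MM-DDThh:mm:ss.sssZ",
  "response": {
    "status": "SUCCESS" | "FAILURE",
    "errorCode": "string",
    "errorMessage": "string"
  }
}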


Metrics & Data Accounting

PCM manages an ElasticSearch database for tracking input products, intermediate products, and all reprocessing campaigns. Metrics collected by the PCM system include:

  • Quantity of products - How many products were produced? How many packets were received, and how many gaps in packets?
  • Continuity of products - How many files are missing or were not created?
  • Latency of products - How much time was needed to generate outputs?

Metrics are ingested from a variety of sources into the Metrics server's database. Each metrics source generates a JSON file with the relevant information:

  • Job runner on worker node instance
  • Messages received from DAAC
  • Filenames of products received from CNES
  • PGE output

The JSON messages generated from these sources are simple; each has a filename key that ties metrics from different sources to a single product. For example:

PCM Data Accounting JSON Example:
{
    "filename": "",
    "totalPackets": 0, // based on total_packets
    "totalPacketsPriorToDiscard": 0, // based on total_packets - bad_pus - bad_ccsds
    "invalidChecksumDiscardedPackets": 0, // based on bad_ccsds + bad_pus
    "invalidCcsdsHeaderDiscardedPackets": 0, // based on bad_ccsds
    "totalGaps": 0, // based on total_packet_gaps
    "gapList": [ // based on total_packet_gaps, packet_gap_time; time format is YYYY-MM-DDThh:mm:ss.sssZ
        { "gapNumber": 0, "lastFrameBeforeGap": "", "nextFrameAfterGap": "" },
        { "gapNumber": 1, "lastFrameBeforeGap": "", "nextFrameAfterGap": "" },
        { "gapNumber": 2, "lastFrameBeforeGap": "", "nextFrameAfterGap": "" }
        // ... additional gap entries
    ]
}


Information is sent to the Metrics server by these sources, stored in ElasticSearch, and made available in real time to operators via Kibana. Metric reports are generated by querying the metrics ElasticSearch database and building an XML report from the resulting data (a sketch of such a query follows the list below). Reports can be delivered by e-mail or written to a location as needed. Reports generated at a configurable period, such as daily, weekly, or monthly, include:

  • Data loss
    • Results generated by PGE to pass via metadata to PCM
    • PCM produces data loss summary report
  • Data Accountability
  • Data Product Latency
  • SDS system statistics
  • Product delivery to PO.DAAC & CNES
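
A sketch of the kind of ElasticSearch query such a report might issue (the index fields, product type, and time field are hypothetical):

{
  "query": {
    "bool": {
      "filter": [
        { "term": { "dataset_type": "L2_KaRIn" } },
        { "range": { "creation_timestamp": { "gte": "now-7d/d", "lt": "now/d" } } }
      ]
    }
  }
}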

Customized Operator Interfaces

The inherited PCM core capability provides helpful, easy-to-view web interfaces, including the following:

  • Web application that provides browse, facet-search, and query of data and metadata.
  • Web application that provides tracking of jobs and their status.
  • Metrics Dashboard: a web application, via Kibana dashboards, that provides metrics on processing, celery jobs, celery workers, and Verdi workers.

PCM leverages the inherited user interfaces and customizes them as needed.

PCM Deployment

PCM supports multiple concurrently operational pipelines for specific processing scenarios, including Forward Processing (persistent), Bulk Processing (pre-defined input), and ADT Processing (temporary). Pipelines are deployed in separate AWS accounts, which allows for isolation of permissions, costs, and AWS account limits. Certain resources, such as S3 buckets, are shared between accounts. Read/write permissions are specific to each account and are configurable. PCM's deployment and development process is described in the following diagram:

[Figure: PCM deployment and development process]

This diagram shows the interactions developers have with the PCM DEV cluster and how that deployed system flows to the I&T and OPS deployments. In this diagram, PCM uses Terraform as the PCM Cluster Launcher. PCM's process for any new operational deployment is as follows:

  1. The system engineer notifies the SA team of the plan to deploy PCM. The SA team provides the cloud credentials needed to deploy the PCM configuration.
  2. The deployment engineer configures the deployment by running the PCM deployment configuration tool, which generates the PCM deployment config.
  3. A pre-generated configuration can optionally be reused.
  4. Terraform runs, deploying the AWS infrastructure and provisioning PCM components within minutes. Pre-built AMIs are used for EC2 instantiation.
  5. PCM pulls docker images for the PGEs.
  6. Terraform completes, providing the operator with URLs to the operator web interfaces to verify the deployment.

When deployed, PCM supports using Amazon EC2 Spot Instance workers:

  • The System Engineer specifies the price policy (the price they are willing to pay per hour).
  • If the Spot market price exceeds the specified price, spot instances are terminated with a 2-minute warning.
  • Spot termination likelihood is directly tied to the specified price.

The design for handling the Spot termination notice (an example notice follows this list):

  1. spot_termination_detector is notified of the termination by AWS
  2. It sends a graceful shutdown signal to the node's docker container
  3. It submits an event log noting that the node was marked for termination
  4. Incomplete jobs are tracked in the resource manager as never returning, due to the timeout tracked in RabbitMQ
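
For reference, the 2-minute warning surfaces through the EC2 instance metadata service; the spot instance-action document returned by AWS looks like the following (the timestamp is illustrative):

{
  "action": "terminate",
  "time": "2019-05-17T08:22:00Z"
}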

Incomplete jobs are then re-run on a different EC2 node.





Glossary

  • ARIA: Advanced Rapid Imaging and Analysis
  • ARIA-SG: Advanced Rapid Imaging and Analysis (Earth Observatory of Singapore)
  • AWS: Amazon Web Services
  • DAAC: Distributed Active Archive Center
  • GCP: Google Cloud Platform
  • GDS: Ground Data System
  • HDDR: High Density Digital Recorder
  • HTTP/HTTPS: HyperText Transfer Protocol / HyperText Transfer Protocol Secure
  • HySDS: Hybrid Cloud Science Data System
  • IDP: Interim Digital SAR Processor
  • JPL: Jet Propulsion Laboratory
  • JSON: JavaScript Object Notation
  • NASA: National Aeronautics and Space Administration
  • NISAR: NASA-ISRO Synthetic Aperture Radar Mission
  • PGE: Product Generation Executor/Executable
  • SAR: Synthetic Aperture Radar
  • SDS: Science Data System
  • SMAP: Soil Moisture Active Passive Mission
  • SSL: Secure Sockets Layer
  • SWOT: Surface Water Ocean Topography Mission
  • WVCC: Water Vapor Cloud Climatology

...