Pipelines Manager
This Reactor enables creation and management of Data Catalog Pipelines.
Pipelines
Pipelines are defined by a JSON document that conforms to the Pipeline JSON schema. Here is a brief, slightly silly example:
{
  "name": "Tacobot 9001",
  "description": "Creates even better breakfast tacos out of silicon, ozone, and shredded GPUs",
  "components": [
    {
      "id": "e1LqaqMyqq4xB",
      "opts": {
        "cheese": true,
        "salsa": true,
        "eggs": false
      }
    },
    {
      "id": "bbq-brisket-0.5.0",
      "parameters": {
        "smoke": true,
        "quantity": 4
      }
    }
  ],
  "processing_levels": [
    "1"
  ],
  "accepts": [
    "CSV"
  ],
  "produces": [
    "PNG"
  ]
}
A Pipeline consists of a human-readable name and description, a globally unique string
identifier, and a list of components that defines some number of Abaco Actors,
Agave Apps, Deployed Containers, and Web Services. It also declares one or more
data “processing levels” and lists the file types it accepts and produces. Of
these fields, only components is used to compute the Pipeline UUID that connects
each Pipeline to its compute jobs.
To make this concrete: in the example above, if anything in components changes,
the result must be a new UUID. If only the name, description, or file types
change, the UUID stays the same.
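A minimal Python sketch of how a components-only identifier could be derived. The UUID in the log output further down this page has a version digit of 5 (the `5` in `5e42`), which is consistent with a name-based (SHA-1) UUID, but the namespace and serialization the Pipelines Manager actually uses are not documented here. `PIPELINE_NAMESPACE` and `pipeline_uuid` are hypothetical, so this will not reproduce real pipeline UUIDs; it only illustrates why renaming a pipeline keeps its identifier while editing a component does not. It also assumes component order is irrelevant, because the schema describes components as an unordered array.

```python
import json
import uuid

# HYPOTHETICAL namespace; the one used by the real Pipelines Manager is not documented.
PIPELINE_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "pipelines.example.org")


def pipeline_uuid(pipeline: dict) -> uuid.UUID:
    """Derive a name-based (version 5) UUID from a pipeline's components only.

    Hypothetical helper: name, description, levels, and file types are ignored.
    """
    # Serialize each component with sorted keys and no extra whitespace so that
    # formatting differences in the source document do not affect the result.
    serialized = [
        json.dumps(component, sort_keys=True, separators=(",", ":"))
        for component in pipeline["components"]
    ]
    # Assumption: component order is irrelevant (the schema calls the array
    # "unordered"), so sort the serialized components before hashing.
    canonical = "[" + ",".join(sorted(serialized)) + "]"
    return uuid.uuid5(PIPELINE_NAMESPACE, canonical)


if __name__ == "__main__":
    tacobot = {
        "name": "Tacobot 9001",
        "components": [{"id": "bbq-brisket-0.5.0", "parameters": {"smoke": True, "quantity": 4}}],
    }
    renamed = dict(tacobot, name="Tacobot 9002")
    more_brisket = dict(
        tacobot,
        components=[{"id": "bbq-brisket-0.5.0", "parameters": {"smoke": True, "quantity": 5}}],
    )
    print(pipeline_uuid(tacobot) == pipeline_uuid(renamed))       # True
    print(pipeline_uuid(tacobot) == pipeline_uuid(more_brisket))  # False
```

Hashing a canonical serialization, rather than the raw file text, is what keeps whitespace or key-order edits from producing a different identifier.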
Messages
All Pipeline management actions are accomplished by sending JSON-formatted messages to the Pipelines Manager Reactor.
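If you script these interactions from Python instead of typing them, the standard library can produce the messages. The sketch below is not part of the Reactor: `abaco_run_argv` is a hypothetical helper that only assembles the same `abaco run -m <message> [-q token=<token>] <actor>` command lines used in the examples on this page. Actually sending the message would mean passing the returned list to `subprocess.run`. Serializing with `json.dumps` and compact separators should match the single-line output of `jq -c .` for ordinary documents.

```python
import json
import shlex
from typing import List, Optional


def abaco_run_argv(pipeline: dict, actor_id: str, token: Optional[str] = None) -> List[str]:
    """Assemble an `abaco run` command that sends `pipeline` as a JSON message.

    Hypothetical helper mirroring the CLI examples; it does not execute anything.
    """
    # Compact, single-line JSON (no spaces after separators), similar to `jq -c .`.
    message = json.dumps(pipeline, separators=(",", ":"), ensure_ascii=False)
    argv = ["abaco", "run", "-m", message]
    if token is not None:
        # Updates pass the update token as a query parameter, as in the update example.
        argv += ["-q", f"token={token}"]
    argv.append(actor_id)
    return argv


if __name__ == "__main__":
    doc = {"name": "Tacobot 9001", "components": []}
    # shlex.join quotes the JSON so the printed command can be pasted into a shell.
    print(shlex.join(abaco_run_argv(doc, "G1p783PxpalBB")))
```

Building an argument list, rather than one shell string, avoids quoting problems when the JSON contains spaces or quotes.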
Create a New Pipeline
A new pipeline can be registered by sending its JSON document to the Pipelines Manager as a message like so:
$ abaco run -m "$(jq -c . my_pipeline.json)" G1p783PxpalBB
gOvQRGRVPPOzZ
# Wait a few seconds
$ abaco logs G1p783PxpalBB gOvQRGRVPPOzZ
EGe6NKeo8Oy5 DEBUG Action selected: create
EGe6NKeo8Oy5 INFO Created pipeline 1064aaf1-459c-5e42-820d-b822aa4b3990 with update token 0df45d5e9e0f31e2
Note the pipeline’s UUID (1064aaf1-459c-5e42-820d-b822aa4b3990). This is
needed to configure PipelineJobs that reference this Pipeline. Also note the
update token (0df45d5e9e0f31e2). This is needed to update the Pipeline at a
later date.
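Because both values appear only in the log output, it can help to capture them programmatically. The sketch below assumes the INFO line keeps exactly the wording shown above (`Created pipeline <uuid> with update token <token>`, or `Updated ...` after an update). `parse_pipeline_log` is a hypothetical helper, and its regular expression would need adjusting if the Reactor's log format differs.

```python
import re
from typing import Optional, Tuple

# Matches the INFO line logged after a create or update, e.g.
# "... INFO Created pipeline <uuid> with update token <token>".
# Assumption: the wording stays exactly as in the example logs.
_RESULT_LINE = re.compile(
    r"(?:Created|Updated) pipeline "
    r"(?P<uuid>[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
    r" with update token (?P<token>\S+)"
)


def parse_pipeline_log(log_text: str) -> Optional[Tuple[str, str]]:
    """Return (uuid, update_token) from `abaco logs` output, or None if absent."""
    match = _RESULT_LINE.search(log_text)
    if match is None:
        return None
    return match.group("uuid"), match.group("token")


if __name__ == "__main__":
    logs = (
        "EGe6NKeo8Oy5 DEBUG Action selected: create\n"
        "EGe6NKeo8Oy5 INFO Created pipeline 1064aaf1-459c-5e42-820d-b822aa4b3990 "
        "with update token 0df45d5e9e0f31e2\n"
    )
    print(parse_pipeline_log(logs))
```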
Update a Pipeline
A pipeline’s components cannot be updated, but its human-readable name and description can be. To accomplish this, edit the pipeline JSON document with new or amended values and send it to the Pipelines Manager along with a valid update token.
$ abaco run -m "$(jq -c . my_pipeline.json)" -q token=0df45d5e9e0f31e2 G1p783PxpalBB
e5QKEW8L0BeZ4
# Wait a few seconds
$ abaco logs G1p783PxpalBB e5QKEW8L0BeZ4
EkBWRvV1gKG1a DEBUG Action selected: update
EkBWRvV1gKG1a INFO Updated pipeline 1064aaf1-459c-5e42-820d-b822aa4b3990 with update token 0df45d5e9e0f31e2
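Only the human-readable fields may change in an update, so a quick local comparison before sending one can catch edits that really call for a new pipeline. This is a hypothetical pre-check, not something the Pipelines Manager provides. `update_problems` compares the `components` arrays exactly, including order, and checks the token length against the 16 to 17 character limit declared in the update schema under JSON Schemas.

```python
from typing import Any, Dict, List


def update_problems(original: Dict[str, Any], edited: Dict[str, Any], token: str) -> List[str]:
    """Return reasons an edited pipeline document looks unsuitable as an update.

    Hypothetical pre-check; an empty list means no problems were detected.
    """
    problems: List[str] = []
    # Components cannot be updated; any change means a different pipeline.
    if edited.get("components") != original.get("components"):
        problems.append("components changed; register a new pipeline instead")
    # The update schema constrains the token to 16-17 characters.
    if not 16 <= len(token) <= 17:
        problems.append("update token must be 16 or 17 characters long")
    return problems


if __name__ == "__main__":
    original = {"name": "Tacobot 9001", "components": [{"id": "bbq-brisket-0.5.0"}]}
    edited = dict(original, description="Now with extra salsa")
    print(update_problems(original, edited, "0df45d5e9e0f31e2"))  # []
```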
Retire a Pipeline
A pipeline cannot be deleted, but it can be retired from active service.
Coming soon…
JSON Schemas
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "$id": "https://schema.catalog.sd2e.org/schemas/pipeline_manager_create.json",
  "title": "PipelinesDefinition",
  "description": "A Pipeline record. POST to PipelinesManager to create.",
  "definitions": {
    "container_repo": {
      "type": "string",
      "description": "a Linux container image repository and tag"
    },
    "agave_app": {
      "description": "Agave application",
      "type": "object",
      "properties": {
        "id": {
          "type": "string",
          "description": "the distinct 'app.id' for the Agave app"
        },
        "inputs": {
          "type": "object",
          "description": "predefined inputs for jobs spawned by the app"
        },
        "parameters": {
          "type": "object",
          "description": "predefined parameters for the jobs spawned by the app",
          "properties": {
            "CONTAINER_IMAGE": {
              "$ref": "#/definitions/container_repo",
              "description": "Linux container image repository and tag for the deployed app"
            }
          }
        }
      },
      "required": ["id", "inputs", "parameters"]
    },
    "container": {
      "description": "Deployed Linux container",
      "type": "object",
      "properties": {
        "repo": {
          "$ref": "#/definitions/container_repo",
          "description": "Linux container image repository and tag for the service"
        },
        "hash": {
          "type": "string",
          "description": "Linux container image hash"
        },
        "options": {
          "type": "object",
          "description": "Deployment options"
        }
      },
      "required": ["repo"]
    },
    "service": {
      "description": "External networked resource",
      "type": "object",
      "properties": {
        "identifier": {
          "type": "string",
          "description": "An identifier for a specific instance of a service"
        },
        "uri": {
          "type": "string",
          "description": "Canonical URI for the resource",
          "format": "uri"
        },
        "options": {
          "type": "object",
          "description": "Additional descriptive attributes"
        }
      },
      "required": ["uri"]
    },
    "abaco_actor": {
      "description": "Abaco Reactor",
      "type": "object",
      "properties": {
        "id": {
          "type": "string",
          "description": "Distinct 'actor.id' for the Reactor"
        },
        "repo": {
          "$ref": "#/definitions/container_repo",
          "description": "Linux container image repository and tag for the deployed actor"
        },
        "options": {
          "type": "object",
          "description": "Predefined runtime options for the Reactor"
        }
      },
      "required": ["id", "repo"]
    },
    "pipeline_type": {
      "description": "the general class of action done by the pipeline",
      "type": "string",
      "enum": ["generic-process", "data-transfer", "metadata-management", "primary-etl", "secondary-etl"],
      "default": "primary-etl"
    },
    "processing_level": {
      "description": "a data processing level",
      "type": "string",
      "enum": ["0", "1", "2", "3"]
    },
    "collections_level": {
      "description": "a data processing level",
      "type": "string",
      "enum": ["reference", "user_file", "challenge_problem", "experiment", "sample", "measurement", "file", "pipeline", "job", "product"]
    }
  },
  "type": "object",
  "properties": {
    "name": {
      "type": "string"
    },
    "description": {
      "type": "string"
    },
    "components": {
      "description": "an unordered array of apps and actors in the pipeline (required)",
      "type": "array",
      "items": {
        "anyOf": [
          {"$ref": "#/definitions/agave_app"},
          {"$ref": "#/definitions/abaco_actor"},
          {"$ref": "#/definitions/service"},
          {"$ref": "#/definitions/container"}
        ]
      }
    },
    "collections_levels": {
      "type": "array",
      "items": {
        "$ref": "#/definitions/collections_level"
      },
      "description": "level(s) of data input that the pipeline acts upon",
      "default": []
    },
    "processing_levels": {
      "type": "array",
      "description": "level(s) of data product produced by the pipeline",
      "items": {
        "$ref": "#/definitions/processing_level"
      },
      "default": []
    },
    "pipeline_type": {
      "items": {
        "$ref": "#/definitions/pipeline_type"
      }
    },
    "accepts": {
      "type": "array",
      "description": "file types accepted by the pipeline",
      "items": {
        "type": "string"
      },
      "default": ["*"]
    },
    "produces": {
      "type": "array",
      "description": "file types produced by the pipeline",
      "items": {
        "type": "string"
      },
      "default": ["*"]
    },
    "__options": {
      "description": "private object for passing runtime options to a pipeline (optional)",
      "type": "object"
    }
  },
  "required": ["name", "components"],
  "additionalProperties": false
}
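Any draft-07 JSON Schema validator can enforce this schema in full. Where adding one is inconvenient, a partial standard-library check of the top-level rules can still catch common mistakes before a message is sent. The sketch below hard-codes values copied from the schema above: `required`, the set of top-level properties permitted under `"additionalProperties": false`, and the `processing_level` enum. `preflight` is a hypothetical helper and does not validate the contents of `components` or any other nested rule.

```python
from typing import Any, Dict, List

# Values copied from the create schema; keep them in sync if the schema changes.
REQUIRED = {"name", "components"}
ALLOWED = {
    "name", "description", "components", "collections_levels",
    "processing_levels", "pipeline_type", "accepts", "produces", "__options",
}
PROCESSING_LEVELS = {"0", "1", "2", "3"}


def preflight(doc: Dict[str, Any]) -> List[str]:
    """Return top-level schema problems in a pipeline document (partial check only)."""
    problems: List[str] = []
    keys = set(doc)
    for key in sorted(REQUIRED - keys):
        problems.append(f"missing required property: {key}")
    # additionalProperties is false, so unknown top-level keys are rejected.
    for key in sorted(keys - ALLOWED):
        problems.append(f"unexpected property: {key}")
    if "components" in doc and not isinstance(doc["components"], list):
        problems.append("components must be an array")
    # Levels are strings in the schema's enum, so the integer 1 is not valid.
    for level in doc.get("processing_levels", []):
        if not isinstance(level, str) or level not in PROCESSING_LEVELS:
            problems.append(f"invalid processing level: {level!r}")
    return problems


if __name__ == "__main__":
    doc = {"components": [], "processing_levels": [1], "owner": "me"}
    for problem in preflight(doc):
        print(problem)
```

A check like this is only a convenience; the Pipelines Manager's own validation remains authoritative.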
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "$id": "https://schema.catalog.sd2e.org/schemas/pipeline_manager_update.json",
  "title": "PipelinesDefinitionUpdate",
  "description": "Update a Pipeline record",
  "type": "object",
  "properties": {
    "uuid": {
      "description": "The job UUID",
      "type": "string"
    },
    "body": {"type": "object"},
    "action": {
      "type": "string",
      "enum": ["update"]
    },
    "token": {
      "description": "an authorization token issued when the pipeline was created",
      "type": "string",
      "minLength": 16,
      "maxLength": 17
    },
    "__options": {
      "type": "object",
      "description": "an object used to pass runtime options to a reactor (private, optional)"
    }
  },
  "required": ["uuid", "action", "body", "token"],
  "additionalProperties": false
}
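The update schema describes a message that wraps the edited pipeline in `body`, alongside the pipeline `uuid`, an `action` of `update`, and the `token`. The CLI example under Update a Pipeline instead sends the pipeline document and passes the token with `-q`, and how the Reactor reconciles the two forms is not documented here. If you do construct the wrapped form yourself, a hypothetical helper such as `build_update_message` can enforce the schema's constraints locally; what belongs inside `body` is an assumption left to the caller.

```python
import json
from typing import Any, Dict, Optional


def build_update_message(
    pipeline_uuid: str,
    body: Dict[str, Any],
    token: str,
    options: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Build a message shaped like the update schema (hypothetical helper)."""
    if not isinstance(body, dict):
        raise TypeError("body must be a JSON object")
    # The schema sets minLength 16 and maxLength 17 for the token.
    if not 16 <= len(token) <= 17:
        raise ValueError("token must be 16 or 17 characters long")
    message: Dict[str, Any] = {
        "uuid": pipeline_uuid,
        "action": "update",  # the only value the schema's enum allows
        "body": body,
        "token": token,
    }
    # __options is the only optional property; additionalProperties is false.
    if options is not None:
        message["__options"] = options
    return message


if __name__ == "__main__":
    msg = build_update_message(
        "1064aaf1-459c-5e42-820d-b822aa4b3990",
        {"name": "Tacobot 9001", "description": "Now with extra salsa"},
        "0df45d5e9e0f31e2",
    )
    print(json.dumps(msg, separators=(",", ":")))
```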