Pipelines Manager

This Reactor enables creation and management of Data Catalog Pipelines.

Pipelines

Pipelines are defined using a document written in the Pipeline JSON schema. Here is a brief, slightly silly example:

{
    "name": "Tacobot 9001",
    "description": "Creates even better breakfast tacos out of silicon, ozone, and shredded GPUs",
    "components": [
        {
            "id": "e1LqaqMyqq4xB",
            "opts": {
                "cheese": true,
                "salsa": true,
                "eggs": false
            }
        },
        {
            "id": "bbq-brisket-0.5.0",
            "parameters": {
                "smoke": true,
                "quantity": 4
            }
        }
    ],
    "processing_levels": [
        "1"
    ],
    "accepts": [
        "CSV"
    ],
    "produces": [
        "PNG"
    ]
}

A Pipeline consists of a human-readable name and description, a globally unique string identifier, and a list of components that defines some number of Abaco Actors, Agave Apps, Deployed Containers, and Web Services. Additionally, one or more data “processing levels” are provided, along with the lists of file types the Pipeline accepts and produces. Of these fields, only components is used to compute the Pipeline UUID that connects each Pipeline to its compute jobs.

To make this concrete: in the example above, if *anything* in components changes, the result will be a new UUID. If the name, description, processing levels, or file types change, the UUID remains the same.
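The following sketch shows one way a components-only identifier could be derived. The UUID in the creation log further down (1064aaf1-459c-5e42-820d-b822aa4b3990) has 5 as the first digit of its third group. RFC 4122 uses that digit to mark a version-5, name-based UUID. The sketch therefore assumes uuid5 over a canonical JSON serialization of components. The namespace (PIPELINE_NAMESPACE) and the serialization are hypothetical. The Pipelines Manager's actual inputs are not documented here, so this will not reproduce real Pipeline UUIDs.

```python
import json
import uuid

# HYPOTHETICAL namespace: the Pipelines Manager's real namespace is not documented.
PIPELINE_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "pipelines.example.org")


def pipeline_uuid(pipeline: dict) -> uuid.UUID:
    """Derive a name-based (version 5) UUID from the components field only."""
    # Sorted keys and no whitespace, so cosmetic formatting does not change the result.
    canonical = json.dumps(pipeline["components"], sort_keys=True, separators=(",", ":"))
    return uuid.uuid5(PIPELINE_NAMESPACE, canonical)


tacobot = {
    "name": "Tacobot 9001",
    "description": "Creates even better breakfast tacos",
    "components": [
        {"id": "e1LqaqMyqq4xB", "opts": {"cheese": True, "salsa": True, "eggs": False}},
        {"id": "bbq-brisket-0.5.0", "parameters": {"smoke": True, "quantity": 4}},
    ],
    "accepts": ["CSV"],
    "produces": ["PNG"],
}

# Same components, different name and file types.
renamed = dict(tacobot, name="Tacobot 9002", produces=["JPEG"])
# One component option flipped.
spicier = dict(tacobot, components=[
    {"id": "e1LqaqMyqq4xB", "opts": {"cheese": True, "salsa": True, "eggs": True}},
    tacobot["components"][1],
])

print(pipeline_uuid(tacobot) == pipeline_uuid(renamed))  # True: only components are hashed
print(pipeline_uuid(tacobot) == pipeline_uuid(spicier))  # False: a component changed
```

The schema describes components as an unordered array. This sketch does not sort the array, so reordering components would change its UUID. Whether the real implementation normalizes order is not stated.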

Messages

All Pipeline management actions are accomplished by sending JSON-formatted messages to the Pipelines Manager Reactor.
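Messages are passed to abaco run -m as a single string. In the examples on this page, jq -c . compacts a JSON file onto one line for that purpose. The sketch below is a rough Python equivalent and assumes the pipeline document is already loaded as a dict. It builds the command as an argument list rather than a shell string, which sidesteps quoting problems with the JSON. G1p783PxpalBB is the actor ID used in this page's examples. The list is only constructed here, not executed.

```python
import json


def to_message(document: dict) -> str:
    """Serialize a document as compact single-line JSON, roughly like `jq -c .`."""
    # separators=(",", ":") drops the spaces json.dumps inserts by default;
    # ensure_ascii=False leaves non-ASCII characters unescaped, as jq does.
    return json.dumps(document, separators=(",", ":"), ensure_ascii=False)


pipeline = {"name": "Tacobot 9001", "components": [{"id": "bbq-brisket-0.5.0"}]}
message = to_message(pipeline)
print(message)  # {"name":"Tacobot 9001","components":[{"id":"bbq-brisket-0.5.0"}]}

# Argument list mirroring `abaco run -m "<message>" G1p783PxpalBB`; pass it to
# subprocess.run(argv, check=True) on a machine where the abaco CLI is installed.
argv = ["abaco", "run", "-m", message, "G1p783PxpalBB"]
```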

Create a New Pipeline

A new pipeline can be registered by sending its JSON document to the Pipelines Manager as a message like so:

$ abaco run -m "$(jq -c . my_pipeline.json)" G1p783PxpalBB

gOvQRGRVPPOzZ

# Wait a few seconds

$ abaco logs G1p783PxpalBB gOvQRGRVPPOzZ

EGe6NKeo8Oy5 DEBUG Action selected: create
EGe6NKeo8Oy5 INFO Created pipeline 1064aaf1-459c-5e42-820d-b822aa4b3990 with update token 0df45d5e9e0f31e2

Note the pipeline’s UUID (1064aaf1-459c-5e42-820d-b822aa4b3990). This is needed to configure PipelineJobs that reference this Pipeline. Also note the update token (0df45d5e9e0f31e2). This is needed to update the Pipeline at a later date.
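Both values appear in the INFO line of the actor's log, so a script can pull them out with a regular expression. The sketch below assumes the exact log wording shown above ("Created pipeline <uuid> with update token <token>"). It will need adjusting if that message changes. The function name parse_pipeline_log is hypothetical.

```python
import re

# Matches lines such as
# "Created pipeline 1064aaf1-... with update token 0df45d5e9e0f31e2".
# Assumes this exact wording; other log formats will not match.
PIPELINE_LINE = re.compile(
    r"(?:Created|Updated) pipeline (?P<uuid>[0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12})"
    r" with update token (?P<token>\S+)"
)


def parse_pipeline_log(log_text: str) -> dict:
    """Return the pipeline UUID and update token found in abaco log output."""
    match = PIPELINE_LINE.search(log_text)
    if match is None:
        raise ValueError("no 'pipeline ... with update token ...' line found")
    return match.groupdict()


logs = """EGe6NKeo8Oy5 DEBUG Action selected: create
EGe6NKeo8Oy5 INFO Created pipeline 1064aaf1-459c-5e42-820d-b822aa4b3990 with update token 0df45d5e9e0f31e2"""

print(parse_pipeline_log(logs))
# {'uuid': '1064aaf1-459c-5e42-820d-b822aa4b3990', 'token': '0df45d5e9e0f31e2'}
```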

Update a Pipeline

A pipeline’s components cannot be updated, but its human-readable name and description can be. To accomplish this, edit the pipeline JSON document with new or amended values and send it to the Pipelines Manager along with a valid update token.

$ abaco run -m "$(jq -c . my_pipeline.json)" -q token=0df45d5e9e0f31e2 G1p783PxpalBB

e5QKEW8L0BeZ4

# Wait a few seconds

$ abaco logs G1p783PxpalBB e5QKEW8L0BeZ4

EkBWRvV1gKG1a DEBUG Action selected: update
EkBWRvV1gKG1a INFO Updated pipeline 1064aaf1-459c-5e42-820d-b822aa4b3990 with update token 0df45d5e9e0f31e2
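Only the name and description may change. Comparing the edited document against the originally registered one before sending it can therefore catch accidental edits. The helper below (check_update, a hypothetical name) treats every top-level field other than name and description as frozen. This is a conservative reading of the rule above, and the Pipelines Manager itself may enforce it differently.

```python
UPDATABLE_FIELDS = {"name", "description"}


def changed_fields(original: dict, edited: dict) -> set:
    """Return the top-level keys whose values differ between two documents."""
    keys = set(original) | set(edited)
    return {key for key in keys if original.get(key) != edited.get(key)}


def check_update(original: dict, edited: dict) -> set:
    """Return the changed fields, raising if any of them is not updatable."""
    changes = changed_fields(original, edited)
    frozen = changes - UPDATABLE_FIELDS
    if frozen:
        raise ValueError(f"cannot update field(s): {sorted(frozen)}")
    return changes


original = {
    "name": "Tacobot 9001",
    "description": "Creates even better breakfast tacos",
    "components": [{"id": "bbq-brisket-0.5.0", "parameters": {"smoke": True}}],
}
edited = dict(original, description="Creates the best breakfast tacos")
print(check_update(original, edited))  # {'description'}
```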

Retire a Pipeline

A pipeline cannot be deleted, but it can be retired from active service.

Coming soon…

JSON Schemas

pipeline_manager_create
{
	"$schema": "http://json-schema.org/draft-07/schema#",
	"$id": "https://schema.catalog.sd2e.org/schemas/pipeline_manager_create.json",
	"title": "PipelinesDefinition",
	"description": "A Pipeline record. POST to PipelinesManager to create.",
	"definitions": {
		"container_repo": {
			"type": "string",
			"description": "a Linux container image repository and tag"
		},
		"agave_app": {
			"description": "Agave application",
			"type": "object",
			"properties": {
				"id": {
					"type": "string",
					"description": "the distinct 'app.id' for the Agave app"
				},
				"inputs": {
					"type": "object",
					"description": "predefined inputs for jobs spawned by the app"
				},
				"parameters": {
					"type": "object",
					"description": "predefined parameters for the jobs spawned by the app",
					"properties": {
						"CONTAINER_IMAGE": {
							"$ref": "#/definitions/container_repo",
							"description": "Linux container image repository and tag for the deployed app"
						}
					}
				}
			},
			"required": ["id", "inputs", "parameters"]
		},
		"container": {
			"description": "Deployed Linux container",
			"type": "object",
			"properties": {
				"repo": {
					"$ref": "#/definitions/container_repo",
					"description": "Linux container image repository and tag for the service"
				},
				"hash": {
					"type": "string",
					"description": "Linux container image hash"
				},
				"options": {
					"type": "object",
					"description": "Deployment options"
				}
			},
			"required": ["repo"]
		},
		"service": {
			"description": "External networked resource",
			"type": "object",
			"properties": {
				"identifier": {
					"type": "string",
					"description": "An identifier for a specific instance of a service"
				},
				"uri": {
					"type": "string",
					"description": "Canonical URI for the resource",
					"format": "uri"
				},
				"options": {
					"type": "object",
					"description": "Additional descriptive attributes"
				}
			},
			"required": ["uri"]
		},
		"abaco_actor": {
			"description": "Abaco Reactor",
			"type": "object",
			"properties": {
				"id": {
					"type": "string",
					"description": "Distinct 'actor.id' for the Reactor"
				},
				"repo": {
					"$ref": "#/definitions/container_repo",
					"description": "Linux container image repository and tag for the deployed actor"
				},
				"options": {
					"type": "object",
					"description": "Predefined runtime options for the Reactor"
				}
			},
			"required": ["id", "repo"]
		},
		"pipeline_type": {
			"description": "the general class of action done by the pipeline",
			"type": "string",
			"enum": ["generic-process", "data-transfer", "metadata-management", "primary-etl", "secondary-etl"],
			"default": "primary-etl"
		},
		"processing_level": {
			"description": "a data processing level",
			"type": "string",
			"enum": ["0", "1", "2", "3"]
		},
		"collections_level": {
			"description": "a data processing level",
			"type": "string",
			"enum": ["reference", "user_file", "challenge_problem", "experiment", "sample", "measurement", "file", "pipeline", "job", "product"]
		}
	},
	"type": "object",
	"properties": {
		"name": {
			"type": "string"
		},
		"description": {
			"type": "string"
		},
		"components": {
			"description": "an unordered array of apps and actors in the pipeline (required)",
			"type": "array",
			"items": {
				"anyOf": [{
						"$ref": "#/definitions/agave_app"
					},
					{
						"$ref": "#/definitions/abaco_actor"
					},
					{
						"$ref": "#/definitions/service"
					},
					{
						"$ref": "#/definitions/container"
					}
				]
			}
		},
		"collections_levels": {
			"type": "array",
			"items": {
				"$ref": "#/definitions/collections_level"
			},
			"description": "level(s) of data input that the pipeline acts upon",
			"default": []
		},
		"processing_levels": {
			"type": "array",
			"description": "level(s) of data product produced by the pipeline",
			"items": {
				"$ref": "#/definitions/processing_level"
			},
			"default": []
		},
		"pipeline_type": {
			"items": {
				"$ref": "#/definitions/pipeline_type"
			}
		},
		"accepts": {
			"type": "array",
			"description": "file types accepted by the pipeline",
			"items": {
				"type": "string"
			},
			"default": ["*"]
		},
		"produces": {
			"type": "array",
			"description": "file types produced by the pipeline",
			"items": {
				"type": "string"
			},
			"default": ["*"]
		},
		"__options": {
			"description": "private object for passing runtime options to a pipeline (optional)",
			"type": "object"
		}
	},
	"required": ["name", "components"],
	"additionalProperties": false
}
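The stdlib-only sketch below offers a quick pre-flight check without extra dependencies. It enforces a subset of pipeline_manager_create:

- the required name and components keys
- additionalProperties: false at the top level
- the processing_level enum
- the anyOf rule that each component must carry the required keys of at least one of agave_app, abaco_actor, service, or container

It does not check types, the uri format, or nested definitions. For full draft-07 validation, a complete JSON Schema validator (for example, Draft7Validator from the Python jsonschema package) is the better tool. check_pipeline and the repo value in the example are hypothetical.

```python
# Keys listed under "properties"; additionalProperties is false, so nothing else is allowed.
ALLOWED_KEYS = {"name", "description", "components", "collections_levels",
                "processing_levels", "pipeline_type", "accepts", "produces", "__options"}
REQUIRED_KEYS = {"name", "components"}
PROCESSING_LEVELS = {"0", "1", "2", "3"}
# Required keys of each component definition; a component must satisfy at least one (anyOf).
COMPONENT_REQUIRED = {
    "agave_app": {"id", "inputs", "parameters"},
    "abaco_actor": {"id", "repo"},
    "service": {"uri"},
    "container": {"repo"},
}


def check_pipeline(doc: dict) -> list:
    """Return a list of problems; an empty list means these partial checks passed."""
    problems = []
    for key in sorted(REQUIRED_KEYS - set(doc)):
        problems.append(f"missing required key: {key}")
    for key in sorted(set(doc) - ALLOWED_KEYS):
        problems.append(f"unexpected key: {key}")
    for level in doc.get("processing_levels", []):
        if level not in PROCESSING_LEVELS:
            problems.append(f"invalid processing level: {level!r}")
    for index, component in enumerate(doc.get("components", [])):
        if not isinstance(component, dict) or not any(
            required <= set(component) for required in COMPONENT_REQUIRED.values()
        ):
            problems.append(f"component {index} matches no component definition")
    return problems


valid = {
    "name": "Tacobot 9001",
    "components": [{"id": "e1LqaqMyqq4xB", "repo": "example/tacobot:0.1.0"}],
    "processing_levels": ["1"],
}
invalid = {
    "name": "Tacobot 9001",
    "components": [{"id": "e1LqaqMyqq4xB", "opts": {"cheese": True}}],
    "processing_levels": ["4"],
    "color": "red",
}
print(check_pipeline(valid))    # []
print(check_pipeline(invalid))
```

Under these rules, neither component in the illustrative Tacobot example at the top of this page would pass. The first has only id and opts. The second lacks the inputs key that agave_app requires alongside id and parameters.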

pipeline_manager_update
{
	"$schema": "http://json-schema.org/draft-07/schema#",
	"$id": "https://schema.catalog.sd2e.org/schemas/pipeline_manager_update.json",
	"title": "PipelinesDefinitionUpdate",
	"description": "Update a Pipeline record",
	"type": "object",
	"properties": {
		"uuid": {
			"description": "The job UUID",
			"type": "string"
		},
		"body": {"type": "object"},
		"action": {
			"type": "string",
			"enum": ["update"]
		},
		"token": {
			"description": "an authorization token issued when the pipeline was created",
			"type": "string",
			"minLength": 16,
			"maxLength": 17
		},
		"__options": {
			"type": "object",
			"description": "an object used to pass runtime options to a reactor (private, optional)"
		}
	},
	"required": ["uuid", "action", "body", "token"],
	"additionalProperties": false
}
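A message conforming to pipeline_manager_update can be assembled as shown below. The schema only requires body to be an object, so this sketch assumes it carries the edited pipeline document. make_update_message is a hypothetical helper. It enforces two constraints from the schema: the single-value action enum and the 16 to 17 character token length. The token 0df45d5e9e0f31e2 from the update log above is 16 characters long.

```python
import json


def make_update_message(pipeline_uuid: str, body: dict, token: str) -> dict:
    """Assemble a message shaped like the pipeline_manager_update schema."""
    # The schema bounds the token at 16 to 17 characters.
    if not 16 <= len(token) <= 17:
        raise ValueError("token must be 16 or 17 characters long")
    # "body" must be a JSON object; assumed here to hold the edited pipeline document.
    if not isinstance(body, dict):
        raise TypeError("body must be a JSON object")
    # "action" is restricted to the single value "update" by the schema's enum.
    return {"uuid": pipeline_uuid, "action": "update", "body": body, "token": token}


message = make_update_message(
    "1064aaf1-459c-5e42-820d-b822aa4b3990",
    {
        "name": "Tacobot 9001",
        "description": "Creates the best breakfast tacos",
        "components": [{"id": "bbq-brisket-0.5.0"}],
    },
    "0df45d5e9e0f31e2",
)
print(json.dumps(message, separators=(",", ":")))
```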