chore: update mlmd removal proto/schema design (#12253)

These changes make various enhancements to the initial mlmd design proposal.
See the changes and their descriptions for additional context.

Signed-off-by: Humair Khan <HumairAK@users.noreply.github.com>
This commit is contained in:
Humair Khan 2025-09-17 12:52:35 -04:00 committed by GitHub
parent 0f014261ac
commit 6e6d0641eb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 383 additions and 340 deletions

View File

@ -1,12 +1,67 @@
// Copyright 2025 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3"; syntax = "proto3";
option go_package = "github.com/kubeflow/pipelines/backend/api/v2beta1/go_client";
package kubeflow.pipelines.backend.api.v2beta1;
import "google/api/annotations.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/struct.proto";
import "protoc-gen-openapiv2/options/annotations.proto";
option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_swagger) = {
schemes: [1, 2], // http + https
responses: {
key: "default";
value: {
schema: {
json_schema: {
ref: ".google.rpc.Status";
}
}
}
}
// Use bearer token for authorizing access to artifact service.
// Kubernetes client library(https://kubernetes.io/docs/reference/using-api/client-libraries/)
// uses bearer token as default for authorization. The section below
// ensures security definition object is generated in the swagger definition.
// For more details see https://github.com/OAI/OpenAPI-Specification/blob/3.0.0/versions/2.0.md#securityDefinitionsObject
security_definitions: {
security: {
key: "Bearer";
value: {
type: TYPE_API_KEY;
in: IN_HEADER;
name: "Authorization";
}
}
}
};
service ArtifactService { service ArtifactService {
// Finds all artifacts within the specified namespace. // Finds all artifacts within the specified namespace.
// Namespace field is required. In multi-user mode, the caller
rpc ListArtifacts(ListArtifactRequest) returns (ListArtifactResponse) { rpc ListArtifacts(ListArtifactRequest) returns (ListArtifactResponse) {
option (google.api.http) = { option (google.api.http) = {
get: "/apis/v2beta1/artifacts" get: "/apis/v2beta1/artifacts"
}; };
option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {
operation_id: "list_artifacts"
summary: "Finds all artifacts within the specified namespace."
tags: "ArtifactService"
};
} }
// Finds a specific Artifact by ID. // Finds a specific Artifact by ID.
@ -14,13 +69,10 @@ service ArtifactService {
option (google.api.http) = { option (google.api.http) = {
get: "/apis/v2beta1/artifacts/{artifact_id}" get: "/apis/v2beta1/artifacts/{artifact_id}"
}; };
} option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {
operation_id: "get_artifact"
// Updates an existing artifact. summary: "Finds a specific Artifact by ID."
rpc UpdateArtifact(UpdateArtifactRequest) returns (Artifact) { tags: "ArtifactService"
option (google.api.http) = {
put: "/apis/v2beta1/artifacts/{artifact.artifact_id}"
body: "artifact"
}; };
} }
@ -28,6 +80,27 @@ service ArtifactService {
option (google.api.http) = { option (google.api.http) = {
get: "/apis/v2beta1/artifact_tasks" get: "/apis/v2beta1/artifact_tasks"
}; };
option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {
operation_id: "list_artifact_tasks"
summary: "Lists artifact-task relationships."
tags: "ArtifactService"
};
}
// Creates an artifact-task relationship.
// While we always create an artifact-task link when an artifact is created,
// In the case of Importer, we only create a link (and not an artifact)
// if Reimport = false.
rpc CreateArtifactTask(CreateArtifactTaskRequest) returns (ArtifactTask) {
option (google.api.http) = {
post: "/apis/v2beta1/artifact_tasks"
body: "*"
};
option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {
operation_id: "create_artifact_task"
summary: "Creates an artifact-task relationship."
tags: "ArtifactService"
};
} }
// Creates a new artifact. // Creates a new artifact.
@ -36,40 +109,25 @@ service ArtifactService {
post: "/apis/v2beta1/artifacts" post: "/apis/v2beta1/artifacts"
body: "*" body: "*"
}; };
} option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {
operation_id: "create_artifact"
// Logs a metric for a specific task. summary: "Creates a new artifact."
rpc LogMetric(LogMetricRequest) returns (Metric) { tags: "ArtifactService"
option (google.api.http) = {
post: "/apis/v2beta1/metrics"
body: "*"
};
}
// Gets a metric by task ID and name.
rpc GetMetric(GetMetricRequest) returns (Metric) {
option (google.api.http) = {
get: "/apis/v2beta1/metrics/{task_id}/{name}"
};
}
// Lists all metrics.
rpc ListMetrics(ListMetricsRequest) returns (ListMetricsResponse) {
option (google.api.http) = {
get: "/apis/v2beta1/metrics"
}; };
} }
} }
message CreateArtifactRequest { message CreateArtifactRequest {
// Required. The artifact to create. // Required. The artifact to create.
Artifact artifact = 1; Artifact artifact = 1;
}
message UpdateArtifactRequest { // An artifact is always created in the context of a
// Required. The artifact to update. The artifact_id field is required. // run.
Artifact artifact = 1; string run_id = 2;
string task_id = 3;
ArtifactTaskType type = 4;
string producer_task_name = 5;
string producer_key = 6;
} }
message GetArtifactRequest { message GetArtifactRequest {
@ -77,7 +135,6 @@ message GetArtifactRequest {
string artifact_id = 1; string artifact_id = 1;
} }
// Note: This follows the same format as other List operations in KFP backend
message ListArtifactRequest { message ListArtifactRequest {
// Optional input. Namespace for the artifacts. // Optional input. Namespace for the artifacts.
string namespace = 1; string namespace = 1;
@ -116,15 +173,15 @@ message ListArtifactTasksRequest {
// Optional, filter artifact task by a set of task_ids // Optional, filter artifact task by a set of task_ids
// We can also likely just rely on filter for this and omit this field // We can also likely just rely on filter for this and omit this field
repeated string task_ids = 1; repeated string task_ids = 1;
// Optional input, filter artifact task by a set of run_ids // Optional, filter artifact task by a set of run_ids
repeated string run_ids = 2; repeated string run_ids = 2;
// Optional, filter artifact task by a set of artifact_ids // Optional, filter artifact task by a set of artifact_ids
// We can also likely just rely on filter for this and omit this field // We can also likely just rely on filter for this and omit this field
repeated string artifact_ids = 3; repeated string artifact_ids = 3;
// Optional. Only list artifact tasks that have artifacts of this type. // Optional. Only list artifact tasks that have artifacts of this type.
ArtifactTasksType type = 4; ArtifactTaskType type = 4;
string page_token = 5; string page_token = 5;
int32 page_size = 6; int32 page_size = 6;
string sort_by = 7; string sort_by = 7;
@ -132,89 +189,61 @@ message ListArtifactTasksRequest {
} }
message ListArtifactTasksResponse { message ListArtifactTasksResponse {
repeated ArtifactTasks artifact_tasks = 1; repeated ArtifactTask artifact_tasks = 1;
int32 total_size = 2; int32 total_size = 2;
string next_page_token = 3; string next_page_token = 3;
} }
message LogMetricRequest { // Request to create an artifact-task relationship
// Required. The metric to log. message CreateArtifactTaskRequest {
Metric metric = 1; // Required. The artifact-task relationship to create.
ArtifactTask artifact_task = 1;
} }
message GetMetricRequest { // Describes the I/O relationship between
// Required. Task UUID that owns this metric // this Artifact and Task
string task_id = 1; enum ArtifactTaskType {
// Required. Name of the metric
string name = 2;
}
message ListMetricsRequest {
// Optional input, filter metrics by a set of task_ids
repeated string task_ids = 1;
// Optional input, filter metrics by a set of run_ids
repeated string run_ids = 2;
// Optional input. Namespace for the metrics.
string namespace = 3;
// A page token to request the results page.
string page_token = 4;
// The number of metrics to be listed per page.
int32 page_size = 5;
// Sorting order in form of "field_name", "field_name asc" or "field_name desc".
string sort_by = 6;
// A url-encoded, JSON-serialized filter protocol buffer.
string filter = 7;
}
message ListMetricsResponse {
// The list of metrics returned.
repeated Metric metrics = 1;
// The total number of metrics available.
int32 total_size = 2;
// Token to retrieve the next page of results.
string next_page_token = 3;
}
enum ArtifactTasksType {
INPUT = 0; INPUT = 0;
OUTPUT = 1; OUTPUT = 1;
} }
message ArtifactTasks { message ArtifactTask {
// Output only. The unique server generated id of the ArtifactTask.
string id = 1; string id = 1;
string artifact_id = 2; string artifact_id = 2;
string task_id = 3; string run_id = 3;
string task_id = 4;
ArtifactTaskType type = 5;
ArtifactTasks type = 4; // The task that produced this artifact
google.protobuf.Timestamp created_at = 5; // For example in the case of a pipeline channel
} // that is an output artifact you might have as
// input something like the following in the IR:
enum MetricType { // taskOutputArtifact:
METRIC_INPUT = 0; // outputArtifactKey: output_dataset
METRIC_OUTPUT = 1; // producerTask: create-dataset
} // These fields are used to track this lineage.
//
message Metric { // For outputs, the producer task is the component name
// Required. Task UUID that owns this metric // of the task that produced the artifact.
string task_id = 1; string producer_task_name = 6;
// Required. Name of the metric // The key is often the parameter name used
string name = 2; // as input/output on the component, but
// can also take on the value of other values.
// Required. Schema of the metric // For example:
enum Schema { // * "param-#" when using parameters in a ParallelFor
Metric = 0; // * "Output" when using Pythonic Artifacts
ClassificationMetric = 1; //
SlicedClassificationMetric = 2; // For outputs, the key is the name of the parameter
} // in the component spec (found in OutputDefinitions)
Schema schema = 3; // used to output the artifact.
// Value can be double or a valid json, string producer_key = 7;
// but not string_value, bool_value, null_value
// API server validation will be needed
google.protobuf.Value value = 4;
google.protobuf.Timestamp created_at = 5;
// Required. Type of the metric (input/output)
MetricType type = 6;
// The parameter name for the input/output artifact
// This maybe the same as the Artifact name if the
// artifact name is not specified. It is used to
// resolve artifact pipeline channels.
string artifact_key = 8;
} }
// Note to be confused with RuntimeArtifact in pipelinespec // Note to be confused with RuntimeArtifact in pipelinespec
@ -222,38 +251,47 @@ message Artifact {
// Output only. The unique server generated id of the artifact. // Output only. The unique server generated id of the artifact.
// Note: Updated id name to be consistent with other api naming patterns (with prefix) // Note: Updated id name to be consistent with other api naming patterns (with prefix)
string artifact_id = 1; string artifact_id = 1;
// The client provided name of the artifact.
// Required. The client provided name of the artifact.
// Note: it seems in MLMD when name was set, it had to be unique for that type_id // Note: it seems in MLMD when name was set, it had to be unique for that type_id
// this restriction is removed here // this restriction is removed here
// If this is a "Metric" artifact, the name of the metric
// is treated as the Key in its K/V pair.
string name = 2; string name = 2;
string description = 3;
enum ArtifactType { enum ArtifactType {
Artifact = 0; // default; treated as "not set"
Model = 1; // reject if unset.
Dataset = 2; TYPE_UNSPECIFIED = 0;
HTML = 3;
Markdown = 4; Artifact = 1;
Model = 2;
Dataset = 3;
HTML = 4;
Markdown = 5;
Metric = 6;
ClassificationMetric = 7;
SlicedClassificationMetric = 8;
} }
// The name of an ArtifactType. E.g. Dataset // Required. The name of an ArtifactType. E.g. Dataset
ArtifactType type = 3; ArtifactType type = 4;
// The uniform resource identifier of the physical artifact. // The uniform resource identifier of the physical artifact.
// May be empty if there is no physical artifact. // May be empty if there is no physical artifact.
string uri = 4; optional string uri = 5;
// User provided custom properties which are not defined by its type.
map<string, Value> metadata = 5; // Optional. User provided custom properties which are not defined by its type.
map<string, google.protobuf.Value> metadata = 6;
// Used primarily for metrics
optional double number_value = 7;
// Output only. Create time of the artifact in millisecond since epoch. // Output only. Create time of the artifact in millisecond since epoch.
// Note: The type and name is updated from mlmd artifact to be consistent with other backend apis. // Note: The type and name is updated from mlmd artifact to be consistent with other backend apis.
google.protobuf.Timestamp created_at = 6; google.protobuf.Timestamp created_at = 8;
// New field: string namespace = 9;
string namespace = 7;
// In KFP only the Live state is ever used
// As such we don't port this field over, default assumption is all artifacts are live
// we can add more states if we need to in the future
// State state = 8;
// Fields not included from mlmd artifact are: state, external_id, properties, system_metadata, last_update_time_since_epoch, type_id
// Reference: https://raw.githubusercontent.com/kubeflow/pipelines/refs/heads/master/third_party/ml-metadata/ml_metadata/proto/metadata_store.proto
} }

View File

@ -16,12 +16,19 @@
"tasks": [ "tasks": [
{ {
"task_id": "task_id_1", "task_id": "task_id_1",
// This should match the component task names created during sdk compilation
// UI can use this to look up matching tasks in the UI.
// In the case of task_groups this would take on names like: "condition-branches-1, for-loop-1, etc."
"run_id": "run_id", "run_id": "run_id",
"name": "task_name",
"display_name": "train-model", "display_name": "train-model",
"create_time": "2025-08-08T15:15:41Z", "create_time": "2025-08-08T15:15:41Z",
"start_time": "2025-08-08T15:17:36Z", "start_time": "2025-08-08T15:17:36Z",
"end_time": "2025-08-08T15:18:24Z", "end_time": "2025-08-08T15:18:24Z",
"state": "SUCCEEDED", "status": "SUCCEEDED",
"status_metadata": {
"custom_message": "this is a custom message"
},
"state_history": [], "state_history": [],
// in non Runtime types, we expect only one pod // in non Runtime types, we expect only one pod
"pods": [ "pods": [
@ -37,54 +44,26 @@
} }
], ],
"inputs": { "inputs": {
"metrics": [
{
"key": "accuracy_score",
"value": 2123.22,
"task_id": "task_id_1",
"schema": "Metric",
"created_at": "2025-08-08T15:15:41Z",
"type": "METRIC_INPUT"
},
{
"schema": "ClassificationMetrics",
"values": [
{
"confidenceThreshold": 0.999999,
"falsePositiveRate": 0,
"recall": 0
},
{
"confidenceThreshold": 1,
"falsePositiveRate": 0,
"recall": 0.33962264150943394
}],
"task_id": "task_id_1",
"created_at": "2025-08-08T15:15:41Z",
"type": "METRIC_INPUT"
}
],
"artifacts": [ "artifacts": [
{ {
"input_type": "ResolvedInput", // Parameter name is only applicable on a
"name": "input_dataset", // runtime task type
"parameter_name": "some_param",
"producer_task_name": "create_metric",
"producer_key": "my_metric_out_param_name",
"value": { "value": {
"artifact_id": "5", "artifact_id": "1",
"name": "my_dataset", "name": "my_metric",
"type": "Model", "type": "Metric",
"uri": "minio://mlpipeline/v2/artifacts/pipeline/9e68aca5-3afa-4028-8777-f697d858053f/input_dataset", "custom_properties": {},
"custom_properties": { "number_value": 23.21,
"my_data": ["some", "data"], "created_at": "2025-08-08T15:15:41Z"
"more_data": { "can_be": "anythingJSON"}
},
"created_at": "2025-08-08T15:15:41Z",
"namespace": "some_namespace"
} }
}, },
{ {
"input_type": "PipelineChannel", "name": "input_dataset",
"producer_task_name": "output_artifact", "producer_task_name": "create_dataset",
"producer_parameter_id": "some_id", "producer_key": "my_dataset_out_param_name",
"value": { "value": {
"artifact_id": "5", "artifact_id": "5",
"name": "my_dataset", "name": "my_dataset",
@ -94,37 +73,33 @@
"my_data": ["some", "data"], "my_data": ["some", "data"],
"more_data": { "can_be": "anythingJSON"} "more_data": { "can_be": "anythingJSON"}
}, },
"created_at": "2025-08-08T15:15:41Z", "created_at": "2025-08-08T15:15:41Z"
"namespace": "some_namespace"
} }
} }
], ],
"parameters": [ "parameters": [
{ {
// For Runtime tasks we expect to have resolved input parameters
"input_type": "ResolvedInput",
"name": "min_max_scaler", "name": "min_max_scaler",
"value": "false" "value": "false"
}, },
{ {
// For non Runtime tasks, we may get PipelineChannels as input parameters "value": "this",
// PipelineChannel can be of the following forms: "producer": {
// * pipelinechannel--output-msg-output_artifact // For tasks, we may get PipelineChannels as input parameters
// * pipelinechannel--output-msg-Output // PipelineChannel can be of the following forms:
// We convert these to a more machine readable format: // * pipelinechannel--output-msg-output_artifact
"input_type": "PipelineChannel", // * pipelinechannel--output-msg-Output
"producer_task_name": "output-msg", "task_name": "output-msg",
// We call it ID because it's not always a parameter "name" so producer_parameter_name would be misleading // We call it ID because it's not always a parameter "name" so producer_parameter_name would be misleading
// (e.g. in the case of loops pipelinechannel--loop-item-param-1), if we can infer the name of the iteration // (e.g. in the case of loops pipelinechannel--loop-item-param-1), if we can infer the name of the iteration
// param then "name" would be better. // param then "name" would be better.
"producer_parameter_id": "a_msg", "key": "a_msg"
"value": "this" }
} }
] ]
}, },
// Same structure as inputs // Same structure as inputs
"outputs": { "outputs": {
"metrics": [],
"artifacts": [], "artifacts": [],
// At first this is not intuitive, but there are output parameter pipeline channels, like in the case of dsl.collected. // At first this is not intuitive, but there are output parameter pipeline channels, like in the case of dsl.collected.
// This is defined in the pipeline spec, though we don't surface these pipeline channels today in the runtime executions. // This is defined in the pipeline spec, though we don't surface these pipeline channels today in the runtime executions.
@ -132,16 +107,22 @@
// the api should support it regardless. // the api should support it regardless.
"parameters": [] "parameters": []
}, },
"child_tasks": [{"pod_name": "pipeline-j9t66-382940577"}], "child_tasks": [
// Add these new fields
"child_task_ids": [
{ {
"name": "some_task", "task_id": "pipeline-j9t66-382940577",
"id": "task_id_1" "name": "child_task_name",
}, "pods": [
{ {
"name": "another_task", "name": "child_task_pod1",
"id": "task_id_2" "uid" : "some_uid_b",
"type": "EXECUTOR"
},
{
"name": "child_task_pod2",
"uid" : "some_uid_b",
"type": "DRIVER"
}
]
} }
], ],
"type": "LOOP", "type": "LOOP",
@ -149,10 +130,6 @@
"iteration_index": 2, "iteration_index": 2,
// Included for LoopCount, iteration_index & iteration_count are mutually exclusive // Included for LoopCount, iteration_index & iteration_count are mutually exclusive
"iteration_count": 2, "iteration_count": 2,
// This should match the component task names created during sdk compilation
// UI can use this to look up matching tasks in the UI.
// In the case of task_groups this would take on names like: "condition-branches-1, for-loop-1, etc."
"name": "task_name",
"cache_fingerprint": "0d32871640a827e4abaec95747b8780602f38f2f66d447ee70af3a7310d5849e", "cache_fingerprint": "0d32871640a827e4abaec95747b8780602f38f2f66d447ee70af3a7310d5849e",
} }
], ],

View File

@ -8,24 +8,38 @@ import "artifacts.proto";
// to its parent Run. A user that can only "create" a run should not have access to "update" for that // to its parent Run. A user that can only "create" a run should not have access to "update" for that
// the run, without explicitly having that verb. // the run, without explicitly having that verb.
service RunService { service RunService {
rpc CreateTask(CreateTaskRequest) returns (PipelineTaskDetail) { rpc CreateTask(CreateTaskRequest) returns (PipelineTaskDetail) {
option (google.api.http) = { option (google.api.http) = {
post: "/apis/v2beta1/task" post: "/apis/v2beta1/tasks"
body: "task" body: "task"
}; };
option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {
operation_id: "create_task"
summary: "Creates a new task."
tags: "RunService"
};
} }
rpc UpdateTask(UpdateTaskRequest) returns (PipelineTaskDetail) { rpc UpdateTask(UpdateTaskRequest) returns (PipelineTaskDetail) {
option (google.api.http) = { option (google.api.http) = {
patch: "/apis/v2beta1/task/{task_id}" patch: "/apis/v2beta1/tasks/{task_id}"
body: "task" body: "task"
}; };
option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {
operation_id: "update_task"
summary: "Updates an existing task."
tags: "RunService"
};
} }
rpc GetTask(GetTaskRequest) returns (PipelineTaskDetail) { rpc GetTask(GetTaskRequest) returns (PipelineTaskDetail) {
option (google.api.http) = { option (google.api.http) = {
get: "/apis/v2beta1/task/{task_id}" get: "/apis/v2beta1/tasks/{task_id}"
};
option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {
operation_id: "get_task"
summary: "Gets a specific task by ID."
tags: "RunService"
}; };
} }
@ -33,15 +47,14 @@ service RunService {
option (google.api.http) = { option (google.api.http) = {
get: "/apis/v2beta1/tasks" get: "/apis/v2beta1/tasks"
}; };
option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {
operation_id: "list_tasks"
summary: "Lists tasks with optional filtering."
tags: "RunService"
};
} }
} }
// Task requests should share the rbac with their parent run
// Meaning that if you have get/update/create rights on the parent run
// it should trickle down do that task
// In the create task case, if you have create permissions on the target run
// you can create the task for that run
message CreateTaskRequest { message CreateTaskRequest {
PipelineTaskDetail task = 1; PipelineTaskDetail task = 1;
} }
@ -56,10 +69,13 @@ message GetTaskRequest {
} }
message ListTasksRequest { message ListTasksRequest {
// Optional, List all tasks with this parent task. // Required. Must specify either parent_id or run_id to filter tasks.
string parent_id = 1; oneof parent_filter {
// Optional. List all tasks for this run. // List all tasks with this parent task.
string run_id = 2; string parent_id = 1;
// List all tasks for this run.
string run_id = 2;
}
int32 page_size = 3; int32 page_size = 3;
string page_token = 4; string page_token = 4;
@ -73,90 +89,53 @@ message ListTasksResponse {
int32 total_size = 3; int32 total_size = 3;
} }
// The field numbers are re-numbered // Runtime information of a task execution.
// this can have strong implications
// please review carefully.
message PipelineTaskDetail { message PipelineTaskDetail {
//-------------------------- string name = 1;
// Pre-Existing fields // User specified name of a task that is defined in
//-------------------------- // [Pipeline.spec][].
// Output only. string display_name = 2;
string task_id = 1;
string run_id = 2;
string display_name = 3;
google.protobuf.Timestamp create_time = 4;
google.protobuf.Timestamp start_time = 5;
google.protobuf.Timestamp end_time = 6;
RuntimeState state = 7;
int64 execution_id = 8;
google.rpc.Status error = 9;
string parent_task_id = 10;
repeated RuntimeStatus state_history = 11;
enum PodType { // System-generated ID of a task.
string task_id = 3;
// ID of the parent run.
string run_id = 4;
// Name of the corresponding pod assigned by the orchestration engine.
// Also known as node_id.
enum TaskPodType {
DRIVER = 0; DRIVER = 0;
EXECUTOR = 1; EXECUTOR = 1;
} }
message PodInfo { message TaskPod {
string name = 1; string name = 1;
string uid = 2; string uid = 2;
PodType type = 3; string type = 3;
} }
repeated PodInfo pods = 12; repeated TaskPod pods = 5;
//-------------------------- string cache_fingerprint = 6;
// Remove these fields
//--------------------------
// PipelineTaskExecutorDetail executor_detail = 13; // Creation time of a task.
// map<string, ArtifactList> inputs = 14; google.protobuf.Timestamp create_time = 7;
// map<string, ArtifactList> outputs = 15;
// repeated ChildTask child_tasks = 16;
//-------------------------- // Starting time of a task.
// New fields google.protobuf.Timestamp start_time = 8;
//--------------------------
message ChildTask {
string name = 1;
string id = 2;
}
repeated ChildTask child_tasks_ids = 13;
enum InputType { // Completion time of a task.
ResolvedValue = 0; google.protobuf.Timestamp end_time = 9;
PipelineChannel = 1;
}
message InputOutputs { // Runtime state of a Task
message Parameter { RuntimeState status = 10;
InputType input_type = 1;
string value = 2;
// Fields for Resolved type // Custom status metadata, this can be used to provide
string name = 3; // additional status info for a given task during runtime
map<string, google.protobuf.Value> status_metadata = 11;
// Fields for PipelineChannel type // A sequence of task statuses. This field keeps a record
string producer_task_name = 4; // of state transitions.
string producer_parameter_id = 5; repeated RuntimeStatus state_history = 12;
}
message TaskArtifact {
InputType input_type = 1;
Artifact value = 2;
// Fields for ResolvedValue type
string name = 3;
// Fields for PipelineChannel type
string producer_task_name = 4;
string producer_parameter_id = 5;
}
repeated Parameter parameters = 1;
repeated TaskArtifact artifacts = 2;
repeated Metric metrics = 3; // from artifacts.proto
}
InputOutputs inputs = 14;
InputOutputs outputs = 15;
enum TaskType { enum TaskType {
// Root task replaces Root Execution, it is the top ancestor task to all tasks in the pipeline run // Root task replaces Root Execution, it is the top ancestor task to all tasks in the pipeline run
@ -172,14 +151,80 @@ message PipelineTaskDetail {
// a driver. // a driver.
DAG = 7; DAG = 7;
} }
TaskType type = 16; TaskType type = 13;
// Optional. Applies to type LOOP_ITERATION message TypeAttributes {
int64 iteration_index = 17; // Optional. Applies to type LOOP_ITERATION
// Optional. Applies to type LOOP int64 iteration_index = 1;
int64 iteration_count = 18; // Optional. Applies to type LOOP
string name = 19; int64 iteration_count = 2;
string cache_fingerprint = 20; }
TypeAttributes type_attributes = 14;
// The error that occurred during task execution.
// Only populated when the task is in FAILED or CANCELED state.
google.rpc.Status error = 15;
// ID of the parent task if the task is within a component scope.
// Empty if the task is at the root level.
string parent_task_id = 16;
// A dependent task that requires this one to succeed.
// Represented by either task_id or pod_name.
message ChildTask {
// System-generated ID of a task.
string task_id = 1;
string name = 2;
// Name of the corresponding pod assigned by the orchestration engine.
// Also known as node_id.
repeated TaskPod pods = 3;
}
// Sequence of dependent tasks.
repeated ChildTask child_tasks = 17;
message InputOutputs {
message IOProducer {
string task_name = 1;
// This would be the equivalent of output_parameter_key from the upstream task
// when it's a parameter input, or output_artifact_key when it is an Artifact.
string key = 2;
}
message Parameter {
string value = 1;
// Optional, this is only included on Runtime Tasks when the parameter name is known.
optional string name = 2;
// Not all Parameters have task producers,
// For example they can also be Runtime Constants.
// Whereas in the case of a PipelineChannel, they
// do have a producer.
optional IOProducer producer = 3;
}
message IOArtifact {
// Optional, this is only included on Runtime Tasks when the parameter name is known.
string parameter_name = 1;
Artifact value = 2;
// All IO artifacts have a producer, so the following
// fields are required. In the case of importer
// where the artifact is set to reimport = true
// the name & key are importer-[0-9]+ and "artifact"
IOProducer producer = 3;
}
repeated Parameter parameters = 1;
// Output Only. To create Artifacts for a task are created
// via ArtifactTasks.
repeated IOArtifact artifacts = 2;
}
InputOutputs inputs = 18;
InputOutputs outputs = 19;
} }

View File

@ -6,9 +6,12 @@ CREATE TABLE `artifacts`
-- URI is immutable, reject in API server call if update artifact attempts to change the URI for a pre-existing artifact -- URI is immutable, reject in API server call if update artifact attempts to change the URI for a pre-existing artifact
`Uri` text, `Uri` text,
`Name` varchar(128) DEFAULT NULL, `Name` varchar(128) DEFAULT NULL,
`Description` varchar(128) DEFAULT NULL,
`CreatedAtInSec` bigint NOT NULL DEFAULT '0', `CreatedAtInSec` bigint NOT NULL DEFAULT '0',
`LastUpdateInSec` bigint NOT NULL DEFAULT '0', `LastUpdateInSec` bigint NOT NULL DEFAULT '0',
`Properties` JSON DEFAULT NULL, -- equivalent to mlmd custom properties `Metadata` JSON DEFAULT NULL, -- equivalent to mlmd custom properties
`NumberValue` bigint DEFAULT NULL, -- used for metrics
PRIMARY KEY (`UUID`), PRIMARY KEY (`UUID`),
KEY idx_type_namespace (`Namespace`, `Type`), KEY idx_type_namespace (`Namespace`, `Type`),
KEY idx_created_timestamp (`CreatedAtInSec`), KEY idx_created_timestamp (`CreatedAtInSec`),
@ -18,17 +21,25 @@ CREATE TABLE `artifacts`
-- Analogous to an mlmd Event, except it is specific to artifacts <-> tasks (instead of executions) -- Analogous to an mlmd Event, except it is specific to artifacts <-> tasks (instead of executions)
CREATE TABLE `artifact_tasks` CREATE TABLE `artifact_tasks`
( (
`UUID` varchar(191) NOT NULL, `UUID` varchar(191) NOT NULL,
`ArtifactID` varchar(191) NOT NULL, `ArtifactID` varchar(191) NOT NULL,
`TaskID` varchar(191) NOT NULL, `TaskID` varchar(191) NOT NULL,
-- 0 for INPUT, 1 for OUTPUT -- 0 for INPUT, 1 for OUTPUT
`Type` int NOT NULL, `Type` int NOT NULL,
`RunUUID` varchar(191) NOT NULL,
`ProducerTaskName` varchar(128) NOT NULL,
`ProducerKey` varchar(128) NOT NULL,
`ArtifactKey` varchar(128) NOT NULL,
PRIMARY KEY (`UUID`), PRIMARY KEY (`UUID`),
UNIQUE KEY `UniqueLink` (`ArtifactID`,`TaskID`,`Type`), UNIQUE KEY `UniqueLink` (`ArtifactID`,`TaskID`,`Type`),
KEY `idx_link_task_id` (`TaskID`), KEY `idx_link_task_id` (`TaskID`),
KEY `idx_link_artifact_id` (`ArtifactID`), KEY `idx_link_artifact_id` (`ArtifactID`),
KEY `idx_created_timestamp` (`CreatedAtInSec`), KEY `idx_created_timestamp` (`CreatedAtInSec`),
KEY `idx_run_uuid` (`RunUUID`),
CONSTRAINT `fk_artifact_tasks_run_details` FOREIGN KEY (`RunUUID`) REFERENCES `run_details` (`UUID`) ON DELETE CASCADE ON UPDATE CASCADE,
CONSTRAINT fk_artifact_tasks_tasks FOREIGN KEY (TaskID) REFERENCES tasks (UUID) ON DELETE CASCADE ON UPDATE CASCADE, CONSTRAINT fk_artifact_tasks_tasks FOREIGN KEY (TaskID) REFERENCES tasks (UUID) ON DELETE CASCADE ON UPDATE CASCADE,
CONSTRAINT fk_artifact_tasks_artifacts FOREIGN KEY (ArtifactID) REFERENCES artifacts (UUID) ON DELETE CASCADE ON UPDATE CASCADE CONSTRAINT fk_artifact_tasks_artifacts FOREIGN KEY (ArtifactID) REFERENCES artifacts (UUID) ON DELETE CASCADE ON UPDATE CASCADE
); );
@ -39,27 +50,20 @@ CREATE TABLE `tasks`
`Namespace` varchar(63) NOT NULL, -- updated to 63 (max namespace size in k8s) `Namespace` varchar(63) NOT NULL, -- updated to 63 (max namespace size in k8s)
-- This is used for searching for cached_fingerprints today -- This is used for searching for cached_fingerprints today
-- likely to prevent caching across pipelines -- likely to prevent caching across pipelines
`PipelineName` varchar(128) NOT NULL, `PipelineName` varchar(128) NOT NULL,
`RunUUID` varchar(191) NOT NULL, `RunUUID` varchar(191) NOT NULL,
`PodNames` json NOT NULL, -- This is broken today and will need to be fixed `Pods` json NOT NULL, -- This is broken today and will need to be fixed
`CreatedAtInSec` bigint NOT NULL, `CreatedAtInSec` bigint NOT NULL,
`StartedInSec` bigint DEFAULT '0', `StartedInSec` bigint DEFAULT '0',
`FinishedInSec` bigint DEFAULT '0', `FinishedInSec` bigint DEFAULT '0',
`Fingerprint` varchar(255) NOT NULL, `Fingerprint` varchar(255) NOT NULL,
`Name` varchar(128) DEFAULT NULL, `Name` varchar(128) DEFAULT NULL,
`ParentTaskUUID` varchar(191) DEFAULT NULL, `DisplayName` varchar(128) DEFAULT NULL,
`State` varchar(64) DEFAULT NULL, `ParentTaskUUID` varchar(191) DEFAULT NULL,
`StateHistory` json, `Status` varchar(64) DEFAULT NULL,
-- Remove the following: `StatusMetadata` json DEFAULT NULL,
-- `MLMDExecutionID` varchar(255) NOT NULL, `InputParameters` json,
-- `MLMDInputs` longtext, `OutputParameters` json,
-- `MLMDOutputs` longtext,
-- `ChildrenPods` longtext,
-- `Payload` longtext,
-- New fields:
`InputParameters` json,
`OutputParameters` json,
-- Corresponds to the executions created for each driver pod, which result in a Node on the Run Graph. -- Corresponds to the executions created for each driver pod, which result in a Node on the Run Graph.
-- E.g values are: Runtime, Condition, Loop, etc. -- E.g values are: Runtime, Condition, Loop, etc.
`Type` varchar(64) NOT NULL, `Type` varchar(64) NOT NULL,
@ -77,24 +81,3 @@ CREATE TABLE `tasks`
CONSTRAINT `fk_tasks_parent_task` FOREIGN KEY (`ParentTaskUUID`) REFERENCES tasks (`UUID`) ON DELETE CASCADE ON UPDATE CASCADE, CONSTRAINT `fk_tasks_parent_task` FOREIGN KEY (`ParentTaskUUID`) REFERENCES tasks (`UUID`) ON DELETE CASCADE ON UPDATE CASCADE,
CONSTRAINT `tasks_RunUUID_run_details_UUID_foreign` FOREIGN KEY (`RunUUID`) REFERENCES `run_details` (`UUID`) ON DELETE CASCADE ON UPDATE CASCADE CONSTRAINT `tasks_RunUUID_run_details_UUID_foreign` FOREIGN KEY (`RunUUID`) REFERENCES `run_details` (`UUID`) ON DELETE CASCADE ON UPDATE CASCADE
); );
-- We will also revamp the Metrics table, it is not used today so we can drop it
-- and recreate it as needed without worrying about breaking changes
CREATE TABLE `run_metrics`
(
`TaskID` varchar(191) NOT NULL,
`Name` varchar(128) NOT NULL,
`NumberValue` double DEFAULT NULL,
`Namespace` varchar(63) NOT NULL,
`JsonValue` JSON DEFAULT NULL,
`CreatedAtInSec` bigint NOT NULL,
-- 0 for INPUT, 1 for OUTPUT
`Type` int NOT NULL,
-- Metric, ClassificationMetric, SlicedClassificationMetric
`Schema` varchar(64) NOT NULL,
PRIMARY KEY (`TaskID`, `Name`),
KEY idx_number_value (`NumberValue`),
KEY idx_created_timestamp (`CreatedAtInSec`),
CONSTRAINT fk_run_metrics_tasks FOREIGN KEY (TaskID) REFERENCES tasks (UUID) ON DELETE CASCADE ON UPDATE CASCADE
);