pipelines/sdk/python/kfp/deprecated/dsl/artifact.py

# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base class for MLMD artifact in KFP SDK."""
from typing import Any, Optional
from absl import logging
import importlib
import yaml
from google.protobuf import json_format
from google.protobuf import struct_pb2
from kfp.pipeline_spec import pipeline_spec_pb2
from kfp.deprecated.dsl import serialization_utils
from kfp.deprecated.dsl import artifact_utils

KFP_ARTIFACT_ONTOLOGY_MODULE = 'kfp.dsl.ontology_artifacts'
DEFAULT_ARTIFACT_SCHEMA = 'title: kfp.Artifact\ntype: object\nproperties:\n'


class Artifact(object):
    """KFP Artifact Python class.

    The Artifact class serves the following purposes at different stages of
    its lifecycle:

    1. At compile time, users can use the Artifact class to annotate the I/O
       types of their components.
    2. At runtime, Artifact objects provide helper functions/utilities to
       access the underlying RuntimeArtifact pb message, and add a layer of
       validation to ensure type compatibility for the fields specified in
       the instance schema.

    See the illustrative usage sketch at the bottom of this module.
    """

    TYPE_NAME = 'kfp.Artifact'

    # Initialization flag to support setattr / getattr behavior.
    _initialized = False
    def __init__(self, instance_schema: Optional[str] = None):
        """Constructs an instance of Artifact.

        Sets up self._metadata_fields to perform type checking and
        initializes the underlying RuntimeArtifact message.
        """
        if self.__class__ == Artifact:
            if not instance_schema:
                raise ValueError(
                    'The "instance_schema" argument must be set for Artifact.')
            self._instance_schema = instance_schema
        else:
            if instance_schema:
                raise ValueError(
                    'The "instance_schema" argument must not be passed for '
                    'Artifact subclass: {}'.format(self.__class__))

        # Set up self._metadata_fields from the instance schema.
        self.TYPE_NAME, self._metadata_fields = artifact_utils.parse_schema(
            self._instance_schema)

        # Instantiate a RuntimeArtifact pb message as the POD data structure.
        self._artifact = pipeline_spec_pb2.RuntimeArtifact()
        # Stores the metadata for the Artifact.
        self.metadata = {}
        self._artifact.type.CopyFrom(
            pipeline_spec_pb2.ArtifactTypeSchema(
                instance_schema=self._instance_schema))

        self._initialized = True
    @property
    def type_schema(self) -> str:
        """Gets the instance_schema for this Artifact object."""
        return self._instance_schema

    def __getattr__(self, name: str) -> Any:
        """Custom __getattr__ to allow access to artifact metadata."""
        if name not in self._metadata_fields:
            raise AttributeError(
                'No metadata field: {} in artifact.'.format(name))
        return self.metadata[name]
    def __setattr__(self, name: str, value: Any):
        """Custom __setattr__ to allow access to artifact metadata."""
        if not self._initialized:
            object.__setattr__(self, name, value)
            return

        metadata_fields = {}
        if self._metadata_fields:
            metadata_fields = self._metadata_fields

        if name not in metadata_fields:
            if (name in self.__dict__ or
                    any(name in c.__dict__ for c in self.__class__.mro())):
                # Use any provided getter / setter if available.
                object.__setattr__(self, name, value)
                return
            # In the case where we do not handle this via an explicit getter /
            # setter, we assume that the user implied an artifact attribute
            # store, and we raise an exception since such an attribute was not
            # explicitly defined in the artifact's instance schema.
            raise AttributeError(
                'Cannot set an unspecified metadata field: {} on artifact. '
                'Only fields specified in the instance schema can be '
                'set.'.format(name))

        # Type checking to be performed during serialization.
        self.metadata[name] = value
    def _update_runtime_artifact(self):
        """Verifies metadata is well-formed and updates artifact instance."""
        artifact_utils.verify_schema_instance(self._instance_schema,
                                              self.metadata)

        if len(self.metadata) != 0:
            metadata_protobuf_struct = struct_pb2.Struct()
            metadata_protobuf_struct.update(self.metadata)
            self._artifact.metadata.CopyFrom(metadata_protobuf_struct)

    @property
    def type(self):
        return self.__class__

    @property
    def type_name(self):
        return self.TYPE_NAME

    @property
    def uri(self) -> str:
        return self._artifact.uri

    @uri.setter
    def uri(self, uri: str) -> None:
        self._artifact.uri = uri

    @property
    def name(self) -> str:
        return self._artifact.name

    @name.setter
    def name(self, name: str) -> None:
        self._artifact.name = name

    @property
    def runtime_artifact(self) -> pipeline_spec_pb2.RuntimeArtifact:
        self._update_runtime_artifact()
        return self._artifact

    @runtime_artifact.setter
    def runtime_artifact(self, artifact: pipeline_spec_pb2.RuntimeArtifact):
        self._artifact = artifact
    def serialize(self) -> str:
        """Serializes the Artifact to a JSON string."""
        self._update_runtime_artifact()
        return json_format.MessageToJson(self._artifact, sort_keys=True)
    @classmethod
    def get_artifact_type(cls) -> str:
        """Gets the instance_schema according to the Python schema spec."""
        result_map = {'title': cls.TYPE_NAME, 'type': 'object'}
        return serialization_utils.yaml_dump(result_map)

    @classmethod
    def get_ir_type(cls) -> pipeline_spec_pb2.ArtifactTypeSchema:
        return pipeline_spec_pb2.ArtifactTypeSchema(
            instance_schema=cls.get_artifact_type())
    @classmethod
    def get_from_runtime_artifact(
            cls, artifact: pipeline_spec_pb2.RuntimeArtifact) -> Any:
        """Deserializes an Artifact object from a RuntimeArtifact message."""
        instance_schema = yaml.safe_load(artifact.type.instance_schema)
        type_name = instance_schema['title'][len('kfp.'):]
        result = None
        try:
            artifact_cls = getattr(
                importlib.import_module(KFP_ARTIFACT_ONTOLOGY_MODULE),
                type_name)
            result = artifact_cls()
        except (AttributeError, ImportError, ValueError) as err:
            logging.warning(
                'Failed to instantiate Ontology Artifact: %s instance: %s',
                type_name, err)
        if not result:
            # Otherwise generate a generic Artifact object.
            result = Artifact(instance_schema=artifact.type.instance_schema)
        result.runtime_artifact = artifact
        result.metadata = json_format.MessageToDict(artifact.metadata)
        return result
    @classmethod
    def deserialize(cls, data: str) -> Any:
        """Deserializes an Artifact object from a JSON string."""
        artifact = pipeline_spec_pb2.RuntimeArtifact()
        json_format.Parse(data, artifact, ignore_unknown_fields=True)
        return cls.get_from_runtime_artifact(artifact)
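

# Illustrative usage sketch (not part of the original module): constructs a
# generic Artifact from DEFAULT_ARTIFACT_SCHEMA and round-trips it through
# serialize()/deserialize(). It assumes artifact_utils.parse_schema and
# artifact_utils.verify_schema_instance accept a schema in this layout with an
# empty `properties` section; the artifact name, URI, and any metadata fields
# mentioned below are placeholders. Fields declared under `properties` in a
# concrete schema could then be set as attributes (e.g. `example.accuracy =
# 0.9`) before serializing.
if __name__ == '__main__':
    example = Artifact(instance_schema=DEFAULT_ARTIFACT_SCHEMA)
    example.name = 'my-artifact'
    example.uri = 'gs://my-bucket/my-artifact'

    # serialize() validates metadata against the instance schema and emits the
    # RuntimeArtifact message as JSON; deserialize() reverses the process.
    serialized = example.serialize()
    restored = Artifact.deserialize(serialized)
    print(restored.name, restored.uri, restored.type_name)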