62 lines
2.0 KiB
Python
62 lines
2.0 KiB
Python
import dataclasses
|
|
import datetime
|
|
import unittest
|
|
|
|
from google.protobuf import duration_pb2 as durationpb
|
|
from google.protobuf import json_format
|
|
|
|
from crossplane.function import logging, resource, response
|
|
from crossplane.function.proto.v1beta1 import run_function_pb2 as fnv1beta1
|
|
|
|
|
|
class TestResponse(unittest.TestCase):
|
|
def setUp(self) -> None:
|
|
logging.configure(level=logging.Level.DISABLED)
|
|
|
|
def test_to(self) -> None:
|
|
@dataclasses.dataclass
|
|
class TestCase:
|
|
reason: str
|
|
req: fnv1beta1.RunFunctionRequest
|
|
ttl: datetime.timedelta
|
|
want: fnv1beta1.RunFunctionResponse
|
|
|
|
cases = [
|
|
TestCase(
|
|
reason="Tag, desired, and context should be copied.",
|
|
req=fnv1beta1.RunFunctionRequest(
|
|
meta=fnv1beta1.RequestMeta(tag="hi"),
|
|
desired=fnv1beta1.State(
|
|
resources={
|
|
"ready-composed-resource": fnv1beta1.Resource(),
|
|
}
|
|
),
|
|
context=resource.dict_to_struct({"cool-key": "cool-value"}),
|
|
),
|
|
ttl=datetime.timedelta(minutes=10),
|
|
want=fnv1beta1.RunFunctionResponse(
|
|
meta=fnv1beta1.ResponseMeta(
|
|
tag="hi", ttl=durationpb.Duration(seconds=60 * 10)
|
|
),
|
|
desired=fnv1beta1.State(
|
|
resources={
|
|
"ready-composed-resource": fnv1beta1.Resource(),
|
|
}
|
|
),
|
|
context=resource.dict_to_struct({"cool-key": "cool-value"}),
|
|
),
|
|
),
|
|
]
|
|
|
|
for case in cases:
|
|
got = response.to(case.req, case.ttl)
|
|
self.assertEqual(
|
|
json_format.MessageToJson(case.want),
|
|
json_format.MessageToJson(got),
|
|
"-want, +got",
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|