Added checkpoint support to high level client; added unit tests

This commit is contained in:
Stanislav Dimov 2023-02-03 20:53:34 +00:00
parent 1b6890603f
commit 91d4484684
No known key found for this signature in database
GPG Key ID: 52C3CE2B376F25D2
6 changed files with 301 additions and 8 deletions

View File

@ -3,6 +3,7 @@ from datetime import datetime
from .. import errors
from .. import utils
from ..constants import DEFAULT_DATA_CHUNK_SIZE
from ..models.checkpoints import Checkpoint
from ..types import CancellableStream
from ..types import ContainerConfig
from ..types import EndpointConfig
@ -1189,8 +1190,10 @@ class ContainerApiMixin:
Args:
container (str): The container to start
checkpoint (str): (Experimental) The checkpoint ID from which
to start. Default: None (do not start from a checkpoint)
checkpoint (:py:class:`docker.models.checkpoints.Checkpoint` or
str):
(Experimental) The checkpoint ID from which to start.
Default: None (do not start from a checkpoint)
checkpoint_dir (str): (Experimental) Custom directory in which to
search for checkpoints. Default: None (use default
checkpoint dir)
@ -1211,6 +1214,8 @@ class ContainerApiMixin:
"""
params = {}
if checkpoint:
if isinstance(checkpoint, Checkpoint):
checkpoint = checkpoint.id
params["checkpoint"] = checkpoint
if checkpoint_dir:
params['checkpoint-dir'] = checkpoint_dir

View File

@ -95,6 +95,10 @@ class ImageNotFound(NotFound):
pass
class CheckpointNotFound(NotFound):
pass
class InvalidVersion(DockerException):
pass

View File

@ -0,0 +1,130 @@
from ..errors import CheckpointNotFound
from .resource import Collection
from .resource import Model
class Checkpoint(Model):
""" (Experimental) Local representation of a checkpoint object. Detailed
configuration may be accessed through the :py:attr:`attrs` attribute.
Note that local attributes are cached; users may call :py:meth:`reload`
to query the Docker daemon for the current properties, causing
:py:attr:`attrs` to be refreshed.
"""
id_attribute = 'Name'
@property
def short_id(self):
"""
The ID of the object.
"""
return self.id
def remove(self):
"""
Remove this checkpoint. Similar to the
``docker checkpoint rm`` command.
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
return self.client.api.container_remove_checkpoint(
self.collection.container_id,
checkpoint=self.id,
checkpoint_dir=self.collection.checkpoint_dir,
)
class CheckpointCollection(Collection):
"""(Experimental)."""
model = Checkpoint
def __init__(self, container_id, checkpoint_dir=None, **kwargs):
#: The client pointing at the server that this collection of objects
#: is on.
super().__init__(**kwargs)
self.container_id = container_id
self.checkpoint_dir = checkpoint_dir
def create(self, checkpoint_id, **kwargs):
"""
Create a new container checkpoint. Similar to
``docker checkpoint create``.
Args:
checkpoint_id (str): The id (name) of the checkpoint
leave_running (bool): Determines if the container should be left
running after the checkpoint is created
Returns:
A :py:class:`Checkpoint` object.
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
self.client.api.container_create_checkpoint(
self.container_id,
checkpoint=checkpoint_id,
checkpoint_dir=self.checkpoint_dir,
**kwargs,
)
return Checkpoint(
attrs={"Name": checkpoint_id},
client=self.client,
collection=self
)
def get(self, id):
"""
Get a container checkpoint by id (name).
Args:
id (str): The checkpoint id (name)
Returns:
A :py:class:`Checkpoint` object.
Raises:
:py:class:`docker.errors.NotFound`
If the checkpoint does not exist.
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
checkpoints = self.list()
for checkpoint in checkpoints:
if checkpoint.id == id:
return checkpoint
raise CheckpointNotFound(
f"Checkpoint with id={id} does not exist"
f" in checkpoint_dir={self.checkpoint_dir}"
)
def list(self):
"""
List checkpoints. Similar to the ``docker checkpoint ls`` command.
Returns:
(list of :py:class:`Checkpoint`)
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
resp = self.client.api.container_checkpoints(
self.container_id, checkpoint_dir=self.checkpoint_dir
)
return [self.prepare_model(checkpoint) for checkpoint in resp or []]
def prune(self):
"""
Remove all checkpoints in this collection.
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
for checkpoint in self.list():
checkpoint.remove()

View File

@ -10,6 +10,7 @@ from ..errors import (
)
from ..types import HostConfig
from ..utils import version_gte
from .checkpoints import CheckpointCollection
from .images import Image
from .resource import Collection, Model
@ -261,6 +262,27 @@ class Container(Model):
return self.client.api.get_archive(self.id, path,
chunk_size, encode_stream)
def get_checkpoints(self, checkpoint_dir=None):
"""
Get a collection of all container checkpoints in a given directory.
Similar to the ``docker checkpoint ls`` command.
Args:
checkpoint_dir (str): Custom directory in which to search for
checkpoints. Default: None (use default checkpoint dir)
Returns:
:py:class:`CheckpointCollection`
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
return CheckpointCollection(
container_id=self.id,
checkpoint_dir=checkpoint_dir,
client=self.client,
)
def kill(self, signal=None):
"""
Kill or send a signal to the container.

View File

@ -9,10 +9,12 @@ import pytest
from . import fake_api
from ..helpers import requires_api_version
from .api_test import (
BaseAPIClientTest, url_prefix, fake_request, DEFAULT_TIMEOUT_SECONDS,
fake_inspect_container, url_base
)
from .api_test import BaseAPIClientTest
from .api_test import url_prefix
from .api_test import fake_request
from .api_test import DEFAULT_TIMEOUT_SECONDS
from .api_test import fake_inspect_container
from .api_test import url_base
def fake_inspect_container_tty(self, container):
@ -31,14 +33,14 @@ class StartContainerTest(BaseAPIClientTest):
def test_start_container_from_checkpoint(self):
self.client.start(fake_api.FAKE_CONTAINER_ID,
checkpoint="my-checkpoint",
checkpoint=fake_api.FAKE_CHECKPOINT_ID,
checkpoint_dir="/path/to/checkpoint/dir")
args = fake_request.call_args
assert args[0][1] == (url_prefix + 'containers/' +
fake_api.FAKE_CONTAINER_ID + '/start')
assert 'data' not in args[1]
assert args[1]["params"]["checkpoint"] == "my-checkpoint"
assert args[1]["params"]["checkpoint"] == fake_api.FAKE_CHECKPOINT_ID
assert args[1]["params"]["checkpoint-dir"] == "/path/to/checkpoint/dir"
assert args[1]['timeout'] == DEFAULT_TIMEOUT_SECONDS
@ -137,6 +139,110 @@ class StartContainerTest(BaseAPIClientTest):
assert args[1]['timeout'] == DEFAULT_TIMEOUT_SECONDS
class CheckpointContainerTest(BaseAPIClientTest):
def test_create_container_checkpoint(self):
self.client.container_create_checkpoint(
fake_api.FAKE_CONTAINER_ID,
fake_api.FAKE_CHECKPOINT_ID,
)
args = fake_request.call_args
assert args[0][1] == (url_prefix + 'containers/' +
fake_api.FAKE_CONTAINER_ID + '/checkpoints')
data = json.loads(args[1]["data"])
assert data["CheckpointID"] == fake_api.FAKE_CHECKPOINT_ID
assert data["Exit"] is True
assert "CheckpointDir" not in data
assert args[1]['timeout'] == DEFAULT_TIMEOUT_SECONDS
def test_create_container_checkpoint_custom_opts(self):
self.client.container_create_checkpoint(
fake_api.FAKE_CONTAINER_ID,
fake_api.FAKE_CHECKPOINT_ID,
fake_api.FAKE_CHECKPOINT_DIR,
leave_running=True
)
args = fake_request.call_args
assert args[0][1] == (url_prefix + 'containers/' +
fake_api.FAKE_CONTAINER_ID + '/checkpoints')
data = json.loads(args[1]["data"])
assert data["CheckpointID"] == fake_api.FAKE_CHECKPOINT_ID
assert data["Exit"] is False
assert data["CheckpointDir"] == fake_api.FAKE_CHECKPOINT_DIR
assert args[1]['timeout'] == DEFAULT_TIMEOUT_SECONDS
def test_remove_container_checkpoint(self):
self.client.container_remove_checkpoint(
fake_api.FAKE_CONTAINER_ID,
fake_api.FAKE_CHECKPOINT_ID,
)
args = fake_request.call_args
assert args[0][1] == (url_prefix + 'containers/' +
fake_api.FAKE_CONTAINER_ID +
'/checkpoints/' +
fake_api.FAKE_CHECKPOINT_ID)
assert "data" not in args[1]
assert "dir" not in args[1]["params"]
assert args[1]['timeout'] == DEFAULT_TIMEOUT_SECONDS
def test_remove_container_checkpoint_custom_opts(self):
self.client.container_remove_checkpoint(
fake_api.FAKE_CONTAINER_ID,
fake_api.FAKE_CHECKPOINT_ID,
fake_api.FAKE_CHECKPOINT_DIR,
)
args = fake_request.call_args
assert args[0][1] == (url_prefix + 'containers/' +
fake_api.FAKE_CONTAINER_ID +
'/checkpoints/' +
fake_api.FAKE_CHECKPOINT_ID)
assert "data" not in args[1]
assert args[1]["params"]["dir"] == fake_api.FAKE_CHECKPOINT_DIR
assert args[1]['timeout'] == DEFAULT_TIMEOUT_SECONDS
def test_container_checkpoints(self):
self.client.container_checkpoints(
fake_api.FAKE_CONTAINER_ID,
)
args = fake_request.call_args
assert args[0][1] == (url_prefix + 'containers/' +
fake_api.FAKE_CONTAINER_ID +
'/checkpoints')
assert "data" not in args[1]
assert "dir" not in args[1]["params"]
assert args[1]['timeout'] == DEFAULT_TIMEOUT_SECONDS
def test_container_checkpoints_custom_opts(self):
self.client.container_checkpoints(
fake_api.FAKE_CONTAINER_ID,
fake_api.FAKE_CHECKPOINT_DIR,
)
args = fake_request.call_args
assert args[0][1] == (url_prefix + 'containers/' +
fake_api.FAKE_CONTAINER_ID +
'/checkpoints')
assert "data" not in args[1]
assert args[1]["params"]["dir"] == fake_api.FAKE_CHECKPOINT_DIR
assert args[1]['timeout'] == DEFAULT_TIMEOUT_SECONDS
class CreateContainerTest(BaseAPIClientTest):
def test_create_container(self):
self.client.create_container('busybox', 'true')

View File

@ -19,6 +19,8 @@ FAKE_VOLUME_NAME = 'perfectcherryblossom'
FAKE_NODE_ID = '24ifsmvkjbyhk'
FAKE_SECRET_ID = 'epdyrw4tsi03xy3deu8g8ly6o'
FAKE_SECRET_NAME = 'super_secret'
FAKE_CHECKPOINT_ID = "my-checkpoint"
FAKE_CHECKPOINT_DIR = "/my-dir"
# Each method is prefixed with HTTP method (get, post...)
# for clarity and readability
@ -148,6 +150,24 @@ def post_fake_create_container():
return status_code, response
def post_fake_container_create_checkpoint():
status_code = 201
response = ""
return status_code, response
def get_fake_container_checkpoints():
status_code = 200
response = [{"Name": FAKE_CHECKPOINT_ID}]
return status_code, response
def delete_fake_container_remove_checkpoint():
status_code = 204
response = ""
return status_code, response
def get_fake_inspect_container(tty=False):
status_code = 200
response = {
@ -568,6 +588,12 @@ fake_responses = {
post_fake_update_container,
f'{prefix}/{CURRENT_VERSION}/containers/{FAKE_CONTAINER_ID}/exec':
post_fake_exec_create,
(f'{prefix}/{CURRENT_VERSION}/containers/{FAKE_CONTAINER_ID}/checkpoints', "POST"): # noqa: E501
post_fake_container_create_checkpoint,
(f'{prefix}/{CURRENT_VERSION}/containers/{FAKE_CONTAINER_ID}/checkpoints', "GET"): # noqa: E501
get_fake_container_checkpoints,
f'{prefix}/{CURRENT_VERSION}/containers/{FAKE_CONTAINER_ID}/checkpoints/{FAKE_CHECKPOINT_ID}': # noqa: E501
delete_fake_container_remove_checkpoint,
f'{prefix}/{CURRENT_VERSION}/exec/{FAKE_EXEC_ID}/start':
post_fake_exec_start,
f'{prefix}/{CURRENT_VERSION}/exec/{FAKE_EXEC_ID}/json':