This commit is contained in:
Khushiyant 2025-03-07 17:37:52 +00:00 committed by GitHub
commit d62dceaded
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 430 additions and 5 deletions

View File

@ -31,12 +31,11 @@ class VolumeApiMixin:
params = {
'filters': utils.convert_filters(filters) if filters else None
}
}
url = self._url('/volumes')
return self._result(self._get(url, params=params), True)
def create_volume(self, name=None, driver=None, driver_opts=None,
labels=None):
def create_volume(self, name=None, driver=None, driver_opts=None, labels=None, cluster_volume_spec=None):
"""
Create and register a named volume
@ -83,11 +82,18 @@ class VolumeApiMixin:
if utils.compare_version('1.23', self._version) < 0:
raise errors.InvalidVersion(
'volume labels were introduced in API 1.23'
)
)
if not isinstance(labels, dict):
raise TypeError('labels must be a dictionary')
data["Labels"] = labels
if cluster_volume_spec is not None:
if utils.compare_version("1.42", self._version) < 0:
raise errors.InvalidVersion(
"cluster volume spec was introduced in API 1.45"
)
data["ClusterVolumeSpec"] = cluster_volume_spec
return self._result(self._post_json(url, data=data), True)
def inspect_volume(self, name):

View File

@ -22,3 +22,10 @@ from .services import (
UpdateConfig,
)
from .swarm import SwarmExternalCA, SwarmSpec
from .volumes import (
AccessibilityRequirement,
AccessMode,
CapacityRange,
ClusterVolumeSpec,
Secret,
)

189
docker/types/volumes.py Normal file
View File

@ -0,0 +1,189 @@
from ..types import Mount
from .base import DictType
def access_mode_type_error(param, param_value, expected):
return TypeError(
f"Invalid type for {param} param: expected {expected} "
f"but found {type(param_value)}"
)
class CapacityRange(DictType):
def __init__(self, **kwargs):
limit_bytes = kwargs.get("limit_bytes", kwargs.get("LimitBytes"))
required_bytes = kwargs.get("required_bytes", kwargs.get("RequiredBytes"))
if limit_bytes is not None:
if not isinstance(limit_bytes, int):
raise access_mode_type_error("limit_bytes", limit_bytes, "int")
if required_bytes is not None:
if not isinstance(required_bytes, int):
raise access_mode_type_error("required_bytes", required_bytes, "int")
super().__init__({"RequiredBytes": required_bytes, "LimitBytes": limit_bytes})
@property
def limit_bytes(self):
return self["LimitBytes"]
@property
def required_bytes(self):
return self["RequiredBytes"]
@limit_bytes.setter
def limit_bytes(self, value):
if not isinstance(value, int):
raise access_mode_type_error("limit_bytes", value, "int")
self["LimitBytes"] = value
@required_bytes.setter
def required_bytes(self, value):
if not isinstance(value, int):
raise access_mode_type_error("required_bytes", value, "int")
self["RequiredBytes"]
class Secret(DictType):
def __init__(self, **kwargs):
key = kwargs.get("key", kwargs.get("Key"))
secret = kwargs.get("secret", kwargs.get("Secret"))
if key is not None:
if not isinstance(key, str):
raise access_mode_type_error("key", key, "str")
if secret is not None:
if not isinstance(secret, str):
raise access_mode_type_error("secret", secret, "str")
super().__init__({"Key": key, "Secret": secret})
@property
def key(self):
return self["Key"]
@property
def secret(self):
return self["Secret"]
@key.setter
def key(self, value):
if not isinstance(value, str):
raise access_mode_type_error("key", value, "str")
self["Key"] = value
@secret.setter
def secret(self, value):
if not isinstance(value, str):
raise access_mode_type_error("secret", value, "str")
self["Secret"]
class AccessibilityRequirement(DictType):
def __init__(self, **kwargs):
requisite = kwargs.get("requisite", kwargs.get("Requisite"))
preferred = kwargs.get("preferred", kwargs.get("Preferred"))
if requisite is not None:
if not isinstance(requisite, list):
raise access_mode_type_error("requisite", requisite, "list")
self["Requisite"] = requisite
if preferred is not None:
if not isinstance(preferred, list):
raise access_mode_type_error("preferred", preferred, "list")
self["Preferred"] = preferred
super().__init__({"Requisite": requisite, "Preferred": preferred})
@property
def requisite(self):
return self["Requisite"]
@property
def preferred(self):
return self["Preferred"]
@requisite.setter
def requisite(self, value):
if not isinstance(value, list):
raise access_mode_type_error("requisite", value, "list")
self["Requisite"] = value
@preferred.setter
def preferred(self, value):
if not isinstance(value, list):
raise access_mode_type_error("preferred", value, "list")
self["Preferred"] = value
class AccessMode(dict):
def __init__(
self,
scope=None,
sharing=None,
mount_volume=None,
availabilty=None,
secrets=None,
accessibility_requirements=None,
capacity_range=None,
):
if scope is not None:
if not isinstance(scope, str):
raise access_mode_type_error("scope", scope, "str")
self["Scope"] = scope
if sharing is not None:
if not isinstance(sharing, str):
raise access_mode_type_error("sharing", sharing, "str")
self["Sharing"] = sharing
if mount_volume is not None:
if not isinstance(mount_volume, str):
raise access_mode_type_error("mount_volume", mount_volume, "str")
self["MountVolume"] = Mount.parse_mount_string(mount_volume)
if availabilty is not None:
if not isinstance(availabilty, str):
raise access_mode_type_error("availabilty", availabilty, "str")
self["Availabilty"] = availabilty
if secrets is not None:
if not isinstance(secrets, list):
raise access_mode_type_error("secrets", secrets, "list")
self["Secrets"] = []
for secret in secrets:
if not isinstance(secret, Secret):
secret = Secret(**secret)
self["Secrets"].append(secret)
if capacity_range is not None:
if not isinstance(capacity_range, CapacityRange):
capacity_range = CapacityRange(**capacity_range)
self["CapacityRange"] = capacity_range
if accessibility_requirements is not None:
if not isinstance(accessibility_requirements, AccessibilityRequirement):
accessibility_requirements = AccessibilityRequirement(
**accessibility_requirements
)
self["AccessibilityRequirements"] = accessibility_requirements
class ClusterVolumeSpec(dict):
def __init__(self, group=None, access_mode=None):
if group:
self["Group"] = group
if access_mode:
if not isinstance(access_mode, AccessMode):
raise TypeError("access_mode must be a AccessMode")
self["AccessMode"] = access_mode
@property
def group(self):
return self["Group"]
@property
def access_mode(self):
return self["AccessMode"]

View File

@ -2,11 +2,27 @@ import pytest
import docker
from ..helpers import requires_api_version
from ..helpers import force_leave_swarm, requires_api_version
from .base import BaseAPIIntegrationTest
class TestVolumes(BaseAPIIntegrationTest):
def setUp(self):
super().setUp()
force_leave_swarm(self.client)
self._unlock_key = None
def tearDown(self):
try:
if self._unlock_key:
self.client.unlock_swarm(self._unlock_key)
except docker.errors.APIError:
pass
force_leave_swarm(self.client)
super().tearDown()
def test_create_volume(self):
name = 'perfectcherryblossom'
self.tmp_volumes.append(name)
@ -73,3 +89,31 @@ class TestVolumes(BaseAPIIntegrationTest):
name = 'shootthebullet'
with pytest.raises(docker.errors.NotFound):
self.client.remove_volume(name)
def test_create_volume_with_cluster_volume(self):
name = "perfectcherryblossom"
self.init_swarm()
spec = docker.types.ClusterVolumeSpec(
group="group_test",
access_mode=docker.types.AccessMode(
scope="multi",
sharing="readonly",
mount_volume="mount_volume",
availabilty="active",
secrets=[],
accessibility_requirements={},
capacity_range={},
),
)
result = self.client.create_volume(
name, driver="local", cluster_volume_spec=spec
)
assert "Name" in result
assert result["Name"] == name
assert "Driver" in result
assert result["Driver"] == "local"
assert "ClusterVolume" in result
assert result["ClusterVolume"]["Spec"]["Group"] == "group_test"
assert "AccessMode" in result["ClusterVolume"]["Spec"]

View File

@ -6,6 +6,10 @@ import pytest
from docker.constants import DEFAULT_DOCKER_API_VERSION
from docker.errors import InvalidArgument, InvalidVersion
from docker.types import (
AccessibilityRequirement,
AccessMode,
CapacityRange,
ClusterVolumeSpec,
ContainerSpec,
EndpointConfig,
HostConfig,
@ -13,6 +17,7 @@ from docker.types import (
IPAMPool,
LogConfig,
Mount,
Secret,
ServiceMode,
Ulimit,
)
@ -491,3 +496,177 @@ class ServicePortsTest(unittest.TestCase):
} in converted_ports
assert len(converted_ports) == 3
class ClusterVolumeSpecTest(unittest.TestCase):
def test_cluster_volume_spec_with_fully_populated_access_mode(self):
secrets = [
Secret(key="api_key1", secret="secret1"),
Secret(key="api_key2", secret="secret2"),
]
capacity_range = CapacityRange(limit_bytes=10240, required_bytes=1024)
accessibility_requirements = AccessibilityRequirement(
requisite=["zone1", "zone2"], preferred=["zone1"]
)
access_mode = AccessMode(
scope="global",
sharing="readonly",
mount_volume="nfs",
availabilty="high",
secrets=secrets,
capacity_range=capacity_range,
accessibility_requirements=accessibility_requirements,
)
cluster_spec = ClusterVolumeSpec(group="production", access_mode=access_mode)
self.assertEqual(cluster_spec["Group"], "production")
self.assertIs(cluster_spec["AccessMode"], access_mode)
self.assertEqual(cluster_spec["AccessMode"]["Scope"], "global")
self.assertEqual(cluster_spec["AccessMode"]["Sharing"], "readonly")
self.assertEqual(cluster_spec["AccessMode"]["MountVolume"]['Target'], "nfs")
self.assertEqual(cluster_spec["AccessMode"]["Availabilty"], "high")
self.assertEqual(len(cluster_spec["AccessMode"]["Secrets"]), 2)
self.assertEqual(cluster_spec["AccessMode"]["Secrets"][0]["Key"], "api_key1")
self.assertEqual(cluster_spec["AccessMode"]["Secrets"][0]["Secret"], "secret1")
self.assertEqual(cluster_spec["AccessMode"]["Secrets"][1]["Key"], "api_key2")
self.assertEqual(cluster_spec["AccessMode"]["Secrets"][1]["Secret"], "secret2")
self.assertEqual(
cluster_spec["AccessMode"]["CapacityRange"]["LimitBytes"], 10240
)
self.assertEqual(
cluster_spec["AccessMode"]["CapacityRange"]["RequiredBytes"], 1024
)
self.assertEqual(
cluster_spec["AccessMode"]["AccessibilityRequirements"]["Requisite"],
["zone1", "zone2"],
)
self.assertEqual(
cluster_spec["AccessMode"]["AccessibilityRequirements"]["Preferred"],
["zone1"],
)
def test_cluster_volume_spec_with_dict_based_access_mode(self):
access_mode = AccessMode(
scope="team",
sharing="readwrite",
secrets=[
{"key": "db_user", "secret": "password123"},
{"key": "api_token", "secret": "abc123xyz"},
],
capacity_range={"limit_bytes": 20480, "required_bytes": 2048},
accessibility_requirements={
"requisite": ["datacenter1"],
"preferred": ["rack1", "rack2"],
},
)
cluster_spec = ClusterVolumeSpec(group="development", access_mode=access_mode)
self.assertEqual(cluster_spec["Group"], "development")
self.assertIsInstance(cluster_spec["AccessMode"]["Secrets"][0], Secret)
self.assertIsInstance(cluster_spec["AccessMode"]["Secrets"][1], Secret)
self.assertIsInstance(
cluster_spec["AccessMode"]["CapacityRange"], CapacityRange
)
self.assertIsInstance(
cluster_spec["AccessMode"]["AccessibilityRequirements"],
AccessibilityRequirement,
)
self.assertEqual(cluster_spec["AccessMode"]["Secrets"][0].key, "db_user")
self.assertEqual(
cluster_spec["AccessMode"]["Secrets"][0].secret, "password123"
)
self.assertEqual(
cluster_spec["AccessMode"]["CapacityRange"].limit_bytes, 20480
)
self.assertEqual(
cluster_spec["AccessMode"]["AccessibilityRequirements"].requisite,
["datacenter1"],
)
def test_cluster_volume_spec_with_minimal_access_mode(self):
access_mode = AccessMode(scope="local", sharing="exclusive")
cluster_spec = ClusterVolumeSpec(group="testing", access_mode=access_mode)
self.assertEqual(cluster_spec["Group"], "testing")
self.assertEqual(cluster_spec["AccessMode"]["Scope"], "local")
self.assertEqual(cluster_spec["AccessMode"]["Sharing"], "exclusive")
self.assertNotIn("Secrets", cluster_spec["AccessMode"])
self.assertNotIn("CapacityRange", cluster_spec["AccessMode"])
self.assertNotIn("AccessibilityRequirements", cluster_spec["AccessMode"])
def test_cluster_volume_spec_with_mixed_nested_objects(self):
access_mode = AccessMode(
scope="container",
secrets=[Secret(key="api_key", secret="secret_value")],
capacity_range=CapacityRange(limit_bytes=5120),
)
cluster_spec = ClusterVolumeSpec(group="staging", access_mode=access_mode)
self.assertEqual(cluster_spec["Group"], "staging")
self.assertEqual(cluster_spec["AccessMode"]["Scope"], "container")
self.assertIn("Secrets", cluster_spec["AccessMode"])
self.assertEqual(len(cluster_spec["AccessMode"]["Secrets"]), 1)
self.assertEqual(cluster_spec["AccessMode"]["Secrets"][0].key, "api_key")
self.assertIn("CapacityRange", cluster_spec["AccessMode"])
self.assertEqual(cluster_spec["AccessMode"]["CapacityRange"].limit_bytes, 5120)
self.assertIsNone(cluster_spec["AccessMode"]["CapacityRange"].required_bytes)
self.assertNotIn("AccessibilityRequirements", cluster_spec["AccessMode"])
def test_cluster_volume_spec_with_pascal_case_input(self):
access_mode = AccessMode(
scope="namespace",
capacity_range={"LimitBytes": 8192, "RequiredBytes": 4096},
accessibility_requirements={"Requisite": ["zone3"], "Preferred": ["zone3"]},
)
cluster_spec = ClusterVolumeSpec(group="utility", access_mode=access_mode)
self.assertEqual(cluster_spec["Group"], "utility")
self.assertEqual(cluster_spec["AccessMode"]["CapacityRange"].limit_bytes, 8192)
self.assertEqual(
cluster_spec["AccessMode"]["CapacityRange"].required_bytes, 4096
)
self.assertEqual(
cluster_spec["AccessMode"]["AccessibilityRequirements"].requisite,
["zone3"],
)
self.assertEqual(
cluster_spec["AccessMode"]["AccessibilityRequirements"].preferred,
["zone3"],
)
def test_cluster_volume_spec_only_group(self):
cluster_spec = ClusterVolumeSpec(group="backup")
self.assertEqual(cluster_spec["Group"], "backup")
self.assertNotIn("access_mode", cluster_spec)
def test_cluster_volume_spec_only_access_mode(self):
access_mode = AccessMode(scope="volume")
cluster_spec = ClusterVolumeSpec(access_mode=access_mode)
self.assertNotIn("Group", cluster_spec)
self.assertEqual(cluster_spec["AccessMode"]["Scope"], "volume")
def test_access_mode_direct_creation_in_cluster_volume_spec(self):
with self.assertRaises(TypeError):
ClusterVolumeSpec(
group="direct", access_mode={"Scope": "direct", "Sharing": "shared"}
)