mirror of https://github.com/docker/docker-py.git
Implement context management, lifecycle and unittests.
Signed-off-by: Anca Iordache <anca.iordache@docker.com>
This commit is contained in:
parent
f2e09ae632
commit
64fdb32ae8
|
@ -15,6 +15,6 @@ matrix:
|
|||
- env: TOXENV=flake8
|
||||
|
||||
install:
|
||||
- pip install tox
|
||||
- pip install tox==2.9.1
|
||||
script:
|
||||
- tox
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
version: '{branch}-{build}'
|
||||
|
||||
install:
|
||||
- "SET PATH=C:\\Python27-x64;C:\\Python27-x64\\Scripts;%PATH%"
|
||||
- "SET PATH=C:\\Python37-x64;C:\\Python37-x64\\Scripts;%PATH%"
|
||||
- "python --version"
|
||||
- "python -m pip install --upgrade pip"
|
||||
- "pip install tox==2.9.1"
|
||||
|
||||
# Build the binary after tests
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
# flake8: noqa
|
||||
from .api import APIClient
|
||||
from .client import DockerClient, from_env
|
||||
from .context import Context
|
||||
from .context import ContextAPI
|
||||
from .tls import TLSConfig
|
||||
from .version import version, version_info
|
||||
|
||||
__version__ = version
|
||||
|
|
|
@ -9,6 +9,18 @@ CONTAINER_LIMITS_KEYS = [
|
|||
'memory', 'memswap', 'cpushares', 'cpusetcpus'
|
||||
]
|
||||
|
||||
DEFAULT_HTTP_HOST = "127.0.0.1"
|
||||
DEFAULT_UNIX_SOCKET = "http+unix:///var/run/docker.sock"
|
||||
DEFAULT_NPIPE = 'npipe:////./pipe/docker_engine'
|
||||
|
||||
BYTE_UNITS = {
|
||||
'b': 1,
|
||||
'k': 1024,
|
||||
'm': 1024 * 1024,
|
||||
'g': 1024 * 1024 * 1024
|
||||
}
|
||||
|
||||
|
||||
INSECURE_REGISTRY_DEPRECATION_WARNING = \
|
||||
'The `insecure_registry` argument to {} ' \
|
||||
'is deprecated and non-functional. Please remove it.'
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
# flake8: noqa
|
||||
from .context import Context
|
||||
from .api import ContextAPI
|
|
@ -0,0 +1,205 @@
|
|||
import json
|
||||
import os
|
||||
|
||||
from docker import errors
|
||||
from docker.context.config import get_meta_dir
|
||||
from docker.context.config import METAFILE
|
||||
from docker.context.config import get_current_context_name
|
||||
from docker.context.config import write_context_name_to_docker_config
|
||||
from docker.context import Context
|
||||
|
||||
|
||||
class ContextAPI(object):
|
||||
"""Context API.
|
||||
Contains methods for context management:
|
||||
create, list, remove, get, inspect.
|
||||
"""
|
||||
DEFAULT_CONTEXT = Context("default")
|
||||
|
||||
@classmethod
|
||||
def create_context(
|
||||
cls, name, orchestrator="swarm", host=None, tls_cfg=None,
|
||||
default_namespace=None, skip_tls_verify=False):
|
||||
"""Creates a new context.
|
||||
Returns:
|
||||
(Context): a Context object.
|
||||
Raises:
|
||||
:py:class:`docker.errors.MissingContextParameter`
|
||||
If a context name is not provided.
|
||||
:py:class:`docker.errors.ContextAlreadyExists`
|
||||
If a context with the name already exists.
|
||||
:py:class:`docker.errors.ContextException`
|
||||
If name is default.
|
||||
|
||||
Example:
|
||||
|
||||
>>> from docker.context import ContextAPI
|
||||
>>> ctx = ContextAPI.create_context(name='test')
|
||||
>>> print(ctx.Metadata)
|
||||
{
|
||||
"Name": "test",
|
||||
"Metadata": {
|
||||
"StackOrchestrator": "swarm"
|
||||
},
|
||||
"Endpoints": {
|
||||
"docker": {
|
||||
"Host": "unix:///var/run/docker.sock",
|
||||
"SkipTLSVerify": false
|
||||
}
|
||||
}
|
||||
}
|
||||
"""
|
||||
if not name:
|
||||
raise errors.MissingContextParameter("name")
|
||||
if name == "default":
|
||||
raise errors.ContextException(
|
||||
'"default" is a reserved context name')
|
||||
ctx = Context.load_context(name)
|
||||
if ctx:
|
||||
raise errors.ContextAlreadyExists(name)
|
||||
endpoint = "docker" if orchestrator == "swarm" else orchestrator
|
||||
ctx = Context(name, orchestrator)
|
||||
ctx.set_endpoint(
|
||||
endpoint, host, tls_cfg,
|
||||
skip_tls_verify=skip_tls_verify,
|
||||
def_namespace=default_namespace)
|
||||
ctx.save()
|
||||
return ctx
|
||||
|
||||
@classmethod
|
||||
def get_context(cls, name=None):
|
||||
"""Retrieves a context object.
|
||||
Args:
|
||||
name (str): The name of the context
|
||||
|
||||
Example:
|
||||
|
||||
>>> from docker.context import ContextAPI
|
||||
>>> ctx = ContextAPI.get_context(name='test')
|
||||
>>> print(ctx.Metadata)
|
||||
{
|
||||
"Name": "test",
|
||||
"Metadata": {
|
||||
"StackOrchestrator": "swarm"
|
||||
},
|
||||
"Endpoints": {
|
||||
"docker": {
|
||||
"Host": "unix:///var/run/docker.sock",
|
||||
"SkipTLSVerify": false
|
||||
}
|
||||
}
|
||||
}
|
||||
"""
|
||||
if not name:
|
||||
name = get_current_context_name()
|
||||
if name == "default":
|
||||
return cls.DEFAULT_CONTEXT
|
||||
return Context.load_context(name)
|
||||
|
||||
@classmethod
|
||||
def contexts(cls):
|
||||
"""Context list.
|
||||
Returns:
|
||||
(Context): List of context objects.
|
||||
Raises:
|
||||
:py:class:`docker.errors.APIError`
|
||||
If the server returns an error.
|
||||
"""
|
||||
names = []
|
||||
for dirname, dirnames, fnames in os.walk(get_meta_dir()):
|
||||
for filename in fnames + dirnames:
|
||||
if filename == METAFILE:
|
||||
try:
|
||||
data = json.load(
|
||||
open(os.path.join(dirname, filename), "r"))
|
||||
names.append(data["Name"])
|
||||
except Exception as e:
|
||||
raise errors.ContextException(
|
||||
"Failed to load metafile {}: {}".format(
|
||||
filename, e))
|
||||
|
||||
contexts = [cls.DEFAULT_CONTEXT]
|
||||
for name in names:
|
||||
contexts.append(Context.load_context(name))
|
||||
return contexts
|
||||
|
||||
@classmethod
|
||||
def get_current_context(cls):
|
||||
"""Get current context.
|
||||
Returns:
|
||||
(Context): current context object.
|
||||
"""
|
||||
return cls.get_context()
|
||||
|
||||
@classmethod
|
||||
def set_current_context(cls, name="default"):
|
||||
ctx = cls.get_context(name)
|
||||
if not ctx:
|
||||
raise errors.ContextNotFound(name)
|
||||
|
||||
err = write_context_name_to_docker_config(name)
|
||||
if err:
|
||||
raise errors.ContextException(
|
||||
'Failed to set current context: {}'.format(err))
|
||||
|
||||
@classmethod
|
||||
def remove_context(cls, name):
|
||||
"""Remove a context. Similar to the ``docker context rm`` command.
|
||||
|
||||
Args:
|
||||
name (str): The name of the context
|
||||
|
||||
Raises:
|
||||
:py:class:`docker.errors.MissingContextParameter`
|
||||
If a context name is not provided.
|
||||
:py:class:`docker.errors.ContextNotFound`
|
||||
If a context with the name does not exist.
|
||||
:py:class:`docker.errors.ContextException`
|
||||
If name is default.
|
||||
|
||||
Example:
|
||||
|
||||
>>> from docker.context import ContextAPI
|
||||
>>> ContextAPI.remove_context(name='test')
|
||||
>>>
|
||||
"""
|
||||
if not name:
|
||||
raise errors.MissingContextParameter("name")
|
||||
if name == "default":
|
||||
raise errors.ContextException(
|
||||
'context "default" cannot be removed')
|
||||
ctx = Context.load_context(name)
|
||||
if not ctx:
|
||||
raise errors.ContextNotFound(name)
|
||||
if name == get_current_context_name():
|
||||
write_context_name_to_docker_config(None)
|
||||
ctx.remove()
|
||||
|
||||
@classmethod
|
||||
def inspect_context(cls, name="default"):
|
||||
"""Remove a context. Similar to the ``docker context inspect`` command.
|
||||
|
||||
Args:
|
||||
name (str): The name of the context
|
||||
|
||||
Raises:
|
||||
:py:class:`docker.errors.MissingContextParameter`
|
||||
If a context name is not provided.
|
||||
:py:class:`docker.errors.ContextNotFound`
|
||||
If a context with the name does not exist.
|
||||
|
||||
Example:
|
||||
|
||||
>>> from docker.context import ContextAPI
|
||||
>>> ContextAPI.remove_context(name='test')
|
||||
>>>
|
||||
"""
|
||||
if not name:
|
||||
raise errors.MissingContextParameter("name")
|
||||
if name == "default":
|
||||
return cls.DEFAULT_CONTEXT()
|
||||
ctx = Context.load_context(name)
|
||||
if not ctx:
|
||||
raise errors.ContextNotFound(name)
|
||||
|
||||
return ctx()
|
|
@ -0,0 +1,81 @@
|
|||
import os
|
||||
import json
|
||||
import hashlib
|
||||
|
||||
from docker import utils
|
||||
from docker.constants import IS_WINDOWS_PLATFORM
|
||||
from docker.constants import DEFAULT_UNIX_SOCKET
|
||||
from docker.utils.config import find_config_file
|
||||
|
||||
METAFILE = "meta.json"
|
||||
|
||||
|
||||
def get_current_context_name():
|
||||
name = "default"
|
||||
docker_cfg_path = find_config_file()
|
||||
if docker_cfg_path:
|
||||
try:
|
||||
with open(docker_cfg_path, "r") as f:
|
||||
name = json.load(f).get("currentContext", "default")
|
||||
except Exception:
|
||||
return "default"
|
||||
return name
|
||||
|
||||
|
||||
def write_context_name_to_docker_config(name=None):
|
||||
if name == 'default':
|
||||
name = None
|
||||
docker_cfg_path = find_config_file()
|
||||
config = {}
|
||||
if docker_cfg_path:
|
||||
try:
|
||||
with open(docker_cfg_path, "r") as f:
|
||||
config = json.load(f)
|
||||
except Exception as e:
|
||||
return e
|
||||
current_context = config.get("currentContext", None)
|
||||
if current_context and not name:
|
||||
del config["currentContext"]
|
||||
elif name:
|
||||
config["currentContext"] = name
|
||||
else:
|
||||
return
|
||||
try:
|
||||
with open(docker_cfg_path, "w") as f:
|
||||
json.dump(config, f, indent=4)
|
||||
except Exception as e:
|
||||
return e
|
||||
|
||||
|
||||
def get_context_id(name):
|
||||
return hashlib.sha256(name.encode('utf-8')).hexdigest()
|
||||
|
||||
|
||||
def get_context_dir():
|
||||
return os.path.join(os.path.dirname(find_config_file() or ""), "contexts")
|
||||
|
||||
|
||||
def get_meta_dir(name=None):
|
||||
meta_dir = os.path.join(get_context_dir(), "meta")
|
||||
if name:
|
||||
return os.path.join(meta_dir, get_context_id(name))
|
||||
return meta_dir
|
||||
|
||||
|
||||
def get_meta_file(name):
|
||||
return os.path.join(get_meta_dir(name), METAFILE)
|
||||
|
||||
|
||||
def get_tls_dir(name=None, endpoint=""):
|
||||
context_dir = get_context_dir()
|
||||
if name:
|
||||
return os.path.join(context_dir, "tls", get_context_id(name), endpoint)
|
||||
return os.path.join(context_dir, "tls")
|
||||
|
||||
|
||||
def get_context_host(path=None):
|
||||
host = utils.parse_host(path, IS_WINDOWS_PLATFORM)
|
||||
if host == DEFAULT_UNIX_SOCKET:
|
||||
# remove http+ from default docker socket url
|
||||
return host.strip("http+")
|
||||
return host
|
|
@ -0,0 +1,208 @@
|
|||
import os
|
||||
import json
|
||||
from shutil import copyfile, rmtree
|
||||
from docker.tls import TLSConfig
|
||||
from docker.errors import ContextException
|
||||
from docker.context.config import get_meta_dir
|
||||
from docker.context.config import get_meta_file
|
||||
from docker.context.config import get_tls_dir
|
||||
from docker.context.config import get_context_host
|
||||
|
||||
|
||||
class Context:
|
||||
"""A context."""
|
||||
def __init__(self, name, orchestrator="swarm", host=None, endpoints=None):
|
||||
if not name:
|
||||
raise Exception("Name not provided")
|
||||
self.name = name
|
||||
self.orchestrator = orchestrator
|
||||
if not endpoints:
|
||||
default_endpoint = "docker" if (
|
||||
orchestrator == "swarm"
|
||||
) else orchestrator
|
||||
self.endpoints = {
|
||||
default_endpoint: {
|
||||
"Host": get_context_host(host),
|
||||
"SkipTLSVerify": False
|
||||
}
|
||||
}
|
||||
else:
|
||||
for k, v in endpoints.items():
|
||||
ekeys = v.keys()
|
||||
for param in ["Host", "SkipTLSVerify"]:
|
||||
if param not in ekeys:
|
||||
raise ContextException(
|
||||
"Missing parameter {} from endpoint {}".format(
|
||||
param, k))
|
||||
self.endpoints = endpoints
|
||||
|
||||
self.tls_cfg = {}
|
||||
self.meta_path = "IN MEMORY"
|
||||
self.tls_path = "IN MEMORY"
|
||||
|
||||
def set_endpoint(
|
||||
self, name="docker", host=None, tls_cfg=None,
|
||||
skip_tls_verify=False, def_namespace=None):
|
||||
self.endpoints[name] = {
|
||||
"Host": get_context_host(host),
|
||||
"SkipTLSVerify": skip_tls_verify
|
||||
}
|
||||
if def_namespace:
|
||||
self.endpoints[name]["DefaultNamespace"] = def_namespace
|
||||
|
||||
if tls_cfg:
|
||||
self.tls_cfg[name] = tls_cfg
|
||||
|
||||
def inspect(self):
|
||||
return self.__call__()
|
||||
|
||||
@classmethod
|
||||
def load_context(cls, name):
|
||||
name, orchestrator, endpoints = Context._load_meta(name)
|
||||
if name:
|
||||
instance = cls(name, orchestrator, endpoints=endpoints)
|
||||
instance._load_certs()
|
||||
instance.meta_path = get_meta_dir(name)
|
||||
return instance
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def _load_meta(cls, name):
|
||||
metadata = {}
|
||||
meta_file = get_meta_file(name)
|
||||
if os.path.isfile(meta_file):
|
||||
with open(meta_file) as f:
|
||||
try:
|
||||
with open(meta_file) as f:
|
||||
metadata = json.load(f)
|
||||
for k, v in metadata["Endpoints"].items():
|
||||
metadata["Endpoints"][k]["SkipTLSVerify"] = bool(
|
||||
v["SkipTLSVerify"])
|
||||
except (IOError, KeyError, ValueError) as e:
|
||||
# unknown format
|
||||
raise Exception("""Detected corrupted meta file for
|
||||
context {} : {}""".format(name, e))
|
||||
|
||||
return (
|
||||
metadata["Name"], metadata["Metadata"]["StackOrchestrator"],
|
||||
metadata["Endpoints"])
|
||||
return None, None, None
|
||||
|
||||
def _load_certs(self):
|
||||
certs = {}
|
||||
tls_dir = get_tls_dir(self.name)
|
||||
for endpoint in self.endpoints.keys():
|
||||
if not os.path.isdir(os.path.join(tls_dir, endpoint)):
|
||||
continue
|
||||
ca_cert = None
|
||||
cert = None
|
||||
key = None
|
||||
for filename in os.listdir(os.path.join(tls_dir, endpoint)):
|
||||
if filename.startswith("ca"):
|
||||
ca_cert = os.path.join(tls_dir, endpoint, filename)
|
||||
elif filename.startswith("cert"):
|
||||
cert = os.path.join(tls_dir, endpoint, filename)
|
||||
elif filename.startswith("key"):
|
||||
key = os.path.join(tls_dir, endpoint, filename)
|
||||
if all([ca_cert, cert, key]):
|
||||
certs[endpoint] = TLSConfig(
|
||||
client_cert=(cert, key), ca_cert=ca_cert)
|
||||
self.tls_cfg = certs
|
||||
self.tls_path = tls_dir
|
||||
|
||||
def save(self):
|
||||
meta_dir = get_meta_dir(self.name)
|
||||
if not os.path.isdir(meta_dir):
|
||||
os.makedirs(meta_dir)
|
||||
with open(get_meta_file(self.name), "w") as f:
|
||||
f.write(json.dumps(self.Metadata))
|
||||
|
||||
tls_dir = get_tls_dir(self.name)
|
||||
for endpoint, tls in self.tls_cfg.items():
|
||||
if not os.path.isdir(os.path.join(tls_dir, endpoint)):
|
||||
os.makedirs(os.path.join(tls_dir, endpoint))
|
||||
|
||||
ca_file = tls.ca_cert
|
||||
if ca_file:
|
||||
copyfile(ca_file, os.path.join(
|
||||
tls_dir, endpoint, os.path.basename(ca_file)))
|
||||
|
||||
if tls.cert:
|
||||
cert_file, key_file = tls.cert
|
||||
copyfile(cert_file, os.path.join(
|
||||
tls_dir, endpoint, os.path.basename(cert_file)))
|
||||
copyfile(key_file, os.path.join(
|
||||
tls_dir, endpoint, os.path.basename(key_file)))
|
||||
|
||||
self.meta_path = get_meta_dir(self.name)
|
||||
self.tls_path = get_tls_dir(self.name)
|
||||
|
||||
def remove(self):
|
||||
if os.path.isdir(self.meta_path):
|
||||
rmtree(self.meta_path)
|
||||
if os.path.isdir(self.tls_path):
|
||||
rmtree(self.tls_path)
|
||||
|
||||
def __repr__(self):
|
||||
return "<%s: '%s'>" % (self.__class__.__name__, self.name)
|
||||
|
||||
def __str__(self):
|
||||
return json.dumps(self.__call__(), indent=2)
|
||||
|
||||
def __call__(self):
|
||||
result = self.Metadata
|
||||
result.update(self.TLSMaterial)
|
||||
result.update(self.Storage)
|
||||
return result
|
||||
|
||||
@property
|
||||
def Name(self):
|
||||
return self.name
|
||||
|
||||
@property
|
||||
def Host(self):
|
||||
if self.orchestrator == "swarm":
|
||||
return self.endpoints["docker"]["Host"]
|
||||
return self.endpoints[self.orchestrator]["Host"]
|
||||
|
||||
@property
|
||||
def Orchestrator(self):
|
||||
return self.orchestrator
|
||||
|
||||
@property
|
||||
def Metadata(self):
|
||||
return {
|
||||
"Name": self.name,
|
||||
"Metadata": {
|
||||
"StackOrchestrator": self.orchestrator
|
||||
},
|
||||
"Endpoints": self.endpoints
|
||||
}
|
||||
|
||||
@property
|
||||
def TLSConfig(self):
|
||||
key = self.orchestrator
|
||||
if key == "swarm":
|
||||
key = "docker"
|
||||
if key in self.tls_cfg.keys():
|
||||
return self.tls_cfg[key]
|
||||
return None
|
||||
|
||||
@property
|
||||
def TLSMaterial(self):
|
||||
certs = {}
|
||||
for endpoint, tls in self.tls_cfg.items():
|
||||
cert, key = tls.cert
|
||||
certs[endpoint] = list(
|
||||
map(os.path.basename, [tls.ca_cert, cert, key]))
|
||||
return {
|
||||
"TLSMaterial": certs
|
||||
}
|
||||
|
||||
@property
|
||||
def Storage(self):
|
||||
return {
|
||||
"Storage": {
|
||||
"MetadataPath": self.meta_path,
|
||||
"TLSPath": self.tls_path
|
||||
}}
|
|
@ -163,3 +163,35 @@ def create_unexpected_kwargs_error(name, kwargs):
|
|||
text.append("got unexpected keyword arguments ")
|
||||
text.append(', '.join(quoted_kwargs))
|
||||
return TypeError(''.join(text))
|
||||
|
||||
|
||||
class MissingContextParameter(DockerException):
|
||||
def __init__(self, param):
|
||||
self.param = param
|
||||
|
||||
def __str__(self):
|
||||
return ("missing parameter: {}".format(self.param))
|
||||
|
||||
|
||||
class ContextAlreadyExists(DockerException):
|
||||
def __init__(self, name):
|
||||
self.name = name
|
||||
|
||||
def __str__(self):
|
||||
return ("context {} already exists".format(self.name))
|
||||
|
||||
|
||||
class ContextException(DockerException):
|
||||
def __init__(self, msg):
|
||||
self.msg = msg
|
||||
|
||||
def __str__(self):
|
||||
return (self.msg)
|
||||
|
||||
|
||||
class ContextNotFound(DockerException):
|
||||
def __init__(self, name):
|
||||
self.name = name
|
||||
|
||||
def __str__(self):
|
||||
return ("context '{}' not found".format(self.name))
|
||||
|
|
|
@ -11,6 +11,10 @@ import six
|
|||
|
||||
from .. import errors
|
||||
from .. import tls
|
||||
from ..constants import DEFAULT_HTTP_HOST
|
||||
from ..constants import DEFAULT_UNIX_SOCKET
|
||||
from ..constants import DEFAULT_NPIPE
|
||||
from ..constants import BYTE_UNITS
|
||||
|
||||
if six.PY2:
|
||||
from urllib import splitnport
|
||||
|
@ -18,17 +22,6 @@ if six.PY2:
|
|||
else:
|
||||
from urllib.parse import splitnport, urlparse
|
||||
|
||||
DEFAULT_HTTP_HOST = "127.0.0.1"
|
||||
DEFAULT_UNIX_SOCKET = "http+unix:///var/run/docker.sock"
|
||||
DEFAULT_NPIPE = 'npipe:////./pipe/docker_engine'
|
||||
|
||||
BYTE_UNITS = {
|
||||
'b': 1,
|
||||
'k': 1024,
|
||||
'm': 1024 * 1024,
|
||||
'g': 1024 * 1024 * 1024
|
||||
}
|
||||
|
||||
|
||||
def create_ipam_pool(*args, **kwargs):
|
||||
raise errors.DeprecatedMethod(
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
setuptools==44.0.0 # last version with python 2.7 support
|
||||
coverage==4.5.2
|
||||
flake8==3.6.0
|
||||
mock==1.0.1
|
||||
|
|
|
@ -0,0 +1,52 @@
|
|||
import os
|
||||
import tempfile
|
||||
import pytest
|
||||
from docker import errors
|
||||
from docker.context import ContextAPI
|
||||
from docker.tls import TLSConfig
|
||||
from .base import BaseAPIIntegrationTest
|
||||
|
||||
|
||||
class ContextLifecycleTest(BaseAPIIntegrationTest):
|
||||
def test_lifecycle(self):
|
||||
assert ContextAPI.get_context().Name == "default"
|
||||
assert not ContextAPI.get_context("test")
|
||||
assert ContextAPI.get_current_context().Name == "default"
|
||||
|
||||
dirpath = tempfile.mkdtemp()
|
||||
ca = tempfile.NamedTemporaryFile(
|
||||
prefix=os.path.join(dirpath, "ca.pem"), mode="r")
|
||||
cert = tempfile.NamedTemporaryFile(
|
||||
prefix=os.path.join(dirpath, "cert.pem"), mode="r")
|
||||
key = tempfile.NamedTemporaryFile(
|
||||
prefix=os.path.join(dirpath, "key.pem"), mode="r")
|
||||
|
||||
# create context 'test
|
||||
docker_tls = TLSConfig(
|
||||
client_cert=(cert.name, key.name),
|
||||
ca_cert=ca.name)
|
||||
ContextAPI.create_context(
|
||||
"test", tls_cfg=docker_tls)
|
||||
|
||||
# check for a context 'test' in the context store
|
||||
assert any([ctx.Name == "test" for ctx in ContextAPI.contexts()])
|
||||
# retrieve a context object for 'test'
|
||||
assert ContextAPI.get_context("test")
|
||||
# remove context
|
||||
ContextAPI.remove_context("test")
|
||||
with pytest.raises(errors.ContextNotFound):
|
||||
ContextAPI.inspect_context("test")
|
||||
# check there is no 'test' context in store
|
||||
assert not ContextAPI.get_context("test")
|
||||
|
||||
ca.close()
|
||||
key.close()
|
||||
cert.close()
|
||||
|
||||
def test_context_remove(self):
|
||||
ContextAPI.create_context("test")
|
||||
assert ContextAPI.inspect_context("test")["Name"] == "test"
|
||||
|
||||
ContextAPI.remove_context("test")
|
||||
with pytest.raises(errors.ContextNotFound):
|
||||
ContextAPI.inspect_context("test")
|
|
@ -0,0 +1,45 @@
|
|||
import unittest
|
||||
import docker
|
||||
import pytest
|
||||
from docker.constants import DEFAULT_UNIX_SOCKET
|
||||
from docker.constants import DEFAULT_NPIPE
|
||||
from docker.constants import IS_WINDOWS_PLATFORM
|
||||
from docker.context import ContextAPI, Context
|
||||
|
||||
|
||||
class BaseContextTest(unittest.TestCase):
|
||||
@pytest.mark.skipif(
|
||||
IS_WINDOWS_PLATFORM, reason='Linux specific path check'
|
||||
)
|
||||
def test_url_compatibility_on_linux(self):
|
||||
c = Context("test")
|
||||
assert c.Host == DEFAULT_UNIX_SOCKET.strip("http+")
|
||||
|
||||
@pytest.mark.skipif(
|
||||
not IS_WINDOWS_PLATFORM, reason='Windows specific path check'
|
||||
)
|
||||
def test_url_compatibility_on_windows(self):
|
||||
c = Context("test")
|
||||
assert c.Host == DEFAULT_NPIPE
|
||||
|
||||
def test_fail_on_default_context_create(self):
|
||||
with pytest.raises(docker.errors.ContextException):
|
||||
ContextAPI.create_context("default")
|
||||
|
||||
def test_default_in_context_list(self):
|
||||
found = False
|
||||
ctx = ContextAPI.contexts()
|
||||
for c in ctx:
|
||||
if c.Name == "default":
|
||||
found = True
|
||||
assert found is True
|
||||
|
||||
def test_get_current_context(self):
|
||||
assert ContextAPI.get_current_context().Name == "default"
|
||||
|
||||
def test_context_inspect_without_params(self):
|
||||
ctx = ContextAPI.inspect_context()
|
||||
assert ctx["Name"] == "default"
|
||||
assert ctx["Metadata"]["StackOrchestrator"] == "swarm"
|
||||
assert ctx["Endpoints"]["docker"]["Host"] in [
|
||||
DEFAULT_NPIPE, DEFAULT_UNIX_SOCKET.strip("http+")]
|
|
@ -101,17 +101,17 @@ class APIErrorTest(unittest.TestCase):
|
|||
assert err.is_error() is True
|
||||
|
||||
def test_create_error_from_exception(self):
|
||||
resp = requests.Response()
|
||||
resp.status_code = 500
|
||||
err = APIError('')
|
||||
resp = requests.Response()
|
||||
resp.status_code = 500
|
||||
err = APIError('')
|
||||
try:
|
||||
resp.raise_for_status()
|
||||
except requests.exceptions.HTTPError as e:
|
||||
try:
|
||||
resp.raise_for_status()
|
||||
except requests.exceptions.HTTPError as e:
|
||||
try:
|
||||
create_api_error_from_http_exception(e)
|
||||
except APIError as e:
|
||||
err = e
|
||||
assert err.is_server_error() is True
|
||||
create_api_error_from_http_exception(e)
|
||||
except APIError as e:
|
||||
err = e
|
||||
assert err.is_server_error() is True
|
||||
|
||||
|
||||
class ContainerErrorTest(unittest.TestCase):
|
||||
|
|
Loading…
Reference in New Issue