Merge pull request #2506 from ulyssessouza/4.2.0-release

4.2.0 release
This commit is contained in:
Ulysses Souza 2020-02-06 11:43:03 +01:00 committed by GitHub
commit 1d1532f0be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 720 additions and 39 deletions

View File

@ -15,6 +15,6 @@ matrix:
- env: TOXENV=flake8
install:
- pip install tox
- pip install tox==2.9.1
script:
- tox

2
Jenkinsfile vendored
View File

@ -32,7 +32,7 @@ def buildImages = { ->
def getDockerVersions = { ->
def dockerVersions = ["17.06.2-ce"]
wrappedNode(label: "ubuntu && !zfs") {
wrappedNode(label: "ubuntu && !zfs && amd64") {
def result = sh(script: """docker run --rm \\
--entrypoint=python \\
${imageNamePy3} \\

View File

@ -1,8 +1,9 @@
version: '{branch}-{build}'
install:
- "SET PATH=C:\\Python27-x64;C:\\Python27-x64\\Scripts;%PATH%"
- "SET PATH=C:\\Python37-x64;C:\\Python37-x64\\Scripts;%PATH%"
- "python --version"
- "python -m pip install --upgrade pip"
- "pip install tox==2.9.1"
# Build the binary after tests

View File

@ -1,6 +1,9 @@
# flake8: noqa
from .api import APIClient
from .client import DockerClient, from_env
from .context import Context
from .context import ContextAPI
from .tls import TLSConfig
from .version import version, version_info
__version__ = version

View File

@ -9,6 +9,18 @@ CONTAINER_LIMITS_KEYS = [
'memory', 'memswap', 'cpushares', 'cpusetcpus'
]
DEFAULT_HTTP_HOST = "127.0.0.1"
DEFAULT_UNIX_SOCKET = "http+unix:///var/run/docker.sock"
DEFAULT_NPIPE = 'npipe:////./pipe/docker_engine'
BYTE_UNITS = {
'b': 1,
'k': 1024,
'm': 1024 * 1024,
'g': 1024 * 1024 * 1024
}
INSECURE_REGISTRY_DEPRECATION_WARNING = \
'The `insecure_registry` argument to {} ' \
'is deprecated and non-functional. Please remove it.'

View File

@ -0,0 +1,3 @@
# flake8: noqa
from .context import Context
from .api import ContextAPI

205
docker/context/api.py Normal file
View File

@ -0,0 +1,205 @@
import json
import os
from docker import errors
from docker.context.config import get_meta_dir
from docker.context.config import METAFILE
from docker.context.config import get_current_context_name
from docker.context.config import write_context_name_to_docker_config
from docker.context import Context
class ContextAPI(object):
"""Context API.
Contains methods for context management:
create, list, remove, get, inspect.
"""
DEFAULT_CONTEXT = Context("default")
@classmethod
def create_context(
cls, name, orchestrator="swarm", host=None, tls_cfg=None,
default_namespace=None, skip_tls_verify=False):
"""Creates a new context.
Returns:
(Context): a Context object.
Raises:
:py:class:`docker.errors.MissingContextParameter`
If a context name is not provided.
:py:class:`docker.errors.ContextAlreadyExists`
If a context with the name already exists.
:py:class:`docker.errors.ContextException`
If name is default.
Example:
>>> from docker.context import ContextAPI
>>> ctx = ContextAPI.create_context(name='test')
>>> print(ctx.Metadata)
{
"Name": "test",
"Metadata": {
"StackOrchestrator": "swarm"
},
"Endpoints": {
"docker": {
"Host": "unix:///var/run/docker.sock",
"SkipTLSVerify": false
}
}
}
"""
if not name:
raise errors.MissingContextParameter("name")
if name == "default":
raise errors.ContextException(
'"default" is a reserved context name')
ctx = Context.load_context(name)
if ctx:
raise errors.ContextAlreadyExists(name)
endpoint = "docker" if orchestrator == "swarm" else orchestrator
ctx = Context(name, orchestrator)
ctx.set_endpoint(
endpoint, host, tls_cfg,
skip_tls_verify=skip_tls_verify,
def_namespace=default_namespace)
ctx.save()
return ctx
@classmethod
def get_context(cls, name=None):
"""Retrieves a context object.
Args:
name (str): The name of the context
Example:
>>> from docker.context import ContextAPI
>>> ctx = ContextAPI.get_context(name='test')
>>> print(ctx.Metadata)
{
"Name": "test",
"Metadata": {
"StackOrchestrator": "swarm"
},
"Endpoints": {
"docker": {
"Host": "unix:///var/run/docker.sock",
"SkipTLSVerify": false
}
}
}
"""
if not name:
name = get_current_context_name()
if name == "default":
return cls.DEFAULT_CONTEXT
return Context.load_context(name)
@classmethod
def contexts(cls):
"""Context list.
Returns:
(Context): List of context objects.
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
names = []
for dirname, dirnames, fnames in os.walk(get_meta_dir()):
for filename in fnames + dirnames:
if filename == METAFILE:
try:
data = json.load(
open(os.path.join(dirname, filename), "r"))
names.append(data["Name"])
except Exception as e:
raise errors.ContextException(
"Failed to load metafile {}: {}".format(
filename, e))
contexts = [cls.DEFAULT_CONTEXT]
for name in names:
contexts.append(Context.load_context(name))
return contexts
@classmethod
def get_current_context(cls):
"""Get current context.
Returns:
(Context): current context object.
"""
return cls.get_context()
@classmethod
def set_current_context(cls, name="default"):
ctx = cls.get_context(name)
if not ctx:
raise errors.ContextNotFound(name)
err = write_context_name_to_docker_config(name)
if err:
raise errors.ContextException(
'Failed to set current context: {}'.format(err))
@classmethod
def remove_context(cls, name):
"""Remove a context. Similar to the ``docker context rm`` command.
Args:
name (str): The name of the context
Raises:
:py:class:`docker.errors.MissingContextParameter`
If a context name is not provided.
:py:class:`docker.errors.ContextNotFound`
If a context with the name does not exist.
:py:class:`docker.errors.ContextException`
If name is default.
Example:
>>> from docker.context import ContextAPI
>>> ContextAPI.remove_context(name='test')
>>>
"""
if not name:
raise errors.MissingContextParameter("name")
if name == "default":
raise errors.ContextException(
'context "default" cannot be removed')
ctx = Context.load_context(name)
if not ctx:
raise errors.ContextNotFound(name)
if name == get_current_context_name():
write_context_name_to_docker_config(None)
ctx.remove()
@classmethod
def inspect_context(cls, name="default"):
"""Remove a context. Similar to the ``docker context inspect`` command.
Args:
name (str): The name of the context
Raises:
:py:class:`docker.errors.MissingContextParameter`
If a context name is not provided.
:py:class:`docker.errors.ContextNotFound`
If a context with the name does not exist.
Example:
>>> from docker.context import ContextAPI
>>> ContextAPI.remove_context(name='test')
>>>
"""
if not name:
raise errors.MissingContextParameter("name")
if name == "default":
return cls.DEFAULT_CONTEXT()
ctx = Context.load_context(name)
if not ctx:
raise errors.ContextNotFound(name)
return ctx()

81
docker/context/config.py Normal file
View File

@ -0,0 +1,81 @@
import os
import json
import hashlib
from docker import utils
from docker.constants import IS_WINDOWS_PLATFORM
from docker.constants import DEFAULT_UNIX_SOCKET
from docker.utils.config import find_config_file
METAFILE = "meta.json"
def get_current_context_name():
name = "default"
docker_cfg_path = find_config_file()
if docker_cfg_path:
try:
with open(docker_cfg_path, "r") as f:
name = json.load(f).get("currentContext", "default")
except Exception:
return "default"
return name
def write_context_name_to_docker_config(name=None):
if name == 'default':
name = None
docker_cfg_path = find_config_file()
config = {}
if docker_cfg_path:
try:
with open(docker_cfg_path, "r") as f:
config = json.load(f)
except Exception as e:
return e
current_context = config.get("currentContext", None)
if current_context and not name:
del config["currentContext"]
elif name:
config["currentContext"] = name
else:
return
try:
with open(docker_cfg_path, "w") as f:
json.dump(config, f, indent=4)
except Exception as e:
return e
def get_context_id(name):
return hashlib.sha256(name.encode('utf-8')).hexdigest()
def get_context_dir():
return os.path.join(os.path.dirname(find_config_file() or ""), "contexts")
def get_meta_dir(name=None):
meta_dir = os.path.join(get_context_dir(), "meta")
if name:
return os.path.join(meta_dir, get_context_id(name))
return meta_dir
def get_meta_file(name):
return os.path.join(get_meta_dir(name), METAFILE)
def get_tls_dir(name=None, endpoint=""):
context_dir = get_context_dir()
if name:
return os.path.join(context_dir, "tls", get_context_id(name), endpoint)
return os.path.join(context_dir, "tls")
def get_context_host(path=None):
host = utils.parse_host(path, IS_WINDOWS_PLATFORM)
if host == DEFAULT_UNIX_SOCKET:
# remove http+ from default docker socket url
return host.strip("http+")
return host

208
docker/context/context.py Normal file
View File

@ -0,0 +1,208 @@
import os
import json
from shutil import copyfile, rmtree
from docker.tls import TLSConfig
from docker.errors import ContextException
from docker.context.config import get_meta_dir
from docker.context.config import get_meta_file
from docker.context.config import get_tls_dir
from docker.context.config import get_context_host
class Context:
"""A context."""
def __init__(self, name, orchestrator="swarm", host=None, endpoints=None):
if not name:
raise Exception("Name not provided")
self.name = name
self.orchestrator = orchestrator
if not endpoints:
default_endpoint = "docker" if (
orchestrator == "swarm"
) else orchestrator
self.endpoints = {
default_endpoint: {
"Host": get_context_host(host),
"SkipTLSVerify": False
}
}
else:
for k, v in endpoints.items():
ekeys = v.keys()
for param in ["Host", "SkipTLSVerify"]:
if param not in ekeys:
raise ContextException(
"Missing parameter {} from endpoint {}".format(
param, k))
self.endpoints = endpoints
self.tls_cfg = {}
self.meta_path = "IN MEMORY"
self.tls_path = "IN MEMORY"
def set_endpoint(
self, name="docker", host=None, tls_cfg=None,
skip_tls_verify=False, def_namespace=None):
self.endpoints[name] = {
"Host": get_context_host(host),
"SkipTLSVerify": skip_tls_verify
}
if def_namespace:
self.endpoints[name]["DefaultNamespace"] = def_namespace
if tls_cfg:
self.tls_cfg[name] = tls_cfg
def inspect(self):
return self.__call__()
@classmethod
def load_context(cls, name):
name, orchestrator, endpoints = Context._load_meta(name)
if name:
instance = cls(name, orchestrator, endpoints=endpoints)
instance._load_certs()
instance.meta_path = get_meta_dir(name)
return instance
return None
@classmethod
def _load_meta(cls, name):
metadata = {}
meta_file = get_meta_file(name)
if os.path.isfile(meta_file):
with open(meta_file) as f:
try:
with open(meta_file) as f:
metadata = json.load(f)
for k, v in metadata["Endpoints"].items():
metadata["Endpoints"][k]["SkipTLSVerify"] = bool(
v["SkipTLSVerify"])
except (IOError, KeyError, ValueError) as e:
# unknown format
raise Exception("""Detected corrupted meta file for
context {} : {}""".format(name, e))
return (
metadata["Name"], metadata["Metadata"]["StackOrchestrator"],
metadata["Endpoints"])
return None, None, None
def _load_certs(self):
certs = {}
tls_dir = get_tls_dir(self.name)
for endpoint in self.endpoints.keys():
if not os.path.isdir(os.path.join(tls_dir, endpoint)):
continue
ca_cert = None
cert = None
key = None
for filename in os.listdir(os.path.join(tls_dir, endpoint)):
if filename.startswith("ca"):
ca_cert = os.path.join(tls_dir, endpoint, filename)
elif filename.startswith("cert"):
cert = os.path.join(tls_dir, endpoint, filename)
elif filename.startswith("key"):
key = os.path.join(tls_dir, endpoint, filename)
if all([ca_cert, cert, key]):
certs[endpoint] = TLSConfig(
client_cert=(cert, key), ca_cert=ca_cert)
self.tls_cfg = certs
self.tls_path = tls_dir
def save(self):
meta_dir = get_meta_dir(self.name)
if not os.path.isdir(meta_dir):
os.makedirs(meta_dir)
with open(get_meta_file(self.name), "w") as f:
f.write(json.dumps(self.Metadata))
tls_dir = get_tls_dir(self.name)
for endpoint, tls in self.tls_cfg.items():
if not os.path.isdir(os.path.join(tls_dir, endpoint)):
os.makedirs(os.path.join(tls_dir, endpoint))
ca_file = tls.ca_cert
if ca_file:
copyfile(ca_file, os.path.join(
tls_dir, endpoint, os.path.basename(ca_file)))
if tls.cert:
cert_file, key_file = tls.cert
copyfile(cert_file, os.path.join(
tls_dir, endpoint, os.path.basename(cert_file)))
copyfile(key_file, os.path.join(
tls_dir, endpoint, os.path.basename(key_file)))
self.meta_path = get_meta_dir(self.name)
self.tls_path = get_tls_dir(self.name)
def remove(self):
if os.path.isdir(self.meta_path):
rmtree(self.meta_path)
if os.path.isdir(self.tls_path):
rmtree(self.tls_path)
def __repr__(self):
return "<%s: '%s'>" % (self.__class__.__name__, self.name)
def __str__(self):
return json.dumps(self.__call__(), indent=2)
def __call__(self):
result = self.Metadata
result.update(self.TLSMaterial)
result.update(self.Storage)
return result
@property
def Name(self):
return self.name
@property
def Host(self):
if self.orchestrator == "swarm":
return self.endpoints["docker"]["Host"]
return self.endpoints[self.orchestrator]["Host"]
@property
def Orchestrator(self):
return self.orchestrator
@property
def Metadata(self):
return {
"Name": self.name,
"Metadata": {
"StackOrchestrator": self.orchestrator
},
"Endpoints": self.endpoints
}
@property
def TLSConfig(self):
key = self.orchestrator
if key == "swarm":
key = "docker"
if key in self.tls_cfg.keys():
return self.tls_cfg[key]
return None
@property
def TLSMaterial(self):
certs = {}
for endpoint, tls in self.tls_cfg.items():
cert, key = tls.cert
certs[endpoint] = list(
map(os.path.basename, [tls.ca_cert, cert, key]))
return {
"TLSMaterial": certs
}
@property
def Storage(self):
return {
"Storage": {
"MetadataPath": self.meta_path,
"TLSPath": self.tls_path
}}

View File

@ -163,3 +163,35 @@ def create_unexpected_kwargs_error(name, kwargs):
text.append("got unexpected keyword arguments ")
text.append(', '.join(quoted_kwargs))
return TypeError(''.join(text))
class MissingContextParameter(DockerException):
def __init__(self, param):
self.param = param
def __str__(self):
return ("missing parameter: {}".format(self.param))
class ContextAlreadyExists(DockerException):
def __init__(self, name):
self.name = name
def __str__(self):
return ("context {} already exists".format(self.name))
class ContextException(DockerException):
def __init__(self, msg):
self.msg = msg
def __str__(self):
return (self.msg)
class ContextNotFound(DockerException):
def __init__(self, name):
self.name = name
def __str__(self):
return ("context '{}' not found".format(self.name))

View File

@ -1,4 +1,5 @@
import functools
import time
import io
import six
@ -9,7 +10,7 @@ cERROR_PIPE_BUSY = 0xe7
cSECURITY_SQOS_PRESENT = 0x100000
cSECURITY_ANONYMOUS = 0
RETRY_WAIT_TIMEOUT = 10000
MAXIMUM_RETRY_COUNT = 10
def check_closed(f):
@ -46,8 +47,7 @@ class NpipeSocket(object):
self._closed = True
@check_closed
def connect(self, address):
win32pipe.WaitNamedPipe(address, self._timeout)
def connect(self, address, retry_count=0):
try:
handle = win32file.CreateFile(
address,
@ -65,8 +65,10 @@ class NpipeSocket(object):
# Another program or thread has grabbed our pipe instance
# before we got to it. Wait for availability and attempt to
# connect again.
win32pipe.WaitNamedPipe(address, RETRY_WAIT_TIMEOUT)
return self.connect(address)
retry_count = retry_count + 1
if (retry_count < MAXIMUM_RETRY_COUNT):
time.sleep(1)
return self.connect(address, retry_count)
raise e
self.flags = win32pipe.GetNamedPipeInfo(handle)[0]

View File

@ -1,6 +1,8 @@
import paramiko
import requests.adapters
import six
import logging
import os
from docker.transport.basehttpadapter import BaseHTTPAdapter
from .. import constants
@ -72,15 +74,40 @@ class SSHConnectionPool(urllib3.connectionpool.HTTPConnectionPool):
class SSHHTTPAdapter(BaseHTTPAdapter):
__attrs__ = requests.adapters.HTTPAdapter.__attrs__ + [
'pools', 'timeout', 'ssh_client',
'pools', 'timeout', 'ssh_client', 'ssh_params'
]
def __init__(self, base_url, timeout=60,
pool_connections=constants.DEFAULT_NUM_POOLS):
logging.getLogger("paramiko").setLevel(logging.WARNING)
self.ssh_client = paramiko.SSHClient()
self.ssh_client.load_system_host_keys()
base_url = six.moves.urllib_parse.urlparse(base_url)
self.ssh_params = {
"hostname": base_url.hostname,
"port": base_url.port,
"username": base_url.username
}
ssh_config_file = os.path.expanduser("~/.ssh/config")
if os.path.exists(ssh_config_file):
conf = paramiko.SSHConfig()
with open(ssh_config_file) as f:
conf.parse(f)
host_config = conf.lookup(base_url.hostname)
self.ssh_conf = host_config
if 'proxycommand' in host_config:
self.ssh_params["sock"] = paramiko.ProxyCommand(
self.ssh_conf['proxycommand']
)
if 'hostname' in host_config:
self.ssh_params['hostname'] = host_config['hostname']
if base_url.port is None and 'port' in host_config:
self.ssh_params['port'] = self.ssh_conf['port']
if base_url.username is None and 'user' in host_config:
self.ssh_params['username'] = self.ssh_conf['user']
self.ssh_client.load_system_host_keys()
self.ssh_client.set_missing_host_key_policy(paramiko.WarningPolicy())
self.base_url = base_url
self._connect()
self.timeout = timeout
self.pools = RecentlyUsedContainer(
@ -89,10 +116,7 @@ class SSHHTTPAdapter(BaseHTTPAdapter):
super(SSHHTTPAdapter, self).__init__()
def _connect(self):
parsed = six.moves.urllib_parse.urlparse(self.base_url)
self.ssh_client.connect(
parsed.hostname, parsed.port, parsed.username,
)
self.ssh_client.connect(**self.ssh_params)
def get_connection(self, url, proxies=None):
with self.pools.lock:

View File

@ -11,6 +11,10 @@ import six
from .. import errors
from .. import tls
from ..constants import DEFAULT_HTTP_HOST
from ..constants import DEFAULT_UNIX_SOCKET
from ..constants import DEFAULT_NPIPE
from ..constants import BYTE_UNITS
if six.PY2:
from urllib import splitnport
@ -18,17 +22,6 @@ if six.PY2:
else:
from urllib.parse import splitnport, urlparse
DEFAULT_HTTP_HOST = "127.0.0.1"
DEFAULT_UNIX_SOCKET = "http+unix:///var/run/docker.sock"
DEFAULT_NPIPE = 'npipe:////./pipe/docker_engine'
BYTE_UNITS = {
'b': 1,
'k': 1024,
'm': 1024 * 1024,
'g': 1024 * 1024 * 1024
}
def create_ipam_pool(*args, **kwargs):
raise errors.DeprecatedMethod(

View File

@ -1,2 +1,2 @@
version = "4.1.0"
version = "4.2.0"
version_info = tuple([int(d) for d in version.split("-")[0].split(".")])

View File

@ -1,6 +1,23 @@
Change log
==========
4.2.0
-----
[List of PRs / issues for this release](https://github.com/docker/docker-py/milestone/63?closed=1)
### Bugfixes
- Fix `win32pipe.WaitNamedPipe` throw exception in Windows containers
- Use `Hostname`, `Username`, `Port` and `ProxyCommand` settings from `.ssh/config` when on SSH
- Set host key policy for ssh transport to `paramiko.WarningPolicy()`
- Set logging level of `paramiko` to warn
### Features
- Add support for docker contexts through `docker.ContextAPI`
4.1.0
-----

View File

@ -1,3 +1,4 @@
setuptools==44.0.0 # last version with python 2.7 support
coverage==4.5.2
flake8==3.6.0
mock==1.0.1

View File

@ -0,0 +1,52 @@
import os
import tempfile
import pytest
from docker import errors
from docker.context import ContextAPI
from docker.tls import TLSConfig
from .base import BaseAPIIntegrationTest
class ContextLifecycleTest(BaseAPIIntegrationTest):
def test_lifecycle(self):
assert ContextAPI.get_context().Name == "default"
assert not ContextAPI.get_context("test")
assert ContextAPI.get_current_context().Name == "default"
dirpath = tempfile.mkdtemp()
ca = tempfile.NamedTemporaryFile(
prefix=os.path.join(dirpath, "ca.pem"), mode="r")
cert = tempfile.NamedTemporaryFile(
prefix=os.path.join(dirpath, "cert.pem"), mode="r")
key = tempfile.NamedTemporaryFile(
prefix=os.path.join(dirpath, "key.pem"), mode="r")
# create context 'test
docker_tls = TLSConfig(
client_cert=(cert.name, key.name),
ca_cert=ca.name)
ContextAPI.create_context(
"test", tls_cfg=docker_tls)
# check for a context 'test' in the context store
assert any([ctx.Name == "test" for ctx in ContextAPI.contexts()])
# retrieve a context object for 'test'
assert ContextAPI.get_context("test")
# remove context
ContextAPI.remove_context("test")
with pytest.raises(errors.ContextNotFound):
ContextAPI.inspect_context("test")
# check there is no 'test' context in store
assert not ContextAPI.get_context("test")
ca.close()
key.close()
cert.close()
def test_context_remove(self):
ContextAPI.create_context("test")
assert ContextAPI.inspect_context("test")["Name"] == "test"
ContextAPI.remove_context("test")
with pytest.raises(errors.ContextNotFound):
ContextAPI.inspect_context("test")

View File

@ -87,8 +87,10 @@ class ImageCollectionTest(BaseIntegrationTest):
def test_pull_multiple(self):
client = docker.from_env(version=TEST_API_VERSION)
images = client.images.pull('hello-world')
assert len(images) == 1
assert 'hello-world:latest' in images[0].attrs['RepoTags']
assert len(images) >= 1
assert any([
'hello-world:latest' in img.attrs['RepoTags'] for img in images
])
def test_load_error(self):
client = docker.from_env(version=TEST_API_VERSION)

View File

@ -0,0 +1,45 @@
import unittest
import docker
import pytest
from docker.constants import DEFAULT_UNIX_SOCKET
from docker.constants import DEFAULT_NPIPE
from docker.constants import IS_WINDOWS_PLATFORM
from docker.context import ContextAPI, Context
class BaseContextTest(unittest.TestCase):
@pytest.mark.skipif(
IS_WINDOWS_PLATFORM, reason='Linux specific path check'
)
def test_url_compatibility_on_linux(self):
c = Context("test")
assert c.Host == DEFAULT_UNIX_SOCKET.strip("http+")
@pytest.mark.skipif(
not IS_WINDOWS_PLATFORM, reason='Windows specific path check'
)
def test_url_compatibility_on_windows(self):
c = Context("test")
assert c.Host == DEFAULT_NPIPE
def test_fail_on_default_context_create(self):
with pytest.raises(docker.errors.ContextException):
ContextAPI.create_context("default")
def test_default_in_context_list(self):
found = False
ctx = ContextAPI.contexts()
for c in ctx:
if c.Name == "default":
found = True
assert found is True
def test_get_current_context(self):
assert ContextAPI.get_current_context().Name == "default"
def test_context_inspect_without_params(self):
ctx = ContextAPI.inspect_context()
assert ctx["Name"] == "default"
assert ctx["Metadata"]["StackOrchestrator"] == "swarm"
assert ctx["Endpoints"]["docker"]["Host"] in [
DEFAULT_NPIPE, DEFAULT_UNIX_SOCKET.strip("http+")]

View File

@ -101,17 +101,17 @@ class APIErrorTest(unittest.TestCase):
assert err.is_error() is True
def test_create_error_from_exception(self):
resp = requests.Response()
resp.status_code = 500
err = APIError('')
resp = requests.Response()
resp.status_code = 500
err = APIError('')
try:
resp.raise_for_status()
except requests.exceptions.HTTPError as e:
try:
resp.raise_for_status()
except requests.exceptions.HTTPError as e:
try:
create_api_error_from_http_exception(e)
except APIError as e:
err = e
assert err.is_server_error() is True
create_api_error_from_http_exception(e)
except APIError as e:
err = e
assert err.is_server_error() is True
class ContainerErrorTest(unittest.TestCase):