types: Add types to `auth`, `client`, `errors`, `tls`

Signed-off-by: Victorien Plot <65306057+Viicos@users.noreply.github.com>
Signed-off-by: Viicos <65306057+Viicos@users.noreply.github.com>
This commit is contained in:
Victorien Plot 2023-01-02 20:22:43 +01:00 committed by Viicos
parent c38656dc78
commit 6db8e694db
4 changed files with 85 additions and 70 deletions

View File

@ -1,9 +1,12 @@
from __future__ import annotations
import base64
import json
import logging
from typing import Any, AnyStr, Tuple, Dict, Optional, Union
from . import credentials
from . import errors
from .api.client import APIClient
from .utils import config
INDEX_NAME = 'docker.io'
@ -13,7 +16,7 @@ TOKEN_USERNAME = '<token>'
log = logging.getLogger(__name__)
def resolve_repository_name(repo_name):
def resolve_repository_name(repo_name: str) -> Tuple[str, str]:
if '://' in repo_name:
raise errors.InvalidRepository(
f'Repository name cannot contain a scheme ({repo_name})'
@ -28,14 +31,14 @@ def resolve_repository_name(repo_name):
return resolve_index_name(index_name), remote_name
def resolve_index_name(index_name):
def resolve_index_name(index_name: str) -> str:
index_name = convert_to_hostname(index_name)
if index_name == f"index.{INDEX_NAME}":
index_name = INDEX_NAME
return index_name
def get_config_header(client, registry):
def get_config_header(client: APIClient, registry: str) -> Optional[bytes]:
log.debug('Looking for auth config')
if not client._auth_configs or client._auth_configs.is_empty:
log.debug(
@ -57,7 +60,7 @@ def get_config_header(client, registry):
return None
def split_repo_name(repo_name):
def split_repo_name(repo_name: str) -> Tuple[str, str]:
parts = repo_name.split('/', 1)
if len(parts) == 1 or (
'.' not in parts[0] and ':' not in parts[0] and parts[0] != 'localhost'
@ -74,7 +77,7 @@ def get_credential_store(authconfig, registry):
class AuthConfig(dict):
def __init__(self, dct, credstore_env=None):
def __init__(self, dct: Dict[str, Any], credstore_env: Optional[Dict[str, Any]] = None) -> None:
if 'auths' not in dct:
dct['auths'] = {}
self.update(dct)
@ -82,7 +85,7 @@ class AuthConfig(dict):
self._stores = {}
@classmethod
def parse_auth(cls, entries, raise_on_error=False):
def parse_auth(cls, entries: Dict[str, Any], raise_on_error: bool = False) -> Dict[str, Any]:
"""
Parses authentication entries
@ -142,7 +145,7 @@ class AuthConfig(dict):
return conf
@classmethod
def load_config(cls, config_path, config_dict, credstore_env=None):
def load_config(cls, config_path: str, config_dict: Dict[str, Any], credstore_env: Optional[Dict[str, Any]] = None) -> AuthConfig:
"""
Loads authentication data from a Docker configuration file in the given
root directory or if config_path is passed use given path.
@ -190,24 +193,24 @@ class AuthConfig(dict):
return cls({'auths': cls.parse_auth(config_dict)}, credstore_env)
@property
def auths(self):
def auths(self) -> Dict[str, Any]:
return self.get('auths', {})
@property
def creds_store(self):
def creds_store(self) -> Optional[str]:
return self.get('credsStore', None)
@property
def cred_helpers(self):
def cred_helpers(self) -> Dict[str, Any]:
return self.get('credHelpers', {})
@property
def is_empty(self):
def is_empty(self) -> bool:
return (
not self.auths and not self.creds_store and not self.cred_helpers
)
def resolve_authconfig(self, registry=None):
def resolve_authconfig(self, registry: Optional[str] = None) -> Optional[Dict[str, Any]]:
"""
Returns the authentication data from the given auth configuration for a
specific registry. As with the Docker client, legacy entries in the
@ -242,7 +245,7 @@ class AuthConfig(dict):
log.debug("No entry found")
return None
def _resolve_authconfig_credstore(self, registry, credstore_name):
def _resolve_authconfig_credstore(self, registry: str, credstore_name: str) -> Optional[Dict[str, Any]]:
if not registry or registry == INDEX_NAME:
# The ecosystem is a little schizophrenic with index.docker.io VS
# docker.io - in that case, it seems the full URL is necessary.
@ -270,20 +273,20 @@ class AuthConfig(dict):
f'Credentials store error: {repr(e)}'
) from e
def _get_store_instance(self, name):
def _get_store_instance(self, name: str) -> Dict[str, Any]:
if name not in self._stores:
self._stores[name] = credentials.Store(
name, environment=self._credstore_env
)
return self._stores[name]
def get_credential_store(self, registry):
def get_credential_store(self, registry: str) -> str:
if not registry or registry == INDEX_NAME:
registry = INDEX_URL
return self.cred_helpers.get(registry) or self.creds_store
def get_all_credentials(self):
def get_all_credentials(self) -> Dict[str, Any]:
auth_data = self.auths.copy()
if self.creds_store:
# Retrieve all credentials from the default store
@ -303,34 +306,36 @@ class AuthConfig(dict):
return auth_data
def add_auth(self, reg, data):
def add_auth(self, reg: str, data: Any) -> None:
self['auths'][reg] = data
def resolve_authconfig(authconfig, registry=None, credstore_env=None):
def resolve_authconfig(authconfig: Union[AuthConfig, Dict[str, Any]], registry: Optional[str] = None, credstore_env: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
if not isinstance(authconfig, AuthConfig):
authconfig = AuthConfig(authconfig, credstore_env)
return authconfig.resolve_authconfig(registry)
def convert_to_hostname(url):
def convert_to_hostname(url: str) -> str:
return url.replace('http://', '').replace('https://', '').split('/', 1)[0]
def decode_auth(auth):
def decode_auth(auth: AnyStr) -> Tuple[str, str]:
if isinstance(auth, str):
auth = auth.encode('ascii')
s = base64.b64decode(auth)
auth_bytes = auth.encode('ascii')
else:
auth_bytes = auth
s = base64.b64decode(auth_bytes)
login, pwd = s.split(b':', 1)
return login.decode('utf8'), pwd.decode('utf8')
def encode_header(auth):
def encode_header(auth: Dict[str, Any]) -> bytes:
auth_json = json.dumps(auth).encode('ascii')
return base64.urlsafe_b64encode(auth_json)
def parse_auth(entries, raise_on_error=False):
def parse_auth(entries: Dict[str, Any], raise_on_error: bool = False) -> Dict[str, Any]:
"""
Parses authentication entries
@ -346,11 +351,11 @@ def parse_auth(entries, raise_on_error=False):
return AuthConfig.parse_auth(entries, raise_on_error)
def load_config(config_path=None, config_dict=None, credstore_env=None):
def load_config(config_path: Optional[str] = None, config_dict: Optional[Dict[str, Any]] = None, credstore_env: Optional[Dict[str, Any]] = None) -> AuthConfig:
return AuthConfig.load_config(config_path, config_dict, credstore_env)
def _load_legacy_config(config_file):
def _load_legacy_config(config_file: str) -> Dict[str, Any]:
log.debug("Attempting to parse legacy auth file format")
try:
data = []

View File

@ -1,3 +1,7 @@
from __future__ import annotations
from typing import Any, Dict, NoReturn
from .types.daemon import CancellableStream
from .api.client import APIClient
from .constants import (DEFAULT_TIMEOUT_SECONDS, DEFAULT_MAX_POOL_SIZE)
from .models.configs import ConfigCollection
@ -41,11 +45,11 @@ class DockerClient:
max_pool_size (int): The maximum number of connections
to save in the pool.
"""
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
self.api = APIClient(*args, **kwargs)
@classmethod
def from_env(cls, **kwargs):
def from_env(cls, **kwargs: Any) -> DockerClient:
"""
Return a client configured from environment variables.
@ -103,7 +107,7 @@ class DockerClient:
# Resources
@property
def configs(self):
def configs(self) -> ConfigCollection:
"""
An object for managing configs on the server. See the
:doc:`configs documentation <configs>` for full details.
@ -111,7 +115,7 @@ class DockerClient:
return ConfigCollection(client=self)
@property
def containers(self):
def containers(self) -> ContainerCollection:
"""
An object for managing containers on the server. See the
:doc:`containers documentation <containers>` for full details.
@ -119,7 +123,7 @@ class DockerClient:
return ContainerCollection(client=self)
@property
def images(self):
def images(self) -> ImageCollection:
"""
An object for managing images on the server. See the
:doc:`images documentation <images>` for full details.
@ -127,7 +131,7 @@ class DockerClient:
return ImageCollection(client=self)
@property
def networks(self):
def networks(self) -> NetworkCollection:
"""
An object for managing networks on the server. See the
:doc:`networks documentation <networks>` for full details.
@ -135,7 +139,7 @@ class DockerClient:
return NetworkCollection(client=self)
@property
def nodes(self):
def nodes(self) -> NodeCollection:
"""
An object for managing nodes on the server. See the
:doc:`nodes documentation <nodes>` for full details.
@ -143,7 +147,7 @@ class DockerClient:
return NodeCollection(client=self)
@property
def plugins(self):
def plugins(self) -> PluginCollection:
"""
An object for managing plugins on the server. See the
:doc:`plugins documentation <plugins>` for full details.
@ -151,7 +155,7 @@ class DockerClient:
return PluginCollection(client=self)
@property
def secrets(self):
def secrets(self) -> SecretCollection:
"""
An object for managing secrets on the server. See the
:doc:`secrets documentation <secrets>` for full details.
@ -159,7 +163,7 @@ class DockerClient:
return SecretCollection(client=self)
@property
def services(self):
def services(self) -> ServiceCollection:
"""
An object for managing services on the server. See the
:doc:`services documentation <services>` for full details.
@ -167,7 +171,7 @@ class DockerClient:
return ServiceCollection(client=self)
@property
def swarm(self):
def swarm(self) -> Swarm:
"""
An object for managing a swarm on the server. See the
:doc:`swarm documentation <swarm>` for full details.
@ -175,7 +179,7 @@ class DockerClient:
return Swarm(client=self)
@property
def volumes(self):
def volumes(self) -> VolumeCollection:
"""
An object for managing volumes on the server. See the
:doc:`volumes documentation <volumes>` for full details.
@ -183,35 +187,35 @@ class DockerClient:
return VolumeCollection(client=self)
# Top-level methods
def events(self, *args, **kwargs):
def events(self, *args: Any, **kwargs: Any) -> CancellableStream:
return self.api.events(*args, **kwargs)
events.__doc__ = APIClient.events.__doc__
def df(self):
def df(self) -> Dict[str, Any]:
return self.api.df()
df.__doc__ = APIClient.df.__doc__
def info(self, *args, **kwargs):
def info(self, *args: Any, **kwargs: Any) -> Dict[str, Any]:
return self.api.info(*args, **kwargs)
info.__doc__ = APIClient.info.__doc__
def login(self, *args, **kwargs):
def login(self, *args: Any, **kwargs: Any) -> Dict[str, Any]:
return self.api.login(*args, **kwargs)
login.__doc__ = APIClient.login.__doc__
def ping(self, *args, **kwargs):
def ping(self, *args: Any, **kwargs: Any) -> bool:
return self.api.ping(*args, **kwargs)
ping.__doc__ = APIClient.ping.__doc__
def version(self, *args, **kwargs):
def version(self, *args: Any, **kwargs: Any) -> Dict[str, Any]:
return self.api.version(*args, **kwargs)
version.__doc__ = APIClient.version.__doc__
def close(self):
def close(self) -> None:
return self.api.close()
close.__doc__ = APIClient.close.__doc__
def __getattr__(self, name):
def __getattr__(self, name: str) -> NoReturn:
s = [f"'DockerClient' object has no attribute '{name}'"]
# If a user calls a method on APIClient, they
if hasattr(APIClient, name):

View File

@ -1,5 +1,9 @@
from typing import Any, Optional, List, Union
import requests
from .models.containers import Container
_image_not_found_explanation_fragments = frozenset(
fragment.lower() for fragment in [
'no such image',
@ -19,11 +23,11 @@ class DockerException(Exception):
"""
def create_api_error_from_http_exception(e):
def create_api_error_from_http_exception(e: requests.HTTPError) -> Union["APIError", "ImageNotFound", "NotFound"]:
"""
Create a suitable APIError from requests.exceptions.HTTPError.
"""
response = e.response
response: requests.Response = e.response
try:
explanation = response.json()['message']
except ValueError:
@ -43,14 +47,14 @@ class APIError(requests.exceptions.HTTPError, DockerException):
"""
An HTTP error from the API.
"""
def __init__(self, message, response=None, explanation=None):
def __init__(self, message: str, response: Optional[requests.Response] = None, explanation: Optional[str] = None) -> None:
# requests 1.2 supports response as a keyword argument, but
# requests 1.1 doesn't
super().__init__(message)
self.response = response
self.explanation = explanation
def __str__(self):
def __str__(self) -> str:
message = super().__str__()
if self.is_client_error():
@ -71,19 +75,19 @@ class APIError(requests.exceptions.HTTPError, DockerException):
return message
@property
def status_code(self):
def status_code(self) -> Optional[int]:
if self.response is not None:
return self.response.status_code
def is_error(self):
def is_error(self) -> bool:
return self.is_client_error() or self.is_server_error()
def is_client_error(self):
def is_client_error(self) -> bool:
if self.status_code is None:
return False
return 400 <= self.status_code < 500
def is_server_error(self):
def is_server_error(self) -> bool:
if self.status_code is None:
return False
return 500 <= self.status_code < 600
@ -118,10 +122,10 @@ class DeprecatedMethod(DockerException):
class TLSParameterError(DockerException):
def __init__(self, msg):
def __init__(self, msg: str) -> None:
self.msg = msg
def __str__(self):
def __str__(self) -> str:
return self.msg + (". TLS configurations should map the Docker CLI "
"client configurations. See "
"https://docs.docker.com/engine/articles/https/ "
@ -136,7 +140,7 @@ class ContainerError(DockerException):
"""
Represents a container that has exited with a non-zero exit code.
"""
def __init__(self, container, exit_status, command, image, stderr):
def __init__(self, container: Container, exit_status: int, command: Union[str, List[str]], image: str, stderr: Optional[str]) -> None:
self.container = container
self.exit_status = exit_status
self.command = command
@ -151,7 +155,7 @@ class ContainerError(DockerException):
class StreamParseError(RuntimeError):
def __init__(self, reason):
def __init__(self, reason: Any) -> None:
self.msg = reason
@ -178,32 +182,32 @@ def create_unexpected_kwargs_error(name, kwargs):
class MissingContextParameter(DockerException):
def __init__(self, param):
def __init__(self, param: str) -> None:
self.param = param
def __str__(self):
def __str__(self) -> str:
return (f"missing parameter: {self.param}")
class ContextAlreadyExists(DockerException):
def __init__(self, name):
def __init__(self, name: str) -> None:
self.name = name
def __str__(self):
def __str__(self) -> str:
return (f"context {self.name} already exists")
class ContextException(DockerException):
def __init__(self, msg):
def __init__(self, msg: str) -> None:
self.msg = msg
def __str__(self):
def __str__(self) -> str:
return (self.msg)
class ContextNotFound(DockerException):
def __init__(self, name):
def __init__(self, name: str) -> None:
self.name = name
def __str__(self):
def __str__(self) -> str:
return (f"context '{self.name}' not found")

View File

@ -1,7 +1,9 @@
import os
import ssl
from typing import Tuple, Optional, Union
from . import errors
from .api.client import APIClient
from .transport import SSLHTTPAdapter
@ -26,9 +28,9 @@ class TLSConfig:
verify = None
ssl_version = None
def __init__(self, client_cert=None, ca_cert=None, verify=None,
ssl_version=None, assert_hostname=None,
assert_fingerprint=None):
def __init__(self, client_cert: Optional[Tuple[str, str]] = None, ca_cert: Optional[str] = None, verify: Optional[Union[bool, str]] = None,
ssl_version: Optional[int] = None, assert_hostname: Optional[bool] = None,
assert_fingerprint: Optional[bool] = None) -> None:
# Argument compatibility/mapping with
# https://docs.docker.com/engine/articles/https/
# This diverges from the Docker CLI in that users can specify 'tls'
@ -73,7 +75,7 @@ class TLSConfig:
'Invalid CA certificate provided for `ca_cert`.'
)
def configure_client(self, client):
def configure_client(self, client: APIClient) -> None:
"""
Configure a client with these TLS options.
"""