From 6db8e694dba271fe5352e36def51dc9b7fd0e8c6 Mon Sep 17 00:00:00 2001 From: Victorien Plot <65306057+Viicos@users.noreply.github.com> Date: Mon, 2 Jan 2023 20:22:43 +0100 Subject: [PATCH] types: Add types to `auth`, `client`, `errors`, `tls` Signed-off-by: Victorien Plot <65306057+Viicos@users.noreply.github.com> Signed-off-by: Viicos <65306057+Viicos@users.noreply.github.com> --- docker/auth.py | 57 ++++++++++++++++++++++++++---------------------- docker/client.py | 44 ++++++++++++++++++++----------------- docker/errors.py | 44 ++++++++++++++++++++----------------- docker/tls.py | 10 +++++---- 4 files changed, 85 insertions(+), 70 deletions(-) diff --git a/docker/auth.py b/docker/auth.py index 7a301ba4..45a76728 100644 --- a/docker/auth.py +++ b/docker/auth.py @@ -1,9 +1,12 @@ +from __future__ import annotations import base64 import json import logging +from typing import Any, AnyStr, Tuple, Dict, Optional, Union from . import credentials from . import errors +from .api.client import APIClient from .utils import config INDEX_NAME = 'docker.io' @@ -13,7 +16,7 @@ TOKEN_USERNAME = '' log = logging.getLogger(__name__) -def resolve_repository_name(repo_name): +def resolve_repository_name(repo_name: str) -> Tuple[str, str]: if '://' in repo_name: raise errors.InvalidRepository( f'Repository name cannot contain a scheme ({repo_name})' @@ -28,14 +31,14 @@ def resolve_repository_name(repo_name): return resolve_index_name(index_name), remote_name -def resolve_index_name(index_name): +def resolve_index_name(index_name: str) -> str: index_name = convert_to_hostname(index_name) if index_name == f"index.{INDEX_NAME}": index_name = INDEX_NAME return index_name -def get_config_header(client, registry): +def get_config_header(client: APIClient, registry: str) -> Optional[bytes]: log.debug('Looking for auth config') if not client._auth_configs or client._auth_configs.is_empty: log.debug( @@ -57,7 +60,7 @@ def get_config_header(client, registry): return None -def split_repo_name(repo_name): +def split_repo_name(repo_name: str) -> Tuple[str, str]: parts = repo_name.split('/', 1) if len(parts) == 1 or ( '.' not in parts[0] and ':' not in parts[0] and parts[0] != 'localhost' @@ -74,7 +77,7 @@ def get_credential_store(authconfig, registry): class AuthConfig(dict): - def __init__(self, dct, credstore_env=None): + def __init__(self, dct: Dict[str, Any], credstore_env: Optional[Dict[str, Any]] = None) -> None: if 'auths' not in dct: dct['auths'] = {} self.update(dct) @@ -82,7 +85,7 @@ class AuthConfig(dict): self._stores = {} @classmethod - def parse_auth(cls, entries, raise_on_error=False): + def parse_auth(cls, entries: Dict[str, Any], raise_on_error: bool = False) -> Dict[str, Any]: """ Parses authentication entries @@ -142,7 +145,7 @@ class AuthConfig(dict): return conf @classmethod - def load_config(cls, config_path, config_dict, credstore_env=None): + def load_config(cls, config_path: str, config_dict: Dict[str, Any], credstore_env: Optional[Dict[str, Any]] = None) -> AuthConfig: """ Loads authentication data from a Docker configuration file in the given root directory or if config_path is passed use given path. @@ -190,24 +193,24 @@ class AuthConfig(dict): return cls({'auths': cls.parse_auth(config_dict)}, credstore_env) @property - def auths(self): + def auths(self) -> Dict[str, Any]: return self.get('auths', {}) @property - def creds_store(self): + def creds_store(self) -> Optional[str]: return self.get('credsStore', None) @property - def cred_helpers(self): + def cred_helpers(self) -> Dict[str, Any]: return self.get('credHelpers', {}) @property - def is_empty(self): + def is_empty(self) -> bool: return ( not self.auths and not self.creds_store and not self.cred_helpers ) - def resolve_authconfig(self, registry=None): + def resolve_authconfig(self, registry: Optional[str] = None) -> Optional[Dict[str, Any]]: """ Returns the authentication data from the given auth configuration for a specific registry. As with the Docker client, legacy entries in the @@ -242,7 +245,7 @@ class AuthConfig(dict): log.debug("No entry found") return None - def _resolve_authconfig_credstore(self, registry, credstore_name): + def _resolve_authconfig_credstore(self, registry: str, credstore_name: str) -> Optional[Dict[str, Any]]: if not registry or registry == INDEX_NAME: # The ecosystem is a little schizophrenic with index.docker.io VS # docker.io - in that case, it seems the full URL is necessary. @@ -270,20 +273,20 @@ class AuthConfig(dict): f'Credentials store error: {repr(e)}' ) from e - def _get_store_instance(self, name): + def _get_store_instance(self, name: str) -> Dict[str, Any]: if name not in self._stores: self._stores[name] = credentials.Store( name, environment=self._credstore_env ) return self._stores[name] - def get_credential_store(self, registry): + def get_credential_store(self, registry: str) -> str: if not registry or registry == INDEX_NAME: registry = INDEX_URL return self.cred_helpers.get(registry) or self.creds_store - def get_all_credentials(self): + def get_all_credentials(self) -> Dict[str, Any]: auth_data = self.auths.copy() if self.creds_store: # Retrieve all credentials from the default store @@ -303,34 +306,36 @@ class AuthConfig(dict): return auth_data - def add_auth(self, reg, data): + def add_auth(self, reg: str, data: Any) -> None: self['auths'][reg] = data -def resolve_authconfig(authconfig, registry=None, credstore_env=None): +def resolve_authconfig(authconfig: Union[AuthConfig, Dict[str, Any]], registry: Optional[str] = None, credstore_env: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]: if not isinstance(authconfig, AuthConfig): authconfig = AuthConfig(authconfig, credstore_env) return authconfig.resolve_authconfig(registry) -def convert_to_hostname(url): +def convert_to_hostname(url: str) -> str: return url.replace('http://', '').replace('https://', '').split('/', 1)[0] -def decode_auth(auth): +def decode_auth(auth: AnyStr) -> Tuple[str, str]: if isinstance(auth, str): - auth = auth.encode('ascii') - s = base64.b64decode(auth) + auth_bytes = auth.encode('ascii') + else: + auth_bytes = auth + s = base64.b64decode(auth_bytes) login, pwd = s.split(b':', 1) return login.decode('utf8'), pwd.decode('utf8') -def encode_header(auth): +def encode_header(auth: Dict[str, Any]) -> bytes: auth_json = json.dumps(auth).encode('ascii') return base64.urlsafe_b64encode(auth_json) -def parse_auth(entries, raise_on_error=False): +def parse_auth(entries: Dict[str, Any], raise_on_error: bool = False) -> Dict[str, Any]: """ Parses authentication entries @@ -346,11 +351,11 @@ def parse_auth(entries, raise_on_error=False): return AuthConfig.parse_auth(entries, raise_on_error) -def load_config(config_path=None, config_dict=None, credstore_env=None): +def load_config(config_path: Optional[str] = None, config_dict: Optional[Dict[str, Any]] = None, credstore_env: Optional[Dict[str, Any]] = None) -> AuthConfig: return AuthConfig.load_config(config_path, config_dict, credstore_env) -def _load_legacy_config(config_file): +def _load_legacy_config(config_file: str) -> Dict[str, Any]: log.debug("Attempting to parse legacy auth file format") try: data = [] diff --git a/docker/client.py b/docker/client.py index 4dbd846f..bd7e7173 100644 --- a/docker/client.py +++ b/docker/client.py @@ -1,3 +1,7 @@ +from __future__ import annotations +from typing import Any, Dict, NoReturn + +from .types.daemon import CancellableStream from .api.client import APIClient from .constants import (DEFAULT_TIMEOUT_SECONDS, DEFAULT_MAX_POOL_SIZE) from .models.configs import ConfigCollection @@ -41,11 +45,11 @@ class DockerClient: max_pool_size (int): The maximum number of connections to save in the pool. """ - def __init__(self, *args, **kwargs): + def __init__(self, *args: Any, **kwargs: Any) -> None: self.api = APIClient(*args, **kwargs) @classmethod - def from_env(cls, **kwargs): + def from_env(cls, **kwargs: Any) -> DockerClient: """ Return a client configured from environment variables. @@ -103,7 +107,7 @@ class DockerClient: # Resources @property - def configs(self): + def configs(self) -> ConfigCollection: """ An object for managing configs on the server. See the :doc:`configs documentation ` for full details. @@ -111,7 +115,7 @@ class DockerClient: return ConfigCollection(client=self) @property - def containers(self): + def containers(self) -> ContainerCollection: """ An object for managing containers on the server. See the :doc:`containers documentation ` for full details. @@ -119,7 +123,7 @@ class DockerClient: return ContainerCollection(client=self) @property - def images(self): + def images(self) -> ImageCollection: """ An object for managing images on the server. See the :doc:`images documentation ` for full details. @@ -127,7 +131,7 @@ class DockerClient: return ImageCollection(client=self) @property - def networks(self): + def networks(self) -> NetworkCollection: """ An object for managing networks on the server. See the :doc:`networks documentation ` for full details. @@ -135,7 +139,7 @@ class DockerClient: return NetworkCollection(client=self) @property - def nodes(self): + def nodes(self) -> NodeCollection: """ An object for managing nodes on the server. See the :doc:`nodes documentation ` for full details. @@ -143,7 +147,7 @@ class DockerClient: return NodeCollection(client=self) @property - def plugins(self): + def plugins(self) -> PluginCollection: """ An object for managing plugins on the server. See the :doc:`plugins documentation ` for full details. @@ -151,7 +155,7 @@ class DockerClient: return PluginCollection(client=self) @property - def secrets(self): + def secrets(self) -> SecretCollection: """ An object for managing secrets on the server. See the :doc:`secrets documentation ` for full details. @@ -159,7 +163,7 @@ class DockerClient: return SecretCollection(client=self) @property - def services(self): + def services(self) -> ServiceCollection: """ An object for managing services on the server. See the :doc:`services documentation ` for full details. @@ -167,7 +171,7 @@ class DockerClient: return ServiceCollection(client=self) @property - def swarm(self): + def swarm(self) -> Swarm: """ An object for managing a swarm on the server. See the :doc:`swarm documentation ` for full details. @@ -175,7 +179,7 @@ class DockerClient: return Swarm(client=self) @property - def volumes(self): + def volumes(self) -> VolumeCollection: """ An object for managing volumes on the server. See the :doc:`volumes documentation ` for full details. @@ -183,35 +187,35 @@ class DockerClient: return VolumeCollection(client=self) # Top-level methods - def events(self, *args, **kwargs): + def events(self, *args: Any, **kwargs: Any) -> CancellableStream: return self.api.events(*args, **kwargs) events.__doc__ = APIClient.events.__doc__ - def df(self): + def df(self) -> Dict[str, Any]: return self.api.df() df.__doc__ = APIClient.df.__doc__ - def info(self, *args, **kwargs): + def info(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: return self.api.info(*args, **kwargs) info.__doc__ = APIClient.info.__doc__ - def login(self, *args, **kwargs): + def login(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: return self.api.login(*args, **kwargs) login.__doc__ = APIClient.login.__doc__ - def ping(self, *args, **kwargs): + def ping(self, *args: Any, **kwargs: Any) -> bool: return self.api.ping(*args, **kwargs) ping.__doc__ = APIClient.ping.__doc__ - def version(self, *args, **kwargs): + def version(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: return self.api.version(*args, **kwargs) version.__doc__ = APIClient.version.__doc__ - def close(self): + def close(self) -> None: return self.api.close() close.__doc__ = APIClient.close.__doc__ - def __getattr__(self, name): + def __getattr__(self, name: str) -> NoReturn: s = [f"'DockerClient' object has no attribute '{name}'"] # If a user calls a method on APIClient, they if hasattr(APIClient, name): diff --git a/docker/errors.py b/docker/errors.py index d03e10f6..8959aaa3 100644 --- a/docker/errors.py +++ b/docker/errors.py @@ -1,5 +1,9 @@ +from typing import Any, Optional, List, Union + import requests +from .models.containers import Container + _image_not_found_explanation_fragments = frozenset( fragment.lower() for fragment in [ 'no such image', @@ -19,11 +23,11 @@ class DockerException(Exception): """ -def create_api_error_from_http_exception(e): +def create_api_error_from_http_exception(e: requests.HTTPError) -> Union["APIError", "ImageNotFound", "NotFound"]: """ Create a suitable APIError from requests.exceptions.HTTPError. """ - response = e.response + response: requests.Response = e.response try: explanation = response.json()['message'] except ValueError: @@ -43,14 +47,14 @@ class APIError(requests.exceptions.HTTPError, DockerException): """ An HTTP error from the API. """ - def __init__(self, message, response=None, explanation=None): + def __init__(self, message: str, response: Optional[requests.Response] = None, explanation: Optional[str] = None) -> None: # requests 1.2 supports response as a keyword argument, but # requests 1.1 doesn't super().__init__(message) self.response = response self.explanation = explanation - def __str__(self): + def __str__(self) -> str: message = super().__str__() if self.is_client_error(): @@ -71,19 +75,19 @@ class APIError(requests.exceptions.HTTPError, DockerException): return message @property - def status_code(self): + def status_code(self) -> Optional[int]: if self.response is not None: return self.response.status_code - def is_error(self): + def is_error(self) -> bool: return self.is_client_error() or self.is_server_error() - def is_client_error(self): + def is_client_error(self) -> bool: if self.status_code is None: return False return 400 <= self.status_code < 500 - def is_server_error(self): + def is_server_error(self) -> bool: if self.status_code is None: return False return 500 <= self.status_code < 600 @@ -118,10 +122,10 @@ class DeprecatedMethod(DockerException): class TLSParameterError(DockerException): - def __init__(self, msg): + def __init__(self, msg: str) -> None: self.msg = msg - def __str__(self): + def __str__(self) -> str: return self.msg + (". TLS configurations should map the Docker CLI " "client configurations. See " "https://docs.docker.com/engine/articles/https/ " @@ -136,7 +140,7 @@ class ContainerError(DockerException): """ Represents a container that has exited with a non-zero exit code. """ - def __init__(self, container, exit_status, command, image, stderr): + def __init__(self, container: Container, exit_status: int, command: Union[str, List[str]], image: str, stderr: Optional[str]) -> None: self.container = container self.exit_status = exit_status self.command = command @@ -151,7 +155,7 @@ class ContainerError(DockerException): class StreamParseError(RuntimeError): - def __init__(self, reason): + def __init__(self, reason: Any) -> None: self.msg = reason @@ -178,32 +182,32 @@ def create_unexpected_kwargs_error(name, kwargs): class MissingContextParameter(DockerException): - def __init__(self, param): + def __init__(self, param: str) -> None: self.param = param - def __str__(self): + def __str__(self) -> str: return (f"missing parameter: {self.param}") class ContextAlreadyExists(DockerException): - def __init__(self, name): + def __init__(self, name: str) -> None: self.name = name - def __str__(self): + def __str__(self) -> str: return (f"context {self.name} already exists") class ContextException(DockerException): - def __init__(self, msg): + def __init__(self, msg: str) -> None: self.msg = msg - def __str__(self): + def __str__(self) -> str: return (self.msg) class ContextNotFound(DockerException): - def __init__(self, name): + def __init__(self, name: str) -> None: self.name = name - def __str__(self): + def __str__(self) -> str: return (f"context '{self.name}' not found") diff --git a/docker/tls.py b/docker/tls.py index a4dd0020..4c762dfa 100644 --- a/docker/tls.py +++ b/docker/tls.py @@ -1,7 +1,9 @@ import os import ssl +from typing import Tuple, Optional, Union from . import errors +from .api.client import APIClient from .transport import SSLHTTPAdapter @@ -26,9 +28,9 @@ class TLSConfig: verify = None ssl_version = None - def __init__(self, client_cert=None, ca_cert=None, verify=None, - ssl_version=None, assert_hostname=None, - assert_fingerprint=None): + def __init__(self, client_cert: Optional[Tuple[str, str]] = None, ca_cert: Optional[str] = None, verify: Optional[Union[bool, str]] = None, + ssl_version: Optional[int] = None, assert_hostname: Optional[bool] = None, + assert_fingerprint: Optional[bool] = None) -> None: # Argument compatibility/mapping with # https://docs.docker.com/engine/articles/https/ # This diverges from the Docker CLI in that users can specify 'tls' @@ -73,7 +75,7 @@ class TLSConfig: 'Invalid CA certificate provided for `ca_cert`.' ) - def configure_client(self, client): + def configure_client(self, client: APIClient) -> None: """ Configure a client with these TLS options. """