Source code for hpecp.k8s_cluster

# (C) Copyright [2020] Hewlett Packard Enterprise Development LP
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.

from __future__ import absolute_import

import re
from distutils.version import LooseVersion
from enum import Enum

from requests.structures import CaseInsensitiveDict

from .base_resource import AbstractResource, AbstractWaitableResourceController

try:
    basestring
except NameError:
    basestring = str


[docs]class K8sClusterStatus(Enum): """Bases: enum.Enum The statuses for a K8S Cluster **Note:** The integer values do not have a meaning outside of this library. The API uses a string identifier with the status name rather than an integer value. """ ready = 1 creating = 2 updating = 3 upgrading = 4 deleting = 5 error = 6 warning = 7
[docs]class K8sCluster(AbstractResource): """Create an instance of K8sCluster from json data returned from the HPE Container Platform API. Users of this library are not expected to create an instance of this class. Parameters ---------- json : str The json returned by the API representing a K8sCluster. Returns ------- K8sCluster: An instance of K8sCluster """ all_fields = [ "id", "name", "description", "k8s_version", "addons", "created_by_user_id", "created_by_user_name", "created_time", "k8shosts_config", "admin_kube_config", "dashboard_token", "api_endpoint_access", "dashboard_endpoint_access", "cert_data", "status", "status_message", "_links", ] """All of the fields of a K8s Cluster objects that are returned by the HPE Container Platform API""" default_display_fields = [ "id", "name", "description", "k8s_version", "status", ] @property def name(self): """@Field: from json['label']['name']""" return self.json["label"]["name"] @property def description(self): """@Field: from json['label']['description']""" return self.json["label"]["description"] @property def k8s_version(self): """@Field: from json['k8s_version']""" return self.json["k8s_version"] @property def addons(self): """@Field: from json['addons']""" if "addons" in self.json: return self.json["addons"] else: return "" @property def created_by_user_id(self): """@Field: from json['created_by_user_id']""" return self.json["created_by_user_id"] @property def created_by_user_name(self): """@Field: from json['created_by_user_name']""" return self.json["created_by_user_name"] @property def created_time(self): """@Field: from json['created_time']""" return self.json["created_time"] @property def k8shosts_config(self): """@Field: from json['k8shosts_config']""" return self.json["k8shosts_config"] @property def admin_kube_config(self): """@Field: from json['admin_kube_config']""" if "admin_kube_config" in self.json: return self.json["admin_kube_config"] else: return "" @property def dashboard_token(self): """@Field: from json['dashboard_token']""" if "dashboard_token" in self.json: return self.json["dashboard_token"] else: return "" @property def api_endpoint_access(self): """@Field: from json['api_endpoint_access']""" if "api_endpoint_access" in self.json: return self.json["api_endpoint_access"] else: return "" @property def dashboard_endpoint_access(self): """@Field: from json['dashboard_endpoint_access']""" if "dashboard_endpoint_access" in self.json: return self.json["dashboard_endpoint_access"] else: return "" @property def cert_data(self): """@Field: from json['cert_data'] or None if cert_data not available""" try: return self.json["cert_data"] except KeyError: return None @property def status(self): """@Field: from json['status']""" return self.json["status"] @property def status_message(self): """@Field: from json['status_message']""" if "status_message" in self.json: return self.json["status_message"] else: return ""
[docs]class K8sClusterHostConfig: """Object to represent a pair of `host node` and the `role` of the host - `master` or `worker`. """
[docs] @classmethod def create_from_list(cls, noderole): """Factory method to create K8sClusterHostConfig from a list. Parameters ---------- noderole: list the noderole must only have two values: [ node, role ] See Also -------- See :py:meth:`K8sClusterHostConfig` for the allowed node and role values. """ assert ( len(noderole) == 2 ), "'noderole' list must have two values [ node, role ]" return K8sClusterHostConfig(node=noderole[0], role=noderole[1])
def __init__(self, node, role): assert isinstance(node, basestring), "'node' must be an string" assert re.match( r"\/api\/v2\/worker\/k8shost\/[0-9]+", node ), "'node' must have format '/api/v2/worker/k8shost/[0-9]+'" assert role in [ "master", "worker", ], "'role' must one of ['master, worker']" self.node = node self.role = role
[docs] def to_dict(self): """Returns a dict representation of the object. Returns ------- dict Example ------- >>> .to_dict() { 'node': '/api/v2/worker/k8shost/12', 'role': 'master' } """ return {"node": self.node, "role": self.role}
[docs]class K8sClusterController(AbstractWaitableResourceController): """Class for interacting with K8S Clusters. An instance of this class is available in the client.ContainerPlatformClient with the attribute name :py:attr:`k8s_cluster <.client.ContainerPlatformClient.k8s_cluster>`. The methods of this class can be invoked using `client.k8s_cluster.method()`. See the example below: Example ------- >>> client = ContainerPlatformClient(...).create_session() >>> client.k8s_cluster.list() """ base_resource_path = "/api/v2/k8scluster" resource_list_path = "k8sclusters" resource_class = K8sCluster status_class = K8sClusterStatus status_fieldname = "status"
[docs] def create( self, name=None, description=None, k8s_version=None, pod_network_range="10.192.0.0/12", service_network_range="10.96.0.0/12", pod_dns_domain="cluster.local", persistent_storage_local=False, persistent_storage_nimble_csi=False, k8shosts_config=[], addons=[], external_identity_server={}, external_groups=[], datafabric=False, datafabric_name=None, ): """Send an API request to create a K8s Cluster. The cluster creation will be asynchronous - use the :py:meth:`wait_for_status` method to wait for the cluster to be created. For the list of possible statuses see :py:class:`K8sClusterStatus`. Parameters ---------- name: str Cluster name - required parameter. Name must be at least 1 character description: str Cluster description - defaults to empty string if not provided k8s_version: str Kubernetes version to configure. If not specified defaults to the latest version as supported by the rpms. pod_network_range: str Network range to be used for kubernetes pods. Defaults to `10.192.0.0/12` addons: list Addons - See :py:method:`get_available_addons`. service_network_range: str Network range to be used for kubernetes services that are exposed with Cluster IP. Defaults to `10.96.0.0/12` pod_dns_domain: str DNS Domain to be used for kubernetes pods. Defaults to `cluster.local` persistent_storage_local: str Enables local host storage to be available in the kubernetes cluster persistent_storage_nimble_csi: bool Set to True to installs the Nimble CSI plugin for Nimble storage to be available in the kubernetes cluster k8shosts_config: list[K8sClusterHostConfig] list of :py:class:`K8sClusterHostConfig` objects determining which hosts to add and their role (master or worker) external_identity_server: dict Example { "bind_pwd":"password", "user_attribute":"CN", "bind_type":"search_bind", "bind_dn":"cn=Administrator,CN=Users,DC=samdom,DC=example,DC=com", "host":"10.1.0.15", "group_attribute":"member", "security_protocol":"ldaps", "base_dn":"CN=Users,DC=samdom,DC=example,DC=com", "verify_peer":false, "type":"Active Directory", "port":636} datafabric: bool datafabric_name: str optional Returns ------- str K8s Cluster ID with the format: '/api/v2/k8scluster/[0-9]+' Raises ------ APIException """ assert ( isinstance(name, basestring) and len(name) > 0 ), "'name' must be provided and must be a string" assert description is None or isinstance( description, basestring ), "'description' if provided, must be a string" assert k8s_version is None or isinstance( k8s_version, basestring ), "'k8s_version' if provided, must be a string" assert isinstance( pod_network_range, basestring ), "'pod_network_range' must be a string" assert isinstance( service_network_range, basestring ), "'service_network_range' must be a string" assert isinstance( pod_dns_domain, basestring ), "'pod_dns_domain' must be a string" assert isinstance( persistent_storage_local, bool ), "'persistent_storage_local' must be True or False" assert isinstance( persistent_storage_nimble_csi, bool ), "'persistent_storage_nimble_csi' must be True or False" assert isinstance( k8shosts_config, list ), "'k8shosts_config' must be a list" assert ( len(k8shosts_config) > 0 ), "'k8shosts_config' must have at least one item" for i, conf in enumerate(k8shosts_config): assert isinstance(conf, K8sClusterHostConfig), ( "'k8shosts_config' item '{}' is not of" " type K8sClusterHostConfig" ).format(i) assert isinstance(addons, list), "'addons' must be a list" assert isinstance( external_identity_server, dict ), "'external_identity_server' must be a dict" data = { "label": {"name": name}, "pod_network_range": pod_network_range, "service_network_range": service_network_range, "pod_dns_domain": pod_dns_domain, "addons": addons, "k8shosts_config": [c.to_dict() for c in k8shosts_config], } if description is not None: data["label"]["description"] = description if k8s_version is not None: data["k8s_version"] = k8s_version if external_identity_server: data["external_identity_server"] = external_identity_server if external_groups: data["external_groups"] = external_groups if datafabric: data["datafabric"] = True data["datafabric_name"] = datafabric_name if LooseVersion( self.client.config.get()["objects"]["bds_global_version"] ) <= LooseVersion("5.1"): # persistent_storage was deprecated after 5.1 data["persistent_storage"] = ( { "local": persistent_storage_local, "nimble_csi": persistent_storage_nimble_csi, }, ) response = self.client._request( url="/api/v2/k8scluster", http_method="post", data=data, description="k8s_cluster/create", ) return CaseInsensitiveDict(response.headers)["Location"]
[docs] def get(self, id, params={}, setup_log=False): """Retrieve a K8s Cluster. Parameters ---------- id: str The k8s cluster ID """ if setup_log is True: params["setup_log"] = "true" return super(K8sClusterController, self).get(id=id, params=params)
[docs] def k8smanifest(self): """Retrieve the k8smanifest. Returns ------- json K8sManifest Raises ------ APIException """ response = self.client._request( url="/api/v2/k8smanifest", http_method="get", description="k8s_cluster/k8smanifest", ) return response.json()
[docs] def k8s_supported_versions(self): """Retrieve list of K8S Supported Versions. Returns ------- list[str] List of K8s Supported Versions Raises ------ APIException """ return self.k8smanifest()["supported_versions"]
[docs] def get_available_addons(self, id=None, k8s_version=None): """Retrieve list of K8S Supported Versions. Parameters ---------- id: str The k8s cluster ID Returns ------- list[str] List of available addons Raises ------ APIException """ assert ( id is not None or k8s_version is not None ), "Either 'id' or 'k8s_version' parameter must be provided" assert ( id is None or k8s_version is None ), "Either 'id' or 'k8s_version' parameter must be provided" if id: k8s_version = self.get(id).k8s_version return self.k8smanifest()["version_info"][k8s_version]["addons"]
[docs] def add_addons(self, id, addons=[]): """Retrieve list of K8S Supported Versions. Parameters ---------- id: str The k8s cluster ID addons: list The list of addons to add. Raises ------ APIException """ # TODO assert ID is provided and valid assert ( isinstance(addons, list) and len(addons) > 0 ), "'Addons' parameter must be a list and have at least one entry." current_addons = self.get(id).addons required_addons = current_addons + addons # de-duplicate required_addons = list(dict.fromkeys(required_addons)) data = { "change_spec": {"addons": required_addons}, "operation": "reconfigure", "reason": "", } self.client._request( url="{}/change_task".format(id), http_method="post", description="k8s_cluster/add_addons", data=data, )
[docs] def upgrade_cluster( self, id, k8s_upgrade_version, worker_upgrade_percent=20 ): """Upgrade a cluster. TODO Returns ------- TODO Raises ------ APIException """ data = { "change_spec": { "k8s_upgrade": { "worker_upgrade_percent": worker_upgrade_percent, "k8s_upgrade_version": k8s_upgrade_version, } }, "operation": "reconfigure", "reason": "Kubernetes upgrade", } response = self.client._request( url="{}/change_task".format(id), http_method="post", description="K8sClusterController/upgrade_cluster", data=data, ) return response.json()
[docs] def import_cluster( self, cluster_type, name, description, pod_dns_domain, server_url, ca, bearer_token, ): """Import a k8s cluster. TODO Returns ------- TODO Raises ------ APIException """ assert cluster_type in [ "generic", "eks", "aks", "gke", "pks", ], 'cluster_type must be either "generic", "eks", "aks", "gke", "pks"' data = { "label": {"name": name, "description": description}, "pod_dns_domain": pod_dns_domain, "type": cluster_type, "sysadmin_data": { "server_url": server_url, "ca": ca, "bearer_token": bearer_token, }, } response = self.client._request( url="/api/v2/k8scluster/import", http_method="post", description="K8sClusterController/import_generic_cluster", data=data, ) return CaseInsensitiveDict(response.headers)["Location"]
[docs] def import_generic_cluster( self, name, description, pod_dns_domain, server_url, ca, bearer_token ): return self.import_cluster( "generic", name, description, pod_dns_domain, server_url, ca, bearer_token, )
[docs] def import_generic_cluster_with_json(self, json): """Import a generic k8s cluster. TODO Returns ------- TODO Raises ------ APIException """ response = self.client._request( url="/api/v2/k8scluster/import", http_method="post", description=( "K8sClusterController/" "import_generic_cluster_with_json" ), data=json, ) return CaseInsensitiveDict(response.headers)["Location"]
[docs] def run_kubectl_command(self, id, op="apply", yaml=""): """Run a kubectl command on k8s cluster. Parameters ------ id: str The k8s cluster ID (i.e., '/api/v2/k8s_cluster/1') op: str op can be either 'create', 'apply' or 'delete' yaml: str base64 encoding of the yaml file Returns ------- TODO Raises ------ APIException """ assert isinstance( id, basestring ), "ID must be provided in /api/v2/k8scluster/<int> format" assert op in [ "create", "apply", "delete", ], 'op must be either "create", "apply", "delete"' # TODO: assert yaml is a valid base64 encoded string of a yaml data = {"op": op, "data": yaml} response = self.client._request( url="{}/kubectl".format(id), http_method="post", description=("K8sClusterController/" "run_kubectl_command"), data=data, ) return response.text