python 调用 k8s api
k8s api 文档地址:Documentation for API Endpoints
1. 使用 Bearer token
python
from kubernetes import client
class K8S():
def __init__(self, cloud, env='test'):
"""
cloud: aliyun/tencent/aws
env: test/prod
"""
self.app = Sanic.get_app()
key_server = f'k8s_server_{cloud}_{env}'.upper()
key_token = f'k8s_token_{cloud}_{env}'.upper()
api_server = self.app.config[key_server]
api_token = self.app.config[key_token]
api_config = client.Configuration()
api_config.host = api_server
api_config.verify_ssl = False
api_config.api_key = {'authorization': 'Bearer ' + api_token}
api_client = client.ApiClient(api_config)
self.core_api = client.CoreV1Api(api_client)
self.apps_api = client.AppsV1Api(api_client)
def get_deployment_body(self, namespace, deployment_name):
"""获取 deployment yaml"""
body = self.apps_api.read_namespaced_deployment(
deployment_name, namespace)
return body
def get_all_namespace(self):
"""获取所有命名空间"""
res = self.core_api.list_namespace()
# namespace = [item.metadata.name for item in res.items]
namespace = [{
'name': item.metadata.name,
'desc': item.metadata.annotations.get('description', '')
if item.metadata.annotations else '',
} for item in res.items]
return namespace
# ...
- 我是在 sanic 框架中使用的,server 地址和 token 均从配置文件中获取
- token 的创建请参照 k8s 创建 api token
- 参考:kubernetes-client/python/examples/remote_cluster.py
2. 使用 kubeconfig 配置文件
python
cluster_info = {
'tencent': {
'test': 'cls-io******',
'prod': 'cls-o9******',
},
'aliyun': {
'test': 'aliyun-test',
'prod': 'aliyun-prod',
},
'aws': {
'test': 'arn:aws:eks:us-east-1:9959********:cluster/'
'test-xxx-eks',
'prod': 'arn:aws:eks:us-west-1:9959********:cluster/'
'prod-xxx-eks',
},
}
class K8S():
def __init__(self, cloud, env='test'):
cluster_name = cluster_info[cloud][env]
self.config_context = self.get_config_context(cluster_name)
config.load_kube_config(context=self.config_context)
self.core_api = client.CoreV1Api()
self.apps_api = client.AppsV1Api()
def get_config_context(self, cluster_name):
"""获取集群配置"""
contexts, active_context = config.list_kube_config_contexts()
if not contexts:
raise Exception('未找到 k8s 相关的配置')
cluster_contexts = {
context['context']['cluster']: context['name']
for context in contexts}
if cluster_name not in cluster_contexts:
raise Exception(f'未找到 {cluster_name} 集群的配置')
return cluster_contexts[cluster_name]
# ...
kubeconfig
配置文件存储于~/.kube/
路径- 环境变量设置,我是配置于
~/.bashrc
文件,多个配置文件使用冒号分隔:bashexport KUBECONFIG=$HOME/.kube/aliyun-prod-config:$HOME/.kube/aliyun-test-config
- 参考:kubernetes-client/python/examples/pod_config_list.py