import json
import math
import os
import re
import subprocess
import time
from time import gmtime, strftime

import click
import pykube
import requests
# JSON files with available metrics.
# Each file is loaded into a dict keyed by metric name; the values are the
# metric paths appended to the Heapster model URLs by the status functions.
def _load_metrics_file(filename):
    """Load a JSON metrics file that lives next to this module."""
    path = os.path.join(os.path.dirname(__file__), filename)
    # Use a context manager so the file handle is closed deterministically.
    with open(path, encoding="utf-8") as handle:
        return json.load(handle)


standard_node_metrics = _load_metrics_file('node_metrics.json')
standard_pod_metrics = _load_metrics_file('pod_metrics.json')
# Auxiliary Functions.
def start_proxy():
    """
    Launches "kubectl proxy" in the background through bash.

    The docstring of the original states the proxy listens on
    127.0.0.1:8001. The call blocks for two seconds after launching the
    command (presumably to give the proxy time to come up).
    """
    command = ['bash', '-c', "kubectl proxy &"]
    subprocess.call(command)
    time.sleep(2)
def check_rule(rule, typ):
    """
    Checks that a rule is well written.

    Format rule: "metric_greater|lower_limit_scale|reduce"

    Parameters:
        - rule: str
            Rule to check.
        - typ: str
            Rule type, "pod" or "node". Any other value makes the check fail.

    Returns:
        - check: bool
            True if the rule has the correct format and its metric is a
            known metric for the given type.
    """
    parts = rule.split("_")
    if len(parts) != 4:
        return False
    metric, comparator, limit, action = parts

    if comparator not in ("greater", "lower"):
        return False
    # Only plain ASCII digits are accepted: str.isdigit() also accepts
    # characters such as superscript digits, which float() cannot parse
    # when the rule is evaluated later.
    if not limit or not all(char in "0123456789" for char in limit):
        return False
    if action not in ("scale", "reduce"):
        return False

    # The metric catalog depends on the rule type.
    if typ == "pod":
        return metric in standard_pod_metrics
    if typ == "node":
        return metric in standard_node_metrics
    return False
def get_num_nodes():
    """
    Returns number of active nodes.

    The node list is requested from the Heapster model API through the
    local proxy on localhost:8001.

    Returns:
        - num_nodes: int
            Number of current nodes, or 0 if the request fails.
    """
    url = 'http://localhost:8001/api/v1/namespaces/kube-system/services/heapster/proxy/api/v1/model/nodes/'
    try:
        num_nodes = len(requests.get(url).json())
    # "except Exception" keeps the best-effort behaviour but lets
    # KeyboardInterrupt and SystemExit propagate.
    except Exception:
        click.echo(strftime("%Y-%m-%d %H:%M:%S",
                            gmtime()) + " [ERROR] Error to get active nodes.")
        num_nodes = 0
    return num_nodes
def get_mean(metric):
    """
    Calculates the arithmetic mean of a metric.

    Parameters:
        - metric: iterable
            Samples of a metric. Only items that are dicts with a numeric
            'value' entry are taken into account; everything else is ignored.

    Returns:
        - mean: float
            Arithmetic mean rounded to two decimals, or 0 when there is no
            usable sample.
    """
    total = 0.0
    count = 0
    for sample in metric:
        if not isinstance(sample, dict) or 'value' not in sample:
            continue
        value = sample['value']
        # bool is a subclass of int; True/False are not measurements.
        if isinstance(value, bool):
            continue
        try:
            number = float(value)
        except (TypeError, ValueError):
            continue
        # float() also parses "nan" and "inf"; they would poison the mean.
        if not math.isfinite(number):
            continue
        total += number
        count += 1
    if count == 0:
        return 0
    return round(total / count, 2)
# Get Labels Functions.
def get_metrics(labels, typ):
    """
    Get metrics from a dictionary of labels.

    If the label "all-metrics" is "true" (case-insensitive) every available
    metric is returned. Otherwise the value of each label whose key matches
    "metric-.*" is kept when it is an available metric; other values are
    reported as errors.

    Parameters:
        - labels: dict
            Dictionary with all labels.
        - typ: str
            Metrics type, "pod" or "cluster". For any other value no metric
            is available.

    Returns:
        - metrics: str list
            List with metrics to monitor.
    """
    if typ == "pod":
        available_metrics = list(standard_pod_metrics.keys())
    elif typ == "cluster":
        available_metrics = list(standard_node_metrics.keys())
    else:
        # Previously this left the name unbound and raised UnboundLocalError.
        available_metrics = []

    if "all-metrics" in labels and labels["all-metrics"].lower() == "true":
        return available_metrics

    metrics = []
    metric_label = re.compile("metric-.*")
    for key in [label for label in labels if metric_label.search(label)]:
        if labels[key] in available_metrics:
            metrics.append(labels[key])
        else:
            click.echo(strftime("%Y-%m-%d %H:%M:%S", gmtime()) + " [ERROR] Wrong value for " + str(key))
    return metrics
def get_rules(labels, name):
    """
    Collects the valid pod rules found in a dictionary of labels.

    Every label whose key matches "rule-.*" is validated with check_rule
    as a "pod" rule. Invalid rules are reported and left out.

    Parameters:
        - labels: dict
            Dictionary with all labels.
        - name: str
            Stateful Set name, only used in the error message.

    Returns:
        - rules: str list
            List with all rules to apply.
    """
    rule_label = re.compile("rule-.*")
    rule_keys = [key for key in labels if rule_label.search(key)]

    accepted = []
    for key in rule_keys:
        candidate = labels[key]
        if check_rule(candidate, "pod"):
            accepted.append(candidate)
            continue
        timestamp = strftime("%Y-%m-%d %H:%M:%S", gmtime())
        click.echo(timestamp + " [ERROR] Wrong value for " + key + " of Stateful Set " + name + ": " + candidate)
    return accepted
def get_cluster_labels(cluster_info):
    """
    Gets cluster information.

    Reads the autoscaling settings of the first node pool and the metrics
    requested through the cluster resource labels. The settings are only
    accepted when "enabled" is a bool, "minNodeCount" is greater than 0 and
    "maxNodeCount" is not lower than "minNodeCount"; otherwise the defaults
    (False, 0, 0, []) are returned without reading the labels.

    Parameters:
        - cluster_info: dict
            Dictionary with all cluster information.

    Returns:
        - autoscale: bool
            True if node autoscale is active.
        - max_nodes: int
            Maximum number of allowed nodes.
        - min_nodes: int
            Minimum number of allowed nodes.
        - metrics: list
            List of cluster metrics to monitor.
    """
    metrics = []
    autoscale = False
    max_nodes = 0
    min_nodes = 0
    try:
        # Only the first node pool is inspected.
        autoscaling = cluster_info["nodePools"][0]["autoscaling"]
        if isinstance(autoscaling["enabled"], bool) \
                and autoscaling["minNodeCount"] > 0 \
                and autoscaling["maxNodeCount"] >= autoscaling["minNodeCount"]:
            autoscale = autoscaling["enabled"]
            max_nodes = int(autoscaling["maxNodeCount"])
            min_nodes = int(autoscaling["minNodeCount"])
            metrics = get_metrics(cluster_info["resourceLabels"], "cluster")
    # "except Exception" keeps the best-effort behaviour but lets
    # KeyboardInterrupt and SystemExit propagate.
    except Exception:
        # Discard anything assigned before the failure.
        metrics = []
        autoscale = False
        max_nodes = 0
        min_nodes = 0
        click.echo(strftime("%Y-%m-%d %H:%M:%S",
                            gmtime()) + " [ERROR] Cluster without labels or with labels incorrectly created.")
    return autoscale, max_nodes, min_nodes, metrics
def get_statefulset_labels(statefulset_info):
    """
    Gets Stateful Set information.

    Returns information about labels, metrics and rules. A Stateful Set is
    included in the result when its overscaler labels are valid or when it
    requests at least one metric.

    Parameters:
        - statefulset_info: dict
            Dictionary with all Stateful Set information. Its 'items' entry
            is iterated.

    Returns:
        - statefulset_labels: dict
            Dictionary with only the information needed for the overscaler,
            keyed by the value of the "app" label of each Stateful Set.
            Returned dict format:
            {
                statefulset_name1: {
                    overscaler: bool,
                        Is overscaler active?
                    current-count: int,
                        Autoscale pause counter.
                    autoscaler-count: int,
                        Number of waiting cycles after rescaling.
                    max-replicas: int,
                        Maximum number of replicas.
                    min-replicas: int,
                        Minimum number of replicas.
                    metrics: [str, str...],
                        List with all metrics to monitor.
                    rules: [str, str...]
                        List with all rules for this Stateful Set.
                }
                statefulset_name2: {
                    ...
                }
                ...
            }
    """
    def log(message):
        # Every message is prefixed with the current UTC timestamp.
        click.echo(strftime("%Y-%m-%d %H:%M:%S", gmtime()) + message)

    statefulset_labels = {}
    # "except Exception" is used throughout: it keeps the best-effort
    # behaviour but lets KeyboardInterrupt and SystemExit propagate.
    try:
        for item in statefulset_info['items']:
            rules = []
            overscaler = False
            current_count = 0
            autoscaler_count = 0
            max_replicas = 0
            min_replicas = 0
            try:
                labels = item["metadata"]["labels"]
                name = labels["app"]
                log(" [INFO] Getting metrics for " + name)
                metrics = get_metrics(labels, "pod")
                try:
                    if labels["overscaler"] == "true" and int(labels["min-replicas"]) > 0 \
                            and int(labels["max-replicas"]) >= int(labels["min-replicas"]) \
                            and int(labels["autoscaler-count"]) >= 0 and int(labels["current-count"]) >= 0 \
                            and labels["rescaling"] in ["true", "false"]:
                        overscaler = True
                        current_count = int(labels["current-count"])
                        autoscaler_count = int(labels["autoscaler-count"])
                        max_replicas = int(labels["max-replicas"])
                        min_replicas = int(labels["min-replicas"])
                        rules = get_rules(labels, name)
                        log(" [INFO POD] Stateful Set labels obtained correctly: " + name)
                    elif labels["overscaler"] != "true":
                        log(" [ERROR] Overscaler label is not true in " + name + ". Autoscale off.")
                    else:
                        log(" [ERROR] Wrong value for overscaler labels of Stateful Set " + name + ". Autoscale off.")
                except Exception:
                    # A missing or non-numeric overscaler label ends up here.
                    log(" [ERROR] Error to get labels for " + name + ". Are all overscaler labels correctly written?")
                if overscaler or metrics:
                    statefulset_labels[name] = {'overscaler': overscaler, 'current-count': current_count,
                                                'autoscaler-count': autoscaler_count, 'max-replicas': max_replicas,
                                                'min-replicas': min_replicas, 'metrics': metrics, 'rules': rules}
            except Exception:
                # NOTE(review): a failure inside get_metrics is also reported
                # with this message, not only a missing "app" label.
                log(" [ERROR] Stateful Set without name")
    except Exception:
        log(" [ERROR] Stateful Set info empty")
    return statefulset_labels
# Get Status Functions.
def get_node_status(metrics):
    """
    Gets Node status.

    Returns information about state of all nodes, read from the Heapster
    model API through the local proxy on localhost:8001. The allocatable
    memory and CPU are read from the first node of the list only and used
    as the reference for the percentage metrics of every node.

    Parameters:
        - metrics: str list
            List of metrics to monitor.

    Returns:
        - node_status: dict
            Dictionary with all the information. A node whose metrics could
            not be read gets an empty dict.
            Returned dict format:
            {
                node_name1: {
                    metric-1: float,
                        Metric-1 value.
                    metric-2: float,
                        Metric-2 value.
                    ...
                }
                node_name2: {
                    ...
                }
                ...
            }
        - max_node_cpu: float
            Allocatable CPU of the first node, 0 on error.
        - max_node_memory: float
            Allocatable memory of the first node, 0 on error.
    """
    nodes_url = 'http://localhost:8001/api/v1/namespaces/kube-system/services/heapster/proxy/api/v1/model/nodes/'

    def fetch(node, metric_path):
        # Returns the list of samples Heapster holds for one node metric.
        return requests.get(nodes_url + node + '/metrics/' + metric_path).json()["metrics"]

    # "except Exception" keeps the best-effort behaviour but lets
    # KeyboardInterrupt and SystemExit propagate.
    try:
        node_status = {}
        nodes = requests.get(nodes_url).json()
        max_node_memory = float(fetch(nodes[0], 'memory/node_allocatable')[0]["value"])
        max_node_cpu = float(fetch(nodes[0], 'cpu/node_allocatable')[0]["value"])
        for node in nodes:
            status = {}
            try:
                for metric in metrics:
                    if metric == "memory-usage-percent":
                        memory_usage = fetch(node, 'memory/working_set')
                        status[metric] = round(get_mean(memory_usage) / max_node_memory * 100, 2)
                    elif metric == "cpu-usage-percent":
                        cpu_usage = fetch(node, 'cpu/usage_rate')
                        status[metric] = round(get_mean(cpu_usage) / max_node_cpu * 100, 2)
                    elif metric in standard_node_metrics:
                        status[metric] = get_mean(fetch(node, standard_node_metrics[metric]))
            except Exception:
                click.echo(strftime("%Y-%m-%d %H:%M:%S", gmtime()) + " [ERROR] Error to get status for node: " + node)
                # A failure on any metric discards the whole node status.
                status = {}
            node_status[node] = status
    except Exception:
        click.echo(strftime("%Y-%m-%d %H:%M:%S", gmtime()) + " [ERROR] Error to get status for cluster")
        node_status = {}
        max_node_cpu = 0
        max_node_memory = 0
    return node_status, max_node_cpu, max_node_memory
def get_pod_status(api, namespace, statefulset_labels, memory_allocatable, cpu_allocatable):
    """
    Gets Pod status.

    Returns information about state of all stateful set pods. Pods are
    listed in the given namespace, the same namespace used for the Heapster
    metric requests. A pod is matched to its stateful set by dropping the
    final "-<suffix>" of the pod name. Pods without an assigned node are
    skipped.

    Parameters:
        - api: pykube.http.HTTPClient
            Http client for requests to Kubernetes Api.
        - namespace: str
            Project namespace.
        - statefulset_labels: dict
            Dict with metrics for each stateful set. When it is empty
            nothing is requested and an empty dict is returned.
        - memory_allocatable: int
            Maximum memory allowed per node, expressed in bytes.
        - cpu_allocatable: int
            Maximum CPU allowed per node, expressed in millicores.

    Returns:
        - pod_status: dict
            Dictionary with all the information.
            Returned dict format:
            {
                node_name1: {
                    pod-name1: {
                        metric-1: float,
                            Metric-1 value.
                        metric-2: float,
                            Metric-2 value.
                        ...
                    }
                    pod-name2: {
                        ...
                    }
                }
                node_name2: {
                    ...
                }
                ...
            }
    """
    pod_status = {}
    if not statefulset_labels:
        return pod_status

    def log(message):
        # Every message is prefixed with the current UTC timestamp.
        click.echo(strftime("%Y-%m-%d %H:%M:%S", gmtime()) + message)

    # "except Exception" keeps the best-effort behaviour but lets
    # KeyboardInterrupt and SystemExit propagate.
    try:
        pods_url = "http://localhost:8001/api/v1/namespaces/kube-system/services/heapster/proxy/api/v1/model/namespaces/" + namespace + "/pods/"
        # Filter by namespace, as the other pykube queries in this module do,
        # so the pods listed are the ones the metric URLs refer to.
        pods = pykube.Pod.objects(api).filter(namespace=namespace).response['items']
        for pod in pods:
            node_name = pod.get('spec', {}).get('nodeName')
            if not node_name:
                # Not scheduled on a node yet; skipping it keeps one such
                # pod from aborting the collection for all remaining pods.
                continue
            pod_name = pod['metadata']['name']
            pod_status.setdefault(node_name, {})
            statefulset_name = pod_name.rsplit("-", 1)[0]
            if statefulset_name not in statefulset_labels:
                continue
            status = {}
            for metric in statefulset_labels[statefulset_name]['metrics']:
                try:
                    if metric == "memory-usage-percent":
                        memory_usage = requests.get(pods_url + pod_name + "/metrics/memory/working_set").json()["metrics"]
                        status[metric] = round(get_mean(memory_usage) / memory_allocatable * 100, 2)
                    elif metric == "cpu-usage-percent":
                        cpu_usage = requests.get(pods_url + pod_name + "/metrics/cpu/usage_rate").json()["metrics"]
                        status[metric] = round(get_mean(cpu_usage) / cpu_allocatable * 100, 2)
                    elif metric in standard_pod_metrics:
                        status[metric] = get_mean(requests.get(pods_url + pod_name + "/metrics/" + standard_pod_metrics[metric]).json()["metrics"])
                except Exception:
                    log(" [ERROR] Error to get status for: " + pod_name)
                    # Values gathered so far for this pod are discarded.
                    status = {}
            pod_status[node_name][pod_name] = status
    except Exception:
        log(" [ERROR] Error to get status pods.")
    return pod_status
# Action functions.
def actions(api, namespace, pod_status, statefulset_labels, max_nodes):
    """
    Decision making based on pods status and stateful set rules.

    For every pod whose stateful set has the overscaler active, the stateful
    set is read from the cluster. If its "current-count" label is 0 and its
    "rescaling" label is "false", each rule whose metric is present in the
    pod status is evaluated and rescale() is called for every rule that is
    met.

    Parameters:
        - api: pykube.http.HTTPClient
            Http client for requests to Kubernetes Api.
        - namespace: str
            Project namespace.
        - pod_status: dict
            Dictionary with status pod information.
        - statefulset_labels: dict
            Dict with metrics and rules of each stateful set.
        - max_nodes: int
            Maximum number of allowed nodes.
    """
    def log(message):
        # Every message is prefixed with the current UTC timestamp.
        click.echo(strftime("%Y-%m-%d %H:%M:%S", gmtime()) + message)

    for node_name in pod_status:
        for pod_name in pod_status[node_name]:
            # The stateful set name is the pod name without its last "-<suffix>".
            statefulset_name = pod_name.rsplit("-", 1)[0]
            if statefulset_name not in statefulset_labels:
                continue
            settings = statefulset_labels[statefulset_name]
            if settings['overscaler'] is not True:
                continue
            # "except Exception" keeps the best-effort behaviour but lets
            # KeyboardInterrupt and SystemExit propagate.
            try:
                pre_set = pykube.StatefulSet.objects(api).filter(namespace=namespace).get(name=statefulset_name)
                if int(pre_set.labels["current-count"]) != 0 or pre_set.labels["rescaling"] != "false":
                    continue
                observed = pod_status[node_name][pod_name]
                # NOTE(review): every rule that is met triggers its own
                # rescale() call in the same pass; confirm this is intended.
                for rule in settings['rules']:
                    parts = rule.split("_")
                    metric = parts[0]
                    if metric not in observed:
                        continue
                    comparator, limit, wanted = parts[1], parts[2], parts[3]

                    if comparator == "greater":
                        triggered = float(observed[metric]) >= float(limit)
                    elif comparator == "lower":
                        triggered = float(observed[metric]) <= float(limit)
                    else:
                        triggered = False
                    if not triggered:
                        continue

                    if wanted == "scale":
                        action = "scale"
                    elif wanted == "reduce":
                        action = "reduce"
                    else:
                        action = None
                    # Only call rescale when a rule asks for an action, so
                    # rules that are not met produce no "not possible" logs.
                    if action is None:
                        continue
                    log(" [ACTION] Is necessary to " + action + " " + pod_name + " for rule: " + rule.replace("_", " "))
                    rescale(api, namespace, statefulset_name, action, max_nodes)
            except Exception:
                log(" [ERROR] Error to make decissions for: " + statefulset_name)
def rescale(api, namespace, statefulset_name, action, max_nodes):
    """
    Sets a new number of replicas for a given stateful set.

    On success the "rescaling" label of the stateful set is set to "true"
    and the stateful set is updated. When the change is not possible the
    reason is logged and nothing is modified.

    Parameters:
        - api: pykube.http.HTTPClient
            Http client for requests to Kubernetes Api.
        - namespace: str
            Project namespace.
        - statefulset_name: str
            Name of the statefulset to be rescaled.
        - action: str
            Action to be realized. Can be "scale" or "reduce", one pod more
            or one pod less, respectively. Any other value is ignored.
        - max_nodes: int
            Maximum number of allowed nodes; the replica count is never
            scaled beyond it.
    """
    if action not in ("scale", "reduce"):
        # Nothing was requested: do not query the cluster or log a failure.
        return

    def log(message):
        # Every message is prefixed with the current UTC timestamp.
        click.echo(strftime("%Y-%m-%d %H:%M:%S", gmtime()) + message)

    # "except Exception" keeps the best-effort behaviour but lets
    # KeyboardInterrupt and SystemExit propagate.
    try:
        pre_set = pykube.StatefulSet.objects(api).filter(namespace=namespace).get(name=statefulset_name)
        replicas = pre_set.replicas
        # Compare the action by value ("==") and report the limit that
        # applies to the requested action.
        if action == "scale":
            if replicas < max_nodes and replicas < int(pre_set.labels['max-replicas']):
                pre_set.replicas = replicas + 1
                pre_set.labels["rescaling"] = "true"
                pre_set.update()
                log(" [ACTION] Rescaling " + statefulset_name + "... ")
            elif replicas >= max_nodes:
                log(" [ACTION] Rescaling not possible, maximum nodes reached")
                log(" [ACTION] Resizing for " + statefulset_name + " is failed")
            else:
                log(" [ACTION] Rescaling not possible, maximum replicas reached for " + statefulset_name)
                log(" [ACTION] Resizing for " + statefulset_name + " is failed")
        else:
            if replicas > 1 and replicas > int(pre_set.labels['min-replicas']):
                pre_set.replicas = replicas - 1
                pre_set.labels["rescaling"] = "true"
                pre_set.update()
                log(" [ACTION] Rescaling " + statefulset_name + "... ")
            elif replicas == 1:
                log(" [ACTION] Rescaling not possible, there is only one replica for " + statefulset_name)
                log(" [ACTION] Resizing for " + statefulset_name + " is failed")
            else:
                log(" [ACTION] Rescaling not possible, minimum replicas reached for " + statefulset_name)
                log(" [ACTION] Resizing for " + statefulset_name + " is failed")
    except Exception:
        log(" [ERROR] Error to rescale for: " + statefulset_name)
def update_current_count(api, namespace, statefulsets_labels):
    """
    Updates the "current-count" label of all Stateful sets.

    If its value is 0, this stateful set is ready to be scaled if necessary.
    For every stateful set with the overscaler active:
      - when the current and desired replica counts match and the
        "rescaling" label is not "false", the rescaling is marked as
        completed and "current-count" is reset to "autoscaler-count";
      - when "current-count" is greater than 0 and "rescaling" is "false",
        "current-count" is decreased by one.

    Parameters:
        - api: pykube.http.HTTPClient
            Http client for requests to Kubernetes Api.
        - namespace: str
            Project namespace.
        - statefulsets_labels: dict
            Dict with metrics and rules of each stateful set.
    """
    def log(message):
        # Every message is prefixed with the current UTC timestamp.
        click.echo(strftime("%Y-%m-%d %H:%M:%S", gmtime()) + message)

    for name in statefulsets_labels:
        if statefulsets_labels[name]['overscaler'] is not True:
            continue
        # "except Exception" keeps the best-effort behaviour but lets
        # KeyboardInterrupt and SystemExit propagate.
        try:
            pre_set = pykube.StatefulSet.objects(api).filter(namespace=namespace, field_selector={"metadata.name": name})
            # Both checks below use this single snapshot, so the second one
            # does not see label changes made by the first one.
            snapshot = pre_set.response["items"][0]
            labels = snapshot['metadata']['labels']
            if snapshot["status"]["currentReplicas"] == snapshot["status"]["replicas"] and labels['rescaling'] != "false":
                new_statefulset = pykube.StatefulSet.objects(api).filter(namespace=namespace).get(name=name)
                new_statefulset.labels["rescaling"] = "false"
                new_statefulset.labels["current-count"] = new_statefulset.labels["autoscaler-count"]
                new_statefulset.update()
                log(" [ACTION] Rescaling for " + name + " is completed")
            if int(labels['current-count']) > 0 and labels["rescaling"] == "false":
                new_statefulset = pykube.StatefulSet.objects(api).filter(namespace=namespace).get(name=name)
                new_statefulset.labels["current-count"] = str(int(new_statefulset.labels["current-count"]) - 1)
                new_statefulset.update()
                log(" [ACTION] Update current-count for " + name)
        except Exception:
            log(" [ERROR] Error to update \"current-count\" for: " + name)