194 lines
6.4 KiB
Python
194 lines
6.4 KiB
Python
#!/usr/local/sbin/charm-env python3
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
from typing import List, Union
|
|
from charms.reactive import Endpoint, toggle_flag, set_flag, data_changed
|
|
|
|
from charmhelpers.core import hookenv, unitdata
|
|
|
|
try:
|
|
from .models import Taint, Label, DecodeError
|
|
except ImportError:
|
|
# when this code is under test...it's not installed in a package
|
|
# so catching this exception is simply for the test framework
|
|
from models import Taint, Label, DecodeError
|
|
|
|
DB = unitdata.kv()
|
|
|
|
|
|
class KubeControlProvider(Endpoint):
|
|
"""
|
|
Implements the kubernetes-control-plane side of the kube-control interface.
|
|
"""
|
|
|
|
DecodeError = DecodeError
|
|
|
|
def manage_flags(self):
|
|
toggle_flag(self.expand_name("{endpoint_name}.connected"), self.is_joined)
|
|
toggle_flag(
|
|
self.expand_name("{endpoint_name}.gpu.available"),
|
|
self.is_joined and self._get_gpu(),
|
|
)
|
|
requests_data_id = self.expand_name("{endpoint_name}.requests")
|
|
requests = self.auth_user()
|
|
if data_changed(requests_data_id, requests):
|
|
set_flag(self.expand_name("{endpoint_name}.requests.changed"))
|
|
|
|
def set_dns(self, port, domain, sdn_ip, enable_kube_dns):
|
|
"""
|
|
Send DNS info to the remote units.
|
|
|
|
We'll need the port, domain, and sdn_ip of the dns service. If
|
|
sdn_ip is not required in your deployment, the units private-ip
|
|
is available implicitly.
|
|
"""
|
|
for relation in self.relations:
|
|
relation.to_publish_raw.update(
|
|
{
|
|
"port": port,
|
|
"domain": domain,
|
|
"sdn-ip": sdn_ip,
|
|
"enable-kube-dns": enable_kube_dns,
|
|
}
|
|
)
|
|
|
|
def auth_user(self):
|
|
"""
|
|
Return the kubelet_user value on the wire from the requestors.
|
|
"""
|
|
requests = []
|
|
|
|
for unit in self.all_joined_units:
|
|
requests.append(
|
|
(
|
|
unit.unit_name,
|
|
{
|
|
"user": unit.received_raw.get("kubelet_user"),
|
|
"group": unit.received_raw.get("auth_group"),
|
|
},
|
|
)
|
|
)
|
|
|
|
requests.sort()
|
|
return requests
|
|
|
|
def sign_auth_request(self, scope, user, kubelet_token, proxy_token, client_token):
|
|
"""
|
|
Send authorization tokens to the requesting unit.
|
|
"""
|
|
cred = {
|
|
"scope": scope,
|
|
"kubelet_token": kubelet_token,
|
|
"proxy_token": proxy_token,
|
|
"client_token": client_token,
|
|
}
|
|
|
|
if not DB.get("creds"):
|
|
DB.set("creds", {})
|
|
|
|
all_creds = DB.get("creds")
|
|
all_creds[user] = cred
|
|
DB.set("creds", all_creds)
|
|
|
|
for relation in self.relations:
|
|
relation.to_publish.update({"creds": all_creds})
|
|
|
|
def clear_creds(self):
|
|
"""
|
|
Clear creds from the relation. This is used by non-leader units to stop
|
|
advertising creds so that the leader can assume full control of them.
|
|
"""
|
|
DB.unset("creds")
|
|
for relation in self.relations:
|
|
relation.to_publish_raw["creds"] = ""
|
|
|
|
def _get_gpu(self):
|
|
"""
|
|
Return True if any remote worker is gpu-enabled.
|
|
"""
|
|
for unit in self.all_joined_units:
|
|
if unit.received_raw.get("gpu") == "True":
|
|
hookenv.log("Unit {} has gpu enabled".format(unit))
|
|
return True
|
|
|
|
return False
|
|
|
|
def set_cluster_tag(self, cluster_tag):
|
|
"""
|
|
Send the cluster tag to the remote units.
|
|
"""
|
|
for relation in self.relations:
|
|
relation.to_publish_raw.update({"cluster-tag": cluster_tag})
|
|
|
|
def set_registry_location(self, registry_location):
|
|
"""
|
|
Send the registry location to the remote units.
|
|
"""
|
|
for relation in self.relations:
|
|
relation.to_publish_raw.update({"registry-location": registry_location})
|
|
|
|
def set_cohort_keys(self, cohort_keys):
|
|
"""
|
|
Send the cohort snapshot keys.
|
|
"""
|
|
for relation in self.relations:
|
|
relation.to_publish["cohort-keys"] = cohort_keys
|
|
|
|
def set_default_cni(self, default_cni):
|
|
"""
|
|
Send the default CNI. The default_cni value should be a string
|
|
containing the name of a related CNI application to use as the
|
|
default CNI. For example: "flannel" or "calico". If no default has
|
|
been chosen then "" can be sent instead.
|
|
"""
|
|
for relation in self.relations:
|
|
relation.to_publish["default-cni"] = default_cni
|
|
|
|
def set_api_endpoints(self, endpoints):
|
|
"""
|
|
Send the list of API endpoint URLs to which workers should connect.
|
|
"""
|
|
endpoints = sorted(endpoints)
|
|
for relation in self.relations:
|
|
relation.to_publish["api-endpoints"] = endpoints
|
|
|
|
def set_has_xcp(self, has_xcp):
|
|
"""
|
|
Set the flag indicating that an external cloud provider is in use.
|
|
"""
|
|
for relation in self.relations:
|
|
relation.to_publish["has-xcp"] = bool(has_xcp)
|
|
|
|
def set_controller_taints(
|
|
self, taints: List[Union[Taint, str]]
|
|
) -> "KubeControlProvider":
|
|
"""
|
|
Sends the juju config taints of the control-plane.
|
|
"""
|
|
taints = [str(_) for _ in taints if Taint.valid(_)]
|
|
dedup = sorted(set(taints))
|
|
for relation in self.relations:
|
|
relation.to_publish["taints"] = dedup
|
|
return self
|
|
|
|
def set_controller_labels(
|
|
self, labels: List[Union[Label, str]]
|
|
) -> "KubeControlProvider":
|
|
"""
|
|
Sends the juju config labels of the control-plane.
|
|
"""
|
|
labels = [str(_) for _ in labels if Label.valid(_)]
|
|
dedup = sorted(set(labels))
|
|
for relation in self.relations:
|
|
relation.to_publish["labels"] = dedup
|
|
return self
|