303 lines
9.2 KiB
Python
303 lines
9.2 KiB
Python
from charms.reactive import clear_flag, is_data_changed, data_changed
|
|
|
|
|
|
class CertificateRequest(dict):
    """A single certificate request received from a related unit.

    The request payload (``certificate_name``, ``common_name`` and ``sans``)
    is held as the dict contents, while the originating unit and the
    certificate type are kept as private attributes.
    """

    def __init__(self, unit, cert_type, cert_name, common_name, sans):
        """Record the request payload and where it came from.

        :param unit: Unit the request was received from
        :type unit: charms.reactive.endpoints.RelatedUnit
        :param cert_type: Type of certificate, 'server' or 'client'
        :param cert_name: Value stored under ``certificate_name``
        :param common_name: Value stored under ``common_name``
        :param sans: Value stored under ``sans`` (may be falsy)
        """
        payload = {
            'certificate_name': cert_name,
            'common_name': common_name,
            'sans': sans,
        }
        super().__init__(payload)
        self._cert_type = cert_type
        self._unit = unit

    @property
    def _key(self):
        """Identifier under which the SANs of this request are tracked.

        Built from the relation id, the sanitised unit name and the common
        name, joined with dots.
        """
        parts = (
            self._unit.relation.relation_id,
            self.unit_name,
            self.common_name,
        )
        return '.'.join(parts)

    def resolve_unit_name(self, unit):
        """Return the name of the given unit.

        The ``unit_name`` entry of the unit's raw relation data is
        preferred, which keeps cross-model relations working. When that
        entry is empty the ``unit_name`` attribute of the unit is used.

        :param unit: Unit to extract name from
        :type unit: charms.reactive.endpoints.RelatedUnit
        :returns: Name of unit
        :rtype: str
        """
        return unit.received_raw['unit_name'] or unit.unit_name

    @property
    def unit_name(self):
        """Name of the requesting unit with every '/' replaced by '_'.

        :returns: Name of unit
        :rtype: str
        """
        resolved = self.resolve_unit_name(unit=self._unit)
        return resolved.replace('/', '_')

    @property
    def application_name(self):
        """Name of the application which the request came from.

        This is the part of the resolved unit name before the first '/'.

        :returns: Name of application
        :rtype: str
        """
        resolved = self.resolve_unit_name(unit=self._unit)
        return resolved.partition('/')[0]

    @property
    def cert_type(self):
        """Type of certificate, 'server' or 'client', being requested."""
        return self._cert_type

    @property
    def cert_name(self):
        """The ``certificate_name`` entry of the request."""
        return self['certificate_name']

    @property
    def common_name(self):
        """The ``common_name`` entry of the request."""
        return self['common_name']

    @property
    def sans(self):
        """The ``sans`` entry of the request."""
        return self['sans']

    @property
    def _publish_key(self):
        """Relation key under which processed requests are published.

        :returns: Key used to publish cert
        :rtype: str
        :raises ValueError: If the cert type is neither 'server' nor
            'client'
        """
        kind = self.cert_type
        if kind == 'server':
            template = '{}.processed_requests'
        elif kind == 'client':
            template = '{}.processed_client_requests'
        else:
            raise ValueError('Unknown cert_type: {}'.format(kind))
        return template.format(self.unit_name)

    @property
    def _server_cert_key(self):
        """Raw relation key holding the single server cert for the unit."""
        return '{}.server.cert'.format(self.unit_name)

    @property
    def _server_key_key(self):
        """Raw relation key holding the single server key for the unit."""
        return '{}.server.key'.format(self.unit_name)

    @property
    def _is_top_level_server_cert(self):
        """Whether this is the unit's single, top-level server cert request.

        True only for a 'server' request whose common name equals the
        ``common_name`` entry of the unit's raw relation data.
        """
        if self.cert_type != 'server':
            return False
        return self.common_name == self._unit.received_raw['common_name']

    @property
    def cert(self):
        """The cert published for this request, if any.

        A top-level server cert is read from the raw publish data under
        the dedicated server cert / key entries. Any other request is
        looked up by common name in the mapping stored under the publish
        key.

        :returns: The published certificate, or None when either the cert
            or the key is missing
        :rtype: Certificate or None
        """
        relation = self._unit.relation
        if self._is_top_level_server_cert:
            raw = relation.to_publish_raw
            cert_body = raw[self._server_cert_key]
            key_body = raw[self._server_key_key]
        else:
            processed = relation.to_publish.get(self._publish_key, {})
            entry = processed.get(self.common_name, {})
            cert_body = entry.get('cert')
            key_body = entry.get('key')
        if not (cert_body and key_body):
            return None
        return Certificate(self.cert_type, self.common_name,
                           cert_body, key_body)

    @property
    def is_handled(self):
        """Whether a cert is published and the requested SANs are unchanged.

        :returns: If the request has been handled
        :rtype: bool
        """
        published = self.cert
        changed = is_data_changed(self._key, sorted(set(self.sans or [])))
        return published is not None and not changed

    def set_cert(self, cert, key):
        """Publish the cert and key for this request on the relation.

        After publishing, the "requested" flags of the endpoint are cleared
        once it reports no more new requests, and the current SANs are
        recorded via ``data_changed``.

        :param cert: Certificate to publish
        :param key: Private key to publish
        """
        relation = self._unit.relation
        if self._is_top_level_server_cert:
            # Backwards compatibility: the cert that was requested as the
            # single server cert is answered as the single server cert.
            single_cert = {
                self._server_cert_key: cert,
                self._server_key_key: key,
            }
            relation.to_publish_raw.update(single_cert)
        else:
            publish_key = self._publish_key
            processed = relation.to_publish.get(publish_key, {})
            processed[self.common_name] = {
                'cert': cert,
                'key': key,
            }
            relation.to_publish[publish_key] = processed

        endpoint = relation.endpoint
        if not endpoint.new_server_requests:
            clear_flag(endpoint.expand_name(
                '{endpoint_name}.server.cert.requested'))
        if not endpoint.new_requests:
            clear_flag(endpoint.expand_name(
                '{endpoint_name}.certs.requested'))

        data_changed(self._key, sorted(set(self.sans or [])))
|
|
|
|
|
|
class ApplicationCertificateRequest(CertificateRequest):
    """
    A request for an application consistent certificate.

    This is a request for a certificate that works for all units of an
    application. All sans and cns are added together to produce one
    certificate and the same certificate and key are sent to all the
    units of an application. Only one ApplicationCertificateRequest
    is needed per application.
    """

    @property
    def _key(self):
        """Key to identify this cert.

        Unlike the per-unit key of the parent class, this key depends only
        on the relation id, so it is the same for every unit on the
        relation.

        :returns: cert key
        :rtype: str
        """
        return '{}.{}'.format(self._unit.relation.relation_id, 'app_cert')

    @property
    def cert(self):
        """
        The cert published for this request, if any.

        The cert and key are read from the ``app_data`` entry of the
        mapping published under this unit's publish key.

        :returns: Certificate, or None when the cert or key is missing
        :rtype: Certificate or None
        """
        tp = self._unit.relation.to_publish
        certs_data = tp.get(self._publish_key, {})
        cert_data = certs_data.get('app_data', {})
        cert = cert_data.get('cert')
        key = cert_data.get('key')
        if cert and key:
            return Certificate(self.cert_type, self.common_name, cert, key)
        return None

    @property
    def is_handled(self):
        """Whether the certificate has been handled.

        A request counts as handled when a cert is published for it and
        the aggregated sans are unchanged according to ``is_data_changed``.

        :returns: If the cert has been handled
        :rtype: bool
        """
        has_cert = self.cert is not None
        same_sans = not is_data_changed(self._key,
                                        sorted(set(self.sans or [])))
        return has_cert and same_sans

    @property
    def sans(self):
        """Generate a list of all sans from all units of application

        Examine all units of the application and compile a list of
        all sans. CNs are treated as additional san entries.

        A request without sans (entry missing or empty) contributes only
        its CN.

        :returns: Sorted list of unique sans
        :rtype: List[str]
        """
        # A set de-duplicates entries repeated across units as they are
        # collected.
        _sans = set()
        for unit in self._unit.relation.units:
            reqs = unit.received[self._request_key] or {}
            for cn, req in reqs.items():
                _sans.add(cn)
                # Tolerate a missing/None 'sans', matching the
                # ``self.sans or []`` handling used elsewhere in this class.
                _sans.update((req or {}).get('sans') or [])
        return sorted(_sans)

    @property
    def _request_key(self):
        """Key used to request cert

        :returns: Key used to request cert
        :rtype: str
        """
        return 'application_cert_requests'

    def derive_publish_key(self, unit=None):
        """Derive the application cert publish key for a unit.

        :param unit: Unit to extract name from; the unit of this request
            is used when omitted
        :type unit: charms.reactive.endpoints.RelatedUnit
        :returns: publish key
        :rtype: str
        """
        if not unit:
            unit = self._unit
        unit_name = self.resolve_unit_name(unit).replace('/', '_')
        return '{}.processed_application_requests'.format(unit_name)

    @property
    def _publish_key(self):
        """Key used to publish cert

        :returns: Key used to publish cert
        :rtype: str
        """
        return self.derive_publish_key(unit=self._unit)

    def set_cert(self, cert, key):
        """Send the cert and key to all units of the application

        The cert and key are stored as the ``app_data`` entry under the
        publish key of every unit on the relation. Afterwards the
        application "requested" flag is cleared once the endpoint reports
        no new application requests, and the current sans are recorded
        via ``data_changed``.

        :param cert: TLS Certificate
        :type cert: str
        :param key: TLS Private Key
        :type key: str
        """
        rel = self._unit.relation
        for unit in rel.units:
            pub_key = self.derive_publish_key(unit=unit)
            data = rel.to_publish.get(pub_key, {})
            data['app_data'] = {
                'cert': cert,
                'key': key,
            }
            rel.to_publish[pub_key] = data
        if not rel.endpoint.new_application_requests:
            clear_flag(rel.endpoint.expand_name(
                '{endpoint_name}.application.certs.requested'))
        data_changed(self._key, sorted(set(self.sans or [])))
|
|
|
|
|
|
class Certificate(dict):
    """
    A created certificate together with its key.

    The ``cert_type``, ``common_name``, ``cert``, and ``key`` values are
    stored as the dict contents and are also exposed as read-only
    properties.
    """

    def __init__(self, cert_type, common_name, cert, key):
        """Populate the dict with the four certificate fields.

        :param cert_type: Value stored under ``cert_type``
        :param common_name: Value stored under ``common_name``
        :param cert: Value stored under ``cert``
        :param key: Value stored under ``key``
        """
        super().__init__(
            cert_type=cert_type,
            common_name=common_name,
            cert=cert,
            key=key,
        )

    @property
    def cert_type(self):
        """The ``cert_type`` entry of this certificate."""
        return self['cert_type']

    @property
    def common_name(self):
        """The ``common_name`` entry of this certificate."""
        return self['common_name']

    @property
    def cert(self):
        """The ``cert`` entry of this certificate."""
        return self['cert']

    @property
    def key(self):
        """The ``key`` entry of this certificate."""
        return self['key']
|