Source code for estuary.utils.general

# SPDX-License-Identifier: GPL-3.0+

from __future__ import unicode_literals

import re
from datetime import datetime
from functools import wraps

from flask import current_app, request
from six import text_type
from werkzeug.exceptions import Unauthorized

from estuary import log
from estuary.authorization import is_user_authorized
from estuary.error import ValidationError


[docs]def timestamp_to_datetime(timestamp): """ Convert a string timestamp to a datetime object. :param str timestamp: a generic or ISO-8601 timestamp :return: datetime object of the timestamp :rtype: datetime.datetime :raises ValueError: if the timestamp is an unsupported or invalid format """ log.debug('Trying to parse the timestamp "{0}"'.format(timestamp)) error_msg = 'The timestamp "{0}" is an invalid format'.format(timestamp) combinations = ( (r'^(?P<datetime>\d{4}-\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}:\d{1,2})(?:\.\d+)?$', '%Y-%m-%d %H:%M:%S'), (r'^(?P<datetime>\d{4}-\d{1,2}-\d{1,2})$', '%Y-%m-%d'), # ISO 8601 format (r'^(?P<datetime>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(?:\.\d+)?(?:Z|[-+]00(?::00)?)?$', '%Y-%m-%dT%H:%M:%S')) for combination in combinations: regex_match = re.match(combination[0], timestamp) if regex_match: try: return datetime.strptime(regex_match.group('datetime'), combination[1]) except ValueError: # In case the user asked for an unreleastic date like "2020:99:99" raise ValueError(error_msg) raise ValueError(error_msg)
[docs]def timestamp_to_date(timestamp): """ Convert a string timestamp to a date object. :param str timestamp: a generic or ISO-8601 timestamp :return: date object of the timestamp :rtype: datetime.date :raises ValueError: if the timestamp is an unsupported or invalid format """ return timestamp_to_datetime(timestamp).date()
[docs]def str_to_bool(item): """ Convert a string to a boolean. :param str item: string to parse :return: a boolean equivalent :rtype: boolean """ if isinstance(item, text_type): return item.lower() in ('true', '1') else: return False
[docs]def inflate_node(result): """ Inflate a Neo4j result to a neomodel model object. :param neo4j.v1.types.Node result: a node from a cypher query result :return: a model (EstuaryStructuredNode) object """ # To prevent a ciruclar import, this must be imported here from estuary.models import names_to_model if 'ContainerKojiBuild' in result.labels: result_label = 'ContainerKojiBuild' elif 'ContainerAdvisory' in result.labels: result_label = 'ContainerAdvisory' elif 'ModuleKojiBuild' in result.labels: result_label = 'ModuleKojiBuild' elif len(result.labels) > 1: raise RuntimeError('inflate_node encounted a node with multiple labels: {0}. ' 'Which one should be used?'.format(', '.join(result.labels))) else: result_label = list(result.labels)[0] if result_label in names_to_model: node_model = names_to_model[result_label] else: # This should never happen unless Neo4j returns labels that aren't associated with # classes in all_models raise RuntimeError('A StructuredNode couldn\'t be found from the labels: {0}'.format( ', '.join(result.labels))) return node_model.inflate(result)
[docs]def get_neo4j_node(resource_name, uid): """ Get a Neo4j node based on a label and unique identifier. :param str resource_name: a neomodel model label :param str uid: a string of the unique identifier defined in the neomodel model :return: a neomodel model object :raises ValidationError: if the requested resource doesn't exist or doesn't have a UniqueIdProperty """ # To prevent a ciruclar import, we must import this here from estuary.models import all_models for model in all_models: if model.__label__.lower() == resource_name.lower(): try: return model.find_or_none(uid) except RuntimeError: # There is no UniqueIdProperty on this model so raise an exception models_wo_uid = ('DistGitRepo') model_names = [model.__name__.lower() for model in all_models if model.__name__ not in models_wo_uid] error = ('The requested resource "{0}" is invalid. Choose from the following: ' '{1}, and {2}.'.format(resource_name, ', '.join(model_names[:-1]), model_names[-1])) raise ValidationError(error)
[docs]def login_required(f): """ Decorate a Flask route to validate a token if authentication is enabled. :param function f: the function to wrap :return: the wrapper function :rtype: function """ @wraps(f) def wrapper(*args, **kwargs): if current_app.config['ENABLE_AUTH']: if 'Authorization' not in request.headers: raise Unauthorized('An "Authorization" header wasn\'t provided') token = request.headers['Authorization'].strip() prefix = 'Bearer ' if not token.startswith(prefix): raise Unauthorized( 'The "Authorization" header must start with "{0}"'.format(prefix.rstrip())) token = token[len(prefix):] # Keycloak doesn't return the scopes from its introspection API endpoint. Other # validation is used instead. required_scopes = [] validity = current_app.oidc.validate_token(token, required_scopes) if validity is not True: raise Unauthorized(validity) token_info = current_app.oidc._get_token_info(token) username = token_info.get('username') employee_type = token_info.get('employeeType') if not is_user_authorized(username, employee_type): raise Unauthorized('You must be an employee to access this service') return f(*args, **kwargs) return wrapper