# SPDX-License-Identifier: GPL-3.0+
from __future__ import unicode_literals
from datetime import datetime
from neomodel import (EITHER, INCOMING, OUTGOING, One, StructuredNode,
UniqueIdProperty, ZeroOrOne)
from estuary import log
from estuary.utils.general import inflate_node
[docs]class EstuaryStructuredNode(StructuredNode):
"""Base class for Estuary Neo4j models."""
__abstract_node__ = True
@property
def display_name(self):
"""Get intuitive (human readable) display name for the node."""
raise NotImplementedError('The display_name method is not defined')
@property
def timeline_timestamp(self):
"""Get the DateTime property used for the Estuary timeline as a string."""
if self.timeline_datetime:
return self.timeline_datetime.strftime('%Y-%m-%dT%H:%M:%SZ')
return None
@property
def timeline_datetime(self):
"""Get the DateTime property used for the Estuary timeline."""
raise NotImplementedError('The timeline_datetime method is not defined')
@property
def serialized(self):
"""
Convert a model to serialized form.
:return: a serialized form of the node
:rtype: dictionary
"""
rv = {}
for key, value in self.__properties__.items():
# id is the internal Neo4j ID that we don't want to display to the user
if key == 'id':
continue
actual_key = getattr(self.__class__, key).db_property or key
if isinstance(value, datetime):
rv[actual_key] = value.strftime('%Y-%m-%dT%H:%M:%SZ')
else:
rv[actual_key] = value
rv['resource_type'] = self.__label__
rv['display_name'] = self.display_name
return rv
@property
def serialized_all(self):
"""
Generate a serialized form of the node that includes all its relationships.
:return: a serialized form of the node with relationships
:rtype: dictionary
:raises RuntimeError: if the label of a Neo4j node can't be mapped back to a neomodel class
"""
# Avoid circular imports
from estuary.models import models_inheritance
# A set that will keep track of all properties on the node that weren't returned from Neo4j
null_properties = set()
# A mapping of Neo4j relationship names in the format of:
# {
# node_label: {
# relationship_name: {direction: (property_name, cardinality_class) ...},
# }
# }
relationship_map = {}
for property_name, relationship in self.__all_relationships__:
node_label = relationship.definition['node_class'].__label__
relationship_name = relationship.definition['relation_type']
for label in models_inheritance[node_label]:
if label not in relationship_map:
relationship_map[label] = {}
relationship_direction = relationship.definition['direction']
if relationship_direction == EITHER:
# The direction can be coming from either direction, so map both
properties = {
INCOMING: (property_name, relationship.manager),
OUTGOING: (property_name, relationship.manager),
}
else:
properties = {relationship_direction: (property_name, relationship.manager)}
if relationship_name not in relationship_map[label]:
relationship_map[label][relationship_name] = properties
else:
relationship_map[label][relationship_name].update(properties)
null_properties.add(property_name)
# This variable will contain the current node as serialized + all relationships
serialized = self.serialized
# Get all the direct relationships in both directions
results, _ = self.cypher('MATCH (a) WHERE id(a)={self} MATCH (a)-[r]-(all) RETURN r, all')
for relationship, node in results:
# If the starting node in the relationship is the same as the node being serialized,
# we know that the relationship is outgoing
if relationship.start_node.id == self.id:
direction = OUTGOING
else:
direction = INCOMING
# Convert the Neo4j result into a model object
inflated_node = inflate_node(node)
try:
property_name, cardinality_class = \
relationship_map[inflated_node.__label__][relationship.type][direction]
except KeyError:
if direction == OUTGOING:
direction_text = 'outgoing'
else:
direction_text = 'incoming'
log.warn(
'An {0} {1} relationship of {2!r} with {3!r} is not mapped in the models and '
'will be ignored'.format(direction_text, relationship.type, self, inflate_node))
continue
if not serialized.get(property_name):
null_properties.remove(property_name)
if cardinality_class in (One, ZeroOrOne):
serialized[property_name] = inflated_node.serialized
else:
if not serialized.get(property_name):
serialized[property_name] = []
serialized[property_name].append(inflated_node.serialized)
# Neo4j won't return back relationships it doesn't know about, so just make them empty
# so that the keys are always consistent
for property_name in null_properties:
prop = getattr(self, property_name)
if isinstance(prop, One) or isinstance(prop, ZeroOrOne):
serialized[property_name] = None
else:
serialized[property_name] = []
return serialized
[docs] @classmethod
def find_or_none(cls, identifier):
"""
Find the node using the supplied identifier.
This method should be overridden if the node class accepts multiple types of identifiers.
:param str identifier: the identifier to search the node by
:return: the node or None
:rtype: EstuaryStructuredNode or None
"""
for _, prop_def in cls.__all_properties__:
if isinstance(prop_def, UniqueIdProperty):
return cls.nodes.get_or_none(**{prop_def.name: identifier})
raise RuntimeError('{0} has no UniqueIdProperty'.format(cls.__label__))
[docs] @staticmethod
def conditional_connect(relationship, new_node):
"""
Wrap the connect and replace methods for conditional relationship handling.
:param neomodel.RelationshipManager relationship: a relationship to connect on
:param neomodel.StructuredNode new_node: the node to create the relationship with
:raises NotImplementedError: if this method is called with a relationship of cardinality of
one
"""
if new_node not in relationship:
if len(relationship) == 0:
relationship.connect(new_node)
else:
if isinstance(relationship, ZeroOrOne):
relationship.replace(new_node)
elif isinstance(relationship, One):
raise NotImplementedError(
'conditional_connect doesn\'t support cardinality of one')
else:
relationship.connect(new_node)
@property
def unique_id_property(self):
"""
Get the name of the UniqueIdProperty for the node.
:return: a string containing name of the unique ID property of a node
:rtype: str
"""
for _, prop_def in self.__all_properties__:
if isinstance(prop_def, UniqueIdProperty):
return prop_def.db_property or prop_def.name
[docs] @staticmethod
def inflate_results(results):
"""
Inflate the results.
:param str results: results obtained from Neo4j
:return: a list of dictionaries containing serialized results received from Neo4j
:rtype: list
"""
results_list = []
for raw_result in results:
nodes = []
for node in raw_result:
if node:
inflated_node = inflate_node(node)
nodes.append(inflated_node)
results_list.append(nodes)
return results_list
[docs] def add_label(self, new_label):
"""
Add a Neo4j label to an existing node.
:param str new_label: the new label to add to the node
"""
self.cypher('MATCH (a) WHERE id(a)={{self}} SET a :{0}'.format(new_label))
[docs] def remove_label(self, label):
"""
Remove a Neo4j label from an existing node.
:param str label: the label to be removed from the node
"""
self.cypher('MATCH (a) WHERE id(a)={{self}} REMOVE a :{0}'.format(label))