"""
Copyright (C) 2023 Università degli Studi di Camerino and Sigma S.p.A.
Authors: Alessandro Antinori, Rosario Capparuccia, Riccardo Coltrinari, Flavio Corradini, Marco Piangerelli, Barbara Re, Marco Scarpetta
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from abc import abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any, List, Union
import copy
import networkx as nx
from rain.core.exception import (
EdgeConnectionError,
CyclicDataFlowException,
DuplicatedNodeId,
)
from rain.core.execution import LocalExecutor
from rain.loguru_logger import logger
class LibTag(Enum):
    """Tag naming the library that a SimpleNode refers to.

    Every member carries the library name as a plain string value.
    """

    PANDAS = "Pandas"
    SPARK = "PySpark"
    SKLEARN = "Scikit-Learn"
    MONGODB = "PyMongo"
    TPOT = "TPOT"
    PYSAD = "PySad"
    OTHER = "Other"
    BASE = "Base"
class TypeTag(Enum):
    """Tag classifying a SimpleNode by the functionality it provides.

    Every member carries a capitalised string label as its value.
    """

    INPUT = "Input"
    OUTPUT = "Output"
    TRANSFORMER = "Transformer"
    CLASSIFIER = "Classifier"
    CLUSTERER = "Clusterer"
    REGRESSOR = "Regressor"
    ESTIMATOR = "Estimator"
    TRAINER = "Trainer"
    PREDICTOR = "Predictor"
    METRICS = "Metrics"
    OTHER = "Other"
    CUSTOM = "Custom"
class SimpleNode(metaclass=Meta):
    """Root class shared by every node in Rain.

    A node is identified solely by its ``node_id``: hashing, equality and the
    string representation are all derived from it.

    Parameters
    ----------
    node_id : str
        The unique identifier of the node
    """

    def __init__(self, node_id: str):
        logger.info("Create Node", node_name=node_id)
        # Explicit two-argument super() is kept on purpose: the class is built
        # by a custom metaclass that is not visible from this module section.
        super(SimpleNode, self).__init__()
        self.node_id = node_id

    @abstractmethod
    def execute(self):
        """Run the node's main functionality.

        Concrete nodes perform their computation here, each one relying on a
        specific Python library and its function/s.
        """

    @abstractmethod
    def has_attribute(self, attribute: str) -> bool:
        """Report whether the node exposes the given attribute.

        Parameters
        ----------
        attribute : str
            The name of the parameter to check.

        Returns
        -------
        bool
            True if the node has the given parameter, False otherwise.
        """

    @classmethod
    def _get_tags(cls):
        """Return the Tags associated to this node (``None`` in this base class)."""

    def __hash__(self):
        # Hash follows the identifier so it stays consistent with __eq__.
        return hash(self.node_id)

    def __eq__(self, other):
        # Only other nodes can be equal, and only when the identifiers match.
        return isinstance(other, SimpleNode) and self.node_id == other.node_id

    def __matmul__(self, other):
        # ``node @ attrs`` selects attributes of this node for use in an edge.
        return EdgeContentSpecifier(self, other)

    def __str__(self):
        return self.node_id
class OutputMixin:
    """Mixin marking a SimpleNode as an output node.

    On construction, every key of ``_input_vars`` that the instance does not
    already expose is created as an attribute initialised to ``None``.
    """

    _input_vars = {}

    def __init__(self):
        # Only fill in the inputs that are missing; existing attributes
        # (instance or class level) are left untouched.
        missing_inputs = [
            name for name in self._input_vars.keys() if not hasattr(self, name)
        ]
        for name in missing_inputs:
            setattr(self, name, None)
class ComputationalNode(SimpleNode, InputMixin, OutputMixin):
    """Node that both receives and produces data, i.e. has input and output attributes.

    Parameters
    ----------
    node_id : str
        The unique identifier of the node.
    """

    def __init__(self, node_id: str):
        super(ComputationalNode, self).__init__(node_id)

    @abstractmethod
    def execute(self):
        """Run the node's computation; must be provided by concrete subclasses."""

    def has_attribute(self, attribute: str) -> bool:
        """Tell whether ``attribute`` is one of the node's input or output variables.

        Parameters
        ----------
        attribute : str
            The name of the variable to look for.

        Returns
        -------
        bool
            True if the name is a key of ``_input_vars`` or ``_output_vars``.
        """
        if attribute in self._input_vars.keys():
            return True
        return attribute in self._output_vars.keys()
class OutputNode(SimpleNode, OutputMixin):
    """Node sitting at the end of a flow: it only exposes input attributes.

    Parameters
    ----------
    node_id : str
        The unique identifier of the node.
    """

    def __init__(self, node_id: str):
        super(OutputNode, self).__init__(node_id)

    @abstractmethod
    def execute(self):
        """Run the node's output operation; must be provided by concrete subclasses."""

    def has_attribute(self, attribute: str) -> bool:
        """Tell whether ``attribute`` is one of the node's input variables.

        Parameters
        ----------
        attribute : str
            The name of the variable to look for.

        Returns
        -------
        bool
            True if the name is a key of ``_input_vars``, False otherwise.
        """
        known_inputs = self._input_vars.keys()
        return attribute in known_inputs

    @classmethod
    def _get_tags(cls):
        """Return the default tags of an output node (library OTHER, type OUTPUT)."""
        return Tags(LibTag.OTHER, TypeTag.OUTPUT)
class EdgeContentSpecifier:
    """Attribute selector for a node that takes part in a MultiEdge.

    Instances are normally created with ``node @ "attr"`` or
    ``node @ ["attr_a", "attr_b"]`` and combined with ``>`` to build an edge.

    Parameters
    ----------
    node : SimpleNode
        The node that contains the chosen attributes
    nodes_attributes : Union[str, List]
        The chosen attributes of the node, they can either be the input or the output of the node.
        A single string is wrapped into a one-element list.

    Raises
    ------
    EdgeConnectionError
        If ``nodes_attributes`` is neither a string nor a non-empty list of
        strings, or if the node does not have one of the requested attributes.
    """

    def __init__(self, node: "SimpleNode", nodes_attributes: Union[str, List]):
        self.node = node

        if isinstance(nodes_attributes, str):
            nodes_attributes = [nodes_attributes]
        elif (
            not isinstance(nodes_attributes, list)
            # An empty list selects nothing and cannot form a valid edge.
            or not nodes_attributes
            or not all(isinstance(attr, str) for attr in nodes_attributes)
        ):
            raise EdgeConnectionError(
                f"The chosen {nodes_attributes} node attributes must be either string or a non-empty list."
            )

        # Every requested attribute must actually exist on the node.
        for attr in nodes_attributes:
            if not node.has_attribute(attr):
                raise EdgeConnectionError(f"Node {node} has no attribute {attr}")

        self.nodes_attributes = nodes_attributes

    def __gt__(self, other):
        """Build the MultiEdge going from this specifier to ``other``.

        Raises
        ------
        EdgeConnectionError
            If ``other`` is not an EdgeContentSpecifier.
        """
        if not isinstance(other, EdgeContentSpecifier):
            raise EdgeConnectionError(
                "The right side of '>' must be an EdgeContentSpecifier! "
                "The latter can be created with 'node_var @ ['in_out_attr']'"
            )
        return MultiEdge(self, other)
class MultiEdge:
    """A single edge of the dataflow, linking attributes of two nodes.

    Parameters
    ----------
    source : EdgeContentSpecifier
        source node and its attributes.
    destination : EdgeContentSpecifier
        destination node and its attributes.

    Raises
    ------
    EdgeConnectionError
        If the source node is not an ``InputMixin`` instance or the
        destination node is not an ``OutputMixin`` instance.
    """

    def __init__(
        self,
        source: EdgeContentSpecifier,
        destination: EdgeContentSpecifier,
    ):
        # Guard clauses: the source is validated first, then the destination.
        if not isinstance(source.node, InputMixin):
            raise EdgeConnectionError(f"Node '{source.node}' has no output variable.")
        if not isinstance(destination.node, OutputMixin):
            raise EdgeConnectionError(
                f"Node '{destination.node}' has no input variable."
            )

        logger.debug(
            f"Create edge from {source.nodes_attributes} to {destination.node.node_id} - {destination.nodes_attributes}",
            node_name=source.node.node_id,
        )

        self.source, self.destination = source, destination
class DataFlow:
    """Class representing a Dataflow in Rain, containing nodes and edges.

    Nodes are stored by id, edges are kept in insertion order, and a
    ``networkx.DiGraph`` of node ids mirrors the topology so that cycles can
    be detected and an execution order computed.

    Parameters
    ----------
    dataflow_id : str
        The unique identifier of the dataflow
    executor: Any, default LocalExecutor
        The executor used to run the Dataflow
    """

    # NOTE(review): the default ``LocalExecutor()`` is evaluated once, when the
    # class is defined, so every DataFlow built without an explicit executor
    # shares the same instance. Kept as-is to preserve the public signature;
    # confirm the executor holds no per-dataflow state.
    def __init__(self, dataflow_id: str, executor: Any = LocalExecutor()):
        logger.info("Create Dataflow", dataflow_id=dataflow_id)
        self.id = dataflow_id
        self.executor = executor
        self._nodes = {}
        self._edges: List[MultiEdge] = []
        self._dag = nx.DiGraph(name=self.id)
        logger.debug(
            f"Use executor {type(self.executor).__name__}", dataflow_id=dataflow_id
        )

    def add_node(self, node) -> bool:
        """Add a node to the dataflow. If a node with the same node id exists then an exception will be raised.

        Parameters
        ----------
        node : SimpleNode
            The node to add.

        Returns
        -------
        bool
            True if the node has been correctly added.

        Raises
        ------
        DuplicatedNodeId
            If a node with the same node id already exists.
        """
        logger.info(f"Add node {node.node_id}", dataflow_id=self.id)
        if node.node_id in self._nodes:
            raise DuplicatedNodeId(
                f"The node identified as {node.node_id} already exists within the DataFlow."
            )
        self._nodes[node.node_id] = node
        self._dag.add_node(node.node_id)
        return True

    def add_nodes(self, nodes) -> bool:
        """Add several nodes to the dataflow, one after the other.

        Nodes are added in iteration order; if one of them raises, the nodes
        preceding it have already been added.

        Parameters
        ----------
        nodes : list of SimpleNode
            The nodes to add.

        Returns
        -------
        bool
            True if all the nodes have been correctly added.

        Raises
        ------
        DuplicatedNodeId
            If a node with the same node id already exists.
        """
        for node in nodes:
            self.add_node(node)
        return True

    def get_node(self, node_id: str):
        """Method used to return the SimpleNode given its id.

        Parameters
        ----------
        node_id : str
            The id of the node to return.

        Returns
        -------
        SimpleNode or None
            The SimpleNode with the given id, or None if no such node exists.
        """
        return self._nodes.get(node_id)

    def add_edge(self, edge: MultiEdge):
        """Method used to add an edge to the Dataflow.

        The source and destination nodes of the edge are added to the
        dataflow as well when they are not already part of it.

        Parameters
        ----------
        edge : MultiEdge
            The edge that should be added to the Dataflow.
        """
        logger.info(
            f"Add edge from {edge.source.node.node_id} - {edge.source.nodes_attributes} to {edge.destination.node.node_id} - {edge.destination.nodes_attributes}",
            dataflow_id=self.id,
        )
        for endpoint in (edge.source.node, edge.destination.node):
            try:
                self.add_node(endpoint)
            except DuplicatedNodeId:
                # Deliberate best effort: an endpoint whose id is already
                # registered is simply reused, it is not an error here.
                pass
        self._edges.append(edge)
        self._dag.add_edge(edge.source.node.node_id, edge.destination.node.node_id)

    def add_edges(self, edges: List[MultiEdge]):
        """Method used to add a list of edges to the Dataflow.

        Parameters
        ----------
        edges : List[MultiEdge]
            The list of edges that should be added to the Dataflow.
        """
        for edge in edges:
            self.add_edge(edge)

    def get_edge(
        self, source: SimpleNode, destination: SimpleNode
    ) -> Union[MultiEdge, None]:
        """Method used to get the edge with the specific source and destination node.

        Parameters
        ----------
        source : SimpleNode
            The source node of the edge.
        destination : SimpleNode
            The destination node of the edge.

        Returns
        -------
        MultiEdge or None
            The first edge (in insertion order) with the given source and
            destination node, or None if there is no such edge.
        """
        return next(
            (
                edge
                for edge in self._edges
                if source == edge.source.node and destination == edge.destination.node
            ),
            None,
        )

    def get_outgoing_edges(self, source: SimpleNode) -> List[MultiEdge]:
        """Method used to get all the outgoing edges of the specific source node.

        Parameters
        ----------
        source : SimpleNode
            The source node of the edges.

        Returns
        -------
        List[MultiEdge]
            The required outgoing edges with the specific source node.
        """
        return [edge for edge in self._edges if source == edge.source.node]

    def get_ingoing_edges(self, destination) -> List[MultiEdge]:
        """Method used to get all the ingoing edges of the specific destination node.

        Parameters
        ----------
        destination : SimpleNode
            The destination node of the edges.

        Returns
        -------
        List[MultiEdge]
            The required ingoing edges with the specific destination node.
        """
        # ``edge.destination`` is an EdgeContentSpecifier, which does not
        # support ``in``; the node it wraps is what has to be compared.
        return [
            edge for edge in self._edges if destination == edge.destination.node
        ]

    def has_node(self, node: Union[SimpleNode, str]) -> bool:
        """Tell if the Dataflow contains the given SimpleNode

        Parameters
        ----------
        node : SimpleNode or str
            The SimpleNode to check, or the id of the node.

        Returns
        -------
        bool
            True if the Dataflow contains the given node, False otherwise.
        """
        if isinstance(node, str):
            # A string is interpreted as a node id.
            return node in self._nodes
        return node in self._nodes.values()

    def is_acyclic(self):
        """Returns True if the Dataflow is a directed acyclic graph (DAG) or False if not."""
        return nx.is_directed_acyclic_graph(self._dag)

    def get_execution_ordered_nodes(self):
        """Returns a list of SimpleNode in topologically sorted order.

        Returns
        -------
        List[SimpleNode]
            The list of ordered nodes to be executed.
        """
        return [self._nodes.get(node_id) for node_id in nx.topological_sort(self._dag)]

    def execute(self):
        """Execute all the nodes contained in the Dataflow if there are no cycle.

        Raises
        ------
        CyclicDataFlowException
            If the dataflow contains a cycle.
        """
        if not self.is_acyclic():
            raise CyclicDataFlowException(self.id)
        logger.info("Start execution of the Dataflow", dataflow_id=self.id)
        self.executor.execute(self)