Source code for rain.nodes.spark.pipeline.spark_pipeline
"""
Copyright (C) 2023 Università degli Studi di Camerino and Sigma S.p.A.
Authors: Alessandro Antinori, Rosario Capparuccia, Riccardo Coltrinari, Flavio Corradini, Marco Piangerelli, Barbara Re, Marco Scarpetta
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from typing import List
from pyspark.ml import Pipeline
from rain.nodes.spark.node_structure import Estimator, SparkNode
class SparkPipelineNode(Estimator):
    """Represent a Spark Pipeline consisting of SparkNode (stages).

    It should contain some Spark Transformers and a final Spark Estimator;
    fitting the pipeline returns the trained model.

    Input
    -----
    dataset : DataFrame
        A Spark DataFrame.

    Output
    ------
    model : PipelineModel
        A Spark PipelineModel.

    Parameters
    ----------
    node_id : str
        Id of the node.
    stages : List[SparkNode]
        List of SparkNode that can be executed in a Spark Pipeline. Every
        stage must have a non-None ``computational_instance``.

    Raises
    ------
    ValueError
        If any stage has ``computational_instance`` set to None.

    Notes
    -----
    Visit `<https://spark.apache.org/docs/latest/ml-pipeline.html#pipeline>`_ for Spark Pipeline documentation.
    """

    def __init__(self, node_id: str, stages: List[SparkNode]):
        super(SparkPipelineNode, self).__init__(node_id)
        # Materialise once so a one-shot iterable can be both validated and stored.
        candidate_stages: List[SparkNode] = list(stages)
        # Validate every stage before storing any of them.
        for stage in candidate_stages:
            if stage.computational_instance is None:
                # ValueError subclasses Exception, so callers catching
                # Exception keep working.
                raise ValueError(
                    "{} is not a valid stage".format(stage.__class__.__name__)
                )
        # Per-instance list. A class-level ``_stages = []`` would be shared by
        # every SparkPipelineNode, leaking stages from one node into the next.
        self._stages: List[SparkNode] = candidate_stages

    def execute(self):
        """Build a Spark Pipeline from the stored stages and fit it.

        The pipeline is fitted on ``self.dataset`` and the result of
        ``Pipeline.fit`` is stored in ``self.model``.
        """
        # NOTE(review): self.dataset is assumed to be set by the node
        # framework before execute() is called; confirm in node_structure.
        pipeline = Pipeline(
            stages=[stage.computational_instance for stage in self._stages]
        )
        self.model = pipeline.fit(self.dataset)