rain.nodes.spark package#

Subpackages#

Submodules#

rain.nodes.spark.data_wrangling module#

Copyright (C) 2023 Università degli Studi di Camerino and Sigma S.p.A. Authors: Alessandro Antinori, Rosario Capparuccia, Riccardo Coltrinari, Flavio Corradini, Marco Piangerelli, Barbara Re, Marco Scarpetta

This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.

class rain.nodes.spark.data_wrangling.SparkColumnSelector(node_id: str, column_list: List[str], filter_list: List[str] = [])[source]#

Bases: Transformer

SparkColumnSelector manages filtering of rows, columns and values for a Spark DataFrame.

Input:

dataset (DataFrame) – A Spark DataFrame.

Output:

dataset (DataFrame) – A Spark DataFrame.

Parameters:
  • node_id (str) – Id of the node.

  • column_list (List[str]) – List of columns to select from the dataset.

  • filter_list (List[str], default []) – List of conditions used to filter the rows of the dataset.

dataset = None#
execute()[source]#

Runs the node's main computation. The Python library and functions used depend on the node.
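As a rough illustration of the kind of work this node describes, the sketch below filters rows and then selects columns using the standard PySpark `DataFrame.filter` and `DataFrame.select` calls. It is not the node's actual implementation: the helper name `select_and_filter` is hypothetical, and treating each `filter_list` entry as a SQL expression string is an assumption.

```python
from typing import Any, List, Optional


def select_and_filter(df: Any, column_list: List[str],
                      filter_list: Optional[List[str]] = None) -> Any:
    """Hypothetical helper: filter rows, then keep only the requested columns.

    `df` is expected to be a pyspark.sql.DataFrame. Each entry of
    `filter_list` is assumed to be a SQL expression string such as "age > 30".
    """
    # Apply every condition in turn; chained filters behave like a logical AND.
    for condition in filter_list or []:
        df = df.filter(condition)
    # Filter first so that conditions can refer to columns that are dropped here.
    return df.select(*column_list)
```

With a real DataFrame `df`, a call such as `select_and_filter(df, ["name"], ["age > 30"])` would return the `name` column for the rows that satisfy the condition.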

class rain.nodes.spark.data_wrangling.SparkSplitDataset(node_id: str, train: float, test: float)[source]#

Bases: Transformer

Splits a Spark DataFrame into two DataFrames, train and test.

Input:

dataset (DataFrame) – A Spark DataFrame.

Output:
  • train_dataset (DataFrame) – A Spark DataFrame used for the training phase.

  • test_dataset (DataFrame) – A Spark DataFrame used for the test phase.

Parameters:
  • node_id (str) – Id of the node.

  • train (float) – Percentage of the dataset assigned to the train dataset.

  • test (float) – Percentage of the dataset assigned to the test dataset.

dataset = None#
execute()[source]#

Runs the node's main computation. The Python library and functions used depend on the node.

test_dataset = None#
train_dataset = None#
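A train/test split like the one described above can be sketched with PySpark's `DataFrame.randomSplit`. Whether the node actually uses that call is an assumption, and the helper name `split_dataset` and its `seed` argument are hypothetical additions for illustration.

```python
from typing import Any, Optional, Tuple


def split_dataset(df: Any, train: float, test: float,
                  seed: Optional[int] = None) -> Tuple[Any, Any]:
    """Hypothetical helper: split `df` into (train_dataset, test_dataset).

    `df` is expected to be a pyspark.sql.DataFrame. `train` and `test` are
    passed as relative weights; randomSplit normalizes them if they do not
    sum to 1.
    """
    if train <= 0 or test <= 0:
        raise ValueError("train and test must both be positive")
    # randomSplit returns one DataFrame per weight, in the same order.
    train_dataset, test_dataset = df.randomSplit([train, test], seed=seed)
    return train_dataset, test_dataset
```

Because `randomSplit` assigns rows at random, the resulting sizes only approximate the requested proportions; passing a fixed `seed` makes the split reproducible.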

rain.nodes.spark.node_structure module#

class rain.nodes.spark.node_structure.Estimator(node_id: str)[source]#

Bases: SparkNode

Class representing a Spark Estimator. It takes a dataset and returns a trained model.

Input:

dataset (DataFrame) – A Spark DataFrame.

Output:

model (PipelineModel) – A Spark PipelineModel.

Parameters:

node_id (str) – Id of the node.

dataset = None#
abstract execute()[source]#

Runs the node's main computation. The Python library and functions used depend on the node.

model = None#

class rain.nodes.spark.node_structure.SparkInputNode(node_id: str)[source]#

Bases: InputNode, SparkNodeSession

Class representing a Spark InputNode. It loads and returns an object/file.

abstract execute()[source]#

Runs the node's main computation. The Python library and functions used depend on the node.

class rain.nodes.spark.node_structure.SparkNode(node_id)[source]#

Bases: ComputationalNode, SparkNodeSession, SparkStageMixin

Class representing a Spark ComputationalNode. It can be either a Transformer or an Estimator.

Input:

dataset (DataFrame) – A Spark DataFrame.

Parameters:

node_id (str) – Id of the node.

dataset = None#
abstract execute()[source]#

Runs the node's main computation. The Python library and functions used depend on the node.

class rain.nodes.spark.node_structure.SparkNodeSession[source]#

Bases: object

Mixin class to share the Spark session among different kinds of Spark nodes.

spark: SparkSession = None#
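The sharing mechanism can be illustrated with a plain class attribute: a value set on the mixin is visible from every subclass and instance. The sketch below is only a minimal model of that idea. The class names are hypothetical stand-ins, and a bare `object()` takes the place of a real `SparkSession`.

```python
from typing import Any, Optional


class SessionMixin:
    """Stand-in for a session-sharing mixin (hypothetical name)."""
    # Class attribute: one value visible to every subclass and instance.
    spark: Optional[Any] = None


class LoaderNode(SessionMixin):
    """Hypothetical input node."""


class SaverNode(SessionMixin):
    """Hypothetical output node."""


# Assign on the mixin itself, not on an instance or subclass, so that all
# node kinds see the same object. A plain object() stands in for a SparkSession.
session = object()
SessionMixin.spark = session

loader, saver = LoaderNode(), SaverNode()
same_session = loader.spark is saver.spark is session
```

Assigning to `loader.spark` instead would create an instance attribute that shadows the shared one for that node only, which is why the assignment targets the mixin class.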
class rain.nodes.spark.node_structure.SparkOutputNode(node_id: str)[source]#

Bases: OutputNode, SparkNodeSession

Class representing a Spark OutputNode. It saves a given object/file.

abstract execute()[source]#

Runs the node's main computation. The Python library and functions used depend on the node.

class rain.nodes.spark.node_structure.SparkStageMixin[source]#

Bases: object

Mixin class to store the Spark Estimator/Transformer instance that should be used in a SparkPipeline.

class rain.nodes.spark.node_structure.Transformer(node_id: str)[source]#

Bases: SparkNode

Class representing a Spark Transformer. It manipulates a given dataset and returns a modified version of it.

Input:

dataset (DataFrame) – A Spark DataFrame.

Output:

dataset (DataFrame) – A Spark DataFrame.

Parameters:

node_id (str) – Id of the node.

dataset = None#
abstract execute()[source]#

Runs the node's main computation. The Python library and functions used depend on the node.
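The pattern used throughout this module, an abstract `execute()` that concrete nodes must implement, can be sketched with Python's `abc` module. The classes below are hypothetical stand-ins rather than the library's own. The sketch also assumes that inputs are assigned to attributes such as `dataset` before `execute()` is called, and it uses a plain list in place of a Spark DataFrame.

```python
from abc import ABC, abstractmethod


class Node(ABC):
    """Stand-in for an abstract node base class (hypothetical name)."""

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.dataset = None

    @abstractmethod
    def execute(self):
        """Concrete nodes implement their computation here."""


class UppercaseColumns(Node):
    """Hypothetical transformer-style node working on a list of column names."""

    def execute(self):
        # Read the input attribute and write the result back to it,
        # mirroring the dataset-in, dataset-out contract described above.
        self.dataset = [name.upper() for name in self.dataset]


node = UppercaseColumns("t1")
node.dataset = ["id", "price"]
node.execute()

# The abstract base itself cannot be instantiated.
try:
    Node("base")
    base_is_abstract = False
except TypeError:
    base_is_abstract = True
```

Declaring `execute()` abstract means a subclass that forgets to implement it fails at instantiation time rather than later, when the node is run.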

rain.nodes.spark.spark_input module#

class rain.nodes.spark.spark_input.SparkCSVLoader(node_id: str, path: str, header: bool = False, schema: bool = False)[source]#

Bases: SparkInputNode

Loads a CSV file as a Spark DataFrame.

Output:

dataset (DataFrame) – The loaded Spark DataFrame.

Parameters:
  • node_id (str) – Id of the node.

  • path (str) – Path of the CSV file.

  • header (bool, default False) – If True, uses the first line as column names.

  • schema (bool, default False) – If True, infers the input schema automatically from the data. This requires one extra pass over the data.

dataset = None#
execute()[source]#

Runs the node's main computation. The Python library and functions used depend on the node.
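The `header` and `schema` parameters correspond naturally to the `header` and `inferSchema` options of PySpark's `DataFrameReader.csv`. The sketch below shows that mapping under the assumption that the node forwards them this way, which the documentation above does not confirm. The helper name `load_csv` is hypothetical.

```python
from typing import Any


def load_csv(spark: Any, path: str, header: bool = False,
             schema: bool = False) -> Any:
    """Hypothetical helper: read a CSV file into a Spark DataFrame.

    `spark` is expected to be a pyspark.sql.SparkSession.
    """
    # header=True takes column names from the first line; inferSchema=True
    # makes Spark scan the data once more to guess column types.
    return spark.read.csv(path, header=header, inferSchema=schema)
```

With both flags left at `False`, Spark names the columns `_c0`, `_c1`, and so on, and reads every column as a string.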

class rain.nodes.spark.spark_input.SparkModelLoader(node_id: str, path: str)[source]#

Bases: SparkInputNode

Loads a file as a Spark Model.

Output:

model (PipelineModel) – The loaded Spark PipelineModel.

Parameters:
  • node_id (str) – Id of the node.

  • path (str) – Path of the saved model.

execute()[source]#

Runs the node's main computation. The Python library and functions used depend on the node.

model = None#

rain.nodes.spark.spark_output module#

class rain.nodes.spark.spark_output.SparkSaveDataset(node_id: str, path: str, index: bool = True)[source]#

Bases: SparkOutputNode

Saves a Spark DataFrame in CSV format.

Input:

dataset (DataFrame) – The Spark DataFrame to be saved.

Parameters:
  • node_id (str) – Id of the node.

  • path (str) – Path where the dataset is saved.

  • index (bool, default True) – String representing the path where to save the dataset

dataset = None#
execute()[source]#

Runs the node's main computation. The Python library and functions used depend on the node.

class rain.nodes.spark.spark_output.SparkSaveModel(node_id: str, path: str)[source]#

Bases: SparkOutputNode

Saves a trained PipelineModel.

Input:

model (PipelineModel) – The Spark PipelineModel to be saved.

Parameters:
  • node_id (str) – Id of the node.

  • path (str) – String representing the path where to save the model.

execute()[source]#

Runs the node's main computation. The Python library and functions used depend on the node.

model = None#

Module contents#
