Source code for rain.nodes.spark.data_wrangling

"""
 Copyright (C) 2023 Università degli Studi di Camerino and Sigma S.p.A.
 Authors: Alessandro Antinori, Rosario Capparuccia, Riccardo Coltrinari, Flavio Corradini, Marco Piangerelli, Barbara Re, Marco Scarpetta

 This program is free software: you can redistribute it and/or modify
 it under the terms of the GNU Affero General Public License as
 published by the Free Software Foundation, either version 3 of the
 License, or (at your option) any later version.

 This program is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU Affero General Public License for more details.

 You should have received a copy of the GNU Affero General Public License
 along with this program.  If not, see <https://www.gnu.org/licenses/>.
 """

from typing import List, Optional

from pyspark.sql import DataFrame

from rain.core.parameter import KeyValueParameter, Parameters
from rain.nodes.spark.node_structure import Transformer


class SparkColumnSelector(Transformer):
    """SparkColumnSelector manages filtering of rows, columns and values for a Spark DataFrame.

    Input
    -----
    dataset : DataFrame
        A Spark DataFrame.

    Output
    ------
    dataset : DataFrame
        A Spark DataFrame.

    Parameters
    ----------
    node_id : str
        Id of the node
    column_list : List[str]
        List of columns to select from the dataset
    filter_list : List[str], optional
        List of conditions used to filter the rows of the dataset.
        Each condition is passed, in order, to ``DataFrame.filter``.
        Defaults to an empty list (no row filtering).
    """

    def __init__(
        self,
        node_id: str,
        column_list: List[str],
        filter_list: Optional[List[str]] = None,
    ) -> None:
        super().__init__(node_id)

        # A ``[]`` default would be created once and shared by every instance
        # built without an explicit ``filter_list``; create a fresh list here.
        if filter_list is None:
            filter_list = []

        self.parameters = Parameters(
            column_list=KeyValueParameter("column_list", List[str], column_list),
            filter_list=KeyValueParameter("filter_list", List[str], filter_list),
        )

    def execute(self) -> None:
        """Select the configured columns, then apply every filter condition.

        The result replaces ``self.dataset``. The column selection happens
        first; the filter conditions are then applied one after the other.
        """
        params = self.parameters.get_dict()

        self.dataset = self.dataset.select(params.get("column_list"))

        # Tolerate a missing/None filter list instead of failing on iteration.
        for condition in params.get("filter_list") or []:
            self.dataset = self.dataset.filter(condition)
class SparkSplitDataset(Transformer):
    """Splits a Spark DataFrame in two DataFrames, train and test.

    The ``train`` and ``test`` values are handed to ``DataFrame.randomSplit``
    as weights; per the PySpark documentation they are normalized if they do
    not sum up to 1.0.

    Input
    -----
    dataset : DataFrame
        A Spark DataFrame.

    Output
    ------
    train_dataset : DataFrame
        A Spark DataFrame used for the training phase.
    test_dataset : DataFrame
        A Spark DataFrame used for the test phase.

    Parameters
    ----------
    node_id : str
        Id of the node.
    train : float
        Percentage of the dataset to split into a train dataset.
    test : float
        Percentage of the dataset to split into a test dataset.
    """

    _output_vars = {"train_dataset": DataFrame, "test_dataset": DataFrame}

    def __init__(self, node_id: str, train: float, test: float) -> None:
        super().__init__(node_id)
        self.parameters = Parameters(
            train=KeyValueParameter("train", float, train),
            test=KeyValueParameter("test", float, test),
        )

    def execute(self) -> None:
        """Randomly split ``self.dataset`` into the train and test datasets.

        The first DataFrame returned by ``randomSplit`` is stored in
        ``self.train_dataset`` and the second in ``self.test_dataset``.
        """
        params = self.parameters.get_dict()

        # Look the weights up by name rather than relying on the order in
        # which the parameter dictionary happens to yield its values, so the
        # train weight is always the first one passed to randomSplit.
        weights = [params.get("train"), params.get("test")]

        self.train_dataset, self.test_dataset = self.dataset.randomSplit(weights)