Source code for rain.nodes.pandas.zscore

"""
 Copyright (C) 2023 Università degli Studi di Camerino and Sigma S.p.A.
 Authors: Alessandro Antinori, Rosario Capparuccia, Riccardo Coltrinari, Flavio Corradini, Marco Piangerelli, Barbara Re, Marco Scarpetta

 This program is free software: you can redistribute it and/or modify
 it under the terms of the GNU Affero General Public License as
 published by the Free Software Foundation, either version 3 of the
 License, or (at your option) any later version.

 This program is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU Affero General Public License for more details.

 You should have received a copy of the GNU Affero General Public License
 along with this program.  If not, see <https://www.gnu.org/licenses/>.
 """

import pickle
from typing import List
import numpy as np
import pandas as pd

from rain import Tags, LibTag, TypeTag
from rain.core.parameter import Parameters, KeyValueParameter
from rain.nodes.pandas.node_structure import PandasNode


class ZScoreTrainer(PandasNode):
    """Node that returns the model trained with the ZScore algorithm by
    analyzing the columns of the dataset.

    Input
    -----
    dataset : pandas.DataFrame
        The pandas DataFrame.

    Output
    ------
    model : pickle
        The ZScore model in pickle format. The pickled object is a dict
        with two entries, ``"mean"`` and ``"dev_std"``, each mapping a
        column name to the corresponding statistic.

    Parameters
    ----------
    columns : List[str]
        Column names to apply ZScore to. Empty to use all columns.
    """

    _input_vars = {"dataset": pd.DataFrame}
    _output_vars = {"model": "pickle"}

    def __init__(self, node_id: str, columns: List[str] = []):
        super().__init__(node_id)
        # Store a private copy so that neither the shared default list nor a
        # list owned by the caller can be modified through the parameter.
        selected = [] if columns is None else list(columns)
        self.parameters = Parameters(
            columns=KeyValueParameter("columns", List[str], selected),
        )

    def execute(self):
        """Compute the per-column mean and standard deviation of the dataset
        and store them, pickled, in ``self.model``.
        """
        # Resolve the columns locally instead of writing the dataset's
        # ``pandas.Index`` back into the parameter: an Index has no truth
        # value, so a second execution would raise ValueError, and the
        # stored columns would go stale if the input dataset changed.
        columns = self.parameters.columns.value
        if columns is None or len(columns) == 0:
            columns = list(self.dataset.columns)

        mean = {}
        dev_std = {}
        for column in columns:
            content = self.dataset[column]
            mean[column] = np.mean(content)
            dev_std[column] = np.std(content)

        self.model = pickle.dumps({"mean": mean, "dev_std": dev_std})

    @classmethod
    def _get_tags(cls):
        return Tags(LibTag.PANDAS, TypeTag.TRAINER)
class ZScorePredictor(PandasNode):
    """Node that returns the predictions performed with a ZScore model on
    the columns of a dataset.

    Input
    -----
    dataset : pandas.DataFrame
        The pandas DataFrame.
    model : pickle
        The ZScore model in pickle format. It must unpickle to a dict with
        the entries ``"mean"`` and ``"dev_std"``, each mapping a column
        name to the corresponding statistic.

    Output
    ------
    predictions : pandas.DataFrame
        The DataFrame containing the predictions: for every selected
        column, 1 where the absolute z-score exceeds the threshold and
        0 otherwise.

    Parameters
    ----------
    columns : List[str]
        Column names to apply ZScore to. Empty to use all columns.
    threshold : float, default=1.3
        The threshold of the ZScore to distinguish anomalies.
    """

    _input_vars = {"dataset": pd.DataFrame, "model": "pickle"}
    _output_vars = {"predictions": pd.DataFrame}

    def __init__(self, node_id: str, columns: List[str] = [], threshold: float = 1.3):
        super().__init__(node_id)
        # Store a private copy so that neither the shared default list nor a
        # list owned by the caller can be modified through the parameter.
        selected = [] if columns is None else list(columns)
        self.parameters = Parameters(
            columns=KeyValueParameter("columns", List[str], selected),
            threshold=KeyValueParameter("threshold", float, threshold),
        )
        # Pre-execution value; execute() replaces it with a DataFrame.
        self.predictions = {}

    def execute(self):
        """Flag, column by column, the values whose absolute z-score is
        greater than the threshold and store the result in
        ``self.predictions``.
        """
        # Resolve the columns locally instead of writing the dataset's
        # ``pandas.Index`` back into the parameter: an Index has no truth
        # value, so a second execution would raise ValueError.
        columns = self.parameters.columns.value
        if columns is None or len(columns) == 0:
            columns = list(self.dataset.columns)

        # SECURITY: pickle.loads can execute arbitrary code. The model must
        # come from a trusted source (e.g. a ZScoreTrainer in the same flow).
        model = pickle.loads(self.model)
        means = model["mean"]
        deviations = model["dev_std"]
        threshold = self.parameters.threshold.value

        # Accumulate into a local dict: self.predictions holds the previous
        # DataFrame after the first run and must not be reused as a buffer.
        flags = {}
        for column in columns:
            z_scores = (self.dataset[column] - means[column]) / deviations[column]
            # NaN scores (missing values, or 0/0 when the stored deviation
            # is zero) never compare greater than the threshold, so they
            # are reported as 0.
            flags[column] = (z_scores.abs() > threshold).astype(int).tolist()

        self.predictions = pd.DataFrame.from_dict(flags)

    @classmethod
    def _get_tags(cls):
        return Tags(LibTag.PANDAS, TypeTag.PREDICTOR)