Source code for rain.nodes.pysad.trainer

"""
 Copyright (C) 2023 Università degli Studi di Camerino and Sigma S.p.A.
 Authors: Alessandro Antinori, Rosario Capparuccia, Riccardo Coltrinari, Flavio Corradini, Marco Piangerelli, Barbara Re, Marco Scarpetta

 This program is free software: you can redistribute it and/or modify
 it under the terms of the GNU Affero General Public License as
 published by the Free Software Foundation, either version 3 of the
 License, or (at your option) any later version.

 This program is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU Affero General Public License for more details.

 You should have received a copy of the GNU Affero General Public License
 along with this program.  If not, see <https://www.gnu.org/licenses/>.
 """

import numpy as np

from rain.core.parameter import Parameters, KeyValueParameter
from rain.nodes.pysad.node_structure import PySadTrainer

from pysad.models import IForestASD as IFASD, xStream as xS, HalfSpaceTrees as HST
from pysad.evaluation import AUROCMetric
from rain.loguru_logger import logger


class HalfSpaceTree(PySadTrainer):
    """Node that trains a model using the HalfSpaceTree algorithm.

    Input
    -----
    dataset : pd.DataFrame
        A Pandas DataFrame containing the features.
    labels : pd.Series
        A Pandas Series containing the labels.

    Output
    ------
    model : pickle
        The trained model in pickle format.
    auroc : float
        The AUROC metric of the trained model.

    Parameters
    ----------
    node_id : str
        Id of the node.
    data
        Object exposing ``to_numpy()`` (presumably the pd.DataFrame of
        features - confirm against callers). It is stored as the node
        dataset and its per-column minimum and maximum are handed to the
        model constructor.
    window_size : int, default=100
        The size of the window.
    num_trees : int, default=25
        The number of trees.
    initial_window_x : np.ndarray, default=None
        The initial window to fit for initial calibration period.
        If not None, we simply apply fit to these instances.
    max_depth : int, default=15
        The maximum depth of the trees.
    """

    def __init__(self, node_id: str, data, window_size: int = 100, num_trees: int = 25,
                 initial_window_x: np.ndarray = None, max_depth: int = 15):
        super(HalfSpaceTree, self).__init__(node_id)
        self.parameters = Parameters(
            window_size=KeyValueParameter("window_size", int, window_size),
            num_trees=KeyValueParameter("num_trees", int, num_trees),
            max_depth=KeyValueParameter("max_depth", int, max_depth),
            initial_window_X=KeyValueParameter("initial_window_X", np.ndarray, initial_window_x)
        )
        self.dataset = data

        # Convert the dataset once and reuse the array for both feature bounds.
        features = self.dataset.to_numpy()
        self.model = HST(features.min(axis=0), features.max(axis=0), **self.parameters.get_dict())
        self.metric = AUROCMetric()

    def execute(self):
        """Fit the model on the dataset, store the scores and, when labels are set, the AUROC."""
        self.scores = self.model.fit_score(self.dataset.to_numpy())

        if self.labels is not None:
            for label, score in zip(self.labels, self.scores):
                # Samples whose score is NaN are left out of the metric.
                if np.isnan(score):
                    continue
                self.metric.update(label, score)

            self.auroc = self.metric.get()
            # Report through the project logger, as IForestASD does, instead of stdout.
            logger.info(f"Model trained - AUROC: {self.auroc}", node_name=self.node_id)
class XStream(PySadTrainer):
    """Node that trains a model using the xStream algorithm.

    Input
    -----
    dataset : pd.DataFrame
        A Pandas DataFrame containing the features.
    labels : pd.Series
        A Pandas Series containing the labels.

    Output
    ------
    model : pickle
        The trained model in pickle format.
    auroc : float
        The AUROC metric of the trained model.

    Parameters
    ----------
    node_id : str
        Id of the node.
    window_size : int, default=25
        The size (and the sliding length) of the reference window.
    num_components : int, default=100
        The number of components for streamhash projection.
    n_chains : int, default=100
        The number of half-space chains.
    depth : int, default=25
        The maximum depth for the chains.
    """

    def __init__(self, node_id: str, window_size: int = 25, num_components: int = 100,
                 n_chains: int = 100, depth: int = 25):
        super(XStream, self).__init__(node_id)
        self.parameters = Parameters(
            window_size=KeyValueParameter("window_size", int, window_size),
            num_components=KeyValueParameter("num_components", int, num_components),
            n_chains=KeyValueParameter("n_chains", int, n_chains),
            depth=KeyValueParameter("depth", int, depth)
        )
        # The parameter names double as the keyword arguments of the model.
        self.model = xS(**self.parameters.get_dict())
        self.metric = AUROCMetric()

    def execute(self):
        """Fit the model on the dataset, store the scores and, when labels are set, the AUROC."""
        self.scores = self.model.fit_score(self.dataset.to_numpy())

        if self.labels is not None:
            for label, score in zip(self.labels, self.scores):
                # Samples whose score is NaN are left out of the metric.
                if np.isnan(score):
                    continue
                self.metric.update(label, score)

            self.auroc = self.metric.get()
            # Report through the project logger, as IForestASD does, instead of stdout.
            logger.info(f"Model trained - AUROC: {self.auroc}", node_name=self.node_id)
class IForestASD(PySadTrainer):
    """Node that trains a model using the IForestASD algorithm.

    Input
    -----
    dataset : pd.DataFrame
        A Pandas DataFrame containing the features.
    labels : pd.Series
        A Pandas Series containing the labels.

    Output
    ------
    model : pickle
        The trained model in pickle format.
    auroc : float
        The AUROC metric of the trained model.

    Parameters
    ----------
    node_id : str
        Id of the node.
    window_size : int, default=2048
        The size of the reference window and its sliding.
    """

    def __init__(self, node_id: str, window_size: int = 2048):
        super().__init__(node_id)

        window_parameter = KeyValueParameter("window_size", int, window_size)
        self.parameters = Parameters(window_size=window_parameter)

        # The parameter names double as the keyword arguments of the model.
        model_kwargs = self.parameters.get_dict()
        self.model = IFASD(**model_kwargs)
        self.metric = AUROCMetric()

    def execute(self):
        """Fit and score the dataset; with labels available, compute and log the AUROC."""
        features = self.dataset.to_numpy()
        self.scores = self.model.fit_score(features)

        # Without labels there is nothing to evaluate the scores against.
        if self.labels is None:
            return

        for expected, anomaly_score in zip(self.labels, self.scores):
            # Only samples with a defined (non-NaN) score feed the metric.
            if not np.isnan(anomaly_score):
                self.metric.update(expected, anomaly_score)

        self.auroc = self.metric.get()
        logger.info(f"Model trained - AUROC: {self.auroc}", node_name=self.node_id)