Source code for rain.nodes.mongodb.database_io

"""
 Copyright (C) 2023 Università degli Studi di Camerino and Sigma S.p.A.
 Authors: Alessandro Antinori, Rosario Capparuccia, Riccardo Coltrinari, Flavio Corradini, Marco Piangerelli, Barbara Re, Marco Scarpetta

 This program is free software: you can redistribute it and/or modify
 it under the terms of the GNU Affero General Public License as
 published by the Free Software Foundation, either version 3 of the
 License, or (at your option) any later version.

 This program is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU Affero General Public License for more details.

 You should have received a copy of the GNU Affero General Public License
 along with this program.  If not, see <https://www.gnu.org/licenses/>.
 """

import ssl

import pandas
import pandas as pd
import pymongo

from rain.core.base import InputNode, OutputNode, Tags, LibTag, TypeTag
from rain.core.parameter import Parameters, KeyValueParameter


class MongoCSVWriter(OutputNode):
    """Write a Pandas Dataframe into a MongoDB collection.

    Every row of the input dataset is converted to a dictionary and inserted
    as one document. An empty dataset results in no insert being issued.

    Parameters
    ----------
    node_id : str
        The unique id of the node.
    connection : str
        Hostname or IP address or Unix domain socket path of a single MongoDB
        instance to connect to, or a mongodb URI
    db : str
        Name of the database to connect to.
    coll : str
        Name of the collection to connect to.
    """

    _input_vars = {"dataset": pd.DataFrame}

    def __init__(self, node_id: str, connection: str, db: str, coll: str):
        self.parameters = Parameters(
            connection=KeyValueParameter("connection", str, connection),
            db=KeyValueParameter("db", str, db),
            coll=KeyValueParameter("coll", str, coll),
        )
        super(MongoCSVWriter, self).__init__(node_id)

    def execute(self):
        """Insert the rows of ``self.dataset`` into the configured collection.

        Returns
        -------
        pymongo.collection.Collection
            The collection the documents were written to.
        """
        params = self.parameters.get_dict()

        # NOTE(review): ssl.CERT_NONE disables verification of the server
        # certificate. Also, the ``ssl_cert_reqs`` keyword appears to have
        # been removed in PyMongo 4 -- confirm against the pinned version.
        client = pymongo.MongoClient(
            params.get("connection"), ssl=True, ssl_cert_reqs=ssl.CERT_NONE
        )
        # The client is intentionally not closed here: the collection handed
        # back to the caller is bound to it.
        collection = client[params.get("db")][params.get("coll")]

        records = self.dataset.to_dict("records")
        # insert_many() rejects an empty document list with a TypeError, so an
        # empty DataFrame is treated as "nothing to write".
        if records:
            collection.insert_many(records)

        return collection

    @classmethod
    def _get_tags(cls):
        return Tags(LibTag.MONGODB, TypeTag.OUTPUT)
class MongoCSVReader(InputNode):
    """Read a Pandas Dataframe from a MongoDB collection.

    Parameters
    ----------
    node_id : str
        The unique id of the node.
    connection : str
        Hostname or IP address or Unix domain socket path of a single MongoDB
        instance to connect to, or a mongodb URI
    db : str
        Name of the database to connect to.
    coll : str
        Name of the collection to connect to.
    filter : dict, default None
        A SON object specifying elements which must be present for a document
        to be included in the result set
    projection : dict, default None
        A dict to exclude fields from the result
        (e.g. projection={'_id': False})
    """

    _output_vars = {"dataset": pd.DataFrame}

    def __init__(
        self,
        node_id: str,
        connection: str,
        db: str,
        coll: str,
        filter: dict = None,
        projection: dict = None,
    ):
        # ``filter`` shadows the builtin, but it is part of the public
        # signature and is therefore kept as is.
        self.parameters = Parameters(
            connection=KeyValueParameter("connection", str, connection),
            db=KeyValueParameter("db", str, db),
            coll=KeyValueParameter("coll", str, coll),
            filter=KeyValueParameter("filter", dict, filter),
            projection=KeyValueParameter("projection", dict, projection),
        )
        super(MongoCSVReader, self).__init__(node_id)

    def execute(self):
        """Query the configured collection and store the result in ``self.dataset``.

        All matching documents are loaded into a list and wrapped in a
        DataFrame; the MongoDB client is closed afterwards, whether or not
        the query succeeded.
        """
        params = self.parameters.get_dict()

        # NOTE(review): ssl.CERT_NONE disables verification of the server
        # certificate. Also, the ``ssl_cert_reqs`` keyword appears to have
        # been removed in PyMongo 4 -- confirm against the pinned version.
        client = pymongo.MongoClient(
            params.get("connection"), ssl=True, ssl_cert_reqs=ssl.CERT_NONE
        )
        try:
            collection = client[params.get("db")][params.get("coll")]
            cursor = collection.find(
                filter=params.get("filter"), projection=params.get("projection")
            )
            # list() drains the cursor completely, so nothing depends on the
            # connection once the DataFrame has been built.
            self.dataset = pandas.DataFrame(list(cursor))
        finally:
            # Release the client's connections instead of leaking them on
            # every execution of the node.
            client.close()

    @classmethod
    def _get_tags(cls):
        return Tags(LibTag.MONGODB, TypeTag.INPUT)