Source code for libdots.model.influxdb_connector

#  This work is based on original code developed and copyrighted by TNO 2023
#  and further developed and copyrighted by Scene Ltd in 2025.
#  Subsequent contributions are licensed to you by the developers of such code and are
#  made available under one or several contributor license agreements.
#
#  This work is licensed to you under the Apache License, Version 2.0.
#  You may obtain a copy of the license at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Contributors:
#      TNO         - Initial implementation of the dots calculation-service-generator
#      Scene Ltd   - Development of libdots
#  Manager:
#      Scene Ltd

import logging
from collections.abc import Iterable
from datetime import datetime
from datetime import timedelta
from datetime import timezone as tz
from typing import Any

import numpy
from influxdb import InfluxDBClient

from ..types import EsdlId
from ..types import ESDLObject


[docs] class InfluxDBConnector: """A connector writes data to an InfluxDB database.""" def __init__( self, influx_host: str, influx_port: str, influx_user: str, influx_password: str, influx_database_name: str, ): self.influx_host: str = influx_host.split("//")[-1] self.influx_port: str = influx_port self.influx_database_name: str = influx_database_name self.influx_user: str = influx_user self.influx_password: str = influx_password self.logger = logging.getLogger(__name__) self.logger.debug(f"influx server: {self.influx_host}") self.logger.debug(f"influx port: {self.influx_port}") self.logger.debug(f"influx database: {self.influx_database_name}") self._client: InfluxDBClient | None = None self.simulation_id: str | None = None self.model_id: str | None = None self.esdl_type: str | None = None self.start_date: datetime | None = None self.time_step_seconds: int | None = None self.nr_of_time_steps: int | None = None self.profile_output_data: dict[ EsdlId, dict[str, list[Any] | numpy.typing.NDArray[Any]], ] = {} self.summary_output_data: dict[EsdlId, dict[str, float]] = {} self.esdl_objects: dict[EsdlId, ESDLObject] | None = None @property def client(self) -> InfluxDBClient: if self._client is None: self._client = self.connect() return self._client
[docs] def connect(self) -> InfluxDBClient: try: self.logger.debug("Connecting InfluxDBClient") client = InfluxDBClient( host=self.influx_host, port=int(self.influx_port), database=self.influx_database_name, username=self.influx_user, password=self.influx_password, ) self.logger.debug(f"InfluxDBClient ping: {client.ping()}") return client except Exception as e: self.logger.debug(f"Failed to connect to influx db: {e}") raise
[docs] def query(self, query: str): return self.client.query(query)
[docs] def create_database(self): self.client.create_database(self.influx_database_name)
[docs] def write(self, msgs: list[dict[str, Iterable[Any]]]): # Send message to database. self.client.write_points( msgs, database=self.influx_database_name, time_precision="s" )
[docs] def close(self): if self._client is not None: self._client.close() self._client = None
[docs] def init_profile_output_data( self, simulation_id: str, model_id: str, esdl_type: str, start_date: datetime, time_step_seconds: int, nr_of_time_steps: int, esdl_ids: list[EsdlId], output_names: list[str], esdl_objects: dict[EsdlId, ESDLObject], ): self.simulation_id = simulation_id self.esdl_type = esdl_type self.model_id = model_id self.start_date = start_date self.time_step_seconds = time_step_seconds self.nr_of_time_steps = nr_of_time_steps for esdl_id in esdl_ids: self.profile_output_data[esdl_id] = {} for output_name in output_names: self.profile_output_data[esdl_id][output_name] = numpy.zeros( self.nr_of_time_steps ) self.summary_output_data[esdl_id] = {} self.esdl_objects = esdl_objects
[docs] def set_time_step_data_point( self, esdl_id: EsdlId, output_name: str, time_step_nr: int, value: float ): if numpy.isnan(value): self.logger.warning("Value for %s is nan, changing to 0.0", output_name) value = 0.0 self.profile_output_data[esdl_id][output_name][time_step_nr - 1] = float(value)
[docs] def set_summary_data_point(self, esdl_id: EsdlId, output_name: str, value: float): self.summary_output_data[esdl_id][output_name] = value
[docs] def write_output(self): points: list[Any] = [] first_timestamp: str | None = None assert self.nr_of_time_steps is not None, "We do not have a nr_of_time_steps" assert self.time_step_seconds is not None, "We do not have a time_step_seconds" assert self.start_date is not None, "We do not have a start_date" for i_step in range(self.nr_of_time_steps): step_time = ( ( self.start_date + timedelta(seconds=(i_step + 2) * self.time_step_seconds) ) .astimezone(tz.utc) .strftime("%Y-%m-%dT%H:%M:%SZ") ) if first_timestamp is None: first_timestamp = step_time for ( esdl_id, esdl_object_profile_output_data, ) in self.profile_output_data.items(): fields = {} for output_name in esdl_object_profile_output_data.keys(): if i_step < len(esdl_object_profile_output_data[output_name]): fields[output_name] = esdl_object_profile_output_data[ output_name ][i_step] else: # allow data writing even if simulation was terminated fields[output_name] = 0.0 self.add_measurement(points, esdl_id, step_time, fields) if self.summary_output_data and first_timestamp is not None: for ( esdl_id, esdl_object_summary_output_data, ) in self.summary_output_data.items(): fields: dict[str, Any] = {} for output_name in esdl_object_summary_output_data.keys(): fields[output_name] = esdl_object_summary_output_data[output_name] if fields: self.add_measurement(points, esdl_id, first_timestamp, fields) self.logger.info( f"InfluxDB writing {len(points)} points to measurement '{self.esdl_type}'" f" with tag simulationRun {self.simulation_id}" ) self.write(points)
[docs] def add_measurement( self, points: list[dict[str, Any]], esdl_id: EsdlId, timestamp: str, fields: dict[str, Any], ): try: assert self.esdl_objects is not None if hasattr(self.esdl_objects[esdl_id], "name"): esdl_name = self.esdl_objects[esdl_id].name else: esdl_name = self.esdl_type item = { "measurement": f"{self.esdl_type}", "tags": { "simulation_id": self.simulation_id, "model_id": self.model_id, "esdl_id": esdl_id, "esdl_name": esdl_name, }, "time": timestamp, "fields": fields, } points.append(item) except Exception as e: self.logger.debug(f"Exception: {e} {e.args}")