# This work is based on original code developed and copyrighted by TNO 2023
# and further developed and copyrighted by Scene Ltd in 2025.
# Subsequent contributions are licensed to you by the developers of such code and are
# made available under one or several contributor license agreements.
#
# This work is licensed to you under the Apache License, Version 2.0.
# You may obtain a copy of the license at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Contributors:
# TNO - Initial implementation of the dots calculation-service-generator
# Scene Ltd - Development of libdots
# Manager:
# Scene Ltd
import logging
import threading
import traceback
from typing import Any
from paho.mqtt.client import Client
from paho.mqtt.client import MQTTMessage
from ..model.service_calc import ServiceCalc
from ..types import EsdlId
from ..types import ServiceName
from . import messages
from .input_data_inventory import InputDataInventory
from .io_data import IODataInterface
from .io_data import ModelParameters
MODEL_PARAMETERS = "model_parameters"
NEW_STEP = "new_step"
SIMULATION_DONE = "simulations_done"
[docs]
class MqttClient:
"""
The MQTT Client handling receiving and sending MQTT messages.
"""
def __init__(
self,
host: str,
port: int,
qos: int,
username: str,
password: str,
service_name: str,
input_data_inventory: InputDataInventory,
service_calc: ServiceCalc[Any],
sim_logger: logging.Logger,
):
self.logger = logging.getLogger(__name__)
self.sim_logger = sim_logger
self.host = host
self.port = port
self.qos = qos
self.username = username
self.password = password
self.service_name = service_name
self._mqtt_client: Client | None = None
self.subscribed_topics: list[str] = []
self.input_data_inventory = input_data_inventory
self.service_calc = service_calc
@property
def mqtt_client(self) -> Client:
if self._mqtt_client is None:
self._mqtt_client = Client(clean_session=True)
return self._mqtt_client
[docs]
def wait_for_data(self):
# initialize mqtt connection
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client: Client, userdata: Any, flags: dict[str, Any], rc: int):
# print("Connected with result code " + str(rc))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
for topic in self.subscribed_topics:
client.subscribe(topic, self.qos)
# The callback for when a PUBLISH message is received from the server.
def on_message(client: Client, userdata: Any, msg: MQTTMessage):
t = threading.Thread(target=self._process_message, args=[client, msg])
t.setDaemon(True) # kill thread when main thread stops
t.start()
self.mqtt_client.on_connect = on_connect
self.mqtt_client.on_message = on_message
self.mqtt_client.username_pw_set(self.username, self.password)
self.mqtt_client.connect(self.host, port=self.port)
self._subscribe_lifecycle_topics()
self._send_ready_for_processing()
self.logger.debug("Service started, waiting for model parameters...")
self.mqtt_client.loop_forever()
def _process_message(self, client: Client, msg: MQTTMessage):
try:
topic = msg.topic
self.logger.debug(f" [received] {topic}: {msg.payload}")
data_name: str = self._get_data_name(topic)
if data_name == SIMULATION_DONE:
self.logger.debug("Received simulation done message")
self.service_calc.write_to_influxdb()
self.sim_logger.info(
f"Simulation Orchestrator terminated service: '{self.service_name}: "
f"{self.service_calc.model_id}' - '{self.service_calc.simulation_id}'"
)
client.disconnect()
else:
if (
data_name == MODEL_PARAMETERS or data_name == NEW_STEP
): # set lifecycle main topic
main_topic = "/lifecycle/dots-so/model"
else: # get data main topic from message topic
main_topic = "/".join(
topic.split("/")[0:4]
) # first three items separated by '/'
if data_name == MODEL_PARAMETERS:
model_parameter_data = self.input_data_inventory.create_new_class(
ModelParameters, msg.payload
)
if (
not isinstance(model_parameter_data, ModelParameters)
or model_parameter_data.parameters_dict is None
):
raise ValueError("Invalid Model Parameters Received")
self.service_calc.setup(model_parameter_data.parameters_dict)
self.input_data_inventory.set_expected_esdl_ids_for_input_data(
self.service_calc.connected_input_esdl_objects_dict
)
self._subscribe_data_topics(
client, self.service_calc.connected_input_esdl_objects_dict
)
self._send_parameterized()
non_executed_calc_names_input_received = []
else:
# add input data and receive a list of calculations that have all required input available
non_executed_calc_names_input_received = (
self.input_data_inventory.add_input(
main_topic, data_name, msg.payload
)
)
# do step for calculations that have received all required input
if self.input_data_inventory.is_step_active():
for calc_name in non_executed_calc_names_input_received:
self._do_step(calc_name)
if self.input_data_inventory.all_calcs_done():
self._send_calculations_done()
self.input_data_inventory.delete_all_received_input_data()
except Exception as ex:
error_message = str(ex) + traceback.format_exc()
self.logger.error(error_message)
self._send_error_occurred(error_message)
client.disconnect()
@staticmethod
def _get_data_name(topic: str):
message = topic.split("/")[6]
if topic.startswith("/lifecycle/dots-so/model/"):
if message == "ModelParameters":
message = MODEL_PARAMETERS
elif message == "NewStep":
message = NEW_STEP
elif message == "SimulationDone":
message = SIMULATION_DONE
else:
raise Exception(f"Received unknown lifecycle message: '{message}'")
return message
def _subscribe_lifecycle_topics(self):
topic = f"/lifecycle/dots-so/model/{self.service_calc.simulation_id}/{self.service_calc.model_id}/+"
self.mqtt_client.subscribe(topic, self.qos)
self.subscribed_topics.append(topic)
def _subscribe_data_topics(
self,
client: Client,
connected_input_esdl_objects_dict: dict[
EsdlId, dict[ServiceName, list[EsdlId]]
],
):
topics: list[str] = []
for (
main_topic,
esdl_ids,
) in self.input_data_inventory.expected_esdl_ids_dict.items():
for esdl_id in esdl_ids:
topics.append(
f"{main_topic}/{self.service_calc.simulation_id}/{esdl_id}/#"
)
for topic in set(topics):
client.subscribe(topic, qos=self.qos)
self.subscribed_topics.append(topic)
def _send_ready_for_processing(self):
topic = f"/lifecycle/model/mso/{self.service_calc.simulation_id}/{self.service_calc.model_id}/ReadyForProcessing"
self.mqtt_client.publish(
topic, payload=messages.ReadyForProcessing().SerializeToString()
)
self.logger.debug(f" [sent] {topic}")
def _send_parameterized(self):
topic = f"/lifecycle/model/dots-so/{self.service_calc.simulation_id}/{self.service_calc.model_id}/Parameterized"
self.mqtt_client.publish(
topic, payload=messages.Parameterized().SerializeToString()
)
self.logger.debug(f" [sent] {topic}")
def _send_io_data(self, esdl_id: EsdlId, io_data: IODataInterface):
topic = f"{io_data.get_main_topic()}/{self.service_calc.simulation_id}/{esdl_id}/{io_data.get_name()}"
self.mqtt_client.publish(topic, io_data.get_values_as_serialized_protobuf())
self.logger.debug(f" [sent] {topic}: {io_data.get_variable_descr()}")
def _send_calculations_done(self):
topic = f"/lifecycle/model/dots-so/{self.service_calc.simulation_id}/{self.service_calc.model_id}/CalculationsDone"
self.mqtt_client.publish(
topic, payload=messages.CalculationsDone().SerializeToString()
)
self.logger.debug(f" [sent] {topic}")
[docs]
def send_log(self, message: str):
self.mqtt_client.publish(
f"/log/model/dots-so/{self.service_calc.simulation_id}/{self.service_calc.model_id}",
message.encode("utf-8"),
)
def _send_error_occurred(self, message: str):
error_occurred_message = messages.ErrorOccurred(error_message=message)
self.mqtt_client.publish(
f"/lifecycle/model/dots-so/{self.service_calc.simulation_id}/{self.service_calc.model_id}/ErrorOccurred",
error_occurred_message.SerializeToString(),
)
def _do_step(self, calc_name: str):
# do step calculation if all data received for 'calc_name'
self.sim_logger.debug(
f"start '{self.service_calc.service_name} ({self.service_calc.model_id}) - {calc_name}'"
)
output_data_tuple = self.service_calc.calc_function(
calc_name, self.input_data_inventory.get_input_data(calc_name)
)
# send results
if output_data_tuple:
for output_data_dict in output_data_tuple:
for esdl_id, output_data in output_data_dict.items():
self._send_io_data(esdl_id, output_data)
self.input_data_inventory.set_calc_done(calc_name)
self.sim_logger.debug(
f"finished '{self.service_calc.service_name} ({self.service_calc.model_id}) - {calc_name}'"
)