io

input_data_inventory

class libdots.io.input_data_inventory.InputDataInventory(calculation_messages: dict[str, list[type[IODataInterface]]], service_name: str)[source]

Bases: object

The input data inventory. This tracks the incoming mqtt messages and whether all required messages have been received for a specific calculation function.

Parameters:
  • calculation_messages – A dictionary of function names and message types it should receive.

  • service_name – The name of the calculation service.

An example for calculation_messages looks like this

{
    "post_battery": [NewStep, ChargeConsumption, Supply],
    "pre_battery": [NewStep, Load, PvOutput, Import, Export],
}

This defines that there are 2 calculation functions for the calculstion service and what message type each of them requires as input.

add_input(main_topic: str, data_name: str, serialized_values: bytes) list[str][source]
all_calcs_done()[source]
create_new_class(input_class: type[IODataInterface], serialized_values: bytes) IODataInterface[source]
delete_all_received_input_data()[source]
property expected_esdl_ids_dict: dict[str, list[str]]
get_input_data(calc_name: str) dict[str, NewStep | list[IODataInterface]][source]
is_step_active() bool[source]
set_calc_done(calc_name: str)[source]
set_expected_esdl_ids_for_input_data(connected_input_esdl_objects_dict: dict[str, dict[str, list[str]]])[source]

io_data

class libdots.io.io_data.IODataInterface[source]

Bases: ABC

Abstract class defining incoming our outgoing messages. The actual data is comming from the messages in the messages module, and those are compiled from protobuf in message_definitions.

abstract classmethod get_main_topic() str[source]

Get MQTT topic

abstract classmethod get_name() str[source]

Get data name

abstract get_values_as_serialized_protobuf() bytes[source]

Get a protobuf encoded message with all variables.

abstract classmethod get_variable_descr() str[source]

“Get variables description

abstract set_values_from_serialized_protobuf(serialized_message: bytes)[source]

Set values on this object from the protobuf encoded message.

class libdots.io.io_data.ModelParameters[source]

Bases: IODataInterface

classmethod get_main_topic() str[source]

Get MQTT topic

classmethod get_name() str[source]

Get data name

get_values_as_serialized_protobuf() bytes[source]

Get a protobuf encoded message with all variables.

classmethod get_variable_descr() str[source]

“Get variables description

parameters_dict: ModelParametersDescription
set_values_from_serialized_protobuf(serialized_message: bytes)[source]

Set values on this object from the protobuf encoded message.

class libdots.io.io_data.NewStep[source]

Bases: IODataInterface

Messages that are sent over MQTT for each new timestep.

classmethod get_main_topic() str[source]

Get MQTT topic

classmethod get_name() str[source]

Get data name

get_values_as_serialized_protobuf() bytes[source]

Get a protobuf encoded message with all variables.

classmethod get_variable_descr() str[source]

“Get variables description

parameters_dict: TimeStepDescription
set_values_from_serialized_protobuf(serialized_message: bytes)[source]

Set values on this object from the protobuf encoded message.

mqtt_client

class libdots.io.mqtt_client.MqttClient(host: str, port: int, qos: int, username: str, password: str, service_name: str, input_data_inventory: InputDataInventory, service_calc: ServiceCalc[Any], sim_logger: Logger)[source]

Bases: object

The MQTT Client handling receiving and sending MQTT messages.

property mqtt_client: Client
send_log(message: str)[source]
wait_for_data()[source]

mqtt_log_handler

class libdots.io.mqtt_log_handler.MqttLogHandler(client: MqttClient)[source]

Bases: Handler

emit(record: LogRecord)[source]

Do whatever it takes to actually log the specified logging record.

This version is intended to be implemented by subclasses and so raises a NotImplementedError.