Power Station Simulator

This section contains the Python API documentation for the power station simulation code.

The following modules are documented here:

main()

Main function to run the Power Station Simulator.

This function:

1. Sets up command-line argument parsing
2. Loads configuration based on the provided station prefix
3. Initializes the station simulator with the configuration
4. Runs the simulator in an infinite loop until interrupted
5. Performs graceful shutdown on keyboard interrupt

Command-line arguments

-sp, --station-prefix: Prefix for power station-specific environment variables (e.g., PS_001)

Returns:

| Type | Description |
| --- | --- |
| None | None |

Source code in src/powerstation_simulator/main.py
def main() -> None:
    """
    Main function to run the Power Station Simulator.

    This function:
    1. Sets up command-line argument parsing
    2. Loads configuration based on the provided station prefix
    3. Initializes the station simulator with the configuration
    4. Runs the simulator in an infinite loop until interrupted
    5. Performs graceful shutdown on keyboard interrupt

    Command-line arguments:
        -sp, --station-prefix: Prefix for power station-specific environment variables
                              (e.g., PS_001)

    Returns:
        None
    """
    parser = argparse.ArgumentParser(
        description="⚡ Power Station Simulator",
        epilog="""
IMPORTANT:
- Use the --station-prefix (or -sp) flag to load power station-specific config.
- The required environment variables should be defined in a `.env` file or OS environment.
- When a prefix is given, all relevant keys will be loaded with that prefix applied.

See `src/config.py` for the complete list of supported configuration fields.

Examples:
  # Load config with prefix PS_001 (looks for variables like PS_001_*)
  python main.py --station-prefix PS_001

  # Load config with prefix TEST (looks for variables like TEST_*)
  python main.py -sp TEST
""",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "-sp",
        "--station-prefix",
        type=str,
        help="Prefix for power station-specific environment variables (e.g., PS_001)",
    )

    args = parser.parse_args()

    print(ps_banner)

    app_config: AppConfig = load_power_station_configs(
        station_prefix=args.station_prefix
    )

    simulator: StationSimulator = StationSimulator(app_config=app_config)
    simulator.startup_sequence()
    try:
        while True:
            sleep(1)
    except KeyboardInterrupt:
        simulator.shutdown_sequence()
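
The prefix handling described in the help text can be sketched as follows. This is a minimal illustration, not the project's loader: `load_power_station_configs` is not shown on this page, so the `prefixed_env` helper, the `MQTT_HOST` key, and the sample values are assumptions used only to show how a prefix such as `PS_001` could map to variables like `PS_001_*`.

```python
import argparse
from typing import Mapping, Optional


def build_parser() -> argparse.ArgumentParser:
    """Parser with the same -sp/--station-prefix flag as main()."""
    parser = argparse.ArgumentParser(description="Power Station Simulator")
    parser.add_argument("-sp", "--station-prefix", type=str, default=None)
    return parser


def prefixed_env(key: str, prefix: Optional[str], env: Mapping[str, str]) -> Optional[str]:
    """Hypothetical helper: look up PREFIX_KEY when a prefix is given, else KEY."""
    name = f"{prefix}_{key}" if prefix else key
    return env.get(name)


# Parse an explicit argument list so the sketch runs without a real command line
args = build_parser().parse_args(["-sp", "PS_001"])

# A plain dict stands in for os.environ here
env = {"PS_001_MQTT_HOST": "broker.example", "MQTT_HOST": "localhost"}
host = prefixed_env("MQTT_HOST", args.station_prefix, env)
print(host)
```

In a real run the mapping would typically be `os.environ`, populated from the OS environment or a `.env` file as the help text describes.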

StationSimulator

Simulates a power station that publishes its status, metadata, and output via MQTT.

The simulator can be in one of two states:

- Online but not running (power output is 0)
- Online and running (generating power)

The class handles MQTT communication, publishes information on regular intervals, and responds to control commands to start or stop power generation.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| metadata_topic | str | Topic for publishing station metadata |
| output_topic | str | Topic for publishing power output |
| status_topic | str | Topic for publishing operational status |
| control_topic | str | Topic for receiving control commands |
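
The four topic names are combined with the configured topic prefix and station ID when publishing or subscribing. The sketch below mirrors the f-strings in the class source; `build_topics` and the sample values `plant` and `PS_001` are illustrative and not part of the module. As written in the source, the metadata topic places the topic name before the station ID, while the other three place it after.

```python
def build_topics(prefix: str, station_id: str) -> dict[str, str]:
    """Return the full MQTT topic for each of the four class-level topic names."""
    return {
        # metadata is published as <prefix>/metadata/<station_id> in the source
        "metadata": f"{prefix}/metadata/{station_id}",
        # the remaining topics use <prefix>/<station_id>/<name>
        "output": f"{prefix}/{station_id}/output",
        "status": f"{prefix}/{station_id}/status",
        "control": f"{prefix}/{station_id}/control",
    }


topics = build_topics("plant", "PS_001")
for name, topic in topics.items():
    print(f"{name:>8}: {topic}")
```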

Source code in src/powerstation_simulator/station_simulator.py
class StationSimulator:
    """
    Simulates a power station that publishes its status, metadata, and output via MQTT.

    The simulator can be in one of two states:
    - Online but not running (power output is 0)
    - Online and running (generating power)

    The class handles MQTT communication, publishes information on regular intervals,
    and responds to control commands to start or stop power generation.

    Attributes:
        metadata_topic (str): Topic for publishing station metadata
        output_topic (str): Topic for publishing power output
        status_topic (str): Topic for publishing operational status
        control_topic (str): Topic for receiving control commands
    """

    metadata_topic: str = "metadata"
    output_topic: str = "output"
    status_topic: str = "status"
    control_topic: str = "control"

    def __init__(self, app_config: AppConfig):
        """
        Initialize the station simulator.

        Args:
            app_config (AppConfig): Configuration object containing MQTT settings,
                                    power station details, and publishing intervals
        """
        self.online: bool = False
        self.running: bool = False
        self.app_config: AppConfig = app_config
        self.mqqt_client: MQTTClient = get_mqtt_client(app_config=app_config)

    def startup_sequence(self):
        """
        Start the station simulator and initialize all communication threads.

        Connects to MQTT broker, subscribes to control topic, and starts
        threads for publishing metadata, status, and power output.
        """
        logger.info("Initializing StationSimulator startup-sequence...")
        self.mqqt_client.connect()
        sleep(0.01)
        self.online = True

        # Subscribe to control channel
        self.mqqt_client.subscribe(
            topic=f"{self.app_config.MQTT_TOPIC_PREFIX}/{self.app_config.POWER_STATION_ID}/{self.control_topic}",
            on_message=self.__handle_control,
        )

        # Start status, metadata, and output loops
        self.__status_thread: Thread = Thread(target=self.__publish_status_loop)
        self.__metadata_thread: Thread = Thread(target=self.__publish_metadata_loop)
        self.__output_thread: Thread = Thread(target=self.__publish_output_loop)
        self.__status_thread.start()
        self.__metadata_thread.start()
        self.__output_thread.start()

        logger.info("StationSimulator startup-sequence COMPLETED.")

    def shutdown_sequence(self):
        """
        Safely shut down the station simulator.

        Sets the station to offline state, waits for all publisher threads to complete,
        and disconnects from MQTT broker.
        """
        logger.info("Initializing StationSimulator shutdown-sequence...")
        logger.info("!!!...PLEASE DO NOT REPEATEDLY PRESS 'Ctrl+C' ...!!!")
        self.online = False
        self.running = False
        # hasattr()/getattr() take plain strings, which are not name-mangled,
        # so the mangled attribute names must be spelled out here.
        for name in ("status", "metadata", "output"):
            thread = getattr(self, f"_StationSimulator__{name}_thread", None)
            if thread is not None and thread.is_alive():
                thread.join()

        self.mqqt_client.disconnect()
        logger.info("StationSimulator shutdown-sequence COMPLETED.")

    def simulate_output(self) -> int:
        """
        Generate a simulated power output value.

        Returns:
            int: Simulated power output in kilowatts, a random value between
                 80% and 120% of the configured capacity
        """
        return int(
            self.app_config.CAPACITY_KW * 0.8
            + (self.app_config.CAPACITY_KW * 0.4 * random())
        )

    def publish_metadata(self):
        """
        Publish the power station's metadata to MQTT.

        Publishes location and capacity information to the metadata topic.
        """
        self.mqqt_client.publish(
            topic=f"{self.app_config.MQTT_TOPIC_PREFIX}/{self.metadata_topic}/{self.app_config.POWER_STATION_ID}",
            payload=dict(
                location=self.app_config.LOCATION,
                capacity_kw=self.app_config.CAPACITY_KW,
            ),
        )

    def publish_status(self):
        """
        Publish the power station's current status to MQTT.

        Status will be either "running" (if generating power) or "online" (if not generating).
        """
        self.mqqt_client.publish(
            topic=f"{self.app_config.MQTT_TOPIC_PREFIX}/{self.app_config.POWER_STATION_ID}/{self.status_topic}",
            payload="running" if self.running else "online",
        )

    def publish_output(self):
        """
        Publish the current power output to MQTT.

        If the station is running, publishes a simulated output value; otherwise, publishes 0.
        """
        self.mqqt_client.publish(
            topic=f"{self.app_config.MQTT_TOPIC_PREFIX}/{self.app_config.POWER_STATION_ID}/{self.output_topic}",
            payload=self.simulate_output() if self.running else 0,
        )

    def __publish_metadata_loop(self):
        """
        Background loop that publishes metadata at regular intervals.

        Runs continuously while the station is online.
        """
        while self.online:
            self.publish_metadata()
            sleep(self.app_config.METADATA_PUBLISH_INTERVAL_SECONDS)

    def __publish_status_loop(self):
        """
        Background loop that publishes status at regular intervals.

        Runs continuously while the station is online.
        """
        while self.online:
            self.publish_status()
            sleep(self.app_config.STATUS_PUBLISH_INTERVAL_SECONDS)

    def __publish_output_loop(self):
        """
        Background loop that publishes power output at regular intervals.

        Runs continuously while the station is online.
        """
        while self.online:
            self.publish_output()
            sleep(self.app_config.PUBLISH_INTERVAL_SECONDS)

    def __handle_control(self, client: Any, userdata: Any, message: Any):
        """
        Callback handler for MQTT control messages.

        Parses control commands and changes the station's running state accordingly.

        Args:
            client: MQTT client instance (not used)
            userdata: MQTT user data (not used)
            message: MQTT message containing the control command
        """
        payload = message.payload.decode()
        logger.info(f"Control message received: {payload}")

        normalized_command = str(payload).strip()

        if normalized_command not in ("0", "1"):
            logger.warning(f"Unknown control command (expected '0' or '1'): {payload}")
            return

        self.control(is_start=normalized_command == "1")

    def control(self, is_start: bool):
        """
        Change the running state of the power station.

        Args:
            is_start (bool): True to start power generation, False to stop
        """
        logger.info(f"StationSimulator is {'starting' if is_start else 'stopping'}...")
        self.running = is_start
        logger.info(f"StationSimulator {'started' if is_start else 'stopped'}.")

__handle_control(client, userdata, message)

Callback handler for MQTT control messages.

Parses control commands and changes the station's running state accordingly.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| client | Any | MQTT client instance (not used) | required |
| userdata | Any | MQTT user data (not used) | required |
| message | Any | MQTT message containing the control command | required |

Source code in src/powerstation_simulator/station_simulator.py
def __handle_control(self, client: Any, userdata: Any, message: Any):
    """
    Callback handler for MQTT control messages.

    Parses control commands and changes the station's running state accordingly.

    Args:
        client: MQTT client instance (not used)
        userdata: MQTT user data (not used)
        message: MQTT message containing the control command
    """
    payload = message.payload.decode()
    logger.info(f"Control message received: {payload}")

    normalized_command = str(payload).strip()

    if normalized_command not in ("0", "1"):
        logger.warning(f"Unknown control command (expected '0' or '1'): {payload}")
        return

    self.control(is_start=normalized_command == "1")
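
The accepted commands are the strings `0` and `1`, with surrounding whitespace ignored. A standalone sketch of the same parsing rule follows; `parse_control` is an illustrative name that does not exist in the module.

```python
from typing import Optional


def parse_control(raw: bytes) -> Optional[bool]:
    """Return True for "1", False for "0", and None for anything else."""
    command = raw.decode().strip()
    if command not in ("0", "1"):
        return None
    return command == "1"


print(parse_control(b"1"))      # start command
print(parse_control(b" 0\n"))   # stop command with stray whitespace
print(parse_control(b"start"))  # unknown command
```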

__init__(app_config)

Initialize the station simulator.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| app_config | AppConfig | Configuration object containing MQTT settings, power station details, and publishing intervals | required |

Source code in src/powerstation_simulator/station_simulator.py
def __init__(self, app_config: AppConfig):
    """
    Initialize the station simulator.

    Args:
        app_config (AppConfig): Configuration object containing MQTT settings,
                                power station details, and publishing intervals
    """
    self.online: bool = False
    self.running: bool = False
    self.app_config: AppConfig = app_config
    self.mqqt_client: MQTTClient = get_mqtt_client(app_config=app_config)

__publish_metadata_loop()

Background loop that publishes metadata at regular intervals.

Runs continuously while the station is online.

Source code in src/powerstation_simulator/station_simulator.py
def __publish_metadata_loop(self):
    """
    Background loop that publishes metadata at regular intervals.

    Runs continuously while the station is online.
    """
    while self.online:
        self.publish_metadata()
        sleep(self.app_config.METADATA_PUBLISH_INTERVAL_SECONDS)

__publish_output_loop()

Background loop that publishes power output at regular intervals.

Runs continuously while the station is online.

Source code in src/powerstation_simulator/station_simulator.py
def __publish_output_loop(self):
    """
    Background loop that publishes power output at regular intervals.

    Runs continuously while the station is online.
    """
    while self.online:
        self.publish_output()
        sleep(self.app_config.PUBLISH_INTERVAL_SECONDS)

__publish_status_loop()

Background loop that publishes status at regular intervals.

Runs continuously while the station is online.

Source code in src/powerstation_simulator/station_simulator.py
def __publish_status_loop(self):
    """
    Background loop that publishes status at regular intervals.

    Runs continuously while the station is online.
    """
    while self.online:
        self.publish_status()
        sleep(self.app_config.STATUS_PUBLISH_INTERVAL_SECONDS)
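
Each of the three loops checks `self.online` only after its `sleep` call returns, so a shutdown can take up to one full publish interval per loop. One possible alternative, not used by the source, is to wait on a `threading.Event` so that the loop wakes as soon as a stop is requested. The `Publisher` class and all of its names below are hypothetical.

```python
import threading
import time


class Publisher:
    """Hypothetical loop owner that stops promptly when asked."""

    def __init__(self, interval_seconds: float) -> None:
        self.interval_seconds = interval_seconds
        self.published = 0
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._loop)

    def _loop(self) -> None:
        while not self._stop_event.is_set():
            self.published += 1  # stand-in for publish_status()
            # wait() returns early as soon as stop() sets the event
            self._stop_event.wait(self.interval_seconds)

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop_event.set()
        self._thread.join()

    def is_alive(self) -> bool:
        return self._thread.is_alive()


publisher = Publisher(interval_seconds=30.0)
publisher.start()
time.sleep(0.05)  # give the loop a moment to run once

started = time.monotonic()
publisher.stop()
elapsed = time.monotonic() - started
print(f"stop took {elapsed:.3f}s despite a 30 s interval")
```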

control(is_start)

Change the running state of the power station.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| is_start | bool | True to start power generation, False to stop | required |

Source code in src/powerstation_simulator/station_simulator.py
def control(self, is_start: bool):
    """
    Change the running state of the power station.

    Args:
        is_start (bool): True to start power generation, False to stop
    """
    logger.info(f"StationSimulator is {'starting' if is_start else 'stopping'}...")
    self.running = is_start
    logger.info(f"StationSimulator {'started' if is_start else 'stopped'}.")

publish_metadata()

Publish the power station's metadata to MQTT.

Publishes location and capacity information to the metadata topic.

Source code in src/powerstation_simulator/station_simulator.py
def publish_metadata(self):
    """
    Publish the power station's metadata to MQTT.

    Publishes location and capacity information to the metadata topic.
    """
    self.mqqt_client.publish(
        topic=f"{self.app_config.MQTT_TOPIC_PREFIX}/{self.metadata_topic}/{self.app_config.POWER_STATION_ID}",
        payload=dict(
            location=self.app_config.LOCATION,
            capacity_kw=self.app_config.CAPACITY_KW,
        ),
    )

publish_output()

Publish the current power output to MQTT.

If the station is running, publishes a simulated output value; otherwise, publishes 0.

Source code in src/powerstation_simulator/station_simulator.py
def publish_output(self):
    """
    Publish the current power output to MQTT.

    If the station is running, publishes a simulated output value; otherwise, publishes 0.
    """
    self.mqqt_client.publish(
        topic=f"{self.app_config.MQTT_TOPIC_PREFIX}/{self.app_config.POWER_STATION_ID}/{self.output_topic}",
        payload=self.simulate_output() if self.running else 0,
    )

publish_status()

Publish the power station's current status to MQTT.

Status will be either "running" (if generating power) or "online" (if not generating).

Source code in src/powerstation_simulator/station_simulator.py
def publish_status(self):
    """
    Publish the power station's current status to MQTT.

    Status will be either "running" (if generating power) or "online" (if not generating).
    """
    self.mqqt_client.publish(
        topic=f"{self.app_config.MQTT_TOPIC_PREFIX}/{self.app_config.POWER_STATION_ID}/{self.status_topic}",
        payload="running" if self.running else "online",
    )

shutdown_sequence()

Safely shut down the station simulator.

Sets the station to offline state, waits for all publisher threads to complete, and disconnects from MQTT broker.

Source code in src/powerstation_simulator/station_simulator.py
def shutdown_sequence(self):
    """
    Safely shut down the station simulator.

    Sets the station to offline state, waits for all publisher threads to complete,
    and disconnects from MQTT broker.
    """
    logger.info("Initializing StationSimulator shutdown-sequence...")
    logger.info("!!!...PLEASE DO NOT REPEATEDLY PRESS 'Ctrl+C' ...!!!")
    self.online = False
    self.running = False
    # hasattr()/getattr() take plain strings, which are not name-mangled,
    # so the mangled attribute names must be spelled out here.
    for name in ("status", "metadata", "output"):
        thread = getattr(self, f"_StationSimulator__{name}_thread", None)
        if thread is not None and thread.is_alive():
            thread.join()

    self.mqqt_client.disconnect()
    logger.info("StationSimulator shutdown-sequence COMPLETED.")
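
Attributes such as `__status_thread` are name-mangled by Python when they are accessed inside a class body, but string arguments to `hasattr` and `getattr` are not. The following minimal sketch, using a hypothetical `Worker` class, shows the difference that matters for existence checks on the thread attributes in `shutdown_sequence`.

```python
class Worker:
    def start(self) -> None:
        # Stored on the instance as _Worker__thread because of name mangling
        self.__thread = "thread-object"

    def has_thread_plain(self) -> bool:
        # The string is looked up literally, so the mangled attribute is missed
        return hasattr(self, "__thread")

    def has_thread_mangled(self) -> bool:
        # Spelling out the mangled name finds the attribute
        return hasattr(self, "_Worker__thread")


worker = Worker()
worker.start()
print(worker.has_thread_plain())    # False
print(worker.has_thread_mangled())  # True
```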

simulate_output()

Generate a simulated power output value.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| int | int | Simulated power output in kilowatts, a random value between 80% and 120% of the configured capacity |

Source code in src/powerstation_simulator/station_simulator.py
def simulate_output(self) -> int:
    """
    Generate a simulated power output value.

    Returns:
        int: Simulated power output in kilowatts, a random value between
             80% and 120% of the configured capacity
    """
    return int(
        self.app_config.CAPACITY_KW * 0.8
        + (self.app_config.CAPACITY_KW * 0.4 * random())
    )
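
Because `random()` returns a value in the half-open interval [0, 1), the expression yields a value from 80% of capacity up to, but not including, 120% of capacity, truncated to an integer. A standalone sketch with the capacity passed in as a parameter follows; the free function and the 1000 kW figure are illustrative.

```python
from random import random, seed


def simulate_output(capacity_kw: int) -> int:
    """Same formula as the method, with capacity as an argument."""
    return int(capacity_kw * 0.8 + capacity_kw * 0.4 * random())


seed(42)  # fixed seed so repeated runs give the same samples
samples = [simulate_output(1000) for _ in range(1000)]

# Every sample falls between 800 and 1199 kW for a 1000 kW station
print(min(samples), max(samples))
```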

startup_sequence()

Start the station simulator and initialize all communication threads.

Connects to MQTT broker, subscribes to control topic, and starts threads for publishing metadata, status, and power output.

Source code in src/powerstation_simulator/station_simulator.py
def startup_sequence(self):
    """
    Start the station simulator and initialize all communication threads.

    Connects to MQTT broker, subscribes to control topic, and starts
    threads for publishing metadata, status, and power output.
    """
    logger.info("Initializing StationSimulator startup-sequence...")
    self.mqqt_client.connect()
    sleep(0.01)
    self.online = True

    # Subscribe to control channel
    self.mqqt_client.subscribe(
        topic=f"{self.app_config.MQTT_TOPIC_PREFIX}/{self.app_config.POWER_STATION_ID}/{self.control_topic}",
        on_message=self.__handle_control,
    )

    # Start status, metadata, and output loops
    self.__status_thread: Thread = Thread(target=self.__publish_status_loop)
    self.__metadata_thread: Thread = Thread(target=self.__publish_metadata_loop)
    self.__output_thread: Thread = Thread(target=self.__publish_output_loop)
    self.__status_thread.start()
    self.__metadata_thread.start()
    self.__output_thread.start()

    logger.info("StationSimulator startup-sequence COMPLETED.")
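
`startup_sequence` pauses for 10 ms after `connect()`, while `MQTTClient.publish` refuses to send until the `on_connect` callback has set `connected` to `True`. If the broker takes longer than that to acknowledge the connection, the first publishes would be skipped with a warning. One hedged option is to poll the flag with a timeout. `wait_until` and `FakeClient` below are hypothetical stand-ins, not part of the project.

```python
import threading
import time
from typing import Callable


def wait_until(predicate: Callable[[], bool], timeout: float, poll: float = 0.01) -> bool:
    """Poll predicate until it is true or timeout seconds have passed."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(poll)
    return predicate()


class FakeClient:
    """Stand-in whose connected flag flips after a short delay."""

    def __init__(self) -> None:
        self.connected = False

    def connect(self) -> None:
        # Simulate the broker acknowledging the connection 50 ms later
        threading.Timer(0.05, self._on_connect).start()

    def _on_connect(self) -> None:
        self.connected = True


client = FakeClient()
client.connect()
ready = wait_until(lambda: client.connected, timeout=2.0)
print(ready)
```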

MQTTClient

A client for connecting to an MQTT broker and publishing/subscribing to topics.

This class provides methods to connect to an MQTT broker, publish messages, subscribe to topics, and handle connection events.

Source code in src/powerstation_simulator/mqtt_client.py
class MQTTClient:
    """
    A client for connecting to an MQTT broker and publishing/subscribing to topics.

    This class provides methods to connect to an MQTT broker, publish messages,
    subscribe to topics, and handle connection events.
    """

    def __init__(
        self,
        client_id: str,
        host: str,
        port: int,
        username: str,
        password: str,
        enable_websocket: bool,
    ):
        """
        Initialize an MQTT client.

        Args:
            client_id (str): Unique identifier for this client
            host (str): MQTT broker hostname or IP address
            port (int): MQTT broker port
            username (str): Authentication username
            password (str): Authentication password
            enable_websocket (bool): Use websockets instead of TCP if True
        """
        self.host: str = host
        self.port: int = port
        self.connected: bool = False

        self.client: mqtt_client.Client = mqtt_client.Client(
            # Append a six-character random suffix so each client ID is unique
            client_id=f"{client_id}_" + "".join(choices(ascii_letters + digits, k=6)),
            transport="websockets" if enable_websocket else "tcp",
        )
        if username and password:
            self.client.username_pw_set(username, password)

        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect

    def connect(self):
        """
        Connect to the MQTT broker and start the network loop.
        """
        self.client.connect(self.host, self.port)
        self.client.loop_start()

    def disconnect(self):
        """
        Disconnect from the MQTT broker and stop the network loop.
        """
        self.client.loop_stop()
        self.client.disconnect()

    def on_connect(
        self, client: mqtt_client.Client, userdata: Any, flags: dict, rc: int
    ) -> None:
        """
        Callback for when the client connects to the broker.

        Args:
            client: The client instance
            userdata: User data of any type
            flags: Response flags sent by the broker
            rc (int): Connection result code
        """
        if rc == 0:
            self.connected = True
            logger.info(f"Connected to MQTT broker at {self.host}:{self.port}")
        else:
            logger.error(f"Failed to connect, return code {rc}")

    def on_disconnect(self, client: mqtt_client.Client, userdata: Any, rc: int) -> None:
        """
        Callback for when the client disconnects from the broker.

        Args:
            client: The client instance
            userdata: User data of any type
            rc (int): Disconnection result code
        """
        self.connected = False
        logger.info("Disconnected from MQTT broker")

    def publish(self, topic: str, payload: Any, qos: int = 1) -> bool:
        """
        Publish a message to a topic.

        Args:
            topic (str): The topic to publish to
            payload (Any): The message to publish (dictionaries will be JSON-encoded)
            qos (int, optional): Quality of Service level. Defaults to 1.

        Returns:
            bool: True if the message was published successfully, False otherwise
        """
        if not self.connected:
            logger.warning("MQTT client not connected. Cannot publish.")
            return False

        result = self.client.publish(
            topic=topic,
            payload=dumps(payload) if isinstance(payload, dict) else payload,
            qos=qos,
        )

        if not result.rc:
            logger.debug(f"Published to topic {topic}: {payload}")
            return True
        logger.error(f"Failed to publish to topic {topic}")
        return False

    def subscribe(
        self, topic: str, qos: int = 1, on_message: Any | None = None
    ) -> None:
        """
        Subscribe to a topic.

        Args:
            topic (str): The topic to subscribe to
            qos (int, optional): Quality of Service level. Defaults to 1.
            on_message (callable, optional): Callback for when a message is received.
                If None, a default handler will be used.
        """

        def __on_message(client, userdata, msg):
            logger.info(f"Received message on {msg.topic}: {msg.payload.decode()}")

        self.client.subscribe(topic, qos=qos)
        self.client.on_message = on_message or __on_message

__init__(client_id, host, port, username, password, enable_websocket)

Initialize an MQTT client.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| client_id | str | Unique identifier for this client | required |
| host | str | MQTT broker hostname or IP address | required |
| port | int | MQTT broker port | required |
| username | str | Authentication username | required |
| password | str | Authentication password | required |
| enable_websocket | bool | Use websockets instead of TCP if True | required |

Source code in src/powerstation_simulator/mqtt_client.py
def __init__(
    self,
    client_id: str,
    host: str,
    port: int,
    username: str,
    password: str,
    enable_websocket: bool,
):
    """
    Initialize an MQTT client.

    Args:
        client_id (str): Unique identifier for this client
        host (str): MQTT broker hostname or IP address
        port (int): MQTT broker port
        username (str): Authentication username
        password (str): Authentication password
        enable_websocket (bool): Use websockets instead of TCP if True
    """
    self.host: str = host
    self.port: int = port
    self.connected: bool = False

    self.client: mqtt_client.Client = mqtt_client.Client(
        # Append a six-character random suffix so each client ID is unique
        client_id=f"{client_id}_" + "".join(choices(ascii_letters + digits, k=6)),
        transport="websockets" if enable_websocket else "tcp",
    )
    if username and password:
        self.client.username_pw_set(username, password)

    self.client.on_connect = self.on_connect
    self.client.on_disconnect = self.on_disconnect
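
`str.join` inserts the string it is called on between the items of its argument, so the way the prefix and the random characters are combined determines the shape of the resulting client ID. A small sketch of the two forms, with `client_id` set to an example value:

```python
from random import choices, seed
from string import ascii_letters, digits

seed(0)  # fixed seed only to make the demo repeatable
client_id = "PS_001"
suffix_chars = choices(ascii_letters + digits, k=6)

# Prefix used as the separator: it is repeated between every random character
interleaved = f"{client_id}_".join(suffix_chars)

# Prefix followed by a six-character random suffix
suffixed = f"{client_id}_" + "".join(suffix_chars)

print(interleaved)
print(suffixed)
```

The second form keeps the station ID readable at the start of the client ID, which is what a "unique identifier for this client" would usually call for.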

connect()

Connect to the MQTT broker and start the network loop.

Source code in src/powerstation_simulator/mqtt_client.py
def connect(self):
    """
    Connect to the MQTT broker and start the network loop.
    """
    self.client.connect(self.host, self.port)
    self.client.loop_start()

disconnect()

Disconnect from the MQTT broker and stop the network loop.

Source code in src/powerstation_simulator/mqtt_client.py
def disconnect(self):
    """
    Disconnect from the MQTT broker and stop the network loop.
    """
    self.client.loop_stop()
    self.client.disconnect()

on_connect(client, userdata, flags, rc)

Callback for when the client connects to the broker.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| client | Client | The client instance | required |
| userdata | Any | User data of any type | required |
| flags | dict | Response flags sent by the broker | required |
| rc | int | Connection result code | required |

Source code in src/powerstation_simulator/mqtt_client.py
def on_connect(
    self, client: mqtt_client.Client, userdata: Any, flags: dict, rc: int
) -> None:
    """
    Callback for when the client connects to the broker.

    Args:
        client: The client instance
        userdata: User data of any type
        flags: Response flags sent by the broker
        rc (int): Connection result code
    """
    if rc == 0:
        self.connected = True
        logger.info(f"Connected to MQTT broker at {self.host}:{self.port}")
    else:
        logger.error(f"Failed to connect, return code {rc}")

on_disconnect(client, userdata, rc)

Callback for when the client disconnects from the broker.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| client | Client | The client instance | required |
| userdata | Any | User data of any type | required |
| rc | int | Disconnection result code | required |

Source code in src/powerstation_simulator/mqtt_client.py
def on_disconnect(self, client: mqtt_client.Client, userdata: Any, rc: int) -> None:
    """
    Callback for when the client disconnects from the broker.

    Args:
        client: The client instance
        userdata: User data of any type
        rc (int): Disconnection result code
    """
    self.connected = False
    logger.info("Disconnected from MQTT broker")

publish(topic, payload, qos=1)

Publish a message to a topic.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| topic | str | The topic to publish to | required |
| payload | Any | The message to publish (dictionaries will be JSON-encoded) | required |
| qos | int | Quality of Service level. Defaults to 1. | 1 |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| bool | bool | True if the message was published successfully, False otherwise |

Source code in src/powerstation_simulator/mqtt_client.py
def publish(self, topic: str, payload: Any, qos: int = 1) -> bool:
    """
    Publish a message to a topic.

    Args:
        topic (str): The topic to publish to
        payload (Any): The message to publish (dictionaries will be JSON-encoded)
        qos (int, optional): Quality of Service level. Defaults to 1.

    Returns:
        bool: True if the message was published successfully, False otherwise
    """
    if not self.connected:
        logger.warning("MQTT client not connected. Cannot publish.")
        return False

    result = self.client.publish(
        topic=topic,
        payload=dumps(payload) if isinstance(payload, dict) else payload,
        qos=qos,
    )

    if not result.rc:
        logger.debug(f"Published to topic {topic}: {payload}")
        return True
    logger.error(f"Failed to publish to topic {topic}")
    return False
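
`publish` JSON-encodes dictionaries and passes every other payload through unchanged. A minimal sketch of that rule in isolation follows; `encode_payload` is an illustrative name and the sample values are made up.

```python
from json import dumps, loads
from typing import Any


def encode_payload(payload: Any) -> Any:
    """JSON-encode dicts; leave strings, numbers, and bytes untouched."""
    return dumps(payload) if isinstance(payload, dict) else payload


# One payload of each kind the simulator sends
metadata = encode_payload({"location": "Example City", "capacity_kw": 1000})
status = encode_payload("running")
output = encode_payload(0)

print(metadata)
print(status)
print(output)
```

These three cases correspond to the metadata dictionary, the status string, and the integer power output published by `StationSimulator`.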

subscribe(topic, qos=1, on_message=None)

Subscribe to a topic.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| topic | str | The topic to subscribe to | required |
| qos | int | Quality of Service level. Defaults to 1. | 1 |
| on_message | callable | Callback for when a message is received. If None, a default handler will be used. | None |

Source code in src/powerstation_simulator/mqtt_client.py
def subscribe(
    self, topic: str, qos: int = 1, on_message: Any | None = None
) -> None:
    """
    Subscribe to a topic.

    Args:
        topic (str): The topic to subscribe to
        qos (int, optional): Quality of Service level. Defaults to 1.
        on_message (callable, optional): Callback for when a message is received.
            If None, a default handler will be used.
    """

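    def _default_on_message(client, userdata, msg):
        # Fallback handler: log the topic and the decoded payload.
        logger.info(f"Received message on {msg.topic}: {msg.payload.decode()}")

    self.client.subscribe(topic, qos=qos)
    # on_message is a single client-wide callback, so this replaces any handler
    # installed by an earlier subscribe() call.
    self.client.on_message = on_message or _default_on_message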
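
A custom callback takes the same `(client, userdata, msg)` arguments as the default handler. The sketch below shows one that parses a JSON payload, together with the fallback rule used by `subscribe()`. The names `on_reading`, `default_handler`, and `pick_handler`, the topic, and the `SimpleNamespace` message object are all hypothetical, chosen so the example runs without a broker.

```python
import json
from types import SimpleNamespace

received: list[tuple[str, dict]] = []


def on_reading(client, userdata, msg) -> None:
    """Custom callback: collect the topic and the parsed JSON body."""
    # msg.payload arrives as bytes, so decode it before parsing.
    received.append((msg.topic, json.loads(msg.payload.decode())))


def default_handler(client, userdata, msg) -> None:
    """Stand-in for the built-in fallback, which only reports the message."""
    print(f"Received message on {msg.topic}: {msg.payload.decode()}")


def pick_handler(on_message=None):
    """Same fallback rule as subscribe(): use the caller's callback if given."""
    return on_message or default_handler


# Hypothetical message exposing only the attributes the handlers read.
fake_msg = SimpleNamespace(
    topic="smartgrid/powerstation/PS_001/command",
    payload=b'{"action": "stop"}',
)
pick_handler(on_reading)(None, None, fake_msg)
```

With a real client the callback would be passed as `subscribe(topic, on_message=on_reading)`. Since `subscribe()` assigns the client's single `on_message` attribute, a later call with a different callback replaces the earlier one for all topics.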

get_mqtt_client(app_config)

Create and return an MQTT client using configuration from the application settings.

This function initializes an MQTTClient instance with connection parameters extracted from the provided AppConfig object.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| app_config | AppConfig | Application configuration containing MQTT connection settings | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| MQTTClient | MQTTClient | A configured MQTT client instance ready for connection |

Source code in src/powerstation_simulator/mqtt_client.py
def get_mqtt_client(app_config: AppConfig) -> MQTTClient:
    """
    Create and return an MQTT client using configuration from the application settings.

    This function initializes an MQTTClient instance with connection parameters
    extracted from the provided AppConfig object.

    Args:
        app_config (AppConfig): Application configuration containing MQTT connection settings

    Returns:
        MQTTClient: A configured MQTT client instance ready for connection
    """
    return MQTTClient(
        client_id=app_config.POWER_STATION_ID,
        host=app_config.MQTT_HOST,
        port=app_config.MQTT_PORT,
        username=app_config.MQTT_USERNAME,
        password=app_config.MQTT_PASSWORD,
        enable_websocket=app_config.ENABLE_WEBSOCKET,
    )

AppConfig

Bases: BaseSettings

AppConfig is the main configuration class for the power station application.

This class defines all the necessary configuration parameters for the application, including station metadata, MQTT broker settings, and simulator configurations. It uses Pydantic's BaseSettings for environment variable loading and validation.

Environment variables are loaded from a .env.sample file by default.

Source code in src/powerstation_simulator/config.py
class AppConfig(BaseSettings):
    """
    AppConfig is the main configuration class for the power station application.

    This class defines all the necessary configuration parameters for the application,
    including station metadata, MQTT broker settings, and simulator configurations.
    It uses Pydantic's BaseSettings for environment variable loading and validation.

    Environment variables are loaded from a .env.sample file by default.
    """

    model_config = SettingsConfigDict(
        env_file=".env.sample",
        env_file_encoding="utf-8",
        validate_by_name=False,
        case_sensitive=True,
        extra="ignore",
    )

    # Metadata (static info)
    POWER_STATION_ID: Annotated[str, Field()] = "PS_001"
    LOCATION: Annotated[str, Field()] = "Dhaka, Bangladesh"
    CAPACITY_KW: Annotated[int, Field()] = 1000

    # MQTT broker config
    MQTT_HOST: Annotated[str, Field()] = "127.0.0.1"
    MQTT_PORT: Annotated[int, Field()] = 1883
    MQTT_USERNAME: Annotated[str, Field()] = "extinctcoder"
    MQTT_PASSWORD: Annotated[str, Field()] = "Mosquitto123456#"
    MQTT_TOPIC_PREFIX: Annotated[str, Field()] = "smartgrid/powerstation"

    ENABLE_WEBSOCKET: Annotated[bool, Field()] = False

    # Simulator settings
    PUBLISH_INTERVAL_SECONDS: Annotated[int, Field()] = 1

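    # Evaluated once at class definition from the default above (1), so these
    # default to 2 and 5 and do not follow a PUBLISH_INTERVAL_SECONDS override.
    STATUS_PUBLISH_INTERVAL_SECONDS: int = PUBLISH_INTERVAL_SECONDS * 2
    METADATA_PUBLISH_INTERVAL_SECONDS: int = PUBLISH_INTERVAL_SECONDS * 5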
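
One consequence of the last two fields is easy to miss: Python evaluates `PUBLISH_INTERVAL_SECONDS * 2` while the class body runs, using the literal default of 1. The following plain-Python sketch (deliberately not using Pydantic, with a hypothetical `IntervalDefaults` class) illustrates that the derived values stay at 2 and 5 even when the base interval is overridden later.

```python
class IntervalDefaults:
    """Plain-Python stand-in that mimics how the class body is evaluated."""

    PUBLISH_INTERVAL_SECONDS: int = 1
    # Evaluated once, while the class body runs, using the default above.
    STATUS_PUBLISH_INTERVAL_SECONDS: int = PUBLISH_INTERVAL_SECONDS * 2
    METADATA_PUBLISH_INTERVAL_SECONDS: int = PUBLISH_INTERVAL_SECONDS * 5


settings = IntervalDefaults()
# Simulate an override such as one coming from an environment variable.
settings.PUBLISH_INTERVAL_SECONDS = 10

status_interval = settings.STATUS_PUBLISH_INTERVAL_SECONDS
metadata_interval = settings.METADATA_PUBLISH_INTERVAL_SECONDS
```

If the status and metadata intervals are meant to scale with the base interval, they would need to be set explicitly alongside it or computed after loading.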

load_power_station_configs(station_prefix=None)

Loads the configuration for a specific power station prefix, e.g., 'PS_001'. If no prefix is passed explicitly, the STATION_PREFIX environment variable is used; if that is also unset, an unprefixed AppConfig is returned.

Source code in src/powerstation_simulator/config.py
def load_power_station_configs(station_prefix: str | None = None) -> AppConfig:
    """
    Dynamically loads the config for a specific power station prefix, e.g., 'PS_001'.
    If no prefix is provided explicitly, reads from the STATION_PREFIX env var.
    """
    station_prefix = station_prefix or getenv("STATION_PREFIX")

    logger.info(f"Simple Power Station SIMULATOR serving station : {station_prefix}")

    if not station_prefix:
        return AppConfig()  # fallback to default or global settings

    class AppConfigWithPrefix(AppConfig):
        model_config = {
            **AppConfig.model_config,
            "env_prefix": station_prefix.upper().replace("-", "_") + "_",
        }

    return AppConfigWithPrefix()
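
The prefix normalization is the part most likely to surprise users, so here is a small sketch of it in isolation. `env_prefix_for` repeats the expression used for `env_prefix` above; `env_var_name` is a hypothetical helper that shows which variable a prefixed field would be read from.

```python
def env_prefix_for(station_prefix: str) -> str:
    """Normalize a station prefix the same way env_prefix is built above."""
    return station_prefix.upper().replace("-", "_") + "_"


def env_var_name(station_prefix: str, field_name: str) -> str:
    """Hypothetical helper: the variable name a prefixed field is read from."""
    return env_prefix_for(station_prefix) + field_name


prefix = env_prefix_for("ps-001")
host_var = env_var_name("ps-001", "MQTT_HOST")
```

So passing `ps-001`, `PS-001`, or `PS_001` all lead to the same set of variables, such as `PS_001_MQTT_HOST`.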

ColoredFormatter

Bases: Formatter

A custom logging formatter that adds color to log level names in terminal output.

This formatter wraps the log level name with ANSI color codes based on the severity level. Colors are defined in the COLORS dictionary.

Methods:

| Name | Description |
| --- | --- |
| format | Overrides the base Formatter's format method to add colors. |

Source code in src/powerstation_simulator/logger.py
class ColoredFormatter(logging.Formatter):
    """
    A custom logging formatter that adds color to log level names in terminal output.

    This formatter wraps the log level name with ANSI color codes based on the
    severity level. Colors are defined in the COLORS dictionary.

    Methods:
        format: Overrides the base Formatter's format method to add colors.
    """

    def format(self, record: LogRecord) -> str:
        """
        Format the specified record with colored level names.

        Args:
            record: A LogRecord object containing all the information
                   needed to generate a log message.

        Returns:
            str: The formatted log message with colored level name.
        """
        color = COLORS.get(record.levelname, COLORS["RESET"])
        record.levelname = f"{color}{record.levelname}{COLORS['RESET']}"
        return super().format(record)

format(record)

Format the specified record with colored level names.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| record | LogRecord | A LogRecord object containing all the information needed to generate a log message. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| str | str | The formatted log message with colored level name. |

Source code in src/powerstation_simulator/logger.py
def format(self, record: LogRecord) -> str:
    """
    Format the specified record with colored level names.

    Args:
        record: A LogRecord object containing all the information
               needed to generate a log message.

    Returns:
        str: The formatted log message with colored level name.
    """
    color = COLORS.get(record.levelname, COLORS["RESET"])
    record.levelname = f"{color}{record.levelname}{COLORS['RESET']}"
    return super().format(record)
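
Because `format()` assigns to `record.levelname`, and the same LogRecord is passed to every handler that processes it, the color codes can carry over into the output of other handlers. A minimal sketch of a variant that restores the original level name follows. `COLORS` here is an assumed table (the real dictionary is not shown in this section) and `RestoringColoredFormatter` is a hypothetical name.

```python
import logging

# Assumed color table: the real COLORS dictionary is not shown in this section.
COLORS = {
    "DEBUG": "\033[36m",
    "INFO": "\033[32m",
    "WARNING": "\033[33m",
    "ERROR": "\033[31m",
    "CRITICAL": "\033[35m",
    "RESET": "\033[0m",
}


class RestoringColoredFormatter(logging.Formatter):
    """Variant that colors the level name but restores the record afterwards."""

    def format(self, record: logging.LogRecord) -> str:
        original = record.levelname
        color = COLORS.get(original, COLORS["RESET"])
        record.levelname = f"{color}{original}{COLORS['RESET']}"
        try:
            return super().format(record)
        finally:
            # Undo the mutation so other handlers see the plain level name.
            record.levelname = original


record = logging.LogRecord(
    name="demo", level=logging.WARNING, pathname="demo.py", lineno=1,
    msg="grid frequency low", args=None, exc_info=None,
)
formatter = RestoringColoredFormatter("%(levelname)s, %(message)s")
line = formatter.format(record)
```

This matters mainly when a second handler, for example one writing to a file, is attached next to the colored console handler.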

getLogger(name)

Get a logger with the specified name, ensuring the base logger is configured.

This function calls setup_base_logger() to ensure that logging is properly configured with colored output, then returns a logger with the given name.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | A string that identifies the logger. | required |

Returns:

| Type | Description |
| --- | --- |
| Logger | A configured logger instance with the specified name. |

Source code in src/powerstation_simulator/logger.py
def getLogger(name: str) -> Logger:
    """
    Get a logger with the specified name, ensuring the base logger is configured.

    This function calls setup_base_logger() to ensure that logging is properly
    configured with colored output, then returns a logger with the given name.

    Args:
        name: A string that identifies the logger.

    Returns:
        logging.Logger: A configured logger instance with the specified name.
    """
    setup_base_logger()
    return logging.getLogger(name)

setup_base_logger(level=logging.DEBUG)

Configure and return the root logger with colored output.

This function sets up the root logger with a StreamHandler that outputs to stdout and formats messages using the ColoredFormatter. If the root logger already has handlers configured, the existing configuration (including its level) is left unchanged and the root logger is returned as-is.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| level | int \| str | The logging level to set for the root logger. Default is logging.DEBUG. | DEBUG |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| Logger | Logger | The configured root logger instance. |

Source code in src/powerstation_simulator/logger.py
def setup_base_logger(level: int | str = logging.DEBUG) -> Logger:
    """
    Configure and return the root logger with colored output.

    This function sets up the root logger with a StreamHandler that outputs
    to stdout and formats messages using the ColoredFormatter. If the root
    logger already has handlers configured, the existing configuration
    (including its level) is left unchanged and the logger is returned as-is.

    Args:
        level:
            The logging level to set for the root logger.
            Default is logging.DEBUG.

    Returns:
        Logger: The configured root logger instance.
    """
    root_logger = logging.getLogger()
    if not root_logger.hasHandlers():
        root_logger.setLevel(level)
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(
            ColoredFormatter("%(asctime)s %(filename)s: %(levelname)s, %(message)s")
        )
        root_logger.addHandler(handler)
        root_logger.propagate = False
    return root_logger
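
The "configure only once" behavior can be illustrated with a small stand-alone sketch. To keep the demo independent of whatever is already attached to the root logger, it uses a named logger and checks `logger.handlers` directly, whereas the function above calls `hasHandlers()` on the root logger. `setup_demo_logger` and the logger name are hypothetical, and a plain `logging.Formatter` stands in for ColoredFormatter.

```python
import logging
import sys


def setup_demo_logger(
    name: str = "powerstation.demo", level: int = logging.DEBUG
) -> logging.Logger:
    """Attach one stdout handler to a named logger, only on the first call."""
    logger = logging.getLogger(name)
    # Only configure when no handler is attached yet, so repeated calls
    # never add duplicate handlers or change the level.
    if not logger.handlers:
        logger.setLevel(level)
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(filename)s: %(levelname)s, %(message)s")
        )
        logger.addHandler(handler)
        logger.propagate = False
    return logger


first = setup_demo_logger()
second = setup_demo_logger(level=logging.ERROR)  # ignored: already configured
handler_count = len(second.handlers)
```

The same pattern explains why a `level` passed on a later call has no effect: only the first call that finds no handlers applies it.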