Pausing and resuming a simulation in mosaik

In some scenarios, you may want to pause and resume a running simulation. For instance, this can be useful when you want to inspect intermediate results without terminating the simulation, or when you use graphical display tools (e.g. WebVis). mosaik provides a simple mechanism to pause and resume a simulation using an Event.

This guide will walk you through setting up a simulation that can be controlled via keyboard input. Pressing p will pause the simulation, and pressing r will resume it.

Step 1: Setting up the simulation

To begin, we need to define a function to start our simulation. We create an AsyncWorld, which represents our simulation, and define the simulators we want to use. We also use an Event called world.running, which we will later use to control pausing and resuming.

# Simulation set up

import asyncio
import queue
import threading
from functools import partial

from loguru import logger
from pynput import keyboard

import mosaik
import mosaik.basic_simulators
import mosaik.util
from mosaik.scenario import SimConfig

END = 10000


async def start_mosaik(
    pause_queue: queue.Queue[tuple[asyncio.Event, asyncio.AbstractEventLoop]],
    END: int,
):
    """Create the mosaik world and publish its pause controls.

    Args:
        pause_queue: Queue that receives the tuple
            ``(world.running, loop)`` as soon as the world exists.
        END: Passed as ``until`` to ``world.run``.
    """
    # One entry per simulator name; the "python" value names the module
    # and class that implement the simulator.
    SIM_CONFIG: SimConfig = {
        "OutputSim": {
            "python": "mosaik.basic_simulators:OutputSimulator",
        },
        "InputSim": {
            "python": "tests.simulators.generic_test_simulator:TestSim",
        },
    }

    # The event loop executing this coroutine; it is sent along with the
    # event through the queue.
    loop = asyncio.get_running_loop()
    async with mosaik.AsyncWorld(SIM_CONFIG) as world:
        # Hand the event and the loop to whoever reads from the queue.
        pause_queue.put((world.running, loop))

Step 2: Creating and connecting simulators

Next, we create instances of the simulators. In this example:

  • OutputSim receives the values sent to it and stores them.

  • InputSim provides input data.

We also connect the simulators so that data flows between them.

        # Simulator set up
        # Start the "OutputSim" simulator and create two entities of its
        # `Dict` model.
        output_dict = await world.start("OutputSim")
        output_model = await output_dict.Dict.create(2)

        # Start the "InputSim" simulator with a step size of 1 and create
        # one entity of its model `A`.
        input = await world.start("InputSim", step_size=1)
        input_model_const = await input.A.create(1)

        # Connect `val_out` of the input entity to `value` of the first
        # output entity.
        world.connect(
            input_model_const[0],
            output_model[0],
            ("val_out", "value"),
        )

        # Also connect `val_out_2` between the same two entities
        # (presumably under the same attribute name on both sides).
        world.connect(input_model_const[0], output_model[0], "val_out_2")

        # Connect `val_out` of the input entity to `value` of the second
        # output entity as well.
        world.connect(
            input_model_const[0],
            output_model[1],
            ("val_out", "value"),
        )

        # Run the simulation; `END` is passed as the `until` argument.
        await world.run(until=END)

Step 3: Handling keyboard input

To allow users to pause and resume the simulation interactively, we use the pynput library to listen for key presses.

  • Pressing p pauses the simulation by clearing the event.

  • Pressing r resumes it by setting the event.

# Keyboard input
def on_press(key, event, loop):
    """Pause or resume the simulation in response to a key press.

    Args:
        key: The key object handed over by the keyboard listener.
        event: The event controlling the simulation. It is cleared when
            "p" is pressed and set when "r" is pressed.
        loop: The asyncio event loop on which the change to ``event`` is
            scheduled via ``call_soon_threadsafe``.
    """
    # Special keys like Shift or Ctrl have no `char` attribute; they are
    # ignored like any other key we do not care about. Only this lookup
    # is guarded, so unrelated errors are no longer silently swallowed.
    char = getattr(key, "char", None)
    if char not in ("p", "r"):
        return
    if not (event and loop.is_running()):
        return

    if char == "p":
        loop.call_soon_threadsafe(event.clear)  # Pause
        logger.info("mosaik simulation is paused.")
    else:
        loop.call_soon_threadsafe(event.set)  # Resume
        logger.info("mosaik simulation is resumed.")


Step 4: Running the simulation in a separate thread

Since we want user input to arrive on the main thread, where it is processed by the listener, we need to start mosaik in a separate thread. The main thread obtains the asyncio event loop and the event itself from the mosaik thread through a queue that is passed to mosaik at startup.

# Start keyboard listener and mosaik in different threads
def main():
    """Run mosaik in a background thread and control it from the keyboard.

    The mosaik thread reports its event and its event loop through a
    queue; both are then bound to the key-press handler.
    """
    handoff: queue.Queue[tuple[asyncio.Event, asyncio.AbstractEventLoop]] = (
        queue.Queue()
    )

    # The coroutine object is created here, but it is executed by
    # `asyncio.run` inside the new thread.
    simulation = start_mosaik(handoff, END)
    sim_thread = threading.Thread(
        target=asyncio.run,
        args=(simulation,),
        daemon=True,
    )
    sim_thread.start()

    # Blocks until the mosaik thread has put its event and loop on the queue.
    pause_event, sim_loop = handoff.get()

    handler = partial(on_press, event=pause_event, loop=sim_loop)
    listener = keyboard.Listener(on_press=lambda key: handler(key))
    listener.start()

    # Wait for the simulation thread to finish, then stop the listener.
    sim_thread.join()
    listener.stop()


if __name__ == "__main__":
    main()

Full code

# Simulation set up

import asyncio
import queue
import threading
from functools import partial

from loguru import logger
from pynput import keyboard

import mosaik
import mosaik.basic_simulators
import mosaik.util
from mosaik.scenario import SimConfig

END = 10000


async def start_mosaik(
    pause_queue: queue.Queue[tuple[asyncio.Event, asyncio.AbstractEventLoop]],
    END: int,
):
    """Build the example scenario and run it with ``until=END``.

    Before any simulator is started, the world's ``running`` event and the
    current event loop are put on ``pause_queue`` as a tuple.
    """
    sim_config: SimConfig = {
        "OutputSim": {"python": "mosaik.basic_simulators:OutputSimulator"},
        "InputSim": {"python": "tests.simulators.generic_test_simulator:TestSim"},
    }

    current_loop = asyncio.get_running_loop()
    async with mosaik.AsyncWorld(sim_config) as world:
        pause_queue.put((world.running, current_loop))
        # End: Simulation set up

        # Simulator set up
        # Two entities of the `Dict` model on the output side ...
        output_sim = await world.start("OutputSim")
        sinks = await output_sim.Dict.create(2)

        # ... and a single entity of model `A` on the input side.
        input_sim = await world.start("InputSim", step_size=1)
        sources = await input_sim.A.create(1)

        # `val_out` goes to `value` of both output entities; `val_out_2`
        # is additionally connected to the first one.
        world.connect(sources[0], sinks[0], ("val_out", "value"))
        world.connect(sources[0], sinks[0], "val_out_2")
        world.connect(sources[0], sinks[1], ("val_out", "value"))

        await world.run(until=END)
        # End: Simulator set up


# Keyboard input
def on_press(key, event, loop):
    """Pause or resume the simulation in response to a key press.

    Args:
        key: The key object handed over by the keyboard listener.
        event: The event controlling the simulation. It is cleared when
            "p" is pressed and set when "r" is pressed.
        loop: The asyncio event loop on which the change to ``event`` is
            scheduled via ``call_soon_threadsafe``.
    """
    # Special keys like Shift or Ctrl have no `char` attribute; they are
    # ignored like any other key we do not care about. Only this lookup
    # is guarded, so unrelated errors are no longer silently swallowed.
    char = getattr(key, "char", None)
    if char not in ("p", "r"):
        return
    if not (event and loop.is_running()):
        return

    if char == "p":
        loop.call_soon_threadsafe(event.clear)  # Pause
        logger.info("mosaik simulation is paused.")
    else:
        loop.call_soon_threadsafe(event.set)  # Resume
        logger.info("mosaik simulation is resumed.")


# End: Keyboard input


# Start keyboard listener and mosaik in different threads
def main():
    """Run mosaik in a background thread and control it from the keyboard.

    The mosaik thread reports its event and its event loop through a
    queue; both are then bound to the key-press handler.
    """
    handoff: queue.Queue[tuple[asyncio.Event, asyncio.AbstractEventLoop]] = (
        queue.Queue()
    )

    # The coroutine object is created here, but it is executed by
    # `asyncio.run` inside the new thread.
    simulation = start_mosaik(handoff, END)
    sim_thread = threading.Thread(
        target=asyncio.run,
        args=(simulation,),
        daemon=True,
    )
    sim_thread.start()

    # Blocks until the mosaik thread has put its event and loop on the queue.
    pause_event, sim_loop = handoff.get()

    handler = partial(on_press, event=pause_event, loop=sim_loop)
    listener = keyboard.Listener(on_press=lambda key: handler(key))
    listener.start()

    # Wait for the simulation thread to finish, then stop the listener.
    sim_thread.join()
    listener.stop()


if __name__ == "__main__":
    main()
# End: Start keyboard listener and mosaik in different threads

Addendum: Pausing automatically at a certain simulation step

Instead of hitting the p key to pause, we could also set a certain step number the simulation should stop at. This could be used for debugging purposes or analysis of a certain intermediate state of the simulation.

To accomplish this, we can get the ProgressProxy object from one of the participating simulators

        # get progress
        # Progress object obtained from the OutputSim simulator; `pause_sim`
        # awaits `has_reached` on it.
        progress = output_dict.get_progress()

and write a small coroutine that clears the asyncio event, pausing the simulation.

# pause_sim_method
async def pause_sim(pause_step: int, world: AsyncWorld, progress: ProgressProxy):
    """Pause the simulation once ``pause_step`` has been reached.

    Awaits ``progress.has_reached(pause_step)``, logs a message and then
    clears ``world.running``.
    """
    await progress.has_reached(pause_step)

    message = (
        f"The simulation has reached the pre-defined pause step {pause_step} "
        "and is now paused."
    )
    logger.info(message)
    world.running.clear()


We can then set the pause_step variable to a valid step within our maximum simulation step range and it will pause when reaching that step. If we use the keyboard listener from step 3, we can simply resume by pressing r.

The full code for this example looks as follows.

import asyncio
import queue
import threading
from functools import partial

from loguru import logger
from pynput import keyboard

import mosaik
import mosaik.basic_simulators
import mosaik.util
from mosaik.async_scenario import AsyncModelFactory, AsyncWorld
from mosaik.progress import ProgressProxy
from mosaik.scenario import SimConfig


async def start_mosaik(
    pause_queue: queue.Queue[tuple[asyncio.Event, asyncio.AbstractEventLoop]],
):
    """Build the example scenario and run it alongside ``pause_sim``.

    The world's ``running`` event and the current event loop are put on
    ``pause_queue`` as a tuple before any simulator is started.
    """
    sim_config: SimConfig = {
        "OutputSim": {"python": "mosaik.basic_simulators:OutputSimulator"},
        "InputSim": {"python": "tests.simulators.generic_test_simulator:TestSim"},
    }

    sim_end = 1000  # 1000 seconds
    pause_step = 15
    current_loop = asyncio.get_running_loop()
    async with mosaik.AsyncWorld(sim_config, cache=False) as world:
        pause_queue.put((world.running, current_loop))

        output_sim: AsyncModelFactory = await world.start("OutputSim")
        sinks = await output_sim.Dict.create(2)
        # get progress
        progress = output_sim.get_progress()
        # End: get progress

        input_sim = await world.start("InputSim", step_size=1)
        sources = await input_sim.A.create(1)

        # `val_out` goes to `value` of both output entities; `val_out_2`
        # is additionally connected to the first one.
        world.connect(sources[0], sinks[0], ("val_out", "value"))
        world.connect(sources[0], sinks[0], "val_out_2")
        world.connect(sources[0], sinks[1], ("val_out", "value"))

        # Run the simulation and the pause watcher concurrently.
        simulation = world.run(until=sim_end)
        pauser = pause_sim(pause_step, world, progress)
        await asyncio.gather(simulation, pauser)


# pause_sim_method
async def pause_sim(pause_step: int, world: AsyncWorld, progress: ProgressProxy):
    """Pause the simulation once ``pause_step`` has been reached.

    Awaits ``progress.has_reached(pause_step)``, logs a message and then
    clears ``world.running``.
    """
    await progress.has_reached(pause_step)

    message = (
        f"The simulation has reached the pre-defined pause step {pause_step} "
        "and is now paused."
    )
    logger.info(message)
    world.running.clear()


# End: pause_sim_method


def on_press(key, event, loop):
    """Pause or resume the simulation in response to a key press.

    Args:
        key: The key object handed over by the keyboard listener.
        event: The event controlling the simulation. It is cleared when
            "p" is pressed and set when "r" is pressed.
        loop: The asyncio event loop on which the change to ``event`` is
            scheduled via ``call_soon_threadsafe``.
    """
    # Special keys like Shift or Ctrl have no `char` attribute; they are
    # ignored like any other key we do not care about. Only this lookup
    # is guarded, so unrelated errors are no longer silently swallowed.
    char = getattr(key, "char", None)
    if char not in ("p", "r"):
        return
    if not (event and loop.is_running()):
        return

    if char == "p":
        loop.call_soon_threadsafe(event.clear)  # Pause
        logger.info("mosaik simulation is paused.")
    else:
        loop.call_soon_threadsafe(event.set)  # Resume
        logger.info("mosaik simulation is resumed.")


def main():
    """Run mosaik in its own thread and control it from the keyboard.

    The mosaik thread puts its event and its event loop on a queue; both
    are then bound to the key-press handler. The function returns after
    the mosaik thread has finished and the listener has been stopped.
    """
    comm_queue: queue.Queue[tuple[asyncio.Event, asyncio.AbstractEventLoop]] = (
        queue.Queue()
    )

    mosaik_thread = threading.Thread(
        target=asyncio.run,
        args=(start_mosaik(comm_queue),),
        daemon=False,
    )
    mosaik_thread.start()

    # Blocks until the mosaik thread has put its event and loop on the queue.
    (event, loop) = comm_queue.get()

    on_press_with_args = partial(on_press, event=event, loop=loop)

    # Give the listener a callable that takes exactly one argument (the
    # key), so nothing but the key can reach the handler positionally.
    listener = keyboard.Listener(on_press=lambda key: on_press_with_args(key))
    listener.start()
    mosaik_thread.join()
    listener.stop()

    print("Stopping listener and mosaik thread...")


# Only start the simulation when executed as a script, not on import.
if __name__ == "__main__":
    main()