Pausing and resuming a simulation in mosaik
In some scenarios, you may want to pause and resume a running simulation.
For instance, this feature can be useful when wanting to inspect intermediate results without terminating the simulation or when using graphical display tools (e.g. WebVis).
mosaik provides a simple mechanism to pause and resume a simulation using an Event
.
This guide will walk you through setting up a simulation that can be controlled via keyboard input.
Pressing p
will pause the simulation, and pressing r
will resume it.
Step 1: Setting up the simulation
To begin, we need to define a function to start our simulation.
We create a AsyncWorld
, which represents our simulation, and define the simulators we want to use.
We also use an Event
called world.running
, which we will later use to control pausing and resuming.
# Simulation set up
import asyncio
import queue
import threading
from functools import partial
from loguru import logger
from pynput import keyboard
import mosaik
import mosaik.basic_simulators
import mosaik.util
from mosaik.scenario import SimConfig
END = 10000
async def start_mosaik(
pause_queue: queue.Queue[tuple[asyncio.Event, asyncio.AbstractEventLoop]],
END: int,
):
SIM_CONFIG: SimConfig = {
"OutputSim": {
"python": "mosaik.basic_simulators:OutputSimulator",
},
"InputSim": {
"python": "tests.simulators.generic_test_simulator:TestSim",
},
}
loop = asyncio.get_running_loop()
async with mosaik.AsyncWorld(SIM_CONFIG) as world:
pause_queue.put((world.running, loop))
Step 2: Creating and connecting simulators
Next, we create instances of the simulators. In this example:
OutputSim generates output values.
InputSim provides input data.
We also connect the simulators so that data flows between them.
# Simulator set up
output_dict = await world.start("OutputSim")
output_model = await output_dict.Dict.create(2)
input = await world.start("InputSim", step_size=1)
input_model_const = await input.A.create(1)
world.connect(
input_model_const[0],
output_model[0],
("val_out", "value"),
)
world.connect(input_model_const[0], output_model[0], "val_out_2")
world.connect(
input_model_const[0],
output_model[1],
("val_out", "value"),
)
await world.run(until=END)
Step 3: Handling keyboard input
To allow users to pause and resume the simulation interactively, we use the pynput library to listen for key presses.
Pressing
p
pauses the simulation by clearing the event.Pressing
r
resumes it by setting the event.
# Keyboard input
def on_press(key, event, loop):
try:
if key.char == "p":
if event and loop.is_running():
loop.call_soon_threadsafe(lambda: event.clear()) # Pause
logger.info("mosaik simulation is paused.")
elif key.char == "r":
if event and loop.is_running():
loop.call_soon_threadsafe(lambda: event.set()) # Resume
logger.info("mosaik simulation is resumed.")
except AttributeError:
# This handles cases where `key` does not have a `char`
# attribute, which happens for special keys like Shift or Ctrl.
# Instead of failing, the function just ignores these cases.
pass
Step 4: Running the simulation in a separate thread
Since we want user inputs to arrive on the main thread to be computed by the listener, we need to start mosaik in a separate thread. The asyncio event loop and the event itself are acquired from the mosaik thread by using a queue that was previously given to it during startup.
# Start keyboard listener and mosaik in different threads
def main():
comm_queue: queue.Queue[tuple[asyncio.Event, asyncio.AbstractEventLoop]] = (
queue.Queue()
)
mosaik_thread = threading.Thread(
target=asyncio.run,
args=(start_mosaik(comm_queue, END),),
daemon=True,
)
mosaik_thread.start()
world_running, loop = comm_queue.get()
on_press_with_args = partial(on_press, event=world_running, loop=loop)
listener = keyboard.Listener(on_press=lambda key: on_press_with_args(key))
listener.start()
mosaik_thread.join()
listener.stop()
if __name__ == "__main__":
main()
Full code
# Simulation set up
import asyncio
import queue
import threading
from functools import partial
from loguru import logger
from pynput import keyboard
import mosaik
import mosaik.basic_simulators
import mosaik.util
from mosaik.scenario import SimConfig
END = 10000
async def start_mosaik(
pause_queue: queue.Queue[tuple[asyncio.Event, asyncio.AbstractEventLoop]],
END: int,
):
SIM_CONFIG: SimConfig = {
"OutputSim": {
"python": "mosaik.basic_simulators:OutputSimulator",
},
"InputSim": {
"python": "tests.simulators.generic_test_simulator:TestSim",
},
}
loop = asyncio.get_running_loop()
async with mosaik.AsyncWorld(SIM_CONFIG) as world:
pause_queue.put((world.running, loop))
# End: Simulation set up
# Simulator set up
output_dict = await world.start("OutputSim")
output_model = await output_dict.Dict.create(2)
input = await world.start("InputSim", step_size=1)
input_model_const = await input.A.create(1)
world.connect(
input_model_const[0],
output_model[0],
("val_out", "value"),
)
world.connect(input_model_const[0], output_model[0], "val_out_2")
world.connect(
input_model_const[0],
output_model[1],
("val_out", "value"),
)
await world.run(until=END)
# End: Simulator set up
# Keyboard input
def on_press(key, event, loop):
try:
if key.char == "p":
if event and loop.is_running():
loop.call_soon_threadsafe(lambda: event.clear()) # Pause
logger.info("mosaik simulation is paused.")
elif key.char == "r":
if event and loop.is_running():
loop.call_soon_threadsafe(lambda: event.set()) # Resume
logger.info("mosaik simulation is resumed.")
except AttributeError:
# This handles cases where `key` does not have a `char`
# attribute, which happens for special keys like Shift or Ctrl.
# Instead of failing, the function just ignores these cases.
pass
# End: Keyboard input
# Start keyboard listener and mosaik in different threads
def main():
comm_queue: queue.Queue[tuple[asyncio.Event, asyncio.AbstractEventLoop]] = (
queue.Queue()
)
mosaik_thread = threading.Thread(
target=asyncio.run,
args=(start_mosaik(comm_queue, END),),
daemon=True,
)
mosaik_thread.start()
world_running, loop = comm_queue.get()
on_press_with_args = partial(on_press, event=world_running, loop=loop)
listener = keyboard.Listener(on_press=lambda key: on_press_with_args(key))
listener.start()
mosaik_thread.join()
listener.stop()
if __name__ == "__main__":
main()
# End: Start keyboard listener and mosaik in different threads
Addendum: Pausing automatically at a certain simulation step
Instead of hitting the p
key to pause, we could also set a certain step number the simulation should stop at.
This could be used for debugging purposes or analysis of a certain intermediate state of the simulation.
To accomplish this, we can get the ProgressProxy object from one of the participating simulators
# get progress
progress = output_dict.get_progress()
and write a small method that clears the asyncio event, pausing the simulation.
# pause_sim_method
async def pause_sim(pause_step: int, world: AsyncWorld, progress: ProgressProxy):
await progress.has_reached(pause_step)
logger.info(
"The simulation has reached the pre-defined pause "
f"step {pause_step} and is now paused."
)
world.running.clear()
return
We can then set the pause_step variable to a valid step within our maximum simulation step range and it will pause when reaching that step.
If we use the keyboard listener from step 3, we can simply resume by pressing r
.
The full code for this example looks as follows.
import asyncio
import queue
import threading
from functools import partial
from loguru import logger
from pynput import keyboard
import mosaik
import mosaik.basic_simulators
import mosaik.util
from mosaik.async_scenario import AsyncModelFactory, AsyncWorld
from mosaik.progress import ProgressProxy
from mosaik.scenario import SimConfig
async def start_mosaik(
pause_queue: queue.Queue[tuple[asyncio.Event, asyncio.AbstractEventLoop]],
):
SIM_CONFIG: SimConfig = {
"OutputSim": {
"python": "mosaik.basic_simulators:OutputSimulator",
},
"InputSim": {
"python": "tests.simulators.generic_test_simulator:TestSim",
},
}
END = 1000 # 1000 seconds
pause_step = 15
loop = asyncio.get_running_loop()
async with mosaik.AsyncWorld(SIM_CONFIG, cache=False) as world:
pause_queue.put((world.running, loop))
output_dict: AsyncModelFactory = await world.start("OutputSim")
output_model = await output_dict.Dict.create(2)
# get progress
progress = output_dict.get_progress()
# End: get progress
input = await world.start("InputSim", step_size=1)
input_model_const = await input.A.create(1)
world.connect(
input_model_const[0],
output_model[0],
("val_out", "value"),
)
world.connect(
input_model_const[0],
output_model[0],
"val_out_2",
)
world.connect(
input_model_const[0],
output_model[1],
("val_out", "value"),
)
await asyncio.gather(
world.run(until=END), pause_sim(pause_step, world, progress)
)
# pause_sim_method
async def pause_sim(pause_step: int, world: AsyncWorld, progress: ProgressProxy):
await progress.has_reached(pause_step)
logger.info(
"The simulation has reached the pre-defined pause "
f"step {pause_step} and is now paused."
)
world.running.clear()
return
# End: pause_sim_method
def on_press(key, event, loop):
try:
if key.char == "p":
if event and loop.is_running():
loop.call_soon_threadsafe(lambda: event.clear()) # Pause
logger.info("mosaik simulation is paused.")
elif key.char == "r":
if event and loop.is_running():
loop.call_soon_threadsafe(lambda: event.set()) # Resume
logger.info("mosaik simulation is resumed.")
except AttributeError:
# This handles cases where `key` does not have a `char`
# attribute, which happens for special keys like Shift or Ctrl.
# Instead of failing, the function just ignores these cases.
pass
def main():
comm_queue: queue.Queue[tuple[asyncio.Event, asyncio.AbstractEventLoop]] = (
queue.Queue()
)
mosaik_thread = threading.Thread(
target=asyncio.run,
args=(start_mosaik(comm_queue),),
daemon=False,
)
mosaik_thread.start()
(event, loop) = comm_queue.get()
on_press_with_args = partial(on_press, event=event, loop=loop)
listener = keyboard.Listener(on_press=on_press_with_args)
listener.start()
mosaik_thread.join()
listener.stop()
print("Stopping listener and mosaik thread...")
main()