mosaik.async_scenario — Using mosaik in an async context

This module provides the async interface for users to create simulation scenarios for mosaik. For a version that handles the asynchronicity itself (but cannot be used within an existing event loop), see mosaik.scenario.

The AsyncWorld holds all necessary data for the simulation and allows the user to start simulators. It provides a AsyncModelFactory (and a AsyncModelMock) via which the user can instantiate model instances (entities). The method AsyncWorld.run finally starts the simulation.

typeddict MosaikConfig[source]

typing.TypedDict.

Optional Keys:
typeddict ModelOptionals[source]

typing.TypedDict.

Optional Keys:
  • env (Dict[str, str]) – The environment variables to set for this simulator.

  • cwd (str) – The current working directory for this simulator.

  • api_version (str) – The API version of the connected simulator. Set this to suppress warnings about this simulator being outdated.

  • posix (bool) – Whether to split the given shell command using POSIX rules. (Default: False on Windows, True otherwise.)

  • auto_terminate (bool) – Whether to terminate this process automatically at the end of the simulation. (Default true.)

typeddict PythonModel[source]

typing.TypedDict.

Required Keys:
  • python (str) – The Simulator subclass for this simulator, encoded as a string module_name:ClassName.

Optional Keys:
  • env (Dict[str, str]) – The environment variables to set for this simulator.

  • cwd (str) – The current working directory for this simulator.

  • api_version (str) – The API version of the connected simulator. Set this to suppress warnings about this simulator being outdated.

  • posix (bool) – Whether to split the given shell command using POSIX rules. (Default: False on Windows, True otherwise.)

  • auto_terminate (bool) – Whether to terminate this process automatically at the end of the simulation. (Default true.)

typeddict ConnectModel[source]

typing.TypedDict.

Required Keys:
  • connect (str) – The host:port address for this simulator.

Optional Keys:
  • env (Dict[str, str]) – The environment variables to set for this simulator.

  • cwd (str) – The current working directory for this simulator.

  • api_version (str) – The API version of the connected simulator. Set this to suppress warnings about this simulator being outdated.

  • posix (bool) – Whether to split the given shell command using POSIX rules. (Default: False on Windows, True otherwise.)

  • auto_terminate (bool) – Whether to terminate this process automatically at the end of the simulation. (Default true.)

typeddict CmdModel[source]

typing.TypedDict.

Required Keys:
  • cmd (str) – The command to start this simulator. String %(python)s will be replaced by the python command used to start this scenario, %(addr)s will be replaced by the host:port combination to which the simulator should connect.

Optional Keys:
  • env (Dict[str, str]) – The environment variables to set for this simulator.

  • cwd (str) – The current working directory for this simulator.

  • api_version (str) – The API version of the connected simulator. Set this to suppress warnings about this simulator being outdated.

  • posix (bool) – Whether to split the given shell command using POSIX rules. (Default: False on Windows, True otherwise.)

  • auto_terminate (bool) – Whether to terminate this process automatically at the end of the simulation. (Default true.)

SimConfig[source]

Description of all the simulators you intend to use in your simulation.

alias of Dict[str, PythonModel | ConnectModel | CmdModel]

class SimGroup(parent: 'SimGroup | None', name: 'str | None')[source]
Parameters:
connect_interval(src_group: SimGroup, dest_group: SimGroup, time_shifted: int = 0, weak: int = 0)[source]

Given two SimGroup`s, calculate a TieredInterval connecting simulators in these groups. The tiers will be 0, unless `time_shifted or weak are specified, in which case the given values are placed in the highest and lowest tier, respectively.

Parameters:
class AsyncWorld(sim_config: mosaik.async_scenario.SimConfig, mosaik_config: MosaikConfig | None = None, time_resolution: float = 1.0, debug: bool = False, cache: bool = True, max_loop_iterations: int = 100, configure_logging: bool = False, skip_greetings: bool = True)[source]

The world holds all data required to specify and run the scenario.

It provides a method to start a simulator process (start) and manages the simulator instances.

You have to provide a sim_config which tells the world which simulators are available and how to start them. This will be stored as sim_config; see there for details.

mosaik_config can be a dict or list of key-value pairs to set additional parameters overriding the defaults:

{
    'addr': ('127.0.0.1', 5555),
    'start_timeout': 2,  # seconds
    'stop_timeout': 2,   # seconds
}

Here, addr is the network address that mosaik will bind its socket to. start_timeout and stop_timeout specifiy a timeout (in seconds) for starting/stopping external simulator processes.

If execution_graph is set to True, an execution graph will be created during the simulation. This may be useful for debugging and testing. Note that this increases the memory consumption and simulation time.

We recommend that you use AsyncWorld in an async with block like so:

async with AsyncWorld(SIM_CONFIG) as world:
    # call setup methods on world
    ...
    await world.run(UNTIL)

This will ensure that the connections to all remote simulators are properly closed at the end of the simulation. Alternatively, you can use the shutdown method manually.

Parameters:
  • sim_config (SimConfig)

  • mosaik_config (Optional[MosaikConfig])

  • time_resolution (float)

  • debug (bool)

  • cache (bool)

  • max_loop_iterations (int)

  • configure_logging (bool)

  • skip_greetings (bool)

until: int[source]

The time until which this simulation will run.

rt_factor: float | None[source]

The number of real-time seconds corresponding to one mosaik step.

tqdm: tqdm[NoReturn][source]

The tqdm progress bar for the total progress.

sim_config: SimConfig[source]

The config dictionary that tells mosaik how to start a simulator.

The sim config is a dictionary with one entry for every simulator. The entry itself tells mosaik how to start the simulator:

{
    'ExampleSimA': {
        'python': 'example_sim.mosaik:ExampleSim',
    },
    'ExampleSimB': {
        'cmd': 'example_sim %(addr)s',
        'cwd': '.',
    },
    'ExampleSimC': {
        'connect': 'host:port',
    },
}

ExampleSimA is a pure Python simulator. Mosaik will import the module example_sim.mosaik and instantiate the class ExampleSim to start the simulator.

ExampleSimB would be started by executing the command example_sim and passing the network address of mosaik das command line argument. You can optionally specify a current working directory. It defaults to ..

ExampleSimC can not be started by mosaik, so mosaik tries to connect to it.

config: MosaikConfigTotal[source]

The config dictionary for general mosaik settings.

sims: Dict[SimId, simmanager.SimRunner][source]

A dictionary of already started simulators instances.

time_resolution: float[source]

The number of seconds that correspond to one mosaik time step in this situation. The default value is 1.0, meaning that one integer step corresponds to one second simulated time.

max_loop_iterations: int[source]

The number of iterations allowed for same-time loops within one time step. This is checked to prevent accidental infinite loops. Increase this value if your same-time loops require many iterations to converge.

entity_graph: networkx.Graph[FullId][source]

The graph of related entities. Nodes are (sid, eid) tuples. Each note has an attribute entity with an Entity.

sim_progress: float[source]

The progress of the entire simulation (in percent).

async start(sim_name: str, sim_id: str | None = None, **sim_params: Any) AsyncModelFactory[source]

Start the simulator named sim_name and return a ModelFactory for it.

Parameters:
  • sim_name (str)

  • sim_id (str | None)

  • sim_params (Any)

Return type:

AsyncModelFactory

connect(src: ~mosaik.async_scenario.Entity, dest: ~mosaik.async_scenario.Entity, *attr_pairs: str | ~typing.Tuple[str, str], async_requests: bool = False, time_shifted: bool | int = False, initial_data: ~typing.Dict[str, ~typing.Any] = {}, weak: bool = False, transform: ~typing.Callable[[~typing.Any], ~typing.Any] = <function AsyncWorld.<lambda>>)[source]

Connect the src entity to dest entity.

Establish a data-flow for each (src_attr, dest_attr) tuple in attr_pairs. If src_attr and dest_attr have the same name, you you can optionally only pass one of them as a single string.

Raise a ScenarioError if both entities share the same simulator instance, if at least one (src. or dest.) attribute in attr_pairs does not exist, or if the connection would introduce a cycle in the data-flow (e.g., A → B → C → A).

If the dest simulator may make asynchronous requests to mosaik to query data from src (or set data to it), async_requests should be set to True so that the src simulator stays in sync with dest.

An alternative to asynchronous requests are time-shifted connections. Their data flow is always resolved after normal connections so that cycles in the data-flow can be realized without introducing deadlocks. For such a connection time_shifted should be set to True and initial_data should contain a dict with input data for the first simulation step of the receiving simulator.

An alternative to using async_requests to realize cyclic data-flow is given by the time_shifted kwarg. If set to True it marks the connection as cycle-closing (e.g. C → A). It must always be used with initial_data specifying a dict with the data sent to the destination simulator at the first step (e.g. {‘src_attr’: value}).

Parameters:
set_initial_event(sid: str, time: int = 0)[source]

Set an initial step for simulator sid at time time (default=0).

Parameters:
async get_data(entity_set: Iterable[Entity], *attributes: str) Dict[Entity, Dict[str, Any]][source]

Get and return the values of all attributes for each entity of an entity_set.

The return value is a dict mapping the entities of entity_set to dicts containing the values of each attribute in attributes:

{
    Entity(...): {
        'attr_1': 'val_1',
        'attr_2': 'val_2',
        ...
    },
    ...
}
Parameters:
Return type:

Dict[Entity, Dict[str, Any]]

async run(until: int, rt_factor: float | None = None, rt_strict: bool = False, print_progress: bool | Literal['individual'] = False, lazy_stepping: bool = True)[source]

Start the simulation and run it until the simulation time until is reached.

As mosaik has no way of resetting simulators to their initial state, this method can only be called once.

Unlike its counterpart run in the synchronous World, this method will not automatically close the connection to connected simulators, even outside of a with block. You need to call shutdown manually afterwards (or use a with block).

Parameters:
  • until (int) – The end of the simulation in mosaik time steps (exclusive).

  • rt_factor (float | None) – The real-time factor. If set to a number > 0, the simulation will run in real-time mode. A real-time factor of 1. means that 1 second in simulated time takes 1 second in real time. An real-time factor of 0.5 will let the simulation run twice as fast as real time. For correct behavior of the real-time factor, the time resolution of the scenario has to be set adequately (the default is 1 second).

  • rt_strict (bool) – If the simulators are too slow for the real-time factor you chose, mosaik will only print a warning by default. In order to raise a RuntimeError instead, you can set rt_strict to True.

  • print_progress (bool | Literal['individual']) –

    Whether progress bars are printed while the simulation is running. The default is to print one bar representing the global progress of the simulation. You can also set the value to 'individual' to get one bar per simulator in your simulation (in addition to the global one). A value of False turns off the progress bars completely.

    The progress bars use tqdm; see their documentation on how to write to the console without interfering with the bars.

  • lazy_stepping (bool) – Whether to prevent simulators from running ahead of their successors by more than one step. If False a simulator always steps as long all input is provided. This might decrease the simulation time but increase the memory consumption.

cache_triggering_ancestors()[source]

Collects the ancestors of each simulator and stores them in the respective simulator object.

ensure_no_dataflow_cycles()[source]

Make sure that there is no cyclic dataflow with 0 total delay. Raise an exception with one such cycle otherwise.

async shutdown()[source]

Shut-down all simulators and close the server socket.

typeddict MinPath[source]

typing.TypedDict.

Required Keys:
class AsyncModelFactory(world: AsyncWorld, group: SimGroup, sid: str, proxy: Proxy)[source]

This is a facade for a simulator sim that allows the user to create new model instances (entities) within that simulator.

For every model that a simulator publicly exposes, the AsyncModelFactory provides a AsyncModelMock attribute that actually creates the entities.

If you access an attribute that is not a model or if the model is not marked as public, an ScenarioError is raised.

Parameters:
validate_output_dict(eid_dict: Dict[str, Dict[str, Any]])[source]

Validate the structure and contents of an output dictionary.

This method checks that the entities and attributes in the output dictionary are valid based on the simulator’s internal state. If any entity in the output dictionary was not created during initialization, or if any attribute is not part of the entity’s model’s output attributes, a warning is issued to notify of potential errors in the simulator’s get_data method.

Parameters:

eid_dict (Dict[str, Dict[str, Any]]) – A dictionary of output data, where keys are entity IDs and values are dictionaries of attributes and their corresponding data.

Raises:

UserWarning – If an entity ID in the output dictionary does not correspond to a known entity, or if an attribute is not part of the entity’s model’s defined output attributes.

parse_attrs(model_desc: ModelDescription, type: Literal['time-based', 'event-based', 'hybrid']) Tuple[FrozenSet[str] | OutSet[str], FrozenSet[str] | OutSet[str], FrozenSet[str] | OutSet[str], FrozenSet[str] | OutSet[str]][source]

Parse the attrs and their trigger/persistent state.

The guiding principle is this: The user can specify as little information as possible and the rest will be inferred, but inconsistent information will lead to an error.

If attrs, trigger and non-trigger are all given, trigger and non-trigger must form a partition of attrs. If only two are given, the third in inferred, provided this can be done in such a way that trigger and non-trigger form a partition of attrs. If any_inputs=True, the set of all possible attrs is used instead of the ones specified in attrs. If only attrs is given, a default is chosen for the others, based on the type of the simulator.

The same applieds to attrs, persistent and non-persistent, except that any_inputs is not considered (as these are outputs).

Parameters:
  • model_desc (ModelDescription) – The ModelDescription to parse

  • type (Literal['time-based', 'event-based', 'hybrid']) – The simulator’s type (for setting default attribute types).

Returns:

A four-tuple of InOrOutSet, giving the measurement inputs, event inputs, measurement outputs, and event outputs.

Raises:

ValueError – if the information is insufficient or inconsistent

Return type:

Tuple[FrozenSet[str] | OutSet[str], FrozenSet[str] | OutSet[str], FrozenSet[str] | OutSet[str], FrozenSet[str] | OutSet[str]]

class AsyncModelMock(world: AsyncWorld, factory: AsyncModelFactory, model: str, proxy: Proxy)[source]

Instances of this class are exposed as attributes of ModelFactory and allow the instantiation of simulator models.

You can call an instance of this class to create exactly one entity: sim.ModelName(x=23). Alternatively, you can use the create method to create multiple entities with the same set of parameters at once: sim.ModelName.create(3, x=23).

Parameters:
async create(num: int, **model_params: Any)[source]

Create num entities with the specified model_params and return a list with the entity dicts.

The returned list of entities is the same as returned by mosaik_api_v3.Simulator.create, but the simulator is prepended to every entity ID to make them globally unique.

Parameters:
  • num (int)

  • model_params (Any)

class Entity(sid: str, eid: str, sim_name: str, model_mock: AsyncModelMock, children: Iterable[Entity] | None, extra_info: Any = None)[source]

An entity represents an instance of a simulation model within mosaik.

Parameters:
sid: str[source]

The ID of the simulator this entity belongs to.

eid: str[source]

The entity’s ID.

sim_name: str[source]

The entity’s simulator name.

model_mock: AsyncModelMock[source]

The entity’s type (or class).

children: List[Entity][source]

An entity set containing subordinate entities.

property full_id: str[source]

Full, globally unique entity id sid.eid.