mosaik.simmanager — Management of external processes

The simulation manager is responsible for starting simulation processes and shutting them down. It also manages the communication between mosaik and the processes.

It is able to start pure Python simulators in-process (by importing and instantiating them), to start external simulation processes and to connect to already running simulators and manage access to them.

typeddict mosaik.simmanager.MosaikConfigTotal[source]

A total version for :cls:`MosaikConfig` for internal use.

Required Keys:
async mosaik.simmanager.start(world, sim_name, sim_id, time_resolution, sim_params)[source]

Start the simulator sim_name based on the configuration im world.sim_config, give it the ID sim_id and pass the time_resolution and the parameters of the dict sim_params to it.

The sim config is a dictionary with one entry for every simulator. The entry itself tells mosaik how to start the simulator:

{
    'ExampleSimA': {
        'python': 'example_sim.mosaik:ExampleSim',
    },
    'ExampleSimB': {
        'cmd': 'example_sim %(addr)s',
        'cwd': '.',
    },
    'ExampleSimC': {
        'connect': 'host:port',
    },
}

ExampleSimA is a pure Python simulator. Mosaik will import the module example_sim.mosaik and instantiate the class ExampleSim to start the simulator.

ExampleSimB would be started by executing the command example_sim and passing the network address of mosaik das command line argument. You can optionally specify a current working directory. It defaults to ..

ExampleSimC can not be started by mosaik, so mosaik tries to connect to it.

time_resolution (in seconds) is a global scenario parameter, which tells the simulators what the integer time step means in seconds. Its default value is 1., meaning one integer step corresponds to one second simulated time.

The function returns a mosaik_api_v3.Simulator instance.

It raises a SimulationError if the simulator could not be started.

Return a SimProxy instance.

Parameters:
  • world (World) –

  • sim_name (str) –

  • sim_id (SimId) –

  • time_resolution (float) –

  • sim_params (Dict[str, Any]) –

Return type:

Proxy

async mosaik.simmanager.start_inproc(mosaik_config, sim_name, sim_config, mosaik_remote)[source]

Import and instantiate the Python simulator sim_name based on its config entry sim_config.

Return a LocalProcess instance.

Raise a ScenarioError if the simulator cannot be instantiated.

Parameters:
Return type:

BaseProxy

async mosaik.simmanager.start_proc(mosaik_config, sim_name, sim_config, mosaik_remote)[source]

Start a new process for simulator sim_name based on its config entry sim_config.

Return a RemoteProcess instance.

Raise a ScenarioError if the simulator cannot be instantiated.

Parameters:
Return type:

BaseProxy

async mosaik.simmanager.start_connect(mosaik_config, sim_name, sim_config, mosaik_remote)[source]

Connect to the already running simulator sim_name based on its config entry sim_config.

Return a RemoteProcess instance.

Raise a ScenarioError if the simulator cannot be instantiated.

Parameters:
Return type:

BaseProxy

mosaik.simmanager.Port[source]

Pair of an entity ID and an attribute of that entity

alias of Tuple[str, str]

class mosaik.simmanager.SimRunner(sid, connection, depth=1)[source]

Handler for an external simulator.

It stores its simulation state and own the proxy object to the external simulator.

Parameters:
  • sid (SimId) –

  • connection (Proxy) –

  • depth (int) –

rt_start: float[source]

The real time when this simulator started (as returned by perf_counter().

output_time: TieredTime[source]

The output time associated with data. Usually, this will be equal to last_step but simulators may specify a different time for their output.

data: OutputData[source]

The newest data returned by this simulator.

sid: SimId[source]

This simulator’s ID.

last_step: TieredTime[source]

The most recent step this simulator performed.

next_steps: List[TieredTime][source]

The scheduled next steps this simulator will take, organized as a heap. Once the immediate next step has been chosen (and the has_next_step event has been triggered), the step is moved to next_step instead.

next_self_step: TieredTime | None[source]

The next self-scheduled step for this simulator.

progress: Progress[source]

This simulator’s progress in mosaik time.

This simulator has done all its work before time progress, so other simulator can rely on this simulator’s output until this time.

inputs_from_set_data: InputData[source]

Inputs received via set_data.

persistent_inputs: InputData[source]

Memory of previous inputs for persistent attributes.

timed_input_buffer: TimedInputBuffer[source]

Inputs for this simulator.

triggering_ancestors: Dict[SimRunner, TieredInterval][source]

An iterable of this sim’s ancestors that can trigger a step of this simulator. The second component specifies the least amount of time that output from the ancestor needs to reach us.

triggers: Dict[Port, List[Tuple[SimRunner, TieredInterval]]][source]

For each port of this simulator, the simulators that are triggered by output on that port and the delay accrued along that edge.

output_to_push: Dict[Port, List[Tuple[SimRunner, TieredInterval, Port]]][source]

This lists those connections that use the timed_input_buffer. The keys are the entity-attribute pairs of this simulator with the corresponding list of simulator-time-entity-attribute triples describing the destinations for that data and the time-shift occuring along the connection.

pulled_inputs: Dict[Tuple[SimRunner, TieredInterval], Set[Tuple[Port, Port]]][source]

Output to pull in whenever this simulator performs a step. The keys are the source SimRunner and the time shift, the values are the source and destination entity-attribute pairs.

task: asyncio.Task[None][source]

The asyncio.Task for this simulator.

input_delays: Dict[SimRunner, TieredInterval][source]

For each simulator that provides data to this simulator, the minimum over all input delays. This is used while waiting for dependencies.

schedule_step(tiered_time)[source]

Schedule a step for this simulator at the given time. This will trigger a re-evaluation whether this simulator’s next step is settled, provided that the new step is earlier than the old one and the simulator is currently awaiting it’s next settled step.

Parameters:

tiered_time (TieredTime) –

async stop()[source]

Stop the simulator behind the proxy.

class mosaik.simmanager.MosaikRemote(world, sid)[source]
Parameters:
  • world (World) –

  • sid (SimId) –

async get_progress()[source]

Return the current simulation progress from sim_progress.

Return type:

float

Return information about the related entities of entities.

If entities omitted (or None), return the complete entity graph, e.g.:

{
    'nodes': {
        'sid_0.eid_0': {'type': 'A'},
        'sid_0.eid_1': {'type': 'B'},
        'sid_1.eid_0': {'type': 'C'},
    },
    'edges': [
        ['sid_0.eid_0', 'sid_1.eid0', {}],
        ['sid_0.eid_1', 'sid_1.eid0', {}],
    ],
}

If entities is a single string (e.g., sid_1.eid_0), return a dict containing all entities related to that entity:

{
    'sid_0.eid_0': {'type': 'A'},
    'sid_0.eid_1': {'type': 'B'},
}

If entities is a list of entity IDs (e.g., ['sid_0.eid_0', 'sid_0.eid_1']), return a dict mapping each entity to a dict of related entities:

{
    'sid_0.eid_0': {
        'sid_1.eid_0': {'type': 'B'},
    },
    'sid_0.eid_1': {
        'sid_1.eid_1': {'type': 'B'},
    },
}
Parameters:

entities (str | List[str] | None) –

Return type:

Dict[str, Any] | Dict[str, Dict[str, Any]]

async get_data(attrs)[source]

Return the data for the requested attributes attrs.

attrs is a dict of (fully qualified) entity IDs mapping to lists of attribute names ({'sid/eid': ['attr1', 'attr2']}).

The return value is a dictionary, which maps the input entity IDs to data dictionaries, which in turn map attribute names to their respective values: ({'sid/eid': {'attr1': val1, 'attr2': val2}}).

Parameters:

attrs (Dict[str, List[str]]) –

Return type:

Dict[str, Any]

async set_data(data)[source]

Set data as input data for all affected simulators.

data is a dictionary mapping source entity IDs to destination entity IDs with dictionaries of attributes and values ({'src_full_id': {'dest_full_id': {'attr1': 'val1', 'attr2': 'val2'}}}).

async set_event(event_time)[source]

Schedules an event/step at simulation time event_time.

Parameters:

event_time (int) –

class mosaik.simmanager.StarterCollection[source]

This class provides a singleton instance of a collection of simulation starters. Default starters are: - python: start_inproc - cmd: start_proc - connect: start_connect

External packages may add additional methods of starting simulations by adding new elements:

from mosaik.simmanager import StarterCollection s = StarterCollection() s[‘my_starter’] = my_starter_func

Return type:

OrderedDict[str, Callable[…, Coroutine[Any, Any, BaseProxy]]]

class mosaik.simmanager.TimedInputBuffer[source]

A buffer to store inputs with its corresponding time.

When the data is queried for a specific step time, all entries with time <= step are added to the input_dictionary.

If there are several entries for the same connection at the same time, only the most recent value is added.