mosaik.simmanager — Management of external processes
The simulation manager is responsible for starting simulation processes and shutting them down. It also manages the communication between mosaik and the processes.
It is able to start pure Python simulators in-process (by importing and instantiating them), to start external simulation processes and to connect to already running simulators and manage access to them.
- async start(world: AsyncWorld, sim_name: str, sim_id: SimId, time_resolution: float, sim_params: Dict[str, Any]) Proxy[source]
Start the simulator
sim_namebased on the configuration inworld.sim_config, give it the IDsim_idand pass thetime_resolutionand the parameter dictsim_paramsto it.- Parameters:
world (AsyncWorld) – the
AsyncWorldfor the simulationsim_name (str) – the name of the simulator in
world.sim_configsim_id (SimId) – the ID of this simulator instance
time_resolution (float) – the number of seconds corresponding to one mosaik time steps
sim_params (Dict[str, Any]) – the additional keyword arguments given by the user when starting this simulator
- Returns:
the
Proxyrepresenting the connection to this simulator- Return type:
Proxy
- async start_inproc(mosaik_config: MosaikConfigTotal, sim_name: str, sim_config: PythonModel, mosaik_remote: MosaikRemote) BaseProxy[source]
Import and instantiate the Python simulator
sim_namebased on its config entrysim_config.- Parameters:
mosaik_config (MosaikConfigTotal) – the configuration of mosaik itself
sim_name (str) – the name of the simulator to be started
sim_config (PythonModel) – the configuration of this specific simulator
mosaik_remote (MosaikRemote) – the object used by this simulator to make callbacks to mosaik
- Returns:
the
LocalProxyfor this simulator- Raises:
ScenarioError – if the simulator cannot be instantiated.
- Return type:
BaseProxy
- async start_proc(mosaik_config: MosaikConfigTotal, sim_name: str, sim_config: CmdModel, mosaik_remote: MosaikRemote) BaseProxy[source]
Start a new process for simulator sim_name based on its config entry sim_config.
Return a
RemoteProcessinstance.Raise a
ScenarioErrorif the simulator cannot be instantiated.- Parameters:
mosaik_config (MosaikConfigTotal)
sim_name (str)
sim_config (CmdModel)
mosaik_remote (MosaikRemote)
- Return type:
BaseProxy
- async start_connect(mosaik_config: MosaikConfigTotal, sim_name: str, sim_config: ConnectModel, mosaik_remote: MosaikRemote) BaseProxy[source]
Connect to the already running simulator sim_name based on its config entry sim_config.
Return a
RemoteProcessinstance.Raise a
ScenarioErrorif the simulator cannot be instantiated.- Parameters:
mosaik_config (MosaikConfigTotal)
sim_name (str)
sim_config (ConnectModel)
mosaik_remote (MosaikRemote)
- Return type:
BaseProxy
- class PushDescription(dest_sim: SimRunner, delay: TieredDuration, dest_port: Tuple[str, str], transform: Callable[[...], Any])[source]
Describes a connection for pushing data from one simulator to another.
- Parameters:
dest_sim (SimRunner) – The
SimRunnerinstance representing the simulator receiving the data.delay (TieredDuration) – The TieredDuration representing the time shift (or delay) applied to the data during transmission along the connection.
dest_port (Tuple[str, str]) – The Port representing the entity-attribute pair for the destination in the target simulator.
transform (Callable[[...], Any]) – A callable function applied to the data as it is pushed to the destination.
- class PullDescription(src_port: Tuple[str, str], dest_port: Tuple[str, str], transform: Callable[[...], Any])[source]
Describes a connection for pulling data into a simulator.
- Parameters:
src_port (Tuple[str, str]) – The Port representing the entity-attribute pair from which data is pulled in the connected simulator.
dest_port (Tuple[str, str]) – The Port representing the entity-attribute pair that is the destination for the pulled data.
transform (Callable[[...], Any]) – A callable function applied to the data as it is forwarded to its destination.
- class SimRunner(sid: str, connection: Proxy, check_outputs: Callable[[Dict[str, Dict[str, Any]]], None], depth: int = 1)[source]
Handler for an external simulator.
It stores its simulation state and own the proxy object to the external simulator.
- Parameters:
sid (SimId)
connection (Proxy)
check_outputs (Callable[[OutputData], None])
depth (int)
- output_time: TieredTime[source]
The output time associated with data. Usually, this will be equal to last_step but simulators may specify a different time for their output.
- next_steps: List[TieredTime][source]
The scheduled next steps this simulator will take, organized as a heap. Once the immediate next step has been chosen (and the
has_next_stepevent has been triggered), the step is moved tonext_stepinstead.
- progress: Progress[source]
This simulator’s progress in mosaik time.
This simulator has done all its work before time
progress, so other simulator can rely on this simulator’s output until this time.
- timed_input_buffer: TimedInputBuffer[source]
Inputs for this simulator.
- successors_to_wait_for: Dict[SimRunner, TieredDuration][source]
The immediate successors that we always need to wait for (due to async requests.) The duration only serves as an adapter (so we don’t need MinimalDurations here.)
- successors: Dict[SimRunner, TieredDuration][source]
The immediate successors of this simulator. This is used when lazy stepping to ensure that we don’t step ahead too far. Therefore, the duration is only used as an adapter, and will always have all tiers 0. (Thus, we don’t need MinimalDurations here.)
- triggering_ancestors: Dict[SimRunner, MinimalDurations][source]
An iterable of this sim’s ancestors that can trigger a step of this simulator. The second component specifies the least amount of time that output from the ancestor needs to reach us.
- triggers: Dict[Port, List[Tuple[SimRunner, TieredDuration]]][source]
For each port of this simulator, the simulators that are triggered by output on that port and the delay accrued along that edge.
- output_to_push: Dict[Port, List[PushDescription]][source]
This lists those connections that use the timed_input_buffer. The keys are the entity-attribute pairs (Port) of this simulator, and the values are lists of
PushDescriptionobjects. Each PushDescription specifies the destination simulator, the entity-attribute pair for the target, and the time shift occurring along the connection.
- pulled_inputs: Dict[Tuple[SimRunner, TieredDuration], List[PullDescription]][source]
Output to pull in whenever this simulator performs a step. The keys are the source
SimRunnerand the time shift, the values are lists ofPullDescriptionobjects. EachPullDescriptionspecifies the source and destination entity-attribute pairs along with an optional transformation function applied to the data.
- task: asyncio.Task[None][source]
The asyncio.Task for this simulator.
- input_delays: Dict[SimRunner, MinimalDurations][source]
For each simulator that provides data to this simulator, the minimum over all input delays. This is used while waiting for dependencies.
- schedule_step(tiered_time: TieredTime)[source]
Schedule a step for this simulator at the given time. This will trigger a re-evaluation whether this simulator’s next step is settled, provided that the new step is earlier than the old one and the simulator is currently awaiting it’s next settled step.
- Parameters:
tiered_time (TieredTime)
- class MosaikRemote(world: AsyncWorld, sid: SimId)[source]
- Parameters:
world (AsyncWorld)
sid (SimId)
- async get_progress() float[source]
Return the current simulation progress from
sim_progress.- Return type:
Return information about the related entities of entities.
If entities omitted (or
None), return the complete entity graph, e.g.:{ 'nodes': { 'sid_0.eid_0': {'type': 'A'}, 'sid_0.eid_1': {'type': 'B'}, 'sid_1.eid_0': {'type': 'C'}, }, 'edges': [ ['sid_0.eid_0', 'sid_1.eid0', {}], ['sid_0.eid_1', 'sid_1.eid0', {}], ], }
If entities is a single string (e.g.,
sid_1.eid_0), return a dict containing all entities related to that entity:{ 'sid_0.eid_0': {'type': 'A'}, 'sid_0.eid_1': {'type': 'B'}, }
If entities is a list of entity IDs (e.g.,
['sid_0.eid_0', 'sid_0.eid_1']), return a dict mapping each entity to a dict of related entities:{ 'sid_0.eid_0': { 'sid_1.eid_0': {'type': 'B'}, }, 'sid_0.eid_1': { 'sid_1.eid_1': {'type': 'B'}, }, }
- async get_data(attrs: Dict[str, List[str]]) Dict[str, Any][source]
Warning
This method is deprecated and will be removed in a future release. Implement cyclic data flow using time-shifted and weak connections instead.
Return the data for the requested attributes attrs.
attrs is a dict of (fully qualified) entity IDs mapping to lists of attribute names (
{'sid/eid': ['attr1', 'attr2']}).The return value is a dictionary, which maps the input entity IDs to data dictionaries, which in turn map attribute names to their respective values: (
{'sid/eid': {'attr1': val1, 'attr2': val2}}).
- async set_data(data: Dict[str, Dict[str, Any]])[source]
Warning
This method is deprecated and will be removed in a future release. Implement cyclic data flow using time-shifted and weak connections instead.
Set data as input data for all affected simulators.
data is a dictionary mapping source entity IDs to destination entity IDs with dictionaries of attributes and values (
{'src_full_id': {'dest_full_id': {'attr1': 'val1', 'attr2': 'val2'}}}).
- class StarterCollection[source]
This class provides a singleton instance of a collection of simulation starters. Default starters are: - python: start_inproc - cmd: start_proc - connect: start_connect
External packages may add additional methods of starting simulations by adding new elements:
from mosaik.simmanager import StarterCollection s = StarterCollection() s['my_starter'] = my_starter_func
- Return type:
OrderedDict[str, Callable[…, Coroutine[Any, Any, BaseProxy]]]
- class TimedInputBuffer[source]
A buffer to store inputs with its corresponding time.
When the data is queried for a specific step time, all entries with time <= step are added to the input_dictionary.
If there are several entries for the same connection at the same time, only the most recent value is added.