Source code for datalogd.plugins.randomwalk_datasource
import asyncio
import logging
import random
from datalogd import DataSource
[docs]class RandomWalkDataSource(DataSource):
"""
Generate test or demonstration data using a random walk algorithm.
For each iteration of the algorithm, the output value will either be
unchanged, increase, or decrease by a fixed increment. The options are
chosen randomly with equal probability.
Multiple walkers can be initialised to produce several sources of random
data. The ``walkers`` parameter is a list, the length of which determines
the number of walkers to use. Each item in the list must be a list/tuple of
two items: the walker's initial value and increment.
:param seed: Seed used to initialise the random number generator.
:param interval: How often to run an iteration of the algorithm, in seconds.
:param walkers: List defining number of walkers and their parameters in the
form ``[[init, increment], ...]``.
"""
def __init__(self, sinks=[], seed=None, interval=1.0, walkers=[[0.0, 1.0], [0.0, 2.0]]):
super().__init__(sinks=sinks)
self.log = logging.getLogger("RandomWalkDataSource")
random.seed(seed)
self.interval = interval
self.walkers = []
for w in walkers:
try:
self.walkers.append([float(w[0]), float(w[1])])
except Exception as ex:
self.log.warning(f"Invalid parameters for walker: {w} {ex}")
# Queue first call of update routine
asyncio.get_event_loop().call_soon(self.generate_data)
[docs] def generate_data(self):
"""
Run one iteration of the random walk algorithm and send the value to any
connected sinks.
"""
loop = asyncio.get_event_loop()
# Update walker values and notify sinks
data = []
for w_i, w in enumerate(self.walkers):
w[0] += random.choice([-w[1], 0.0, w[1]])
data.append({"type": "randomwalk", "id": f"{w_i}", "value": w[0]})
self.send(data)
# Reschedule next update
loop.call_later(self.interval, self.generate_data)