Source code for datalogd.plugins.aggregator_datafilter

from datalogd import DataFilter, listify

class AggregatorDataFilter(DataFilter):
    """
    Aggregates consecutively received data values into a list and passes the
    new array(s) on to sinks.

    The aggregated data can be sent at a different interval to that at which
    it is received by using the ``send_every`` parameter. A value of 1 will
    send the aggregated data every time new data is received, while a value of
    ``send_every`` equal to ``buffer_size`` will result in the data being
    passed on only once the buffer is filled. A typical usage example would be
    for data which is received once every second, using ``buffer_size=86400``
    and ``send_every=60`` to store up to 24 hours of data and update sinks
    every minute.

    :param buffer_size: Maximum length for lists of aggregated data.
    :param send_every: Send data to connected sinks every *n* updates.
    :param aggregate: List of data keys for which the values should be aggregated.
    """
    def __init__(self, sinks=[], buffer_size=100, send_every=100, aggregate=["timestamp", "value"]):
        super().__init__(sinks=sinks)
        self.buffer_size = buffer_size
        self.aggregate = aggregate
        # Clamp to a minimum of one update between sends
        self.send_every = send_every if send_every > 1 else 1
        self._send_count = 0
        self._prev_data = []
    def receive(self, data):
        """
        Accept ``data``, aggregate selected values, and pass the aggregated
        data on to any connected sinks.

        :param data: Data containing values to aggregate.
        """
        data = listify(data)
        for d_i, d in enumerate(data):
            # Check the corresponding entry in the buffer. If it has the same
            # keys and matching (non-aggregating) values, append the new values
            # to it. Note that buffer entries are matched to incoming data
            # positionally (by d_i), so the structure of incoming data is
            # assumed to be consistent between updates.
            try:
                if self._prev_data[d_i].keys() == d.keys():
                    # Keys match
                    for k, v in d.items():
                        if k not in self.aggregate and self._prev_data[d_i][k] != v:
                            # A (non-aggregating) value doesn't match previous entry
                            raise ValueError(f"Value of field {k} doesn't match previous entry.")
                    # All (non-aggregating) values match previous entry
                    for k, v in d.items():
                        if k in self.aggregate:
                            # An aggregating field, append to the buffer entry
                            self._prev_data[d_i][k] = listify(self._prev_data[d_i][k])
                            self._prev_data[d_i][k].append(v)
                            # Simple buffer size control, probably horribly
                            # inefficient compared to a circular buffer
                            if len(self._prev_data[d_i][k]) > self.buffer_size:
                                self._prev_data[d_i][k].pop(0)
                    # All aggregating fields now appended to the buffer entry
                    continue
            except (IndexError, ValueError):
                # Either the first entry, or keys or (non-aggregating) values
                # don't match, so fall through and start a new buffer entry
                pass
            # Convert the aggregating values into lists and store as a new entry
            for k, v in d.items():
                if k in self.aggregate:
                    d[k] = listify(v)
            self._prev_data.append(d)
        self._send_count += 1
        if self._send_count >= self.send_every:
            self._send_count = 0
            self.send(self._prev_data)
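
As a rough illustration of the behaviour described in the docstring, the sketch
below feeds single-item updates through the filter and captures what reaches a
sink. It assumes a working datalogd installation, that ``DataFilter.send()``
passes data to the ``receive()`` method of each connected sink (as the call at
the end of ``receive()`` above suggests), and that any object with a
``receive()`` method is accepted as a sink; the ``CaptureSink`` class is purely
illustrative::

    from datalogd.plugins.aggregator_datafilter import AggregatorDataFilter

    class CaptureSink:
        """Hypothetical stand-in sink which records everything it receives."""
        def __init__(self):
            self.received = []

        def receive(self, data):
            self.received.append(data)

    sink = CaptureSink()
    # Keep at most 5 aggregated values per field, update sinks every 2nd update
    agg = AggregatorDataFilter(sinks=[sink], buffer_size=5, send_every=2)

    for i in range(8):
        agg.receive({"type": "temperature", "timestamp": i, "value": 20.0 + i})

    # The buffer entry now holds the 5 most recent values of each aggregated
    # field, e.g.:
    # [{'type': 'temperature', 'timestamp': [3, 4, 5, 6, 7],
    #   'value': [23.0, 24.0, 25.0, 26.0, 27.0]}]
    print(sink.received[-1])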
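
Since buffer entries are matched to incoming data by position, a source which
emits a list of readings per update will aggregate each position into its own
entry. A minimal sketch of this, under the same assumptions as above::

    sink2 = CaptureSink()
    agg2 = AggregatorDataFilter(sinks=[sink2], buffer_size=10, send_every=1)
    for i in range(3):
        agg2.receive([
            {"type": "temperature", "value": 20 + i},
            {"type": "humidity", "value": 50 + i},
        ])

    # Each position in the incoming list aggregates into its own buffer entry:
    # [{'type': 'temperature', 'value': [20, 21, 22]},
    #  {'type': 'humidity', 'value': [50, 51, 52]}]
    print(sink2.received[-1])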