Source code for datalogd.plugins.sinktimer_datafilter

from time import perf_counter
import logging

from datalogd import DataFilter, listify

[docs] class SinkTimerDataFilter(DataFilter): """ Time how long each sink takes to receive data and warn if any exceed a given threshold. This filter may be used for testing and debugging. If a sink takes longer than a given time to process data, report the event using the python logging system. Two levels of reporting are enabled, controlled by the ``info`` and ``warning`` parameters. These parameters are the threshold times, in seconds. :param info: Threshold time to log at the "info" level. :param warning: Threshold time to log at the "warning" level. """ def __init__(self, sinks=[], info=0.1, warning=0.5): super().__init__(sinks=sinks) self._info_t = info self._warning_t = warning self._log = logging.getLogger("SinkTimer")
[docs] def receive(self, data): """ Accept ``data`` and pass on to connected sinks. :param data: Data to process. """ self.send(data)
[docs] def send(self, data): """ Send ``data`` to each connected sink, alerting if their processing time exceeds given values. """ for s in self.sinks: t_start = perf_counter() s.receive(data) t = perf_counter() - t_start if t >= self._warn_t: self._log.warning(f"{s} processed in {t:g} s") elif t >= self._info_t: self._log.info(f"{s} processed in {t:g} s")