Source code for datalogd.plugins.sensoridentifierdatafilter

import asyncio
import builtins
import copy
import json
import logging
import math
import re
import sys
from datetime import datetime, timezone, timedelta

from datalogd import DataFilter, listify

try:
    import numpy as np
except ImportError:
    # numpy is optional: without it the "np" name is simply not provided to sensor "eval" expressions.
    # (ModuleNotFoundError is a subclass of ImportError, so catching ImportError covers both.)
    log = logging.getLogger(__name__.rpartition(".")[2])
    log.warning("numpy module not available, so won't be available in sensor \"eval\" expressions. Install it with \"pip install numpy\" or similar.")

class SensorIdentifierDataFilter(DataFilter):
    """
    Assign identifiers and additional properties to sensor data based off matching rules.

    Each rule is a dictionary which may contain:

    - ``"selector"``: criteria an incoming data entry must satisfy, either as a sequence of
      ``[key, value]`` pairs or as a ``{key: value}`` mapping. A selector value matches a data value
      if they are equal, if both are strings and the selector value (as a regular expression) fully
      matches the data value, or if the selector value is ``NotImplemented`` (a wildcard, only
      possible when rules are passed in as Python objects).
    - ``"update"`` (mandatory, must contain ``"sensor_identifier"``): fields merged into matching
      data entries. Its ``"interval"`` (in seconds) and ``"range"`` (``[min, max]``) entries are
      used for fault checking.
    - ``"critical"``: optional ``{"interval": ..., "range": ...}`` flags; when truthy, late updates
      and/or out-of-range values are reported as faults.
    - ``"eval"``: optional ``{field: expression}`` mapping. Each expression is evaluated with the
      data entry's fields as local variables and the result is stored in ``field``.

    Fault status is sent to connected sinks as ``"watchdog"`` type messages.

    :param sinks: Sinks to connect to.
    :param rules: Matching rules, given as a path to a JSON file (ending in ``.json`` or ``.JSON``),
        a string of raw JSON, or a list of rule dictionaries.
    """

    #: Message for when a sensor has never sent any data.
    NOT_PRESENT = "not present"
    #: Message for when a sensor has previously sent data, but hasn't updated within the expected timeframe.
    NO_UPDATES = "no updates since {}"
    #: Message for when a sensor value has exceeded its expected range.
    OUT_OF_RANGE = "out of range"

    # Fixed leading text of NO_UPDATES messages, used to recognise them once a timestamp is filled in.
    _NO_UPDATES_PREFIX = NO_UPDATES.format("")

    #: Dictionary of built-in variables and functions available in eval expressions.
    # Read from the ``builtins`` module: ``globals()["__builtins__"]`` is a module rather than a dict
    # when this file is run as ``__main__``.
    _BUILTINS = {k: v for k, v in vars(builtins).items() if k in (
        "abs", "all", "any", "ascii", "bin", "chr", "divmod", "format", "hash", "hex", "len",
        "max", "min", "oct", "ord", "pow", "repr", "round", "sorted", "sum",
        "False", "True", "None",
        "bool", "bytearray", "bytes", "complex", "dict", "enumerate", "filter", "float", "int",
        "list", "map", "range", "reversed", "str", "tuple", "type", "zip",
    )}

    def __init__(self, sinks=None, rules="sensoridentifier_rules.json"):
        # ``None`` default avoids sharing one mutable default list between instances.
        super().__init__(sinks=[] if sinks is None else sinks)
        self._log = logging.getLogger(__name__)
        self._rules = self._validate_rules(self._load_rules(rules))
        # Dictionary of faults currently detected.
        # Keys are sensor identifiers, values are lists of strings describing the fault(s).
        self._faults = {}
        # Shortest critical update interval (seconds) determines how often update ages are checked.
        self._watchdog_interval = self._compute_watchdog_interval()
        # Get reference to event loop and schedule task (note loop probably hasn't started yet).
        # NOTE(review): asyncio.get_event_loop() is deprecated outside a running loop on newer
        # Pythons; kept so the task is scheduled on the same loop the application later runs.
        self._loop = asyncio.get_event_loop()
        if self._watchdog_interval < float("inf"):
            self._watchdog_task = self._loop.create_task(self._watchdog_update())
        else:
            self._watchdog_task = None

    def _load_rules(self, rules):
        """
        Interpret the ``rules`` constructor argument.

        :param rules: JSON filename, raw JSON string, or list of rule dictionaries.
        :returns: A new list of rules (empty if they couldn't be loaded).
        """
        if isinstance(rules, str) and rules.endswith((".JSON", ".json")):
            # If rules string looks like a filename, attempt to load JSON from file
            try:
                with open(rules) as f:
                    loaded = json.load(f)
            except (OSError, ValueError) as ex:
                self._log.error(f"Couldn't load matching rules from file: {rules}, {ex}")
                return []
        elif isinstance(rules, str):
            # Attempt to interpret string as raw JSON
            try:
                loaded = json.loads(rules)
            except ValueError as ex:
                self._log.error(f"Couldn't interpret matching rules string as JSON (note JSON files must end with a \".json\" extension):\n{rules}\n{ex}")
                return []
        else:
            # Assume correct data structure passed in
            loaded = rules
        if isinstance(loaded, (list, tuple)):
            # Copy, so dropping invalid rules doesn't modify a list owned by the caller
            return list(loaded)
        self._log.error(f"Matching rules must be a list of rule dictionaries, got {type(loaded).__name__}, ignoring them.")
        return []

    def _validate_rules(self, rules):
        """
        Drop rules lacking the essential entries, logging a warning for each.

        :param rules: List of candidate rules.
        :returns: List of rules that have a ``"selector"`` and an ``"update"`` dict containing ``"sensor_identifier"``.
        """
        valid = []
        for i, r in enumerate(rules):
            if not isinstance(r, dict):
                self._log.warning(f"Rule set {i} is not a dictionary, dropping rule.")
                continue
            if "selector" not in r:
                self._log.warning(f"Rule set {i} does not contain a \"selector\" entry thus won't match any data, dropping rule.")
                continue
            if "update" not in r and "eval" not in r:
                self._log.warning(f"Rule set {i} does not contain an \"update\" or \"eval\" entry thus won't modify data, dropping rule.")
                continue
            update = r.get("update")
            if not isinstance(update, dict) or "sensor_identifier" not in update:
                self._log.warning(f"Rule set {i} \"update\" entry does not contain the mandatory \"sensor_identifier\" entry, dropping rule.")
                continue
            valid.append(r)
        return valid

    @staticmethod
    def _critical_flag(rule, name):
        """Return True if ``rule["critical"][name]`` exists and is truthy."""
        critical = rule.get("critical")
        return isinstance(critical, dict) and bool(critical.get(name))

    @staticmethod
    def _rule_interval(rule):
        """
        Return the rule's expected update interval.

        :raises KeyError: if no ``update["interval"]`` is defined.
        :raises TypeError, ValueError, OverflowError: if the interval isn't a usable number of seconds.
        """
        return timedelta(seconds=rule["update"]["interval"])

    def _compute_watchdog_interval(self):
        """Return the shortest positive critical update interval in seconds, or infinity if there are none."""
        shortest = float("inf")
        for r in self._rules:
            if not self._critical_flag(r, "interval"):
                # Loss/late updates not critical, don't need to check this against this rule
                continue
            try:
                interval = self._rule_interval(r).total_seconds()
            except (KeyError, TypeError, ValueError, OverflowError):
                self._log.warning(f"Rule for {r['update']['sensor_identifier']} states update interval is critical, but no interval is defined.")
                continue
            if 0.0 < interval < shortest:
                shortest = interval
        return shortest

    async def _watchdog_update(self):
        """
        Check sensor rules and send OK/Fault messages to connected sinks if update ages exceeded.
        """
        while True:
            now = datetime.now(timezone.utc).astimezone()
            try:
                for r in self._rules:
                    self._check_update_age(r, now)
            except Exception:
                # Keep the watchdog alive: a dead task would silently stop all fault reporting
                self._log.exception("Unexpected error while checking sensor update ages.")
            # Send watchdog message indicating OK/Fault
            self._send_watchdog()
            # Wait before continuing the loop
            await asyncio.sleep(self._watchdog_interval)

    def _check_update_age(self, r, now):
        """
        Update the fault list for one rule based on the age of its last matched data.

        :param r: Rule to check.
        :param now: Current (timezone-aware) time.
        """
        if not self._critical_flag(r, "interval"):
            # Loss/late updates not critical, don't need to check this against this rule
            return
        try:
            interval = self._rule_interval(r)
        except (KeyError, TypeError, ValueError, OverflowError):
            return
        identifier = r["update"]["sensor_identifier"]
        # Interval exists, check against timestamp (or fail if no timestamp yet)
        if "timestamp" not in r:
            # No timestamp, sensor has never sent any data
            description = SensorIdentifierDataFilter.NOT_PRESENT
        elif now > r["timestamp"] + 1.1 * interval:
            # Update hasn't been seen within given interval (10% grace allowed)
            description = SensorIdentifierDataFilter.NO_UPDATES.format(r["timestamp"].isoformat())
        else:
            description = ""
        if description:
            # A fault was detected, record it if not already present
            faults = self._faults.setdefault(identifier, [])
            if description not in faults:
                faults.append(description)
        elif identifier in self._faults:
            # OK, updated within specified interval
            if SensorIdentifierDataFilter.OUT_OF_RANGE in self._faults[identifier]:
                # Range error still valid, leave it in the list
                self._faults[identifier] = [SensorIdentifierDataFilter.OUT_OF_RANGE]
            else:
                # Clear this sensor from faults
                self._faults.pop(identifier)

    def _send_watchdog(self):
        """
        Send a watchdog type message with current fault conditions, if any.
        """
        message = {
            "type": "watchdog",
            "sensor_identifier": "WATCHDOG",
            "sensor_name": "Watchdog",
            "value": "OK",
            "range": ["OK", "OK"],
            "critical_range": True,
        }
        if self._faults:
            message.update({
                "value": "FAULT",
                # Copy the inner lists too, so later fault changes don't alter messages already sent
                "faults": {k: list(v) for k, v in self._faults.items()},
            })
        self.send(message)

    def receive(self, data):
        """
        Accept the provided ``data``, and assign identifier and additional properties based on the matching rules.

        :param data: Data to filter.
        """
        data = copy.deepcopy(listify(data))
        # Timestamp to use for this set of data
        timestamp = datetime.now(timezone.utc).astimezone()
        eval_globals = self._eval_globals()
        # Flag to indicate fault conditions have changed
        updated_faults = False
        for d in data:
            # Look through rules for matching criteria
            for r in self._rules:
                if not self._matches(r["selector"], d):
                    continue
                self._apply_rule(r, d, timestamp, eval_globals)
                if self._update_faults(r, d):
                    updated_faults = True
        # Send watchdog message indicating faults if fault conditions changed
        if updated_faults:
            self._send_watchdog()
        # Send updated data onto connected sinks
        self.send(data)

    @staticmethod
    def _matches(selector, d):
        """
        Return True if data entry ``d`` satisfies every ``(key, value)`` criterion of ``selector``.

        Any error while matching (missing key, malformed selector, invalid regular expression, ...)
        is treated as a non-match.
        """
        pairs = selector.items() if isinstance(selector, dict) else selector
        try:
            for s_k, s_v in pairs:
                v = d[s_k]
                if s_v is NotImplemented or s_v == v:
                    continue
                if isinstance(s_v, str) and isinstance(v, str) and re.fullmatch(s_v, v):
                    continue
                # Value doesn't match this rule, can stop checking now
                return False
        except (IndexError, KeyError, ValueError, TypeError, re.error):
            return False
        return True

    @classmethod
    def _eval_globals(cls):
        """Build the restricted global namespace used for "eval" expressions."""
        # NOTE: this only limits accidental misuse; it is not a security sandbox. Rule expressions
        # must come from trusted configuration.
        env = {"math": math, "__builtins__": cls._BUILTINS}
        # Add numpy functionality if numpy module has been loaded
        numpy_module = sys.modules.get("numpy")
        if numpy_module is not None:
            env["np"] = numpy_module
        return env

    def _apply_rule(self, r, d, timestamp, eval_globals):
        """
        Apply a matched rule's updates, critical flags, timestamp and eval expressions to data entry ``d``.
        """
        # Deep copy so emitted data doesn't share mutable values (e.g. the range list) with the rule
        d.update(copy.deepcopy(r["update"]))
        # Add critical conditions if they exist
        critical = r.get("critical")
        if isinstance(critical, dict):
            if "range" in critical:
                d["critical_range"] = critical["range"]
            if "interval" in critical:
                d["critical_interval"] = critical["interval"]
        d["timestamp"] = timestamp
        # Keep timestamp in rules dictionary for our records (to check on update ages)
        r["timestamp"] = timestamp
        # Perform function evaluations to calculate new values, apply calibration curves etc
        expressions = r.get("eval")
        if isinstance(expressions, dict):
            for k, expr in expressions.items():
                try:
                    d[k] = eval(expr, eval_globals, d)
                except Exception as ex:
                    self._log.warning(f"Error during eval of \"{k} = {expr}\": {ex}")

    def _update_faults(self, r, d):
        """
        Update the fault list after rule ``r`` matched data entry ``d``.

        Presence/update-interval faults are cleared, and the value is checked against the valid
        range if the rule marks the range as critical.

        :returns: True if the faults for this sensor changed.
        """
        identifier = r["update"]["sensor_identifier"]
        faults = self._faults.get(identifier, [])
        # Update occurred, clear presence/interval faults if they exist
        remaining = [
            f for f in faults
            if f != SensorIdentifierDataFilter.NOT_PRESENT
            and not f.startswith(SensorIdentifierDataFilter._NO_UPDATES_PREFIX)
        ]
        changed = len(remaining) != len(faults)
        # Check valid range if range is marked as critical
        if self._critical_flag(r, "range"):
            out_of_range = self._out_of_range(d, r["update"].get("range"))
            if out_of_range is True and SensorIdentifierDataFilter.OUT_OF_RANGE not in remaining:
                remaining.append(SensorIdentifierDataFilter.OUT_OF_RANGE)
                changed = True
            elif out_of_range is False and SensorIdentifierDataFilter.OUT_OF_RANGE in remaining:
                remaining.remove(SensorIdentifierDataFilter.OUT_OF_RANGE)
                changed = True
        # Store remaining faults, clearing the sensor from the fault list if none are left
        if remaining:
            self._faults[identifier] = remaining
        else:
            self._faults.pop(identifier, None)
        return changed

    @staticmethod
    def _out_of_range(d, valid_range):
        """
        Check ``d["value"]`` against ``valid_range`` (``[min, max]``).

        :returns: True if the value is out of range, None, NaN or a NaN string; False if in range;
            None if it can't be determined (no value, no range, or incomparable types).
        """
        try:
            value = d["value"]
            low, high = valid_range[0], valid_range[1]
        except (KeyError, IndexError, TypeError):
            return None
        # Check missing/NaN markers first: comparing these against numeric limits would raise TypeError
        if value is None or (isinstance(value, str) and value in ("NaN", "nan", "NAN")):
            return True
        try:
            # value != value is only true for a float NaN
            return bool(value < low or value > high or value != value)
        except (TypeError, ValueError):
            return None