Source code for datalogd.plugins.polynomialfunction_datafilter


import logging

try:
    import numpy as np
except (ModuleNotFoundError, ImportError):
    log = logging.getLogger(__name__.rpartition(".")[2])
    log.warning("numpy module not found. Install it with \"pip install numpy\" or similar.")
else:
    # OK, continue loading rest of module
    import re

    from datalogd import DataFilter, listify

    class PolynomialFunctionDataFilter(DataFilter):
        r"""
        Select data based on key--value pairs, then apply a polynomial function to a value.

        Any data which matches all of the ``match_keyvals`` key--value pairs will be processed. The
        format of the ``match_keyvals`` parameter is a list in the form ``[[key, value], [key2,
        value2]...]``. For example, ``match_keyvals=[["type", "temperature"], ["id", "123"]]`` will
        process any data which has a ``"type"`` field of ``"temperature"`` and an ``"id"`` field of
        ``"123"``.

        A ``value`` of the python special ``NotImplemented`` will match any value for the given key
        (the key itself must still be present in the data item). In the case that values are
        strings, they will be matched as regular expressions, for example ``".*"`` will match any
        string.

        Once a data item is matched, a value will be selected to apply the polynomial function to,
        selected by the ``value`` parameter. By default this is the value stored under the
        ``"value"`` key.

        The polynomial function is defined by a set of coefficients, given by the ``coeffs``
        parameter. This is an array of coefficients, :math:`c_n`, which forms the function
        :math:`x^\prime = \sum_n c_n x^n \equiv c_0 + c_1x + c_2x^2 + \ldots`. For example,
        ``coeffs=[1.23, 1.0]`` would add 1.23 to a value, while ``coeffs=[0, 10]`` would multiply a
        value by 10. Specifying additional coefficients includes quadratic, cubic terms etc.

        Rounding may be applied to the result by supplying the number of decimal places in the
        ``rounding`` parameter. Rounding behaviour is determined by the numpy ``around()`` function.
        Negative numbers specify positions to the left of the decimal point.

        The value of the data entry's ``"units"`` field can be modified or created using the
        ``units`` parameter. For example, ``units="V"`` might be used to indicate that an analogue
        measurement in arbitrary units now equates to voltage, determined by the polynomial
        function calibration curve.

        :param sinks: Sinks to connect; passed unchanged to the parent class constructor.
        :param match_keyvals: Key--value pairs to match to data items.
        :param value: Key from data item containing the value to modify.
        :param coeffs: Coefficients of the polynomial function to apply.
        :param rounding: Number of decimal places to round the result.
        :param units: New value of units field for the modified data item.
        """

        def __init__(self, sinks=[], match_keyvals=[["type", ".*"], ["id", ".*"]], value="value", coeffs=[0.0, 1.0], rounding=None, units=None):
            super().__init__(sinks=sinks)
            # The list defaults in the signature are shared between every call, so keep
            # private shallow copies rather than aliasing the caller's (or the default) lists.
            self._keyvals = list(match_keyvals)
            self._value = value
            self._coeffs = list(coeffs)
            self._rounding = rounding
            self._units = units

        def _matches(self, entry):
            """
            Return ``True`` if ``entry`` satisfies every configured key--value pair.

            Raises whatever the lookup ``entry[key]`` raises when a key is absent or ``entry``
            is not subscriptable; :meth:`receive` treats that as "no match".
            """
            for key, wanted in self._keyvals:
                # The key must exist, even when any value is acceptable
                found = entry[key]
                # Test the wildcard before ``==``: comparing against a numpy array yields
                # an array whose truth value is ambiguous and would raise ValueError.
                if wanted is NotImplemented:
                    continue
                if wanted == found:
                    continue
                if isinstance(wanted, str) and isinstance(found, str) and re.fullmatch(wanted, found):
                    continue
                # Value doesn't match, no need to check remaining pairs
                return False
            return True

        def _apply_polynomial(self, x):
            """
            Evaluate the configured polynomial at ``x`` (scalar or numpy array), then round
            the result if a ``rounding`` value was given.
            """
            # Keep the shape of array inputs even when there are no coefficients
            y = np.zeros_like(x) if isinstance(x, np.ndarray) else 0
            for n, c in enumerate(self._coeffs):
                # Accumulate out of place: an in-place ``+=`` on an integer array cannot
                # store a float term and makes numpy raise a casting error (a TypeError).
                y = y + c * x**n
            if self._rounding is not None:
                y = np.round(y, int(self._rounding))
            return y

        def receive(self, data):
            """
            Accept the provided ``data``, select based on key/value pairs, apply function, and pass
            onto connected sinks.

            The selection is based upon the parameters provided to the constructor of this
            :class:`~datalogd.plugins.polynomialfunction_datafilter.PolynomialFunctionDataFilter`.

            Entries that do not match, lack the requested keys, or hold a value the polynomial
            cannot be applied to are left unmodified. All entries are passed on.

            :param data: Data to correct.
            """
            data = listify(data)
            for d in data:
                try:
                    if not self._matches(d):
                        continue
                    # Compute first, assign afterwards, so a failure leaves the entry untouched
                    d[self._value] = self._apply_polynomial(d[self._value])
                    # Change or add new units to the data if given
                    if self._units is not None:
                        d["units"] = self._units
                except (IndexError, KeyError, TypeError, ValueError):
                    # An exception means we couldn't match this data entry, find the specified
                    # value, or apply arithmetic to it (e.g. the value is None or a string).
                    # Pass the entry through unchanged rather than dropping the whole batch.
                    pass
            self.send(data)