import logging
from datetime import datetime, timezone
from datalogd import DataSink, listify
try:
from influxdb_client import InfluxDBClient
#from influxdb_client.client.write_api import SYNCHRONOUS
except (ModuleNotFoundError, ImportError):
log = logging.getLogger(__name__.rpartition(".")[2])
log.warning("influxdb_client module not found. Install it with \"pip install influxdb_client\" or similar.")
else:
# Required modules present, continue loading rest of this module
class InfluxDB2DataSink(DataSink):
    """
    Connection to an InfluxDB 2.x (or 1.8+) database for storing time-series data.

    Note that this doesn't actually run the InfluxDB database service, but simply connects to an
    existing InfluxDB database via a network (or localhost) connection. See the `getting started
    <https://docs.influxdata.com/influxdb/v2.4/get-started/>`__ documentation for details on
    configuring a new database server.

    The ``url`` parameter should be a string specifying the protocol, server ip or name, and
    port. For example, ``url="http://localhost:8086"``.

    The authentication ``token`` parameter needs to be specified to allow commits to the
    database. See the `token <https://docs.influxdata.com/influxdb/v2.4/security/tokens/>`__
    documentation to see how to create and obtain tokens.

    Parameters for ``org``, ``bucket`` must correspond to a valid organisation and bucket
    created in the database for which the authentication token is valid. See the documentation
    for `organisations <https://docs.influxdata.com/influxdb/v2.4/organizations/>`__ and
    `buckets <https://docs.influxdata.com/influxdb/v2.4/organizations/buckets/>`__ for details.

    The ``measurement`` parameter specifies the data point measurement (or "table") the data
    will be entered into, and does not need to already exist. See the documentation on `data
    elements
    <https://docs.influxdata.com/influxdb/v2.4/reference/key-concepts/data-elements/#measurement>`__
    for details.

    A ``run_id`` parameter may be passed which will be added as a tag to the data points. It may
    be used to identify data obtained from this particular run of the data logging session. If
    no value is provided, a value will be generated from a YYYYMMDD-HHMMSS formatted time stamp.

    The data point field key will be attempted to be determined automatically from the incoming
    data dictionaries. If the data dictionary contains a ``name`` or ``label`` key, then its
    value will be used as the database point field key. Alternatively, a field key will be
    generated from the values of ``type`` and ``id`` if present. Finally, a default field key of
    ``data`` will be used. To instead specify the data entry which should provide the field key,
    specify it as the ``field_key`` parameter. If the field is specified by a parameter or taken
    from a name or label, then those will not also be included in the entry's database keys.
    However, if the field name is automatically built from type and id values, these will still
    be part of the entries keys.

    Similarly, the data point field value will use the value from the incoming data dictionary's
    ``value`` field if present. To instead specify the data entry which should provide the field
    value, specify it as the ``field_value`` parameter. The value won't also appear in the
    database entry's keys.

    :param url: Protocol, host name or IP address, and port number of InfluxDB server.
    :param token: API token used to authenticate with the InfluxDB server.
    :param org: Name of InfluxDB organisation in which to store data.
    :param bucket: Name of InfluxDB bucket in which to store data.
    :param measurement: Name for the InfluxDB measurement session.
    :param run_id: A tag to identify commits from this run.
    :param field_key: A field from the incoming data used to determine the data point field key.
    :param field_value: A field from the incoming data used to determine the data point field value.
    """
    def __init__(self, url="http://localhost:8086", token="", org="default", bucket="default", measurement="measurement", run_id=None, field_key=None, field_value=None):
        self.log = logging.getLogger("InfluxDB2DataSink")
        try:
            self.client = InfluxDBClient(url, token=token, org=org)
        except Exception:
            # Log with full traceback, then re-raise: without a client object
            # every later use of self.client would fail anyway, with a far less
            # informative NameError/AttributeError.
            self.log.exception("Unable to make connection to InfluxDB database.")
            raise
        self.write_api = self.client.write_api()
        self.org = org
        self.bucket = bucket
        self.measurement = measurement
        if run_id is None:
            # Default run identifier: local-time YYYYMMDD-HHMMSS stamp.
            self.run_id = datetime.now(timezone.utc).astimezone().strftime("%Y%m%d-%H%M%S")
        else:
            self.run_id = run_id
        self.field_key = field_key
        self.field_value = field_value

    def receive(self, data):
        """
        Commit data to the InfluxDB database.

        Multiple items of data can be submitted at once if ``data`` is a
        list. A typical format of ``data`` would be::

            [
                {'type': 'temperature', 'id': '0', 'value': 22.35},
                {'type': 'humidity', 'id': '0', 'value': 55.0},
                {'type': 'temperature', 'id': '1', 'value': 25.80},
            ]

        In the above case (assuming the ``field_key`` and ``field_value`` parameters were not
        supplied when initialising the plugin), the InfluxDB data point field would be generated
        as ``<type>_<id> = <value>``, and only the global ``run_id`` parameter would be entered
        into the data point keys.

        If a ``name`` or ``label`` field is present, then it will instead be used as the
        InfluxDB data point field key. For example::

            [
                {'name': 'Temperature', 'type': 'temperature', 'id': '0', 'value': 22.35},
                {'name': 'Humidity', 'type': 'humidity', 'id': '0', 'value': 55.0},
            ]

        In this case, the InfluxDB data point field would be generated as ``<name> = <value>``,
        and the remaining fields (``type`` and ``id``) would be added as data point field keys,
        along with the ``run_id``.

        A timestamp for the commit will be generated using the current system clock if a
        "timestamp" field does not already exist.

        :param data: Data to commit to the database.
        """
        if data is None or data == []:
            return
        if not self.client.ping():
            self.log.debug("No database connection, data won't be stored.")
            return
        # Loop through each element in data list; non-dict entries cannot be
        # mapped onto a data point and are silently skipped.
        for d in listify(data):
            if not isinstance(d, dict):
                continue
            # Work on a copy so the caller's dict is not modified by the pops below
            d = d.copy()
            # Start building the structure to enter into database
            datapoint = {
                "measurement": self.measurement,
                "tags": {"run_id": self.run_id},
                "fields": {},
            }
            # Attempt to determine a suitable field name
            if self.field_key is not None and self.field_key in d:
                # If database field key was specified, use it (don't also add as a tag)
                k = str(d.pop(self.field_key))
            elif "name" in d:
                # Format as <name> = <value> (don't also add as a tag)
                k = str(d.pop("name"))
            elif "label" in d:
                # Format as <label> = <value> (don't also add as a tag)
                k = str(d.pop("label"))
            elif "type" in d:
                # Format as <type>_<id> = <value> (but keep type, id as tags).
                # The inner parentheses are required: without them the conditional
                # expression would apply to the whole concatenation and discard
                # the type when no id is present.
                k = str(d["type"]) + (("_" + str(d["id"])) if "id" in d else "")
            else:
                k = "data"
            # Attempt to determine the correct field value
            if self.field_value is not None and self.field_value in d:
                # If database field value was specified, use it
                v = str(d.pop(self.field_value))
            elif "value" in d:
                v = d.pop("value")
            else:
                v = None
            # Add the field to the data point
            datapoint["fields"][k] = v
            # Use the provided timestamp if available, else create one
            if "timestamp" in d:
                timestamp = d.pop("timestamp")
                if isinstance(timestamp, datetime):
                    datapoint["time"] = timestamp.isoformat()
                else:
                    # Pass non-datetime timestamps through untouched; presumably
                    # already in a form the client accepts (e.g. ISO string or epoch).
                    datapoint["time"] = timestamp
            else:
                datapoint["time"] = datetime.now(timezone.utc).astimezone().isoformat()
            # Add any entries which weren't used in the field to the tags
            datapoint["tags"].update(d)
            # Send data point out to database
            try:
                self.log.debug(f"Committing: {datapoint}")
                self.write_api.write(self.bucket, self.org, datapoint)
            except Exception as ex:
                # Best-effort: a failed commit shouldn't stop the remaining points
                self.log.warning("Unable to commit data to InfluxDB database: %s", ex)

    def close(self):
        """
        Close the connection to the database.

        The write API is closed first so that any points still buffered by a
        batching write API are flushed before the client connection is dropped.
        """
        self.write_api.close()
        self.client.close()