Source code for datalogd.plugins.influxdb_datasink
import logging
from datetime import datetime, timezone
from datalogd import DataSink, listify
try:
from influxdb import InfluxDBClient
except (ModuleNotFoundError, ImportError):
log = logging.getLogger(__name__.rpartition(".")[2])
log.warning("influxdb module not found. To use the InfluxDBDataSink, install the module with \"pip install influxdb\" or similar.")
else:
# Required modules present, continue loading rest of this module
[docs] class InfluxDBDataSink(DataSink):
"""
Connection to a InfluxDB database for storing time-series data.
.. Note::
This plugin is outdated. For new InfluxDB 2.x installs, use the
:class:`~datalogd.plugins.influxdb2_datasink.InfluxDB2DataSink` plugin instead.
Note that this doesn't actually run the InfluxDB database service, but simply connects to an
existing InfluxDB database via a network (or localhost) connection. See the `getting started
<https://v2.docs.influxdata.com/v2.0/get-started/>`_ documentation for details on
configuring a new database server.
:param host: Host name or IP address of InfluxDB server.
:param port: Port used by InfluxDB server.
:param user: Name of database user.
:param password: Password for database user.
:param dbname: Name of database in which to store data.
:param session: A name for the measurement session.
:param run: A tag to identify commits from this run. Default of ``None``
will use a date/time stamp.
"""
def __init__(self, host="localhost", port=8086, user="root", password="root", dbname="datalogd", session="default", run=None):
self.log = logging.getLogger("InfluxDBDataSink")
try:
self.client = InfluxDBClient(host, port, user, password, dbname)
except Exception as Ex:
self.log.warning("Unable to make connection to InfluxDB database.")
self.session = session
if run is None:
self.run = datetime.now(timezone.utc).astimezone().strftime("%Y%m%d-%H%M%S")
else:
self.run = run
[docs] def receive(self, data):
"""
Commit data to the InfluxDB database.
Multiple items of data can be submitted at once if ``data`` is a list. A typical format
of ``data`` would be::
[
{'type': 'temperature', 'id': '0', 'value': 22.35},
{'type': 'humidity', 'id': '0', 'value': 55.0},
{'type': 'temperature', 'id': '1', 'value': 25.80},
]
The data point will have its data field generated using the form ``<type>_<id> =
<value>``.
A timestamp for the commit will be generated using the current system clock if a
"timestamp" field does not already exist.
:param data: Data to commit to the database.
"""
if data is None or data == []: return
data = listify(data)
# Start building the structure to enter into database
datapoints = {
"measurement": self.session,
"tags": {"run": self.run},
"fields": {},
}
for d in data:
if "type" in d.keys():
k = str(d["type"]) + ("_" + str(d["id"])) if "id" in d.keys() else ""
v = d["value"] if "value" in d.keys() else None
else:
k = str(d)
v = None
datapoints["fields"][k] = v
if "timestamp" in d.keys():
if type(d["timestamp"]) == datetime:
datapoints["time"] = d["timestamp"].isoformat()
else:
datapoints["time"] = d["timestamp"]
else:
datapoints["time"] = datetime.now(timezone.utc).astimezone().isoformat()
# Send structure out to database
try:
self.log.debug(f"Comitting: {datapoints}")
self.client.write_points([datapoints])
except Exception as ex:
self.log.warning("Unable to commit data to InfluxDB database.", exc_info=True)
[docs] def close(self):
"""
Close the connection to the database.
"""
self.client.close()