Source code for datalogd.plugins.heartbeat_datasource
import asyncio
import logging
from datalogd import DataSource
[docs]
class HeartbeatDataSource(DataSource):
"""
Generate a regular heartbeat message for failsafe or keep-alive type purposes.
A basic message will be generated at regular intervals (default 1.0 second), controlled by the
``interval`` parameter. By default an integer counter will be appended to the message as the
``value`` field. The initial value for the counter can be set using the ``counter`` paramter, or
set ``counter=False`` to disable.
The message can be set with the ``message`` parameter. To use the counter functionality, the
message should be of a dictionary type.
:param interval: Time interval between heartbeat messages, in seconds.
:param counter: Append an integer counter to the message data.
:param message: Message to use as the heartbeat data.
"""
def __init__(self, sinks=[], interval=1.0, counter=True, message={"type": "config", "id": "heartbeat"}):
super().__init__(sinks=sinks)
self._log = logging.getLogger("HeartbeatDataSource")
self.interval = float(interval)
try:
self.counter = int(counter)
except:
self.counter = int(bool(counter))
if self.counter and not isinstance(message, dict):
self._log.warning("Heartbeat counter only works with dictionary type messages.")
self.counter = 0
self.message = message
# Queue first call of update routine
self._loop = asyncio.get_event_loop()
self._loop.call_soon(self.generate_heartbeat)
[docs]
def generate_heartbeat(self):
"""
Generate the heartbeat message and send to any connected sinks.
"""
if isinstance(self.message, dict):
data = self.message.copy()
else:
data = self.message
# If non-zero counter, use as value in message and increment it
if self.counter:
data.update({"value": self.counter})
self.counter += 1
self.send([data])
# Schedule next update
self._loop.call_later(self.interval, self.generate_heartbeat)