Using Multiple Data Sources, Filters, and Sinks¶
This recipe demonstrates a more complicated scenario where multiple data sources are used. The data is manipulated using filters, and then sent to three different destinations.
A PicoTech TC-08 thermocouple data logger is used to monitor several temperatures. Additionally, an Arduino interfaces with commonly available DS18B20 temperature sensors and a fluid flow sensor (such as the YF-S401, YF-S402B etc). Data filters are used to calibrate/correct the sensor values, and finally to compute power dissipation into the cooling water.
Data is timestamped, converted to CSV, and saved to files. It is also displayed on the console and sent to a remote InfluxDB instance.
Hardware¶
The PicoTech TC-08 is connected via USB. The appropriate drivers and interface library
(libusbtc08.so
on Linux, libusbtc08.dll
on Windows) must be installed. Six type-K
thermocouples are connected to ports 1, 2, 4, 5, 7, 8.
For the Arduino, two temperature sensors are required, one each for the inlet and outlet water temperatures. The flow sensor should be connected in line, preferably on the cold side. The calculations need to convert the pulse counts from the flow sensor to a fluid flow rate. Combined with the temperature difference of the water, the power dissipated into the liquid can be determined.
#include <Arduino.h>
#include <Wire.h>
#include <SPI.h>
////////////////////////////////////////////////////////////
// Device Configuration Settings
////////////////////////////////////////////////////////////
// An ID string for this Arduino
#define BOARD_ID_STRING "A"
// Interval between reads of devices
#define READ_INTERVAL 2000
// Interval between empty "keep alive" messages to maintain connection
#define KEEPALIVE_INTERVAL 1000
// Select which types of sensors to use
#define USE_DIGITAL_PINS true
#define USE_ANALOG_PINS true
#define USE_DS18B20_TEMPERATURE true
#define USE_BH1750_LUX false
#define USE_COUNTER false
////////////////////////////////////////////////////////////
// Pin Definitions and Sensor Configuration
////////////////////////////////////////////////////////////
#if USE_ANALOG_PINS
// Set of analog input pins to read
const int analog_pins[] = {A0, A1, A2, A3, A4, A5};
#endif
#if USE_DIGITAL_PINS
// Set of digital input pins to read
const int digital_pins[] = {4, 5, 6};
#endif
#if USE_DS18B20_TEMPERATURE
// Run the 1-wire bus on pin 12
const int onewire_pin = 12;
#endif
#if USE_COUNTER
// Flow sensor pulse pin input, must be interrupt enabled
// These are pins 0, 1, 2, 3, 7 for a Leonardo board
// Note that Leonardo needs pins 0+1 for Serial1 and 2+3 for I2C
const int counter_pin = 7;
// Pin where an LED is connected, will toggle LED in sync with incoming pulses
// Set to 0 to disable
const int led_pin = 13;
#endif
////////////////////////////////////////////////////////////
#if USE_DS18B20_TEMPERATURE
#include <OneWire.h>
#include <DallasTemperature.h>
// Initialise the 1-Wire bus
OneWire oneWire(onewire_pin);
// Pass our 1-Wire reference to Dallas Temperature
DallasTemperature thermometers(&oneWire);
#endif
#if USE_BH1750_LUX
#include <hp_BH1750.h>
// Reference to the BH1750 light meter module over I2C
hp_BH1750 luxmeter;
#endif
#if USE_COUNTER
// Number of pulses read from the flow meter
volatile unsigned long counter_count = 0;
// Stored start time and pulse count for flow rate calculation
unsigned long counter_start_millis = 0;
unsigned long counter_start_count = 0;
volatile unsigned int led_state = LOW;
#endif
// Variable to record last data acquisition time
unsigned long measurement_start_millis = 0;
unsigned long keepalive_start_millis = 0;
// Variable to keep track of whether record separators (comma) needs to be prepended to output
bool first_measurement = true;
#if USE_DS18B20_TEMPERATURE
// Format a DS18B20 device address to a 16-char hex string
String formatAddress(DeviceAddress address) {
String hex = "";
for (uint8_t i = 0; i < 8; i++) {
if (address[i] < 16) hex += "0";
hex += String(address[i], HEX);
}
return hex;
}
#endif
// Print out a measurement to the serial port
void printMeasurement(String type, String id, String value, String units="") {
// A comma separator needs to be prepended to measurements other than the first
if (first_measurement) {
first_measurement = false;
} else {
Serial.print(",");
}
Serial.print("{\"type\":\"");
Serial.print(type);
Serial.print("\",\"source\":\"");
Serial.print(BOARD_ID_STRING);
Serial.print("\",\"id\":\"");
Serial.print(BOARD_ID_STRING);
Serial.print("_");
Serial.print(id);
Serial.print("\",\"value\":\"");
Serial.print(value);
if (units.length() > 0) {
Serial.print("\",\"units\":\"");
Serial.print(units);
}
Serial.print("\"}");
}
#if USE_COUNTER
// Interrupt handler for a pulse from the flow meter
void counterIncrement() {
counter_count++;
if (led_pin != 0) {
digitalWrite(led_pin, led_state = !led_state);
}
}
#endif
void setup(void)
{
// Open serial port
Serial.begin(115200);
#if USE_DS18B20_TEMPERATURE
// Initialise I2C bus
Wire.begin();
pinMode(onewire_pin, INPUT_PULLUP);
#endif
#if USE_DIGITAL_PINS
// Configure set of digital input pins
for (uint8_t i = 0; i < uint8_t(sizeof(digital_pins)/sizeof(digital_pins[0])); i++) {
pinMode(digital_pins[i], INPUT);
}
#endif
#if USE_COUNTER
// Configure the flow meter input pin and interrupt for pulse counting
pinMode(counter_pin, INPUT_PULLUP);
attachInterrupt(digitalPinToInterrupt(counter_pin), counterIncrement, RISING);
// LED to toggle if defined
if (led_pin != 0) {
pinMode(led_pin, OUTPUT);
digitalWrite(led_pin, led_state);
}
counter_start_millis = millis();
#endif
}
void loop(void)
{
// Record current time
unsigned long current_millis = millis();
// Check if it's time to take some new measurements
if (current_millis - measurement_start_millis >= READ_INTERVAL) {
measurement_start_millis = current_millis;
// The first measurement in this cycle doesn't need a comma delimiter prepended
first_measurement = true;
// Print message start
Serial.print("{\"board\":\"" + String(BOARD_ID_STRING) + "\",");
Serial.print("\"timestamp\":\"" + String(measurement_start_millis) + "\",");
Serial.print("\"message\":\"measurement\",\"data\":[");
///////////////////////////////////////////////////////////////////////////
// Arduino Digital Pins
///////////////////////////////////////////////////////////////////////////
#if USE_DIGITAL_PINS
// Read digital pins
unsigned int d = 0;
for (uint8_t i = 0; i < uint8_t(sizeof(digital_pins)/sizeof(digital_pins[0])); i++) {
d += digitalRead(digital_pins[i]) << i;
}
printMeasurement("digital", "0", String(d));
#endif
///////////////////////////////////////////////////////////////////////////
// Arduino Analog Pins
///////////////////////////////////////////////////////////////////////////
#if USE_ANALOG_PINS
// Read analog pins
for (uint8_t i = 0; i < uint8_t(sizeof(analog_pins)/sizeof(analog_pins[0])); i++) {
printMeasurement("analog", String(i), String(analogRead(analog_pins[i])));
}
#endif
///////////////////////////////////////////////////////////////////////////
// DS18B20 Temperature Probes
///////////////////////////////////////////////////////////////////////////
#if USE_DS18B20_TEMPERATURE
// We'll reinitialise the temperature probes each time inside the loop so that
// devices can be connected/disconnected while running
thermometers.begin();
// Temporary variable for storing 1-Wire device addresses
DeviceAddress address;
// Grab a count of temperature probes on the wire
unsigned int numberOfDevices = thermometers.getDeviceCount();
// Loop through each device, set requested precision
for(unsigned int i = 0; i < numberOfDevices; i++) {
if(thermometers.getAddress(address, i)) {
thermometers.setResolution(address, 12);
}
}
// Issue a global temperature request to all devices on the bus
if (numberOfDevices > 0) {
thermometers.requestTemperatures();
}
// Loop through each device, print out temperature data
for(unsigned int i = 0; i < numberOfDevices; i++) {
if(thermometers.getAddress(address, i)) {
printMeasurement("temperature", formatAddress(address), String(thermometers.getTempC(address), 2), "C");
}
}
#endif
///////////////////////////////////////////////////////////////////////////
// BH1750 Lux Meter
///////////////////////////////////////////////////////////////////////////
#if USE_BH1750_LUX
// Attempt to initialise and read light meter sensor
if (luxmeter.begin(BH1750_TO_GROUND)) {
luxmeter.start();
printMeasurement("lux", "0", String(luxmeter.getLux(), 0), "lux");
}
#endif
///////////////////////////////////////////////////////////////////////////
// Fluid Flow Meter
///////////////////////////////////////////////////////////////////////////
#if USE_COUNTER
unsigned long counter_end_count = counter_count;
unsigned long counter_end_millis = millis();
// Total volume in sensor pulses
printMeasurement("counter_total", "0", String(counter_end_count), "counts");
// Current flow rate in pulses per minute
float counter_rate = 1000.0*(counter_end_count - counter_start_count)/(counter_end_millis - counter_start_millis);
printMeasurement("counter_rate", "0", String(counter_rate, 4), "Hz");
counter_start_count = counter_end_count;
counter_start_millis = counter_end_millis;
#endif
// Print message end
Serial.println("]}");
} else if (current_millis - keepalive_start_millis >= KEEPALIVE_INTERVAL) {
// Send keepalive packet to maintain serial communications
keepalive_start_millis = current_millis;
// Print empty message
Serial.print("{\"board\":\"" + String(BOARD_ID_STRING) + "\",");
Serial.print("\"timestamp\":\"" + String(keepalive_start_millis) + "\",");
Serial.println("\"message\":\"measurement\",\"data\":[]}");
}
}
Ensure the appropriate sensors are selected in the Arduino code using the define statements, for example:
#define USE_DIGITAL_PINS false #define USE_ANALOG_PINS false #define USE_DS18B20_TEMPERATURE true #define USE_BH1750_LUX false #define USE_COUNTER true
Connect the thermometer’s VCC to +5 V, GND to ground, and DATA to pin 12 (or another, to match that specified in the code for the 1-Wire bus). You may need a pullup resistor between the VCC and DATA pins if your thermometer modules don’t include one. Connect the flow meter’s VCC to +5 V, GND to ground, and sense wire to pin 7 (or another interrupt-enabled pin, to match that specified in the code for the pulse counter). Plug the Arduino into your computer USB cable.
Recipe¶
[datalogd]
connection_graph =
digraph {
// PicoTech TC08 thermocouples
tc08 [class=PicoTC08DataSource, interval=1.0, probes="[
[1, 'Reactor1', 'K', 'C'],
[2, 'Reactor2', 'K', 'C'],
[4, 'Radiator', 'K', 'C'],
[5, 'Waterbath', 'K', 'C'],
[7, 'Exhaust', 'K', 'C'],
[8, 'Extraction', 'K', 'C']
]"];
tc08_timestamp [class=TimeStampDataFilter];
// Save locally to CSV file
tc08_csv [class=CSVDataFilter, header="once"];
tc08_file [class=FileDataSink, filename="cooling_power_tc08.csv", filename_timestamp=True];
// Arduino for water temperatures, flow rate, light sensor...
arduino [class=SerialDataSource];
// Some processing to compute power dissipated into cooling water
a_timestamp [class=TimeStampDataFilter];
flowrate [class=FlowSensorCalibrationDataFilter, a=6011, k=0.15476481, x0=0, b=0.29542879];
tempcorrect [class=PolynomialFunctionDataFilter, match_keyvals="[['type', 'temperature'], ['id', 'A_2827e0853219013e']]", coeffs="[-0.44, 1.0]", rounding=2];
coolingpower [class=CoolingPowerDataFilter, temperature_id_in="A_28969c7e321901a7", temperature_id_out="A_2827e0853219013e"];
// Save locally to CSV file
a_csv [class=CSVDataFilter, header="once"];
a_file [class=FileDataSink, filename="cooling_power_arduino.csv", filename_timestamp=True];
// Display in console
print [class=LoggingDataSink];
// Save to remote InfluxDB
influx [class=InfluxDB2DataSink,
url="http://localhost:8086",
token="...APITokenWithWriteAccess...",
org="organisation_name",
bucket="bucket_name",
measurement="measurement_name"
];
// Hook everything up
tc08 -> tc08_timestamp -> tc08_csv -> tc08_file;
arduino -> a_timestamp -> flowrate -> tempcorrect -> coolingpower -> a_csv -> a_file;
tc08_timestamp -> print;
coolingpower -> print;
tc08_timestamp -> influx;
coolingpower -> influx;
}
The data input and processing steps are:
- Thermocouple readings are taken from the TC-08 using the
PicoTC08DataSource
data source. The parameters specify the sampling interval (in seconds) and the types of probes attached. Each probe is specified with the channel it is attached to, a label, thermocouple type, and desired units.- The serial data is read in from the Arduino using the
SerialDataSource
data source. The data contains the temperatures from the thermometers and the pulse counter’s number of pulses per second. The IDs of the temperatures correspond to the serial numbers of the thermometer devices.- Timestamps are added to both sources of data using two instances of
TimeStampDataFilter
.- Next, the pulse counter’s pulses-per-second is converted to a flow rate in litres-per-minute using the
FlowSensorCalibrationDataFilter
. The parameters are used in a calibration curve and have been experimentally determined for a YF-S401 flow sensor.- A simple offset is applied to one of the thermometers to correct for a slight difference in readings between the two thermometers. This uses a
PolynomialFunctionDataFilter
to subtract 0.44 ℃ from the reading.
The data flow is then split to several separate destinations:
- The data is formatted into a row of comma-separated values and written to a file. This uses a
CSVDataFilter
feeding in to aFileDataSink
. There is a separate file for each of the two data sources.- The raw data is displayed on the console using a
LoggingDataSink
.- The raw data is sent to a InfluxDB database instance. Example database details have been used here, and will need to be changed to match your specific database.