Using Multiple Data Sources, Filters, and Sinks

This recipe demonstrates a more complicated scenario where multiple data sources are used. The data is manipulated using filters, and then sent to three different destinations.

A PicoTech TC-08 thermocouple data logger monitors several temperatures. In addition, an Arduino interfaces with commonly available DS18B20 temperature sensors and a fluid flow sensor (such as the YF-S401 or YF-S402B). Data filters calibrate and correct the sensor values, then compute the power dissipated into the cooling water.

Data is timestamped, converted to CSV, and saved to files. It is also displayed on the console and sent to a remote InfluxDB instance.

Hardware

The PicoTech TC-08 is connected via USB. The appropriate drivers and interface library (libusbtc08.so on Linux, libusbtc08.dll on Windows) must be installed. Six type-K thermocouples are connected to ports 1, 2, 4, 5, 7, 8.

For the Arduino, two temperature sensors are required, one each for the inlet and outlet water temperatures. The flow sensor should be connected in line, preferably on the cold side. The calculations need to convert the pulse counts from the flow sensor to a fluid flow rate. Combined with the temperature difference of the water, the power dissipated into the liquid can be determined.
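
The power calculation described above can be sketched as follows. This is a minimal illustration, not the code used by the CoolingPowerDataFilter: the function name is hypothetical, the flow rate is assumed to be already converted to litres per minute, and the water properties are approximate room-temperature values. It uses the relation P = ρ · c_p · Q · ΔT, where Q is the volumetric flow rate and ΔT is the outlet temperature minus the inlet temperature.

```python
# Approximate properties of water near room temperature (assumed values).
WATER_DENSITY_KG_PER_L = 0.998           # kg per litre
WATER_SPECIFIC_HEAT_J_PER_KG_K = 4186.0  # J/(kg K)


def cooling_power_watts(flow_l_per_min, temp_in_c, temp_out_c):
    """Heat carried away by the cooling water, in watts.

    flow_l_per_min: volumetric flow rate in litres per minute
    temp_in_c:      inlet (cold side) water temperature in degrees C
    temp_out_c:     outlet (hot side) water temperature in degrees C
    """
    # Convert litres per minute to kilograms per second
    mass_flow_kg_per_s = flow_l_per_min * WATER_DENSITY_KG_PER_L / 60.0
    # Temperature rise of the water across the cooled device
    delta_t = temp_out_c - temp_in_c
    return mass_flow_kg_per_s * WATER_SPECIFIC_HEAT_J_PER_KG_K * delta_t


if __name__ == "__main__":
    # Example: 2 L/min with a 5 degree rise gives roughly 700 W
    print(round(cooling_power_watts(2.0, 20.0, 25.0), 1))
```

Because the result scales with the temperature difference, a small offset between the two thermometers translates directly into an error in the computed power, which is why the recipe corrects one of the readings.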

Get the Arduino code from GitLab, or copy and paste the code below into a new Arduino sketch, then upload it to your Arduino.
datalog.ino
#include <Arduino.h>
#include <Wire.h>
#include <SPI.h>

////////////////////////////////////////////////////////////
// Device Configuration Settings
////////////////////////////////////////////////////////////

// An ID string for this Arduino
#define BOARD_ID_STRING "A"

// Interval between reads of devices
#define READ_INTERVAL 2000
// Interval between empty "keep alive" messages to maintain connection
#define KEEPALIVE_INTERVAL 1000

// Select which types of sensors to use
#define USE_DIGITAL_PINS true
#define USE_ANALOG_PINS true
#define USE_DS18B20_TEMPERATURE true
#define USE_BH1750_LUX false
#define USE_COUNTER false

////////////////////////////////////////////////////////////
// Pin Definitions and Sensor Configuration
////////////////////////////////////////////////////////////

#if USE_ANALOG_PINS
  // Set of analog input pins to read
  const int analog_pins[] = {A0, A1, A2, A3, A4, A5};
#endif

#if USE_DIGITAL_PINS
  // Set of digital input pins to read
  const int digital_pins[] = {4, 5, 6};
#endif

#if USE_DS18B20_TEMPERATURE
  // Run the 1-wire bus on pin 12
  const int onewire_pin = 12;
#endif

#if USE_COUNTER
  // Flow sensor pulse pin input, must be interrupt enabled
  // These are pins 0, 1, 2, 3, 7 for a Leonardo board
  // Note that Leonardo needs pins 0+1 for Serial1 and 2+3 for I2C
  const int counter_pin = 7;
  // Pin where an LED is connected, will toggle LED in sync with incoming pulses
  // Set to 0 to disable
  const int led_pin = 13;
#endif

////////////////////////////////////////////////////////////


#if USE_DS18B20_TEMPERATURE
  #include <OneWire.h>
  #include <DallasTemperature.h>
  // Initialise the 1-Wire bus
  OneWire oneWire(onewire_pin);
  // Pass our 1-Wire reference to Dallas Temperature
  DallasTemperature thermometers(&oneWire);
#endif

#if USE_BH1750_LUX
  #include <hp_BH1750.h>
  // Reference to the BH1750 light meter module over I2C
  hp_BH1750 luxmeter;
#endif

#if USE_COUNTER
  // Number of pulses read from the flow meter
  volatile unsigned long counter_count = 0;
  // Stored start time and pulse count for flow rate calculation
  unsigned long counter_start_millis = 0;
  unsigned long counter_start_count = 0;
  volatile unsigned int led_state = LOW;
#endif


// Variable to record last data acquisition time
unsigned long measurement_start_millis = 0;
unsigned long keepalive_start_millis = 0;

// Variable to keep track of whether a record separator (comma) needs to be prepended to output
bool first_measurement = true;


#if USE_DS18B20_TEMPERATURE
  // Format a DS18B20 device address to a 16-char hex string
  String formatAddress(DeviceAddress address) {
    String hex = "";
    for (uint8_t i = 0; i < 8; i++) {
      if (address[i] < 16) hex += "0";
      hex += String(address[i], HEX);
    }
    return hex;
  }
#endif

// Print out a measurement to the serial port
void printMeasurement(String type, String id, String value, String units="") {
  // A comma separator needs to be prepended to measurements other than the first
  if (first_measurement) {
    first_measurement = false;
  } else {
    Serial.print(",");
  }
  Serial.print("{\"type\":\"");
  Serial.print(type);
  Serial.print("\",\"source\":\"");
  Serial.print(BOARD_ID_STRING);
  Serial.print("\",\"id\":\"");
  Serial.print(BOARD_ID_STRING);
  Serial.print("_");
  Serial.print(id);
  Serial.print("\",\"value\":\"");
  Serial.print(value);
  if (units.length() > 0) {
    Serial.print("\",\"units\":\"");
    Serial.print(units);
  }
  Serial.print("\"}");
}

#if USE_COUNTER
  // Interrupt handler for a pulse from the flow meter
  void counterIncrement() {
    counter_count++;
    if (led_pin != 0) {
      digitalWrite(led_pin, led_state = !led_state);
    }
  }
#endif

void setup(void)
{
  // Open serial port
  Serial.begin(115200);

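  #if USE_BH1750_LUX
    // Initialise I2C bus for the BH1750 light meter
    Wire.begin();
  #endif

  #if USE_DS18B20_TEMPERATURE
    // Enable the internal pull-up on the 1-Wire data pin
    pinMode(onewire_pin, INPUT_PULLUP);
  #endif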

  #if USE_DIGITAL_PINS
    // Configure set of digital input pins
    for (uint8_t i = 0; i < uint8_t(sizeof(digital_pins)/sizeof(digital_pins[0])); i++) {
      pinMode(digital_pins[i], INPUT);
    }
  #endif

  #if USE_COUNTER
    // Configure the flow meter input pin and interrupt for pulse counting
    pinMode(counter_pin, INPUT_PULLUP);
    attachInterrupt(digitalPinToInterrupt(counter_pin), counterIncrement, RISING);
    // LED to toggle if defined
    if (led_pin != 0) {
      pinMode(led_pin, OUTPUT);
      digitalWrite(led_pin, led_state);
    }
    counter_start_millis = millis();
  #endif
}


void loop(void)
{
  // Record current time
  unsigned long current_millis = millis();
  // Check if it's time to take some new measurements
  if (current_millis - measurement_start_millis >= READ_INTERVAL) {
    measurement_start_millis = current_millis;
    // The first measurement in this cycle doesn't need a comma delimiter prepended
    first_measurement = true;

    // Print message start
    Serial.print("{\"board\":\"" + String(BOARD_ID_STRING) + "\",");
    Serial.print("\"timestamp\":\"" + String(measurement_start_millis) + "\",");
    Serial.print("\"message\":\"measurement\",\"data\":[");
    
    ///////////////////////////////////////////////////////////////////////////
    // Arduino Digital Pins
    ///////////////////////////////////////////////////////////////////////////
    #if USE_DIGITAL_PINS
      // Read digital pins
      unsigned int d = 0;
      for (uint8_t i = 0; i < uint8_t(sizeof(digital_pins)/sizeof(digital_pins[0])); i++) {
        d += digitalRead(digital_pins[i]) << i;
      }
      printMeasurement("digital", "0", String(d));
    #endif

    ///////////////////////////////////////////////////////////////////////////
    // Arduino Analog Pins
    ///////////////////////////////////////////////////////////////////////////
    #if USE_ANALOG_PINS
      // Read analog pins
      for (uint8_t i = 0; i < uint8_t(sizeof(analog_pins)/sizeof(analog_pins[0])); i++) {
        printMeasurement("analog", String(i), String(analogRead(analog_pins[i])));
      }
    #endif

    ///////////////////////////////////////////////////////////////////////////
    // DS18B20 Temperature Probes
    ///////////////////////////////////////////////////////////////////////////
    #if USE_DS18B20_TEMPERATURE
      // We'll reinitialise the temperature probes each time inside the loop so that
      // devices can be connected/disconnected while running
      thermometers.begin();
      // Temporary variable for storing 1-Wire device addresses
      DeviceAddress address; 
      // Grab a count of temperature probes on the wire
      unsigned int numberOfDevices = thermometers.getDeviceCount();
      // Loop through each device, set requested precision
      for(unsigned int i = 0; i < numberOfDevices; i++) {
        if(thermometers.getAddress(address, i)) {
          thermometers.setResolution(address, 12);
        }
      }
      // Issue a global temperature request to all devices on the bus
      if (numberOfDevices > 0) {
        thermometers.requestTemperatures();
      }
      // Loop through each device, print out temperature data
      for(unsigned int i = 0; i < numberOfDevices; i++) {
        if(thermometers.getAddress(address, i)) {
          printMeasurement("temperature", formatAddress(address), String(thermometers.getTempC(address), 2), "C");
        }
      }
    #endif

    ///////////////////////////////////////////////////////////////////////////
    // BH1750 Lux Meter
    ///////////////////////////////////////////////////////////////////////////
    #if USE_BH1750_LUX
      // Attempt to initialise and read light meter sensor
      if (luxmeter.begin(BH1750_TO_GROUND)) {
        luxmeter.start();
        printMeasurement("lux", "0", String(luxmeter.getLux(), 0), "lux");
      }
    #endif

    ///////////////////////////////////////////////////////////////////////////
    // Fluid Flow Meter
    ///////////////////////////////////////////////////////////////////////////
    #if USE_COUNTER
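      // Copy the pulse count with interrupts disabled, since reading a
      // 32-bit value is not atomic on 8-bit AVR boards
      noInterrupts();
      unsigned long counter_end_count = counter_count;
      interrupts();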
      unsigned long counter_end_millis = millis();
      // Total volume in sensor pulses
      printMeasurement("counter_total", "0", String(counter_end_count), "counts");
      // Current flow rate in pulses per second (Hz)
      float counter_rate = 1000.0*(counter_end_count - counter_start_count)/(counter_end_millis - counter_start_millis);
      printMeasurement("counter_rate", "0", String(counter_rate, 4), "Hz");
      counter_start_count = counter_end_count;
      counter_start_millis = counter_end_millis;
    #endif

    // Print message end
    Serial.println("]}");
  } else if (current_millis - keepalive_start_millis >= KEEPALIVE_INTERVAL) {
    // Send keepalive packet to maintain serial communications
    keepalive_start_millis = current_millis;
    // Print empty message
    Serial.print("{\"board\":\"" + String(BOARD_ID_STRING) + "\",");
    Serial.print("\"timestamp\":\"" + String(keepalive_start_millis) + "\",");
    Serial.println("\"message\":\"measurement\",\"data\":[]}");
  }
}

Ensure the appropriate sensors are selected in the Arduino code using the #define statements, for example:

#define USE_DIGITAL_PINS false
#define USE_ANALOG_PINS false
#define USE_DS18B20_TEMPERATURE true
#define USE_BH1750_LUX false
#define USE_COUNTER true

Connect each thermometer’s VCC to +5 V, GND to ground, and DATA to pin 12 (or whichever pin the code specifies for the 1-Wire bus). You may need a pull-up resistor between the VCC and DATA pins if your thermometer modules don’t include one. Connect the flow meter’s VCC to +5 V, GND to ground, and sense wire to pin 7 (or another interrupt-enabled pin, matching the one the code specifies for the pulse counter). Plug the Arduino into your computer with a USB cable.
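
Once connected, the sketch writes one JSON message per line to the serial port. The following minimal Python sketch shows how such a line could be decoded for a quick sanity check. The sample line is constructed by hand from the format in printMeasurement(), so its timestamp and readings are made-up values, and parse_line is a hypothetical helper rather than part of datalogd. Reading from the port itself (for example with pyserial) is left out so that the example runs without hardware.

```python
import json

# Illustrative line in the format produced by the sketch's printMeasurement();
# the timestamp and readings are made-up sample values.
SAMPLE_LINE = (
    '{"board":"A","timestamp":"4000","message":"measurement","data":['
    '{"type":"temperature","source":"A","id":"A_28969c7e321901a7",'
    '"value":"21.50","units":"C"},'
    '{"type":"counter_rate","source":"A","id":"A_0",'
    '"value":"12.5000","units":"Hz"}'
    ']}'
)


def parse_line(line):
    """Decode one serial line into a list of measurement dicts."""
    message = json.loads(line)
    readings = []
    for entry in message.get("data", []):
        entry = dict(entry)
        # The sketch sends every value as a quoted string, so convert to float
        entry["value"] = float(entry["value"])
        readings.append(entry)
    return readings


if __name__ == "__main__":
    for reading in parse_line(SAMPLE_LINE):
        print(reading["id"], reading["value"], reading.get("units", ""))
```

Keep-alive messages have an empty data array, so they decode to an empty list and can simply be skipped.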

Recipe

recipes/serial_tc08_csv_file_influxdb.config
[datalogd]

connection_graph =
  digraph {

    // PicoTech TC08 thermocouples
    tc08 [class=PicoTC08DataSource, interval=1.0, probes="[
      [1, 'Reactor1', 'K', 'C'],
      [2, 'Reactor2', 'K', 'C'],
      [4, 'Radiator', 'K', 'C'],
      [5, 'Waterbath', 'K', 'C'],
      [7, 'Exhaust', 'K', 'C'],
      [8, 'Extraction', 'K', 'C']
    ]"];
    tc08_timestamp [class=TimeStampDataFilter];
    // Save locally to CSV file
    tc08_csv [class=CSVDataFilter, header="once"];
    tc08_file [class=FileDataSink, filename="cooling_power_tc08.csv", filename_timestamp=True];

    // Arduino for water temperatures, flow rate, light sensor...
    arduino [class=SerialDataSource];
    // Some processing to compute power dissipated into cooling water
    a_timestamp [class=TimeStampDataFilter];
    // Coffee machine flow meter a=1950, k=0.0965882, x0=0, b=0.721649
    // YF-S401 flow meter a=5975, k=0.173734, x0=0, b=0.284333
    flowrate [class=FlowSensorCalibrationDataFilter, a=1950, k=0.0965882, x0=0, b=0.721649];
    tempcorrect [class=PolynomialFunctionDataFilter, match_keyvals="[['type', 'temperature'], ['id', 'A_2827e0853219013e']]", coeffs="[-0.44, 1.0]", rounding=2];
    coolingpower [class=CoolingPowerDataFilter, temperature_id_in="A_28969c7e321901a7", temperature_id_out="A_2827e0853219013e"];
    // Save locally to CSV file
    a_csv [class=CSVDataFilter, header="once"];
    a_file [class=FileDataSink, filename="cooling_power_arduino.csv", filename_timestamp=True];

    // Display in console
    print [class=LoggingDataSink];

    // Save to remote InfluxDB
    influx [class=InfluxDB2DataSink,
      url="http://localhost:8086",
      token="...APITokenWithWriteAccess...",
      org="organisation_name",
      bucket="bucket_name",
      measurement="measurement_name"
    ];

    // Hook everything up
    tc08 -> tc08_timestamp -> tc08_csv -> tc08_file;
    arduino -> a_timestamp -> flowrate -> tempcorrect -> coolingpower -> a_csv -> a_file;

    tc08_timestamp -> print;
    coolingpower -> print;

    tc08_timestamp -> influx;
    coolingpower -> influx;
  }

Figure: connection graph for this recipe (serial_tc08_csv_file_influxdb.svg).

The data input and processing steps are:

  • Thermocouple readings are taken from the TC-08 using the PicoTC08DataSource data source. The parameters specify the sampling interval (in seconds) and the types of probes attached. Each probe is specified with the channel it is attached to, a label, thermocouple type, and desired units.
  • The serial data is read in from the Arduino using the SerialDataSource data source. The data contains the temperatures from the thermometers and the pulse counter’s number of pulses per second. The IDs of the temperatures correspond to the serial numbers of the thermometer devices.
  • Timestamps are added to both sources of data using two instances of TimeStampDataFilter.
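  • Next, the pulse counter’s pulses-per-second value is converted to a flow rate in litres per minute using the FlowSensorCalibrationDataFilter. The parameters define a calibration curve and must be determined experimentally for each sensor. The recipe uses the values for a coffee machine flow meter; values for a YF-S401 flow sensor are given in the comment above the filter definition.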
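  • A simple offset is applied to one of the thermometers to correct for a slight difference in readings between the two thermometers. This uses a PolynomialFunctionDataFilter to subtract 0.44 ℃ from the reading.
  • Finally, the power dissipated into the cooling water is computed from the flow rate and the inlet and outlet water temperatures using the CoolingPowerDataFilter. The temperature_id_in and temperature_id_out parameters select the inlet and outlet thermometers.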
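
The thermometer offset correction in the steps above can be sketched in Python as follows. This is an illustration only, not the implementation of PolynomialFunctionDataFilter: the function names are hypothetical, and the coefficient order (constant term first) is inferred from the recipe, where coeffs="[-0.44, 1.0]" subtracts 0.44 from the reading.

```python
def polynomial(coeffs, x):
    """Evaluate coeffs[0] + coeffs[1]*x + coeffs[2]*x**2 + ... (Horner's method)."""
    result = 0.0
    for c in reversed(coeffs):
        result = result * x + c
    return result


def correct(entry, match_keyvals, coeffs, rounding=None):
    """Return a corrected copy of entry if every key/value pair matches.

    Entries that do not match are returned unchanged.
    """
    if all(entry.get(key) == val for key, val in match_keyvals):
        entry = dict(entry)
        value = polynomial(coeffs, float(entry["value"]))
        entry["value"] = round(value, rounding) if rounding is not None else value
    return entry


if __name__ == "__main__":
    # Same selection and coefficients as the tempcorrect node in the recipe
    match = [["type", "temperature"], ["id", "A_2827e0853219013e"]]
    outlet = {"type": "temperature", "id": "A_2827e0853219013e", "value": 23.5}
    inlet = {"type": "temperature", "id": "A_28969c7e321901a7", "value": 21.0}
    print(correct(outlet, match, [-0.44, 1.0], rounding=2))
    print(correct(inlet, match, [-0.44, 1.0], rounding=2))
```

Only the entry whose type and id both match is modified, so the other thermometer and the flow readings pass through untouched.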

The data flow is then split to several separate destinations:

  • The data is formatted into rows of comma-separated values and written to a file, using a CSVDataFilter feeding into a FileDataSink. Each of the two data sources has its own file.
  • The data, before CSV formatting, is displayed on the console using a LoggingDataSink.
  • The same data is sent to an InfluxDB database instance using an InfluxDB2DataSink. Example database details are used here and must be changed to match your own database.