InfluxDB¶
This protocol implementation provides an abstraction layer for connecting to InfluxDB, a time series database.
As a time series database, InfluxDB stores data in collections called ‘measurements’. Each ‘measurement’ stores ‘points’, which consist of a timestamp, a value, fields, and tags that allow different groupings.
This version of the protocol supports the Flux query language for InfluxDB.
The sections below describe the configuration parameters and show examples of how to build a commissioning file for this protocol.
Connection Properties¶
scheme
(string, enum)¶
Transport scheme to be used
This element must be one of the following enum values:
http
https
Default: "http"
Example: "https"
host
(string, required)¶
The HTTP host of the InfluxDB server
Example: "connectware"
port
(integer)¶
The port of the InfluxDB server
Default: 8086
Example: 8086
token
(string)¶
An InfluxDB authentication token
Default: ""
Example: "an-influx-auth-token"
probeInterval
(integer)¶
Interval, in milliseconds, at which the connection is checked to confirm it is still alive
Default: 2000
Additional restrictions:
Minimum:
1000
timeout
(integer)¶
Connection timeout, in milliseconds
Default: 10000
Example: 10000
Additional restrictions:
Minimum:
1000
transportOptions
(object)¶
Additional connection options for InfluxDB
Default:
{}
org
(string)¶
Organization for InfluxDB. An organization is a workspace for a group of users. All objects stored in InfluxDB belong to an organization.
Default: "generic"
bucket
(string)¶
InfluxDB bucket
Default: "measurement_data"
precision
(string, enum)¶
Defines the precision to use for timestamps for InfluxDB
This element must be one of the following enum values:
ns
us
ms
s
Default: "ms"
maxRetryDelay
(integer)¶
Maximum delay between write retries in milliseconds
Default: 180000
Example: 180000
minRetryDelay
(integer)¶
Minimum delay between write retries in milliseconds
Default: 5000
Example: 5000
retryJitter
(integer)¶
A random value of up to retryJitter is added to the delay before the next retry
Default: 200
Example: 200
maxBufferLines
(integer)¶
Maximum size of the buffer for items that could not be sent in a previous write
Default: 100000
Example: 100000
maxRetries
(integer)¶
Maximum number of retries to attempt a write
Default: 3
Example: 3
exponentialBase
(integer)¶
Base of the exponential backoff used when computing the next retry delay
Default: 5
Example: 5
batchSize
(integer)¶
Maximum number of points (lines) to send to InfluxDB in a single batch
Default: 1000
Example: 1000
flushInterval
(integer)¶
Maximum time in milliseconds to keep points in an unflushed batch
Default: 10000
Example: 10000
Additional restrictions:
Minimum:
1
connectionStrategy
(object)¶
If a connection attempt fails, retries will be performed with increasing delay (waiting time) in between. The following parameters control how these delays behave.
Properties of the connectionStrategy object:
initialDelay
(integer)¶
Delay (waiting time) of the first connection retry (in milliseconds). For subsequent retries, the delay will be increased according to the parameter incrementFactor which has a default value of 2.
Default: 1000
Additional restrictions:
Minimum:
1000
maxDelay
(integer)¶
Maximum delay (waiting time) to wait until the next retry (in milliseconds). The delay (waiting time) for any subsequent connection retry will not be larger than this value. Must be strictly greater than initialDelay.
Default: 30000
incrementFactor
(integer)¶
The factor used to increment initialDelay up to maxDelay. For example, if initialDelay is set to 1000 and maxDelay to 5000, the delays would be 1000, 2000, 4000, 5000.
Default: 2
Additional restrictions:
Minimum:
2
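The delay sequence described for connectionStrategy can be sketched in a few lines of Python. This is only an illustration of the documented behavior (multiply by incrementFactor, cap at maxDelay), not the actual implementation; the function name `connection_retry_delays` is hypothetical.

```python
def connection_retry_delays(initial_delay=1000, max_delay=30000, increment_factor=2):
    """Return the retry delays (in ms) from initialDelay up to and including maxDelay.

    Hypothetical helper: mirrors the documented defaults and restrictions only.
    """
    if initial_delay < 1000:
        raise ValueError("initialDelay must be at least 1000")
    if increment_factor < 2:
        raise ValueError("incrementFactor must be at least 2")
    if max_delay <= initial_delay:
        raise ValueError("maxDelay must be strictly greater than initialDelay")

    delays = []
    delay = initial_delay
    # Grow the delay by the factor until it would reach or exceed the cap.
    while delay < max_delay:
        delays.append(delay)
        delay *= increment_factor
    # From here on every retry waits maxDelay.
    delays.append(max_delay)
    return delays


print(connection_retry_delays(1000, 5000, 2))  # [1000, 2000, 4000, 5000]
```

With the default values (1000, 30000, 2), the same sketch levels off at 30000 milliseconds after five increasing steps.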
Endpoint Properties¶
query
(string)¶
The InfluxDB Flux query used to read or subscribe to measurements. This property is only used for read or subscribe endpoints
Example:
"from(bucket:\"my-bucket\") |> range(start: -1h) |> filter(fn: (r) => r._measurement == \"temperature\")"
interval
(integer)¶
The number of milliseconds between queries. Defaults to 1000 milliseconds if not provided.
cronExpression
(string)¶
The Cron expression used to poll the endpoint. (For examples, see: https://github.com/node-cron/node-cron)
Examples: "1,2,4,5 * * * *", "1-5 * * * *", "*/2 * * * *", "* * * January,September Sunday"
measurement
(string)¶
The name of the measurement in which to store data points when writing data. This property can be overridden by providing the measurement in the data of a message, but it has to be set in at least one of the two places.
Example: "exhaust_temperature"
measurementPrefix
(string)¶
An optional prefix that is prepended to the measurement name.
Example: "engine_"
fields
(object)¶
An optional object made up of key value pairs. These constant fields are merged with the fields from the message.
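The interplay of measurement, measurementPrefix and fields can be pictured with a small Python sketch. It is a rough model under stated assumptions, not the protocol's actual code: the helper name `resolve_point` is hypothetical, the prefix is assumed to apply to whichever measurement name ends up being used, and message fields are assumed to win over constant fields when keys collide.

```python
def resolve_point(endpoint, message):
    """Combine endpoint write properties with an incoming message (hypothetical helper)."""
    # The measurement in the message overrides the endpoint property;
    # it has to be set in at least one of the two places.
    measurement = message.get("measurement") or endpoint.get("measurement")
    if not measurement:
        raise ValueError("measurement must be set on the endpoint or in the message")

    # Assumption: the optional prefix is prepended to the resolved name.
    name = endpoint.get("measurementPrefix", "") + measurement

    # Assumption: on a key collision the field from the message wins.
    fields = {**endpoint.get("fields", {}), **message.get("fields", {})}

    return {"measurement": name, "fields": fields}


endpoint = {
    "measurement": "exhaust_temperature",
    "measurementPrefix": "engine_",
    "fields": {"site": "plant_a"},
}
message = {"value": 91, "fields": {"engine_number": 1}}
point = resolve_point(endpoint, message)
print(point)
```

Here the resolved measurement name is "engine_exhaust_temperature" and the point carries both the constant field and the field from the message.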
Reading Data¶
To read data from InfluxDB, an endpoint has to be defined with either read or subscribe properties; a valid Flux query needs to be provided as part of the endpoint configuration using the field query. Subscribe works by defining a polling interval, hence the query will be executed on a regular basis. The result of the query is provided in JSON format on the MQTT broker.
The provided Flux query supports value interpolation through placeholders of the form ‘@tag’.
from(bucket:"@bucket") |> range(start: @startMeasurementTime) |> filter(fn: (r) => r._measurement == "@measurement")
In the previous example, a read request that provides values for ‘bucket’, ‘measurement’ and ‘startMeasurementTime’ generates a valid Flux query.
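To make the interpolation concrete, here is a minimal Python sketch of how ‘@tag’ placeholders could be replaced by provided values. It only illustrates the substitution idea; the function name `interpolate_query` and the placeholder pattern (letters, digits and underscores after ‘@’) are assumptions, and the real protocol implementation may differ.

```python
import re


def interpolate_query(template, values):
    """Replace each @name placeholder in a Flux query template (hypothetical helper)."""

    def replace(match):
        name = match.group(1)
        if name not in values:
            raise KeyError(f"no value provided for @{name}")
        return str(values[name])

    # Assumption: a placeholder is '@' followed by an identifier-like name.
    return re.sub(r"@([A-Za-z_][A-Za-z0-9_]*)", replace, template)


template = (
    'from(bucket:"@bucket") |> range(start: @startMeasurementTime) '
    '|> filter(fn: (r) => r._measurement == "@measurement")'
)
query = interpolate_query(
    template,
    {"bucket": "turbine", "startMeasurementTime": "-1h", "measurement": "temperature"},
)
print(query)
```

A missing value raises an error in this sketch, since a query that still contains a placeholder would not be valid Flux.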
Output Format on Read¶
When data is read from InfluxDB, results are published to the /res topic of the Endpoint. When a subscription is configured, the results are published to the default topic of the Endpoint.
The output in both cases will be provided as a JSON array representing the InfluxDB query result.
[
  {
    "result": "_result",
    "table": 0,
    "_start": "2021-02-14T09:29:24.514083303Z",
    "_stop": "2021-02-15T09:29:24.514083303Z",
    "_time": "2021-02-15T09:29:06.059Z",
    "_value": 19.7,
    "_field": "value",
    "_measurement": "temperature"
  },
  {
    "result": "_result",
    "table": 0,
    "_start": "2021-02-14T09:29:24.514083303Z",
    "_stop": "2021-02-15T09:29:24.514083303Z",
    "_time": "2021-02-15T09:29:06.059623817Z",
    "_value": 21.3,
    "_field": "value",
    "_measurement": "temperature"
  }
]
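A consumer of these messages typically only needs a few columns of each row. The following Python sketch, with the hypothetical helper `extract_series`, shows one way to decode such a JSON array and keep the time and value of each row for a given measurement. The sample payload is a shortened version of the rows shown in this section.

```python
import json


def extract_series(payload, measurement):
    """Return (_time, _value) pairs for one measurement from a read result (hypothetical helper)."""
    rows = json.loads(payload)
    return [
        (row["_time"], row["_value"])
        for row in rows
        if row.get("_measurement") == measurement
    ]


# Shortened sample payload: only the columns used by the helper are kept.
sample = json.dumps([
    {"_time": "2021-02-15T09:29:06.059Z", "_value": 19.7,
     "_field": "value", "_measurement": "temperature"},
    {"_time": "2021-02-15T09:29:06.059623817Z", "_value": 21.3,
     "_field": "value", "_measurement": "temperature"},
])

series = extract_series(sample, "temperature")
print(series)
```

The timestamps are kept as strings here because they can carry nanosecond precision, which not every date parser accepts.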
Writing Data¶
To write data to InfluxDB, an endpoint with a write property has to be defined. If a measurement property is set, it is used by default for all data points sent to the endpoint. This property can also be overridden by providing a property measurement in the data message.
To write data, send an MQTT message like the following to the /set topic of the Endpoint (here with a measurement property set in the data message):
{
  "tags": { "rpm": "8000", "oil_temp": "250" },
  "value": 91,
  "fields": { "engine_number": 1 },
  "timestamp": 1813127450710,
  "measurement": "temperature"
}
Both tags and fields are supported. If the timestamp is not present, it is assigned by InfluxDB.
It is also possible to write several data points per message. To do this, they just have to be sent as an array.
Note that, in keeping with the InfluxDB client design, writes are asynchronous. Data is written to InfluxDB according to the InfluxDB connection property flushInterval, which defaults to 10 seconds.
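As an illustration of sending several data points in one message, the following Python sketch builds a JSON array of points using the properties described in this section. The helper name `make_point` is hypothetical, and publishing the resulting string to the /set topic of the Endpoint is left to whichever MQTT client is in use.

```python
import json


def make_point(value, measurement=None, tags=None, fields=None, timestamp=None):
    """Build one data point; optional properties are only included when given."""
    point = {"value": value}
    if measurement is not None:
        point["measurement"] = measurement
    if tags:
        point["tags"] = tags
    if fields:
        point["fields"] = fields
    if timestamp is not None:
        # When the timestamp is omitted, it is assigned by InfluxDB.
        point["timestamp"] = timestamp
    return point


# Several points are sent as a JSON array in a single message.
payload = json.dumps([
    make_point(91, measurement="temperature",
               tags={"rpm": "8000", "oil_temp": "250"},
               fields={"engine_number": 1},
               timestamp=1813127450710),
    make_point(93, measurement="temperature"),
])
print(payload)
```

The second point omits the timestamp on purpose, so it would receive one from InfluxDB when the batch is flushed.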
Output Format on Write¶
When data is written to an InfluxDB Endpoint a message is published to the /res topic of the Endpoint. The output message is an object with two properties:
timestamp: the Unix timestamp, in milliseconds, of when the write was executed
value: set to true when the write was successful
Sample Commissioning file¶
Download: influxdb-example.yml
description: |
  Sample InfluxDB commissioning file

metadata:
  name: Cybus InfluxDB Example
  provider: cybus
  homepage: https://cybus.io
  version: 1.0.0

#------------------------------------------------------------------------------
# Parameters
#------------------------------------------------------------------------------

parameters:
  influxHost:
    type: string
    description: "HTTP address of InfluxDB server"
    default: "influxdbhost"

  influxPort:
    type: integer
    description: "Influx Port"
    default: 8086

  influxScheme:
    type: "string"
    description: "Either use http or https for the server url"
    default: "http"

#------------------------------------------------------------------------------
# Resources
#------------------------------------------------------------------------------
resources:

  #----------------------------------------------------------------------------
  # Connections
  #----------------------------------------------------------------------------

  influxdbConnection:
    type: Cybus::Connection
    properties:
      protocol: Influxdb
      connection:
        host: !ref influxHost
        token: "-an-influx-db-jwt-token-"
        port: !ref influxPort
        bucket: turbine
        scheme: !ref influxScheme
        flushInterval: 5000

  #----------------------------------------------------------------------------
  # Endpoints
  #----------------------------------------------------------------------------

  turbineWrite:
    type: Cybus::Endpoint
    properties:
      protocol: Influxdb
      connection: !ref influxdbConnection
      write:
        measurement: "turbine"

  rotaryEncoderWrite:
    type: Cybus::Endpoint
    properties:
      protocol: Influxdb
      connection: !ref influxdbConnection
      write:
        measurement: "rotary_encoder"

  rotary_encoder_angle:
    type: Cybus::Endpoint
    properties:
      protocol: Influxdb
      connection: !ref influxdbConnection
      subscribe:
        interval: 6000
        query: >
          from(bucket:"turbine") |> range(start: -1d) |> filter(fn: (r) => r._measurement == "rotary_encoder")

  #----------------------------------------------------------------------------
  # Mappings
  #----------------------------------------------------------------------------

  # A mapping that overrides the measurement value with the name of the MQTT topic
  mappings:
    type: Cybus::Mapping
    properties:
      mappings:
        - subscribe:
            topic: 'turbine/#'
          publish:
            endpoint: !ref turbineWrite
          rules:
            - transform:
                # Add the topic as measurement
                expression: '$merge([$,{"measurement": $context.topic}])'
        # A mapping that will pass data from the MQTT topic to the write endpoint
        # allowing overriding the measurement by providing it in the topic
        - subscribe:
            topic: 'encoder/#'
          publish:
            endpoint: !ref rotaryEncoderWrite