Rules Objects¶
A rule is used to perform powerful transformations of data messages right inside the Connectware data processing.
Rules are always specified as a list (array). The order is important, as rules are executed from top to bottom. Some rules might have requirements regarding the format of the input message. Some rules may or may not pass on data messages to the next rule or the output, e.g. when the COV (change of value) filter decides whether the input value needs to be passed on or not.
Rules are always configured as a named object with properties specific to the rule type.
The following rules are available today:
parse¶
This rule parses any non-JSON data from input bytes into a JSON object of the chosen type. The resulting JSON object can then be passed on to other rule operations.
Property | Type | Required |
---|---|---|
format | enum | Required |
format¶
is required
type: enum
The value of this property must be equal to one of the below:
- json: interprets the input bytes as a stringified JSON object which is parsed back into the JSON object
- utf8: interprets the input bytes as a utf-8 encoded string and parses this into a string
- latin1: interprets the input bytes as a latin-1 encoded string (ISO-8859-1) and parses this into a string
- ascii: interprets the input bytes as an ascii encoded string (for 7-bit ascii data only) and parses this into a string. Additionally, supports parsing integer primitives directly.
- boolean: interprets the first byte of the input message as a boolean value. If this first byte (see note below) is non-zero, true is passed on, otherwise false.
- int8: interprets the input bytes as a signed 8-bit integer, using exactly 1 byte.
- int16BE: interprets the input bytes as a signed, big-endian 16-bit integer, using exactly 2 bytes.
- int16LE: interprets the input bytes as a signed, little-endian 16-bit integer, using exactly 2 bytes.
- int32BE: interprets the input bytes as a signed, big-endian 32-bit integer, using exactly 4 bytes.
- int32LE: interprets the input bytes as a signed, little-endian 32-bit integer, using exactly 4 bytes.
- uint8: interprets the input bytes as an unsigned 8-bit integer, using exactly 1 byte.
- uint16BE: interprets the input bytes as an unsigned, big-endian 16-bit integer, using exactly 2 bytes.
- uint16LE: interprets the input bytes as an unsigned, little-endian 16-bit integer, using exactly 2 bytes.
- uint32BE: interprets the input bytes as an unsigned, big-endian 32-bit integer, using exactly 4 bytes.
- uint32LE: interprets the input bytes as an unsigned, little-endian 32-bit integer, using exactly 4 bytes.
- floatBE: interprets the input bytes as a big-endian IEEE 754 float, using exactly 4 bytes.
- floatLE: interprets the input bytes as a little-endian IEEE 754 float, using exactly 4 bytes.
- doubleBE: interprets the input bytes as a big-endian IEEE 754 double, using exactly 8 bytes.
- doubleLE: interprets the input bytes as a little-endian IEEE 754 double, using exactly 8 bytes.
- toBase64 (also base64): parses a plain text input string into a base64 encoded output string
- fromBase64: parses a base64 encoded input string into a plain text output string
For reference: Big-endian byte order (BE, sometimes also called Motorola or network byte order) refers to the storage order where the most significant byte of the value is stored at the smallest memory address, whereas little-endian (LE, sometimes also called Intel byte order) refers to the other way round, where the least-significant byte is stored at the smallest address. (See also: https://en.wikipedia.org/wiki/Endianness )
For more details about the parsing of input bytes into JSON values see also: https://nodejs.org/api/buffer.html
Note about the boolean parsing: If the input message represents an integer value that is larger than one byte (e.g. an int16 or int32 value), you cannot directly parse this into a boolean by this parse rule, because this rule considers only the first byte. Instead, for multi-byte integers you first have to apply a parse rule with the correct integer size, then apply a parse rule for parsing into boolean.
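The byte-order and boolean notes above can be tried out outside of Connectware. The following is a minimal Python sketch using the standard struct module. It only mimics the interpretation of bytes described here and is not the Connectware implementation; the helper name parse_boolean is hypothetical.

```python
import struct

raw = bytes([0x01, 0x02])

# int16BE: most significant byte first, so the bytes read as 0x0102 = 258
int16_be = struct.unpack(">h", raw)[0]
# int16LE: least significant byte first, so the bytes read as 0x0201 = 513
int16_le = struct.unpack("<h", raw)[0]

# The same single byte 0xFF read as unsigned (uint8) and signed (int8)
uint8 = struct.unpack("B", b"\xff")[0]
int8 = struct.unpack("b", b"\xff")[0]


def parse_boolean(payload: bytes) -> bool:
    """Hypothetical helper mimicking format 'boolean': only the first byte counts."""
    return payload[0] != 0


# The 16-bit big-endian integer 1 is stored as 0x00 0x01. Its first byte is
# zero, so looking at the first byte alone reports False for a non-zero value.
one_as_int16_be = struct.pack(">h", 1)
misleading = parse_boolean(one_as_int16_be)
```

This is why a multi-byte integer should first be parsed with the matching integer format before it is interpreted as a boolean.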
Example:
description: >
  A parse rule usage example.

metadata:
  name: Parse

resources:
  mapping:
    type: Cybus::Mapping
    properties:
      mappings:
        - subscribe:
            topic: factory/machine/power
          publish:
            topic: factory/machine/state
          rules:
            - parse:
                format: boolean
Download: parse.yml
The service installed using the commissioning file above will collect the machine power status published on the MQTT topic factory/machine/power and publish it on the topic factory/machine/state, interpreting the input as a boolean.
transform¶
This rule is used to transform payloads into a new format or structure. Expressions are written in the JSONata language (see http://jsonata.org/). This is a direct transformation: for every input message, exactly one output message is generated.
Property | Type | Required |
---|---|---|
expression | | Required |
expression¶
The JSONata expression to apply to the given payload. See http://docs.jsonata.org for a full language reference.
Some simple examples:
- $: the input value (payload) in unchanged format, regardless of its type
- $.value or just the shorthand value: the property value of the input JSON object
- ($.temp - 32) * 5 / 9: the property temp, which denotes a temperature in degrees Fahrenheit, converted to degrees Celsius
- $number($) * 0.1: interpreting a non-JSON input value as a number and multiplying it by 0.1
The service installed with the commissioning file below will collect a machine temperature, given in Fahrenheit, published on the MQTT topic machine/temperature/fahrenheit. Then it will convert it to degrees Celsius and publish it to the topic machine/temperature/celsius.
description: >
  A temperature value transformation example: Fahrenheit to degree Celsius.

metadata:
  name: Value transform

resources:
  mapping:
    type: 'Cybus::Mapping'
    properties:
      mappings:
        - subscribe:
            topic: "machine/temperature/fahrenheit"
          publish:
            topic: "machine/temperature/celsius"
          rules:
            - transform:
                expression:
                  ( $number($) - 32 ) * 5 / 9
Download: value-transform.yml
There are many functions in JSONata that can be used for all common data types. See http://docs.jsonata.org for a full language reference.
Apart from the payload object and the full set of JSONata functions and data types, the following additional functions and values are defined in the context of the Connectware rule engine:
- $context contains protocol-specific context information. For example, for MQTT this contains the raw MQTT packet with QoS and Retain information and the topic this message has arrived at. For HTTP, this contains the full response object with statusCodes and all headers.
- $source contains all entries of the source part of this mapping
- $target contains all entries of the target part of this mapping
- $context.vars gives you access to the field values at the wildcard positions if you are using wildcard topics. Example: if you have specified a topic factory/productionline1/+machineId/#data you will be able to access the fields $context.vars.machineId and $context.vars.data
A $context usage example in the transformation rule:
description: >
  This commissioning file is an example of using a $context in a
  transformation rule to reference a topic path specified with a wildcard.

metadata:
  name: Context transform

resources:
  mapping:
    type: 'Cybus::Mapping'
    properties:
      mappings:
        - subscribe:
            topic: "factory/+machineId/temperature"
          publish:
            topic: "factory/all/temperature"
          rules:
            - transform:
                expression: |
                  {
                    "machine": $context.vars.machineId,
                    "temperature": $
                  }
Download: context-transform.yml
- $last([label]) refers to the last message that has been sent on this rule stream. This function allows you to aggregate consecutive messages.
label is optional. It can be used to refer to other stash points. If omitted, it defaults to the last message that has completed the full rule chain.
Note: When writing expressions using $last() and object properties, it must be considered that in JSONata, the dot operator . is a map operator, not (only) an object dereferencing operator (see JSONata reference: path operators). This can lead to unexpected results if the input message is not a plain value but rather an array of values.
If the expression using $last is not intended to be applied to each input value, the function $lookup (see JSONata reference: object functions) may be a more useful choice than the dot operator: $lookup($last(), "val") results in the val property of the object stored in $last.
For example, assume the last value stored was { time: 1632908050235 } and the transform rule is used with the following expression:
{ "previous": $last().time, "time": $millis() }
If the input value is an array with value [1, 2, 3], the result will unexpectedly contain an array value:
{ previous: [ 1632908050235, 1632908050235, 1632908050235, sequence: true ], time: 1632908154336 }
Instead, the dereferencing of the .time property can be done using $lookup, like so:
{ "previous": $lookup($last(), "time"), "time": $millis() }
This yields the expected result:
{ "previous": 1632908050235, "time": 1632908154336 }
In addition to the default JSONata functions, the following special functions are available:
- $encodeBitmask(array) takes an array of boolean values and converts it into a bitmask. Returns a number.
  - Sample:
    [ true, false, false, true, false ]
      2^0   2^1    2^2    2^3   2^4   = 9 ( 00001001 )
    Attention: the first value in the array will be the right-most bit.
- $decodeBitmask(number, array) takes a number and an array of strings as arguments. Returns an object with keys taken from the array and values being booleans that result from the bitmask of the key position being applied to the number.
  - Sample:
    $decodeBitmask(9, ["ok", "alarm", "door", "powerOn", "warning"])
    00001001 -> 1 0 0 1 0
    { "ok": true, "alarm": false, "door": false, "powerOn": true, "warning": false }
- $bitmask(number, index) returns a boolean value depending on the bit at position <index> in <number> being set.
  - Sample:
    $bitmask(9, 3) -> true
    $bitmask(9, 2) -> false
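To make the bit ordering of these three functions concrete, here is a small Python sketch that reproduces the samples above. The Python function names are hypothetical stand-ins for the JSONata functions, and the zero-based index is inferred from the $bitmask samples rather than taken from the Connectware source.

```python
from typing import Dict, List


def encode_bitmask(flags: List[bool]) -> int:
    """The first array element becomes the right-most (least significant) bit."""
    result = 0
    for position, flag in enumerate(flags):
        if flag:
            result |= 1 << position
    return result


def decode_bitmask(number: int, names: List[str]) -> Dict[str, bool]:
    """Map each name to the bit at its array position."""
    return {
        name: bool((number >> position) & 1)
        for position, name in enumerate(names)
    }


def bitmask(number: int, index: int) -> bool:
    """True if the bit at the zero-based position index is set."""
    return bool((number >> index) & 1)


encoded = encode_bitmask([True, False, False, True, False])
decoded = decode_bitmask(encoded, ["ok", "alarm", "door", "powerOn", "warning"])
```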
filter¶
The filter rule breaks or forwards the message based on the result of an expression. The expression is also written in the JSONata language. This rule does not modify the message in any way. It breaks the message flow if the expression evaluates to a false value (false, 0 or '').
Property | Type | Required |
---|---|---|
expression | | Required |
expression¶
Must be a JSONata expression that evaluates to a boolean, but can use the full JSONata function set, see expression.
Example:
description: >
  A filter rule usage example.

metadata:
  name: Filter

resources:
  mapping:
    type: Cybus::Mapping
    properties:
      mappings:
        - subscribe:
            topic: factory/machine/temperature
          publish:
            topic: 'factory/warning/overheat'
          rules:
            - filter:
                expression: $number($) > 95
Download: filter.yml
The service installed using the commissioning file above will publish to the overheat warning topic factory/warning/overheat if the machine temperature in the factory exceeds 95 degrees. All values at or below this threshold are filtered out.
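The behaviour described above can be sketched in a few lines of Python: the predicate decides whether a message continues, and the payload itself is forwarded unchanged. This is an illustration only; apply_filter and the sample readings are hypothetical.

```python
from typing import Callable, List


def apply_filter(messages: List[str], predicate: Callable[[str], bool]) -> List[str]:
    """Forward only messages for which the predicate is truthy, unmodified."""
    return [message for message in messages if predicate(message)]


# Hypothetical raw temperature payloads arriving as strings
readings = ["90", "95", "95.5", "120"]

# Counterpart of the JSONata expression: $number($) > 95
overheat = apply_filter(readings, lambda payload: float(payload) > 95)
```

Note that the value 95 itself does not pass, because the comparison is strict.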
setContextVars¶
This rule will store values as context variables. This can be used to pass data from one rule step to a later one. Some target protocols (like MQTT and HTTP) also support using those context variables in the output topics or output properties. This can be used to implement MQTT topics depending on the actual data, or dynamic HTTP headers. It is also useful when working with named wildcards.
The rule must be configured with a vars property that lists each identifier. For each identifier, a JSONata expression is specified that has the same context variables available as the transform and filter rules.
Property | Type | Required |
---|---|---|
vars | | Required |
setMissingAsUndefined | | Optional, default: false |
vars¶
An object of key: expression properties where key is any identifier to be stored as a context variable, and expression is the JSONata expression whose evaluated value is assigned to the context variable key.
setMissingAsUndefined (default: false)¶
Chooses the behaviour if a configured context variable cannot be set from the input message values. If true, any missing context variables from the vars object are set to the string undefined, and the rule engine and output publish continue as normal. If false, the input message is discarded and an error message is logged, like so: Failed processing setContextVars, because: setContextVars rule should set value “contextKey” from property “someKey” but it is missing in the message: { “wrongKey”: “123” }.
Example¶
The following example takes the productionLine and machineId wildcards from the source subscription and creates a concatenated string to be stored in the context variable identifier. This variable can be used in the target publish topic as another wildcard name.
description: >
  A setContextVars rule usage example.

metadata:
  name: setContextVars

resources:
  mapping:
    type: Cybus::Mapping
    properties:
      mappings:
        - subscribe:
            topic: factory/+productionLine/+machineId/temperature
          publish:
            topic: factory/temperatures
          rules:
            - setContextVars:
                vars:
                  identifier: |
                    $join([
                      $context.vars.productionLine,
                      $context.vars.machineId
                    ], '-')
            - transform:
                expression: |
                  {
                    "id": $context.vars.identifier,
                    "temperature": $
                  }
Download: setcontextvars.yml
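To illustrate what the example above computes, the following Python sketch extracts named single-level wildcards from a concrete topic and joins them with a dash. It assumes that a +name level matches exactly one topic level; the function extract_wildcards and the topic values line1 and press7 are hypothetical, and this is not how Connectware implements topic matching.

```python
from typing import Dict, Optional


def extract_wildcards(pattern: str, topic: str) -> Optional[Dict[str, str]]:
    """Return the values at the named '+' wildcard positions, or None if no match."""
    pattern_levels = pattern.split("/")
    topic_levels = topic.split("/")
    if len(pattern_levels) != len(topic_levels):
        return None
    captured: Dict[str, str] = {}
    for expected, actual in zip(pattern_levels, topic_levels):
        if expected.startswith("+"):
            captured[expected[1:]] = actual  # named single-level wildcard
        elif expected != actual:
            return None  # a literal level differs
    return captured


context_vars = extract_wildcards(
    "factory/+productionLine/+machineId/temperature",
    "factory/line1/press7/temperature",
)

# Counterpart of: $join([$context.vars.productionLine, $context.vars.machineId], '-')
identifier = "-".join([context_vars["productionLine"], context_vars["machineId"]])
```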
cov¶
Simple Change-of-Value (COV) filter that only forwards data when it has changed compared to the previous message. The filter does not distinguish different input topics and works on all messages of a rule chain and output topic as if they belonged to the same source.
Supports only looking at one specified key (e.g. to avoid confusion by timestamps) and deadbands (to filter noise). Expects JSON input format. Usually outputs fewer messages than it receives.
Property | Type | Required |
---|---|---|
key | | Required |
deadband | | Optional |
deadbandMode | | Optional |
label | | Optional |
key¶
The name of the JSON property to filter. Currently, this filter only supports a single key, either as a top-level property of the JSON object (when given as a string) or as a property inside some nested object in the JSON object (when given as an array of strings, which specifies the property path). You can chain multiple cov rule definitions to look at multiple keys.
Example for filtering any message { "temperature": 12.34 } for a changing value of the temperature property:
description: >
  A COV on change rule example.

metadata:
  name: COV on change

resources:
  mapping:
    type: Cybus::Mapping
    properties:
      mappings:
        - subscribe:
            topic: 'machine/sensor/raw'
          publish:
            topic: 'machine/sensor/changed'
          rules:
            - cov:
                key: temperature
Download: cov-on-change.yml
If a list of keys is specified, they are treated as a deep path to a JSON object property. If the path is not resolvable because one item of the path does not exist in a message, no error is thrown, but such input messages are ignored. The COV filter will accept and forward only those messages on which the path is resolvable.
Example for filtering a message such as { "values": [ { "temperature": 85.3 } ] } according to the temperature property:
description: >
  A COV on change rule with deep path key example.

metadata:
  name: COV on change with deep path

resources:
  mapping:
    type: Cybus::Mapping
    properties:
      mappings:
        - subscribe:
            topic: 'machine/sensor/raw'
          publish:
            topic: 'machine/sensor/changed'
          rules:
            - cov:
                key:
                  - values
                  - 0
                  - temperature
Download: cov-on-change-deep-key.yml
(Note: Integer numbers such as the index 0 above can be written either as plain integer values, or within quotes '0' so that they are interpreted as strings. Both are allowed.)
deadband¶
For numeric data, you can specify a deadband that must be exceeded before a change is detected. If this parameter is omitted, the filter compares for exact matches.
deadbandMode¶
Valid values are relative and absolute. In relative mode, the deadband parameter is treated as a percentage; thus, if the parameter is 10, the next value is only released if it deviates by more than 10% from the previous value. Default is absolute.
label¶
The COV filter looks up the stash to compare the current value with the last one. By default, the COV filter compares the current input value with the last value that was let through by this filter. You can let the filter compare the value to another stored message by specifying a label, either a label you have specified yourself in another stash, or you can use the labels input or lastOutput or lastInput, each being set automatically at the beginning and the end of the rule engine chain. If custom labels are used, be aware that you need to explicitly store them with a stash rule, otherwise the filter will let everything through.
An example of a deadband, which applies the COV rule to temperature changes of more than 10%, is shown below:
description: >
  A COV on percentage change rule example.

metadata:
  name: COV on percentage change

resources:
  mapping:
    type: Cybus::Mapping
    properties:
      mappings:
        - subscribe:
            topic: 'machine/sensor/raw'
          publish:
            topic: 'machine/sensor/changed'
          rules:
            - cov:
                key: temperature
                deadband: 10
                deadbandMode: relative
Download: cov-on-percentage-change.yml
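The deadband logic can be sketched in Python as follows. This is a simplified model under stated assumptions, not the Connectware implementation: it handles a single top-level numeric key, always forwards the first message, compares against the last value that was let through, and treats the deadband as a strict threshold. The class name CovFilter and the sample values are hypothetical.

```python
from typing import Any, Dict, Optional


class CovFilter:
    """Simplified change-of-value filter for one top-level numeric key."""

    def __init__(self, key: str, deadband: float = 0.0, mode: str = "absolute"):
        self.key = key
        self.deadband = deadband
        self.mode = mode
        self.last_forwarded: Optional[float] = None

    def accept(self, message: Dict[str, Any]) -> bool:
        if self.key not in message:
            return False  # key not resolvable: the message is ignored
        value = message[self.key]
        previous = self.last_forwarded
        if previous is not None:
            threshold = self.deadband
            if self.mode == "relative":
                # deadband is a percentage of the previously forwarded value
                threshold = abs(previous) * self.deadband / 100.0
            if abs(value - previous) <= threshold:
                return False  # the change does not exceed the deadband
        self.last_forwarded = value
        return True


cov = CovFilter("temperature", deadband=10, mode="relative")
samples = [100, 105, 111, 112, 130]
forwarded = [t for t in samples if cov.accept({"temperature": t})]
```

With a deadband of 10 in relative mode, 105 is dropped (5% above 100), 111 passes (11% above 100), 112 is dropped (less than 10% above 111), and 130 passes.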
burst¶
Simple burst-mode that allows aggregation of many messages into a payload array. This operates on a time interval or on a fixed size. It collects all messages that arrive, joins them into an array (with their respective timestamps) and forwards them as a single message. Interval and size constraints can be combined. If none of them is specified, the rule has no effect.
Property | Type | Required |
---|---|---|
interval | | Optional |
maxSize | | Optional |
label | | Optional |
interval¶
Time interval in milliseconds to send out the combined message after its first message was received.
(Note: If no message at all has been received, no output message will be sent either. This option does not implement a regular time-based output message for the case when no input data arrives.)
maxSize¶
Maximum number of messages in the burst aggregation. The aggregation message will be forwarded once this number of messages have arrived.
If both maxSize and interval are specified, the combined message will be sent according to whichever criterion is fulfilled first. This usually means that an output message triggered by the interval criterion contains fewer array elements than an output message triggered by the maxSize criterion.
label¶
The burst rule stores the current burst aggregation in the stash using a label. If more than one aggregation is used, different labels must be assigned by the user. The default is burst.
The example service commissioning file below will collect every message published on the MQTT topic factory/machine/temperature/single in a buffer. Once the number of messages reaches the configured maxSize, or the configured interval has elapsed since the first buffered message was received, an array of all the collected values is published to factory/machine/temperature/combined.
description: >
  A burst rule example.

metadata:
  name: Burst

resources:
  mapping:
    type: Cybus::Mapping
    properties:
      mappings:
        - subscribe:
            topic: 'factory/machine/temperature/single'
          publish:
            topic: 'factory/machine/temperature/combined'
          rules:
            - burst:
                interval: 300000
                maxSize: 100
Download: burst.yml
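How the two criteria interact can be sketched with a small Python class. This is only a model of the behaviour described in this section, with several assumptions: time is passed in explicitly instead of using a real timer, at least one of the two limits is set, and each array element is an object with timestamp and value fields (the actual element layout is not specified here). The names BurstBuffer, add and tick are hypothetical.

```python
from typing import Any, Dict, List, Optional


class BurstBuffer:
    """Collects messages and emits them as one array by size or by interval."""

    def __init__(self, interval_ms: Optional[int] = None, max_size: Optional[int] = None):
        self.interval_ms = interval_ms
        self.max_size = max_size
        self.items: List[Dict[str, Any]] = []
        self.first_arrival_ms: Optional[int] = None

    def _flush(self) -> List[Dict[str, Any]]:
        batch = self.items
        self.items = []
        self.first_arrival_ms = None
        return batch

    def add(self, value: Any, now_ms: int) -> Optional[List[Dict[str, Any]]]:
        """Buffer one message; return the batch if max_size is reached."""
        if not self.items:
            self.first_arrival_ms = now_ms  # the interval starts at the first message
        self.items.append({"timestamp": now_ms, "value": value})
        if self.max_size is not None and len(self.items) >= self.max_size:
            return self._flush()
        return None

    def tick(self, now_ms: int) -> Optional[List[Dict[str, Any]]]:
        """Timer callback; return the batch if the interval has elapsed."""
        if not self.items or self.interval_ms is None:
            return None  # nothing buffered means nothing is sent
        if now_ms - self.first_arrival_ms >= self.interval_ms:
            return self._flush()
        return None


buffer = BurstBuffer(interval_ms=1000, max_size=3)
results = [buffer.add(v, t) for v, t in [(20.1, 0), (20.3, 100), (20.2, 200)]]
by_size = results[-1]           # third message fills the buffer
buffer.add(20.4, 300)           # starts a new aggregation
too_early = buffer.tick(1200)   # only 900 ms since the first buffered message
by_interval = buffer.tick(1300)
```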
stash¶
This rule can be used to stash intermediate states of messages for later reference. These can be referred to by a filter or transform rule. This may, for example, be used when the COV and burst rules are used in combination.
Property | Type | Required |
---|---|---|
label | | Optional |
label¶
This label is used to refer to this stash point. It needs to be unique only within one rules statement. It is no problem to use the same label in multiple rules statements. The default is stash.
As the stash rule is used only to store intermediate states for later reference, it can't be used on its own. Instead, another rule provides values to this rule, as in the following example:
description: >
  A stash usage example

metadata:
  name: Stash

resources:
  mapping:
    type: 'Cybus::Mapping'
    properties:
      mappings:
        - subscribe:
            topic: "machine/raw/temperature"
          publish:
            topic: "machine/enhanced/temperature"
          rules:
            - transform:
                expression: |
                  {
                    "previous": $last("cache").current,
                    "current": current
                  }
            - stash:
                label: cache
Download: stash.yml
The service installed using the commissioning file above will collect the current temperature published on the MQTT topic machine/raw/temperature and will publish it, enhanced with the value of the previous measurement, to the topic machine/enhanced/temperature.
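The interplay of the transform and stash rules in this example can be modelled in Python as follows. This is a sketch under assumptions: the stash is represented by a plain dictionary, the helper names last and process are hypothetical, and a missing previous value is shown as None (a JSONata expression would typically omit the property instead).

```python
from typing import Any, Dict, Optional

stash: Dict[str, Dict[str, Any]] = {}


def last(label: str) -> Optional[Dict[str, Any]]:
    """Stand-in for $last(label): the message stored under that label, if any."""
    return stash.get(label)


def process(message: Dict[str, Any]) -> Dict[str, Any]:
    # transform rule: combine the stashed previous output with the new input
    previous_output = last("cache")
    output = {
        "previous": previous_output["current"] if previous_output else None,
        "current": message["current"],
    }
    # stash rule with label 'cache': remember the transformed message
    stash["cache"] = output
    return output


first = process({"current": 20.0})
second = process({"current": 21.5})
```

Because the stash rule comes after the transform rule, the stashed message is the transformed output, so its current property is the previous measurement when the next message arrives.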
collect¶
This rule can be used to collect values from multiple endpoints configured in the subscribe property of a Cybus::Mapping and publish them together in a single object. The rule functions as a Last Value Cache, so that the output object contains a key for each of the subscribed endpoints/topics, potentially using its respective label property if it was set.
The last message on each of the subscribe entries will be sent to the output. At start-up, the output object starts empty, and keys will be added as they arrive, one after the other.
This rule does not take any properties. Due to YAML format rules, you must write the rule together with the expression for an empty object {}, like so:
description: >
  A collect rule usage example.

metadata:
  name: Collect

resources:
  mapping:
    type: Cybus::Mapping
    properties:
      mappings:
        - subscribe:
            topic: factory/+machine/+sensor
            label: "{machine}.{sensor}"
          publish:
            topic: 'factory/data'
          rules:
            - collect: {}
Download: collect.yml
The service installed using the commissioning file above will collect the data published on all MQTT topics matching factory/+machine/+sensor and publish it to the topic factory/data, using the label machine.sensor.
The example uses a dynamic label, which means the label is constructed from the values of the actual topic to which the data was published.
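The last-value-cache behaviour with a dynamic label can be sketched in Python like this. It is an illustration under assumptions: the wildcard values are passed in directly, the output object maps each label to the latest payload, and the class name Collector and the sample machine and sensor names are hypothetical.

```python
from typing import Any, Dict


class Collector:
    """Last value cache keyed by a label built from wildcard values."""

    def __init__(self, label_template: str):
        self.label_template = label_template
        self.cache: Dict[str, Any] = {}

    def update(self, wildcard_values: Dict[str, str], payload: Any) -> Dict[str, Any]:
        # Build the label, e.g. "{machine}.{sensor}" -> "press.temperature"
        label = self.label_template.format(**wildcard_values)
        self.cache[label] = payload
        return dict(self.cache)  # snapshot of all last values seen so far


collector = Collector("{machine}.{sensor}")
out1 = collector.update({"machine": "press", "sensor": "temperature"}, 20)
out2 = collector.update({"machine": "press", "sensor": "pressure"}, 3)
out3 = collector.update({"machine": "press", "sensor": "temperature"}, 21)
```

The output starts with a single key and grows as further topics report; a new message on an already known topic replaces only that key's value.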