Kafka¶
Overview¶
Apache Kafka is a software framework for stream-processing of large amounts of data. It is an open-source software platform developed by the Apache Software Foundation, written in Scala and Java. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds.
This protocol implementation allows sending data from the Connectware to an Apache Kafka broker for message processing, or subscribing to a particular topic to receive data.
Currently, the protocol supports authentication by means of SASL and compression of produced messages using GZIP.
Message assembly¶
The endpoints in this protocol are available for the write and subscribe operations only.
To transmit messages to the Kafka broker, a JSON message with the payload as value needs to be sent to the respective endpoint. When producing messages to the broker, the message may look as follows:
{
  "id": 123321,
  "topic": "some_test_topic",
  "acks": 1,
  "timeout": 3000,
  "compression": "None",
  "value": [
    {
      "key": "sample-key 1",
      "value": "sample message #1",
      "partition": 0,
      "headers": {
        "foo": "bar"
      },
      "timestamp": 1633425301000
    },
    {
      "key": "sample-key 2",
      "value": "sample message #2",
      "partition": 1,
      "headers": {
        "foo": "bar"
      },
      "timestamp": 1633425301001
    }
  ]
}
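As an illustration, a payload like the one above could be assembled programmatically before sending it to the endpoint. A minimal Python sketch (the helper name build_kafka_write_message is our own, not part of the Connectware API):

```python
import json

def build_kafka_write_message(msg_id, topic, records,
                              acks=-1, timeout=30000, compression="None"):
    """Assemble the JSON payload for a Kafka write endpoint.

    Each record may carry key, value, partition, headers and timestamp,
    mirroring the example above. The defaults follow the endpoint
    property defaults (acks=-1, timeout=30000, no compression).
    """
    return json.dumps({
        "id": msg_id,
        "topic": topic,
        "acks": acks,
        "timeout": timeout,
        "compression": compression,
        "value": list(records),
    })

payload = build_kafka_write_message(
    123321,
    "some_test_topic",
    [{"key": "sample-key 1", "value": "sample message #1", "partition": 0,
      "headers": {"foo": "bar"}, "timestamp": 1633425301000}],
    acks=1,
    timeout=3000,
)
```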
Upon successful delivery, the following message will be generated on the /res topic:
{
  "id": 123321,
  "timestamp": 1633425302878,
  "result": {
    "value": [
      {
        "topicName": "some_test_topic",
        "partition": 0,
        "errorCode": 0,
        "baseOffset": "0",
        "logAppendTime": "-1",
        "logStartOffset": "0"
      },
      {
        "topicName": "some_test_topic",
        "partition": 1,
        "errorCode": 0,
        "baseOffset": "0",
        "logAppendTime": "-1",
        "logStartOffset": "0"
      }
    ]
  }
}
When delivery is unsuccessful, you will get the following message explaining what happened:
{
  "id": 123321,
  "timestamp": 1633425495764,
  "error": {
    "code": -1,
    "message": "Connection error: connect ECONNREFUSED 192.168.0.1:9092"
  }
}
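Since a message on the /res topic carries either a result object or an error object, a consumer of these responses can branch on which key is present. A small Python sketch (the function name is illustrative, not part of the Connectware API):

```python
import json

def handle_response(raw):
    """Classify a /res message.

    Returns ("ok", per-partition results) on successful delivery, or
    ("error", error message) when delivery failed.
    """
    msg = json.loads(raw)
    if "error" in msg:
        return ("error", msg["error"]["message"])
    return ("ok", msg["result"]["value"])

# The error example from above would be classified as follows:
status, detail = handle_response(
    '{"id": 123321, "timestamp": 1633425495764,'
    ' "error": {"code": -1,'
    ' "message": "Connection error: connect ECONNREFUSED 192.168.0.1:9092"}}'
)
```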
Upon subscription you will get messages like this:
{
  "timestamp": 1649046618574,
  "value": {
    "topic": "some_test_topic",
    "partition": 0,
    "key": "totally optional msg key",
    "headers": {},
    "message": "test 123 test msg body"
  }
}
Commissioning File Specifics¶
The Endpoints for the Kafka connections may only be of the type write or subscribe.
Special care must be taken when configuring the connection properties, since a connection can hold either only write endpoints or only subscribe endpoints, but not both at the same time.
To choose the type of connection you are planning to use, configure the clientType property on the connection resource.
Further configuration options are listed below:
Connection Properties¶
brokers
(array, required)¶
The host names of the Kafka brokers
The object is an array with all elements of the type string
.
Examples: ["kafka1:9092","kafka2:9092"], ["kafka1:9092","kafka2:9092","kafka3:9092"]
clientType
(string, enum)¶
Configure the connection to be a producer or a consumer. Defaults to producer.
This element must be one of the following enum values:
producer
consumer
Default: "producer"
groupId
(string)¶
Only for consumer clients: The group id for the Kafka consumer. It defaults to the id of the Connection resource.
clientId
(string)¶
The client id used when connecting to the broker
Default: "protocol-mapper-kafka"
Example: "connectware_13462"
connectionTimeout
(integer)¶
Time in milliseconds to wait for a successful connection
Default: 1000
requestTimeout
(integer)¶
Time in milliseconds to wait for a successful request
Default: 30000
trustAllCertificates
(boolean)¶
If true, all broker certificates will be accepted, regardless of whether they can be validated or not. Use this option if self-signed server certificates should be accepted, or if there are other reasons which prevent this client from validating the certificates.
Default: false
Examples: true, false
mutualAuthentication
(boolean)¶
If true, a full certificate chain including client certificate is expected to connect properly with validated certificates.
Default: false
Examples: true, false
caCert
(string)¶
The root CA certificate as Base64 encoded PEM file content
clientCert
(string)¶
The device certificate as Base64 encoded PEM CRT file content
clientPrivateKey
(string)¶
The device private key as Base64 encoded PEM key file content
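Putting the TLS-related properties together, a connection with mutual authentication could be configured like this (a hypothetical fragment; the certificate strings are placeholders, not real Base64 content):

```yaml
kafkaConnection:
  type: Cybus::Connection
  properties:
    protocol: Kafka
    connection:
      brokers: [ 'kafka1:9092' ]
      mutualAuthentication: true
      # Placeholders - insert the Base64 encoded PEM file contents here
      caCert: '<base64-encoded-ca-cert>'
      clientCert: '<base64-encoded-client-cert>'
      clientPrivateKey: '<base64-encoded-client-key>'
```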
sasl
(object)¶
SASL stands for Simple Authentication and Security Layer. With this property you can configure the authentication scheme to be used when contacting the Kafka broker.
Properties of the sasl
object:
mechanism
(string, enum, required)¶
The mechanism used to carry out authentication
This element must be one of the following enum values:
plain
scram-sha-256
scram-sha-512
username
(string, required)¶
The username used in the authentication procedure
password
(string, required)¶
The password used in the authentication procedure
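As a sketch, a connection using SASL with SCRAM could be configured as follows (a hypothetical fragment; the credentials are placeholders):

```yaml
kafkaConnection:
  type: Cybus::Connection
  properties:
    protocol: Kafka
    connection:
      brokers: [ 'kafka1:9092' ]
      sasl:
        mechanism: scram-sha-256
        username: '<kafka-user>'      # placeholder
        password: '<kafka-password>'  # placeholder
```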
Endpoint Properties¶
topic
(string, required)¶
Any valid topic name addressing a single data-point
Example: "topic_name"
acks
(number, enum)¶
Control the number of required acknowledgements. -1 = all in-sync replicas must acknowledge (default); 0 = no acknowledgements; 1 = only wait for the leader to acknowledge
This element must be one of the following enum values:
-1
0
1
Default: -1
timeout
(number)¶
The time to await a response from the broker in milliseconds
Default: 30000
compression
(string, enum)¶
The compression codec used to compress the messages
This element must be one of the following enum values:
None
GZIP
Default: "None"
fromBeginning
(boolean)¶
Only for subscribe: where to let the consumer start consuming messages. If true, start from the beginning of the topic; if false, start at the end.
Default: false
Sample Commissioning File¶
Download producer example: kafka-example-producer.yml
---
# ----------------------------------------------------------------------------#
# Commissioning File
# ----------------------------------------------------------------------------#
# Copyright: Cybus GmbH (2020)
# Contact: support@cybus.io
# ----------------------------------------------------------------------------#
# Source Interface Definition - Kafka broker
# ----------------------------------------------------------------------------#

description: >
  Sample commissioning file for communicating with Kafka broker (producer)

metadata:
  name: Apache Kafka Connectivity
  icon: https://www.cybus.io/wp-content/uploads/2017/10/for-whom1.svg
  provider: cybus
  homepage: https://www.cybus.io
  version: 0.0.1

parameters:
  param_brokers:
    description: Hosts or IP addresses of the Apache Kafka broker
    type: string
    default: 192.168.0.1:9092

resources:
  kafkaConnection:
    type: Cybus::Connection
    properties:
      protocol: Kafka
      connection:
        brokers: [
          !ref param_brokers
        ]

  kafkaWriteExample:
    type: Cybus::Endpoint
    properties:
      protocol: Kafka
      connection: !ref kafkaConnection
      write:
        topic: some_test_topic
Download consumer example: kafka-example-consumer.yml
---
# ----------------------------------------------------------------------------#
# Commissioning File
# ----------------------------------------------------------------------------#
# Copyright: Cybus GmbH (2020)
# Contact: support@cybus.io
# ----------------------------------------------------------------------------#
# Source Interface Definition - Kafka broker
# ----------------------------------------------------------------------------#

description: >
  Sample commissioning file for communicating with Kafka broker (consumer)

metadata:
  name: Apache Kafka Connectivity
  icon: https://www.cybus.io/wp-content/uploads/2017/10/for-whom1.svg
  provider: cybus
  homepage: https://www.cybus.io
  version: 0.0.1

parameters:
  param_brokers:
    description: Hosts or IP addresses of the Apache Kafka broker
    type: string
    default: 192.168.0.1:9092

resources:
  kafkaConnection:
    type: Cybus::Connection
    properties:
      protocol: Kafka
      connection:
        clientType: 'consumer'
        brokers: [
          !ref param_brokers
        ]

  kafkaSubscribeExample:
    type: Cybus::Endpoint
    properties:
      protocol: Kafka
      connection: !ref kafkaConnection
      subscribe:
        fromBeginning: false
        topic: some_test_topic