# Kafka Sink
The sink will publish the result into a Kafka .
# Compile & deploy plugin
# build in shell
# cd $eKuiper_src
# go build -trimpath --buildmode=plugin -o plugins/sinks/kafka.so extensions/sinks/kafka/kafka.go
# zip kafka.zip plugins/sinks/kafka.so
# cp kafka.zip /root/tomcat_path/webapps/ROOT/
# bin/kuiper create plugin sink kafka -f /tmp/kafkaPlugin.txt
# bin/kuiper create rule kafka -f /tmp/kafkaRule.txt
2
3
4
5
6
# build with image
docker build -t demo/plugins:v1 -f build/plugins/Dockerfile .
docker run demo/plugins:v1
docker cp 90eae15a7245:/workspace/_plugins/debian/sinks /tmp
2
3
Dockerfile like this:
## plase check go version that kuiper used
ARG GO_VERSION=1.18.5
FROM ghcr.io/lf-edge/ekuiper/base:$GO_VERSION-debian AS builder
WORKDIR /workspace
ADD . /workspace/
RUN go env -w GOPROXY=https://goproxy.cn,direct
RUN make plugins_c
CMD ["sleep","3600"]
2
3
4
5
6
7
8
add this in Makefile:
PLUGINS_CUSTOM := sinks/kafka
.PHONY: plugins_c $(PLUGINS_CUSTOM)
plugins_c: $(PLUGINS_CUSTOM)
$(PLUGINS_CUSTOM): PLUGIN_TYPE = $(word 1, $(subst /, , $@))
$(PLUGINS_CUSTOM): PLUGIN_NAME = $(word 2, $(subst /, , $@))
$(PLUGINS_CUSTOM):
@$(CURDIR)/build-plugins.sh $(PLUGIN_TYPE) $(PLUGIN_NAME)
2
3
4
5
6
7
8
9
Restart the eKuiper server to activate the plugin.
# Properties
Property name | Optional | Description |
---|---|---|
brokers | false | The broker address list ,split with "," |
topic | false | The topic of the Kafka |
saslAuthType | false | The Kafka sasl authType, support none,plain,scram |
saslUserName | true | The sasl user name |
saslPassword | true | The sasl password |
insecureSkipVerify | true | whether to ignore SSL verification |
certificationPath | true | Kafka client ssl verification crt file path |
privateKeyPath | true | Key file path for Kafka client SSL verification |
rootCaPath | true | Kafka client ssl verified ca certificate file path |
maxAttempts | true | The number of retries the Kafka client sends messages to the server, the default is 1 |
batchSize | true | The number of messages in a single batch sent by the Kafka client to the server, the default is 1 |
key | true | Key information carried by the Kafka client in messages sent to the server |
headers | true | The header information carried by the Kafka client in the message sent to the server |
# Setting Kafka Key and Headers
Set the metadata when the Kafka client sends messages through keys and headers:
{
"key": "keyValue",
"headers": {
"headerKey1": "headerValue1",
"headerKey2": "headerValue2"
}
}
2
3
4
5
6
7
Through the template template, dynamically set the metadata when the Kafka client sends a message:
{
"key": "{{.data.key}}",
"headers": {
"headerKey1": "{{.data.col1}}",
"headerKey2": "{{.data.col2}}"
}
}
2
3
4
5
6
7
Set the key metadata of the map structure in the Kafka client:
{
"key": "{\"keyMapkey\":\"{{.data.key.value}}\"}"
}
2
3
Other common sink properties are supported. Please refer to the sink common properties for more information.
# Sample usage
Below is a sample for selecting temperature great than 50 degree, and some profiles only for your reference.
# /tmp/kafkaRule.txt
{
"id": "kafka",
"sql": "SELECT * from demo_stream where temperature > 50",
"actions": [
{
"log": {}
},
{
"kafka":{
"brokers": "127.0.0.1:9092,127.0.0.2:9092",
"topic": "test_topic",
"saslAuthType": "none"
}
}
]
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# /tmp/kafkaPlugin.txt
{
"file":"http://localhost:8080/kafka.zip"
}
2
3
# Notice
If ekuiper and kafka are deployed in the same container network through docker compose, you can configure the brokers address through the kafka hostname in ekuiper. But kafka needs special attention KAFKA_CFG_ADVERTISED_LISTENERS
needs to be configured as the host IP address, as shown below
zookeeper:
image: docker.io/bitnami/zookeeper:3.8
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
volumes:
- "zookeeper_data:/bitnami"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: docker.io/bitnami/kafka:3.4
hostname: kafka
container_name: kafka
ports:
- "9092:9092"
volumes:
- "kafka_data:/bitnami"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://<YOUR_HOST_IP>:9092
depends_on:
- zookeeper
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25