Kafka 目标(Sink)
该插件将分析结果发送到 Kafka 中。
编译插件&创建插件
本地构建
shell
# cd $eKuiper_src
# go build -trimpath --buildmode=plugin -o plugins/sinks/kafka.so extensions/sinks/kafka/kafka.go
# zip kafka.zip plugins/sinks/kafka.so
# cp kafka.zip /root/tomcat_path/webapps/ROOT/
# bin/kuiper create plugin sink kafka -f /tmp/kafkaPlugin.txt
# bin/kuiper create rule kafka -f /tmp/kafkaRule.txt
镜像构建
shell
docker build -t demo/plugins:v1 -f build/plugins/Dockerfile .
docker run demo/plugins:v1
docker cp 90eae15a7245:/workspace/_plugins/debian/sinks /tmp
Dockerfile 如下所示:
dockerfile
## plase check go version that kuiper used
ARG GO_VERSION=1.23.1
FROM ghcr.io/lf-edge/ekuiper/base:$GO_VERSION-debian AS builder
WORKDIR /workspace
ADD . /workspace/
RUN go env -w GOPROXY=https://goproxy.cn,direct
RUN make plugins_c
CMD ["sleep","3600"]
在 Makefile 中添加:
dockerfile
PLUGINS_CUSTOM := sinks/kafka
.PHONY: plugins_c $(PLUGINS_CUSTOM)
plugins_c: $(PLUGINS_CUSTOM)
$(PLUGINS_CUSTOM): PLUGIN_TYPE = $(word 1, $(subst /, , $@))
$(PLUGINS_CUSTOM): PLUGIN_NAME = $(word 2, $(subst /, , $@))
$(PLUGINS_CUSTOM):
@$(CURDIR)/build-plugins.sh $(PLUGIN_TYPE) $(PLUGIN_NAME)
重新启动 eKuiper 服务器以激活插件。
属性
属性名称 | 是否可选 | 说明 |
---|---|---|
brokers | 否 | broker地址列表 ,用 "," 分割 |
topic | 否 | kafka 主题 |
saslAuthType | 否 | sasl 认证类型 , 支持none,plain,scram |
saslUserName | 是 | sasl 用户名 |
saslPassword | 是 | sasl 密码 |
insecureSkipVerify | 是 | 是否忽略 SSL 验证 |
certificationPath | 是 | Kafka 客户端 ssl 验证的 crt 文件路径 |
privateKeyPath | 是 | Kafka 客户端 ssl 验证的 key 文件路径 |
rootCaPath | 是 | Kafka 客户端 ssl 验证的 ca 证书文件路径 |
certficationRaw | 是 | Kafka 客户端 ssl 验证,经过 base64 编码过的的 crt 原文, 如果同时定义了 certificationPath 将会先用该参数。 |
privateKeyRaw | 是 | Kafka 客户端 ssl 验证,经过 base64 编码过的的 key 原文, 如果同时定义了 privateKeyPath 将会先用该参数。 |
rootCARaw | 是 | Kafka 客户端 ssl 验证,经过 base64 编码过的的 ca 原文, 如果同时定义了 rootCAPath 将会先用该参数。 |
maxAttempts | 是 | Kafka 客户端向 server 发送消息的重试次数,默认为1 |
requiredACKs | 是 | Kafka 客户端确认消息的机制,-1 代表等待 leader 确认,1 代表等待所有副本确认, 0 代表不等待确认, 默认为 -1 |
key | 是 | Kafka 客户端向 server 发送消息所携带的 Key 信息 |
headers | 是 | Kafka 客户端向 server 发送消息所携带的 headers 信息 |
compression | 是 | Kafka 客户端向 server 发送消息时是否开启压缩,仅支持 gzip ,snappy ,lz4 ,zstd |
其他通用的 sink 属性也支持,请参阅公共属性。
你可以通过 api 的方式提前检查对应 sink 端点的连通性: 连通性检查
设置 key 和 headers
通过 key 和 headers 设置 Kafka 客户端发送消息时的元数据:
json
{
"key": "keyValue",
"headers": {
"headerKey1": "headerValue1",
"headerKey2": "headerValue2"
}
}
通过 template 模板,动态设置 Kafka 客户端发送消息时的元数据:
json
{
"key": "{{.data.key}}",
"headers": {
"headerKey1": "{{.data.col1}}",
"headerKey2": "{{.data.col2}}"
}
}
在 Kafka 客户端中设置 map 结构的 key 元数据:
json
{
"key": "{\"keyMapkey\":\"{{.data.key.value}}\"}"
}
示例用法
下面是选择温度大于50度的样本规则,和一些配置文件仅供参考。
/tmp/kafkaRule.txt
json
{
"id": "kafka",
"sql": "SELECT * from demo_stream where temperature > 50",
"actions": [
{
"log": {}
},
{
"kafka":{
"brokers": "127.0.0.1:9092,127.0.0.2:9092",
"topic": "test_topic",
"saslAuthType": "none"
}
}
]
}
/tmp/kafkaPlugin.txt
json
{
"file":"http://localhost:8080/kafka.zip"
}
注意事项
如果通过 docker compose 将 ekuiper 与 kafka 部署在同一容器网络中,可在 ekuiper 中通过 kafka 主机名配置 brokers 地址。 但是 kafka 需要特别注意 KAFKA_CFG_ADVERTISED_LISTENERS
需要配置为主机 IP 地址, 如下所示
yaml
zookeeper:
image: docker.io/bitnami/zookeeper:3.8
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
volumes:
- "zookeeper_data:/bitnami"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: docker.io/bitnami/kafka:3.4
hostname: kafka
container_name: kafka
ports:
- "9092:9092"
volumes:
- "kafka_data:/bitnami"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://122.9.166.75:9092
depends_on:
- zookeeper