Skip to content

Kafka 目标(Sink)

该插件将分析结果发送到 Kafka 中。

编译插件&创建插件

本地构建

shell
# cd $eKuiper_src
# go build -trimpath --buildmode=plugin -o plugins/sinks/kafka.so extensions/sinks/kafka/kafka.go
# zip kafka.zip plugins/sinks/kafka.so
# cp kafka.zip /root/tomcat_path/webapps/ROOT/
# bin/kuiper create plugin sink kafka -f /tmp/kafkaPlugin.txt
# bin/kuiper create rule kafka -f /tmp/kafkaRule.txt

镜像构建

shell
docker build -t demo/plugins:v1 -f build/plugins/Dockerfile .
docker run demo/plugins:v1
docker cp  90eae15a7245:/workspace/_plugins/debian/sinks /tmp

Dockerfile 如下所示:

dockerfile
## plase check go version that kuiper used
ARG GO_VERSION=1.23.1
FROM ghcr.io/lf-edge/ekuiper/base:$GO_VERSION-debian AS builder
WORKDIR /workspace
ADD . /workspace/
RUN go env -w GOPROXY=https://goproxy.cn,direct
RUN make plugins_c
CMD ["sleep","3600"]

在 Makefile 中添加:

dockerfile
PLUGINS_CUSTOM := sinks/kafka

.PHONY: plugins_c $(PLUGINS_CUSTOM)
plugins_c: $(PLUGINS_CUSTOM)

$(PLUGINS_CUSTOM): PLUGIN_TYPE = $(word 1, $(subst /, , $@))
$(PLUGINS_CUSTOM): PLUGIN_NAME = $(word 2, $(subst /, , $@))
$(PLUGINS_CUSTOM):
  @$(CURDIR)/build-plugins.sh $(PLUGIN_TYPE) $(PLUGIN_NAME)

重新启动 eKuiper 服务器以激活插件。

属性

属性名称是否可选说明
brokersbroker地址列表 ,用 "," 分割
topickafka 主题
saslAuthTypesasl 认证类型 , 支持none,plain,scram
saslUserNamesasl 用户名
saslPasswordsasl 密码
insecureSkipVerify是否忽略 SSL 验证
certificationPathKafka 客户端 ssl 验证的 crt 文件路径
privateKeyPathKafka 客户端 ssl 验证的 key 文件路径
rootCaPathKafka 客户端 ssl 验证的 ca 证书文件路径
certficationRawKafka 客户端 ssl 验证,经过 base64 编码过的的 crt 原文, 如果同时定义了 certificationPath 将会先用该参数。
privateKeyRawKafka 客户端 ssl 验证,经过 base64 编码过的的 key 原文, 如果同时定义了 privateKeyPath 将会先用该参数。
rootCARawKafka 客户端 ssl 验证,经过 base64 编码过的的 ca 原文, 如果同时定义了 rootCAPath 将会先用该参数。
maxAttemptsKafka 客户端向 server 发送消息的重试次数,默认为1
requiredACKsKafka 客户端确认消息的机制,-1 代表等待 leader 确认,1 代表等待所有副本确认, 0 代表不等待确认, 默认为 -1
keyKafka 客户端向 server 发送消息所携带的 Key 信息
headersKafka 客户端向 server 发送消息所携带的 headers 信息
compressionKafka 客户端向 server 发送消息时是否开启压缩,仅支持 gzip,snappy,lz4,zstd

其他通用的 sink 属性也支持,请参阅公共属性

你可以通过 api 的方式提前检查对应 sink 端点的连通性: 连通性检查

设置 key 和 headers

通过 key 和 headers 设置 Kafka 客户端发送消息时的元数据:

json
{
    "key": "keyValue",
    "headers": {
        "headerKey1": "headerValue1",
        "headerKey2": "headerValue2"
    }
}

通过 template 模板,动态设置 Kafka 客户端发送消息时的元数据:

json
{
    "key": "{{.data.key}}",
    "headers": {
        "headerKey1": "{{.data.col1}}",
        "headerKey2": "{{.data.col2}}"
    }
}

在 Kafka 客户端中设置 map 结构的 key 元数据:

json
{
  "key": "{\"keyMapkey\":\"{{.data.key.value}}\"}"
}

示例用法

下面是选择温度大于50度的样本规则,和一些配置文件仅供参考。

/tmp/kafkaRule.txt

json
{
  "id": "kafka",
  "sql": "SELECT * from demo_stream where temperature > 50",
  "actions": [
    {
      "log": {}
    },
    {
      "kafka":{
        "brokers": "127.0.0.1:9092,127.0.0.2:9092",
        "topic": "test_topic",
        "saslAuthType": "none"
      }
    }
  ]
}

/tmp/kafkaPlugin.txt

json
{
  "file":"http://localhost:8080/kafka.zip"
}

注意事项

如果通过 docker compose 将 ekuiper 与 kafka 部署在同一容器网络中,可在 ekuiper 中通过 kafka 主机名配置 brokers 地址。 但是 kafka 需要特别注意 KAFKA_CFG_ADVERTISED_LISTENERS 需要配置为主机 IP 地址, 如下所示

yaml
    zookeeper:
     image: docker.io/bitnami/zookeeper:3.8
     hostname: zookeeper
     container_name: zookeeper
     ports:
      - "2181:2181"
     volumes:
      - "zookeeper_data:/bitnami"
     environment:
       - ALLOW_ANONYMOUS_LOGIN=yes
    kafka:
     image: docker.io/bitnami/kafka:3.4
     hostname: kafka
     container_name: kafka
     ports:
      - "9092:9092"
     volumes:
      - "kafka_data:/bitnami"
     environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://122.9.166.75:9092
     depends_on:
      - zookeeper