Skip to content

Kafka 目标(Sink)

该插件将分析结果发送到 Kafka 中。

编译插件&创建插件

本地构建

shell
# cd $eKuiper_src
# go build -trimpath --buildmode=plugin -o plugins/sinks/kafka.so extensions/sinks/kafka/kafka.go
# zip kafka.zip plugins/sinks/kafka.so
# cp kafka.zip /root/tomcat_path/webapps/ROOT/
# bin/kuiper create plugin sink kafka -f /tmp/kafkaPlugin.txt
# bin/kuiper create rule kafka -f /tmp/kafkaRule.txt

镜像构建

shell
docker build -t demo/plugins:v1 -f build/plugins/Dockerfile .
docker run demo/plugins:v1
docker cp  90eae15a7245:/workspace/_plugins/debian/sinks /tmp

Dockerfile 如下所示:

dockerfile
## plase check go version that kuiper used
ARG GO_VERSION=1.25.4
FROM ghcr.io/lf-edge/ekuiper/base:$GO_VERSION-debian AS builder
WORKDIR /workspace
ADD . /workspace/
RUN go env -w GOPROXY=https://goproxy.cn,direct
RUN make plugins_c
CMD ["sleep","3600"]

在 Makefile 中添加:

dockerfile
PLUGINS_CUSTOM := sinks/kafka

.PHONY: plugins_c $(PLUGINS_CUSTOM)
plugins_c: $(PLUGINS_CUSTOM)

$(PLUGINS_CUSTOM): PLUGIN_TYPE = $(word 1, $(subst /, , $@))
$(PLUGINS_CUSTOM): PLUGIN_NAME = $(word 2, $(subst /, , $@))
$(PLUGINS_CUSTOM):
  @$(CURDIR)/build-plugins.sh $(PLUGIN_TYPE) $(PLUGIN_NAME)

重新启动 eKuiper 服务器以激活插件。

属性

属性名称是否可选说明
connectionSelector重用选中的 Kafka 连接。设置该参数后,brokers、SASL、TLS 等 Kafka 连接相关配置会从选中的连接中复制。
brokersbroker 地址列表,用 "," 分割。未设置 connectionSelector 时必填。
topickafka 主题
saslAuthTypesasl 认证类型 , 支持none,plain,scram
saslUserNamesasl 用户名
passwordsasl 密码
insecureSkipVerify是否忽略 SSL 验证
certificationPathKafka 客户端 ssl 验证的 crt 文件路径
privateKeyPathKafka 客户端 ssl 验证的 key 文件路径
rootCaPathKafka 客户端 ssl 验证的 ca 证书文件路径
certficationRawKafka 客户端 ssl 验证,经过 base64 编码过的的 crt 原文, 如果同时定义了 certificationPath 将会先用该参数。
privateKeyRawKafka 客户端 ssl 验证,经过 base64 编码过的的 key 原文, 如果同时定义了 privateKeyPath 将会先用该参数。
rootCARawKafka 客户端 ssl 验证,经过 base64 编码过的的 ca 原文, 如果同时定义了 rootCAPath 将会先用该参数。
maxAttemptsKafka 客户端向 server 发送消息的重试次数,默认为1
requiredACKsKafka 客户端确认消息的机制,1 代表等待 leader 确认,-1 代表等待所有副本确认, 0 代表不等待确认, 默认为 1
keyKafka 客户端向 server 发送消息所携带的 Key 信息
headersKafka 客户端向 server 发送消息所携带的 headers 信息
compressionKafka 客户端向 server 发送消息时是否开启压缩,仅支持 gzip,snappy,lz4,zstd
batchBytes设置 Kafka 客户端向 server 发送 batch 消息的最大 byte, 默认为 1048576

其他通用的 sink 属性也支持,请参阅公共属性

你可以通过 api 的方式提前检查对应 sink 端点的连通性: 连通性检查

连接重用

可以创建 Kafka 连接,并在 Kafka sink 中通过 connectionSelector 重用其中的连接相关配置。Kafka 连接会 ping 配置的 broker 并管理连接状态。Kafka sink 会复制选中连接的配置,并创建自己的 Kafka producer 用于发送消息。

创建 Kafka 连接:

shell
POST http://localhost:9081/connections
{
  "id": "kafka-1",
  "typ": "kafka",
  "props": {
    "brokers": "127.0.0.1:9092",
    "saslAuthType": "none"
  }
}

在 Kafka sink 中使用该连接:

json
{
  "id": "kafka",
  "sql": "SELECT * FROM demo_stream",
  "actions": [
    {
      "kafka": {
        "connectionSelector": "kafka-1",
        "topic": "test_topic"
      }
    }
  ]
}

设置 connectionSelector 后,sink action 中直接配置的连接相关参数会被忽略,包括 brokerssaslAuthTypesaslUserNamepasswordinsecureSkipVerify 以及 TLS 证书相关参数。

设置 key 和 headers

通过 key 和 headers 设置 Kafka 客户端发送消息时的元数据:

json
{
    "key": "keyValue",
    "headers": {
        "headerKey1": "headerValue1",
        "headerKey2": "headerValue2"
    }
}

通过 template 模板,动态设置 Kafka 客户端发送消息时的元数据:

json
{
    "key": "{{.data.key}}",
    "headers": {
        "headerKey1": "{{.data.col1}}",
        "headerKey2": "{{.data.col2}}"
    }
}

在 Kafka 客户端中设置 map 结构的 key 元数据:

json
{
  "key": "{\"keyMapkey\":\"{{.data.key.value}}\"}"
}

示例用法

下面是选择温度大于50度的样本规则,和一些配置文件仅供参考。

/tmp/kafkaRule.txt

json
{
  "id": "kafka",
  "sql": "SELECT * from demo_stream where temperature > 50",
  "actions": [
    {
      "log": {}
    },
    {
      "kafka":{
        "brokers": "127.0.0.1:9092,127.0.0.2:9092",
        "topic": "test_topic",
        "saslAuthType": "none"
      }
    }
  ]
}

/tmp/kafkaPlugin.txt

json
{
  "file":"http://localhost:8080/kafka.zip"
}

注意事项

如果通过 docker compose 将 ekuiper 与 kafka 部署在同一容器网络中,可在 ekuiper 中通过 kafka 主机名配置 brokers 地址。 但是 kafka 需要特别注意 KAFKA_CFG_ADVERTISED_LISTENERS 需要配置为主机 IP 地址, 如下所示

yaml
    zookeeper:
     image: docker.io/bitnami/zookeeper:3.8
     hostname: zookeeper
     container_name: zookeeper
     ports:
      - "2181:2181"
     volumes:
      - "zookeeper_data:/bitnami"
     environment:
       - ALLOW_ANONYMOUS_LOGIN=yes
    kafka:
     image: docker.io/soldevelo/kafka:3.4
     hostname: kafka
     container_name: kafka
     ports:
      - "9092:9092"
     volumes:
      - "kafka_data:/bitnami"
     environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://122.9.166.75:9092
     depends_on:
      - zookeeper