# Kafka 目标(Sink)

该插件将分析结果发送到 Kafka 中。

# 编译插件&创建插件

# 本地构建

# cd $eKuiper_src
# go build -trimpath --buildmode=plugin -o plugins/sinks/kafka.so extensions/sinks/kafka/kafka.go
# zip kafka.zip plugins/sinks/kafka.so
# cp kafka.zip /root/tomcat_path/webapps/ROOT/
# bin/kuiper create plugin sink kafka -f /tmp/kafkaPlugin.txt
# bin/kuiper create rule kafka -f /tmp/kafkaRule.txt
1
2
3
4
5
6

# 镜像构建

docker build -t demo/plugins:v1 -f build/plugins/Dockerfile .
docker run demo/plugins:v1
docker cp  90eae15a7245:/workspace/_plugins/debian/sinks /tmp
1
2
3

Dockerfile 如下所示:

## plase check go version that kuiper used
ARG GO_VERSION=1.18.5
FROM ghcr.io/lf-edge/ekuiper/base:$GO_VERSION-debian AS builder
WORKDIR /workspace
ADD . /workspace/
RUN go env -w GOPROXY=https://goproxy.cn,direct
RUN make plugins_c
CMD ["sleep","3600"]
1
2
3
4
5
6
7
8

在 Makefile 中添加:

PLUGINS_CUSTOM := sinks/kafka

.PHONY: plugins_c $(PLUGINS_CUSTOM)
plugins_c: $(PLUGINS_CUSTOM)

$(PLUGINS_CUSTOM): PLUGIN_TYPE = $(word 1, $(subst /, , $@))
$(PLUGINS_CUSTOM): PLUGIN_NAME = $(word 2, $(subst /, , $@))
$(PLUGINS_CUSTOM):
  @$(CURDIR)/build-plugins.sh $(PLUGIN_TYPE) $(PLUGIN_NAME)
1
2
3
4
5
6
7
8
9

重新启动 eKuiper 服务器以激活插件。

# 属性

属性名称是否可选说明
brokersbroker地址列表 ,用 "," 分割
topickafka 主题
saslAuthTypesasl 认证类型 , 支持none,plain,scram
saslUserNamesasl 用户名
saslPasswordsasl 密码
insecureSkipVerify是否忽略 SSL 验证
certificationPathKafka 客户端 ssl 验证的 crt 文件路径
privateKeyPathKafka 客户端 ssl 验证的 key 文件路径
rootCaPathKafka 客户端 ssl 验证的 ca 证书文件路径
maxAttemptsKafka 客户端向 server 发送消息的重试次数,默认为1
batchSizeKafka 客户端向 server 发送单批消息的消息条数,默认为 1
keyKafka 客户端向 server 发送消息所携带的 Key 信息
headersKafka 客户端向 server 发送消息所携带的 headers 信息

其他通用的 sink 属性也支持,请参阅公共属性

# 设置 key 和 headers

通过 key 和 headers 设置 Kafka 客户端发送消息时的元数据:

{
    "key": "keyValue",
    "headers": {
        "headerKey1": "headerValue1",
        "headerKey2": "headerValue2"
    }
}
1
2
3
4
5
6
7

通过 template 模板,动态设置 Kafka 客户端发送消息时的元数据:

{
    "key": "{{.data.key}}",
    "headers": {
        "headerKey1": "{{.data.col1}}",
        "headerKey2": "{{.data.col2}}"
    }
}
1
2
3
4
5
6
7

在 Kafka 客户端中设置 map 结构的 key 元数据:

{
  "key": "{\"keyMapkey\":\"{{.data.key.value}}\"}"
}
1
2
3

# 示例用法

下面是选择温度大于50度的样本规则,和一些配置文件仅供参考。

# /tmp/kafkaRule.txt

{
  "id": "kafka",
  "sql": "SELECT * from demo_stream where temperature > 50",
  "actions": [
    {
      "log": {}
    },
    {
      "kafka":{
        "brokers": "127.0.0.1:9092,127.0.0.2:9092",
        "topic": "test_topic",
        "saslAuthType": "none"
      }
    }
  ]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# /tmp/kafkaPlugin.txt

{
  "file":"http://localhost:8080/kafka.zip"
}
1
2
3

# 注意事项

如果通过 docker compose 将 ekuiper 与 kafka 部署在同一容器网络中,可在 ekuiper 中通过 kafka 主机名配置 brokers 地址。 但是 kafka 需要特别注意 KAFKA_CFG_ADVERTISED_LISTENERS 需要配置为主机 IP 地址, 如下所示

    zookeeper:
     image: docker.io/bitnami/zookeeper:3.8
     hostname: zookeeper
     container_name: zookeeper
     ports:
      - "2181:2181"
     volumes:
      - "zookeeper_data:/bitnami"
     environment:
       - ALLOW_ANONYMOUS_LOGIN=yes
    kafka:
     image: docker.io/bitnami/kafka:3.4
     hostname: kafka
     container_name: kafka
     ports:
      - "9092:9092"
     volumes:
      - "kafka_data:/bitnami"
     environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://122.9.166.75:9092
     depends_on:
      - zookeeper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26