Kafka 目标(Sink) 
该插件将分析结果发送到 Kafka 中。
编译插件&创建插件 
本地构建 
shell
# cd $eKuiper_src
# go build -trimpath --buildmode=plugin -o plugins/sinks/kafka.so extensions/sinks/kafka/kafka.go
# zip kafka.zip plugins/sinks/kafka.so
# cp kafka.zip /root/tomcat_path/webapps/ROOT/
# bin/kuiper create plugin sink kafka -f /tmp/kafkaPlugin.txt
# bin/kuiper create rule kafka -f /tmp/kafkaRule.txt镜像构建 
shell
docker build -t demo/plugins:v1 -f build/plugins/Dockerfile .
docker run demo/plugins:v1
docker cp  90eae15a7245:/workspace/_plugins/debian/sinks /tmpDockerfile 如下所示:
dockerfile
## plase check go version that kuiper used
ARG GO_VERSION=1.18.5
FROM ghcr.io/lf-edge/ekuiper/base:$GO_VERSION-debian AS builder
WORKDIR /workspace
ADD . /workspace/
RUN go env -w GOPROXY=https://goproxy.cn,direct
RUN make plugins_c
CMD ["sleep","3600"]在 Makefile 中添加:
dockerfile
PLUGINS_CUSTOM := sinks/kafka
.PHONY: plugins_c $(PLUGINS_CUSTOM)
plugins_c: $(PLUGINS_CUSTOM)
$(PLUGINS_CUSTOM): PLUGIN_TYPE = $(word 1, $(subst /, , $@))
$(PLUGINS_CUSTOM): PLUGIN_NAME = $(word 2, $(subst /, , $@))
$(PLUGINS_CUSTOM):
  @$(CURDIR)/build-plugins.sh $(PLUGIN_TYPE) $(PLUGIN_NAME)重新启动 eKuiper 服务器以激活插件。
属性 
| 属性名称 | 是否可选 | 说明 | 
|---|---|---|
| brokers | 否 | broker地址列表 ,用 "," 分割 | 
| topic | 否 | kafka 主题 | 
| saslAuthType | 否 | sasl 认证类型 , 支持none,plain,scram | 
| saslUserName | 是 | sasl 用户名 | 
| saslPassword | 是 | sasl 密码 | 
| insecureSkipVerify | 是 | 是否忽略 SSL 验证 | 
| certificationPath | 是 | Kafka 客户端 ssl 验证的 crt 文件路径 | 
| privateKeyPath | 是 | Kafka 客户端 ssl 验证的 key 文件路径 | 
| rootCaPath | 是 | Kafka 客户端 ssl 验证的 ca 证书文件路径 | 
| maxAttempts | 是 | Kafka 客户端向 server 发送消息的重试次数,默认为1 | 
| requiredACKs | 是 | Kafka 客户端确认消息的机制,-1 代表等待 leader 确认,1 代表等待所有副本确认, 0 代表不等待确认, 默认为 -1 | 
| key | 是 | Kafka 客户端向 server 发送消息所携带的 Key 信息 | 
| headers | 是 | Kafka 客户端向 server 发送消息所携带的 headers 信息 | 
其他通用的 sink 属性也支持,请参阅公共属性。
设置 key 和 headers 
通过 key 和 headers 设置 Kafka 客户端发送消息时的元数据:
json
{
    "key": "keyValue",
    "headers": {
        "headerKey1": "headerValue1",
        "headerKey2": "headerValue2"
    }
}通过 template 模板,动态设置 Kafka 客户端发送消息时的元数据:
json
{
    "key": "{{.data.key}}",
    "headers": {
        "headerKey1": "{{.data.col1}}",
        "headerKey2": "{{.data.col2}}"
    }
}在 Kafka 客户端中设置 map 结构的 key 元数据:
json
{
  "key": "{\"keyMapkey\":\"{{.data.key.value}}\"}"
}示例用法 
下面是选择温度大于50度的样本规则,和一些配置文件仅供参考。
/tmp/kafkaRule.txt 
json
{
  "id": "kafka",
  "sql": "SELECT * from demo_stream where temperature > 50",
  "actions": [
    {
      "log": {}
    },
    {
      "kafka":{
        "brokers": "127.0.0.1:9092,127.0.0.2:9092",
        "topic": "test_topic",
        "saslAuthType": "none"
      }
    }
  ]
}/tmp/kafkaPlugin.txt 
json
{
  "file":"http://localhost:8080/kafka.zip"
}注意事项 
如果通过 docker compose 将 ekuiper 与 kafka 部署在同一容器网络中,可在 ekuiper 中通过 kafka 主机名配置 brokers 地址。 但是 kafka 需要特别注意 KAFKA_CFG_ADVERTISED_LISTENERS 需要配置为主机 IP 地址, 如下所示
yaml
    zookeeper:
     image: docker.io/bitnami/zookeeper:3.8
     hostname: zookeeper
     container_name: zookeeper
     ports:
      - "2181:2181"
     volumes:
      - "zookeeper_data:/bitnami"
     environment:
       - ALLOW_ANONYMOUS_LOGIN=yes
    kafka:
     image: docker.io/bitnami/kafka:3.4
     hostname: kafka
     container_name: kafka
     ports:
      - "9092:9092"
     volumes:
      - "kafka_data:/bitnami"
     environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://122.9.166.75:9092
     depends_on:
      - zookeeper