Skip to content

MQTT动作

该操作用于将输出消息发布到 MQTT 服务器中。

属性名称是否可选说明
serverMQTT 服务器地址,例如 tcp://127.0.0.1:1883
topicMQTT 主题,例如 analysis/result , 也可设置为动态属性,例如 $.col, 将会把结果中的 col 列的值作为主题
clientIdMQTT 连接的客户端 ID。 如果未指定,将使用一个 uuid
protocolVersionMQTT 协议版本。3.1 (也被称为 MQTT 3) 或者 3.1.1 (也被称为 MQTT 4)。 如果未指定,缺省值为 3.1。
qos消息转发的服务质量
username连接用户名
password连接密码
certificationPath证书路径。可以为绝对路径,也可以为相对路径。如果指定的是相对路径,那么父目录为执行 kuiperd 命令的路径。比如,如果你在 /var/kuiper 中运行 bin/kuiperd ,那么父目录为 /var/kuiper; 如果运行从 /var/kuiper/bin 中运行./kuiperd,那么父目录为 /var/kuiper/bin
privateKeyPath私钥路径。可以为绝对路径,也可以为相对路径,相对路径的用法与 certificationPath 类似。
rootCaPath根证书路径,用以验证服务器证书。可以为绝对路径,也可以为相对路径,相对路径的用法与 certificationPath 类似。
certficationRaw经过 base64 编码过的证书原文, 如果同时定义了 certificationPath 将会先用该参数。
privateKeyRaw经过 base64 编码过的密钥原文, 如果同时定义了 privateKeyPath 将会先用该参数。
rootCARaw经过 base64 编码过的根证书原文, 如果同时定义了 rootCAPath 将会先用该参数。
insecureSkipVerify如果 InsecureSkipVerify 设置为 true, TLS接受服务器提供的任何证书以及该证书中的任何主机名。 在这种模式下,TLS容易受到中间人攻击。默认值为 false。配置项只能用于TLS连接。
retained如果 retained 设置为 true,Broker会存储每个 Topic 的最后一条保留消息及其 Qos。默认值是 false
compression使用指定的压缩方法压缩 Payload。当前支持 zlib, gzip, flate, zstd 算法。
connectionSelector重用到 MQTT Broker 的连接,详细信息,请参考

其他通用的 sink 属性也支持,请参阅公共属性

以下为使用 SAS 连接到 Azure IoT Hub 的样例。

json
    {
      "mqtt": {
        "server": "ssl://xyz.azure-devices.net:8883",
        "topic": "devices/demo_001/messages/events/",
        "protocolVersion": "3.1.1",
        "qos": 1,
        "clientId": "demo_001",
        "username": "xyz.azure-devices.net/demo_001/?api-version=2018-06-30",
        "password": "SharedAccessSignature sr=*******************",
        "retained": false
      }
    }

以下为使用证书和私钥连接到 AWS IoT的另一个样例。

json
    {
      "mqtt": {
        "server": "ssl://xyz-ats.iot.us-east-1.amazonaws.com:8883",
        "topic": "devices/result",
        "qos": 1,
        "clientId": "demo_001",
        "certificationPath": "keys/d3807d9fa5-certificate.pem",
        "privateKeyPath": "keys/d3807d9fa5-private.pem.key",
        "retained": false
      }
    }

你可以通过 api 的方式提前检查对应 sink 端点的连通性: 连通性检查

动态主题

若结果数据中包含主题内容,可以将其作为主题属性,从而实现动态主题的需求。假设 SQL 选出的数据包含 mytopic, 则可以使用数据模板的语法将其设置为 topic 属性的值,如下所示:

json
    {
      "mqtt": {
        "server": "ssl://xyz-ats.iot.us-east-1.amazonaws.com:8883",
        "topic": "{{.mytopic}}",
        "qos": 1,
        "clientId": "demo_001",
        "certificationPath": "keys/d3807d9fa5-certificate.pem",
        "privateKeyPath": "keys/d3807d9fa5-private.pem.key",
        "retained": false
      }
    }