Skip to content

MQTT action

The action is used for publish output message into an MQTT server.

Property nameOptionalDescription
serverfalseThe broker address of the MQTT server, such as tcp://127.0.0.1:1883
topicfalseThe MQTT topic, such as analysis/result
clientIdtrueThe client id for MQTT connection. If not specified, an uuid will be used
protocolVersiontrueMQTT protocol version. 3.1 (also refer as MQTT 3) or 3.1.1 (also refer as MQTT 4). If not specified, the default value is 3.1.
qostrueThe QoS for message delivery. Only int type value 0 or 1 or 2.
usernametrueThe username for the connection.
passwordtrueThe password for the connection.
certificationPathtrueThe certification path. It can be an absolute path, or a relative path. If it is an relative path, then the base path is where you excuting the kuiperd command. For example, if you run bin/kuiperd from /var/kuiper, then the base path is /var/kuiper; If you run ./kuiperd from /var/kuiper/bin, then the base path is /var/kuiper/bin.
privateKeyPathtrueThe private key path. It can be either absolute path, or relative path, which is similar to use of certificationPath.
rootCaPathtrueThe location of root ca path. It can be an absolute path, or a relative path, which is similar to use of certificationPath.
certficationRawtruebase64 encoded original text of cert, use certificationPath first if both defined
privateKeyRawtruebase64 encoded original text of key, use privateKeyPath if both defined
rootCARawtruebase64 encoded original text of CA, use rootCaPath first if both defined
tlsMinVersiontrueSpecifies the minimum version of the TLS protocol that will be negotiated with the client. Accept values are tls1.0, tls1.1, tls1.2 and tls1.3. Default: tls1.2.
renegotiationSupporttrueDetermines how and when the client handles server-initiated renegotiation requests. Support never, once or freely options. Default: never.
insecureSkipVerifytrueIf InsecureSkipVerify is true, TLS accepts any certificate presented by the server and any host name in that certificate. In this mode, TLS is susceptible to man-in-the-middle attacks. The default value is false. The configuration item can only be used with TLS connections.
retainedtrueIf retained is true,The broker stores the last retained message and the corresponding QoS for that topic.The default value is false.
compressiontrueCompress the payload with the specified compression method. Support zlib, gzip, flate, zstd method now.
connectionSelectortruereuse the connection to mqtt broker. more info

Other common sink properties are supported. Please refer to the sink common properties for more information.

Below is sample configuration for connecting to Azure IoT Hub by using SAS authentication.

json
    {
      "mqtt": {
        "server": "ssl://xyz.azure-devices.net:8883",
        "topic": "devices/demo_001/messages/events/",
        "protocolVersion": "3.1.1",
        "qos": 1,
        "clientId": "demo_001",
        "username": "xyz.azure-devices.net/demo_001/?api-version=2018-06-30",
        "password": "SharedAccessSignature sr=*******************",
        "retained": false
      }
    }

Below is another sample configuration for connecting to AWS IoT by using certification and privte key auth.

json
    {
      "mqtt": {
        "server": "ssl://xyz-ats.iot.us-east-1.amazonaws.com:8883",
        "topic": "devices/result",
        "qos": 1,
        "clientId": "demo_001",
        "certificationPath": "keys/d3807d9fa5-certificate.pem",
        "privateKeyPath": "keys/d3807d9fa5-private.pem.key",
        "insecureSkipVerify": false,
        "retained": false
      }
    }

You can check the connectivity of the corresponding sink endpoint in advance through the API: Connectivity Check

Dynamic Topic

If the result data contains the topic name, we can use it as the property of the mqtt action to achieve dynamic topic support. Assume the selected data has a field named mytopic, we can use data template syntax to set it as the property value for topic as below:

json
    {
      "mqtt": {
        "server": "ssl://xyz-ats.iot.us-east-1.amazonaws.com:8883",
        "topic": "{{.mytopic}}",
        "qos": 1,
        "clientId": "demo_001",
        "certificationPath": "keys/d3807d9fa5-certificate.pem",
        "privateKeyPath": "keys/d3807d9fa5-private.pem.key",
        "retained": false
      }
    }