# EdgeX 消息总线目标

该目标用于将消息发送到 EdgeX 消息总线上。

请注意,如果你使用的是 ZeorMQ 消息总线,那么该 sink 会创建一个新的 EdgeX 消息总线(绑定到 eKuiper 服务所运行的地址),而不是利用原来既有的消息总线(通常为 application 服务所暴露的地址和端口)。

另外,如果你需要在别的主机上对你的端口可以进行访问,你需要在开始运行 eKuiper 服务之前,把端口号映射到主机上。

名称可选Description
type消息总线类型,目前支持三种类型的消息总线, redis, zero 或者 mqtt,其中 redis 为缺省类型。
protocol协议,如未指定,使用缺省值 tcp
host消息总线主机地址,使用缺省值 *
port消息总线端口号。 如未指定,使用缺省值 5563
connectionSelector重用到 EdgeX 消息总线的连接,详细信息,请参考
topic发布的主题名称。该主题为固定值。若不同的消息需要动态指定主题,则将该属性置空,并设置 topicPrefix 属性。这两个属性只能设置一个。若两者都未设置,则使用缺省主题 application
topicPrefix发布的主题的前缀。发送的主题将采用动态拼接,格式为$topicPrefix/$profileName/$deviceName/$sourceName
contentType发布消息的内容类型,如未指定,使用缺省值 application/json
messageTypeEdgeX 消息模型类型。若要将消息发送为类似 apllication service 的 event 类型,则应设置为 event。否则,若要将消息发送为类似 device service 或者 core data service 的 event request 类型,则应设置为 request。如未指定,使用缺省值 event
metadata该属性为一个字段名称,该字段是 SQL SELECT 子句的一个字段名称,这个字段应该类似于 meta(*) AS xxx ,用于选出消息中所有的 EdgeX 元数据 。
profileName允许用户指定 Profile 名称,该名称将作为从 eKuiper 中发送出来的 Event 结构体的 profile 名称。若在 metadata 中设置了 profileName 将会优先采用。
deviceName允许用户指定设备名称,该名称将作为从 eKuiper 中发送出来的 Event 结构体的设备名称。若在 metadata 中设置了 deviceName 将会优先采用。
sourceName允许用户指定源名称,该名称将作为从 eKuiper 中发送出来的 Event 结构体的源名称。若在 metadata 中设置了 sourceName 将会优先采用。
optional如果指定了 mqtt 消息总线,那么还可以指定一下可选的值。请参考以下可选的支持的配置类型。

以下为支持的可选的配置列表,您可以参考 MQTT 协议规范来获取更详尽的信息。

  • optional
    • ClientId
    • Username
    • Password
    • Qos
    • KeepAlive
    • Retained
    • ConnectionPayload
    • CertFile
    • KeyFile
    • CertPEMBlock
    • KeyPEMBlock
    • SkipCertVerify

其他通用的 sink 属性也支持,请参阅公共属性

EdgeX 动作可支持数据模板对结果格式进行变化,但是数据模板的结果必须为 JSON 字符串的 object 形式,例如 "{\"key\":\"{{.key}}\"}"。数组形式的 JSON 字符串或者非 JSON 字符串都不支持。 ::

# 发送到各种目标

通过设置不同的属性组合,我们可以将结果采用不同的格式发送到不同的 EdgeX 消息总线设置中。

# 像 application service 一样发送到 redis 消息总线

使用默认配置,EdgeX sink 会将消息以 event 格式发送到默认的 redis 消息总线中。

{
  "id": "ruleRedisEvent",
  "sql": "SELECT temperature * 3 AS t1, humidity FROM events",
  "actions": [
    {
      "edgex": {
        "protocol": "redis",
        "host": "localhost",
        "port": 6379,
        "topic": "application",
        "profileName": "ekuiperProfile",
        "deviceName": "ekuiper",      
        "contentType": "application/json"
      }
    }
  ]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 像 device service 一样发送到 redis 消息总线

通过更改 topicPrefixmessageType 属性,我们可以让 EdgeX sink 模拟设备。设备默认情况下会发送消息到 edgex/events/device/$profileName/$deviceName/$sourceName 格式的主题中。所以,我们需要设置 topicPrefix 属性为 edgex/events/device 以确保消息路由为设备消息。此外,通过与 metadata 结合,我们可以发送到动态的主题中,从而模拟多个设备。详情参考下一节动态元数据

{
  "id": "ruleRedisDevice",
  "sql": "SELECT temperature * 3 AS t1, humidity FROM events",
  "actions": [
    {
      "edgex": {
        "protocol": "redis",
        "host": "localhost",
        "port": 6379,
        "topicPrefix": "edgex/events/device",
        "messageType": "request",
        "metadata": "metafield_name",
        "contentType": "application/json"
      }
    }
  ]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 发送到 MQTT 消息总线

以下是将分析结果发送到 MQTT 消息总线的规则,请注意在 optional 中是如何指定 ClientId 的。

{
  "id": "ruleMqtt",
  "sql": "SELECT meta(*) AS edgex_meta, temperature, humidity, humidity*2 as h1 FROM demo WHERE temperature = 20",
  "actions": [
    {
      "edgex": {
        "protocol": "tcp",
        "host": "127.0.0.1",
        "port": 1883,
        "topic": "result",
        "type": "mqtt",
        "metadata": "edgex_meta",
        "contentType": "application/json",
        "optional": {
          "ClientId": "edgex_message_bus_001"
        }
      }
    }
  ]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 发送到 zeromq 消息总线

以下是将分析结果发送到 zeromq 消息总线的规则。

{
  "id": "ruleZmq",
  "sql": "SELECT meta(*) AS edgex_meta, temperature, humidity, humidity*2 as h1 FROM demo WHERE temperature = 20",
  "actions": [
    {
      "edgex": {
        "protocol": "tcp",
        "host": "*",
        "port": 5571,
        "topic": "application",
        "profileName": "myprofile",
        "deviceName": "mydevice",      
        "contentType": "application/json"
      }
    }
  ]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 使用连接重用功能发布

以下是如何使用连接重用功能的示例。我们只需要删除连接相关的参数并使用 connectionSelector 指定要重用的连接。更多信息

{
  "id": "ruleRedisDevice",
  "sql": "SELECT temperature, humidity, humidity*2 as h1 FROM demo WHERE temperature = 20",
  "actions": [
    {
      "edgex": {
        "connectionSelector": "edgex.redisMsgBus",
        "topic": "application",
        "profileName": "myprofile",
        "deviceName": "mydevice",      
        "contentType": "application/json"
      }
    }
  ]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 动态元数据

# 发布结果到 EdgeX 消息总线,而不保留原有的元数据

在此情况下,原有的元数据 (例如 Events 结构体中的 id,deviceName,profileName,sourceName,origin,tags,以及Reading 结构体中的 id,deviceName,profileName,origin,valueType 不会被保留)。eKuiper 在此情况下作为 EdgeX 的一个单独微服务,它有自己的 device nameprofile namesource name。 提供了属性 deviceNameprofileName, 这两个属性允许用户指定 eKuiper 的设备名称和 profile 名称。而 sourceName 默认为 topic 属性的值。如下所示,

  1. 从 EdgeX 消息总线上的 events 主题上收到的消息,

    {
      "DeviceName": "demo", "Origin": 000,"readings":
      [
         {"ResourceName": "Temperature", value: "30", "Origin":123},
         {"ResourceName": "Humidity", value: "20", "Origin":456}
      ]
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
  2. 使用如下的规则,并且在 edgex action 中给属性 deviceName 指定 kuiper,属性 profileName 指定 kuiperProfile

    {
      "id": "rule1",
      "sql": "SELECT temperature * 3 AS t1, humidity FROM events",
      "actions": [
        {
          "edgex": {
            "topic": "application",
            "deviceName": "kuiper",
            "profileName": "kuiperProfile",
            "contentType": "application/json"
          }
        }
      ]
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
  3. 发送到 EdgeX 消息总线上的数据。

    {
      "DeviceName": "kuiper", "ProfileName": "kuiperProfile",  "Origin": 0,"readings":
      [
         {"ResourceName": "t1", value: "90", "Origin": 0},
         {"ResourceName": "humidity", value: "20" , "Origin": 0}
      ]
    }
    
    1
    2
    3
    4
    5
    6
    7
    8

请注意:

  • Event 结构体中的设备名称(DeviceName)变成了 kuiper,profile 名称( ProfileName)变成了 kuiperProfile
  • Events and Readings 结构体中的数据被更新为新的值。 字段 Origin 被 eKuiper 更新为新的值 (这里为 0).

# 发布结果到 EdgeX 消息总线,并保留原有的元数据

但是在某些场景中,你可能需要保留原来的元数据。比如保留发送到 eKuiper 的设备名称,在本例中为 demo, 还有 reading 数组中的其它元数据。在此情况下,eKuiper 更像是一个过滤器 - 将不关心的数据过滤掉,但是依然保留原有的数据。

参考以下的例子,

  1. 从 EdgeX 消息总线上的 events 主题上收到的消息,

    {
      "DeviceName": "demo", "Origin": 000,"readings":
      [
         {"ResourceName": "Temperature", value: "30", "Origin":123},
         {"ResourceName": "Humidity", value: "20", "Origin":456}
      ]
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
  2. 使用如下规则,在edgex action 中,为 metadata 指定值 edgex_meta

    {
      "id": "rule1",
      "sql": "SELECT meta(*) AS edgex_meta, temperature * 3 AS t1, humidity FROM events WHERE temperature > 30",
      "actions": [
        {
          "edgex": {
            "topic": "application",
            "metadata": "edgex_meta",
            "contentType": "application/json"
          }
        }
      ]
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13

    请注意,

    • 用户需要在 SQL 子句中加 meta(*) AS edgex_meta ,函数 meta(*) 返回所有的元数据。
    • edgex action里, 属性 metadata 指定值 edgex_meta 。该属性指定哪个字段包含了元数据。
  3. 发送给 EdgeX 消息总线的数据

    {
      "DeviceName": "demo", "Origin": 000,"readings":
      [
         {"ResourceName": "t1", value: "90" , "Origin": 0},
         {"ResourceName": "humidity", value: "20", "Origin":456}
      ]
    }
    
    1
    2
    3
    4
    5
    6
    7
    8

    请注意,

    • Events 结构体的元数据依然保留,例如 DeviceName & Origin.
    • 对于在原有消息中可以找到的 reading,元数据将继续保留。 比如 humidity 的元数据就是从 EdgeX 消息总线里接收到的原值 - 或者说是旧值
    • 对于在原有消息中无法找到的 reading,元数据将不会被设置。如例子中的 t1 的元数据被设置为 eKuiper 产生的缺省值。
    • 如果你的 SQL 包含了聚合函数,那保留原有的元数据就没有意义,但是 eKuiper 还是会使用时间窗口中的某一条记录的元数据。例如,在下面的 SQL 里, SELECT avg(temperature) AS temperature, meta(*) AS edgex_meta FROM ... GROUP BY TUMBLINGWINDOW(ss, 10)。 这种情况下,在时间窗口中可能有几条数据,eKuiper 会使用窗口中的第一条数据的元数据来填充 temperature 的元数据。