# EdgeX 数据源

stream source scan table source

eKuiper 内置支持 EdgeX 数据源,支持订阅来自于 EdgeX 消息总线 (opens new window)的数据,并将数据放入 eKuiper 数据处理流水线中。用户可直接通过 EdgeX 数据源消费 EdgeX 中的事件,无需任何手动模式定义

在 eKuiper 中,EdgeX 连接器可以作为源连接器(从 EdgeX 获取数据)或 Sink 连接器(将数据发布到 EdgeX),本节重点介绍 EdgeX 源连接器。

# Configurations

eKuiper 连接器可以通过环境变量REST API 或配置文件进行配置,本节将介绍配置文件的使用方法。

EdgeX 源连接器的配置文件位于: $ekuiper/etc/sources/edgex.yaml,其中:

  • default:对应全局连接配置。
  • 自定义部分:适用于需要自定义连接参数的场景,该部分的配置将覆盖全局连接配置。
  • 连接器重用:eKuiper 还支持通过 connectionSelector 配置项在不同的配置中复用某个连接配置。

以下示例包括一个全局配置和自定义配置 demo1

#全局 Edgex 配置
default:
  protocol: tcp
  server: localhost
  port: 5573
  topic: rules-events
  messageType: event
#  optional:
#    ClientId: client1
#    Username: user1
#    Password: password

#覆盖全局配置
demo1: #Conf_key
  protocol: tcp
  server: 10.211.55.6
  port: 5571
  topic: rules-events
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 全局配置

用户可在 default 部分指定全局设置。

# 连接相关配置

  • protocol:连接到 EdgeX 消息总线的协议,缺省为 tcp
  • server:EdgeX 消息总线的地址,缺省为 localhost
  • port:EdgeX 消息总线的端口,缺省为 5573

# 连接重用

  • connectionSelector:重用 EdgeX 数据源连接,如下方配置示例中的 edgex.redisMsgBus。注意:连接配置文件位于 connections/connection.yaml。有关连接重用的详细解释,见连接器的重用

    #全局 Edgex 配置
    default:
    protocol: tcp
    server: localhost
    port: 5573
    connectionSelector: edgex.redisMsgBus
    topic: rules-events
    messageType: event
    #  optional:
    #    ClientId: client1
    #    Username: user1
    #    Password: password
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12

    提示

    指定 connectionSelector 参数后,所有关于连接的参数都会被忽略,包括 protocolserverport 配置。本例中,protocol: tcp | server: localhost | port: 5573的值都将被忽略。

# 主题和消息配置

  • topic:EdgeX 消息总线上监听的主题名称,缺省为 rules-events。用户可以直接连接到 EdgeX 消息总线上的主题也可以连接到 application service 暴露的主题。需要注意的是,两种主题的消息数据类型不同,需要设置正确的 messageType 类型。

  • type:EdgeX 消息总线类型,目前支持三种消息总线。如果指定的消息总线类型不支持,将使用缺省 zero 类型。

    • zero:使用 ZeroMQ 类型的消息总线。
    • mqtt:使用 MQTT 服务器作为消息总线,如选择 MQTT 总线类型,eKuiper 支持更多的 MQTT 配置项,具体请查看 其他配置(MQTT 相关配置)
    • redis:使用 Redis 服务器作为消息总线。使用 EdgeX docker compose 启动时,type 参数会默认设置为该类型。

    EdgeX Levski 引入了两种信息消息总线类型,eKuiper 从 1.7.1 开始支持这两种新的类型,分别为

    • nats-jetstream
    • nats-core
  • messageType:EdgeX 消息模型类型。该参数支持两种类型:

    • event:如果连接到 EdgeX application service 的主题、则消息为 "event" 类型;消息将会解码为 dtos.Event 类型。该选项为默认值。
    • request:如果直接连接到消息总线的主题,接收 device service 或者 core data 发出的数据,则消息类型为 "request"。消息将会解码为 requests.AddEventRequest 类型。

# 其他配置(MQTT 相关配置)

如使用 MQTT 消息总线,eKuiper 还支持其他一些可选配置项。请注意,所有可选配置都应为字符类型KeepAlive: "5000" ,有关各配置项的详细解释,可参考 MQTT 协议。

  • ClientId

  • Username

  • Password

  • Qos

  • KeepAlive

  • Retained

  • ConnectionPayload

  • CertFile

  • KeyFile

  • CertPEMBlock

  • KeyPEMBlock

  • SkipCertVerify

# 自定义配置

对于需要自定义某些连接参数的场景,eKuiper 支持用户创建自定义模块来实现全局配置的重载。

配置示例

#覆盖全局配置
demo1: #Conf_key
  protocol: tcp
  server: 10.211.55.6
  port: 5571
  topic: rules-events
1
2
3
4
5
6

定义 demo1 配置组后,如希望在创建流时使用此配置,可通过 CONF_KEY 选项并指定配置名称,此时,在自定义配置中定义的参数将覆盖 default 配置中的相应参数。详细步骤,可参考 流语句

示例

create stream demo1() WITH (FORMAT="JSON"type="edgex"、CONF_KEY="demo1");
1

# 创建流类型源

完成连接器的配置后,后续可通过创建流将其与 eKuiper 规则集成。EdgeX 源连接器可以作为流类型扫描表类型数据源使用,本节将以流类型源为例进行说明。

# 通过 REST API 创建

REST API 为 eKuiper 提供了一种可编程的交互方式,适用于自动化或需要将 eKuiper 集成到其他系统中的场景。

示例

create stream demo1() WITH (FORMAT="JSON"type="edgex"、CONF_KEY="demo1");
1

详细操作步骤及命令解释,可参考 通过 REST API 进行流管理

# 通过 CLI 创建

用户也可以通过命令行界面(CLI)直接访问 eKuiper。

  1. 进入 eKuiper bin 目录:

    cd path_to_eKuiper_directory/bin
    
    1
  2. 使用 create 命令创建规则,指定 EdgeX 连接器为数据源,例如:

    bin/kuiper CREATE STREAM demo'() with(format="json"、datasource="demo" type="edgex")'
    
    1

详细操作步骤及命令解释,可参考 通过 CLI 进行流管理

# 拓展阅读:EdgeX 中的流定义

EdgeX 在 reading objects (opens new window) 已经定义了数据类型,因此在 eKuiper 中建议采用 schema-less 方式的 EdgeX 流式定义,如下所示:

# cd $eKuiper_base
# bin/kuiper CREATE STREAM demo'() with(format="json"、datasource="demo" type="edgex")'
1
2

# 自动数据类型转换

eKuiper 在处理 EdgeX 事件时,会根据 EdgeX ValueType 字段自动管理数据类型转换。

数据转换原则:

  • 如果在 reading 的值类型中可以找到支持的数据类型,执行数据类型转换;guas
  • 如果在 reading 的值类型中找不到支持的数据类型,将保留原值;
  • 如果类型转换失败,该值将被丢弃,并在日志上打印一条告警消息;

# Boolean

如果 readingValueType 的值为 Bool ,那么 eKuiper 会试着将其转换为 boolean 类型:

  • 转换为 true: "1"、"t"、"T"、"true"、"TRUE"、"True"
  • 转换为 false:"0"、"f"、"F"、"false"、"FALSE"、"False"

# Bigint

如果 readingValueType 的值为 INT8INT16INT32INT64UINT8UINT16UINT32UINT64 那么 eKuiper 会试着将其转换为 Bigint 类型。

# Float

如果 readingValueType 的值为 FLOAT32FLOAT64 ,那么 eKuiper 会试着将其转换为 Float 类型。

# String

如果 readingValueType 的值为 String,那么 eKuiper 会试着将其转换为 String 类型。

# Boolean 数组

EdgeX 中的 Bool 数组类型会被转换为 boolean 数组。

# Bigint 数组

EdgeX 中所有的 INT8INT16INT32INT64UINT8UINT16UINT32UINT64 数组类型会被转换为 Bigint 数组。

# Float 数组

EdgeX 中所有的 FLOAT32FLOAT64 数组类型会被转换为 Float 数组。