Skip to content

EdgeX 数据源

stream sourcescan table source

eKuiper 内置支持 EdgeX 数据源,支持订阅来自于 EdgeX 消息总线的数据,并将数据放入 eKuiper 数据处理流水线中。用户可直接通过 EdgeX 数据源消费 EdgeX 中的事件,无需任何手动模式定义

在 eKuiper 中,EdgeX 连接器可以作为源连接器(从 EdgeX 获取数据)或 Sink 连接器(将数据发布到 EdgeX),本节重点介绍 EdgeX 源连接器。

Configurations

eKuiper 连接器可以通过环境变量REST API 或配置文件进行配置,本节将介绍配置文件的使用方法。

EdgeX 源连接器的配置文件位于: $ekuiper/etc/sources/edgex.yaml,其中:

  • default:对应全局连接配置。
  • 自定义部分:适用于需要自定义连接参数的场景,该部分的配置将覆盖全局连接配置。
  • 连接器重用:eKuiper 还支持通过 connectionSelector 配置项在不同的配置中复用某个连接配置。

以下示例包括一个全局配置和自定义配置 demo1

yaml
#全局 Edgex 配置
default:
  protocol: tcp
  server: localhost
  port: 5573
  topic: rules-events
  messageType: event
#  optional:
#    ClientId: client1
#    Username: user1
#    Password: password

#覆盖全局配置
demo1: #Conf_key
  protocol: tcp
  server: 10.211.55.6
  port: 5571
  topic: rules-events

全局配置

用户可在 default 部分指定全局设置。

连接相关配置

  • protocol:连接到 EdgeX 消息总线的协议,缺省为 tcp
  • server:EdgeX 消息总线的地址,缺省为 localhost
  • port:EdgeX 消息总线的端口,缺省为 5573

连接重用

  • connectionSelector:重用 EdgeX 数据源连接,如下方配置示例中的 edgex.redisMsgBus。注意:连接配置文件位于 connections/connection.yaml。有关连接重用的详细解释,见连接器的重用

    yaml
    #全局 Edgex 配置
    default:
    protocol: tcp
    server: localhost
    port: 5573
    connectionSelector: edgex.redisMsgBus
    topic: rules-events
    messageType: event
    #  optional:
    #    ClientId: client1
    #    Username: user1
    #    Password: password

    TIP

    指定 connectionSelector 参数后,所有关于连接的参数都会被忽略,包括 protocolserverport 配置。本例中,protocol: tcp | server: localhost | port: 5573的值都将被忽略。

主题和消息配置

  • topic:EdgeX 消息总线上监听的主题名称,缺省为 rules-events。用户可以直接连接到 EdgeX 消息总线上的主题也可以连接到 application service 暴露的主题。需要注意的是,两种主题的消息数据类型不同,需要设置正确的 messageType 类型。

  • type:EdgeX 消息总线类型,目前支持三种消息总线。如果指定的消息总线类型不支持,将使用缺省 zero 类型。

    • zero:使用 ZeroMQ 类型的消息总线。
    • mqtt:使用 MQTT 服务器作为消息总线,如选择 MQTT 总线类型,eKuiper 支持更多的 MQTT 配置项,具体请查看 其他配置(MQTT 相关配置)
    • redis:使用 Redis 服务器作为消息总线。使用 EdgeX docker compose 启动时,type 参数会默认设置为该类型。

    EdgeX Levski 引入了两种信息消息总线类型,eKuiper 从 1.7.1 开始支持这两种新的类型,分别为

    • nats-jetstream
    • nats-core
  • messageType:EdgeX 消息模型类型。该参数支持两种类型:

    • event:如果连接到 EdgeX application service 的主题、则消息为 "event" 类型;消息将会解码为 dtos.Event 类型。该选项为默认值。
    • request:如果直接连接到消息总线的主题,接收 device service 或者 core data 发出的数据,则消息类型为 "request"。消息将会解码为 requests.AddEventRequest 类型。

其他配置(MQTT 相关配置)

如使用 MQTT 消息总线,eKuiper 还支持其他一些可选配置项。请注意,所有可选配置都应为字符类型KeepAlive: "5000" ,有关各配置项的详细解释,可参考 MQTT 协议。

  • ClientId

  • Username

  • Password

  • Qos

  • KeepAlive

  • Retained

  • ConnectionPayload

  • CertFile

  • KeyFile

  • CertPEMBlock

  • KeyPEMBlock

  • SkipCertVerify

自定义配置

对于需要自定义某些连接参数的场景,eKuiper 支持用户创建自定义模块来实现全局配置的重载。

配置示例

yaml
#覆盖全局配置
demo1: #Conf_key
  protocol: tcp
  server: 10.211.55.6
  port: 5571
  topic: rules-events

定义 demo1 配置组后,如希望在创建流时使用此配置,可通过 CONF_KEY 选项并指定配置名称,此时,在自定义配置中定义的参数将覆盖 default 配置中的相应参数。详细步骤,可参考 流语句

示例

sql
create stream demo1() WITH (FORMAT="JSON"type="edgex"、CONF_KEY="demo1");

创建流类型源

完成连接器的配置后,后续可通过创建流将其与 eKuiper 规则集成。EdgeX 源连接器可以作为流类型扫描表类型数据源使用,本节将以流类型源为例进行说明。

通过 REST API 创建

REST API 为 eKuiper 提供了一种可编程的交互方式,适用于自动化或需要将 eKuiper 集成到其他系统中的场景。

示例

sql
create stream demo1() WITH (FORMAT="JSON"type="edgex"、CONF_KEY="demo1");

详细操作步骤及命令解释,可参考 通过 REST API 进行流管理

通过 CLI 创建

用户也可以通过命令行界面(CLI)直接访问 eKuiper。

  1. 进入 eKuiper bin 目录:

    bash
    cd path_to_eKuiper_directory/bin
  2. 使用 create 命令创建规则,指定 EdgeX 连接器为数据源,例如:

    bash
    bin/kuiper CREATE STREAM demo'() with(format="json"、datasource="demo" type="edgex")'

详细操作步骤及命令解释,可参考 通过 CLI 进行流管理

拓展阅读:EdgeX 中的流定义

EdgeX 在 reading objects 已经定义了数据类型,因此在 eKuiper 中建议采用 schema-less 方式的 EdgeX 流式定义,如下所示:

shell
# cd $eKuiper_base
# bin/kuiper CREATE STREAM demo'() with(format="json"、datasource="demo" type="edgex")'

自动数据类型转换

eKuiper 在处理 EdgeX 事件时,会根据 EdgeX ValueType 字段自动管理数据类型转换。

数据转换原则:

  • 如果在 reading 的值类型中可以找到支持的数据类型,执行数据类型转换;guas
  • 如果在 reading 的值类型中找不到支持的数据类型,将保留原值;
  • 如果类型转换失败,该值将被丢弃,并在日志上打印一条告警消息;

Boolean

如果 readingValueType 的值为 Bool ,那么 eKuiper 会试着将其转换为 boolean 类型:

  • 转换为 true: "1"、"t"、"T"、"true"、"TRUE"、"True"
  • 转换为 false:"0"、"f"、"F"、"false"、"FALSE"、"False"

Bigint

如果 readingValueType 的值为 INT8INT16INT32INT64UINT8UINT16UINT32UINT64 那么 eKuiper 会试着将其转换为 Bigint 类型。

Float

如果 readingValueType 的值为 FLOAT32FLOAT64 ,那么 eKuiper 会试着将其转换为 Float 类型。

String

如果 readingValueType 的值为 String,那么 eKuiper 会试着将其转换为 String 类型。

Boolean 数组

EdgeX 中的 Bool 数组类型会被转换为 boolean 数组。

Bigint 数组

EdgeX 中所有的 INT8INT16INT32INT64UINT8UINT16UINT32UINT64 数组类型会被转换为 Bigint 数组。

Float 数组

EdgeX 中所有的 FLOAT32FLOAT64 数组类型会被转换为 Float 数组。