# EdgeX 数据源
stream source scan table source
eKuiper 内置支持 EdgeX 数据源,支持订阅来自于 EdgeX 消息总线 (opens new window)的数据,并将数据放入 eKuiper 数据处理流水线中。用户可直接通过 EdgeX 数据源消费 EdgeX 中的事件,无需任何手动模式定义。
在 eKuiper 中,EdgeX 连接器可以作为源连接器(从 EdgeX 获取数据)或 Sink 连接器(将数据发布到 EdgeX),本节重点介绍 EdgeX 源连接器。
# Configurations
eKuiper 连接器可以通过环境变量、REST API 或配置文件进行配置,本节将介绍配置文件的使用方法。
EdgeX 源连接器的配置文件位于: $ekuiper/etc/sources/edgex.yaml,其中:
- default:对应全局连接配置。
- 自定义部分:适用于需要自定义连接参数的场景,该部分的配置将覆盖全局连接配置。
- 连接器重用:eKuiper 还支持通过
connectionSelector配置项在不同的配置中复用某个连接配置。
以下示例包括一个全局配置和自定义配置 demo1:
#全局 Edgex 配置
default:
protocol: tcp
server: localhost
port: 5573
topic: rules-events
messageType: event
# optional:
# ClientId: client1
# Username: user1
# Password: password
#覆盖全局配置
demo1: #Conf_key
protocol: tcp
server: 10.211.55.6
port: 5571
topic: rules-events
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 全局配置
用户可在 default 部分指定全局设置。
# 连接相关配置
protocol:连接到 EdgeX 消息总线的协议,缺省为tcpserver:EdgeX 消息总线的地址,缺省为localhostport:EdgeX 消息总线的端口,缺省为5573
# 连接重用
connectionSelector:重用 EdgeX 数据源连接,如下方配置示例中的edgex.redisMsgBus。注意:连接配置文件位于connections/connection.yaml。有关连接重用的详细解释,见连接器的重用。#全局 Edgex 配置 default: protocol: tcp server: localhost port: 5573 connectionSelector: edgex.redisMsgBus topic: rules-events messageType: event # optional: # ClientId: client1 # Username: user1 # Password: password1
2
3
4
5
6
7
8
9
10
11
12提示
指定
connectionSelector参数后,所有关于连接的参数都会被忽略,包括protocol、server和port配置。本例中,protocol: tcp | server: localhost | port: 5573的值都将被忽略。
# 主题和消息配置
topic:EdgeX 消息总线上监听的主题名称,缺省为rules-events。用户可以直接连接到 EdgeX 消息总线上的主题也可以连接到 application service 暴露的主题。需要注意的是,两种主题的消息数据类型不同,需要设置正确的 messageType 类型。type:EdgeX 消息总线类型,目前支持三种消息总线。如果指定的消息总线类型不支持,将使用缺省zero类型。zero:使用 ZeroMQ 类型的消息总线。mqtt:使用 MQTT 服务器作为消息总线,如选择 MQTT 总线类型,eKuiper 支持更多的 MQTT 配置项,具体请查看 其他配置(MQTT 相关配置)。redis:使用 Redis 服务器作为消息总线。使用 EdgeX docker compose 启动时,type 参数会默认设置为该类型。
EdgeX Levski 引入了两种信息消息总线类型,eKuiper 从 1.7.1 开始支持这两种新的类型,分别为
nats-jetstreamnats-core
messageType:EdgeX 消息模型类型。该参数支持两种类型:event:如果连接到 EdgeX application service 的主题、则消息为 "event" 类型;消息将会解码为dtos.Event类型。该选项为默认值。request:如果直接连接到消息总线的主题,接收 device service 或者 core data 发出的数据,则消息类型为 "request"。消息将会解码为requests.AddEventRequest类型。
# 其他配置(MQTT 相关配置)
如使用 MQTT 消息总线,eKuiper 还支持其他一些可选配置项。请注意,所有可选配置都应为字符类型,KeepAlive: "5000" ,有关各配置项的详细解释,可参考 MQTT 协议。
ClientIdUsernamePasswordQosKeepAliveRetainedConnectionPayloadCertFileKeyFileCertPEMBlockKeyPEMBlockSkipCertVerify
# 自定义配置
对于需要自定义某些连接参数的场景,eKuiper 支持用户创建自定义模块来实现全局配置的重载。
配置示例
#覆盖全局配置
demo1: #Conf_key
protocol: tcp
server: 10.211.55.6
port: 5571
topic: rules-events
2
3
4
5
6
定义 demo1 配置组后,如希望在创建流时使用此配置,可通过 CONF_KEY 选项并指定配置名称,此时,在自定义配置中定义的参数将覆盖 default 配置中的相应参数。详细步骤,可参考 流语句。
示例
create stream demo1() WITH (FORMAT="JSON"、type="edgex"、CONF_KEY="demo1");
# 创建流类型源
完成连接器的配置后,后续可通过创建流将其与 eKuiper 规则集成。EdgeX 源连接器可以作为流类型或扫描表类型数据源使用,本节将以流类型源为例进行说明。
# 通过 REST API 创建
REST API 为 eKuiper 提供了一种可编程的交互方式,适用于自动化或需要将 eKuiper 集成到其他系统中的场景。
示例
create stream demo1() WITH (FORMAT="JSON"、type="edgex"、CONF_KEY="demo1");
详细操作步骤及命令解释,可参考 通过 REST API 进行流管理。
# 通过 CLI 创建
用户也可以通过命令行界面(CLI)直接访问 eKuiper。
进入 eKuiper
bin目录:cd path_to_eKuiper_directory/bin1使用
create命令创建规则,指定 EdgeX 连接器为数据源,例如:bin/kuiper CREATE STREAM demo'() with(format="json"、datasource="demo" type="edgex")'1
详细操作步骤及命令解释,可参考 通过 CLI 进行流管理。
# 拓展阅读:EdgeX 中的流定义
EdgeX 在 reading objects (opens new window) 已经定义了数据类型,因此在 eKuiper 中建议采用 schema-less 方式的 EdgeX 流式定义,如下所示:
# cd $eKuiper_base
# bin/kuiper CREATE STREAM demo'() with(format="json"、datasource="demo" type="edgex")'
2
# 自动数据类型转换
eKuiper 在处理 EdgeX 事件时,会根据 EdgeX ValueType 字段自动管理数据类型转换。
数据转换原则:
- 如果在 reading 的值类型中可以找到支持的数据类型,执行数据类型转换;guas
- 如果在 reading 的值类型中找不到支持的数据类型,将保留原值;
- 如果类型转换失败,该值将被丢弃,并在日志上打印一条告警消息;
# Boolean
如果 reading 中 ValueType 的值为 Bool ,那么 eKuiper 会试着将其转换为 boolean 类型:
- 转换为
true: "1"、"t"、"T"、"true"、"TRUE"、"True" - 转换为
false:"0"、"f"、"F"、"false"、"FALSE"、"False"
# Bigint
如果 reading 中 ValueType 的值为 INT8、INT16、INT32、INT64、UINT8、UINT16、UINT32、UINT64 那么 eKuiper 会试着将其转换为 Bigint 类型。
# Float
如果 reading 中 ValueType 的值为 FLOAT32、FLOAT64 ,那么 eKuiper 会试着将其转换为 Float 类型。
# String
如果 reading 中 ValueType 的值为 String,那么 eKuiper 会试着将其转换为 String 类型。
# Boolean 数组
EdgeX 中的 Bool 数组类型会被转换为 boolean 数组。
# Bigint 数组
EdgeX 中所有的 INT8,INT16,INT32,INT64,UINT8,UINT16,UINT32,UINT64 数组类型会被转换为 Bigint 数组。
# Float 数组
EdgeX 中所有的 FLOAT32,FLOAT64 数组类型会被转换为 Float 数组。
