# EdgeX 数据源
stream source scan table source
eKuiper 内置支持 EdgeX 数据源,支持订阅来自于 EdgeX 消息总线 (opens new window)的数据,并将数据放入 eKuiper 数据处理流水线中。用户可直接通过 EdgeX 数据源消费 EdgeX 中的事件,无需任何手动模式定义。
在 eKuiper 中,EdgeX 连接器可以作为源连接器(从 EdgeX 获取数据)或 Sink 连接器(将数据发布到 EdgeX),本节重点介绍 EdgeX 源连接器。
# Configurations
eKuiper 连接器可以通过环境变量、REST API 或配置文件进行配置,本节将介绍配置文件的使用方法。
EdgeX 源连接器的配置文件位于: $ekuiper/etc/sources/edgex.yaml
,其中:
- default:对应全局连接配置。
- 自定义部分:适用于需要自定义连接参数的场景,该部分的配置将覆盖全局连接配置。
- 连接器重用:eKuiper 还支持通过
connectionSelector
配置项在不同的配置中复用某个连接配置。
以下示例包括一个全局配置和自定义配置 demo1
:
#全局 Edgex 配置
default:
protocol: tcp
server: localhost
port: 5573
topic: rules-events
messageType: event
# optional:
# ClientId: client1
# Username: user1
# Password: password
#覆盖全局配置
demo1: #Conf_key
protocol: tcp
server: 10.211.55.6
port: 5571
topic: rules-events
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 全局配置
用户可在 default
部分指定全局设置。
# 连接相关配置
protocol
:连接到 EdgeX 消息总线的协议,缺省为tcp
server
:EdgeX 消息总线的地址,缺省为localhost
port
:EdgeX 消息总线的端口,缺省为5573
# 连接重用
connectionSelector
:重用 EdgeX 数据源连接,如下方配置示例中的edgex.redisMsgBus
。注意:连接配置文件位于connections/connection.yaml
。有关连接重用的详细解释,见连接器的重用。#全局 Edgex 配置 default: protocol: tcp server: localhost port: 5573 connectionSelector: edgex.redisMsgBus topic: rules-events messageType: event # optional: # ClientId: client1 # Username: user1 # Password: password
1
2
3
4
5
6
7
8
9
10
11
12提示
指定
connectionSelector
参数后,所有关于连接的参数都会被忽略,包括protocol
、server
和port
配置。本例中,protocol: tcp | server: localhost | port: 5573
的值都将被忽略。
# 主题和消息配置
topic
:EdgeX 消息总线上监听的主题名称,缺省为rules-events
。用户可以直接连接到 EdgeX 消息总线上的主题也可以连接到 application service 暴露的主题。需要注意的是,两种主题的消息数据类型不同,需要设置正确的 messageType 类型。type
:EdgeX 消息总线类型,目前支持三种消息总线。如果指定的消息总线类型不支持,将使用缺省zero
类型。zero
:使用 ZeroMQ 类型的消息总线。mqtt
:使用 MQTT 服务器作为消息总线,如选择 MQTT 总线类型,eKuiper 支持更多的 MQTT 配置项,具体请查看 其他配置(MQTT 相关配置)。redis
:使用 Redis 服务器作为消息总线。使用 EdgeX docker compose 启动时,type 参数会默认设置为该类型。
EdgeX Levski 引入了两种信息消息总线类型,eKuiper 从 1.7.1 开始支持这两种新的类型,分别为
nats-jetstream
nats-core
messageType
:EdgeX 消息模型类型。该参数支持两种类型:event
:如果连接到 EdgeX application service 的主题、则消息为 "event" 类型;消息将会解码为dtos.Event
类型。该选项为默认值。request
:如果直接连接到消息总线的主题,接收 device service 或者 core data 发出的数据,则消息类型为 "request"。消息将会解码为requests.AddEventRequest
类型。
# 其他配置(MQTT 相关配置)
如使用 MQTT 消息总线,eKuiper 还支持其他一些可选配置项。请注意,所有可选配置都应为字符类型,KeepAlive: "5000"
,有关各配置项的详细解释,可参考 MQTT 协议。
ClientId
Username
Password
Qos
KeepAlive
Retained
ConnectionPayload
CertFile
KeyFile
CertPEMBlock
KeyPEMBlock
SkipCertVerify
# 自定义配置
对于需要自定义某些连接参数的场景,eKuiper 支持用户创建自定义模块来实现全局配置的重载。
配置示例
#覆盖全局配置
demo1: #Conf_key
protocol: tcp
server: 10.211.55.6
port: 5571
topic: rules-events
2
3
4
5
6
定义 demo1
配置组后,如希望在创建流时使用此配置,可通过 CONF_KEY
选项并指定配置名称,此时,在自定义配置中定义的参数将覆盖 default
配置中的相应参数。详细步骤,可参考 流语句。
示例
create stream demo1() WITH (FORMAT="JSON"、type="edgex"、CONF_KEY="demo1");
# 创建流类型源
完成连接器的配置后,后续可通过创建流将其与 eKuiper 规则集成。EdgeX 源连接器可以作为流类型或扫描表类型数据源使用,本节将以流类型源为例进行说明。
# 通过 REST API 创建
REST API 为 eKuiper 提供了一种可编程的交互方式,适用于自动化或需要将 eKuiper 集成到其他系统中的场景。
示例
create stream demo1() WITH (FORMAT="JSON"、type="edgex"、CONF_KEY="demo1");
详细操作步骤及命令解释,可参考 通过 REST API 进行流管理。
# 通过 CLI 创建
用户也可以通过命令行界面(CLI)直接访问 eKuiper。
进入 eKuiper
bin
目录:cd path_to_eKuiper_directory/bin
1使用
create
命令创建规则,指定 EdgeX 连接器为数据源,例如:bin/kuiper CREATE STREAM demo'() with(format="json"、datasource="demo" type="edgex")'
1
详细操作步骤及命令解释,可参考 通过 CLI 进行流管理。
# 拓展阅读:EdgeX 中的流定义
EdgeX 在 reading objects (opens new window) 已经定义了数据类型,因此在 eKuiper 中建议采用 schema-less 方式的 EdgeX 流式定义,如下所示:
# cd $eKuiper_base
# bin/kuiper CREATE STREAM demo'() with(format="json"、datasource="demo" type="edgex")'
2
# 自动数据类型转换
eKuiper 在处理 EdgeX 事件时,会根据 EdgeX ValueType
字段自动管理数据类型转换。
数据转换原则:
- 如果在 reading 的值类型中可以找到支持的数据类型,执行数据类型转换;guas
- 如果在 reading 的值类型中找不到支持的数据类型,将保留原值;
- 如果类型转换失败,该值将被丢弃,并在日志上打印一条告警消息;
# Boolean
如果 reading
中 ValueType
的值为 Bool
,那么 eKuiper 会试着将其转换为 boolean
类型:
- 转换为
true
: "1"、"t"、"T"、"true"、"TRUE"、"True" - 转换为
false
:"0"、"f"、"F"、"false"、"FALSE"、"False"
# Bigint
如果 reading
中 ValueType
的值为 INT8
、INT16
、INT32
、INT64
、UINT8
、UINT16
、UINT32
、UINT64
那么 eKuiper 会试着将其转换为 Bigint
类型。
# Float
如果 reading
中 ValueType
的值为 FLOAT32
、FLOAT64
,那么 eKuiper 会试着将其转换为 Float
类型。
# String
如果 reading
中 ValueType
的值为 String
,那么 eKuiper 会试着将其转换为 String
类型。
# Boolean 数组
EdgeX 中的 Bool
数组类型会被转换为 boolean
数组。
# Bigint 数组
EdgeX 中所有的 INT8
,INT16
,INT32
,INT64
,UINT8
,UINT16
,UINT32
,UINT64
数组类型会被转换为 Bigint
数组。
# Float 数组
EdgeX 中所有的 FLOAT32
,FLOAT64
数组类型会被转换为 Float
数组。