# 数据连接
在流式数据处理中,与数据源和目标端点的无缝集成能力非常关键。作为一款轻量级的边缘流处理引擎,eKuiper通过连接器实现了与外部系统的数据交互,如数据库、消息中间件等。
eKuiper 内置各类连接器并支持用户以插件的形式扩展连接器,因此能从各种来源获取数据,对其进行实时处理,并将处理结果推送至指定系统(数据 Sink),因此能够轻松融入从 IoT 边缘设备到云计算基础设施的各种场景。
eKuiper 连接器主要分两类:
- 数据源连接器:负责从各类外部数据源中导入数据至 eKuiper。
- 数据 Sink 连接器:负责将 eKuiper 处理后的数据输出至外部系统。
本章将介绍 eKuiper 中各类连接器的配置、使用以及最佳实践。
# 数据源连接器
eKuiper 数据源连接器旨在从各种外部源导入数据到平台。在 eKuiper 中,用户只需将这些数据源集成到他们的数据流或表中,即可将相关数据导入 eKuiper 并执行查询或数据转换等操作。eKuiper 还提供了丰富的配置选项,方便满足用户的各类数据处理需求。
内置源连接器:
eKuiper 内置以下数据源连接器:
MQTT 源:从 MQTT 主题读取数据。
Neuron 源:从本地 Neuron 实例读取数据。
EdgeX 源:从 EdgeX foundry 读取数据。
HTTP Pull 源:从 HTTP 服务器中拉取数据。
HTTP Push 源:通过 HTTP 推送数据到 eKuiper。
文件源:从文件中读取数据,通常用作表格。
Redis 源:从 Redis 中查询数据,用作查询表。
插件式源连接器 对于需要自定义数据源或与特定第三方集成的场景,eKuiper 提供了基于插件的拓展源连接器:
# 数据 Sink 连接器
eKuiper Sink 连接器负责将 eKuiper 处理后的数据发送到各种目标端点或系统,可直接与 MQTT、Neuron、EdgeX 等平台对接,并提供缓存机制以应对网络中断场景,确保数据的一致性。此外,用户还可通过动态属性和资源重用来定制接收行为,简化集成并提高可伸缩性。
与源连接器类似,Sink 连接器也分为内置和插件式两种。
内置 Sink 连接器
以下是 eKuiper 提供的内置 Sink 连接器:
- MQTT Sink:输出到外部 MQTT 服务。
- Neuron Sink:输出到本地的 Neuron 实例。
- EdgeX Sink:输出到 EdgeX Foundry。此动作仅在启用 edgex 编译标签时存在。
- Rest Sink:输出到外部 HTTP 服务器。
- Redis Sink:写入 Redis 。
- File Sink:写入文件。
- Memory Sink:输出到 eKuiper 内存主题,,常用于构建规则管道。
- Log Sink:写入日志,通常只用于调试。
- Nop Sink:不输出,用于性能测试。
插件式 Sink 连接器
对于特殊的数据分发或特定平台集成需求,eKuiper 支持基于插件的 Sink 连接器:
- InfluxDB Sink:输出到 Influx DB
v1.x
。 - InfluxDBV2 Sink:输出到 Influx DB
v2.x
。 - TDengine Sink:输出到 Tdengine。
- Image Sink:输出到一个图像文件。仅用于处理二进制结果。
- Zero MQ Sink:输出到 ZeroMQ。
- Kafka Sink:输出到 Kafka。
# 数据模板
eKuiper 数据模板 支持用户对分析结果进行"二次处理",以满足不同接收系统的多样化格式要求。利用 Golang 模板系统,eKuiper 提供了动态数据转换、条件输出和迭代处理的机制,确保了与各种接收器的兼容性和精确格式化。
# 连接器的重用
eKuiper 支持通过 connectionSelector
配置项对连接器进行重用,用户只需一次定义即可在多个配置中重用,提升连接管理效率,简化配置流程。
配置
以 MQTT 数据源为例,您可首先在连接配置文件 connections/connection.yaml
中定义 MQTT 全局连接信息,例如 mqtt.localConnection
和 mqtt.cloudConnection
。
mqtt:
localConnection: #connection key
server: "tcp://127.0.0.1:1883"
username: ekuiper
password: password
#certificationPath: /var/kuiper/xyz-certificate.pem
#privateKeyPath: /var/kuiper/xyz-private.pem.ke
#insecureSkipVerify: false
#protocolVersion: 3
clientid: ekuiper
cloudConnection: #connection key
server: "tcp://broker.emqx.io:1883"
username: user1
password: password
#certificationPath: /var/kuiper/xyz-certificate.pem
#privateKeyPath: /var/kuiper/xyz-private.pem.ke
#insecureSkipVerify: false
#protocolVersion: 3
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
在配置 MQTT 源($ekuiper/etc/mqtt_source.yaml
)时,可通过 connectionSelector
引用以上连接配置,例如demo_conf
和 demo2_conf
都将引用 mqtt.localConnection
的连接配置。
#Override the global configurations
demo_conf: #Conf_key
qos: 0
connectionSelector: mqtt.localConnection
servers: [tcp://10.211.55.6:1883, tcp://127.0.0.1]
#Override the global configurations
demo2_conf: #Conf_key
qos: 0
connentionSelector: mqtt.localConnection
servers: [tcp://10.211.55.6:1883, tcp://127.0.0.1]
2
3
4
5
6
7
8
9
10
11
基于 demo_conf
和 demo2_conf
分别创建两个数据流 demo
和 demo2
:
demo (
...
) WITH (DATASOURCE="test/", FORMAT="JSON", CONF_KEY="demo_conf");
demo2 (
...
) WITH (DATASOURCE="test2/", FORMAT="JSON", CONF_KEY="demo2_conf");
2
3
4
5
6
7
8
当相应的规则分别引用以上数据流时,规则之间的源部分将共享连接。在这里 DATASOURCE
对应 mqtt 订阅的 topic,配置项中的 qos
将用作订阅时的 Qos
。在以上示例配置中,demo
以 Qos 0 订阅 topic test/
,demo2
以 Qos 0 订阅 topic test2/
。
提示
对于MQTT源,如果两个流具有相同的 DATASOURCE
但 qos
值不同,则只有先启动的规则才会触发订阅。
# 批量配置
eKuiper 提供了 Memory、File、MQTT 等多种数据连接器。为进一步简化用户的配置流程,eKuiper 通过 REST API 引入了批量配置功能,支持用户同时导入或导出多个配置。
示例
{
"streams": { ... },
"tables": { ... },
"rules": { ... },
"nativePlugins": { ... },
"portablePlugins": { ... },
"sourceConfig": { ... },
"sinkConfig": { ... },
...
}
2
3
4
5
6
7
8
9
10
具体操作步骤,可参考 数据导入导出管理。