# 数据连接

在流式数据处理中,与数据源和目标端点的无缝集成能力非常关键。作为一款轻量级的边缘流处理引擎,eKuiper通过连接器实现了与外部系统的数据交互,如数据库、消息中间件等。

eKuiper 内置各类连接器并支持用户以插件的形式扩展连接器,因此能从各种来源获取数据,对其进行实时处理,并将处理结果推送至指定系统(数据 Sink),因此能够轻松融入从 IoT 边缘设备到云计算基础设施的各种场景。

eKuiper 连接器主要分两类:

  • 数据源连接器:负责从各类外部数据源中导入数据至 eKuiper。
  • 数据 Sink 连接器:负责将 eKuiper 处理后的数据输出至外部系统。

本章将介绍 eKuiper 中各类连接器的配置、使用以及最佳实践。

# 数据源连接器

eKuiper 数据源连接器旨在从各种外部源导入数据到平台。在 eKuiper 中,用户只需将这些数据源集成到他们的数据流或表中,即可将相关数据导入 eKuiper 并执行查询或数据转换等操作。eKuiper 还提供了丰富的配置选项,方便满足用户的各类数据处理需求。

内置源连接器:

eKuiper 内置以下数据源连接器:

插件式源连接器 对于需要自定义数据源或与特定第三方集成的场景,eKuiper 提供了基于插件的拓展源连接器:

  • SQL 源:定期从关系数据库中拉取数据。
  • 视频源:用于查询视频流。
  • Random 源:用于生成随机数据的源,用于测试。
  • Zero MQ 源:从 Zero MQ 读取数据。

# 数据 Sink 连接器

eKuiper Sink 连接器负责将 eKuiper 处理后的数据发送到各种目标端点或系统,可直接与 MQTT、Neuron、EdgeX 等平台对接,并提供缓存机制以应对网络中断场景,确保数据的一致性。此外,用户还可通过动态属性和资源重用来定制接收行为,简化集成并提高可伸缩性。

与源连接器类似,Sink 连接器也分为内置和插件式两种。

内置 Sink 连接器

以下是 eKuiper 提供的内置 Sink 连接器:

插件式 Sink 连接器

对于特殊的数据分发或特定平台集成需求,eKuiper 支持基于插件的 Sink 连接器:

# 数据模板

eKuiper 数据模板 支持用户对分析结果进行"二次处理",以满足不同接收系统的多样化格式要求。利用 Golang 模板系统,eKuiper 提供了动态数据转换、条件输出和迭代处理的机制,确保了与各种接收器的兼容性和精确格式化。

# 连接器的重用

eKuiper 支持通过 connectionSelector 配置项对连接器进行重用,用户只需一次定义即可在多个配置中重用,提升连接管理效率,简化配置流程。

配置

以 MQTT 数据源为例,您可首先在连接配置文件 connections/connection.yaml 中定义 MQTT 全局连接信息,例如 mqtt.localConnectionmqtt.cloudConnection

mqtt:
  localConnection: #connection key
    server: "tcp://127.0.0.1:1883"
    username: ekuiper
    password: password
    #certificationPath: /var/kuiper/xyz-certificate.pem
    #privateKeyPath: /var/kuiper/xyz-private.pem.ke
    #insecureSkipVerify: false
    #protocolVersion: 3
    clientid: ekuiper
  cloudConnection: #connection key
    server: "tcp://broker.emqx.io:1883"
    username: user1
    password: password
    #certificationPath: /var/kuiper/xyz-certificate.pem
    #privateKeyPath: /var/kuiper/xyz-private.pem.ke
    #insecureSkipVerify: false
    #protocolVersion: 3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

在配置 MQTT 源($ekuiper/etc/mqtt_source.yaml)时,可通过 connectionSelector 引用以上连接配置,例如demo_confdemo2_conf 都将引用 mqtt.localConnection 的连接配置。

#Override the global configurations
demo_conf: #Conf_key
  qos: 0
  connectionSelector: mqtt.localConnection
  servers: [tcp://10.211.55.6:1883, tcp://127.0.0.1]

#Override the global configurations
demo2_conf: #Conf_key
  qos: 0
  connentionSelector: mqtt.localConnection
  servers: [tcp://10.211.55.6:1883, tcp://127.0.0.1]
1
2
3
4
5
6
7
8
9
10
11

基于 demo_confdemo2_conf 分别创建两个数据流 demodemo2

demo (
    ...
  ) WITH (DATASOURCE="test/", FORMAT="JSON", CONF_KEY="demo_conf");

demo2 (
    ...
  ) WITH (DATASOURCE="test2/", FORMAT="JSON", CONF_KEY="demo2_conf");

1
2
3
4
5
6
7
8

当相应的规则分别引用以上数据流时,规则之间的源部分将共享连接。在这里 DATASOURCE 对应 mqtt 订阅的 topic,配置项中的 qos 将用作订阅时的 Qos。在以上示例配置中,demo 以 Qos 0 订阅 topic test/demo2 以 Qos 0 订阅 topic test2/

提示

对于MQTT源,如果两个流具有相同的 DATASOURCEqos 值不同,则只有先启动的规则才会触发订阅。

# 批量配置

eKuiper 提供了 Memory、File、MQTT 等多种数据连接器。为进一步简化用户的配置流程,eKuiper 通过 REST API 引入了批量配置功能,支持用户同时导入或导出多个配置。

示例

{
    "streams": { ... },
    "tables": { ... },
    "rules": { ... },
    "nativePlugins": { ... },
    "portablePlugins": { ... },
    "sourceConfig": { ... },
    "sinkConfig": { ... },
    ...
}
1
2
3
4
5
6
7
8
9
10

具体操作步骤,可参考 数据导入导出管理