Skip to content

连接管理

Source 和 Sink 用于与外部系统的交互,其中都会包含连接外部资源的动作。本章主要讲解 eKuiper 中对连接的管理。

连接类型

不同的连接类型复杂度各不相同,例如 MQTT 长连接需要关注连接的状态,规则运行中可能出现连接的断连,需要自动重连等复杂的管理;而 HTTP 连接默认为无状态的连接,状态管理较为简单。为了统一管理复杂的连接资源创建,复用,自动重连以及获取连接状态等功能,eKuiper v2 增加了内部的连接池组件,并适配了一系列连接类型:

  • MQTT 连接
  • Neuron 连接
  • EdgeX 连接
  • SQL 连接
  • HTTP 连接 (包括 REST sink,HTTP Pull source,HTTP push source 使用的连接)
  • WebSocket 连接
  • Kafka 连接

其余连接类型可能会在后续版本中陆续接入。接入连接池的连接类型可通过 API 进行资源的独立创建,并获取 API。

eKuiper 中对各种连接的生命周期的管理分为 3 种:

  1. 连接附属于规则:默认情况下,连接由 Source/Sink 的实现自行管理,其生命周期由使用的规则进行控制。规则启动时,使用到的连接资源才会开始连接;规则结束时,连接将被关闭。在下例中,我们创建了 memory 类型的数据流 memStream。由于该类型未接入连接池,只有当使用该流的规则启动时,才会进行连接。

    sql
    create stream memStream () WITH (TYPE="mqtt", DATASOURCE="demo")
  2. 连接池管理的匿名连接资源:部分连接类型适配了连接池管理接口,其生命周期由连接池管理。当启动包含这些类型连接的规则时,规则会从连接池获取匿名(实际资源 id 由规则生成,且不会被共享)资源。在下例中,我们创建了 mqtt 类型的数据流 mqttStream。连接为匿名连接,由于该类型适配了连接池,我们可以在连接 API 中获取到该连接。规则删除时,对应连接也会删除。

    sql
    create stream mqttStream () WITH (TYPE="mqtt", DATASOURCE="demo")
  3. 用户创建的连接资源:用户可通过连接管理API 进行资源的增删改查。通过 API 创建的资源必须指定唯一的 id,规则中可引用此处创建的规则资源。请注意:只有适配连接池的连接资源可通过 API 进行管理。这种连接类型创建的连接为独立的物理连接,创建完后会立即运行,无需依附于规则。它可以被多个规则,或者多个 source/sink 共用。

请注意:用户创建的连接为实体连接,会自动重连直到连接成功为止。

连接重用

用户创建的连接资源可以独立运行,多个规则可以引用该命名资源。连接重用是通过 connectionSelector 配置项进行配置。用户只需创建一次连接资源即可复用,提升连接管理效率,简化配置流程。

  1. 创建资源,如下例通过 API 创建 id 为 mqttcon1 的连接。可在 props 中配置连接需要的参数。连接创建成功后,mqttcon1 可在连接列表 API 中找到。

    shell
    POST http://localhost:9081/connections
    {
      "id": "mqttcon1"
      "typ":"mqtt",
      "props": {
        server: "tcp://127.0.0.1:1883"
      }
    }
  2. 在数据源中使用。在配置 MQTT 源($ekuiper/etc/mqtt_source.yaml)时,可通过 connectionSelector 引用以上连接配置,例如demo_confdemo2_conf 都将引用 mqttcon1 的连接配置。

yaml
#Override the global configurations
demo_conf: #Conf_key
   qos: 0
   connectionSelector: mqttcon1
   servers: [ tcp://10.211.55.6:1883, tcp://127.0.0.1 ]

#Override the global configurations
demo2_conf: #Conf_key
   qos: 0
   connectionSelector: mqttcon1
   servers: [ tcp://10.211.55.6:1883, tcp://127.0.0.1 ]

基于 demo_confdemo2_conf 分别创建两个数据流 demodemo2

text
demo (
    ...
  ) WITH (DATASOURCE="test/", FORMAT="JSON", CONF_KEY="demo_conf");

demo2 (
    ...
  ) WITH (DATASOURCE="test2/", FORMAT="JSON", CONF_KEY="demo2_conf");

当相应的规则分别引用以上数据流时,规则之间的源部分将共享连接。在这里 DATASOURCE 对应 mqtt 订阅的 topic,配置项中的 qos 将用作订阅时的 Qos。在以上示例配置中,demo 以 Qos 0 订阅 topic test/demo2 以 Qos 0 订阅 topic test2/

TIP

对于MQTT源,如果两个流具有相同的 DATASOURCEqos 值不同,则只有先启动的规则才会触发订阅。

也可以在规则的 action 中,通过 connectionSelector 重用定义的连接资源。

对于 Kafka sink,可以通过 connectionSelector 重用 Kafka 连接资源。Kafka 连接用于管理连接状态,并通过 ping 配置的 broker 来验证连通性。当 Kafka sink 引用该连接时,brokers、SASL、TLS 等连接相关配置会从选中的连接中复制。sink 仍会创建自己的 Kafka producer 用于发送消息。

连接状态

连接状态分成 3 种:

  1. 已连接,指标中用 1 表示。
  2. 连接中,指标中用 0 表示。
  3. 未连接,指标中用 1 表示。

用户可通过连接 API 获取连接的状态。同时,用户也可通过规则的指标查看规则 source/sink 中连接的状态,例如 source_demo_0_connection_status 指标表示 demo 流的连接状态。所有支持的连接指标请查看指标列表