
Connection Management

Sources and Sinks are used for interacting with external systems, which involve connecting to external resources. This chapter primarily explains how eKuiper manages connections.

Connection Types

Different connection types vary in complexity. For example, MQTT long connections require attention to connection status, and there may be disconnections during rule execution that necessitate automatic reconnection. On the other hand, HTTP connections are stateless by default, making their state management simpler. To unify the management of complex connection resources, including creation, reuse, automatic reconnection, and obtaining connection status, eKuiper v2 introduced an internal connection pool component and adapted a series of connection types:

  • MQTT Connection
  • Neuron Connection
  • EdgeX Connection
  • SQL Connection
  • HTTP Connection (including REST sink, HTTP Pull source, and HTTP Push source connections)
  • WebSocket Connection

Other connection types may be integrated gradually in subsequent versions. Connection types integrated into the connection pool can be created and accessed independently via the API.

In eKuiper, the lifecycle management of various connections is divided into three types:

  1. Connection Attached to Rule: By default, connections are managed by the Source/Sink implementation itself, with their lifecycle controlled by the rule in use. The connection resources will only start connecting when the rule is started; when the rule ends, the connection will be closed. In the example below, we create a data stream memStream of type memory. Since this type is not integrated into the connection pool, the connection will only be established when a rule using this stream is started.

    sql
    create stream memStream () WITH (TYPE="memory", DATASOURCE="demo")
  2. Anonymous Connection Resource Managed by Connection Pool: Some connection types are adapted to the connection pool management interface, and their lifecycle is managed by the connection pool. When a rule that uses one of these connection types is started, the rule obtains an anonymous resource from the connection pool; the resource id is generated by the rule and the resource is not shared. In the example below, we create a data stream mqttStream of type mqtt. The connection is anonymous, and since this type is adapted to the connection pool, we can retrieve this connection via the connection API. When the rule is deleted, the corresponding connection is deleted as well.

    sql
    create stream mqttStream () WITH (TYPE="mqtt", DATASOURCE="demo")
  3. User-Created Connection Resource: Users can add, delete, modify, and query resources via the Connection Management API. Resources created via the API must specify a unique id, which can be referenced in rules. Note: Only connection resources adapted to the connection pool can be managed via the API. Connections created in this way are independent physical connections that run immediately after creation and do not depend on rules. They can be shared by multiple rules or multiple sources/sinks.

Note: User-created connections are physical connections that will automatically reconnect until the connection is successful.

Connection Reuse

User-created connection resources can run independently, and multiple rules can reference this named resource. Connection reuse is configured through the connectionSelector configuration item. Users only need to create the connection resource once for reuse, improving connection management efficiency and simplifying the configuration process.

  1. Create Resource: As shown in the example below, create a connection with id mqttcon1 via the API. The connection parameters required can be configured in props. After the connection is successfully created, mqttcon1 can be found in the connection list API.

    shell
    POST http://localhost:9081/connections
    {
      "id": "mqttcon1",
      "typ": "mqtt",
      "props": {
        "server": "tcp://127.0.0.1:1883"
      }
    }
  2. Use in Data Source: When configuring the MQTT source ($ekuiper/etc/mqtt_source.yaml), you can reference the above connection via connectionSelector. In the example below, both demo_conf and demo2_conf reference the mqttcon1 connection.

yaml
#Override the global configurations
demo_conf: #Conf_key
  qos: 0
  connectionSelector: mqttcon1
  servers: [ tcp://10.211.55.6:1883, tcp://127.0.0.1 ]

#Override the global configurations
demo2_conf: #Conf_key
  qos: 0
  connectionSelector: mqttcon1
  servers: [ tcp://10.211.55.6:1883, tcp://127.0.0.1 ]

Based on demo_conf and demo2_conf, create two data streams demo and demo2:

text
demo (
    ...
  ) WITH (DATASOURCE="test/", FORMAT="JSON", CONF_KEY="demo_conf");

demo2 (
    ...
  ) WITH (DATASOURCE="test2/", FORMAT="JSON", CONF_KEY="demo2_conf");

When rules reference the above data streams, the source parts of those rules share the connection. Here, DATASOURCE corresponds to the MQTT subscription topic, and the qos in the configuration item is used as the QoS for the subscription. In the example configuration above, demo subscribes to topic test/ with QoS 0, and demo2 subscribes to topic test2/ with QoS 0.
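The create-resource step above can also be scripted. The following is a minimal Python sketch, assuming the REST API is reachable at http://localhost:9081 as in the example; the helper name build_create_connection_request is hypothetical and not part of eKuiper. It only builds the request, so it can be inspected before being sent.

```python
import json
import urllib.request

# Assumption: the eKuiper REST API listens on localhost:9081, as in the example above.
BASE_URL = "http://localhost:9081"


def build_create_connection_request(conn_id, conn_type, props, base_url=BASE_URL):
    """Build (but do not send) a POST /connections request.

    Hypothetical helper for illustration; the body mirrors the JSON shown above.
    """
    body = {"id": conn_id, "typ": conn_type, "props": props}
    return urllib.request.Request(
        url=f"{base_url}/connections",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_create_connection_request(
    "mqttcon1", "mqtt", {"server": "tcp://127.0.0.1:1883"}
)

# To send it against a running eKuiper instance, uncomment:
# with urllib.request.urlopen(req) as resp:
#     print(resp.status, resp.read().decode("utf-8"))
```

Keeping request construction separate from sending makes it easy to check the payload (for example, that id and typ are set) without a running server.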

TIP

For MQTT sources, if two streams have the same DATASOURCE but different qos values, only the rule that starts first will trigger the subscription.

You can also reuse the defined connection resource in a rule's action via connectionSelector.
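As an illustration of reusing a connection in an action, the sketch below builds a rule definition whose MQTT sink refers to mqttcon1 through connectionSelector. The rule id rule1, the SQL statement, and the topic result/demo are hypothetical values chosen for the example, and build_rule is an illustrative helper rather than an eKuiper API.

```python
import json


def build_rule(rule_id, sql, sink_type, connection_id, **sink_props):
    """Return a rule definition whose single action reuses a named connection.

    Hypothetical helper: the sink's own connection settings are replaced by a
    connectionSelector entry pointing at the user-created connection.
    """
    action = {sink_type: {"connectionSelector": connection_id, **sink_props}}
    return {"id": rule_id, "sql": sql, "actions": [action]}


# Assumed example values: rule id, SQL, and sink topic are for illustration only.
rule = build_rule(
    "rule1",
    "SELECT * FROM demo",
    "mqtt",
    "mqttcon1",
    topic="result/demo",
)

print(json.dumps(rule, indent=2))
```

With this shape, the source side (via CONF_KEY) and the sink side (via the action) can both point at the same named connection.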

Connection Status

Connection status is divided into three types:

  1. Connected, represented by 1 in metrics.
  2. Connecting, represented by 0 in metrics.
  3. Disconnected, represented by -1 in metrics.

Users can retrieve the connection status via the connection API. Additionally, users can view the connection status in the rule's source/sink metrics, for example, the source_demo_0_connection_status metric indicates the connection status of the demo stream. For a complete list of supported connection metrics, please refer to the Metrics List.
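When reading these metrics from a script, it can help to translate the numeric value into a name. The sketch below assumes the three values are 1 (connected), 0 (connecting), and -1 (disconnected); the ConnectionStatus enum, the describe_status helper, and the sample metrics dictionary are hypothetical and only illustrate the mapping.

```python
from enum import IntEnum


class ConnectionStatus(IntEnum):
    """Assumed numeric encoding of the connection status metric."""

    CONNECTED = 1
    CONNECTING = 0
    DISCONNECTED = -1


def describe_status(metric_value):
    """Map a connection status metric value to a lowercase name."""
    try:
        return ConnectionStatus(int(metric_value)).name.lower()
    except ValueError:
        # Values outside the three known states are reported as unknown.
        return "unknown"


# Hypothetical sample of rule metrics, keyed like the metric named above.
sample_metrics = {"source_demo_0_connection_status": 1}

status = describe_status(sample_metrics["source_demo_0_connection_status"])
print(status)
```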