Connection Management
Sources and Sinks are used for interacting with external systems, which involve connecting to external resources. This chapter primarily explains how eKuiper manages connections.
Connection Types
Different connection types vary in complexity. For example, MQTT long connections require attention to connection status, and there may be disconnections during rule execution that necessitate automatic reconnection. On the other hand, HTTP connections are stateless by default, making their state management simpler. To unify the management of complex connection resources, including creation, reuse, automatic reconnection, and obtaining connection status, eKuiper v2 introduced an internal connection pool component and adapted a series of connection types:
- MQTT Connection
- Neuron Connection
- EdgeX Connection
- SQL Connection
- HTTP Connection (including REST sink, HTTP Pull source, and HTTP push source connections)
- WebSocket Connection
Other connection types may be integrated gradually in subsequent versions. Connection types integrated into the connection pool can be created and accessed independently via the API.
In eKuiper, the lifecycle management of various connections is divided into three types:
Connection Attached to Rule: By default, a connection is managed by the Source/Sink implementation itself, and its lifecycle is controlled by the rule that uses it. The connection is only established when the rule starts, and it is closed when the rule ends. In the example below, we create a data stream memStream of type memory. Since this type is not integrated into the connection pool, the connection will only be established when a rule using this stream is started.
create stream memStream () WITH (TYPE="memory", DATASOURCE="demo")
Anonymous Connection Resource Managed by Connection Pool: Some connection types are adapted to the connection pool management interface, and their lifecycle is managed by the connection pool. When a rule containing these types of connections is started, the rule obtains an anonymous resource from the connection pool (the actual resource id is generated by the rule and is not shared). In the example below, we create a data stream mqttStream of type mqtt. The connection is anonymous, and since this type is adapted to the connection pool, we can retrieve this connection via the connection API. When the rule is deleted, the corresponding connection will also be deleted.
create stream mqttStream () WITH (TYPE="mqtt", DATASOURCE="demo")
User-Created Connection Resource: Users can add, delete, modify, and query resources via the Connection Management API. Resources created via the API must specify a unique id, which can be referenced in rules. Note: Only connection resources adapted to the connection pool can be managed via the API. Connections created in this way are independent physical connections that run immediately after creation and do not depend on rules. They can be shared by multiple rules or multiple sources/sinks.
Note: User-created connections are physical connections that will automatically reconnect until the connection is successful.
Connection Reuse
User-created connection resources can run independently, and multiple rules can reference the same named resource. Connection reuse is configured through the connectionSelector configuration item. Users only need to create the connection resource once to reuse it, which improves connection management efficiency and simplifies configuration.
Create Resource: As shown in the example below, create a connection with id mqttcon1 via the API. The required connection parameters can be configured in props. After the connection is successfully created, mqttcon1 can be found in the connection list API.
POST http://localhost:9081/connections
{
  "id": "mqttcon1",
  "typ": "mqtt",
  "props": {
    "server": "tcp://127.0.0.1:1883"
  }
}
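The same request can also be issued from a script. The following is a minimal Python sketch, assuming the eKuiper REST API is reachable at http://localhost:9081 as in the request above. The helper names build_connection_request and create_connection are hypothetical, and how the response should be interpreted is not covered here.

```python
import json
import urllib.request

EKUIPER_URL = "http://localhost:9081"  # assumed address, taken from the request above


def build_connection_request(conn_id, conn_type, props, base_url=EKUIPER_URL):
    """Build (but do not send) a POST /connections request."""
    body = {"id": conn_id, "typ": conn_type, "props": props}
    return urllib.request.Request(
        url=f"{base_url}/connections",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_connection(conn_id, conn_type, props, base_url=EKUIPER_URL):
    """Send the request; return the HTTP status code and the response text."""
    req = build_connection_request(conn_id, conn_type, props, base_url)
    with urllib.request.urlopen(req, timeout=5) as resp:
        return resp.status, resp.read().decode("utf-8")


# Example call (requires a running eKuiper instance):
# create_connection("mqttcon1", "mqtt", {"server": "tcp://127.0.0.1:1883"})
```

Separating request construction from sending makes it possible to inspect the payload without a running server.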
Use in Data Source: When configuring the MQTT source ($ekuiper/etc/mqtt_source.yaml), you can reference the above connection configuration via connectionSelector. For example, both demo_conf and demo2_conf will reference the connection configuration of mqttcon1.
#Override the global configurations
demo_conf: #Conf_key
  qos: 0
  connectionSelector: mqttcon1
  servers: [ tcp://10.211.55.6:1883, tcp://127.0.0.1 ]
#Override the global configurations
demo2_conf: #Conf_key
  qos: 0
  connectionSelector: mqttcon1
  servers: [ tcp://10.211.55.6:1883, tcp://127.0.0.1 ]
Based on demo_conf and demo2_conf, create two data streams demo and demo2:
CREATE STREAM demo (
  ...
) WITH (DATASOURCE="test/", FORMAT="JSON", CONF_KEY="demo_conf");
CREATE STREAM demo2 (
  ...
) WITH (DATASOURCE="test2/", FORMAT="JSON", CONF_KEY="demo2_conf");
When rules reference the above data streams, the source parts of the rules will share the connection. Here, DATASOURCE corresponds to the MQTT subscription topic, and the qos in the configuration item is used as the QoS for the subscription. In the example configuration above, demo subscribes to topic test/ with QoS 0, and demo2 subscribes to topic test2/ with QoS 0.
TIP
For MQTT sources, if two streams have the same DATASOURCE but different qos values, only the rule that starts first will trigger the subscription.
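One way to picture the behavior described in this tip is a registry of topics kept per shared connection. The sketch below is a toy Python model for illustration only, not eKuiper's actual implementation; the class SharedConnection and its subscribe method are hypothetical names.

```python
class SharedConnection:
    """Toy stand-in for one shared MQTT connection (not eKuiper's actual code)."""

    def __init__(self):
        self.subscriptions = {}  # topic -> qos used by the first subscriber

    def subscribe(self, topic, qos):
        """Subscribe once per topic; return True only if a new subscription was made."""
        if topic in self.subscriptions:
            return False  # topic already subscribed; the later qos is ignored
        self.subscriptions[topic] = qos
        return True


conn = SharedConnection()
conn.subscribe("test/", 0)   # first rule: the subscription is created with qos 0
conn.subscribe("test/", 1)   # second rule, same topic: no new subscription
conn.subscribe("test2/", 0)  # different topic: subscribed independently
print(conn.subscriptions)    # {'test/': 0, 'test2/': 0}
```

In this model the qos requested by the second rule never reaches the broker, which is why streams that share a topic should normally use the same qos value.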
You can also reuse the defined connection resource in the rule's action via connectionSelector.
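As an illustration, a rule whose MQTT action reuses mqttcon1 might be assembled as follows. This is a sketch only: the rule id, the SQL statement, and the result topic are hypothetical values, and the layout assumes the usual eKuiper rule structure of id, sql, and actions.

```python
import json

# Hypothetical rule: the id, SQL and result topic are illustrative only.
rule = {
    "id": "rule1",
    "sql": "SELECT * FROM demo",
    "actions": [
        {
            "mqtt": {
                # Reuse the connection created via the API instead of
                # repeating the server settings in the action.
                "connectionSelector": "mqttcon1",
                "topic": "result/demo",
            }
        }
    ],
}

# Print the JSON document that would be submitted when creating the rule.
print(json.dumps(rule, indent=2))
```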
Connection Status
Connection status is divided into three types:
- Connected, represented by 1 in metrics.
- Connecting, represented by 0 in metrics.
- Disconnected, represented by -1 in metrics.
Users can retrieve the connection status via the connection API. Additionally, users can view the connection status in the rule's source/sink metrics. For example, the source_demo_0_connection_status metric indicates the connection status of the demo stream. For a complete list of supported connection metrics, please refer to the Metrics List.
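When reading these metrics from a script, the numeric value can be translated back into a state name. The helper below is a small Python sketch; it assumes that connected, connecting, and disconnected are reported as 1, 0, and -1 respectively, and the names STATUS_NAMES and describe_status are hypothetical.

```python
# Assumed mapping from metric value to connection state.
STATUS_NAMES = {1: "connected", 0: "connecting", -1: "disconnected"}


def describe_status(value):
    """Translate a connection_status metric value into a readable state name."""
    return STATUS_NAMES.get(int(value), f"unknown ({value})")


# Example: a value read from source_demo_0_connection_status
print(describe_status(1))  # connected
```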