# Connectors
In the realm of stream processing, the ability to seamlessly interact with various data sources and sinks is of paramount importance. eKuiper, being a lightweight edge stream processing engine, recognizes this necessity and incorporates it through the use of "connectors."
Connectors in eKuiper serve as the bridge between the processing engine and external systems, including databases, message brokers, or other data stores. By leveraging connectors, eKuiper can ingest data from diverse sources, process it in real-time, and then dispatch the processed data to the desired destinations. This ensures that eKuiper can be integrated into a wide variety of environments and use cases, from IoT edge devices to cloud-based infrastructures. They are categorized into two primary types:
- Source Connectors: Responsible for ingesting data into the eKuiper platform from various external sources.
- Sink Connectors: Handle the dispatch of processed data from eKuiper to external sinks or endpoints.
The architecture of eKuiper provides both built-in connectors, which cater to common data sources and sinks, and plugin-based connectors, allowing for extensibility and integration with custom or third-party systems. Proper configuration and management of these connectors are crucial for the efficient and reliable operation of the eKuiper platform.
This chapter details the configuration, usage, and best practices associated with both source and sink connectors in eKuiper.
# Source Connectors
Source connectors in eKuiper are designed to facilitate data ingestion from various external sources into the platform. Within the eKuiper framework, each source can operate in one of two distinct modes: a "Streaming" mode, where data events are streamed sequentially, or a "Reference" mode (often used in the context of 'tables'), where specific external content is fetched based on queries. To leverage these capabilities, users simply integrate these sources into their streams or tables, specifying the desired source type and fine-tuning behavior through configurable attributes.
Built-in Source Connectors
Below are the built-in source connectors provided by eKuiper:
- MQTT source: A source to read data from MQTT topics.
- Neuron source: A source to read data from the local neuron instance.
- EdgeX source: A source to read data from EdgeX foundry.
- HTTP pull source: A source to pull data from HTTP servers.
- HTTP push source: A source to push data to eKuiper through HTTP.
- File source: A source to read from file, usually used as tables.
- Memory source: A source to read from eKuiper memory topic to form rule pipelines.
- Redis source: A source to lookup from Redis as a lookup table.
Plugin-based Source Connectors
For scenarios where custom data sources or specific third-party integrations are needed, eKuiper offers the flexibility of plugin-based source connectors:
- SQL source: A source to periodically fetch data from SQL DB.
- Video Source: A source to query video streams.
- Random source: A source to generate random data for testing.
- Zero MQ source: A source to read data from Zero MQ.
# Sink Connectors
Sink connectors handle the task of dispatching the processed data from eKuiper to various external endpoints or sinks. These sinks can directly interface with platforms like MQTT, Neuron, and EdgeX, among others, while also offering cache mechanisms to handle network interruptions and ensure data consistency. Additionally, users have the flexibility to customize sink behaviors through dynamic properties and resource reuse, streamlining integration and improving scalability.
Similar to source connectors, sink connectors are also categorized into built-in and plugin-based types.
Built-in Sink Connectors
Below are the built-in sink connectors provided by eKuiper:
- MQTT sink: A sink to external MQTT broker.
- Neuron sink: A sink to the local neuron instance.
- EdgeX sink: A sink to EdgeX Foundry. This sink only exists when enabling the edgex build tag.
- Rest sink: A sink to external HTTP server.
- Redis sink: A sink to Redis.
- File sink: A sink to a file.
- Memory sink: A sink to eKuiper memory topic to form rule pipelines.
- Log sink: A sink to log, usually for debugging only.
- Nop sink: A sink to nowhere. It is used for performance testing now.
# Plugin-based Sink Connectors
For specialized data dispatch requirements or integrations with particular platforms, eKuiper supports plugin-based sink connectors:
- InfluxDB sink: A sink to InfluxDB
v1.x
. - InfluxDBV2 sink: A sink to InfluxDB
v2.x
. - TDengine sink: A sink to TDengine.
- Image sink: A sink to an image file. Only used to handle binary results.
- Zero MQ sink: A sink to Zero MQ.
- Kafka sink: A sink to Kafka.
# Data Templates in Sink Connectors
Data templates in eKuiper allow for "secondary processing" of analysis results to cater to the diverse formatting requirements of different sink systems. Utilizing the Golang template system, eKuiper provides mechanisms for dynamic data transformation, conditional outputs, and iterative processing. This ensures compatibility and precise formatting for various sinks.
# Connection Selector
The connector selector is a powerful feature in eKuiper that allows users to define a connection once and reuse it across multiple configurations. It ensures efficient connection management and reduces redundancy.
To define a global connection configuration, use the connectionSelector
key to name your connection, e.g., mqtt.localConnection
. Override global configurations with custom configurations but reference the same connectionSelector
.
For example, consider the configurations demo_conf
and demo2_conf
:
#Override the global configurations
demo_conf: #Conf_key
qos: 0
connectionSelector: mqtt.localConnection
servers: [tcp://10.211.55.6:1883, tcp://127.0.0.1]
#Override the global configurations
demo2_conf: #Conf_key
qos: 0
connentionSelector: mqtt.localConnection
servers: [tcp://10.211.55.6:1883, tcp://127.0.0.1]
2
3
4
5
6
7
8
9
10
11
Both configurations reference the same connectionSelector
, indicating that they utilize the same MQTT connection. When streams demo
and demo2
are defined based on these configurations:
demo (
...
) WITH (DATASOURCE="test/", FORMAT="JSON", CONF_KEY="demo_conf");
demo2 (
...
) WITH (DATASOURCE="test2/", FORMAT="JSON", CONF_KEY="demo2_conf");
2
3
4
5
6
7
8
They inherently share the MQTT connection. Specifically:
- The stream
demo
subscribes to the MQTT topictest/
with a QoS of 0. - The stream
demo2
subscribes totest2/
, also with a QoS of 0.
TIP
For MQTT sources, if two streams have the same DATASOURCE
but differing qos
values, only the rule started first will trigger a subscription.
Configuration
The actual connection profiles, like mqtt.localConnection
, are usually defined in a separate file, such as connections/connection.yaml
.
Example
mqtt:
localConnection: #connection key
server: "tcp://127.0.0.1:1883"
username: ekuiper
password: password
#certificationPath: /var/kuiper/xyz-certificate.pem
#privateKeyPath: /var/kuiper/xyz-private.pem.ke
#insecureSkipVerify: false
#protocolVersion: 3
clientid: ekuiper
cloudConnection: #connection key
server: "tcp://broker.emqx.io:1883"
username: user1
password: password
#certificationPath: /var/kuiper/xyz-certificate.pem
#privateKeyPath: /var/kuiper/xyz-private.pem.ke
#insecureSkipVerify: false
#protocolVersion: 3
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Batch Configuration
For advanced data stream processing, eKuiper offers an array of connectors like Memory, File, MQTT, and more. To streamline the integration, eKuiper’s REST API introduces the capability for batch configuration, allowing users to simultaneously import or export multiple configurations.
Example
{
"streams": { ... },
"tables": { ... },
"rules": { ... },
"nativePlugins": { ... },
"portablePlugins": { ... },
"sourceConfig": { ... },
"sinkConfig": { ... },
...
}
2
3
4
5
6
7
8
9
10
More details can be found at Data Import/Export Management