Skip to content

Connectors

In the realm of stream processing, the ability to seamlessly interact with various data sources and sinks is of paramount importance. eKuiper, being a lightweight edge stream processing engine, recognizes this necessity and incorporates it through the use of "connectors."

Connectors in eKuiper serve as the bridge between the processing engine and external systems, including databases, message brokers, or other data stores. By leveraging connectors, eKuiper can ingest data from diverse sources, process it in real-time, and then dispatch the processed data to the desired destinations. This ensures that eKuiper can be integrated into a wide variety of environments and use cases, from IoT edge devices to cloud-based infrastructures. They are categorized into two primary types:

  • Source Connectors: Responsible for ingesting data into the eKuiper platform from various external sources.
  • Sink Connectors: Handle the dispatch of processed data from eKuiper to external sinks or endpoints.

The architecture of eKuiper provides both built-in connectors, which cater to common data sources and sinks, and plugin-based connectors, allowing for extensibility and integration with custom or third-party systems. Proper configuration and management of these connectors are crucial for the efficient and reliable operation of the eKuiper platform.

This chapter details the configuration, usage, and best practices associated with both source and sink connectors in eKuiper.

Source Connectors

Source connectors in eKuiper are designed to facilitate data ingestion from various external sources into the platform. Within the eKuiper framework, each source can operate in one of two distinct modes: a "Streaming" mode, where data events are streamed sequentially, or a "Reference" mode (often used in the context of 'tables'), where specific external content is fetched based on queries. To leverage these capabilities, users simply integrate these sources into their streams or tables, specifying the desired source type and fine-tuning behavior through configurable attributes.

Built-in Source Connectors

Below are the built-in source connectors provided by eKuiper:

Plugin-based Source Connectors

For scenarios where custom data sources or specific third-party integrations are needed, eKuiper offers the flexibility of plugin-based source connectors:

Sink Connectors

Sink connectors handle the task of dispatching the processed data from eKuiper to various external endpoints or sinks. These sinks can directly interface with platforms like MQTT, Neuron, and EdgeX, among others, while also offering cache mechanisms to handle network interruptions and ensure data consistency. Additionally, users have the flexibility to customize sink behaviors through dynamic properties and resource reuse, streamlining integration and improving scalability.

Similar to source connectors, sink connectors are also categorized into built-in and plugin-based types.

Built-in Sink Connectors

Below are the built-in sink connectors provided by eKuiper:

  • MQTT sink: A sink to external MQTT broker.
  • Neuron sink: A sink to the local neuron instance.
  • EdgeX sink: A sink to EdgeX Foundry. This sink only exists when enabling the edgex build tag.
  • Rest sink: A sink to external HTTP server.
  • Redis sink: A sink to Redis.
  • File sink: A sink to a file.
  • Memory sink: A sink to eKuiper memory topic to form rule pipelines.
  • Log sink: A sink to log, usually for debugging only.
  • Nop sink: A sink to nowhere. It is used for performance testing now.

Plugin-based Sink Connectors

For specialized data dispatch requirements or integrations with particular platforms, eKuiper supports plugin-based sink connectors:

Data Templates in Sink Connectors

Data templates in eKuiper allow for "secondary processing" of analysis results to cater to the diverse formatting requirements of different sink systems. Utilizing the Golang template system, eKuiper provides mechanisms for dynamic data transformation, conditional outputs, and iterative processing. This ensures compatibility and precise formatting for various sinks.

Connection Selector

The connector selector is a powerful feature in eKuiper that allows users to define a connection once and reuse it across multiple configurations. It ensures efficient connection management and reduces redundancy.

To define a global connection configuration, use the connectionSelector key to name your connection, e.g., mqtt.localConnection. Override global configurations with custom configurations but reference the same connectionSelector.

For example, consider the configurations demo_conf and demo2_conf:

yaml
#Override the global configurations
demo_conf: #Conf_key
  qos: 0
  connectionSelector: mqtt.localConnection
  servers: [tcp://10.211.55.6:1883, tcp://127.0.0.1]

#Override the global configurations
demo2_conf: #Conf_key
  qos: 0
  connentionSelector: mqtt.localConnection
  servers: [tcp://10.211.55.6:1883, tcp://127.0.0.1]

Both configurations reference the same connectionSelector, indicating that they utilize the same MQTT connection. When streams demo and demo2 are defined based on these configurations:

text
demo (
    ...
  ) WITH (DATASOURCE="test/", FORMAT="JSON", CONF_KEY="demo_conf");

demo2 (
    ...
  ) WITH (DATASOURCE="test2/", FORMAT="JSON", CONF_KEY="demo2_conf");

They inherently share the MQTT connection. Specifically:

  • The stream demo subscribes to the MQTT topic test/ with a QoS of 0.
  • The stream demo2 subscribes to test2/, also with a QoS of 0.

TIP

For MQTT sources, if two streams have the same DATASOURCE but differing qos values, only the rule started first will trigger a subscription.

Configuration

The actual connection profiles, like mqtt.localConnection, are usually defined in a separate file, such as connections/connection.yaml.

Example

yaml
mqtt:
  localConnection: #connection key
    server: "tcp://127.0.0.1:1883"
    username: ekuiper
    password: password
    #certificationPath: /var/kuiper/xyz-certificate.pem
    #privateKeyPath: /var/kuiper/xyz-private.pem.ke
    #insecureSkipVerify: false
    #protocolVersion: 3
    clientid: ekuiper
  cloudConnection: #connection key
    server: "tcp://broker.emqx.io:1883"
    username: user1
    password: password
    #certificationPath: /var/kuiper/xyz-certificate.pem
    #privateKeyPath: /var/kuiper/xyz-private.pem.ke
    #insecureSkipVerify: false
    #protocolVersion: 3

Batch Configuration

For advanced data stream processing, eKuiper offers an array of connectors like Memory, File, MQTT, and more. To streamline the integration, eKuiper’s REST API introduces the capability for batch configuration, allowing users to simultaneously import or export multiple configurations.

Example

json
{
    "streams": { ... },
    "tables": { ... },
    "rules": { ... },
    "nativePlugins": { ... },
    "portablePlugins": { ... },
    "sourceConfig": { ... },
    "sinkConfig": { ... },
    ...
}

More details can be found at Data Import/Export Management