Sink Connectors
The eKuiper source code includes built-in sinks and extended sinks (plugins).
Built-in Sinks
Users can directly use the built-in sinks in the standard eKuiper instance. The list of built-in sinks is as follows:
- MQTT sink: sink to external MQTT broker.
- Neuron sink: sink to the local Neuron instance.
- EdgeX sink: sink to EdgeX Foundry. This sink only exists when enabling the edgex build tag.
- Rest sink: sink to external HTTP server.
- Redis sink: sink to Redis.
- RedisSub sink: sink to a Redis channel.
- File sink: sink to a file.
- Memory sink: sink to eKuiper memory topic to form rule pipelines.
- Log sink: sink to log, usually for debugging only.
- Nop sink: sink to nowhere. It is currently used for performance testing.
Predefined Sink Plugins
We have developed some official sink plugins. These plugins can be found in eKuiper's source code, and users need to build them manually. Please check each sink's documentation for how to build and use it.
Additionally, these plugins have pre-built binaries for mainstream CPU architectures such as AMD64 and ARM. The pre-built plugins are hosted at https://packages.emqx.net/kuiper-plugins/$version/$os/sinks/$type_$arch.zip. For example, to get the TDengine sink for Debian AMD64, install it from https://packages.emqx.net/kuiper-plugins/1.4.4/debian/sinks/tdengine_amd64.zip.
The list of predefined sink plugins:
- InfluxDB sink: sink to InfluxDB v1.x.
- InfluxDBV2 sink: sink to InfluxDB v2.x.
- Image sink: sink to an image file. Only used to handle binary results.
- Zero MQ sink: sink to ZeroMQ.
- Kafka sink: sink to Kafka.
Updatable Sink
By default, sinks append data to the external system. Some external systems, such as SQL databases, are updatable, allowing data to be updated or deleted. Similar to lookup sources, only a few sinks are naturally "updatable". An updatable sink must support insert, update, and delete. The shipped updatable sinks include:
- Memory sink
- Redis sink
- SQL sink
To activate the update feature, the sink must set the `rowkindField` property to specify which field in the data represents the action to take. In the example below, `rowkindField` is set to `action`.
```json
{
  "redis": {
    "addr": "127.0.0.1:6379",
    "dataType": "string",
    "field": "id",
    "rowkindField": "action",
    "sendSingle": true
  }
}
```
The ingested data must have a field indicating the update action. In the example below, the `action` field specifies the action to perform. The supported actions are `insert`, `update`, `upsert`, and `delete`. The action implementation varies between sinks; some sinks may perform the same operation for insert, upsert, and update.
```json
{"action": "update", "id": 5, "name": "abc"}
```
This message will update the data of id 5 to the new name.
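Similarly, a delete action typically only needs the key field configured for the sink. A sketch (the exact required fields depend on the sink):

```json
{"action": "delete", "id": 5}
```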
Common Properties
Each sink type defines its own property set on top of a set of common properties shared by all sinks:
Property Name | Type & Default Value | Description |
---|---|---|
bufferLength | int: 1024 | Specify how many messages can be buffered in memory. If the buffered messages exceed the limit, the sink will block message receiving until the buffered messages have been sent out so that the buffered size is less than the limit. |
omitIfEmpty | bool: false | If set to true, when the SELECT result is empty, the result will not be fed to the sink. |
sendSingle | bool: false | The output messages are received as an array. This property indicates whether to send the results one by one. If false, the output message will be `{"result":"${the string of received message}"}`. For example, `{"result":"[{\"count\":30},{\"count\":20}]"}`. Otherwise, the result messages will be sent one by one with the actual field names. For the same example, it will send `{"count":30}` and then `{"count":20}` to the RESTful endpoint. Default is false. |
dataTemplate | string: "" | The golang template format string to specify the output data format. The input of the template is the sink message which is always an array of map. If no data template is specified, the raw input will be the data. Please check data template for detail. |
format | string: "json" | The encode format, could be "json" or "protobuf". For "protobuf" format, "schemaId" is required and the referred schema must be registered. |
schemaId | string: "" | The schema to be used to encode the result. |
delimiter | string: "," | Only effective when using the delimited format; specifies the delimiter character. Default is comma. |
fields | []string: nil | The fields used to select the output message. For example, if the result of a SQL query is `{"temperature": 31.2, "humidity": 45}` and the fields property is `["humidity"]`, then the result message is `{"humidity": 45}`. It is recommended not to configure both the dataTemplate property and the fields property. If both are configured, the output data is first obtained according to the dataTemplate property, and then the final result is obtained through the fields property. |
dataField | string: "" | The field string to specify which data to extract. To understand the relationship between dataTemplate, fields, and dataField, consider the following example. The first step is to retrieve the output information based on the dataTemplate. Let's assume the result is {"tele":{"humidity": 80.2, "temperature": 31.2, "id": 1}, "id": 1}. If the dataField is set to "tele", the result is {"humidity": 80.2, "temperature": 31.2, "id": 1}. Finally, the output information is filtered according to the fields parameter. For instance, if fields=["humidity", "temperature"], then the resulting output is {"humidity": 80.2, "temperature": 31.2}. |
enableCache | bool: default to global definition | whether to enable sink cache. cache storage configuration follows the configuration of the metadata store defined in etc/kuiper.yaml |
memoryCacheThreshold | int: default to global definition | the number of messages to be cached in memory. For performance reasons, the earliest cached messages are stored in memory so that they can be resent immediately upon failure recovery. Data here can be lost due to failures such as power outages. |
maxDiskCache | int: default to global definition | The maximum number of messages to be cached on disk. The disk cache is first-in, first-out. If the disk cache is full, the earliest page of information will be loaded into the memory cache, replacing the old memory cache. |
bufferPageSize | int: default to global definition | buffer pages are units of bulk reads/writes to disk to prevent frequent IO. if the pages are not full and eKuiper crashes due to hardware or software errors, the last unwritten pages to disk will be lost. |
resendInterval | int: default to global definition | The time interval to resend information after failure recovery to prevent message storms. |
cleanCacheAtStop | bool: default to global definition | whether to clean all caches when the rule is stopped, to prevent mass resending of expired messages when the rule is restarted. If not set to true, the in-memory cache will be stored to disk once the rule is stopped. Otherwise, the memory and disk caches will be cleared. |
resendAlterQueue | bool: default to global definition | whether to use the alternate queue when resending the cache. If set to true, the cache will be sent to the alternate queue instead of the original queue. This will result in real-time messages and resend messages being sent using different queues and the order of the messages will change. The following resend-related configurations will only take effect if set to true. |
resendPriority | int: default to global definition | resend cached priority, int type, default is 0. -1 means resend real-time data first; 0 means equal priority; 1 means resend cached data first. |
resendIndicatorField | string: default to global definition | field name of the resend cache, the field type must be a bool value. If the field is set, it will be set to true when resending. e.g., if resendIndicatorField is resend , then the resend field will be set to true when resending the cache. |
resendDestination | string: default "" | the destination to resend the cache to, which may have different meanings or support depending on the sink. For example, the mqtt sink can send the resend data to a different topic. The supported sinks are listed in sinks with resend destination support. |
batchSize | int: 0 | Specify the number of messages to buffer before sending. The sink will block sending until the number of buffered messages reaches this value, then the messages will be sent at once. batchSize treats `[]map` data as multiple messages. |
lingerInterval | int: 0 | Specify the interval for buffering messages before sending, in milliseconds. The sink will block sending until the buffering interval reaches this value. lingerInterval can be used together with batchSize; sending is triggered when either condition is met. |
compression | string: "" | Sets the data compression algorithm. Only effective when the sink is of a type that sends bytecode. Supported compression methods are "zlib", "gzip", "flate", "zstd". |
encryption | string: "" | Sets the data encryption algorithm. Only effective when the sink is of a type that sends bytecode. Currently, only the AES algorithm is supported. |
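To illustrate how these common properties combine, a REST action might be configured as below. This is a sketch with arbitrary values; the `url` endpoint and field names are assumptions for illustration only:

```json
{
  "rest": {
    "url": "http://localhost:8080/result",
    "sendSingle": true,
    "omitIfEmpty": true,
    "fields": ["temperature", "humidity"],
    "batchSize": 10,
    "lingerInterval": 1000
  }
}
```

With this configuration, results are sent one by one, empty results are skipped, only the two listed fields are kept, and sending is triggered after 10 buffered messages or 1000 ms, whichever comes first.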
Dynamic properties
In the sink, it is common to fetch a property value from the result data to achieve dynamic output, for example, writing data to a dynamic MQTT topic. Dynamic properties are parsed as data templates. In the example below, the sink topic is taken from the selected `topic` field using a data template.
```json
{
  "id": "rule1",
  "sql": "SELECT topic FROM demo",
  "actions": [{
    "mqtt": {
      "sendSingle": true,
      "topic": "prefix/{{.topic}}"
    }
  }]
}
```
In the above example, the `sendSingle` property is used, so the sink data is a map by default. If `sendSingle` is not used, you can get the topic by index with the data template `{{index . 0 "topic"}}`.
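As a sketch of the latter case, the same rule without `sendSingle` could use an index-based template (the broker address is illustrative):

```json
{
  "id": "rule2",
  "sql": "SELECT topic FROM demo",
  "actions": [{
    "mqtt": {
      "server": "tcp://127.0.0.1:1883",
      "topic": "prefix/{{index . 0 \"topic\"}}"
    }
  }]
}
```

Here the sink receives the whole result array, so the template indexes into the first element to extract the `topic` field.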
Caching
Sinks are used to send processing results to external systems. There are situations where the external system is not available, especially in edge-to-cloud scenarios. For example, in a weak network scenario, the edge-to-cloud network connection may be disconnected and reconnected from time to time. Therefore, sinks provide caching capabilities to temporarily store data in case of recoverable errors and automatically resend the cached data after the error is recovered. Sink's cache can be divided into two levels of storage, namely memory and disk. The user can configure the number of memory cache entries and when the limit is exceeded, the new cache will be stored offline to disk. The cache will be stored in both memory and disk so that the cache capacity becomes larger; it will also continuously detect the failure state and resend without restarting the rule.
The storage location of the offline cache is determined by the storage configuration in `etc/kuiper.yaml`, which defaults to sqlite. If the disk storage is sqlite, all caches will be saved to the `data/cache.db` file. Each sink will have a unique sqlite table to hold the cache. The number of cached messages is added to the buffer length section of the sink's metrics.
Flow
Each sink can configure its own caching mechanism, but the caching process is the same for all sinks. If caching is enabled, all of a sink's events go through two phases: first, saving all content to the cache; then deleting the cache entry after receiving an ack.
- Error detection: After a failed send, the sink should identify recoverable failures (network errors, etc.) by returning a specific error type, which produces a failed ack so that the cache is retained. For successful sends or unrecoverable errors, a successful ack is sent to delete the cache.
- Cache mechanism: The cache will first be kept in memory. If the memory threshold is exceeded, the later cache will be saved to disk. Once the disk cache exceeds the disk storage threshold, the cache will start to rotate, i.e. the earliest cache in memory will be discarded and the earliest cache on disk will be loaded instead.
- Resend policy: Currently the caching mechanism runs only in the default synchronous mode: if a message is being sent, the sink waits for the result before sending the next cached item. Otherwise, when new data arrives, the first item in the cache is sent first to detect network conditions. If it is sent successfully, all cached messages in memory and on disk are sent in sequence. Sequential sends can define a send interval to prevent message storms.
- Separation of normal data and retransmission data: Users can configure retransmission data and normal data to be sent separately to different destinations. It is also possible to configure the priority of sending. For example, send normal data with higher priority. You can even change the content of the retransmission data. For example, add a field to the retransmission data in order to distinguish it at the receiving end.
Configuration
There are two levels of configuration for the sink cache: a global configuration in `etc/kuiper.yaml` that defines the default behavior of all rules, and a rule-level sink definition that overrides the default behavior. The global configuration properties are as follows:
- enableCache: whether to enable the sink cache. The cache storage configuration follows the configuration of the metadata store defined in `etc/kuiper.yaml`.
- memoryCacheThreshold: the number of messages to be cached in memory. For performance reasons, the earliest cached messages are stored in memory so that they can be resent immediately upon failure recovery. Data here can be lost due to failures such as power outages.
- maxDiskCache: the maximum number of messages to be cached on disk. The disk cache is first-in, first-out. If the disk cache is full, the earliest page of messages will be loaded into the memory cache, replacing the old memory cache.
- bufferPageSize: buffer pages are units of bulk reads/writes to disk to prevent frequent IO. If the pages are not full and eKuiper crashes due to hardware or software errors, the last pages not yet written to disk will be lost.
- resendInterval: the time interval to resend messages after failure recovery, to prevent message storms.
- cleanCacheAtStop: whether to clean all caches when the rule is stopped, to prevent mass resending of expired messages when the rule is restarted. If not set to true, the in-memory cache will be stored to disk once the rule is stopped. Otherwise, the memory and disk caches will be cleared.
- resendAlterQueue: whether to use an alternate queue when resending the cache. If set to true, the cache will be sent to the alternate queue instead of the original queue. This results in real-time messages and resent messages being sent via different queues, so message order will change. The following resend-related configurations only take effect if this is set to true.
- resendPriority: the priority of resending cached data, int type, default 0. -1 means send real-time data first; 0 means equal priority; 1 means send cached data first.
- resendIndicatorField: the name of a field used to mark resent messages; the field type must be bool. If the field is set, it will be set to true when resending. E.g., if resendIndicatorField is `resend`, then the `resend` field will be set to true when resending cached messages.
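In `etc/kuiper.yaml`, these defaults are typically set under a `sink` section. A sketch with illustrative values (check your installed `etc/kuiper.yaml` for the actual keys and defaults):

```yaml
sink:
  enableCache: false
  memoryCacheThreshold: 1024
  maxDiskCache: 1024000
  bufferPageSize: 256
  resendInterval: 0
  cleanCacheAtStop: false
```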
In the following example rule configuration, the log sink has no cache-related options configured, so the global default configuration will be used, whereas the mqtt sink configures its own caching policy.
```json
{
  "id": "rule1",
  "sql": "SELECT * FROM demo",
  "actions": [{
    "log": {},
    "mqtt": {
      "server": "tcp://127.0.0.1:1883",
      "topic": "result/cache",
      "qos": 0,
      "enableCache": true,
      "memoryCacheThreshold": 2048,
      "maxDiskCache": 204800,
      "bufferPageSize": 512,
      "resendInterval": 10
    }
  }]
}
```
Sinks with Resend Destination Support
Not all sinks support resending to alternate destinations. Currently, only the following sinks support resending to alternate destinations:
- MQTT sink: the `resendDestination` property means the topic to resend to. If not set, the original topic will be used.
- REST sink: the `resendDestination` property means the URL to resend to. If not set, the original URL will be used.
- Memory sink: the `resendDestination` property means the topic to resend to. If not set, the original topic will be used.
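For example, an MQTT action could route resent messages to a separate topic. This is a sketch with illustrative broker address and topic names:

```json
{
  "mqtt": {
    "server": "tcp://127.0.0.1:1883",
    "topic": "result",
    "enableCache": true,
    "resendAlterQueue": true,
    "resendDestination": "result/resend"
  }
}
```

With this configuration, real-time results go to `result` while recovered cache entries are published to `result/resend`, so the receiving end can tell them apart.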
For customized sinks, you can implement the `CollectResend` function to customize the resend strategy. Please check customize resend strategy for details.
Resource Reuse
Like sources, actions also support configuration reuse. Users only need to create a yaml file named after the target action in the sinks folder and write the configuration in the same form as for sources.
For example, for the MQTT action scenario, the user can create an `mqtt.yaml` file in the sinks directory with the following content:
```yaml
test:
  qos: 1
  server: "tcp://broker.emqx.io:1883"
```
When users need MQTT actions, they can use the traditional configuration method, as shown below:
```json
{
  "mqtt": {
    "server": "tcp://broker.emqx.io:1883",
    "topic": "devices/demo_001/messages/events/",
    "protocolVersion": "3.1.1",
    "qos": 1,
    "clientId": "demo_001",
    "username": "xyz.azure-devices.net/demo_001/?api-version=2018-06-30",
    "password": "SharedAccessSignature sr=*******************",
    "retained": false
  }
}
```
They can also use the `resourceId` reference form with the following configuration:
```json
{
  "mqtt": {
    "resourceId": "test",
    "topic": "devices/demo_001/messages/events/",
    "protocolVersion": "3.1.1",
    "clientId": "demo_001",
    "username": "xyz.azure-devices.net/demo_001/?api-version=2018-06-30",
    "password": "SharedAccessSignature sr=*******************",
    "retained": false
  }
}
```
Runtime Nodes
When users create rules, the Sink is a logical node. Depending on the type of the Sink itself and the user's configuration, each Sink at runtime may generate an execution plan consisting of multiple nodes. The Sink property configuration items are numerous, and the logic during actual runtime is quite complex. By breaking down the execution plan into multiple nodes, the following benefits are primarily achieved:
- There are many shared properties and implementation logic among various Sinks, such as data format encoding. Splitting the shared property implementation into independent runtime nodes facilitates node reuse, simplifies the implementation of Sink nodes (Single Responsibility Principle), and improves the maintainability of nodes.
- The properties of the Sink include time-consuming calculations, such as compression and encoding. With a single node's metrics, it is difficult to distinguish the actual execution status of sub-tasks when the Sink is executed. After splitting the nodes, finer-grained runtime metrics can be supported to understand the status and latency of each sub-task.
- After sub-task splitting, parallel computation of sub-tasks can be implemented, improving the overall efficiency of rule execution.
Execution Plan Splitting
The physical execution plan of the Sink node can be split into:
```
Batch --> Transform --> Encode --> Compress --> Encrypt --> Cache --> Connect
```
The rules for splitting are as follows:
- Batch: Configured with `batchSize` and/or `lingerInterval`. This node accumulates batches, sending received data to subsequent nodes according to the batch configuration.
- Transform: Configured with `dataTemplate`, `dataField`, `fields`, or other shared properties that require data format conversion. This node implements the various transformation properties.
- Encode: Applicable when the Sink is of a type that sends bytecode (such as MQTT, which can send arbitrary bytecode; SQL sinks with their own formats are not of this type) and the `format` property is configured. This node serializes the data based on the format and related schema configuration.
- Compress: Applicable when the Sink is of a type that sends bytecode and the `compression` property is configured. This node compresses the data with the configured compression algorithm.
- Encrypt: Applicable when the Sink is of a type that sends bytecode and the `encryption` property is configured. This node encrypts the data with the configured encryption algorithm.
- Cache: Configured with `enableCache`. This node implements data caching and retransmission. For detailed information, please refer to Caching.
- Connect: A node implemented for every Sink. This node connects to the external system and sends the data.
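As an illustration of this splitting, an MQTT action configured as below (values are illustrative) would yield a plan containing Batch (from `batchSize`), Encode (from `format`), Compress (from `compression`), and Connect nodes, with the other nodes omitted:

```json
{
  "mqtt": {
    "server": "tcp://127.0.0.1:1883",
    "topic": "result/compressed",
    "batchSize": 100,
    "format": "json",
    "compression": "gzip"
  }
}
```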