# Source Extension

Sources feed data into eKuiper from other systems. eKuiper has built-in source support for MQTT broker. There are still needs to consume data from various external systems include messaging systems and data pipelines etc. Source extension is presented to meet this requirement.

# Developing

There are two kinds of sources. One is the normal source also named scan source, the other is the lookup source. A normal source can be used as a stream or scan table; A lookup source can be used as a lookup table. Users can develop one kind or both in a source plugin.

# Develop a source

To develop a source for eKuiper is to implement api.Source (opens new window) interface and export it as a golang plugin.

Before starting the development, you must setup the environment for golang plugin.

To develop a source, the Configure method must be implemented. This method will be called once the source is initialized. In this method, you can retrieve the DATASOURCE property of the stream (which is topic for mqtt and other messaging system) from the first parameter. Then in the second parameter, a map that contains the configuration in your yaml file is passed. See configuration for more detail. Typically, there will be information such as host, port, user and password of the external system. You can use this map to initialize this source.

//Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties read from the yaml
Configure(datasource string, props map[string]interface{}) error
1
2

The main task for a Source is to implement open method. The implementation should be synchronized to create a connection to the external system. Then continuously receive data from the external system and send the received message to the consumer channel. The consumer channel accepts SourceTuple interface which is composed by a map for the message body and another map for the optional metadata. There are two methods available to help developers create SourceTuple objects: api.NewDefaultSourceTuple(message, meta) and api.NewDefaultSourceTupleWithTime(message, meta, time). The difference between these two methods is that the former creates a SourceTuple object with a timestamp generated at the time of the function call, while the latter creates a SourceTuple object with a timestamp that can be specified by the user at the time of the call. The meta data could be anything that worth to be recorded. For example, the qualified topic of the message. The first parameter is a StreamContext pointer. You can retrieve the context information and logger etc. from it. It is also an implementation of go context, so you can listen to Done() channel to know if the parent stream has quit. For any errors happening during the connection or receiving, handle it in this method. If the error cannot be handled, send it to the errCh. By default, the rule will be terminated if any errors received from errCh.

//Should be sync function for normal case. The container will run it in go func
Open(ctx StreamContext, consumer chan<- SourceTuple, errCh chan<- error)
1
2

The last method to implement is Close which literally close the connection. It is called when the stream is about to terminate. You could also do any clean up work in this function.

Close(ctx StreamContext) error
1

As the source itself is a plugin, it must be in the main package. Given the source struct name is mySource. At last of the file, the source must be exported as a symbol as below. There are 2 types of exported symbol supported. For source extension, states are usually needed, so it is recommended to export a constructor function.

function MySource() api.Source{
    return &mySource{}
}
1
2
3

The Random Source (opens new window) is a good example.

# Develop a lookup source

To develop a lookup source for eKuiper is to implement api.LookupSource (opens new window) interface and export it.

Before starting the development, you must setup the environment for golang plugin.

To develop a lookup source, the Configure method must be implemented. This method will be called once the source is initialized. In this method, you can retrieve the DATASOURCE property of the stream (which is topic for mqtt and other messaging system) from the first parameter. Then in the second parameter, a map that contains the configuration in your yaml file is passed. See configuration for more detail. Typically, there will be information such as host, port, user and password of the external system. You can use this map to initialize this source.

//Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties read from the yaml
Configure(datasource string, props map[string]interface{}) error
1
2

The next task is to implement open method. The method will be called once the source is created. It is responsible for initialization like establish the connection.

// Open creates the connection to the external data source
Open(ctx StreamContext) error
1
2

The main task for a Source is to implement Lookup method. The method will be run for each join operation. The parameters are gotten at runtime about the fields, keys and values to be retrieved from the external system. Each lookup source have a different lookup mechanism. For example, the SQL lookup source will build a SQL query from these parameters to retrieve the lookup data.

// Lookup receive lookup values to construct the query and return query results
Lookup(ctx StreamContext, fields []string, keys []string, values []interface{}) ([]SourceTuple, error)
1
2

The last method to implement is Close which literally close the connection. It is called when the stream is about to terminate. You could also do any clean up work in this function.

Close(ctx StreamContext) error
1

As the source itself is a plugin, it must be in the main package. The exported name must end with Lookup so that it can be refereed as the source named MySource. For source extension, states are usually needed, so it is recommended to export a constructor function.

function MySourceLookup() api.LookupSource{
    return &mySource{}
}
1
2
3

The SQL Lookup Source (opens new window) is a good example.

# Rewindable source

If the rule checkpoint is enabled, the source requires to be rewindable. That means the source need to implement both api.Source and api.Rewindable interface.

A typical implementation is to save an offset as a field of the source. And update the offset value when reading in new value. Notice that, when implementing GetOffset() will be called by eKuiper system which means the offset value can be accessed by multiple go routines. So a lock is required when read or write the offset.

# Deal with configuration

eKuiper configurations are formatted as yaml and it provides a centralize location /etc to hold all the configurations. Inside it, a subfolder sources is provided for the source configurations including the extended sources.

A configuration system is supported for eKuiper extension which will automatically read the configuration in yaml file and feed into the Configure method of the source. If the CONF_KEY property is specified in the stream, the configuration of that key will be fed. Otherwise, the default configuration is used.

To use configuration in your source, the following conventions must be followed.

  1. The name of your configuration file must be the same as the plugin name. For example, mySource.yaml.
  2. The yaml file must be located inside etc/sources
  3. The format of the yaml file could be found here

# common configuration field

There are 2 common configuration fields.

  • concurrency to specify how many instances will be started to run the source.
  • bufferLength to specify the maximum number of messages to be buffered in the memory. This is used to avoid the extra large memory usage that would cause out of memory error. Notice that the memory usage will be varied to the actual buffer. Increase the length here won't increase the initial memory allocation so it is safe to set a large buffer length. The default value is 102400, that is if each payload size is about 100 bytes, the maximum buffer size will be about 102400 * 100B ~= 10MB.

# Package the source

Build the implemented source as a go plugin and make sure the output so file resides in the plugins/sources folder.

go build -trimpath --buildmode=plugin -o plugins/sources/MySource.so extensions/sources/my_source.go
1

# Usage

The customized source is specified in a stream definition. The related properties are:

  • TYPE: specify the name of the source, must be camel case.
  • CONF_KEY: specify the key of the configuration to be used.

If you have developed a source implementation MySource, you should have:

  1. In the plugin file, symbol MySource is exported.
  2. The compiled MySource.so file is located inside plugins/sources
  3. If configuration needed, put mySource.yaml inside etc/sources

To use it, define a stream:

CREATE STREAM demo (
        USERID BIGINT,
        FIRST_NAME STRING,
        LAST_NAME STRING,
        NICKNAMES ARRAY(STRING),
        Gender BOOLEAN,
        ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
    ) WITH (DATASOURCE="mytopic", TYPE="mySource", CONF_KEY="democonf");
1
2
3
4
5
6
7
8