源( Source )扩展
源将数据从其他系统反馈到 eKuiper。eKuiper 支持 MQTT 消息服务器的内置源。然而,用户仍然需要从各种外部系统(包括消息传递系统和数据管道等)中获取数据。源扩展正是为了满足此要求。
开发
有两种类型的源。一种是普通源,即扫描源(Scan Source),另一种是查询源(Lookup Source)。一个正常的源可以作为一个流或扫描表使用;一个查询源可以作为一个查询表使用。用户可以在一个源插件中开发一种或两种源。
开发普通源
为 eKuiper 开发源的 是实现 api.Source 接口并将其导出为 golang 插件。
在开始开发之前,您必须为 golang 插件设置环境。
要开发源,必须实现 Configure 方法。 初始化源后,将调用此方法。 在此方法中,您可以从第一个参数检索流的 DATASOURCE 属性(这是 mqtt 和其他消息传递系统的主题)。 然后在第二个参数中,传递包含 yaml 文件中的配置的映射。 有关更多详细信息,请参见 配置。 通常,将有外部系统的信息,例如主机、端口、用户和密码。 您可以使用此映射来初始化此源。
//在初始化期间调用。 使用数据源(例如,mqtt 的主题)和从 Yaml 读取的属性来配置源
Configure(datasource string, props map[string]interface{}) error
源的主要任务是实现 open 方法,且应该和创建到外部系统的连接保持同步。然后从外部系统连续接收数据,并将接收到的消息发送到消费通道。消费通道接受 SourceTuple 接口,该接口由消息正文的映射和可选元数据的另一个映射组成。有两种方法可用于帮助开发人员创建SourceTuple
对象: api.NewDefaultSourceTuple(message, meta)
和 api.NewDefaultSourceTupleWithTime(message, meta, time)
,这两个方法的不同之处在于,前者创建的SourceTuple对象的时间戳是在函数调用时生成的,而后者创建的SourceTuple对象的时间戳可以由用户在调用时指定。元数据可以是任何值得记录的内容。例如,消息的合格主题。第一个参数是 StreamContext 指针。您可以从中检索上下文信息和日志等。它也是 go 上下文的实现,因此您可以监听 Done() 通道以了解父流是否已退出。对于在连接或接收过程中发生的任何错误,请使用此方法进行处理。如果错误无法处理,请将其发送到 errCh。默认情况下,如果从 errCh 收到任何错误,则该规则将终止。
//Should be sync function for normal case. The container will run it in go func
Open(ctx StreamContext, consumer chan<- SourceTuple, errCh chan<- error)
最后要实现的方法是 Close,它实际上用来关闭连接。 当流即将终止时调用它。 您也可以在此功能中执行任何清理工作。
Close(ctx StreamContext) error
由于源本身是一个插件,因此它必须位于主程序包中。 给定源结构名称为 mySource。 在文件的最后,必须将源作为符号导出,如下所示。 有 2种类型的导出符号。 对于源扩展,通常需要状态,因此建议导出构造函数。
function MySource() api.Source{
return &mySource{}
}
Random Source 是一个很好的示例。
开发查询源
为 eKuiper 开发一个查询源就是实现 api.LookupSource 接口并将其导出。
在开始开发之前,您必须为 golang 插件设置环境。
要开发一个查询源,必须实现 Configure 方法。 初始化源后,将调用此方法。 在此方法中,您可以从第一个参数检索流的 DATASOURCE 属性(这是 mqtt 和其他消息传递系统的主题)。 然后在第二个参数中,传递包含 yaml 文件中的配置的映射。 有关更多详细信息,请参见 配置。 通常,将有外部系统的信息,例如主机、端口、用户和密码。 您可以使用此映射来初始化此源。
//在初始化过程中调用。用数据源(例如mqtt的topic)和从yaml中读取的属性来配置这个源
Configure(datasource string, props map[string]interface{}) error
下一个任务是实现 open 方法。一旦源被创建,该方法将被调用。它负责初始化,比如建立连接。
// Open 创建与外部数据源的连接
Open(ctx StreamContext) error
查询源的主要任务是实现 Lookup 方法。该方法将在每个连接操作中运行。参数是在运行时获得的,包括要从外部系统中检索的字段、键和值等信息。每个查询源都有不同的查询机制。例如,SQL 查询源将从这些参数中组装一个 SQL 查询来检索查询数据。
// Lookup 接收查询值以构建查询并返回查询结果
Lookup(ctx StreamContext, fields []string, keys []string, values []interface{}) ([]SourceTuple, error)
最后要实现的方法是 Close,它实际上用来关闭连接。当流即将终止时调用它。 您也可以在此功能中执行任何清理工作。
Close(ctx StreamContext) error
由于源本身是一个插件,因此它必须位于主程序包中。Export 的名称必须以 Lookup
结尾,这样它就可以被称为 MySource
的查询源。对于源扩展,通常需要状态,所以建议导出一个构造函数。
function MySourceLookup() api.LookupSource{
return &mySource{}。
}
SQL Lookup Source 是一个很好的示例。
可回溯源
如果规则检查点被启用,源需要可回退。这意味着源需要同时实现 api.Source
和 api.Rewindable
接口。
一个典型的实现是将 "offset" 作为源的一个字段来保存。当读入新的值时更新偏移值。注意,当实现 GetOffset() 时,将被 eKuiper 系统调用,这意味着偏移值可以被多个 go routines 访问。因此,在读或写偏移量时,需要一个锁。
处理配置
eKuiper 配置的格式为 yaml,它提供了一个集中位置 /etc 来保存所有配置。 在其中,为源配置提供了一个子文件夹 sources,同时也适用于扩展源。
eKuiper 扩展支持配置系统自动读取 yaml 文件中的配置,并将其输入到源的 Configure 方法中。 如果在流中指定了 CONF_KEY 属性,则将输入该键的配置。 否则,将使用默认配置。
要在源中使用配置,必须遵循以下约定:
- 您的配置文件名称必须与插件名字相同,例如,mySource.yaml。
- yaml 文件必须位于 etc/sources 内。
- 可以在此处找到 yaml 文件的格式。
通用配置字段
有两个通用配置字段。
concurrency
指定将启动多少实例来运行源。bufferLength
指定要在内存中缓冲的最大消息数。 这是为了避免过多的内存使用情况而导致内存不足错误。 请注意,内存使用情况将因实际缓冲区而异。 在此处增加长度不会增加初始内存分配,因此可以安全设置较大的缓冲区长度。 默认值为102400,即如果每个消息体大小约为100个字节,则最大缓冲区大小将约为102400 * 100B〜= 10MB。
打包源
将已实现的源构建为 go 插件,并确保输出的 so 文件位于 plugins/sources 文件夹中。
go build -trimpath --buildmode=plugin -o plugins/sources/MySource.so extensions/sources/my_source.go
使用
在流定义中指定自定义源, 相关属性为:
- TYPE:指定源名称,必须为驼峰式命名。
- CONF_KEY:指定要使用的配置键。
如果您开发了源实现 MySource,则应该具有:
- 在插件文件中,将导出符号 MySource。
- 编译的 MySource.so 文件位于 plugins/sources 内部。
- 如果需要配置,请将 mySource.yaml 放在 etc/sources 中。
要使用它,请定义一个流:
CREATE STREAM demo (
USERID BIGINT,
FIRST_NAME STRING,
LAST_NAME STRING,
NICKNAMES ARRAY(STRING),
Gender BOOLEAN,
ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
) WITH (DATASOURCE="mytopic", TYPE="mySource", CONF_KEY="democonf");