Portable 插件 Python SDK
用户可利用 Python SDK 来开发 portable 插件,这个 SDK 提供了类似原生插件的 API,另外它提供了启动函数,用户只需填充插件信息即可。
运行 python 插件有两个前置条件
- 安装 Python 3.x 环境.
- 通过
pip install nng ekuiper
安装 nng 和 ekuiper 包.
默认情况下,eKuiper 的 portable 插件运行时会通过 python
命令来运行插件。如果您的环境不支持 python
命令,请通过配置文件更换为可用的 Python 命令。
插件开发
开发插件包括子模块和主程序两部分, Python SDK 提供了 python 语言的源,目标和函数 API。
源接口:
class Source(object):
"""abstract class for eKuiper source plugin"""
@abstractmethod
def configure(self, datasource: str, conf: dict):
"""configure with the string datasource and conf map and raise error if any"""
pass
@abstractmethod
def open(self, ctx: Context):
"""run continuously and send out the data or error with ctx"""
pass
@abstractmethod
def close(self, ctx: Context):
"""stop running and clean up"""
pass
目标接口:
class Sink(object):
"""abstract class for eKuiper sink plugin"""
@abstractmethod
def configure(self, conf: dict):
"""configure with conf map and raise error if any"""
pass
@abstractmethod
def open(self, ctx: Context):
"""open connection and wait to receive data"""
pass
@abstractmethod
def collect(self, ctx: Context, data: Any):
"""callback to deal with received data"""
pass
@abstractmethod
def close(self, ctx: Context):
"""stop running and clean up"""
pass
Sink ack
默认的 Portable 插件 sink 是异步运行的。在 v2.0 及之后的版本中(需要使用新的 pip eKuiper 版本),用户使用 Portable 插件定义的 sink 时可以配置是否等待 ack 再发送下一条数据。例如,假设 Portable 插件中定义了 print
类型的 sink 。当 requireAck 打开时,用户的自定义 sink 插件针对每条数据都必须返回 ack 信息。
{
"id": "rulePort1",
"sql": "SELECT * FROM mqttStream",
"actions": [
{
"print": {
"requireAck": true
}
}
]
}
Sink 插件中调用 ctx.ack_ok()
或 ctx.ack_err(msg)
返回 ack 信息。以下为示例 collect 函数,调用成功时返回 ack 。
def collect(self, ctx: Context, data: Any):
print('receive: ', data)
# only add ack when using with requireAck in the rule
ctx.ack_ok()
函数接口:
class Function(object):
"""abstract class for eKuiper function plugin"""
@abstractmethod
def validate(self, args: List[Any]):
"""callback to validate against ast args, return a string error or empty string"""
pass
@abstractmethod
def exec(self, args: List[Any], ctx: Context) -> Any:
"""callback to do execution, return result"""
pass
@abstractmethod
def is_aggregate(self):
"""callback to check if function is for aggregation, return bool"""
pass
用户通过实现这些抽象接口来创建自己的源,目标和函数,然后在主函数中声明这些自定义插件的实例化方法
if __name__ == '__main__':
c = PluginConfig("pysam", {"pyjson": lambda: PyJson()}, {"print": lambda: PrintSink()},
{"revert": lambda: revertIns})
plugin.start(c)
关于更详细的信息,请参考这篇文章 python sdk example.
打包发布
由于 python 是解释性语言,不需要编译出可执行文件,需要确保 json 描述文件中可执行文件名字的准确性即可。详细信息,请参考
部署要求
运行 python 脚本需要有 python 环境。所以,目标系统必须安装 python 3.x 环境。如果使用 docker ,建议使用 lfedge/ekuiper:<tag>-slim-python
版本。该版本包含 eKuiper 和 python 环境,无需再手动安装。
虚拟环境
虚拟环境是Python开发中常用的技术,对 Python 的依赖性管理很有用。Anaconda 或 Miniconda 是最流行的 Python 环境管理器之一。conda 软件包和环境管理器包含在所有版本的 Anaconda®、Miniconda 和 Anaconda Repository 中。eKuiper 支持使用 conda 环境运行 Python 插件。
使用 conda 虚拟环境通常包括如下步骤:
创建并配置虚拟环境。
在打包插件时,确保
virtualEnvType
设置为conda
,env
设置为创建的虚拟环境名,如下所示。json{ "version": "v1.0.0", "language": "python", "executable": "pysam.py", "virtualEnvType": "conda", "env": "myenv", "sources": [ "pyjson" ], "sinks": [ "print" ], "functions": [ "revert" ] }
如果该插件有安装脚本,确保该脚本将依赖安装到正确的虚拟环境中。