Portable 插件 Python SDK
用户可利用 Python SDK 来开发 portable 插件,这个 SDK 提供了类似原生插件的 API,另外它提供了启动函数,用户只需填充插件信息即可。
运行 python 插件有两个前置条件
- 安装 Python 3.x 环境.
- 通过
pip install nng ekuiper
安装 nng 和 ekuiper 包.
默认情况下,eKuiper 的 portable 插件运行时会通过 python
命令来运行插件。如果您的环境不支持 python
命令,请通过配置文件更换为可用的 Python 命令。
插件开发
开发插件包括子模块和主程序两部分, Python SDK 提供了 python 语言的源,目标和函数 API。
源接口:
class Source(object):
"""abstract class for eKuiper source plugin"""
@abstractmethod
def configure(self, datasource: str, conf: dict):
"""configure with the string datasource and conf map and raise error if any"""
pass
@abstractmethod
def open(self, ctx: Context):
"""run continuously and send out the data or error with ctx"""
pass
@abstractmethod
def close(self, ctx: Context):
"""stop running and clean up"""
pass
目标接口:
class Sink(object):
"""abstract class for eKuiper sink plugin"""
@abstractmethod
def configure(self, conf: dict):
"""configure with conf map and raise error if any"""
pass
@abstractmethod
def open(self, ctx: Context):
"""open connection and wait to receive data"""
pass
@abstractmethod
def collect(self, ctx: Context, data: Any):
"""callback to deal with received data"""
pass
@abstractmethod
def close(self, ctx: Context):
"""stop running and clean up"""
pass
函数接口:
class Function(object):
"""abstract class for eKuiper function plugin"""
@abstractmethod
def validate(self, args: List[Any]):
"""callback to validate against ast args, return a string error or empty string"""
pass
@abstractmethod
def exec(self, args: List[Any], ctx: Context) -> Any:
"""callback to do execution, return result"""
pass
@abstractmethod
def is_aggregate(self):
"""callback to check if function is for aggregation, return bool"""
pass
用户通过实现这些抽象接口来创建自己的源,目标和函数,然后在主函数中声明这些自定义插件的实例化方法
if __name__ == '__main__':
c = PluginConfig("pysam", {"pyjson": lambda: PyJson()}, {"print": lambda: PrintSink()},
{"revert": lambda: revertIns})
plugin.start(c)
关于更详细的信息,请参考这篇文章 python sdk example.
打包发布
由于 python 是解释性语言,不需要编译出可执行文件,需要确保 json 描述文件中可执行文件名字的准确性即可。详细信息,请参考
部署要求
运行 python 脚本需要有 python 环境。所以,目标系统必须安装 python 3.x 环境。如果使用 docker ,建议使用 lfedge/ekuiper:<tag>-slim-python
版本。该版本包含 eKuiper 和 python 环境,无需再手动安装。
虚拟环境
虚拟环境是Python开发中常用的技术,对 Python 的依赖性管理很有用。Anaconda 或 Miniconda 是最流行的 Python 环境管理器之一。conda 软件包和环境管理器包含在所有版本的 Anaconda®、Miniconda 和 Anaconda Repository 中。eKuiper 支持使用 conda 环境运行 Python 插件。
使用 conda 虚拟环境通常包括如下步骤:
创建并配置虚拟环境。
在打包插件时,确保
virtualEnvType
设置为conda
,env
设置为创建的虚拟环境名,如下所示。json{ "version": "v1.0.0", "language": "python", "executable": "pysam.py", "virtualEnvType": "conda", "env": "myenv", "sources": [ "pyjson" ], "sinks": [ "print" ], "functions": [ "revert" ] }
如果该插件有安装脚本,确保该脚本将依赖安装到正确的虚拟环境中。