Python SDK for Portable Plugin
With the Python SDK for portable plugins, users can develop portable plugins in Python. The SDK provides APIs for the source, sink and function interfaces. It also provides a plugin start function that serves as the execution entry point and defines the plugin and its symbols.
To run a Python plugin, the runtime environment must meet two prerequisites:
- Install a Python 3.x environment.
- Install the nng and ekuiper packages by running: pip install nng ekuiper
By default, the eKuiper portable plugin runtime runs the Python script with the command python userscript.py. If the system has multiple Python installations or uses a different Python executable, users can specify the Python command in the configuration file.
Development
The development process follows the same two steps: develop the symbols, then develop the main program. The Python SDK provides similar source, sink and function interfaces in Python.
Source interface:
class Source(object):
    """abstract class for eKuiper source plugin"""

    @abstractmethod
    def configure(self, datasource: str, conf: dict):
        """configure with the string datasource and conf map and raise error if any"""
        pass

    @abstractmethod
    def open(self, ctx: Context):
        """run continuously and send out the data or error with ctx"""
        pass

    @abstractmethod
    def close(self, ctx: Context):
        """stop running and clean up"""
        pass
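As an illustration of how the three Source methods fit together, here is a minimal sketch of a source that emits a counter. To stay runnable without eKuiper installed, it redeclares the interface locally and uses a hypothetical RecordingContext; the emit method on the context, the CounterSource class, and the interval and max_count configuration keys are all assumptions made for this example, not confirmed SDK names. A real plugin would subclass the Source class from the ekuiper package and receive the SDK's own Context.

```python
import threading
from abc import ABC, abstractmethod
from typing import Any, List, Optional


class RecordingContext:
    """Hypothetical stand-in for the SDK Context; it only records emitted data."""

    def __init__(self) -> None:
        self.emitted: List[Any] = []

    def emit(self, message: Any, meta: Optional[dict] = None) -> None:
        # Assumed method name; check the SDK for the real call.
        self.emitted.append(message)


class Source(ABC):
    """Local copy of the interface so the sketch runs without eKuiper."""

    @abstractmethod
    def configure(self, datasource: str, conf: dict): ...

    @abstractmethod
    def open(self, ctx): ...

    @abstractmethod
    def close(self, ctx): ...


class CounterSource(Source):
    """Emits {"count": n} at a fixed interval until closed or max_count is reached."""

    def __init__(self) -> None:
        self.interval = 1.0
        self.max_count = 0  # 0 means: run until close() is called
        self._stop = threading.Event()

    def configure(self, datasource: str, conf: dict):
        # "interval" and "max_count" are hypothetical keys for this sketch.
        interval = conf.get("interval", 1.0)
        if interval < 0:
            raise ValueError("interval must not be negative")
        self.interval = float(interval)
        self.max_count = int(conf.get("max_count", 0))

    def open(self, ctx):
        count = 0
        while not self._stop.is_set():
            count += 1
            ctx.emit({"count": count}, None)
            if self.max_count and count >= self.max_count:
                break
            # wait() returns early once close() sets the stop event
            self._stop.wait(self.interval)

    def close(self, ctx):
        self._stop.set()


ctx = RecordingContext()
src = CounterSource()
src.configure("demo", {"interval": 0, "max_count": 3})
src.open(ctx)
src.close(ctx)
print(ctx.emitted)
```

Using a threading.Event instead of time.sleep lets close interrupt the wait immediately, which matters because open is expected to run continuously.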
Sink interface:
class Sink(object):
    """abstract class for eKuiper sink plugin"""

    @abstractmethod
    def configure(self, conf: dict):
        """configure with conf map and raise error if any"""
        pass

    @abstractmethod
    def open(self, ctx: Context):
        """open connection and wait to receive data"""
        pass

    @abstractmethod
    def collect(self, ctx: Context, data: Any):
        """callback to deal with received data"""
        pass

    @abstractmethod
    def close(self, ctx: Context):
        """stop running and clean up"""
        pass
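The following sketch shows one possible shape for a sink that follows the lifecycle above: configure, open, collect for each item, then close. It is written as a plain class so it runs without eKuiper; a real plugin would subclass the SDK's Sink class. The JsonLinesSink name and the prefix configuration key are hypothetical, and the sketch assumes the data passed to collect is JSON-serializable, which may not match what the runtime actually delivers.

```python
import io
import json
from typing import Any, Optional, TextIO


class JsonLinesSink:
    """Writes each received item as one JSON line to a text stream."""

    def __init__(self, stream: Optional[TextIO] = None) -> None:
        self.stream = stream
        self.prefix = ""
        self.opened = False

    def configure(self, conf: dict):
        # "prefix" is a hypothetical property used only in this sketch.
        prefix = conf.get("prefix", "")
        if not isinstance(prefix, str):
            raise TypeError("prefix must be a string")
        self.prefix = prefix

    def open(self, ctx):
        # Acquire resources here; an in-memory buffer stands in for a connection.
        if self.stream is None:
            self.stream = io.StringIO()
        self.opened = True

    def collect(self, ctx, data: Any):
        if not self.opened:
            raise RuntimeError("collect called before open")
        self.stream.write(self.prefix + json.dumps(data, sort_keys=True) + "\n")

    def close(self, ctx):
        self.opened = False


sink = JsonLinesSink()
sink.configure({"prefix": "> "})
sink.open(None)
sink.collect(None, {"temperature": 21.5})
sink.collect(None, [1, 2])
sink.close(None)
print(sink.stream.getvalue(), end="")
```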
Sink ack
By default, a portable plugin sink operates asynchronously. In version 2.0 and later (which requires the newer version of the ekuiper pip package), users can configure a portable plugin sink to wait for an acknowledgment before the next piece of data is sent. For example, suppose the portable plugin defines a sink of type print. When requireAck is enabled in the rule, the custom sink plugin must return an ack message for each piece of data.
{
  "id": "rulePort1",
  "sql": "SELECT * FROM mqttStream",
  "actions": [
    {
      "print": {
        "requireAck": true
      }
    }
  ]
}
The sink implementation must call ctx.ack_ok() or ctx.ack_err(msg) to return the acknowledgment. In the following example, the collect function of the sink returns an ack after handling the data.
def collect(self, ctx: Context, data: Any):
    print('receive: ', data)
    # only send an ack when requireAck is enabled in the rule
    ctx.ack_ok()
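The example above only covers the success path. A sketch of how ctx.ack_err(msg) might be used when handling fails is shown below. The FakeAckContext test double, the AckingSink class, and its handle method are hypothetical names for this illustration; only ack_ok and ack_err come from the text above.

```python
from typing import Any, List, Optional, Tuple


class FakeAckContext:
    """Hypothetical test double that records which ack method was called."""

    def __init__(self) -> None:
        self.acks: List[Tuple[str, Optional[str]]] = []

    def ack_ok(self) -> None:
        self.acks.append(("ok", None))

    def ack_err(self, msg: str) -> None:
        self.acks.append(("err", msg))


class AckingSink:
    def handle(self, data: Any) -> None:
        """Placeholder for real work; rejects anything that is not a dict."""
        if not isinstance(data, dict):
            raise ValueError("expected a dict, got " + type(data).__name__)

    def collect(self, ctx, data: Any):
        # Send exactly one ack per item: ack_err on failure, ack_ok otherwise.
        try:
            self.handle(data)
        except Exception as exc:
            ctx.ack_err(str(exc))
        else:
            ctx.ack_ok()


ctx = FakeAckContext()
sink = AckingSink()
sink.collect(ctx, {"id": 1})
sink.collect(ctx, "oops")
print(ctx.acks)
```

Placing ack_ok in the else branch keeps a failure inside the ack call itself from being reported as a data-handling error.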
Function interface:
class Function(object):
    """abstract class for eKuiper function plugin"""

    @abstractmethod
    def validate(self, args: List[Any]):
        """callback to validate against ast args, return a string error or empty string"""
        pass

    @abstractmethod
    def exec(self, args: List[Any], ctx: Context) -> Any:
        """callback to do execution, return result"""
        pass

    @abstractmethod
    def is_aggregate(self):
        """callback to check if function is for aggregation, return bool"""
        pass
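To make the Function contract concrete, here is a minimal sketch of a scalar function that reverses its argument. It is a plain class so it runs without eKuiper; a real plugin would subclass the SDK's Function class. The ReverseFunc name is hypothetical. Because validate receives AST arguments rather than runtime values, the sketch only checks the argument count there.

```python
from typing import Any, List


class ReverseFunc:
    """Reverses the string form of its single argument."""

    def validate(self, args: List[Any]) -> str:
        # Return an error string on failure, or an empty string when valid.
        if len(args) != 1:
            return "reverse expects exactly 1 argument"
        return ""

    def exec(self, args: List[Any], ctx) -> Any:
        # Convert to str first so non-string inputs do not raise.
        return str(args[0])[::-1]

    def is_aggregate(self) -> bool:
        # A scalar function works on one row at a time, not on a group.
        return False


func = ReverseFunc()
print(func.validate(["col"]) == "")
print(func.exec(["abc"], None))
```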
Users need to create their own sources, sinks and functions by implementing these abstract classes, then create the main program and declare the instantiation functions for these extensions, as shown below:
if __name__ == '__main__':
    c = PluginConfig("pysam", {"pyjson": lambda: PyJson()}, {"print": lambda: PrintSink()},
                     {"revert": lambda: revertIns})
    plugin.start(c)
For the full example, please check the Python SDK example.
Package
Because Python is an interpreted language, there is no need to build an executable. Simply specify the main Python file in the plugin JSON file. For details, please check packaging.
Deployment requirements
Running a Python script requires a Python environment. Make sure Python 3.x is installed in the target environment. If using the Docker image, we recommend tags like lfedge/ekuiper:<tag>-slim-python, which include both eKuiper and a Python environment.
Virtual Environment
Virtual environments are a common and effective technique in Python development and are useful for dependency management. Anaconda and Miniconda are among the most popular environment managers for Python. The conda package and environment manager is included in all versions of Anaconda®, Miniconda, and Anaconda Repository. eKuiper supports running Python plugins in a conda environment.
To use a conda environment, the common steps are:
- Create and set up the conda environment.
- When packaging the plugin, make sure virtualEnvType is set to conda and env is set to the name of the created virtual environment. Below is an example.
  {
    "version": "v1.0.0",
    "language": "python",
    "executable": "pysam.py",
    "virtualEnvType": "conda",
    "env": "myenv",
    "sources": [
      "pyjson"
    ],
    "sinks": [
      "print"
    ],
    "functions": [
      "revert"
    ]
  }
- If the plugin has an installation script, make sure the script installs the dependencies into the correct environment.
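If the plugin JSON file is generated by a build script, the conda fields can be added in one place. The sketch below is one way to do that using only the fields that appear in the example above. The build_plugin_meta helper is hypothetical, and leaving out virtualEnvType and env when no environment is given is an assumption of this sketch rather than documented behavior.

```python
import json
from typing import List, Optional


def build_plugin_meta(executable: str, sources: List[str], sinks: List[str],
                      functions: List[str], conda_env: Optional[str] = None,
                      version: str = "v1.0.0") -> dict:
    """Build the plugin description using the fields from the example above."""
    meta = {"version": version, "language": "python", "executable": executable}
    if conda_env is not None:
        if not conda_env.strip():
            raise ValueError("conda_env must not be empty")
        # Both fields are set together so they cannot get out of sync.
        meta["virtualEnvType"] = "conda"
        meta["env"] = conda_env
    meta["sources"] = sources
    meta["sinks"] = sinks
    meta["functions"] = functions
    return meta


meta = build_plugin_meta("pysam.py", ["pyjson"], ["print"], ["revert"],
                         conda_env="myenv")
print(json.dumps(meta, indent=2))
```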