Run TensorFlow Lite model with eKuiper function plugin 
LF Edge eKuiper is lightweight IoT data analytics and streaming software that can run on all kinds of resource-constrained edge devices.
TensorFlow Lite is a set of tools to help developers run TensorFlow models on mobile, embedded, and IoT devices. It enables on-device machine learning inference with low latency and a small binary size.
By integrating eKuiper and TensorFlow Lite, users can apply AI to streaming data using prebuilt TensorFlow models. In this tutorial, we will walk you through building an eKuiper plugin that labels pictures (binary data) streamed from an edge device, using a pre-trained image recognition TensorFlow model.
Prerequisite 
To run the TensorFlow Lite interpreter, we need a trained model. This tutorial does not cover how to train and convert a model; check the tflite converter for how to do that. We can either train a new model or pick one online. In this tutorial, we will use the label image model from mattn/go-tflite. This repo provides a Go binding for the tflite C API, and we will also use it to implement our plugin.
Develop the plugin 
To integrate eKuiper with TensorFlow Lite, we will develop a custom eKuiper function plugin to be used by eKuiper rules. As an example, we will create a LabelImage function whose input is binary data representing an image and whose output is a string representing the label of the image. For example, LabelImage(col) will produce "peacock" if the input image shows a peacock.
To develop the function plugin, we need to:
- Create the plugin go file. For example, in eKuiper source code, create plugins/functions/labelImage/labelImage.go file.
- Create a struct that implements api.Function interface.
- Export the struct.
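As a rough illustration of the second and third steps, the sketch below defines a struct that satisfies a function interface and exports one instance of it. The `Function` interface here is a simplified local stand-in written for this example (in particular, `ctx` is typed as `interface{}` rather than `api.FunctionContext`), so check the eKuiper `api` package for the actual definition. The inference itself is left out.

```go
package main

import (
	"fmt"
)

// Function is a simplified, hypothetical stand-in for eKuiper's api.Function.
// The real interface lives in the eKuiper api package and may differ.
type Function interface {
	Validate(args []interface{}) error
	Exec(args []interface{}, ctx interface{}) (interface{}, bool)
	IsAggregate() bool
}

// labelImage holds the paths that the function needs at run time.
type labelImage struct {
	modelPath string
	labelPath string
}

// Validate checks that exactly one argument is passed to the function.
func (f *labelImage) Validate(args []interface{}) error {
	if len(args) != 1 {
		return fmt.Errorf("labelImage expects 1 parameter but got %d", len(args))
	}
	return nil
}

// Exec only checks the argument here; a real plugin would run the model.
func (f *labelImage) Exec(args []interface{}, ctx interface{}) (interface{}, bool) {
	if err := f.Validate(args); err != nil {
		return err, false
	}
	data, ok := args[0].([]byte)
	if !ok {
		return fmt.Errorf("labelImage expects binary data but got %T", args[0]), false
	}
	return fmt.Sprintf("received %d bytes", len(data)), true
}

// IsAggregate reports that the function works on single rows.
func (f *labelImage) IsAggregate() bool { return false }

// LabelImage is the single exported instance shared by all rules.
var LabelImage = labelImage{
	modelPath: "labelImage/mobilenet_quant_v1_224.tflite",
	labelPath: "labelImage/labels.txt",
}

// Compile-time check that the struct satisfies the stand-in interface.
var _ Function = (*labelImage)(nil)

func main() {
	res, ok := LabelImage.Exec([]interface{}{[]byte{1, 2, 3}}, nil)
	fmt.Println(res, ok)
}
```

Following the convention used in this tutorial, an error is returned as the first value together with `false`, so the caller can tell a failure from a valid result.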
The key part of the implementation is the Exec function. In pseudocode, it looks like this:
func (f *labelImage) Exec(args []interface{}, ctx api.FunctionContext) (interface{}, bool) {

    // ... do some initialization and validation

    // The single argument is expected to hold the raw image bytes
    arg0, ok := args[0].([]byte)
    if !ok {
        return fmt.Errorf("labelImage function parameter must be binary data, but got %v", args[0]), false
    }
    // Decode the input image
    img, _, err := image.Decode(bytes.NewReader(arg0))
    if err != nil {
        return err, false
    }
    var outerErr error
    f.once.Do(func() {
        // Load labels, tflite model and initialize the tflite interpreter;
        // store any failure in outerErr
    })
    if outerErr != nil {
        return outerErr, false
    }
    // Run the interpreter against the input image

    // Return the label with the highest probability
    return result, true
}
Another thing to note is how the plugin is exported. The function is stateless, so we export only one struct instance. All rules that use this function share that instance, which avoids the overhead of creating instances and loading the model repeatedly. The model and label paths are specified at instantiation.
var LabelImage = labelImage{
    modelPath: "labelImage/mobilenet_quant_v1_224.tflite",
    labelPath: "labelImage/labels.txt",
}
Check this tutorial for detailed steps on creating eKuiper plugins. Please refer to labelImage.go for the full source code.
Build and install the plugin 
To use the plugin, we need to build it in the environment where eKuiper will run and then install it in eKuiper.
Install by pre-built zip 
If you use the eKuiper docker images with tags like 1.1.1 or 1.1.1-slim, which are based on debian, you can install the pre-built labelImage plugin. For example, to install the plugin for eKuiper 1.1.2 in the docker image lfedge/ekuiper:1.1.2-slim, use the pre-built zip file located at https://packages.emqx.net/kuiper-plugins/1.1.2/debian/functions/labelImage_amd64.zip. Run the REST command below to install it.
POST http://{{eKuiperHost:eKuiperRestPort}}/plugins/functions
Content-Type: application/json
{"name":"labelImage", "file": "https://packages.emqx.net/kuiper-plugins/1.1.2/debian/functions/labelImage_amd64.zip"}
Manual build
If you don't run eKuiper from an official eKuiper docker image, the pre-built labelImage plugin will not work because of the limitations of Go plugins. You will need to build the plugin manually. There are 3 steps to create the plugin zip file manually:
- Build the TensorFlowLite C API.
- Build the labelImage plugin.
- Package the plugin with install script.
Build the TensorFlowLite C API 
The tensorflow repo provides very brief instructions for building the C API. We expand them step by step in this section. Note that the plugin has only been tested against TensorFlow v2.2.0-rc3, so we will build that version. Taking ubuntu as an example, the build steps are:
- Install Python 3. 
- Create requirements.txt according to tensorflow/tensorflow/tools/pip_package/setup.py of the corresponding TensorFlow version and copy it to your location. Install the required Python libraries: pip3 install -r requirements.txt.
- Install Bazel which is the build tool for TensorFlow. 
- Clone the tensorflow repo and switch to the required version with git checkout v2.2.0-rc3 -b mybranch.
- Build the target .so files. The output will be in ./bazel-bin. Copy the two .so files to the tensorflow/lib folder.
  $ cd $tensorflowSrc
  $ bazel build --config monolithic -c opt //tensorflow/lite:libtensorflowlite.so
  $ bazel build --config monolithic -c opt //tensorflow/lite/c:libtensorflowlite_c.so
  $ mkdir lib
  $ cp bazel-bin/tensorflow/lite/libtensorflowlite.so lib
  $ cp bazel-bin/tensorflow/lite/c/libtensorflowlite_c.so lib
- Install the so files.
  - Update the ldconfig file: sudo vi /etc/ld.so.conf.d/tflite.conf.
  - Add the path {{tensorflowPath}}/lib to tflite.conf, then save and exit.
  - Run ldconfig: sudo ldconfig.
  - Check the installation result: ldconfig -p | grep libtensorflow. Make sure the two so files are listed.
Build the labelImage plugin 
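Make sure the eKuiper github repo has been cloned. The plugin source file is in extensions/functions/labelImage/labelImage.go. Export the paths of the tensorflow repo and the built libraries before building the plugin. The commands below assume that the tensorflow repo is located at /root/tensorflow; adjust the paths to your environment.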
$ cd {{eKuiperRepoPath}}
$ export CGO_CFLAGS=-I/root/tensorflow
$ export CGO_LDFLAGS=-L/root/tensorflow/lib
$ mkdir -p "plugins/functions"
$ go build -trimpath --buildmode=plugin -o plugins/functions/LabelImage.so extensions/functions/labelImage/*.go
$ cp -r extensions/functions/labelImage plugins/functions
These commands build the plugin into plugins/functions/LabelImage.so and copy all dependencies to the plugins/functions/labelImage folder. For development purposes, you can restart eKuiper to load this plugin automatically and test it. Once testing is complete, package the plugin in a zip that the eKuiper plugin installation API can consume, so that it can be used on another machine, such as in a production environment.
Package the plugin 
Package all files and directories inside plugins/functions/labelImage into a zip file along with the built LabelImage.so. The file structure inside the zip file should be like:
- etc
  - labels.txt
  - mobilenet_quant_v1_224.tflite
- lib
  - libtensorflowlite.so
  - libtensorflowlite_c.so
- install.sh
- LabelImage.so
- tflite.conf
Install the packaged plugin on the target system in the same way as described in Install by pre-built zip.
Run the plugin 
Once the plugin is installed, we can use it in our rule. We will create a rule that receives image byte data from an MQTT topic and labels the image with the tflite model.
Define the stream 
Define the stream by eKuiper rest API. We create a stream named tfdemo whose format is binary and the topic is tfdemo.
POST http://{{host}}/streams
Content-Type: application/json
{"sql":"CREATE STREAM tfdemo () WITH (DATASOURCE=\"tfdemo\", FORMAT=\"BINARY\")"}
Define the rule
Define the rule by eKuiper rest API. We will create a rule named ruleTf. We just read the images from tfdemo stream and run the custom function labelImage against it. The result will be the label of the image recognized by the AI.
POST http://{{host}}/rules
Content-Type: application/json
{
  "id": "ruleTf",
  "sql": "SELECT labelImage(self) FROM tfdemo",
  "actions": [
    {
      "log": {}
    }
  ]
}
Feed the data
Here we create a go program to send image data to the tfdemo topic to be processed by the rule.
package main
import (
    "fmt"
    "os"
    "time"
    mqtt "github.com/eclipse/paho.mqtt.golang"
)
func main() {
    const TOPIC = "tfdemo"
    images := []string{
        "peacock.png",
        "frog.jpg",
        // other images you want
    }
    opts := mqtt.NewClientOptions().AddBroker("tcp://yourownhost:1883")
    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }
    for _, image := range images {
        fmt.Println("Publishing " + image)
        payload, err := os.ReadFile(image)
        if err != nil {
            fmt.Println(err)
            continue
        }
        if token := client.Publish(TOPIC, 0, false, payload); token.Wait() && token.Error() != nil {
            fmt.Println(token.Error())
        } else {
            fmt.Println("Published " + image)
        }
        time.Sleep(1 * time.Second)
    }
    client.Disconnect(0)
}
Run pub.go to start feeding images into the tfdemo topic.
Check the result 
Because our rule definition has only one sink, log, the result is written to the log file. We feed the stream with two images, peacock.png and frog.jpg. Checking the log file, we find:
time="2021-02-05 16:23:29" level=info msg="sink result for rule ruleTf: [{\"labelImage\":\"peacock\"}]" file="sinks/log_sink.go:16" rule=ruleTf
time="2021-02-05 16:23:30" level=info msg="sink result for rule ruleTf: [{\"labelImage\":\"bullfrog\"}]" file="sinks/log_sink.go:16" rule=ruleTf
The images are labeled correctly.
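If a program needs to consume the result rather than read it from the log, the JSON array embedded in the log messages can be decoded with `encoding/json`. This is a small hedged sketch: `labelsFromResult` is a hypothetical helper, and it assumes the payload has the same shape as the msg content shown above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// labelsFromResult extracts the labelImage values from a result payload
// shaped like [{"labelImage":"peacock"}].
func labelsFromResult(payload []byte) ([]string, error) {
	var rows []map[string]interface{}
	if err := json.Unmarshal(payload, &rows); err != nil {
		return nil, err
	}
	var labels []string
	for _, row := range rows {
		// Skip rows where the field is missing or is not a string.
		if s, ok := row["labelImage"].(string); ok {
			labels = append(labels, s)
		}
	}
	return labels, nil
}

func main() {
	labels, err := labelsFromResult([]byte(`[{"labelImage":"peacock"}]`))
	fmt.Println(labels, err)
}
```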
Conclusion 
In this tutorial, we walked you through building a custom eKuiper plugin that leverages a pre-trained TensorFlow Lite model. If you need to use other models, just follow the same steps to create another function. Note that the built TensorFlow C API can be shared among all functions running in the same environment. Enjoy AI on edge devices.