Data Import/Export Management

The eKuiper REST API lets you import and export data.

Data Format

The file format for importing and exporting data is JSON, which can contain streams, tables, rules, plugins, source YAML configurations, and so on. Each type holds key-value pairs that map a name to its creation statement. The following example file defines streams, rules, tables, plugins, source configurations, and sink configurations.

{
    "streams": {
        "demo": "CREATE STREAM demo () WITH (DATASOURCE=\"users\", FORMAT=\"JSON\")"
    },
    "tables": {
        "T110": "\n CREATE TABLE T110\n (\n S1 string\n )\n WITH (DATASOURCE=\"test.json\", FORMAT=\"json\", TYPE=\"file\", KIND=\"scan\", );\n "
    },
    "rules": {
        "rule1": "{\"id\": \"rule1\",\"sql\": \"SELECT * FROM demo\",\"actions\": [{\"log\": {}}]}",
        "rule2": "{\"id\": \"rule2\",\"sql\": \"SELECT * FROM demo\",\"actions\": [{  \"log\": {}}]}"
    },
    "scripts": {
        "area": "{\"id\":\"area\",\"description\":\"calculate area\",\"script\":\"function area(x, y) { return x * y; }\",\"isAgg\":false}"
    }
}

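Because each rule is stored as a JSON string inside the outer JSON document, the escaping is easy to get wrong by hand. The following is a minimal Python sketch of assembling such a document. The helper name `build_ruleset` is illustrative and not part of eKuiper, and the sketch only covers the streams, tables, and rules keys shown in the example above.

```python
import json


def build_ruleset(streams, tables, rules):
    """Assemble an import document from plain Python values.

    streams and tables map a name to its SQL creation statement.
    rules maps a rule ID to a dict, which is serialized to a JSON string
    because the format stores each rule definition as a string value.
    """
    return {
        "streams": dict(streams),
        "tables": dict(tables),
        "rules": {rule_id: json.dumps(rule) for rule_id, rule in rules.items()},
    }


ruleset = build_ruleset(
    streams={"demo": 'CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")'},
    tables={},
    rules={"rule1": {"id": "rule1", "sql": "SELECT * FROM demo", "actions": [{"log": {}}]}},
)

# json.dumps escapes the inner quotes of every statement and rule string.
document = json.dumps(ruleset, indent=2)
```

Letting `json.dumps` do the nested escaping avoids hand-written `\"` sequences in the rule definitions.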
Import Data

By default, the API resets all existing data and then imports the new data into the system. To keep the existing data and apply the new data on top of it, add the partial=1 parameter to the request URL. The data can be supplied either as text content or as a file URI.

Example 1: Import by text content

POST http://{{host}}/data/import
Content-Type: application/json

{
  "content": "{json of the ruleset}"
}

Example 2: Import by file URI

POST http://{{host}}/data/import
Content-Type: application/json

{
  "file": "file:///tmp/a.json"
}

Example 3: Import data by file URI and then exit (intended for plugin and static schema updates; users must ensure that eKuiper can be restarted after it exits)

POST http://{{host}}/data/import?stop=1
Content-Type: application/json

{
  "file": "file:///tmp/a.json"
}

Example 4: Keep the old data and import new data (existing tables, streams, rules, source configurations, and sink configurations are overwritten; plugins and schemas are installed only if they do not already exist, otherwise they are ignored)

POST http://{{host}}/data/import?partial=1
Content-Type: application/json

{
  "file": "file:///tmp/a.json"
}

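The import variants above differ only in the request body (content or file) and in the query parameters (partial, stop). As a rough sketch, the request could be assembled in Python as shown below. The helper name `build_import_request` and the host value `127.0.0.1:9081` are assumptions made for illustration; the request is built but not sent.

```python
import json
import urllib.parse
import urllib.request


def build_import_request(host, content=None, file_uri=None, partial=False, stop=False):
    """Build (but do not send) a POST request for /data/import.

    Exactly one of content (the ruleset as a JSON string) or file_uri is required.
    """
    if (content is None) == (file_uri is None):
        raise ValueError("pass exactly one of content or file_uri")

    # Only add the query parameters that were explicitly requested.
    params = {}
    if partial:
        params["partial"] = "1"
    if stop:
        params["stop"] = "1"

    url = f"http://{host}/data/import"
    if params:
        url += "?" + urllib.parse.urlencode(params)

    body = {"content": content} if content is not None else {"file": file_uri}
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical host; replace it with the address of your eKuiper instance.
request = build_import_request("127.0.0.1:9081", file_uri="file:///tmp/a.json", partial=True)
```

Passing the result to `urllib.request.urlopen(request)` would send it. Keeping construction separate from sending makes the URL and body easy to inspect first.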
Example 5: Import data through an asynchronous API. After receiving the request, the server will generate a task ID, then execute the task in the background and return a response immediately.

POST http://{{host}}/async/data/import
Content-Type: application/json

{
  "content": "$data json content"
}


{
  "id": "$taskID"
}

Check the running status of background tasks by task ID

GET http://{{host}}/async/task/{{id}}
Content-Type: application/json
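
Checking a background task usually means polling the status endpoint until the task finishes. The fields of the task status response are not described in this document, so the sketch below leaves the completion check to the caller. The function names `fetch_task` and `wait_for_task` are hypothetical helpers, not part of eKuiper.

```python
import json
import time
import urllib.request


def fetch_task(host, task_id):
    """GET /async/task/{id} and return the decoded JSON body."""
    with urllib.request.urlopen(f"http://{host}/async/task/{task_id}") as response:
        return json.load(response)


def wait_for_task(fetch, is_finished, interval=1.0, max_attempts=30):
    """Call fetch() until is_finished(result) is true or the attempts run out."""
    for attempt in range(max_attempts):
        result = fetch()
        if is_finished(result):
            return result
        # Do not sleep after the final attempt.
        if attempt < max_attempts - 1:
            time.sleep(interval)
    raise TimeoutError(f"task not finished after {max_attempts} attempts")
```

A caller would pass something like `lambda: fetch_task(host, task_id)` as `fetch`, plus an `is_finished` predicate written against the actual response of the deployed eKuiper version.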

Import data status

This API returns the errors that occurred during data import. If every field in the response is empty, the import succeeded completely.

GET http://{{host}}/data/import/status

Example 1: The data import is completely successful

GET http://{{host}}/data/import/status
Content-Type: application/json


Example 2: Failed to import plugin

GET http://{{host}}/data/import/status
Content-Type: application/json

    "sinks_tdengine":"fail to download file file:///root/ekuiper-jran/_plugins/ubuntu/sinks/ stat /root/ekuiper-jran/_plugins/ubuntu/sinks/ no such file or directory",
    "sources_random":"fail to download file file:///root/ekuiper-jran/_plugins/ubuntu/sources/ stat /root/ekuiper-jran/_plugins/ubuntu/sources/ no such file or directory"},

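Since an import counts as successful only when every returned field is empty, a client can walk the status response and collect whatever messages remain. The sketch below assumes the response is a JSON object whose values are either error strings or nested objects of error strings. The helper name `collect_import_errors` is illustrative.

```python
def collect_import_errors(status, prefix=""):
    """Return (path, message) pairs for every non-empty string in the status body.

    Nested objects are walked recursively, so the helper works whether errors
    are grouped by resource type or listed in a flat object.
    """
    errors = []
    for key, value in status.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            errors.extend(collect_import_errors(value, path))
        elif isinstance(value, str) and value:
            errors.append((path, value))
    return errors
```

An empty list from this helper corresponds to a completely successful import. Each returned pair names the failed item together with its error message.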
Data Export

The export API returns a file to download.

Example 1: export all data

GET http://{{host}}/data/export

Example 2: Export the data related to specific rules

POST -d '["rule1","rule2"]' http://{{host}}/data/export
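
The two export forms can be sketched in Python as follows: a GET for all data, or a POST whose body is a JSON array of rule IDs. The helper name `build_export_request` is hypothetical, and sending a Content-Type header on the POST is an assumption rather than something this document states.

```python
import json
import urllib.request


def build_export_request(host, rule_ids=None):
    """Build (but do not send) an export request.

    With no rule_ids this is a GET for all data; otherwise it is a POST
    whose body is a JSON array of rule IDs.
    """
    url = f"http://{host}/data/export"
    if rule_ids is None:
        return urllib.request.Request(url, method="GET")
    return urllib.request.Request(
        url,
        data=json.dumps(list(rule_ids)).encode("utf-8"),
        # Assumption: the body is declared as JSON, as in the import examples.
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Because the export API returns a file, a caller would typically send the request with `urllib.request.urlopen` and write the response bytes to disk.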

Import and Export Data in YAML Format

YAML is more readable for eKuiper configuration, so eKuiper also supports importing and exporting configurations in YAML format, including streams, tables, rules, plugins, and source configurations. Each type stores key-value pairs that map a name to its creation statement. The following example file defines streams, rules, tables, plugins, source configurations, and sink configurations.

GET /v2/data/export

        connectionSelector: mqttcon
        qos: 1
        sourceType: stream
        insecureSkipVerify: false
        protocolVersion: 3.1.1
        server: tcp://
        sql: ' CREATE STREAM mqttstream1 ()       WITH (DATASOURCE="topic1", FORMAT="json", CONF_KEY="mqttconf1", TYPE="mqtt", SHARED="false", );'
        triggered: false
        id: rule1
        sql: select * from mqttstream1
            - log: {}

Import Configuration

POST http://{{host}}/v2/data/import
Content-Type: application/json

{
  "file": "file:///tmp/a.yaml"
}