Data Import/Export Management
The eKuiper REST API allows importing and exporting data.
Data Format
The file format for importing and exporting data is JSON, which can contain streams, tables, rules, plugins, source configurations, and so on. Each type holds key-value pairs of the name and the creation statement. In the following example file, we define streams, rules, tables, plugins, source configurations, and sink configurations:
{
"streams": {
"demo": "CREATE STREAM demo () WITH (DATASOURCE=\"users\", FORMAT=\"JSON\")"
},
"tables": {
"T110":"\n CREATE TABLE T110\n (\n S1 string\n )\n WITH (DATASOURCE=\"test.json\", FORMAT=\"json\", TYPE=\"file\", KIND=\"scan\", );\n "
},
"rules": {
"rule1": "{\"id\": \"rule1\",\"sql\": \"SELECT * FROM demo\",\"actions\": [{\"log\": {}}]}",
"rule2": "{\"id\": \"rule2\",\"sql\": \"SELECT * FROM demo\",\"actions\": [{ \"log\": {}}]}"
},
"nativePlugins":{
"functions_image":"{\"name\":\"image\",\"file\":\"https://packages.emqx.net/kuiper-plugins/1.8.1/debian/functions/image_amd64.zip\",\"shellParas\":[]}",
"sources_video":"{\"name\":\"video\",\"file\":\"https://packages.emqx.net/kuiper-plugins/1.8.1/debian/sources/video_amd64.zip\",\"shellParas\":[]}"
},
"portablePlugins":{
},
"sourceConfig":{
"mqtt":"{\"td\":{\"insecureSkipVerify\":false,\"password\":\"public\",\"protocolVersion\":\"3.1.1\",\"qos\":1,\"server\":\"tcp://broker.emqx.io:1883\",\"username\":\"admin\"},\"test\":{\"insecureSkipVerify\":false,\"password\":\"public\",\"protocolVersion\":\"3.1.1\",\"qos\":1,\"server\":\"tcp://127.0.0.1:1883\",\"username\":\"admin\"}}"
},
"sinkConfig":{
"edgex":"{\"test\":{\"bufferLength\":1024,\"contentType\":\"application/json\",\"enableCache\":false,\"format\":\"json\",\"messageType\":\"event\",\"omitIfEmpty\":false,\"port\":6379,\"protocol\":\"redis\",\"sendSingle\":true,\"server\":\"localhost\",\"topic\":\"application\",\"type\":\"redis\"}}"
},
"connectionConfig":{
},
"Service":{
},
"Schema":{
},
"uploads":{
},
"scripts":{
"area":"{\"id\":\"area\",\"description\":\"calculate area\",\"script\":\"function area(x, y) { return x * y; }\",\"isAgg\":false}"
}
}
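Note that every definition is stored as a string value, so the inner rule, plugin, and configuration JSON must be serialized before it is placed into the top-level document. The following is a minimal Python sketch of assembling such a file; the definitions and the output path are illustrative, not part of the API.
import json

# An ordinary rule definition, written as a plain Python dict.
rule1 = {"id": "rule1", "sql": "SELECT * FROM demo", "actions": [{"log": {}}]}

ruleset = {
    "streams": {
        "demo": 'CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")'
    },
    "rules": {
        # json.dumps turns the rule object into the string value the format expects.
        "rule1": json.dumps(rule1),
    },
}

# Write an export-style file that can later be imported by content or by file URI.
with open("/tmp/a.json", "w") as f:
    json.dump(ruleset, f, indent=2)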
Import Data
By default, the API resets all existing data and then imports the new data into the system. Users can specify the partial=1
parameter in the HTTP URL to keep the existing data and only apply the new data. The API supports specifying data by text content or by file URI.
Example 1: Import by text content
POST http://{{host}}/data/import
Content-Type: application/json
{
"content": "{json of the ruleset}"
}
Example 2: Import by file URI
POST http://{{host}}/data/import
Content-Type: application/json
{
"file": "file:///tmp/a.json"
}
Example 3: Import data via file URI and stop eKuiper (for plugin and static schema updates; users need to ensure that eKuiper can be restarted after it exits)
POST http://{{host}}/data/import?stop=1
Content-Type: application/json
{
"file": "file:///tmp/a.json"
}
Example 4: Keep the existing data and import the new data (tables/streams/rules/source configs/sink configs are overwritten; plugins and schemas are installed if they do not exist, otherwise they are ignored)
POST http://{{host}}/data/import?partial=1
Content-Type: application/json
{
"file": "file:///tmp/a.json"
}
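The import requests above can also be issued programmatically. Below is a hedged Python sketch using the third-party requests package; the host address (shown with eKuiper's default REST port) and the file path are placeholders for your own environment.
import requests

HOST = "http://127.0.0.1:9081"  # placeholder for the eKuiper REST endpoint

# Import by text content (Example 1): send the ruleset JSON as a string.
with open("/tmp/a.json") as f:
    ruleset = f.read()
resp = requests.post(f"{HOST}/data/import", json={"content": ruleset})
print(resp.status_code, resp.text)

# Partial import by file URI (Example 4): keep existing data, apply the new data.
resp = requests.post(
    f"{HOST}/data/import",
    params={"partial": 1},
    json={"file": "file:///tmp/a.json"},
)
print(resp.status_code, resp.text)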
Example 5: Import data through the asynchronous API. After receiving the request, the server generates a task ID, returns a response immediately, and executes the task in the background.
POST http://{{host}}/async/data/import
Content-Type: application/json
{
"content": "$data json content"
}
Response:
{
"id": "$taskID"
}
Check the running status of a background task by its task ID:
GET http://{{host}}/async/task/{{id}}
Content-Type: application/json
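A sketch of driving the asynchronous import end to end is shown below: submit the ruleset, read the returned task ID, and poll the task endpoint. The polling interval and the shape of the task status payload are assumptions, so the raw response is simply printed on each iteration.
import time

import requests

HOST = "http://127.0.0.1:9081"  # placeholder for the eKuiper REST endpoint

with open("/tmp/a.json") as f:
    content = f.read()

# Submit the import task; the server answers immediately with a task ID.
resp = requests.post(f"{HOST}/async/data/import", json={"content": content})
task_id = resp.json()["id"]

# Poll the task status by ID a few times and print whatever the server reports.
for _ in range(10):
    status = requests.get(f"{HOST}/async/task/{task_id}")
    print(status.json())
    time.sleep(1)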
Import Data Status
This API returns the data import errors. If all returned sections are empty, the import was completely successful.
GET http://{{host}}/data/import/status
Example 1: The data import is completely successful
GET http://{{host}}/data/import/status
Content-Type: application/json
{
"streams":{},
"tables":{},
"rules":{},
"nativePlugins":{},
"portablePlugins":{},
"sourceConfig":{},
"sinkConfig":{},
"connectionConfig":{},
"Service":{},
"Schema":{},
"uploads":{},
"scripts":{}
}
Example 2: Failed to import plugin
GET http://{{host}}/data/import/status
Content-Type: application/json
{
"streams":{},
"tables":{},
"rules":{},
"nativePlugins":{
"sinks_tdengine":"fail to download file file:///root/ekuiper-jran/_plugins/ubuntu/sinks/tdengine_amd64.zip: stat /root/ekuiper-jran/_plugins/ubuntu/sinks/tdengine_amd64.zip: no such file or directory",
"sources_random":"fail to download file file:///root/ekuiper-jran/_plugins/ubuntu/sources/random_amd64.zip: stat /root/ekuiper-jran/_plugins/ubuntu/sources/random_amd64.zip: no such file or directory"},
"portablePlugins":{},
"sourceConfig":{},
"sinkConfig":{},
"connectionConfig":{},
"Service":{},
"Schema":{},
"uploads":{},
"scripts":{}
}
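Since a fully successful import returns an empty object for every section, error detection reduces to collecting the non-empty sections. A small sketch, with the host again as a placeholder:
import requests

HOST = "http://127.0.0.1:9081"  # placeholder for the eKuiper REST endpoint

status = requests.get(f"{HOST}/data/import/status").json()

# Any non-empty section maps the failed item name to its error message.
errors = {section: items for section, items in status.items() if items}
if errors:
    for section, items in errors.items():
        for name, message in items.items():
            print(f"{section}/{name}: {message}")
else:
    print("import completed without errors")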
Data Export
The export API returns a file to download.
Example 1: Export all data
GET http://{{host}}/data/export
Example 2: Export data related to specific rules
POST -d '["rule1","rule2"]' http://{{host}}/data/export
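A sketch of saving the exported file from either endpoint; the output file names are illustrative:
import requests

HOST = "http://127.0.0.1:9081"  # placeholder for the eKuiper REST endpoint

# Export all data and save the returned file.
resp = requests.get(f"{HOST}/data/export")
with open("export_all.json", "wb") as f:
    f.write(resp.content)

# Export only the data related to specific rules.
resp = requests.post(f"{HOST}/data/export", json=["rule1", "rule2"])
with open("export_rules.json", "wb") as f:
    f.write(resp.content)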
Import and Export Data in YAML Format
For eKuiper configurations, the YAML format is more readable. eKuiper also supports importing and exporting configurations in YAML format, including streams, tables, rules, plugins, source configurations, and so on. Each type stores key-value pairs of the name and the creation statement. In the following example file, we define source configurations, connection configurations, streams, and rules.
GET http://{{host}}/v2/data/export
sourceConfig:
  sources.mqtt.mqttconf1:
    connectionSelector: mqttcon
    qos: 1
    sourceType: stream
connectionConfig:
  connections.mqtt.mqttcon:
    insecureSkipVerify: false
    protocolVersion: 3.1.1
    server: tcp://127.0.0.1:1883
streams:
  mqttstream1:
    sql: ' CREATE STREAM mqttstream1 () WITH (DATASOURCE="topic1", FORMAT="json", CONF_KEY="mqttconf1", TYPE="mqtt", SHARED="false", );'
rules:
  rule1:
    triggered: false
    id: rule1
    sql: select * from mqttstream1
    actions:
      - log: {}
Import Configuration
POST http://{{host}}/v2/data/import
Content-Type: application/json
{
"file": "file:///tmp/a.yaml"
}
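And a corresponding Python sketch for importing the YAML configuration through the v2 endpoint; the host and file path are placeholders:
import requests

HOST = "http://127.0.0.1:9081"  # placeholder for the eKuiper REST endpoint

# Import the YAML configuration file by file URI.
resp = requests.post(
    f"{HOST}/v2/data/import",
    json={"file": "file:///tmp/a.yaml"},
)
print(resp.status_code, resp.text)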