Data Import/Export Management
The eKuiper REST API allows you to import and export data.
Data Format
The file format for importing and exporting data is JSON, which can contain streams, tables, rules, plugins, source YAML configurations, and so on. Each type holds key-value pairs of the name and the creation statement. The following example file defines streams, rules, tables, plugins, source configurations, and sink configurations.
{
"streams": {
"demo": "CREATE STREAM demo () WITH (DATASOURCE=\"users\", FORMAT=\"JSON\")"
},
"tables": {
"T110":"\n CREATE TABLE T110\n (\n S1 string\n )\n WITH (DATASOURCE=\"test.json\", FORMAT=\"json\", TYPE=\"file\", KIND=\"scan\", );\n "
},
"rules": {
"rule1": "{\"id\": \"rule1\",\"sql\": \"SELECT * FROM demo\",\"actions\": [{\"log\": {}}]}",
"rule2": "{\"id\": \"rule2\",\"sql\": \"SELECT * FROM demo\",\"actions\": [{ \"log\": {}}]}"
},
"nativePlugins":{
"functions_image":"{\"name\":\"image\",\"file\":\"https://packages.emqx.net/kuiper-plugins/1.8.1/debian/functions/image_amd64.zip\",\"shellParas\":[]}",
"sources_video":"{\"name\":\"video\",\"file\":\"https://packages.emqx.net/kuiper-plugins/1.8.1/debian/sources/video_amd64.zip\",\"shellParas\":[]}"
},
"portablePlugins":{
},
"sourceConfig":{
"mqtt":"{\"td\":{\"insecureSkipVerify\":false,\"password\":\"public\",\"protocolVersion\":\"3.1.1\",\"qos\":1,\"server\":\"tcp://broker.emqx.io:1883\",\"username\":\"admin\"},\"test\":{\"insecureSkipVerify\":false,\"password\":\"public\",\"protocolVersion\":\"3.1.1\",\"qos\":1,\"server\":\"tcp://127.0.0.1:1883\",\"username\":\"admin\"}}"
},
"sinkConfig":{
"edgex":"{\"test\":{\"bufferLength\":1024,\"contentType\":\"application/json\",\"enableCache\":false,\"format\":\"json\",\"messageType\":\"event\",\"omitIfEmpty\":false,\"port\":6379,\"protocol\":\"redis\",\"sendSingle\":true,\"server\":\"localhost\",\"topic\":\"application\",\"type\":\"redis\"}}"
},
"connectionConfig":{
},
"Service":{
},
"Schema":{
},
"uploads":{
},
"scripts":{
"area":"{\"id\":\"area\",\"description\":\"calculate area\",\"script\":\"function area(x, y) { return x * y; }\",\"isAgg\":false}"
}
}
Import Data
By default, the API resets all existing data and then imports the new data into the system. To keep the existing data and apply the new data on top of it, specify the partial=1 parameter in the HTTP URL. The data to import can be supplied either as text content or as a file URI.
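As a rough illustration of these options, the following Python sketch assembles an import request without sending it. The helper name `build_import_request` and the host `127.0.0.1:9081` are assumptions made for the example. Only the `/data/import` path, the `partial=1` parameter, and the `content` and `file` body fields are taken from this page.

```python
import json
from urllib.parse import urlencode


def build_import_request(host, ruleset=None, file_uri=None, partial=False):
    """Return (url, body) for POST /data/import without sending anything.

    Exactly one of `ruleset` (a dict) or `file_uri` (a string) must be given.
    This helper is hypothetical; it is not part of eKuiper.
    """
    if (ruleset is None) == (file_uri is None):
        raise ValueError("pass exactly one of ruleset or file_uri")

    if ruleset is not None:
        # The ruleset travels as a JSON string inside the "content" field.
        payload = {"content": json.dumps(ruleset)}
    else:
        payload = {"file": file_uri}

    url = f"http://{host}/data/import"
    if partial:
        # partial=1 keeps the existing data instead of resetting it first.
        url += "?" + urlencode({"partial": 1})
    return url, json.dumps(payload)


# Rule definitions are themselves JSON-encoded strings, as in the example file.
rule1 = {"id": "rule1", "sql": "SELECT * FROM demo", "actions": [{"log": {}}]}
ruleset = {
    "streams": {"demo": 'CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")'},
    "rules": {"rule1": json.dumps(rule1)},
}

# The host and port below are assumed values for a local instance.
url, body = build_import_request("127.0.0.1:9081", ruleset=ruleset, partial=True)
print(url)
```

Note the two levels of encoding: each rule is serialized to a string first, and the whole ruleset is serialized again when it is placed in `content`.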
Example 1: Import by text content
POST http://{{host}}/data/import
Content-Type: application/json
{
"content": "{json of the ruleset}"
}
Example 2: Import by file URI
POST http://{{host}}/data/import
Content-Type: application/json
{
"file": "file:///tmp/a.json"
}
Example 3: Import data via file URI and exit (for plugin and static schema updates, users need to ensure that eKuiper can be restarted after exiting)
POST http://{{host}}/data/import?stop=1
Content-Type: application/json
{
"file": "file:///tmp/a.json"
}
Example 4: Keep the old data and import new data (tables, streams, rules, source configs, and sink configs are overwritten; plugins and schemas are installed if they do not exist and ignored otherwise)
POST http://{{host}}/data/import?partial=1
Content-Type: application/json
{
"file": "file:///tmp/a.json"
}
Example 5: Import data through the asynchronous API. After receiving the request, the server generates a task ID, executes the task in the background, and returns a response immediately.
POST http://{{host}}/async/data/import
Content-Type: application/json
{
"content": "$data json content"
}
Response:
{
"id": "$taskID"
}
Check the running status of a background task by its task ID
GET http://{{host}}/async/task/{{id}}
Content-Type: application/json
Import data status
This API returns the errors from the data import. If every entry in the response is empty, the import succeeded completely.
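The check described above can be sketched in Python as follows. The helper name `collect_import_errors` is hypothetical, and the sample response is an abbreviated illustration shaped like the examples on this page rather than captured output.

```python
import json


def collect_import_errors(status):
    """Flatten an import status response into (section, name, message) tuples."""
    errors = []
    for section, entries in status.items():
        # Each section maps an item name to its error message; empty means no error.
        for name, message in (entries or {}).items():
            errors.append((section, name, message))
    return errors


# Hypothetical, shortened response body used only for illustration.
raw = '{"streams": {}, "rules": {}, "nativePlugins": {"sinks_tdengine": "fail to download file"}}'
errors = collect_import_errors(json.loads(raw))
print("import succeeded" if not errors else errors)
```

An empty list means every section came back empty, which is the success case.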
GET http://{{host}}/data/import/status
Example 1: The data import is completely successful
GET http://{{host}}/data/import/status
Content-Type: application/json
{
"streams":{},
"tables":{},
"rules":{},
"nativePlugins":{},
"portablePlugins":{},
"sourceConfig":{},
"sinkConfig":{},
"connectionConfig":{},
"Service":{},
"Schema":{},
"uploads":{},
"scripts":{}
}
Example 2: Failed to import plugin
GET http://{{host}}/data/import/status
Content-Type: application/json
{
"streams":{},
"tables":{},
"rules":{},
"nativePlugins":{
"sinks_tdengine":"fail to download file file:///root/ekuiper-jran/_plugins/ubuntu/sinks/tdengine_amd64.zip: stat /root/ekuiper-jran/_plugins/ubuntu/sinks/tdengine_amd64.zip: no such file or directory",
"sources_random":"fail to download file file:///root/ekuiper-jran/_plugins/ubuntu/sources/random_amd64.zip: stat /root/ekuiper-jran/_plugins/ubuntu/sources/random_amd64.zip: no such file or directory"
},
"portablePlugins":{},
"sourceConfig":{},
"sinkConfig":{},
"connectionConfig":{},
"Service":{},
"Schema":{},
"uploads":{},
"scripts":{}
}
Data Export
The export API returns a file to download.
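A minimal Python sketch of the two export variants documented in this section, a plain GET for everything and a POST carrying a JSON array of rule IDs, might look like the following. The helper name `build_export_request`, the host `127.0.0.1:9081`, and the `Content-Type` header on the POST are assumptions. The request is only built, not sent.

```python
import json
import urllib.request


def build_export_request(host, rule_ids=None):
    """Build (without sending) a request for /data/export."""
    url = f"http://{host}/data/export"
    if rule_ids is None:
        # No rule list: export everything with a plain GET.
        return urllib.request.Request(url, method="GET")
    # A rule list is sent as a JSON array in the POST body.
    body = json.dumps(list(rule_ids)).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},  # assumed header
        method="POST",
    )


# The host and port below are assumed values for a local instance.
req = build_export_request("127.0.0.1:9081", ["rule1", "rule2"])
print(req.get_method(), req.full_url, req.data.decode())
```

To download the file, the request object could be passed to `urllib.request.urlopen` and the response bytes written to disk.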
Example 1: Export all data
GET http://{{host}}/data/export
Example 2: Export the data related to specific rules
POST -d '["rule1","rule2"]' http://{{host}}/data/export
Import and export data through YAML format
For eKuiper configuration, the YAML format is more readable. eKuiper also supports importing and exporting configurations in YAML format, including streams, tables, rules, plugins, source configurations, and so on. Each type stores key-value pairs of the name and the creation statement. The following example file defines source configurations, connection configurations, streams, and rules.
GET /v2/data/export
sourceConfig:
  sources.mqtt.mqttconf1:
    connectionSelector: mqttcon
    qos: 1
    sourceType: stream
connectionConfig:
  connections.mqtt.mqttcon:
    insecureSkipVerify: false
    protocolVersion: 3.1.1
    server: tcp://127.0.0.1:1883
streams:
  mqttstream1:
    sql: ' CREATE STREAM mqttstream1 () WITH (DATASOURCE="topic1", FORMAT="json", CONF_KEY="mqttconf1", TYPE="mqtt", SHARED="false", );'
rules:
  rule1:
    triggered: false
    id: rule1
    sql: select * from mqttstream1
    actions:
      - log: {}
Import Configuration
POST http://{{host}}/v2/data/import
Content-Type: application/json
{
"file": "file:///tmp/a.yaml"
}