Skip to content

TDengine 目标 (Sink)

编译插件

在 eKuiper 项目主目录运行如下命令:

shell
go build -trimpath --buildmode=plugin -o plugins/sinks/Tdengine@v1.0.0.so extensions/sinks/tdengine/tdengine.go

安装插件

由于 TDengine 插件的运行依赖于 TDengine 客户端,为了便于用户使用,安装插件时将下载 TDengine 客户端。但是 TDengine 客户端版本与其服务器版本一一对应,互不兼容,所以用户必须告知所用 TDengine 服务器版本。

规则 Actions 说明

由于 TDengine 数据库要求表中必须有时间戳字段,所以用户必须告知数据表的时间戳字段名称(必填tsFieldName)。用户可以选择是否提供时间戳数据,若不提供(provideTs=false),时间戳字段的内容由 TDengine 数据库自动生成。

名称类型是否必填说明
hoststring数据库域名,其值必须为域名,即 FQDN,不能为 IP 地址。其默认值为 localhost。
portint数据库端口
userstring用户名,默认值为 root
passwordstring密码,默认值为 taosdata
databasestring数据库名
tablestring表名,可设置动态属性
fields[]string将要插入的表字段集合。sink 收到的数据和数据库表中均有该字段。若未设置,则所有结果字段写入数据库。
provideTsBool用户是否提供时间戳字段,默认为否。
tsFieldNameString时间戳字段名称
sTableString使用的超级表,可设置动态属性
tagFields[]String结果中作为标签的字段。若设置 sTable 属性,则该属性必填。
tableDataFieldString将 tableDataField 的嵌套值写入数据库。

其他通用的 sink 属性也支持,请参阅公共属性

操作示例

创建数据库、表

参考以下文档:

http
https://www.taosdata.com/cn/getting-started/

创建流

bash
curl --location --request POST 'http://127.0.0.1:9081/streams' --header 'Content-Type:application/json' --data '{"sql":"create stream demoStream(time string, age BIGINT) WITH ( DATASOURCE = \"device/+/message\", FORMAT = \"json\");"}'

创建规则

bash
curl --location --request POST 'http://127.0.0.1:9081/rules' --header 'Content-Type:application/json' --data '{"id":"demoRule","sql":"SELECT * FROM demoStream;","actions":[{"tdengine":{"provideTs":true,"tsFieldName":"time","port":0,"ip":"127.0.0.1","user":"root","password":"taosdata","database":"dbName","table":"tableName","fields":["time","age"]}}]}'

写入固定表格的例子:

json
{
  "tdengine": {
    "host":        "hostname",
    "port":        6030,
    "user":        "root",
    "password":    "taosdata",
    "database":    "db",
    "table":       "tableName",
    "tsfieldname": "ts"
  }
}

写入动态表的例子:

json
{
  "tdengine": {
    "host":        "hostname",
    "port":        6030,
    "database":    "dab",
    "table":       "{{.table}}", // 动态值,从结果中的 table 字段获取
    "tsfieldname": "ts",
    "fields":      []string{"f1", "f2"}, // 结果中的 f1, f2 字段写入数据库中的 f1, f2 列
    "sTable":      "myStable", // 超级表名,也可以动态
    "tagFields":   []string{"f3","f4"} // 结果中的 f3, f4 字段的值按顺序作为标签值写入
  }
}

根据 tableDataField 配置将结果写入数据库:

以下配置将 telemetry 字段的对应值写入数据库

json
{
  "telemetry": [{
    "temperature": 32.32,
    "humidity": 80.8,
    "f3": "f3tagValue",
    "f4": "f4tagValue",
    "ts": 1388082430
  },{
    "temperature": 34.32,
    "humidity": 81.8,
    "f3": "f3tagValue",
    "f4": "f4tagValue",
    "ts": 1388082440
  }]
}
json
{
  "tdengine": {
    "host":        "hostname",
    "port":        6030,
    "database":    "dab",
    "table":       "tableName", // dynamic value, get from the table field of the result
    "tsfieldname": "ts",
    "fields":      []string{"temperature", "humidity"}, // Write f1, f2 fields in result into f1, f2 columns in the db
    "sTable":      "myStable", // super table name, also allow dynamic
    "tableDataField":      "telemetry", // write values of telemetry field into database
    "tagFields":   []string{"f3","f4"} // Write f3, f4 fields' values in the result as tags in order
  }
}