Skip to content

电流变化时的查询

在物联网场景中,当指标变化时然后进行事件触发是一个非常常见的场景。本篇文章将以电流变化为例,介绍 eKuiper SQL 规则。

背景

在该场景中,stream 会以流的形式不断发送当前电流的数据,以及该数据所属的时间戳。本文中,我们将使用如下模拟输入数据:

json
{
  "current": 300,
  "ts": 1,
  "deviceId": 1
}
{
  "current": 400,
  "ts": 2,
  "deviceId": 2
}
{
  "current": 200,
  "ts": 3,
  "deviceId": 1
}
{
  "current": 200,
  "ts": 4,
  "deviceId": 2
}
{
  "current": 500,
  "ts": 5,
  "deviceId": 1
}
{
  "current": 200,
  "ts": 6,
  "deviceId": 2
}
{
  "current": 400,
  "ts": 7,
  "deviceId": 1
}
{
  "current": 600,
  "ts": 8,
  "deviceId": 2
}

当值变化为超过阈值时触发

在物联网应用中,用户经常需要监控传感器数值是否超过某个阈值,从而触发报警等动作。通过简单的比较当前值与阈值大小,可能会导致报警的持续触发。因此,用户可能更需要的是当数值由不超过阈值变为超过阈值时触发报警,其中隐含了判断变化的过程。以下为几个常见的场景:

1. 电流变化后超过阈值

sql
select concurrency, ts
from demo
where concurrency > 300
  and lag(concurrency) <= 300;

该规则会记录电流上一次的数据,然后和当前数据进行比对,一旦满足条件,则进行事件触发。

json
{
  "current": 400,
  "ts": 2
}
{
  "current": 500,
  "ts": 5
}
{
  "current": 400,
  "ts": 7
}

请注意,该规则会检测整个数据流(包含所有设备的值)的变化。如果需要区分设备,请看下面的场景。

2. 各设备的电流变化后超过阈值

sql
select current, deviceId, ts
from demo
where current > 300 and lag(current) over (partition by deviceId) < 300;

该规则会记录每个设备的上一次的数据与该设备当前数据进行比较。输出结果如下:

json
{
  "current": 500,
  "ts": 5,
  "deviceId": 1
}
{
  "current": 600,
  "ts": 8,
  "deviceId": 2
}

可见,在输入数据包含了多个设备的电流值的情况下,我们仍然可以基于设备进行触发。

3. 某个设备的电流变化后超过阈值

假设用户只关心某个具体的设备如 deviceId 为 1 的设备,我们可以通过 OVER WHEN 语句来限定 lag 的范围。

sql
select current, deviceId, ts
from demo
where current > 300 and deviceId = 1 and lag(current) over (when deviceId = 1) < 300;

输出结果如下:

json
{
  "current": 500,
  "ts": 5,
  "deviceId": 1
}

在此规则中,WHERE 语句里添加了条件 deviceId = 1,这样只会针对该设备进行计算。另外在 lag 函数中,OVER WHEN 条件同样限定了 deviceId,这样与当前设备值比较的只会是该设备上一次的值,排除了数据流中其余设备数据的影响。

除了 lag 函数,其余分析函数,例如 had_changed 等都支持 OVER 子句来限定状态的维度。详细信息请查看分析函数

总电流持续10s超过200A

sql
select concurrency from demo group by SLIDINGWINDOW(ss,0,10) over (when concurrency > 200) having min(concurrency) > 200;

该规则会当接收到 200A 以上的电流数据时开启一个窗口,如果该窗口中最小的数据也大于 200A ,则满足需求然后输出事件

json
{"concurrency":100,"ts":1} 
{"concurrency":300,"ts":2} 
{"concurrency":300,"ts":3} 
{"concurrency":300,"ts":4} 
{"concurrency":300,"ts":5} 
{"concurrency":300,"ts":6} 
{"concurrency":300,"ts":7} 
{"concurrency":300,"ts":8} 
{"concurrency":300,"ts":9} 
{"concurrency":300,"ts":10}  
{"concurrency":300,"ts":11} 输出事件