# Rules

Rules are defined by JSON, below is an example.

{
  "id": "rule1",
  "sql": "SELECT demo.temperature, demo1.temp FROM demo left join demo1 on demo.timestamp = demo1.timestamp where demo.temperature > demo1.temp GROUP BY demo.temperature, HOPPINGWINDOW(ss, 20, 10)",
  "actions": [
    {
      "log": {}
    },
    {
      "mqtt": {
        "server": "tcp://47.52.67.87:1883",
        "topic": "demoSink"
      }
    }
  ]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

The parameters for the rules are:

Parameter nameOptionalDescription
idfalseThe id of the rule. The rule id must be unique in the same eKuiper instance.
nametrueThe display name or description of a rule
sqlrequired if graph is not definedThe sql query to run for the rule
actionsrequired if graph is not definedAn array of sink actions
graphrequired if sql is not definedThe json presentation of the rule's DAG(directed acyclic graph)
optionstrueA map of options

# Rule Logic

A rule represents a stream processing flow from data source that ingest data into the flow to various processing logic to actions that engest the data to external systems.

There are two ways to define the flow aka. business logic of a rule. Either using SQL/actions combination or using the newly added graph API.

# SQL Query

By specifying the sql and actions property, we can define the business logic of a rule in a declarative way. Among these, sql defines the SQL query to run against a predefined stream which will transform the data. The output data can then route to multiple locations by actions.

# SQL

THE simplest rule SQL is like SELECT * FROM demo. It has ANSI SQL like syntax and can leverage abundant operators and functions provided by eKuiper runtime. See SQL for more info of eKuiper SQL.

Most of the SQL clause are defining the logic except the FROM clause, which is responsible to specify the stream. In this example, demo is the stream. It is possible to have multiple streams or streams/tables by using join clause. As a streaming engine, there must be at least one stream in a rule.

Thus, the SQL query here actually defines two parts:

  • The stream(s) or table(s) to be processed.
  • How to process.

Before using the SQL rule, the stream must define in prior. Please check streams for detail.

# Actions

The actions part defines the output action for a rule. Each rule can have multiple actions. An action is an instance of a sink connector. When define actions, the key is the sink connector type name, and the value is the properties.

eKuiper has built in abundant sink connector type such as mqtt, rest and file. Users can also extend more sink type to be used in a rule action. Each sink type have its own property set. For more detail, please check sink.

# Graph rule

Since eKuiper 1.6.0, eKuiper provides graph property in the rule model as an alternative way to create a rule. The property defines the DAG of a rule in JSON format. It is easy to map it directly to a graph in a GUI editor and suitable to serve as the backend of a drag and drop UI. An example of the graph rule definition is as below:

{
  "id": "rule1",
  "name": "Test Condition",
  "graph": {
    "nodes": {
      "demo": {
        "type": "source",
        "nodeType": "mqtt",
        "props": {
          "datasource": "devices/+/messages"
        }
      },
      "humidityFilter": {
        "type": "operator",
        "nodeType": "filter",
        "props": {
          "expr": "humidity > 30"
        }
      },
      "logfunc": {
        "type": "operator",
        "nodeType": "function",
        "props": {
          "expr": "log(temperature) as log_temperature"
        }
      },
      "tempFilter": {
        "type": "operator",
        "nodeType": "filter",
        "props": {
          "expr": "log_temperature < 1.6"
        }
      },
      "pick": {
        "type": "operator",
        "nodeType": "pick",
        "props": {
          "fields": ["log_temperature as temp", "humidity"]
        }
      },
      "mqttout": {
        "type": "sink",
        "nodeType": "mqtt",
        "props": {
          "server": "tcp://${mqtt_srv}:1883",
          "topic": "devices/result"
        }
      }
    },
    "topo": {
      "sources": ["demo"],
      "edges": {
        "demo": ["humidityFilter"],
        "humidityFilter": ["logfunc"],
        "logfunc": ["tempFilter"],
        "tempFilter": ["pick"],
        "pick": ["mqttout"]
      }
    }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

The graph property is a json structure with nodes to define the nodes presented in the graph and topo to define the edge between nodes. The node type can be built-in node types such as window node and filter node etc. It can also be a user defined node from plugins. Please refer to graph rule for more detail.

# Options

The current options includes:

Option nameType & Default ValueDescription
isEventTimeboolean: falseWhether to use event time or processing time as the timestamp for an event. If event time is used, the timestamp will be extracted from the payload. The timestamp filed must be specified by the stream definition.
lateToleranceint64:0When working with event-time windowing, it can happen that elements arrive late. LateTolerance can specify by how much time(unit is millisecond) elements can be late before they are dropped. By default, the value is 0 which means late elements are dropped.
concurrencyint: 1A rule is processed by several phases of plans according to the sql statement. This option will specify how many instances will be run for each plan. If the value is bigger than 1, the order of the messages may not be retained.
bufferLengthint: 1024Specify how many messages can be buffered in memory for each plan. If the buffered messages exceed the limit, the plan will block message receiving until the buffered messages have been sent out so that the buffered size is less than the limit. A bigger value will accommodate more throughput but will also take up more memory footprint.
sendMetaToSinkbool:falseSpecify whether the meta data of an event will be sent to the sink. If true, the sink can get te meta data information.
sendErrorbool: trueWhether to send the error to sink. If true, any runtime error will be sent through the whole rule into sinks. Otherwise, the error will only be printed out in the log.
qosint:0Specify the qos of the stream. The options are 0: At most once; 1: At least once and 2: Exactly once. If qos is bigger than 0, the checkpoint mechanism will be activated to save states periodically so that the rule can be resumed from errors.
checkpointIntervalint:300000Specify the time interval in milliseconds to trigger a checkpoint. This is only effective when qos is bigger than 0.
restartStrategystructSpecify the strategy to automatic restarting rule after failures. This can help to get over recoverable failures without manual operations. Please check Rule Restart Strategy for detail configuration items.

For detail about qos and checkpointInterval, please check state and fault tolerance.

The rule options can be defined globally in etc/kuiper.yaml under the rules section. The options defined in the rule json will override the global setting.

# Rule Restart Strategy

The restart strategy options include:

Option nameType & Default ValueDescription
attemptsint: 0The maximum retry times. If set to 0, the rule will fail immediately without retrying.
delayint: 1000The default interval in millisecond to retry. If multiplier is not set, the retry interval will be fixed to this value.
maxDelayint: 30000The maximum interval in millisecond to retry. Only effective when multiplier is set so that the delay will increase for each retry.
multiplierfloat: 2The exponential to increase the interval.
jitterFactorfloat: 0.1How large random value will be added or subtracted to the delay to prevent restarting multiple rules at the same time.

The default values can be changed by editing the etc/kuiper.yaml file.