eKuiper in Public Data Analysis

In the era of big data, there are many publicly available data sharing platforms where valuable information can be extracted through various processing methods. However, handling and analyzing public data typically require programming skills, which can be a learning barrier for non-technical users. This article uses eKuiper as an example to demonstrate how to process public data using basic SQL statements.

Scenario Introduction

This tutorial demonstrates how to use eKuiper to process the daily order table data of a bike-sharing company from the Shenzhen Open Data Platform. The operating steps are:

  • Subscribing to the API of the open data platform using the HTTP Pull Source
  • Creating streams and rules using eKuiper's REST API interface
  • Processing data using built-in SQL functions and rule pipelines
  • Visualizing the processed data by storing it and using an external API

Data Acquisition

eKuiper supports real-time data processing with millisecond-level precision. In this tutorial, we will use the daily order data of a bike-sharing company from the Shenzhen Open Data Platform as an example to demonstrate how to fetch the corresponding API data with eKuiper for further processing.

If you want to analyze an API that updates in real time, you can reduce the polling interval of the HTTP Pull Source.

The URL and parameters of the data interface are as follows:

text
http://opendata.sz.gov.cn/api/29200_00403627/1/service.xhtml?page=1&rows=100&appKey=
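
Before wiring the API into eKuiper, it can be helpful to verify it manually, for example with curl (replacing <appKey> with a real application key issued by the platform):

text
curl "http://opendata.sz.gov.cn/api/29200_00403627/1/service.xhtml?page=1&rows=100&appKey=<appKey>"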

Now let's use the HTTP Pull Source of eKuiper to fetch the first 100 records (the number is controlled by the rows parameter) from the data platform's HTTP server and feed them into the eKuiper processing pipeline.

The configuration file for the HTTP Pull Source is located at etc/sources/httppull.yaml, and we need to configure the corresponding fields to enable eKuiper to fetch the data correctly. Here is the content of the configuration file:

yaml
default:
  url: 'https://opendata.sz.gov.cn/api/29200_00403627/1/service.xhtml?page=1&rows=2&appKey=<token>'
  method: get
  interval: 3600000
  timeout: 5000
  incremental: false
  body: ''
  bodyType: json
  insecureSkipVerify: true
  headers:
    Accept: application/json
  responseType: code

After that, we need to use a REST client to create the corresponding STREAM as the source input:

http
###
POST http://{{host}}/streams
Content-Type: application/json

{
  "sql": "CREATE STREAM pubdata(data array(struct(START_TIME string, START_LAT string, END_TIME string, END_LNG string, USER_ID string, START_LNG string, END_LAT string, COM_ID string))) WITH (TYPE=\"httppull\")"
}
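
To confirm the stream was registered, we can describe it through eKuiper's stream management API:

http
###
GET http://{{host}}/streams/pubdata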

Data Processing

By observing the data returned from the API, we can see that all the data we need is in the array field called data:

json
{
  "total": 223838214,
  "data": [
    {
      "START_TIME": "2021-01-30 13:19:32",
      "START_LAT": "22.6364092900",
      "END_TIME": "2021-01-30 13:23:18",
      "END_LNG": "114.0155348300",
      "USER_ID": "9fb2d1ec6142ace4d7405b**********",
      "START_LNG": "114.0133088800",
      "END_LAT": "22.6320290800",
      "COM_ID": "0755**"
    }
  ]
}

If we want to use SELECT to perform calculations on each individual record, we need UNNEST to expand the array so that each element is returned as its own row.

http
###
POST http://{{host}}/rules
Content-Type: application/json

{
  "id": "demo_rule_1",
  "sql": "SELECT unnest(data) FROM pubdata",
  "actions": [{
    "log": {
    }
  }]
}
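
With unnest, each element of the data array is emitted as a separate message whose struct fields become top-level columns, so a single output record looks roughly like this:

json
{
  "START_TIME": "2021-01-30 13:19:32",
  "START_LAT": "22.6364092900",
  "END_TIME": "2021-01-30 13:23:18",
  "END_LNG": "114.0155348300",
  "USER_ID": "9fb2d1ec6142ace4d7405b**********",
  "START_LNG": "114.0133088800",
  "END_LAT": "22.6320290800",
  "COM_ID": "0755**"
}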

Create Rule Pipelines

We can use the memory sink and source to feed the results of one rule into subsequent rules, forming a rule pipeline that further processes the data produced by the previous rule.

In the first step, we just need to add a memory sink to the actions field of demo_rule_1 and apply the updated definition (for example via PUT http://{{host}}/rules/demo_rule_1):

json
{
  "id": "demo_rule_1",
  "sql": "SELECT unnest(data) FROM pubdata",
  "actions": [{
    "log": {
    },
    "memory": {
      "topic": "channel/data"
    }
  }]
}

Then, using the API, we create a new STREAM based on the memory source described above:

http
###
POST http://{{host}}/streams
Content-Type: application/json

{"sql" : "create stream pubdata2 () WITH (DATASOURCE=\"channel/data\", FORMAT=\"JSON\", TYPE=\"memory\")"}

After that, we can create new rules to process the source data:

http
###
POST http://{{host}}/rules/
Content-Type: application/json

{
  "id": "demo_rule_2",
  "sql": "SELECT * FROM pubdata2",
  "actions": [{
    "log": {
    }
  }]
}

Calculate Travel Distance with SQL

eKuiper provides a rich set of built-in SQL functions that can meet most calculation requirements in various scenarios, even without using extended plugins.

Since the data already contains the start and end coordinates of each ride, we can calculate the distance travelled by applying the haversine formula to the latitude and longitude (converted to radians):

text
distance = 2 * 6378.137 * ASIN(
    SQRT(
        POW(SIN(a / 2), 2) +
        COS(Lat1) * COS(Lat2) * POW(SIN(b / 2), 2)
    )
)

The explanation of the formula is as follows:

  1. Lng1 and Lat1 are the longitude and latitude of point A, and Lng2 and Lat2 are the longitude and latitude of point B.
  2. a = Lat1 – Lat2 is the difference between the latitudes of the two points, and b = Lng1 – Lng2 is the difference between the longitudes of the two points.
  3. 6378.137 is the radius of the Earth in kilometers.
  4. The calculated result is in kilometers. If the radius is changed to meters, the result will be in meters.
  5. The calculation precision is similar to the distance precision of Google Maps, with a difference range of less than 0.2 meters.

We can use the following SELECT statement to calculate the corresponding distance and duration:

sql
SELECT
    6378.137 * 2 * ASIN(
        SQRT(
            POW(
                SIN((cast(START_LAT,"float") * PI() / 180 - cast(END_LAT,"float") * PI() / 180) / 2), 2) +
                COS(cast(START_LAT,"float") * PI() / 180) * COS(cast(END_LAT,"float") * PI() / 180) *
            POW(
                SIN((cast(START_LNG,"float") * PI() / 180 - cast(END_LNG,"float") * PI() / 180) / 2), 2))) *1000
        AS distance,
    (to_seconds(END_TIME) - to_seconds(START_TIME))
        AS duration
FROM pubdata2
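
For the next step to receive this data, the results of this query need to be published to a memory topic (here channel/data2), following the same pattern as demo_rule_1. A possible update of demo_rule_2 along those lines, sketched here since the original steps do not show it explicitly, is given below; additional columns such as USER_ID can be kept in the projection if they are needed downstream:

http
###
PUT http://{{host}}/rules/demo_rule_2
Content-Type: application/json

{
  "id": "demo_rule_2",
  "sql": "SELECT 6378.137 * 2 * ASIN(SQRT(POW(SIN((cast(START_LAT,\"float\") * PI() / 180 - cast(END_LAT,\"float\") * PI() / 180) / 2), 2) + COS(cast(START_LAT,\"float\") * PI() / 180) * COS(cast(END_LAT,\"float\") * PI() / 180) * POW(SIN((cast(START_LNG,\"float\") * PI() / 180 - cast(END_LNG,\"float\") * PI() / 180) / 2), 2))) * 1000 AS distance, (to_seconds(END_TIME) - to_seconds(START_TIME)) AS duration FROM pubdata2",
  "actions": [{
    "log": {
    },
    "memory": {
      "topic": "channel/data2"
    }
  }]
}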

Calculate Travel Velocity

Once we have the distance and duration, we can continue the rule pipeline and calculate the velocity of the bikes in the next rule.

We can create a new STREAM on top of the memory topic that carries the results of the SELECT statement from the previous step, and then create the corresponding rule for the next processing step:

http
###
POST http://{{host}}/streams
Content-Type: application/json

{"sql" : "create stream pubdata3 () WITH (DATASOURCE=\"channel/data2\", FORMAT=\"JSON\", TYPE=\"memory\")"}

Now we can easily calculate the desired velocity of the bikes:

http
###
PUT http://{{host}}/rules/demo_rule_3
Content-Type: application/json

{
  "id": "demo_rule_3",
  "sql": "SELECT (distance / duration) AS velocity FROM pubdata3",
  "actions": [{
    "log": {
    }
  }]
}

In the eKuiper log, we can see calculation results similar to the following:

text
2023-07-14 14:51:09 time="2023-07-14 06:51:09" level=info msg="sink result for rule demo_rule_3: [{\"velocity\":2.52405571799467}]" file="sink/log_sink.go:32" rule=demo_rule_3

The velocity field is the average velocity of the bike in meters per second (distance in meters divided by duration in seconds; the value above corresponds to roughly 9.1 km/h), which is the value we need.

Visualizing the Data

Finally, we can store the calculated data in a database and display it in the desired chart format through an external API. For example, we can write the results to InfluxDB by adding an influx2 sink to the actions of demo_rule_3:

json
{
  "influx2": {
    "addr": "http://influx.db:8086",
    "token": "token",
    "org": "admin",
    "measurement": "test",
    "bucket": "pubdata",
    "tagKey": "tagKey",
    "tagValue": "tagValue",
    "fields": ["velocity", "user_id"]
  }
}

Users can then retrieve the data from InfluxDB and process it further with a Python script. The following script reads the first four records from the database and prints them as quickchart.io chart parameters:

python
from influxdb_client import InfluxDBClient

url = "http://influx.db:8086"
token = "token"
org = "admin"
bucket = "pubdata"

client = InfluxDBClient(url=url, token=token, org=org)

# Pivot the velocity and user_id fields into the same row so each record
# carries both values, then keep only the first four records.
query = f'''from(bucket: "{bucket}")
  |> range(start: 0, stop: now())
  |> filter(fn: (r) => r._measurement == "test")
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> limit(n: 4)'''

tables = client.query_api().query(query)
records = [record for table in tables for record in table.records]

# Use the first seven characters of the anonymized user id as chart labels.
labels = [record["user_id"][:7] for record in records]
velocities = [record["velocity"] for record in records]

params = f'''{{
  type: 'bar',
  data: {{
    labels: {labels},
    datasets: [{{
      label: 'Users',
      data: {velocities}
    }}]
  }}
}}'''

print(params)

client.close()
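
The printed configuration can then be sent to quickchart.io's chart rendering endpoint. A minimal sketch continuing from the script above, assuming the public GET /chart API and URL-encoding the parameters:

python
import urllib.parse
import urllib.request

# URL-encode the Chart.js-style configuration and fetch the rendered image.
chart_url = "https://quickchart.io/chart?c=" + urllib.parse.quote(params)
urllib.request.urlretrieve(chart_url, "velocity_chart.png")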

After that, we can visualize the average velocity of the first four users using a bar chart interface provided by quickchart.io:

(Figure: public-data-chart — bar chart of the average velocity of the first four users)