HTTP Pull 数据源
stream sourcescan table source
eKuiper 内置支持 HTTP 数据源。通过 HTTP Pull 数据源连接器,eKuiper 可从外部 HTTP 服务器检索数据,并支持基于指定间隔或由特定条件触发拉取数据。
配置
eKuiper 连接器可以通过环境变量、REST API 或配置文件进行配置,本节将介绍配置文件的使用方法。
HTTP Pull 源连接器的配置文件位于:$ekuiper/etc/sources/http_pull.yaml
,其中:
- default:对应全局连接配置。
- 自定义部分:对于需要自定义某些连接参数的场景,该部分的配置将覆盖全局连接配置。
以下示例包括一个全局配置和自定义配置 application_conf
:
#全局 httppull 配置
default:
# 请求服务器地址的URL
url: http://localhost
# post, get, put, delete
method: post
# 请求之间的间隔,时间单位为 ms
interval: 10000
# http请求超时,时间单位为 ms
timeout: 5000
# 如果将其设置为 true,则将与最后的结果进行比较; 如果两个请求的响应相同,则将跳过发送结果。
# 可能的设置可能是:true/false
incremental: false
# 请求正文,例如'{"data": "data", "method": 1}'
body: '{}'
# 正文类型, none、text、json、html、xml、javascript、form
bodyType: json
# 请求所需的HTTP标头
headers:
Accept: application/json
# 如何检查响应状态,支持通过状态码或 body
responseType: code
# 设置数据压缩算法,可选项有 gzip、zlib、zstd、flate,大小写敏感
# compression: "gzip"
# 获取 token
# oAuth:
# # 设置如何获取访问码
# access:
# # 获取访问码的 URL,总是使用 POST 方法发送请求
# url: https://127.0.0.1/api/token
# # 请求正文
# body: '{"username": "admin","password": "123456"}'
# # 令牌的过期时间,以字符串表示,时间单位为秒,允许使用模板
# expire: '3600'
# # 如何刷新令牌
# refresh:
# # 刷新令牌的 URL,总是使用 POST 方法发送请求
# url: https://127.0.0.1/api/refresh
# # HTTP 请求头,允许使用模板
# headers:
# identityId: '{{.data.identityId}}'
# token: '{{.data.token}}'
# # 请求正文
# body: ''
#重载全局配置
application_conf: #Conf_key
incremental: true
url: http://localhost:9090/pull
全局配置
用户可在 default
部分指定全局设置。
HTTP 请求配置
url
:获取结果的 URL。method
:HTTP 方法,支持 post、get、put 和 delete。interval
:请求之间的间隔时间,单位为 ms。timeout
:HTTP 请求的超时时间,单位为 ms。body
:请求的正文,例如{"data": "data", "method": 1}
bodyType
:正文类型,可选值 none、text、json、html、xml、javascript、format.headers
:需要与 HTTP 请求一起发送的 HTTP 请求标头。responseType
:定义如何解析 HTTP 响应。目前支持两种方式:code
:通过 HTTP 响应码判断响应状态。body
:通过 HTTP 响应正文判断响应状态。要求响应正文为 JSON 格式且其中包含 code 字段。
HTTP 数据压缩支持
用户可以选择是否对 HTTP
数据使用受支持的压缩算法进行压缩,此方法基于标准的端到端压缩技术协定与主动协商机制,通过请求中指定 Accept-Encoding
请求头告知服务器端所支持的压缩算法,服务器根据 Accept-Encoding
中指定的压缩算法对数据进行解压缩并在响应请求时使用 Content-Encoding
来告知客户端响应数据使用了哪种压缩算法,在 eKuiper
中通常要求 服务器端与 eKuiper 使用相同的压缩算法 分别对响应和请求进行压缩,所以在配置数据压缩时,请保证您所指定的压缩算法服务器端同样支持。
compression
:设置数据压缩算法,可选项有 gzip、zlib、zstd、flate,大小写敏感
对于 Flate
压缩算法的额外说明:
对于 Flate
压缩算法会在请求头 Accept-Encoding
中设置值为 deflate
,其他压缩算法则设置值为 gzip
、zlib
或 zstd
。
安全配置
证书路径
certificationPath
: 证书路径,示例值:d3807d9fa5-certificate.pem
。可以是绝对路径,也可以是相对路径。如指定相对路径,那么父目录为执行kuiperd
命令的路径,例如:- 如果在
/var/kuiper
中运行bin/kuiperd
,那么父目录为/var/kuiper
。 - 如果运行从
/var/kuiper/bin
中运行./kuiperd
,那么父目录为/var/kuiper/bin
。
- 如果在
privateKeyPath
:私钥路径,示例值:d3807d9fa5-private.pem.key
。可以是绝对路径,也可以是相对路径,具体可参考certificationPath
。rootCaPath
:根证书路径。可以是绝对路径,也可以是相对路径。certficationRaw
: 经过 base64 编码过的证书原文, 如果同时定义了certificationPath
将会先用该参数。privateKeyRaw
: 经过 base64 编码过的密钥原文, 如果同时定义了privateKeyPath
将会先用该参数。rootCARaw
: 经过 base64 编码过的根证书原文, 如果同时定义了rootCaPath
将会先用该参数。insecureSkipVerify
:是否跳过证书验证。如设置为true
,TLS 接受服务器提供的任何证书以及该证书中的任何主机名。注意:此时,TLS 容易受到中间人攻击。默认值:false
。
OAuth 认证
OAuth 2.0 是一个授权协议,让 API 客户端有限度地访问网络服务器上的用户数据。oAuth 最常见的流程是授权代码,大多用于服务器端和移动网络应用。在这个流程中,用户使用他们的账户登录到网络应用中,认证码会返回给应用。之后,应用程序可以使用认证码来请求访问令牌,并可能在到期后通过刷新令牌来刷新令牌。
以下配置假设已经获取认证码,用户只需指定令牌申请,该过程可能需要该认证码或只是密码(OAuth 的变体)。
OAuth
:定义类 OAuth 的认证流程。其他的认证方式如 apikey 可以直接在 headers 设置密钥,不需要使用这个配置。
access
url
:获取访问码的网址,将始终使用 POST 方法访问。body
:获取访问令牌的请求主体。通常需要授权码。expire
:令牌的过期时间,单位:秒,允许使用模板,因此必须是字符串。
refresh
url
:刷新令牌的网址,将始终使用 POST 方法访问。headers
:用于刷新令牌的请求头。通常把令牌放在这里,用于授权。body
:刷新令牌的请求主体。当使用头文件来传递刷新令牌时,可能不需要配置此选项。
数据处理配置
增量数据处理
incremental
:如设置为 true
,则将与上次的结果进行比较;如果两次请求的响应相同,则将跳过发送结果。
动态属性
动态属性是指在运行时会动态更新的属性。您可以使用动态属性来指定 HTTP 请求的 URL、正文和标头。其语法基于数据模板格式的动态属性。
可使用的动态属性包括:
PullTime
:本次拉取的 int64 格式时间戳。LastPullTime
:上次拉取的 int64 格式时间戳。- 来自 oAuth 的属性:oAuth 返回体的属性也可以使用。 例如,假设返回体为
{"token": "xxxxxx"}
,则可通过{{.token}}
访问 token 。
若目标 HTTP 服务支持过滤开始和结束时间,可以使用这两个属性来实现增量拉取。
- 通过 URL 参数传递开始和结束时间:
http://localhost:9090/pull?start={{.LastPullTime}}&end={{.PullTime}}
. - 通过 body 参数传递开始和结束时间:{"start": {{.LastPullTime}}, "end": {{.PullTime}}`.
自定义配置
对于需要自定义某些连接参数的场景,eKuiper 支持用户创建自定义模块来实现全局配置的重载。
配置示例
#覆盖全局配置
application_conf: #Conf_key
incremental: true
url: http://localhost:9090/pull
定义 application_conf
配置组后,如希望在创建流时使用此配置,可通过 CONF_KEY
选项并指定配置名称,此时,在自定义配置中定义的参数将覆盖 default
配置中的相应参数。详细步骤,可参考 流语句。
示例
demo (
...
) WITH (DATASOURCE="test/", FORMAT="JSON", TYPE="httppull", KEY="USERID", CONF_KEY="application_conf");
创建流类型源
完成连接器的配置后,后续可通过创建流将其与 eKuiper 规则集成。HTTP Pull 数据源连接器可以作为流式或扫描表类数据源使用,本节将以流类型源为例进行说明。
您可通过 REST API 或 CLI 工具在 eKuiper 中创建 HTTP Pull 数据源。
通过 REST API 创建
REST API 为 eKuiper 提供了一种可编程的交互方式,适用于自动化或需要将 eKuiper 集成到其他系统中的场景。
示例
{"sql":"create stream http_stream () WITH (FORMAT="json", TYPE="http_pull"}
详细操作步骤及命令解释,可参考 通过 REST API 进行流管理。
通过 CLI 创建
用户也可以通过命令行界面(CLI)直接访问 eKuiper。
进入 eKuiper
bin
目录:bashcd path_to_eKuiper_directory/bin
使用
create
命令创建规则,指定 HTTP Pull 数据源,如:bashbin/kuiper create stream http_stream '() WITH (FORMAT="json", TYPE="http_pull")'
详细操作步骤及命令解释,可参考 通过 CLI 进行流管理。
查询表
httppull 同时也支持成为一个查询表。我们可以使用创建表语句来创建一个 httppull 查询表。它将与实体关系数据库绑定并按需查询:
CREATE TABLE httppullTable() WITH (DATASOURCE="/url", CONF_KEY="default", TYPE="httppull", KIND="lookup")