Get Stage State¶
查询指定流数据处理任务中某个指定算子的中间状态数据。
前提条件¶
已通过流数据处理服务创建流数据处理任务。
请求格式¶
POST https://{apigw-address}/streaming/v2.0/stage-state?action=get
请求参数(URI)¶
名称 |
位置(Path/Query) |
必需/可选 |
数据类型 |
描述 |
---|---|---|---|---|
orgId |
Query |
必需 |
String |
用户所属的组织ID。如何获取orgId信息>> |
请求参数(Body)¶
名称 |
必需/可选 |
数据类型 |
描述 |
---|---|---|---|
pipelineId |
必需 |
String |
流数据处理任务ID,可通过 EnOS管理门口 > 流数据处理 > 流运维 页面查看。 |
stageInstanceName |
必需 |
String |
算子实例名称,可通过 流运维 页面,点击流数据处理任务名称,选择算子后,在 Info 页面查看。 |
assetIds |
必需 |
String |
资产ID,支持查询多个资产,多个资产ID之间用英文逗号隔开。 |
pointIds |
必需 |
String |
测点ID,支持多测点查询,多个测点间用英文逗号隔开;支持查询的(资产数*测点数)上限为10万。 |
响应参数¶
名称 |
数据类型 |
描述 |
---|---|---|
data |
List<JSONObject> |
返回算子中间状态数据列表。详见 items。 被查询的算子类型不同(根据请求参数中 stageInstanceName 前缀区分),将返回不同算子的中间状态数据列表。注意:不同类型算子的数据格式也有所区别。 |
items¶
类型 1:通用类型¶
属于该类型的算子:LastRecordAppender、LastChangedRecordAppender、LatestRecordMerger、RecordCapturer
名称 |
数据类型 |
描述 |
---|---|---|
modelId |
String |
模型 ID。 |
modelIdPath |
String |
模型 ID 路径。 |
attr |
Object |
包含算子输出结果的结构体。 |
assetId |
String |
资产ID。 |
pointId |
String |
测点 ID。 |
time |
Long |
测点数据时间戳,UNIX 时间,精确到秒。 |
value |
String |
测点数值。 |
quality |
Integer |
测点数据质量打标。 |
dq |
Long |
测点数据自带的质量位。如果测点无质量位,则无该字段。 |
类型 2:简易时间戳类型¶
属于该类型的算子:LatePointTagger
名称 |
数据类型 |
描述 |
---|---|---|
assetId |
String |
资产 ID。 |
pointId |
String |
测点 ID。 |
time |
Long |
测点数据时间戳,UNIX 时间,精确到秒。 |
类型 3:窗口聚合类型¶
属于该类型的算子:FixedTimeWindowAggregator、SlidingTimeWindowAggregator
名称 |
数据类型 |
描述 |
---|---|---|
assetId |
String |
资产 ID。 |
pointId |
String |
测点 ID。 |
%s::windowStrategy::%s |
Object |
窗口函数计算所需的状态信息,%s 代表根据资产、测点等信息确定的字符串。 |
%s::updated |
Object |
窗口函数的上下文信息,%s 代表根据资产、测点等信息确定的字符串。 |
%s::watermark |
Long |
窗口函数中当前资产的水位线,%s 代表根据资产、测点等信息确定的字符串。 |
类型 4:简单窗口聚合类型¶
属于该类型的算子:SimplifiedTimeWindowAggregator
名称 |
数据类型 |
描述 |
---|---|---|
assetId |
String |
资产 ID。 |
pointId |
String |
测点 ID。 |
time |
Long |
测点数据时间戳,UNIX 时间,精确到秒。 |
windowSum |
Object |
窗口函数计算所需的状态信息。 |
错误码¶
代码 |
错误信息 |
描述 |
---|---|---|
61102 |
Missing param xx. |
参数格式不正确。 |
61105 |
Too many points. |
请求的总测点数超过限制。 |
61106 |
Wrong pipelineId or OU has no privilege. |
pipelineId不正确,或者不属于指定的组织。 |
61107 |
Not supported Stage type. |
不支持的算子类型。 |
61199 |
其他错误。 |
示例¶
请求示例¶
url: https://{apigw-address}/streaming/v2.0/stage-state?action=get&orgId=yourOrgId
method: POST
requestBody:
{
"pipelineId":"yourPipelineId",
"stageInstanceName":"yourStageInstanceName",
"assetIds":"yourAssetIds",
"pointIds":"yourPointIds"
}
返回示例¶
{
"code": 0,
"msg": "OK",
"data": {
"items": [
{
"modelId": "modelId",
"modelIdPath": "/modelIdPath",
"assetId": "assetId",
"pointId": "pointId",
"time": 1582648020580,
"value": 3.1,
"quality": 0,
"attr": {}
},
{
"modelId": "modelId",
"modelIdPath": "/modelIdPath",
"assetId": "assetId2",
"pointId": "pointId2",
"time": 1582648020580,
"value": 3.2,
"quality": 1,
"attr": {}
}
]
}
}
SDK 示例¶
你可以在 Github 上获取流数据处理的 SDK 示例: