Get Stage State

查询指定流数据处理任务中某个指定算子的中间状态数据。

前提条件

已通过流数据处理服务创建流数据处理任务。

请求格式

POST https://{apigw-address}/streaming/v2.0/stage-state?action=get

请求参数(URI)

名称 位置(Path/Query) 必需/可选 数据类型 描述
orgId Query 必需 String 用户所属的组织ID。如何获取orgId信息>>

请求参数(Body)

名称 必需/可选 数据类型 描述
pipelineId 必需 String 流数据处理任务ID,可通过 EnOS管理门口 > 流数据处理 > 流运维 页面查看。
stageInstanceName 必需 String 算子实例名称,可通过 流运维 页面,点击流数据处理任务名称,选择算子后,在 Info 页面查看。
assetIds 必需 String 资产ID,支持查询多个资产,多个资产ID之间用英文逗号隔开。
pointIds 必需 String 测点ID,支持多测点查询,多个测点间用英文逗号隔开;支持查询的(资产数*测点数)上限为10万。

响应参数

名称 数据类型 描述
data List<JSONObject> 返回算子中间状态数据列表。详见 items。 被查询的算子类型不同(根据请求参数中 stageInstanceName 前缀区分),将返回不同算子的中间状态数据列表。注意:不同类型算子的数据格式也有所区别。

items

类型 1:通用类型

属于该类型的算子:LastRecordAppenderLastChangedRecordAppenderLatestRecordMergerRecordCapturer

名称 数据类型 描述
modelId String 模型 ID。
modelIdPath String 模型 ID 路径。
attr Object 包含算子输出结果的结构体。
assetId String 资产ID。
pointId String 测点 ID。
time Long 测点数据时间戳,UNIX 时间,精确到秒。
value String 测点数值。
quality Integer 测点数据质量打标。
dq Long 测点数据自带的质量位。如果测点无质量位,则无该字段。

类型 2:简易时间戳类型

属于该类型的算子:LatePointTagger

名称 数据类型 描述
assetId String 资产 ID。
pointId String 测点 ID。
time Long 测点数据时间戳,UNIX 时间,精确到秒。

类型 3:窗口聚合类型

属于该类型的算子:FixedTimeWindowAggregatorSlidingTimeWindowAggregator

名称 数据类型 描述
assetId String 资产 ID。
pointId String 测点 ID。
%s::windowStrategy::%s Object 窗口函数计算所需的状态信息,%s 代表根据资产、测点等信息确定的字符串。
%s::updated Object 窗口函数的上下文信息,%s 代表根据资产、测点等信息确定的字符串。
%s::watermark Long 窗口函数中当前资产的水位线,%s 代表根据资产、测点等信息确定的字符串。

类型 4:简单窗口聚合类型

属于该类型的算子:SimplifiedTimeWindowAggregator

名称 数据类型 描述
assetId String 资产 ID。
pointId String 测点 ID。
time Long 测点数据时间戳,UNIX 时间,精确到秒。
windowSum Object 窗口函数计算所需的状态信息。

错误码

代码 错误信息 描述
61102 Missing param xx. 参数格式不正确。
61105 Too many points. 请求的总测点数超过限制。
61106 Wrong pipelineId or OU has no privilege. pipelineId不正确,或者不属于指定的组织。
61107 Not supported Stage type. 不支持的算子类型。
61199   其他错误。

示例

请求示例

url: https://{apigw-address}/streaming/v2.0/stage-state?action=get&orgId=yourOrgId

method: POST

requestBody
{
    "pipelineId":"yourPipelineId",
    "stageInstanceName":"yourStageInstanceName",
    "assetIds":"yourAssetIds",
    "pointIds":"yourPointIds"
}

返回示例

{
  "code": 0,
  "msg": "OK",
  "data": {
    "items": [
  {
        "modelId": "modelId",
        "modelIdPath": "/modelIdPath",
        "assetId": "assetId",
        "pointId": "pointId",
        "time": 1582648020580,
        "value": 3.1,
        "quality": 0,
        "attr": {}
    },
    {
        "modelId": "modelId",
        "modelIdPath": "/modelIdPath",
        "assetId": "assetId2",
        "pointId": "pointId2",
        "time": 1582648020580,
        "value": 3.2,
        "quality": 1,
        "attr": {}
    }
    ]
  }
}

SDK 示例


你可以在 Github 上获取流数据处理的 SDK 示例: