Get Stage State

查询指定流数据处理任务中某个指定算子的中间状态数据。

前提条件

用户已通过流数据处理服务创建StreamSets流数据处理任务。

请求格式

POST https://{apigw-address}/streaming/v2.0/stage-state?action=get

请求参数(URI)

名称

位置(Path/Query)

必需/可选

数据类型

描述

orgId

Query

必需

String

用户所属的组织ID。如何获取orgId信息>>

请求参数(Body)

名称

必需/可选

数据类型

描述

pipelineId

必需

String

流数据处理任务ID,可通过 EnOS管理门口 > 流数据处理 > 流运维 页面查看。

stageInstanceName

必需

String

算子实例名称,可通过 流运维 页面,点击流数据处理任务名称,选择算子后,在 Info 页面查看。

assetIds

必需

String

资产ID,支持查询多个资产,多个资产ID之间用英文逗号隔开。

pointIds

必需

String

测点ID,支持多测点查询,多个测点间用英文逗号隔开;支持查询的(资产数*测点数)上限为10万。

响应参数

名称

数据类型

描述

data

List<JSONObject>

返回算子中间状态数据列表。详见 items

items

名称

数据类型

描述

modelId

String

模型ID。

modelIdPath

String

模型ID路径。

attr

Object

包含算子输出结果的结构体。

assetId

String

资产ID。

pointId

String

测点ID。

time

Long

测点数据时间戳,UNIX时间,精确到秒。

value

String

测点数值。

quality

Integer

测点数据质量打标。

dq

Long

测点数据自带的质量位。

错误码

代码

错误信息

描述

61102

Missing param xx

参数格式不正确

61105

Too many points

请求的总测点数超过限制

61106

Wrong pipelineId or OU has no privilege

pipelineId不正确,或者不属于所提供的OU

61107

Not supported Stage Type

不支持的算子类型

61199

xx

其他错误

示例

请求示例

url: https://{apigw-address}/streaming/v2.0/stage-state?action=get&orgId=yourOrgId

method: POST

requestBody:
{
    "pipelineId":"yourPipelineId",
    "stageInstanceName":"yourStageInstanceName",
    "assetIds":"yourAssetIds",
    "pointIds":"yourPointIds"
}

返回示例

{
  "code": 0,
  "msg": "OK",
  "data": {
    "items": [
  {
        "modelId": "modelId",
        "modelIdPath": "/modelIdPath",
        "assetId": "assetId",
        "pointId": "pointId",
        "time": 1582648020580,
        "value": 3.1,
        "quality": 0,
        "attr": {}
    },
    {
        "modelId": "modelId",
        "modelIdPath": "/modelIdPath",
        "assetId": "assetId2",
        "pointId": "pointId2",
        "time": 1582648020580,
        "value": 3.2,
        "quality": 1,
        "attr": {}
    }
    ]
  }
}

Java SDK调用示例

import com.alibaba.fastjson.JSONObject;
import com.envision.apim.poseidon.config.PConfig;
import com.envision.apim.poseidon.core.Poseidon;
import com.envision.apim.poseidon.request.PoseidonRequest;
import org.junit.Before;
import org.junit.Test;

public class Sample {
    private static final String API_Gateway_URL = "https://{domain_url}";
    private Poseidon poseidon;

    private static class Request extends PoseidonRequest {

        public void setBodyParams(String key, Object value) {
            bodyParams().put(key, value);
        }

        public void setMethod(String method) {
            this.method = method;
        }

        private String method;

        @Override
        public String baseUri() {
            return "";
        }

        @Override
        public String method() {
            return method;
        }
    }

    @Before
    public void init() {
        poseidon = Poseidon.config(
                PConfig.init()
                        .appKey("AccessKey of your APP")
                        .appSecret("SecretKey of your APP")
        ).method("POST").header("Content-Type", "application/json");
    }

    @Test
    public void GetStageState() {
        Request request = new Request();
        request.setBodyParams("assetIds", "yourAssetId");
        request.setBodyParams("pointIds", "yourPointId");
        request.setBodyParams("pipelineId", "yourPipelineId");
        request.setBodyParams("stageInstanceName", "yourStageInstanceName");

        JSONObject response = poseidon
                .url(APIM_BASE_URL + "/streaming/v2.0/stage-state")
                .queryParam("orgId", "yourOrgId")
                .queryParam("action", "get")
                .getResponse(request, JSONObject.class);
        System.out.println(response);
    }
}