Get Pipeline Offset and Lag

获取指定流数据处理任务的Offset和Lag。

前提条件

已通过流数据处理服务创建流数据处理任务,并且流数据处理任务处于运行状态。

请求格式

GET https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}/offset-lag

请求参数(URI)

名称 位置(Path/Query) 必需/可选 数据类型 描述
pipelineId Path 必需 String 流数据处理任务ID,可通过 EnOS管理门口 > 流数据处理 > 流运维 页面查看,或通过调用 List Pipelines 接口获取。
orgId Query 必需 String 用户所属的组织ID。如何获取orgId信息>>

响应参数

名称 数据类型 描述
data List<JSONObject> 返回流数据处理任务的Offset和Lag信息。详见 data

data

名称 数据类型 描述
pipelineId String 流数据处理任务ID。
topicName String Kafka Topic 名称。
lagAndOffset JSONObject 流数据处理任务的 Lag 和 Offset 详细信息。详见 Lag and Offset

Lag and Offset

名称 数据类型 描述
lag String 当前时间点的 Lag。
offset String 当前时间点的 Offset。
ts Long 时间戳。

错误码

代码 错误信息 描述
61108 Stream processing job does not exist. 流数据处理任务不存在。
99000 Internal Server Error. 服务内部错。

示例

请求示例

url: https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}/offset-lag?orgId=yourOrgId

method: GET

返回示例

{
  "code": 0,
  "msg": "OK",
  "data": {
    "topicName": "MEASURE_POINT_INTERNAL_o15517683199241",
    "lagAndOffset": [
      {
        "lag": 0,
        "offset": 6094977,
        "ts": 1616759448328
      },
      {
        "lag": 0,
        "offset": 6094977,
        "ts": 1616759673388
      }
    ],
    "pipelineId": "565c0114-9585-4372-86d1-50dde5ab8ce4"
  }
}

Java SDK调用示例

import com.alibaba.fastjson.JSONObject;
import com.envision.apim.poseidon.config.PConfig;
import com.envision.apim.poseidon.core.Poseidon;
import com.envision.apim.poseidon.request.PoseidonRequest;
import org.junit.Before;
import org.junit.Test;

public class Sample {
    private static final String API_Gateway_URL = "https://{domain_url}";
    private Poseidon poseidon;

    private static class Request extends PoseidonRequest {

        public void setBodyParams(String key, Object value) {
            bodyParams().put(key, value);
        }

        public void setMethod(String method) {
            this.method = method;
        }

        private String method;

        @Override
        public String baseUri() {
            return "";
        }

        @Override
        public String method() {
            return method;
        }
    }

    @Before
    public void init() {
        poseidon = Poseidon.config(
                PConfig.init()
                        .appKey("AccessKey of your APP")
                        .appSecret("SecretKey of your APP")
        ).method("GET");
    }

    @Test
    public void GetOffsetLag() {
        Request request = new Request();

        JSONObject response = poseidon
                .url(API_Gateway_URL + "/streaming/v2.0/streaming/pipeline/{pipelineId}/offset-lag")
                .queryParam("orgId", "yourOrgId")
                .getResponse(request, JSONObject.class);
        System.out.println(response);
    }
}