Reset Pipeline Offset

Reset the Kafka Offset of a specific stream processing pipeline.

Prerequisites

A stream processing pipeline is created with the Stream Processing service, and the status of the pipeline is Paused or Stopped.

Request Format

POST https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}/offset

Request Parameters (URI)

Name

Location (Path/Query)

Mandatory/Optional

Data Type

Description

pipelineId

Path

Mandatory

String

The stream processing pipeline ID, which can be found on the EnOS Management Console > Stream Processing > Stream Operation page.

orgId

Query

Mandatory

String

The organization ID. How to get the orgId>>

Response Parameters

Name

Data Type

Description

data

String

Returns an empty string upon success.

Error Code

Code

Error Information

Description

61108

stream processing pipeline does not exit.

Stream processing pipeline does not exist. Please check the pipeline ID.

61168

Failed to get the Kafka average rate.

Failed to get the average rate of Kafka input records.

Sample

Request Sample

url: https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}/offset?orgId=yourOrgId

method: POST

Return Sample

{
  "code": 0,
  "msg": "OK",
}

Java SDK Sample

import com.alibaba.fastjson.JSONObject;
import com.envision.apim.poseidon.config.PConfig;
import com.envision.apim.poseidon.core.Poseidon;
import com.envision.apim.poseidon.request.PoseidonRequest;
import org.junit.Before;
import org.junit.Test;

public class Sample {
    private static final String API_Gateway_URL = "https://{domain_url}";
    private Poseidon poseidon;

    private static class Request extends PoseidonRequest {

        public void setBodyParams(String key, Object value) {
            bodyParams().put(key, value);
        }

        public void setMethod(String method) {
            this.method = method;
        }

        private String method;

        @Override
        public String baseUri() {
            return "";
        }

        @Override
        public String method() {
            return method;
        }
    }

    @Before
    public void init() {
        poseidon = Poseidon.config(
                PConfig.init()
                        .appKey("AccessKey of your APP")
                        .appSecret("SecretKey of your APP")
        ).method("POST");
    }

    @Test
    public void ResetOffset() {
        Request request = new Request();

        JSONObject response = poseidon
                .url(API_Gateway_URL + "/streaming/v2.0/streaming/pipeline/{pipelineId}/offset")
                .queryParam("orgId", "yourOrgId")
                .getResponse(request, JSONObject.class);
        System.out.println(response);
    }
}