Reset Pipeline Offset¶
重置指定流数据处理任务的Kafka Offset。
前提条件¶
已通过流数据处理服务创建流数据处理任务,并且流数据处理任务处于暂停或停止状态。
请求格式¶
POST https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}/offset
请求参数(URI)¶
名称 |
位置(Path/Query) |
必需/可选 |
数据类型 |
描述 |
---|---|---|---|---|
pipelineId |
Path |
必需 |
String |
流数据处理任务ID,可通过 EnOS管理门口 > 流数据处理 > 流运维 页面查看,或通过调用 List Pipelines 接口获取。 |
orgId |
Query |
必需 |
String |
用户所属的组织ID。如何获取orgId信息>> |
响应参数¶
名称 |
数据类型 |
描述 |
---|---|---|
data |
String |
运行成功返回空字符串。 |
错误码¶
代码 |
错误信息 |
描述 |
---|---|---|
61108 |
Stream processing job does not exit. |
流数据处理任务不存在。 |
61168 |
Failed to get the Kafka average rate. |
获取 Kafka 平均输入速率失败。 |
示例¶
请求示例¶
url: https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}/offset?orgId=yourOrgId
method: POST
返回示例¶
{
"code": 0,
"msg": "OK",
}
Java SDK调用示例¶
import com.alibaba.fastjson.JSONObject;
import com.envision.apim.poseidon.config.PConfig;
import com.envision.apim.poseidon.core.Poseidon;
import com.envision.apim.poseidon.request.PoseidonRequest;
import org.junit.Before;
import org.junit.Test;
public class Sample {
private static final String API_Gateway_URL = "https://{domain_url}";
private Poseidon poseidon;
private static class Request extends PoseidonRequest {
public void setBodyParams(String key, Object value) {
bodyParams().put(key, value);
}
public void setMethod(String method) {
this.method = method;
}
private String method;
@Override
public String baseUri() {
return "";
}
@Override
public String method() {
return method;
}
}
@Before
public void init() {
poseidon = Poseidon.config(
PConfig.init()
.appKey("AccessKey of your APP")
.appSecret("SecretKey of your APP")
).method("POST");
}
@Test
public void ResetOffset() {
Request request = new Request();
JSONObject response = poseidon
.url(API_Gateway_URL + "/streaming/v2.0/streaming/pipeline/{pipelineId}/offset")
.queryParam("orgId", "yourOrgId")
.getResponse(request, JSONObject.class);
System.out.println(response);
}
}