Start Pipeline¶
启动运行指定流数据处理任务。
前提条件¶
已通过流数据处理服务将流数据处理任务发布上线,并获取到流数据处理任务的ID。
请求格式¶
POST https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}?action=start
请求参数(URI)¶
名称 |
位置(Path/Query) |
必需/可选 |
数据类型 |
描述 |
---|---|---|---|---|
pipelineId |
Path |
必需 |
String |
流数据处理任务ID,可通过 EnOS管理门口 > 流数据处理 > 流运维 页面查看,或通过调用 List Pipelines 接口获取。 |
orgId |
Query |
必需 |
String |
用户所属的组织ID。如何获取orgId信息>> |
请求参数(Body)¶
名称 |
必需/可选 |
数据类型 |
描述 |
---|---|---|---|
executionMode |
必需 |
Integer |
指定流数据处理任务的运行模式。0:Standalone模式;1:集群模式。 |
kafkaRate |
必需 |
Integer |
指定 Kafka 读取速率,单位为 record/s,默认值为 1000。 |
resourceConfig |
可选 |
JSONObject |
流数据任务运行资源配置。对Standalone模式,直接指定运行流数据处理任务所需的CU数量(单位默认为cu)。对集群模式,JSON体中包含的参数,详见 Resource Config |
Resource Config¶
名称 |
必需/可选 |
数据类型 |
描述 |
---|---|---|---|
server |
可选 |
String |
指定运行流数据处理任务所需的 Server 资源(单位默认为cu)。 |
yarn |
可选 |
JSONObject |
指定运行流数据处理任务所需的 Yarn 资源,JSON 体中包含以下参数:
|
advanced |
可选 |
JSONObject |
指定以集群模式运行流数据处理任务的高级参数。 |
响应参数¶
名称 |
数据类型 |
描述 |
---|---|---|
data |
String |
运行成功返回空字符串。 |
错误码¶
代码 |
错误信息 |
描述 |
---|---|---|
61100 |
The Stream configuration JSON is invalid. |
流数据处理任务配置JSON不正确。 |
61108 |
Stream processing job does not exit. |
流数据处理任务不存在。 |
61165 |
Stream processing job with status [XX] cannot be started. |
流数据处理任务当前处于不能启动的状态。 |
61121 |
Another user in the organization is starting a stream processing job. Please try again later. |
同一组织同时只能触发一条流任务,请稍后重试。 |
61128 |
Your organization has not requested XX processing resource. Please request the resource and then start the jobs. |
该组织没有申请相应运行模式的资源。 |
61126 |
Allocated resource is not enough to start a stream processing job. Please check the resource configuration. |
流数据处理任务配置的运行资源已经超出配额,无法启动。 |
示例¶
请求示例¶
以Standalone模式运行
url: https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}?action=start
method: POST
requestBody:
{
"executionMode": 0,
"kafkaRate": 1000,
"resourceConfig": {
"server": 1
}
}
以集群模式运行
url: https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}?action=start
method: POST
requestBody:
{
"executionMode": 1,
"kafkaRate": 1000,
"resourceConfig": {
"server": 1,
"yarn": {
"workerCount": 2,
"master": 1,
"slave": 2
},
"advanced": [
{
"key": "spark.streaming.backpressure.enabled",
"value": "true"
},
{
"key": "spark.driver.memoryOverhead",
"value": "1000"
},
{
"key": "spark.executor.memoryOverhead",
"value": "800"
},
{
"key": "spark.memory.fraction",
"value": "0.8"
},
{
"key": "spark.memory.storageFraction",
"value": "0.7"
}
]
}
}
返回示例¶
{
"code": 0,
"msg": "OK",
}
Java SDK调用示例¶
import com.alibaba.fastjson.JSONObject;
import com.envision.apim.poseidon.config.PConfig;
import com.envision.apim.poseidon.core.Poseidon;
import com.envision.apim.poseidon.request.PoseidonRequest;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
public class Sample {
private static final String API_Gateway_URL = "https://{domain_url}";
private Poseidon poseidon;
private static class Request extends PoseidonRequest {
public void setBodyParams(String key, Object value) {
bodyParams().put(key, value);
}
public void setMethod(String method) {
this.method = method;
}
private String method;
@Override
public String baseUri() {
return "";
}
@Override
public String method() {
return method;
}
}
@Before
public void init() {
poseidon = Poseidon.config(
PConfig.init()
.appKey("AccessKey of your APP")
.appSecret("SecretKey of your APP")
).method("POST");
}
@Test
public void StartPipeline() {
Request request = new Request();
request.setBodyParams("executionMode", "0");
request.setBodyParams("kafkaRate", "1000");
Map<String, String> resourceConfig=new HashMap<>();
resourceConfig.put("server","1");
JSONObject response = poseidon
.url(API_Gateway_URL + "/streaming/v2.0/streaming/pipeline/{pipelineId}")
.queryParam("orgId", "yourOrgId")
.queryParam("action", "start")
.getResponse(request, JSONObject.class);
System.out.println(response);
}
}