Start Pipeline¶
启动运行指定流数据处理任务。
前提条件¶
已通过流数据处理服务将流数据处理任务发布上线,并获取到流数据处理任务的ID。
请求格式¶
POST https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}?action=start
请求参数(URI)¶
名称  | 
位置(Path/Query)  | 
必需/可选  | 
数据类型  | 
描述  | 
|---|---|---|---|---|
pipelineId  | 
Path  | 
必需  | 
String  | 
流数据处理任务ID,可通过 EnOS管理门口 > 流数据处理 > 流运维 页面查看,或通过调用 List Pipelines 接口获取。  | 
orgId  | 
Query  | 
必需  | 
String  | 
用户所属的组织ID。如何获取orgId信息>>  | 
请求参数(Body)¶
名称  | 
必需/可选  | 
数据类型  | 
描述  | 
|---|---|---|---|
executionMode  | 
必需  | 
Integer  | 
指定流数据处理任务的运行模式。0:Standalone模式;1:集群模式。  | 
kafkaRate  | 
必需  | 
Integer  | 
指定 Kafka 读取速率,单位为 record/s,默认值为 1000。  | 
resourceConfig  | 
可选  | 
JSONObject  | 
流数据任务运行资源配置。对Standalone模式,直接指定运行流数据处理任务所需的CU数量(单位默认为cu)。对集群模式,JSON体中包含的参数,详见 Resource Config  | 
Resource Config¶
名称  | 
必需/可选  | 
数据类型  | 
描述  | 
|---|---|---|---|
server  | 
可选  | 
String  | 
指定运行流数据处理任务所需的 Server 资源(单位默认为cu)。  | 
yarn  | 
可选  | 
JSONObject  | 
指定运行流数据处理任务所需的 Yarn 资源,JSON 体中包含以下参数: 
  | 
advanced  | 
可选  | 
JSONObject  | 
指定以集群模式运行流数据处理任务的高级参数。  | 
响应参数¶
名称  | 
数据类型  | 
描述  | 
|---|---|---|
data  | 
String  | 
运行成功返回空字符串。  | 
错误码¶
代码  | 
错误信息  | 
描述  | 
|---|---|---|
61100  | 
The Stream configuration JSON is invalid.  | 
流数据处理任务配置JSON不正确。  | 
61108  | 
Stream processing job does not exit.  | 
流数据处理任务不存在。  | 
61165  | 
Stream processing job with status [XX] cannot be started.  | 
流数据处理任务当前处于不能启动的状态。  | 
61121  | 
Another user in the organization is starting a stream processing job. Please try again later.  | 
同一组织同时只能触发一条流任务,请稍后重试。  | 
61128  | 
Your organization has not requested XX processing resource. Please request the resource and then start the jobs.  | 
该组织没有申请相应运行模式的资源。  | 
61126  | 
Allocated resource is not enough to start a stream processing job. Please check the resource configuration.  | 
流数据处理任务配置的运行资源已经超出配额,无法启动。  | 
示例¶
请求示例¶
以Standalone模式运行
url: https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}?action=start
method: POST
requestBody:
{
    "executionMode": 0,
    "kafkaRate": 1000,
    "resourceConfig": {
        "server": 1
        }
}
以集群模式运行
url: https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}?action=start
method: POST
requestBody:
{
    "executionMode": 1,
    "kafkaRate": 1000,
    "resourceConfig": {
        "server": 1,
        "yarn": {
            "workerCount": 2,
            "master": 1,
            "slave": 2
        },
        "advanced": [
            {
                "key": "spark.streaming.backpressure.enabled",
                "value": "true"
            },
            {
                "key": "spark.driver.memoryOverhead",
                "value": "1000"
            },
            {
                "key": "spark.executor.memoryOverhead",
                "value": "800"
            },
            {
                "key": "spark.memory.fraction",
                "value": "0.8"
            },
            {
                "key": "spark.memory.storageFraction",
                "value": "0.7"
            }
        ]
    }
}
返回示例¶
{
  "code": 0,
  "msg": "OK",
}