Start Pipeline

启动运行指定流数据处理任务。

前提条件

已通过流数据处理服务将流数据处理任务发布上线,并获取到流数据处理任务的ID。

请求格式

POST https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}?action=start

请求参数(URI)

名称 位置(Path/Query) 必需/可选 数据类型 描述
pipelineId Path 必需 String 流数据处理任务ID,可通过 EnOS管理门口 > 流数据处理 > 流运维 页面查看,或通过调用 List Pipelines 接口获取。
orgId Query 必需 String 用户所属的组织ID。如何获取orgId信息>>

请求参数(Body)

名称 必需/可选 数据类型 描述
executionMode 必需 Integer 指定流数据处理任务的运行模式。0:Standalone模式;1:集群模式。
kafkaRate 必需 Integer 指定 Kafka 读取速率,单位为 record/s,默认值为 1000。
resourceConfig 可选 JSONObject 流数据任务运行资源配置。对Standalone模式,直接指定运行流数据处理任务所需的CU数量(单位默认为cu)。对集群模式,JSON体中包含的参数,详见 Resource Config

Resource Config

名称 必需/可选 数据类型 描述
server 可选 String 指定运行流数据处理任务所需的 Server 资源(单位默认为cu)。
yarn 可选 JSONObject

指定运行流数据处理任务所需的 Yarn 资源,JSON 体中包含以下参数:

  • workerCount:worker数量,例如:2。
  • master:master worker数量,例如:1(单位默认为cu)。
  • slave:slave worker数量,例如:2(单位默认为cu)。
advanced 可选 JSONObject 指定以集群模式运行流数据处理任务的高级参数。

响应参数

名称 数据类型 描述
data String 运行成功返回空字符串。

错误码

代码 错误信息 描述
61100 The Stream configuration JSON is invalid. 流数据处理任务配置JSON不正确。
61108 Stream processing job does not exit. 流数据处理任务不存在。
61165 Stream processing job with status [XX] cannot be started. 流数据处理任务当前处于不能启动的状态。
61121 Another user in the organization is starting a stream processing job. Please try again later. 同一组织同时只能触发一条流任务,请稍后重试。
61128 Your organization has not requested XX processing resource. Please request the resource and then start the jobs. 该组织没有申请相应运行模式的资源。
61126 Allocated resource is not enough to start a stream processing job. Please check the resource configuration. 流数据处理任务配置的运行资源已经超出配额,无法启动。

示例

请求示例

以Standalone模式运行


url: https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}?action=start

method: POST

requestBody:
{
    "executionMode": 0,
    "kafkaRate": 1000,
    "resourceConfig": {
        "server": 1
        }
}


以集群模式运行


url: https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}?action=start

method: POST

requestBody:
{
    "executionMode": 1,
    "kafkaRate": 1000,
    "resourceConfig": {
        "server": 1,
        "yarn": {
            "workerCount": 2,
            "master": 1,
            "slave": 2
        },
        "advanced": [
            {
                "key": "spark.streaming.backpressure.enabled",
                "value": "true"
            },
            {
                "key": "spark.driver.memoryOverhead",
                "value": "1000"
            },
            {
                "key": "spark.executor.memoryOverhead",
                "value": "800"
            },
            {
                "key": "spark.memory.fraction",
                "value": "0.8"
            },
            {
                "key": "spark.memory.storageFraction",
                "value": "0.7"
            }
        ]
    }
}


返回示例

{
  "code": 0,
  "msg": "OK",
}