Start Pipeline

启动运行指定流数据处理任务。

前提条件

已通过流数据处理服务将流数据处理任务发布上线,并获取到流数据处理任务的ID。

请求格式

POST https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}?action=start

请求参数(URI)

名称

位置(Path/Query)

必需/可选

数据类型

描述

pipelineId

Path

必需

String

流数据处理任务ID,可通过 EnOS管理门口 > 流数据处理 > 流运维 页面查看,或通过调用 List Pipelines 接口获取。

orgId

Query

必需

String

用户所属的组织ID。如何获取orgId信息>>

请求参数(Body)

名称

必需/可选

数据类型

描述

executionMode

必需

Integer

指定流数据处理任务的运行模式。0:Standalone模式;1:集群模式。

kafkaRate

必需

Integer

指定 Kafka 读取速率,单位为 record/s,默认值为 1000。

resourceConfig

可选

JSONObject

流数据任务运行资源配置。对Standalone模式,直接指定运行流数据处理任务所需的CU数量(单位默认为cu)。对集群模式,JSON体中包含的参数,详见 Resource Config

Resource Config

名称

必需/可选

数据类型

描述

server

可选

String

指定运行流数据处理任务所需的 Server 资源(单位默认为cu)。

yarn

可选

JSONObject

指定运行流数据处理任务所需的 Yarn 资源,JSON 体中包含以下参数:

  • workerCount:worker数量,例如:2。

  • master:master worker数量,例如:1(单位默认为cu)。

  • slave:slave worker数量,例如:2(单位默认为cu)。

advanced

可选

JSONObject

指定以集群模式运行流数据处理任务的高级参数。

响应参数

名称

数据类型

描述

data

String

运行成功返回空字符串。

错误码

代码

错误信息

描述

61100

The Stream configuration JSON is invalid.

流数据处理任务配置JSON不正确。

61108

Stream processing job does not exit.

流数据处理任务不存在。

61165

Stream processing job with status [XX] cannot be started.

流数据处理任务当前处于不能启动的状态。

61121

Another user in the organization is starting a stream processing job. Please try again later.

同一组织同时只能触发一条流任务,请稍后重试。

61128

Your organization has not requested XX processing resource. Please request the resource and then start the jobs.

该组织没有申请相应运行模式的资源。

61126

Allocated resource is not enough to start a stream processing job. Please check the resource configuration.

流数据处理任务配置的运行资源已经超出配额,无法启动。

示例

请求示例

以Standalone模式运行


url: https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}?action=start

method: POST

requestBody:
{
    "executionMode": 0,
    "kafkaRate": 1000,
    "resourceConfig": {
        "server": 1
        }
}


以集群模式运行


url: https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}?action=start

method: POST

requestBody:
{
    "executionMode": 1,
    "kafkaRate": 1000,
    "resourceConfig": {
        "server": 1,
        "yarn": {
            "workerCount": 2,
            "master": 1,
            "slave": 2
        },
        "advanced": [
            {
                "key": "spark.streaming.backpressure.enabled",
                "value": "true"
            },
            {
                "key": "spark.driver.memoryOverhead",
                "value": "1000"
            },
            {
                "key": "spark.executor.memoryOverhead",
                "value": "800"
            },
            {
                "key": "spark.memory.fraction",
                "value": "0.8"
            },
            {
                "key": "spark.memory.storageFraction",
                "value": "0.7"
            }
        ]
    }
}


返回示例

{
  "code": 0,
  "msg": "OK",
}

Java SDK调用示例

import com.alibaba.fastjson.JSONObject;
import com.envision.apim.poseidon.config.PConfig;
import com.envision.apim.poseidon.core.Poseidon;
import com.envision.apim.poseidon.request.PoseidonRequest;
import org.junit.Before;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

public class Sample {
    private static final String API_Gateway_URL = "https://{domain_url}";
    private Poseidon poseidon;

    private static class Request extends PoseidonRequest {

        public void setBodyParams(String key, Object value) {
            bodyParams().put(key, value);
        }

        public void setMethod(String method) {
            this.method = method;
        }

        private String method;

        @Override
        public String baseUri() {
            return "";
        }

        @Override
        public String method() {
            return method;
        }
    }

    @Before
    public void init() {
        poseidon = Poseidon.config(
                PConfig.init()
                        .appKey("AccessKey of your APP")
                        .appSecret("SecretKey of your APP")
        ).method("POST");
    }

    @Test
    public void StartPipeline() {
        Request request = new Request();
        request.setBodyParams("executionMode", "0");
        request.setBodyParams("kafkaRate", "1000");
        Map<String, String> resourceConfig=new HashMap<>();
        resourceConfig.put("server","1");

        JSONObject response = poseidon
                .url(API_Gateway_URL + "/streaming/v2.0/streaming/pipeline/{pipelineId}")
                .queryParam("orgId", "yourOrgId")
                .queryParam("action", "start")
                .getResponse(request, JSONObject.class);
        System.out.println(response);
    }
}