Start Pipeline

Start a specific stream processing pipeline.

Prerequisites

The stream processing pipeline has been released online, and you have its pipeline ID.

Request Format

POST https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}?action=start

Request Parameters (URI)

| Name | Location (Path/Query) | Mandatory/Optional | Data Type | Description |
|------|-----------------------|--------------------|-----------|-------------|
| pipelineId | Path | Mandatory | String | The stream processing pipeline ID, which can be found on the EnOS Management Console > Stream Processing > Stream Operation page. |
| orgId | Query | Mandatory | String | The organization ID. See How to get the orgId. |
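
As a rough illustration of how the path and query parameters above combine into the request URL, the following Python sketch assembles it with the standard library. The helper name `build_start_url` and the example host, pipeline ID, and organization ID are placeholders invented for this sketch; they are not part of the API.

```python
from urllib.parse import quote, urlencode


def build_start_url(apigw_address: str, pipeline_id: str, org_id: str) -> str:
    """Assemble the Start Pipeline URL (hypothetical helper, not part of the API)."""
    # pipelineId is a path parameter, so percent-encode it fully.
    path = f"/streaming/v2.0/streaming/pipeline/{quote(pipeline_id, safe='')}"
    # action=start comes from the request format; orgId is the mandatory query parameter.
    query = urlencode({"action": "start", "orgId": org_id})
    return f"https://{apigw_address}{path}?{query}"


# Placeholder values for illustration only.
print(build_start_url("apigw.example.com", "pipeline-123", "org-456"))
```

Encoding the path segment separately keeps an ID that contains reserved characters from altering the URL structure.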

Request Parameters (Body)

| Name | Mandatory/Optional | Data Type | Description |
|------|--------------------|-----------|-------------|
| executionMode | Mandatory | Integer | Specify the running mode of the stream processing pipeline (0: Standalone Mode; 1: Cluster Mode). |
| kafkaRate | Mandatory | Integer | Specify the Kafka data read rate, in records per second. The default is 1000. |
| resourceConfig | Optional | JSONObject | Specify the resources for running the pipeline. For Standalone Mode, specify the number of CUs required by the pipeline directly (for example, 2). For Cluster Mode, specify the resources in JSON format. For details, see Resource Config. |

Resource Config

| Name | Mandatory/Optional | Data Type | Description |
|------|--------------------|-----------|-------------|
| server | Optional | String | Specify the Server resource for running the pipeline, for example 1 (with cu as the unit by default). |
| yarn | Optional | JSONObject | Specify the Yarn resource for running the pipeline. The JSON body contains the following parameters: workerCount: count of workers, for example 2; master: count of master workers, for example 1 (with cu as the unit by default); slave: count of slave workers, for example 2 (with cu as the unit by default). |
| advanced | Optional | JSONObject | Specify the advanced parameters for running the pipeline in Cluster Mode. |
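
To show how the body parameters fit together, here is a minimal Python sketch that assembles the request body for either mode. The function name `build_start_body` and its validation rules are assumptions made for illustration; the service may validate the body differently.

```python
from typing import Any, Dict, Optional

STANDALONE_MODE = 0  # executionMode value for Standalone Mode
CLUSTER_MODE = 1     # executionMode value for Cluster Mode


def build_start_body(
    execution_mode: int,
    kafka_rate: int = 1000,
    resource_config: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Return a JSON-serializable Start Pipeline body (hypothetical helper)."""
    if execution_mode not in (STANDALONE_MODE, CLUSTER_MODE):
        raise ValueError("executionMode must be 0 (Standalone) or 1 (Cluster)")
    # Assumption: a read rate only makes sense as a positive number of records/s.
    if kafka_rate <= 0:
        raise ValueError("kafkaRate must be a positive integer")
    body: Dict[str, Any] = {"executionMode": execution_mode, "kafkaRate": kafka_rate}
    # resourceConfig is optional, so it is omitted unless the caller supplies it.
    if resource_config is not None:
        body["resourceConfig"] = resource_config
    return body


standalone = build_start_body(STANDALONE_MODE, resource_config={"server": 1})
cluster = build_start_body(
    CLUSTER_MODE,
    resource_config={"server": 1, "yarn": {"workerCount": 2, "master": 1, "slave": 2}},
)
print(standalone)
print(cluster)
```

Rejecting an unknown `executionMode` on the client side is a convenience only; the error codes returned by the service remain the authoritative check.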

Response Parameters

| Name | Data Type | Description |
|------|-----------|-------------|
| data | String | Returns an empty string upon success. |

Error Code

| Code | Error Information | Description |
|------|-------------------|-------------|
| 61100 | The Stream configuration JSON is invalid. | The pipeline configuration JSON is not valid. Please check the syntax. |
| 61108 | Stream processing pipeline does not exist. | The stream processing pipeline does not exist. Please check the pipeline ID. |
| 61165 | Stream processing job with status [XX] cannot be started. | The stream processing job is currently in a state that cannot be started. |
| 61121 | Another user in the organization is starting a stream processing job. Please try again later. | Only one stream processing job can be started at a time in the same OU. Try again later. |
| 61128 | Your organization has not requested XX processing resource. Please request the resource and then start the jobs. | The resources for the corresponding running mode have not been requested for this OU. |
| 61126 | Allocated resource is not enough to start a stream processing job. Please check the resource configuration. | The running resources configured for the stream processing job exceed the quota, so the job cannot be started. |
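
A client might use the codes above to decide whether a failed start is worth retrying. The sketch below is one possible approach in Python; the names `ERROR_DESCRIPTIONS`, `RETRYABLE_CODES`, and `classify_error` are hypothetical, and treating only 61121 as retryable is an assumption based on its "try again later" wording.

```python
from typing import Tuple

# Descriptions paraphrased from the error code list above.
ERROR_DESCRIPTIONS = {
    61100: "The pipeline configuration JSON is not valid.",
    61108: "The pipeline does not exist; check the pipeline ID.",
    61165: "The job is in a state that cannot be started.",
    61121: "Another job is being started in the same OU; try again later.",
    61128: "Resources for this running mode have not been requested for the OU.",
    61126: "The configured resources exceed the quota.",
}

# Assumption: only 61121 is transient, because its message says "try again later".
RETRYABLE_CODES = frozenset({61121})


def classify_error(code: int) -> Tuple[str, bool]:
    """Return (description, retryable) for an error code (hypothetical helper)."""
    description = ERROR_DESCRIPTIONS.get(code, "Unknown error code.")
    return description, code in RETRYABLE_CODES


for code in (61121, 61126, 99999):
    print(code, classify_error(code))
```

The other codes point to configuration or state problems, so retrying without changing the request is unlikely to help.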

Sample

Request Sample

For Standalone Mode

url: https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}?action=start

method: POST

requestBody:
{
    "executionMode": 0,
    "kafkaRate": 1000,
    "resourceConfig": {
        "server": 1
    }
}

For Cluster Mode

url: https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}?action=start

method: POST

requestBody:
{
    "executionMode": 1,
    "kafkaRate": 1000,
    "resourceConfig": {
        "server": 1,
        "yarn": {
            "workerCount": 2,
            "master": 1,
            "slave": 2
        },
        "advanced": [
            {
                "key": "spark.streaming.backpressure.enabled",
                "value": "true"
            },
            {
                "key": "spark.driver.memoryOverhead",
                "value": "1000"
            },
            {
                "key": "spark.executor.memoryOverhead",
                "value": "800"
            },
            {
                "key": "spark.memory.fraction",
                "value": "0.8"
            },
            {
                "key": "spark.memory.storageFraction",
                "value": "0.7"
            }
        ]
    }
}
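
The request samples above can be turned into an HTTP request object along the following lines. This is only a sketch using Python's standard `urllib.request`: the helper name `prepare_start_request`, the example URL, and the `Content-Type` header are assumptions. Authentication is not covered by this section, so it is left out here. The request is built but not sent.

```python
import json
import urllib.request
from typing import Any, Dict


def prepare_start_request(url: str, body: Dict[str, Any]) -> urllib.request.Request:
    """Build (but do not send) a POST request carrying the JSON body (hypothetical helper)."""
    data = json.dumps(body).encode("utf-8")
    return urllib.request.Request(
        url,
        data=data,
        method="POST",
        # Assumption: the gateway expects a JSON content type.
        headers={"Content-Type": "application/json"},
    )


# Placeholder host and pipeline ID for illustration only.
example_url = (
    "https://apigw.example.com/streaming/v2.0/streaming/pipeline/"
    "pipeline-123?action=start"
)
example_body = {"executionMode": 0, "kafkaRate": 1000, "resourceConfig": {"server": 1}}
request = prepare_start_request(example_url, example_body)
print(request.get_method(), request.full_url)
```

Passing the prepared object to `urllib.request.urlopen` would send it, once whatever authentication the gateway requires has been added.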

Return Sample

{
  "code": 0,
  "msg": "OK"
}
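
A caller would typically parse the returned JSON and treat any non-zero `code` as a failure. The Python sketch below shows one way to do that. The names `StartPipelineError` and `check_start_response` are hypothetical, and it is an assumption that failures are reported through the same `code` and `msg` fields as the success sample.

```python
import json


class StartPipelineError(Exception):
    """Raised when the response carries a non-zero code (hypothetical class)."""

    def __init__(self, code, msg):
        super().__init__(f"Start Pipeline failed with code {code}: {msg}")
        self.code = code
        self.msg = msg


def check_start_response(response_text: str) -> str:
    """Return the `data` field on success; raise StartPipelineError otherwise."""
    payload = json.loads(response_text)
    code = payload.get("code")
    if code != 0:
        # Assumption: error responses reuse the `code` and `msg` fields.
        raise StartPipelineError(code, payload.get("msg", ""))
    # `data` is documented as an empty string on success; default to "" if absent.
    return payload.get("data", "")


print(repr(check_start_response('{"code": 0, "msg": "OK"}')))
```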