Start Pipeline

Start a specific stream processing pipeline.

Prerequisites

The stream processing pipeline is released online, and the pipeline ID is available.

Request Format

POST https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}?action=start

Request Parameters (URI)

Name Location (Path/Query) Mandatory/Optional Data Type Description
pipelineId Path Mandatory String The stream processing pipeline ID, which can be found on the EnOS Management Console > Stream Processing > Stream Operation page.
orgId Query Mandatory String The organization ID. How to get the orgId>>

Request Parameters (Body)

Name Mandatory/Optional Data Type Description
executionMode Mandatory Integer Specify the running mode of the stream processing pipeline (0: Standalone Mode; 1: Cluster Mode).
kafkaRate Mandatory Integer Specify the Kafka data read rate, by the unit of record/s. Default is 1000.
resourceConfig Optional JSONObject Specify the resources for running the pipeline. For Standalone Mode, specify the amount of CUs required by the pipeline directly (for example, 2). For Cluster Mode, specify the resourses in JSON format. For details, see Resource Config

Resource Config

Name Mandatory/Optional Data Type Description
server Optional String Specify the Server resource for running the pipeline, for example 1 (with cu as the unit by default).
yarn Optional JSONObject

Specify the Yarn resource for running the pipeline. The JSON body contains the following parameters:

  • workerCount: Count of workers, for example: 2.
  • master:Count of master workers, for example: 1 (with cu as the unit by default).
  • slave:Count of slave workers, for example: 2 (with cu as the unit by default).
advanced Optional JSONObject Specify the advanced parameters for running the pipeline by Cluster Mode.

Response Parameters

Name Data Type Description
data String Returns an empty string upon success.

Error Code

Code Error Information Description
61100 The Stream configuration JSON is invalid. The pipeline configuration JSON is not valid. Please check the syntax.
61108 stream processing pipeline does not exit. Stream processing pipeline does not exist. Please check the pipeline ID.
61165 Stream processing job with status [XX] cannot be started. The stream processing job is currently in a state that cannot be started.
61121 Another user in the organization is starting a stream processing job. Please try again later. Only one stream processing job could be triggered in the same OU. Try again later.
61128 Your organization has not requested XX processing resource. Please request the resource and then start the jobs. The resources of corresponding running mode have not been applied in this OU.
61126 Allocated resource is not enough to start a stream processing job. Please check the resource configuration. The running resources configured for the stream processing job have exceeded the quota and the stream processing job cannot be started.

Sample

Request Sample

For Standalone Mode

url: https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}?action=start

method: POST

requestBody:
{
    "executionMode": 0,
    "kafkaRate": 1000,
    "resourceConfig": {
        "server": 1
    }
}

For Cluster Mode

url: https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}?action=start

method: POST

requestBody:
{
    "executionMode": 1,
    "kafkaRate": 1000,
    "resourceConfig": {
        "server": 1,
        "yarn": {
            "workerCount": 2,
            "master": 1,
            "slave": 2
        },
        "advanced": [
            {
                "key": "spark.streaming.backpressure.enabled",
                "value": "true"
            },
            {
                "key": "spark.driver.memoryOverhead",
                "value": "1000"
            },
            {
                "key": "spark.executor.memoryOverhead",
                "value": "800"
            },
            {
                "key": "spark.memory.fraction",
                "value": "0.8"
            },
            {
                "key": "spark.memory.storageFraction",
                "value": "0.7"
            }
        ]
    }
}

Return Sample

{
  "code": 0,
  "msg": "OK",
}

SDK Samples


You can access the SDK samples for stream processing service on GitHub: