Save Flow

保存当前任务流。

前提条件

用户必须为任务流的负责人。

请求格式

POST https://{apigw-address}/batch-processing-service/v2.1/flows

请求参数(URI)

名称

位置(Path/Query)

必需/可选

数据类型

描述

userId

Query

必需

String

用户ID。如何获取userId信息>>

orgId

Query

必需

String

用户所属的组织ID。如何获取orgId信息>>

action

Query

必需

String

固定值:save

请求参数(Body)

名称

必需/可选

数据类型

描述

id

必需

Integer

任务流ID。

type

必需

Integer

任务流调度类型(0:手动调度任务;1:周期调度任务;2:临时任务)。

name

必需

String

任务流名称。

freq

必需

String

任务流调度周期。调度中使用的是七位的Crontab,概括而言,Crontab可以指定某个事件在其指定的时间点被触发,比如:c1 (0 1 * * * ? *) 定义了事件在每个小时的1分0秒触发,c2 (59 59 23 * * ? *) 定义了事件在每天的23时59分59秒触发。有关Crontab的更多配置,详见 http://cron.qqe2.com/

cycle

必需

String

调度周期(M:月;W:周;D:天;H:小时;mi:分钟)。

alertMode

必需

Integer

告警模式(0:无, 1:仅邮件告警, 2:仅短信告警, 3:邮件与短信告警)。

doAs

必需

String

任务流所属组织的大数据账号。

owners

必需

String

任务流所有者(负责人)的用户名(多个owner之间以 ; 分开,例如owners=“userNameA;userNameB”)。

visitors

必需

String

可访问者的用户名(多个visitor之间以 ; 分开,例如visitors=“userNameA;userNameB”)。

startTime

可选

String

任务流生效日期(即开始调度日期)。

active

必需

Integer

任务流调度状态(0:暂停;1:运行)。

parameters

可选

List<Map<key,value>>

调度参数。作为统一配置的全局参数,可以在节点内使用这些参数,以使任务运行时能动态适配环境变化(参数需要以 key-value 的格式表达,例如:[{"key":"env","value":"product"},{"key":"task_id","value":"123456"}] )。

desc

可选

String

任务流描述信息。

queue

可选

String

计算队列。

tasks

必需

List<TaskSimpleInfo>

任务流中包含的任务节点的集合。集合中每个元素是一个TaskSimpleInfo结构体,该结构体包含了任务节点的简要信息。详见 TaskSimpleInfo结构体

flows

必需

List<FlowSimpleInfo>

与当前任务流相关联的其他任务流的集合。详见 FlowSimpleInfo结构体

relations

必需

List<Relation>

任务关连线集合,集合中每个元素表示任务流中两个任务具有上下游数据关系。详见 Relation结构体

响应参数

名称

数据类型

描述

data

JSONObject

包含被保存的任务流ID信息。详见 FlowID结构体

FlowID结构体

示例

{
    "flowId": 2841
}

参数

名称

数据类型

描述

flowId

Integer

任务流ID。

错误码

代码

错误信息

描述

62102

Invalid parameter / missing parameter

参数格式不正确

62105

Your account has been locked

用户账号被锁定,请联系管理员

有关其他错误码的描述,参见 通用错误码

示例

请求示例

url: https://{apigw-address}/batch-processing-service/v2.1/flows?action=save&userId={}&orgId={}

method: POST

requestBody:
{
    "id":"3381",
    "type":1,
    "name":"apimtest2",
    "freq":"0 0 0 * * ? *",
    "cycle":"D",
    "alertMode":3,
    "doAs":"your_bd_account",
    "parameters":"[]",
    "owners":"yourOwners",
    "desc":"e",
    "startTime":"2019-07-24",
    "visitors":"yourVisitors",
    "active":0,
    "queue":"",
    "tasks":[{
            "taskName":"tass",
            "taskId":"105760",
            "x":0.0086,
            "y":0.004700000381469727,
            "nodeId":"t_105760"
        }],
    "flows":[],
    "relations":[]
}

返回示例

{
  "code": 0,
  "msg": "OK",
  "data": {
    "flowId": 2841
  }
}

Java SDK调用示例

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONArray;
import com.envision.apim.poseidon.config.PConfig;
import com.envision.apim.poseidon.core.Poseidon;
import com.envision.apim.poseidon.request.PoseidonRequest;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

public class SampleCode{
    public static class Request extends PoseidonRequest {
        public void setQueryParam(String key, Object value){
            queryParams().put(key, value);
        }
        public void setHeaderParam(String key, String value){
            headerParams().put(key, value);
        }
        public void setBodyParam(Map<String, Object> bodyPara){
            bodyParams().putAll(bodyPara);
        }
        public void setMethod(String method) {
            this.method = method;
        }
        private String method;
        public String baseUri() {
            return "";
        }
        public String method() {
            return method;
        }
    }

    @Test
    public void saveFlowTest(){
        //1.在EnOS Console的左边导航栏中点击应用注册。
        //2.点击需调用API的应用,查看基本信息中的AccessKey即为accessKey、SecretKey即为secretKey
        String accessKey = "AccessKey of your APP";
        String secretKey = "SecretKey of your APP";

        //新建一个request 然后把需要的参数传进去存在Query的map中,key是参数名字,value是参数值
        Request request = new Request();
        HashMap<String,Object> hashMap = new HashMap<String, Object>(2);
        hashMap.put("type",1);
        hashMap.put("name","testWorkflow");
        hashMap.put("cycle","D");
        hashMap.put("freq","0 0 0 * * ? *");
        hashMap.put("parameters","[]");
        hashMap.put("alertMode",3);
        hashMap.put("id","2515");
        hashMap.put("doAs","your_bd_account");
        hashMap.put("visitors","yourVisitors");
        hashMap.put("owners","yourOwners");
        hashMap.put("active",0);
        hashMap.put("queue","");
        hashMap.put("startTime","2019-07-25");
        JSONArray jsonArrayTasks = new JSONArray();
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("taskName","tass");
        jsonObject.put("x",0.0132);
        jsonObject.put("y",0.008100000381469727);
        jsonObject.put("taskId","104575");
        jsonObject.put("nodeId","t_104575");
        jsonArrayTasks.add(jsonObject);
        hashMap.put("tasks",jsonArrayTasks);
        JSONArray jsonArray = new JSONArray();
        hashMap.put("relations",jsonArray);
        hashMap.put("flows",jsonArray);
        request.setBodyParam(hashMap);
        request.setMethod("POST");

        try {
            JSONObject response = Poseidon.config(PConfig.init().appKey(accessKey).appSecret(secretKey).debug())
                    .url("https://{apigw-address}/batch-processing-service/v2.1/flows")
                    .queryParam("orgId", "yourOrgId")
                    .queryParam("userId", "yourUserId")
                    .queryParam("action", "save")
                    .getResponse(request, JSONObject.class);

            System.out.println(response);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}