Export Flow

导出任务流到本地。

前提条件

用户必须属于目标任务流所属的OU。

请求格式

GET https://{apigw-address}/dataflow-batch-service/v2.0/flows

请求参数(URI)

名称 位置(Path/Query) 必需/可选 数据类型 描述
flowId Query 必需 Integer 任务流ID。
userId Query 必需 String 用户ID。如何获取userId信息>>
orgId Query 必需 String 用户所属的组织ID。如何获取orgId信息>>
action Query 必需 String 固定值:export

响应参数

名称 数据类型 描述
data List<JSONObject> 包含任务流的详细信息。详见 Flow结构体

Flow结构体

示例

{
    "name": "workflow1",
    "cycle": "D",
    "cron": "0 0 0 * * ? *",
    "parameters": "[]",
    "alertMode": 3,
    "alertTo": "",
    "appId": "",
    "submitter": "yourSubmitter",
    "owners": "yourOwners",
    "visitors": "yourVisitors",
    "type": 1,
    "syncType": 1,
    "desc": "",
    "startTime": "2019-07-25",
    "tasks": [
      {
        "name": "tass",
        "resource": "default",
        "type": "COMMAND",
    "runMode": "{\"taskMode\":1,\"cpu\":0.5,\"memory\":1,\"maxParallel\":0,\"keyType\":0,\"datasourceId\":0,\"path\":\"\",\"content\":\"\"}",
        "syncType": 1,
        "cmd": "echo "hello"",
        "submitter": "yourSubmitter",
        "filePackage": "",
        "cron": "",
        "priorityLevel": 0,
        "timeout": 300,
        "retryLimit": 3,
        "retryInterval": 0,
        "successCode": "0",
        "waitCode": "",
        "asLink": false
      }
    ],
    "flowLinks": [],
    "taskLinks": [],
    "relations": [],
    "linkRelations": []
}

参数

名称 数据类型 描述
name String 任务流名称。
cycle String 调度周期(M:月;W:周;D:天;H:小时;mi:分钟)。
cron String 任务流调度周期。调度中使用的是七位的Crontab,概括而言,Crontab可以指定某个事件在其指定的时间点被触发,比如:c1 (0 1 * * * ? *) 定义了事件在每个小时的1分0秒触发,c2 (59 59 23 * * ? *) 定义了事件在每天的23时59分59秒触发。有关Crontab的更多配置,详见 http://cron.qqe2.com/
parameters List<Map<key,value>> 调度参数。作为统一配置的全局参数,可以在节点内使用这些参数,以使任务运行时能动态适配环境变化(参数需要以 key-value 的格式表达,例如:[{"key":"env","value":"product"},{"key":"task_id","value":"123456"}] )。
alertMode Integer 告警模式(0:无, 1:仅邮件告警, 2:仅短信告警, 3:邮件与短信告警)。
alertTo String 告警对象。
appId String N/A,通常为空字符串。
submitter String 任务流提交账号。
owners String 任务流所有者(负责人)的用户名(多个owner之间以 ; 分开,例如owners=“userNameA;userNameB”)。
visitors String 可访问者的用户名(多个visitor之间以 ; 分开,例如visitors=“userNameA;userNameB”)。
type Integer 任务流调度类型(0:手动调度任务;1:周期调度任务;2:临时任务)。
syncType Integer 同步类型(0:文件同步;1:数据同步)。
desc String 任务流描述信息。
startTime String 任务流生效日期(即开始调度日期)。
tasks List<Task> 任务节点集合,集合中每个元素表示任务流中的一个任务,详见 Task结构体
flowLinks List<FlowLink> 任务流依赖集合。集合中每个元素表示当前任务流依赖于某个源任务流(通过 FlowLink结构体 中的 linkId 字段可在 linkRelations 集合中找到对应的连接关系,该连接关系表示源任务流与当前任务流中的哪个任务具有关联)。
taskLinks List<TaskLink> 任务节点依赖集合。集合中每个元素表示当前任务流中某个任务依赖于某个源任务(通过 TaskLink结构体 中的 linkId 字段可在 linkRelations 集合中找到对应的连接关系,该连接关系表示源任务流与当前任务流中的哪个任务具有关联)。
relations List<Relation> 关连线集合。Relation结构体 的集合,Relation表示两个任务间具有上下游依赖关系。
linkRelations List<LinkRelation> 连接关系集合,详见 LinkRelation结构体

Task结构体

示例

{
    "name": "task",
    "resource": "default",
    "type": "DATA_INTEGRATION",
    "runMode": "{\"taskMode\":1,\"cpu\":0.5,\"memory\":1,\"maxParallel\":0,\"keyType\":0,\"datasourceId\":0,\"path\":\"\",\"content\":\"\"}",
    "syncType": 1,
    "cmd": "echo "hello"",
    "submitter": "yourSubmitter",
    "filePackage": "",
    "cron": "",
    "priorityLevel": 0,
    "timeout": 300,
    "retryLimit": 3,
    "retryInterval": 0,
    "successCode": "0",
    "waitCode": "",
    "asLink": false
 }

参数

名称 数据类型 描述
name String 任务名称。
resource String 任务资源。
type String 任务类型(返回值可为:DATA_INTEGRATION,WORMHOLE,CALCULATE,SHELL,CANAAN,HIVE,MR)。
syncType Integer 同步类型(0:文件同步;1:数据同步)。
cmd String 命令行命令。
submitter String 任务提交者。
filePackage String 文件位置。
cron String 具体调度时间。
priorityLevel Integer 优先级别。
timeout Integer 超时时间。
retryLimit Integer 重试次数。
retryInterval Integer 重试时间间隔。
successCode String 成功返回值。
waitCode String N/A,通常为空字符串。
asLink Boolean 是否与其他任务具有依赖关系。
runMode String 任务运行模式,详见 RunMode结构体

RunMode结构体

示例

{
    "taskMode": 1,
    "cpu": 0.5,
    "memory": 1,
    "maxParallel": 0,
    "keyType": 0,
    "datasourceId": 0,
    "path": "",
    "content": ""
}

参数

名称 数据类型 描述
taskMode Integer 任务运行模式(1:单任务;2:多任务)。
cpu Float 每个任务(单任务就是该任务本身,多任务是每个子任务)运行时需要的 CPU(单位:core,最小0.1,最大2)。
memory Float 每个任务运行时需要的 Memory(单位:G,最小0.3,最大4)。
maxParallel Integer 多任务模式下,允许同时并发执行的最大子任务数。
keyType Integer 多任务模式下,分布键的来源(1:外部文件;2:自定义,通过content字段设置)。
datasourceId Integer 分布键来源为外部文件时,连接外部文件所在数据源的数据源ID(通过 数据源注册 服务注册并获取ID)。
path String 分布键来源为外部文件时,分布键文件在外部数据源中的路径。
content String 分布键来源为自定义时,分布键的内容。

Relation结构体

示例

{
    "sourceTaskName": "tass",
    "targetTaskName": "rf",
    "rerun": true
}

参数

名称 数据类型 描述
sourceTaskName String 上游任务名称。
targetTaskName String 下游任务名称。
rerun Boolean true和false仅在任务级联重跑时生效。true表示重跑时,下游节点会被执行;false表示重跑时,下游节点不会被执行。

LinkRelation结构体

示例

{
    "linkId": "0",
    "targetTaskName": "tass",
    "rerun": false
}

参数

名称 数据类型 描述
linkId String 连接ID。
targetTaskName String 下游任务名称。
rerun Boolean true表示重跑时,下游节点会被执行;false表示重跑时,下游节点不会被执行。

错误码

代码 错误信息 描述
62102 Flow validation exception 请求参数格式不正确
62109 Server internal exception 服务器内部异常

有关其他错误码的描述,参见 通用错误码

示例

请求示例

url: https://{apigw-address}/dataflow-batch-service/v2.0/flows?action=export&flowId={}&userId={}&orgId={}

method: GET

返回示例

{
    "status": 0,
    "msg": " Success",
    "data": {
        "name": "clone",
        "cycle": "D",
        "cron": "0 0 0 * * ? *",
        "parameters": "[{\"key\":\"REPLACE\",\"value\":\"lili1\"}]",
        "submitter": "yourSubmitter",
        "owners": "yourOwners",
        "visitors": "yourVisitors",
        "type": 1,
        "desc": "",
        "tasks": [
            {
                "name": "tass",
                "resource": "default",
                "type": "DATA_INTEGRATION",
                "cmd": "echo `whoami`",
                "submitter": "yourSubmitter",
                "filePackage": "",
                "cron": "",
                "priorityLevel": 0,
                "timeout": 300,
                "retryLimit": 3,
                "retryInterval": 0,
                "successCode": "0",
                "waitCode": "",
                "asLink": true,
                "runMode": "{\"taskMode\":1,\"cpu\":0.5,\"memory\":1,\"maxParallel\":0,\"keyType\":0,\"datasourceId\":0,\"path\":\"\",\"content\":\"\"}",
                "syncType": 1
            }
        ],
        "relations": [],
        "startTime": "2019-11-22",
        "flowLinks": [],
        "syncType": 1,
        "linkRelations": [],
        "alertTo": "",
        "alertMode": 3,
        "taskLinks": [],
        "appId": ""
    }
}

Java SDK调用示例

import com.alibaba.fastjson.JSONObject;
import com.envision.apim.poseidon.config.PConfig;
import com.envision.apim.poseidon.core.Poseidon;
import com.envision.apim.poseidon.request.PoseidonRequest;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

public class SampleCode{
    public static class Request extends PoseidonRequest {
        public void setQueryParam(String key, Object value){
            queryParams().put(key, value);
        }
        public void setHeaderParam(String key, String value){
            headerParams().put(key, value);
        }

        public void setBodyParam(Map<String, Object> bodyPara){
            bodyParams().putAll(bodyPara);
        }
        public void setMethod(String method) {
            this.method = method;
        }
        private String method;
        public String baseUri() {
            return "";
        }
        public String method() {
            return method;
        }
    }

    @Test
    public void exportFlowTest(){
        //1.在EnOS Console的左边导航栏中点击应用注册。
        //2.点击需调用API的应用,查看基本信息中的AccessKey即为accessKey、SecretKey即为secretKey
        String accessKey = "AccessKey of your APP";
        String secretKey = "SecretKey of your APP";

        //新建一个request 然后把需要的参数传进去存在Query的map中,key是参数名字,value是参数值
        Request request = new Request();
        request.setMethod("GET");

        try {
            JSONObject response = Poseidon.config(PConfig.init().appKey(accessKey).appSecret(secretKey).debug())
                    .url("https://{apigw-address}/dataflow-batch-service/v2.0/flows")
                    .queryParam("orgId", "yourOrgId")
                    .queryParam("userId", "yourUserId")
                    .queryParam("flowId", "36")
                    .queryParam("action", "export")
                    .getResponse(request, JSONObject.class);

            System.out.println(response);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}