
azkaban.project
# Declares Flow 2.0 so Azkaban reads the accompanying *.flow YAML files
azkaban-flow-version: 2.0
one.flow
# one.flow: a single command job that echoes a greeting
nodes:
  - name: jobOne
    type: command
    config:
      command: echo "Hello world"
two.flow 依赖
# two.flow: dependency demo — jobC runs only after both jobA and jobB finish
nodes:
  - name: jobA
    type: command
    config:
      command: echo "jobA"
  - name: jobB
    type: command
    config:
      command: echo "jobB"
  - name: jobC
    type: command
    dependsOn:
      - jobA
      - jobB
    config:
      command: echo "jobC"
three.flow 依赖 启动程序
# three.flow: jobB launches the cluster via job.sh; jobC waits on both jobs
nodes:
  - name: jobA
    type: command
    config:
      command: echo "jobA"
  - name: jobB
    type: command
    config:
      command: sh job.sh
  - name: jobC
    type: command
    dependsOn:
      - jobA
      - jobB
    config:
      command: echo "jobC"
job.sh
#!/bin/bash
# Restart the Hadoop stack, then bring up Hive services, ZooKeeper and HBase.
echo "do job"
echo "azkaban job"
echo "azkaban shell sh"
stop-all.sh
start-all.sh
# Leave safe mode so downstream writes (Hive/HBase) are not blocked
hdfs dfsadmin -safemode leave
nohup hive --service hiveserver2 &
nohup hive --service metastore &
zkServer.sh start
zkServer.sh status
start-hbase.sh
echo "over"
four.flow 重试
# four.flow: retry demo — rerun the failing job up to 3 times, 5 s apart
nodes:
  - name: joberror
    type: command
    config:
      command: sh job.sh
      retries: 3
      # Milliseconds to wait between retries
      retry.backoff: 5000
five.flow 条件
# five.flow: conditional demo — jobB runs only when jobA's output param wk == 2
nodes:
  - name: jobA
    type: command
    config:
      command: sh jobA.sh
  - name: jobB
    type: command
    dependsOn:
      - jobA
    # Reads the "wk" value jobA wrote to $JOB_OUTPUT_PROP_FILE
    condition: ${jobA:wk} == 2
    config:
      command: sh jobB.sh
jobA.sh
#!/bin/bash
echo "do jobA.sh"
# %w = day of week (0 = Sunday .. 6 = Saturday)
wk=$(date +%w)
# Escape the inner quotes so the output file holds valid JSON, e.g. {"wk":3};
# the original unescaped form produced {wk:3}, which Azkaban cannot parse.
echo "{\"wk\":$wk}" > "$JOB_OUTPUT_PROP_FILE"
jobB.sh
#!/bin/bash
# Shut the stack down in reverse start order: HBase, then ZooKeeper, then Hadoop.
stop-hbase.sh
zkServer.sh stop
stop-all.sh
six.flow 预定义宏
# six.flow: built-in macro demo — startHbase fires only if ALL parents succeed
nodes:
  - name: starthadoopJob
    type: command
    config:
      command: sh startHadoop.sh
  - name: startZkJob
    type: command
    config:
      command: sh startZK.sh
  - name: startHbase
    type: command
    dependsOn:
      - starthadoopJob
      - startZkJob
    # Predefined macro: every dependency must succeed
    condition: all_success
    config:
      command: sh startHbase.sh
startHadoop.sh
#!/bin/bash
start-all.sh
# Leave safe mode so HBase can write to HDFS immediately after startup
hdfs dfsadmin -safemode leave
startZK.sh 开启Zookeeper
#!/bin/bash
zkServer.sh start
zkServer.sh status
startHbase.sh
#!/bin/bash
start-hbase.sh
seven.flow Hbase 操作
# seven.flow: run a file of HBase shell commands non-interactively
nodes:
  - name: hbaseOpjob
    type: command
    config:
      command: hbase shell hbasedemo.sh
hbasedemo.sh
#!/bin/bash
create_namespace 'aabbcc'
create 'aabbcc:tb1','student','teacher'
list_namespace
list_namespace_tables 'aabbcc'
exit
eight.flow
# eight.flow: execute a HiveQL script file
nodes:
  - name: hiveDemoJob
    type: command
    config:
      command: hive -f hivedemo.sql
hivedemo.sql
-- Create the exam database, load the delimited student file, then copy it
-- into an ORC-backed table. One statement per line: when this script was
-- collapsed onto a single line, the first "--" commented out everything
-- after it, so only the first statement ever ran.
create database if not exists exam20220121;
use exam20220121;
-- drop table if exists student;
create table if not exists student(id int, name string)
  row format delimited fields terminated by ',';
load data inpath '/opt/azkaban/student.txt' into table student;
-- drop table if exists student2;
create table if not exists student2 stored as orc as select * from student;
student.txt
1,zs
2,ls
3,ww
ninejava.flow
# ninejava.flow: run a Java main class via the javaprocess job type
nodes:
  - name: javaJob
    type: javaprocess
    config:
      # java.class takes the class name only; program arguments go in main.args
      java.class: cn.kgc.TestJavaProcess
      main.args: bigdata kb15
package cn.kgc;
public class TestJavaProcess {
public static void main(String[] args) {
System.out.println("azkaban java process");
System.out.println(args[0]);
System.out.println(args[1]);
}
}
tenspark.flow
# tenspark.flow: submit a packaged Spark job with spark-submit
nodes:
  - name: javaJob
    type: command
    config:
      command: spark-submit --class cn.kgc.TestSparkProcess ./azkabandemo-1.0-SNAPSHOT.jar
package cn.kgc;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

/**
 * Minimal Spark job submitted by tenspark.flow: parallelizes five integers
 * on a local master and prints each collected element.
 */
public class TestSparkProcess {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("azkabanSparkDemo").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        Integer[] ints = {1, 2, 3, 4, 5};
        // Generic types are required here: with a raw List, the enhanced-for
        // yields Object, and "for (Integer i : collect)" does not compile.
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(ints));
        List<Integer> collect = rdd.collect();
        for (Integer i : collect) {
            System.out.println(i);
        }
        sc.close();
    }
}
project 与 flow内容打包成zip, java程序还需要将jar包、project 与 flow内容一同打包成zip, 在azkaban网页创建主题,上传,执行 达到效果。
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)