
在公司中,遇到这样一个业务,需要将数据库A从oracle迁移到pg数据库,原本让实习生去实现了这样的一个工具,但是最后他写出来的工具存在较大问题。
正好最近在学习spark、flink等流式处理框架,那么我们就用flink来处理这样一个需求吧:
1、主类:package com.ogj.flink;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.types.Row;
import java.util.concurrent.TimeUnit;
public class DbMove {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource dataSource = env.createInput(
JDBCInputFormat.buildJDBCInputFormat()
.setDBUrl(DBContent.SourceDB.url)
.setDrivername(DBContent.PGDRIVER)
.setUsername(DBContent.SourceDB.username)
.setPassword(DBContent.SourceDB.password)
.setQuery("select task_name,file_path from cloud_task")
.setRowTypeInfo(
new RowTypeInfo(
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO
)
)
.finish()
);
dataSource.output(
JDBCOutputFormat.buildJDBCOutputFormat()
.setDBUrl(DBContent.DstDB.url)
.setDrivername(DBContent.PGDRIVER)
.setUsername(DBContent.DstDB.username)
.setPassword(DBContent.DstDB.password)
.setQuery("insert into test(task_name,file_path) values(?,?)")
.finish()
);
env.execute("db move");
System.out.println("写入数据中");
TimeUnit.SECONDS.sleep(5);
//查询出来
DataSource read = env.createInput(
JDBCInputFormat.buildJDBCInputFormat()
.setDBUrl(DBContent.DstDB.url)
.setDrivername(DBContent.PGDRIVER)
.setUsername(DBContent.DstDB.username)
.setPassword(DBContent.DstDB.password)
.setQuery("select task_name,file_path from test")
.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO))
.finish()
);
System.out.println("read dst dataSource");
read.print();
System.out.println("=========end==========");
}
}
2、数据库配置类
package com.ogj.flink;
public class DBContent {
public static final String MYSQLDRIVER = "com.mysql.jdbc.Driver";
public static final String PGDRIVER = "org.postgresql.Driver";
public static class SourceDB {
public static String url = "jdbc:postgresql://127.0.0.1:5432/dmt_url?currentSchema=schema_name";
public static String username = "postgres";
public static String password = "123456";
}
public static class DstDB {
public static String url = "jdbc:postgresql://127.0.0.1:5432/RequestMonitor";
public static String username = "postgres";
public static String password = "123456";
}
}
3、maven打包配置:
net.alchim31.maven scala-maven-plugin3.2.2 compile testCompile org.apache.maven.plugins maven-compiler-plugin3.0 8 8 UTF-8 compile compile org.apache.maven.plugins maven-shade-plugin2.4.3 package shade *:* meta-INF/*.SF meta-INF/*.DSA meta-INF/*.RSA reference.conf cn.itcast.rpc.Master
运行完毕就是这样的啦:
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)