
Flink之Mysql数据
JDBC Connector <= 官方目前没有专门针对MySQL的,我们自己实现就好了
这里测试的是连接MySQL。
pom依赖(我本地docker里的mysql是8.0.19版本的)
mysql mysql-connector-java8.0.26
启动mysql服务(我集群启动mysql服务)
新建数据库
CREATE DATAbase `test` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
新建schema
CREATE TABLE `sensor_temp` ( `id` varchar(255) NOT NULL, `temp` double NOT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
编写java代码
package com.zch.apitest.sink;
import com.zch.apitest.beans.SensorReading;
import com.zch.apitest.source.SourceTest4_自定义;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class SinkTest3_JDBC {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// // 读取文件
// DataStream inputStream = env.readTextFile("F:\JAVA\bigdata2107\zch\flink\src\main\resources\Sensor.txt");
//
// SingleOutputStreamOperator dataStream = inputStream.map(lines -> {
// String[] split = lines.split(",");
// return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));
// });
// 使用之前编写的随机变动温度的SourceFunction来生成数据
DataStream dataStream = env.addSource(new SourceTest4_自定义.MySensorSource());
// 自定义sink到JDBC
dataStream.addSink(new MyjdbcSink());
env.execute();
}
// 实现自定义的SinkFunction
public static class MyjdbcSink extends RichSinkFunction{
// 声明连接和预编译语句
Connection connection = null;
PreparedStatement insertStmt = null;
PreparedStatement updateStmt = null;
@Override
public void open(Configuration parameters) throws Exception {
connection = DriverManager.getConnection("jdbc:mysql://zhaohui01:3306/test","root","123456");
insertStmt = connection.prepareStatement("insert into sensor_temp (id,temp) values (?,?)");
updateStmt = connection.prepareStatement("update sensor_temp set temp = ? where id = ?");
}
// 每来一条数据,调用连接,执行sql
@Override
public void invoke(SensorReading value, Context context) throws Exception {
// 直接执行插入,如果没有更新成功,就插入
updateStmt.setDouble(1,value.getTemperature());
updateStmt.setString(2,value.getId());
updateStmt.execute();
if (updateStmt.getUpdateCount() <= 0){
insertStmt.setString(1,value.getId());
insertStmt.setDouble(2,value.getTemperature());
insertStmt.execute();
}
}
@Override
public void close() throws Exception {
updateStmt.close();
insertStmt.close();
connection.close();
}
}
}
输出结果
运行Flink程序,查看MySQL数据(可以看到MySQL里的数据一直在变动)
mysql> SELECT * FROM sensor_temp; +-----------+--------------------+ | id | temp | +-----------+--------------------+ | sensor_3 | 20.489172407885917 | | sensor_10 | 73.01289164711463 | | sensor_4 | 43.402500895809744 | | sensor_1 | 6.894772325662007 | | sensor_2 | 101.79309911751122 | | sensor_7 | 63.070612021580324 | | sensor_8 | 63.82606628090501 | | sensor_5 | 57.67115738487047 | | sensor_6 | 50.84442627975055 | | sensor_9 | 52.58400793021675 | +-----------+--------------------+ 10 rows in set (0.00 sec) mysql> SELECt * FROM sensor_temp; +-----------+--------------------+ | id | temp | +-----------+--------------------+ | sensor_3 | 19.498209543035923 | | sensor_10 | 71.92981963197121 | | sensor_4 | 43.566017489470426 | | sensor_1 | 6.378208186786803 | | sensor_2 | 101.71010087830145 | | sensor_7 | 62.11402602179431 | | sensor_8 | 64.33196455020062 | | sensor_5 | 56.39071692662006 | | sensor_6 | 48.952784757264894 | | sensor_9 | 52.078086096436685 | +-----------+--------------------+ 10 rows in set (0.00 sec)
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)