
Official site: https://flink.apache.org/
1. Flink Overview
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
1.2 Why Flink
Streaming data: data that arrives continuously, without end.
Goals: low latency, high throughput, correctness, fault tolerance.
1.3 Flink Features
- Event-driven
- Bounded streams: processed with the DataSet API
- Unbounded streams: processed with the DataStream API
- Layered APIs
- Support for both event time and processing time
- Exactly-once state consistency guarantees
- Low latency: millions of events per second at millisecond-level latency
- High availability
- Connectors to many commonly used storage systems
1.4 Flink vs. Spark Streaming
True stream processing vs. micro-batching; the two differ in:
- Data model
- Runtime architecture
2. Deployment
2.1 Standalone
Jobs can be deployed through the web UI,
or with shell commands:
# Start the cluster
./bin/start-cluster.sh
# Stop the cluster
./bin/stop-cluster.sh
# Submit a job
./bin/flink run -c [main class] -p [parallelism] [path to jar] [JVM args]
# List all current jobs
./bin/flink list
# Cancel a job
./bin/flink cancel [jobId]
2.2 YARN
Requires a Hadoop cluster.
No environment available to install one, so skipped.
2.3 Kubernetes
Skipped.
3. Flink Runtime Architecture
3.1 Runtime Components
3.1.1 JobManager
3.1.2 TaskManager
3.1.3 ResourceManager
3.1.4 Dispatcher
3.2 Job Submission Flow
3.3 Task Scheduling
3.4 Slots
Parallelism: the number of parallel subtasks of a particular operator is called its parallelism.
In general, the parallelism of a stream can be taken as the largest parallelism among all of its operators.
A slot is the smallest unit of resources (CPU, memory) needed to run one thread of a Flink computation,
so the number of slots is usually set to the number of cores of the TaskManager (JVM).
Slots have a notion of sharing groups:
- Operators in different groups must use different slots, as the sketch below shows.
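A minimal sketch of assigning slot-sharing groups (the "source" pipeline and the group names are hypothetical); operators placed in different groups cannot share a slot, so this job needs slots from both groups:
// Operators default to the slot-sharing group of their predecessor ("default").
DataStream<String> result = source
        .map(String::toUpperCase).slotSharingGroup("default")  // shares slots with other "default" operators
        .filter(s -> !s.isEmpty()).slotSharingGroup("heavy");  // runs in slots reserved for the "heavy" group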
3.5 Programs and Dataflows
A Flink program consists of three parts: source, transform, and sink.
Forms of data transfer between operators:
- One-to-one: data is forwarded one-to-one only when the operators are in the same sharing group and have the same parallelism.
- Redistributing: a repartitioning operation; when parallelisms differ, the data is redistributed (for example, round-robin).
The general stream processing flow:
Environment => source => transform => sink
4.1 Environment (execution environment)
// Stream processing execution environment
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
// Batch processing execution environment
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
// Create a local execution environment
ExecutionEnvironment.createLocalEnvironment([parallelism]);
// Create a remote execution environment
ExecutionEnvironment.createRemoteEnvironment(host, port, pathToJar);
4.2 Source
Flink can read data from a variety of sources.
4.2.1 Reading from collections and elements
API: executionEnvironment.fromCollection(list);
public static void main(String[] args) throws Exception {
// Create the stream execution environment
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
// Set parallelism to 1
executionEnvironment.setParallelism(1);
// Build some test data
List<SensorReading> list = new ArrayList<>();
for (int i = 0; i < 5; i++) {
list.add(new SensorReading("Sensor" + i, LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli(), ThreadLocalRandom.current().nextDouble(35, 40)));
}
// Read the data from the collection
DataStreamSource<SensorReading> sensorReadingDataStreamSource = executionEnvironment.fromCollection(list);
// Print the collection data
sensorReadingDataStreamSource.print("sensor");
// Read data from individual elements
DataStreamSource<Integer> integerDataStreamSource = executionEnvironment.fromElements(1, 2, 3, 4, 56, 7);
// Print the element data
integerDataStreamSource.print("element");
// Execute the Flink job
executionEnvironment.execute();
}
4.2.2 Reading from a file
API: executionEnvironment.readTextFile(inputPath);
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
String inputPath = "E:\\张尧\\idea项目\\FlinkTutorial\\src\\main\\resources\\word.txt";
DataStreamSource<String> dataStreamSource = executionEnvironment.readTextFile(inputPath);
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = dataStreamSource.flatMap(new WorkCount.MyFlagMapFunction()).keyBy(0).sum(1);
sum.print();
executionEnvironment.execute();
}
4.2.3 Reading from Kafka
4.2.3.1 Kafka setup
Download Kafka version 1.0.0 or above.
You need to configure Kafka's advertised listener address (not needed when everything runs on the same machine).
Edit config/server.properties:
advertised.listeners=PLAINTEXT://192.168.164.205:9092
# In the Kafka bin directory:
# Start zookeeper
./zookeeper-server-start.sh ../config/zookeeper.properties
# Start kafka
./kafka-server-start.sh ../config/server.properties
package com.zy.flink.source;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaSourceTest {
public static void main(String[] args) throws Exception {
// Create the Kafka connection properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.164.205:9092");
// properties.setProperty("group.id", "")
// Create the stream execution environment
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setParallelism(1);
// Read data from Kafka
DataStreamSource<String> dataStreamSource = executionEnvironment.addSource(new FlinkKafkaConsumer<>("sourcetest",
new SimpleStringSchema(), properties));
dataStreamSource.print();
executionEnvironment.execute();
}
}
4.2.4 Custom sources
A custom source implements SourceFunction, as in the following example that emits random sensor readings:
package com.zy.Flink.source;
import com.zy.Flink.entity.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.concurrent.ThreadLocalRandom;
public class UDFSourceTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setParallelism(1);
DataStreamSource<SensorReading> sensorReadingDataStreamSource = executionEnvironment.addSource(new MySensorSource());
sensorReadingDataStreamSource.print();
executionEnvironment.execute();
}
public static class MySensorSource implements SourceFunction<SensorReading> {
// Flag controlling whether data generation keeps running (volatile: cancel() is called from another thread)
private volatile boolean running = true;
@Override
public void run(SourceContext<SensorReading> sourceContext) throws Exception {
// Define the sensor collection
HashMap<String, Double> map = new HashMap<>();
for (int i = 0; i < 10; i++) {
map.put("sensor"+i, 60 + ThreadLocalRandom.current().nextGaussian() * 20);
}
while (running){
for (String s : map.keySet()) {
sourceContext.collect(new SensorReading(s, LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli(),map.get(s)+ThreadLocalRandom.current().nextGaussian()));
}
Thread.sleep(1000L);
}
}
@Override
public void cancel() {
running = false;
}
}
}
4.3 Transform
Transformation operators.
4.3.1 Basic transformation operators
map, flatMap, and filter are the three basic transformation operators.
package com.zy.Flink.transform;
import com.zy.Flink.entity.SensorReading;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
public class TransormTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setParallelism(1);
// Build some test data
List<SensorReading> list = new ArrayList<>();
for (int i = 0; i < 5; i++) {
list.add(new SensorReading("Sensor" + i, LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli(), ThreadLocalRandom.current().nextDouble(35, 40)));
}
// Read the data from the collection
DataStreamSource<SensorReading> sensorReadingDataStreamSource = executionEnvironment.fromCollection(list);
// map transform: return the sensorReading's sensor id
// (the snippet was truncated here; reconstructed from the comment above, variable name hypothetical)
SingleOutputStreamOperator<String> idStream = sensorReadingDataStreamSource.map(SensorReading::getId);
idStream.print();
executionEnvironment.execute();
}
}
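The original example stops at map; a brief sketch of the other two basic operators, reusing the same stream inside the class above (variable names hypothetical; the flatMap lambda needs org.apache.flink.api.common.typeinfo.Types):
// filter: keep only readings above 37 degrees
SingleOutputStreamOperator<SensorReading> hot = sensorReadingDataStreamSource
        .filter(reading -> reading.getTmpperature() > 37.0);
// flatMap: emit zero or more records per input (here: one record per character of the id)
SingleOutputStreamOperator<String> chars = sensorReadingDataStreamSource
        .flatMap((SensorReading reading, Collector<String> out) -> {
            for (char c : reading.getId().toCharArray()) {
                out.collect(String.valueOf(c));
            }
        }).returns(Types.STRING); // lambdas with generic outputs need an explicit return type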
4.3.2 Aggregation operators
keyBy; rolling aggregations (min(), max(), sum(), minBy(), maxBy()); reduce also belongs to the aggregation operators.
package com.zy.Flink.transform;
import com.zy.Flink.TestUtil;
import com.zy.Flink.entity.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TransormTest3 {
public static void main(String[] args) throws Exception {
// Create the stream execution environment
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
// Set parallelism
executionEnvironment.setParallelism(1);
// Read data from the collection
DataStreamSource<SensorReading> sensorReadingDataStreamSource = executionEnvironment.fromCollection(TestUtil.createTestCollection());
// keyBy id, take the maximum temperature
SingleOutputStreamOperator<SensorReading> max = sensorReadingDataStreamSource.keyBy("id").maxBy("tmpperature");
// Print
max.print();
// Execute
executionEnvironment.execute();
}
}
Difference between min() and minBy(): with min(), only the aggregated field is the minimum; the other fields keep the values of the first record seen.
minBy() emits the whole record that holds the minimum value of the field, as the sketch below shows.
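A tiny sketch of the difference, reusing the executionEnvironment from the examples above (input values hypothetical):
DataStreamSource<SensorReading> input = executionEnvironment.fromElements(
        new SensorReading("s1", 1L, 36.0),
        new SensorReading("s1", 2L, 35.0));
input.keyBy("id").min("tmpperature").print("min");     // second output: (s1, 1, 35.0) - timestamp still from the FIRST record
input.keyBy("id").minBy("tmpperature").print("minBy"); // second output: (s1, 2, 35.0) - the whole record with the minimum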
4.3.3 Reduce
package com.zy.Flink.transform;
import com.zy.Flink.TestUtil;
import com.zy.Flink.entity.SensorReading;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TransormTest4_Reduce {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setParallelism(1);
DataStreamSource<SensorReading> sensorReadingDataStreamSource = executionEnvironment.fromCollection(TestUtil.createTestCollection());
KeyedStream<SensorReading, String> keyedStream = sensorReadingDataStreamSource.keyBy(new KeySelector<SensorReading, String>() {
@Override
public String getKey(SensorReading value) throws Exception {
return value.getId();
}
});
SingleOutputStreamOperator<SensorReading> reduce = keyedStream.reduce((value1, value2) -> {
return new SensorReading(value1.getId(), System.currentTimeMillis(), Math.max(value1.getTmpperature(), value2.getTmpperature()));
});
reduce.print();
executionEnvironment.execute();
}
}
4.3.4 Multi-stream transformations
- split (removed in 1.12)
- connect + (co)map: can only combine two streams, whose types may differ
- union: can merge any number of streams of the same type; see the sketch below
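A minimal sketch of the two (the stream names are hypothetical): connect pairs two streams that may have different element types and processes each side separately; union requires all streams to share one type:
// connect: the two inputs may have different types; a CoMapFunction handles each side
ConnectedStreams<SensorReading, Integer> connected = sensorStream.connect(alertStream);
SingleOutputStreamOperator<String> merged = connected.map(new CoMapFunction<SensorReading, Integer, String>() {
    @Override
    public String map1(SensorReading value) { return value.getId(); } // first stream
    @Override
    public String map2(Integer value) { return value.toString(); }    // second stream
});
// union: any number of streams, all of the same type
DataStream<SensorReading> all = sensorStream.union(otherStream, thirdStream);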
4.3.5 Data types
Data types supported by Flink:
- All Java and Scala primitive types (including boxed types)
- Java and Scala tuples
- Scala case classes
- Java POJOs (with a no-arg constructor)
- Java collections and enums
4.3.6 UDF function classes
- Function classes, anonymous functions (lambdas), rich functions
4.3.7 Repartitioning
4.3.7.1 shuffle
Shuffles the partition order: randomly repartitions the data.
4.3.7.2 keyBy
Computes the partition from the hash of the key; records with the same key always land in the same partition (records in the same partition do not necessarily share a key).
4.3.7.3 global
Sends all records of the current stream to the first parallel instance of the next operator (a single partition).
package com.zy.flink.transform;
import com.zy.flink.entity.SensorReading;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TransfromTest6_partition {
public static void main(String[] args) throws Exception {
// Create the execution environment
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
// To observe repartitioning, the parallelism must not be 1
executionEnvironment.setParallelism(4);
// Read data from a file
SingleOutputStreamOperator<SensorReading> dataStreamSource = executionEnvironment.readTextFile(
"E:\\张尧\\idea项目\\tl\\Flink_learn\\learn_feature\\src\\main\\resources\\sensorReading.txt").map(line -> {
String[] split = line.split(",");
return new SensorReading(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
});
});
// Print the source stream
dataStreamSource.print("input");
// shuffle
dataStreamSource.shuffle().print("shuffle");
// keyBy
dataStreamSource.keyBy("id").print("keyBy");
// global
dataStreamSource.global().print("global");
// Execute the job
executionEnvironment.execute();
}
}
4.4 Sink
4.4.1 Writing to Kafka
package com.zy.flink.sink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
public class KafkaSink {
public static void main(String[] args) throws Exception {
// Create the stream execution environment
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setParallelism(1);
// Read messages from Kafka
// Create the Kafka connection properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.164.205:9092");
// Consume messages
DataStreamSource<String> sourcetest =
executionEnvironment.addSource(new FlinkKafkaConsumer<>("sourcetest",
new SimpleStringSchema(), properties));
// Write to Kafka
DataStreamSink<String> sinktest = sourcetest.addSink(new FlinkKafkaProducer<>("192.168.164.205:9092", "sinktest", new SimpleStringSchema()));
// Execute the job
executionEnvironment.execute();
}
}
Requires the Kafka connector dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${flinks-clients.suffix.version}</artifactId>
</dependency>
4.4.2 Writing to Redis
Requires the Bahir Redis connector:
<properties>
    <flinks-clients.suffix.version>2.11</flinks-clients.suffix.version>
    <flinks-redis.version>1.0</flinks-redis.version>
</properties>
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_${flinks-clients.suffix.version}</artifactId>
    <version>${flinks-redis.version}</version>
</dependency>
package com.zy.flink.sink;
import com.zy.flink.TestUtil;
import com.zy.flink.entity.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
public class RedisSinkTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> sensorReadingDataStreamSource = executionEnvironment.fromCollection(TestUtil.createTestCollection());
// Create the Jedis connection configuration
FlinkJedisPoolConfig flinkJedisPoolConfig =
new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build();
DataStreamSink<SensorReading> test = sensorReadingDataStreamSource.addSink(new RedisSink<>(flinkJedisPoolConfig,
new RedisMapper<SensorReading>() {
// Describe the Redis command to execute
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "test");
}
@Override
public String getKeyFromData(SensorReading sensorReading) {
return sensorReading.getId();
}
@Override
public String getValueFromData(SensorReading sensorReading) {
return sensorReading.getTmpperature().toString();
}
}));
executionEnvironment.execute();
}
}
4.4.3 Writing to Elasticsearch
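This section was left empty in the notes; a minimal sketch, assuming the flink-connector-elasticsearch6 dependency (host, index, and field names hypothetical; "dataStream" is a DataStream<SensorReading> as in the examples above):
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;

List<HttpHost> hosts = Collections.singletonList(new HttpHost("localhost", 9200));
ElasticsearchSink.Builder<SensorReading> esSinkBuilder = new ElasticsearchSink.Builder<>(hosts,
        new ElasticsearchSinkFunction<SensorReading>() {
            @Override
            public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
                // Build the document source from the reading
                Map<String, String> source = new HashMap<>();
                source.put("id", element.getId());
                source.put("temperature", element.getTmpperature().toString());
                indexer.add(Requests.indexRequest().index("sensor").type("_doc").source(source));
            }
        });
dataStream.addSink(esSinkBuilder.build());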
4.4.4 Writing via JDBC
That is, writing into a relational database; a sketch with a custom RichSinkFunction follows.
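A minimal sketch using a custom RichSinkFunction (the MySQL URL, table, and credentials are hypothetical); open() creates the connection once per parallel instance, invoke() runs per record:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public static class MyJdbcSink extends RichSinkFunction<SensorReading> {
    private Connection connection;
    private PreparedStatement insertStmt;

    @Override
    public void open(Configuration parameters) throws Exception {
        // One connection and prepared statement per parallel sink instance
        connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456");
        insertStmt = connection.prepareStatement("INSERT INTO sensor_temp (id, temp) VALUES (?, ?)");
    }

    @Override
    public void invoke(SensorReading value, Context context) throws Exception {
        insertStmt.setString(1, value.getId());
        insertStmt.setDouble(2, value.getTmpperature());
        insertStmt.execute();
    }

    @Override
    public void close() throws Exception {
        insertStmt.close();
        connection.close();
    }
}
Note that this writes one row per record; for high-throughput streams a batched or upsert-style statement would usually be preferred.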
5. Window
5.1 Window concept
A window is a way of slicing an unbounded stream into finite pieces.
5.2 Window types
- Time windows
  - Tumbling windows: time-aligned, fixed window length, each record belongs to exactly one window
  - Sliding windows: have a slide step, so one record can belong to multiple windows
  - Session windows: not time-aligned
- Count windows
  - Tumbling count windows
  - Sliding count windows
5.3 Window API
window()
timeWindow()
countWindow()
All of these are windowing operations; a short sketch follows.
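A sketch of the assigners on a keyed stream (pre-1.12 API, matching the example below; "keyedStream" is hypothetical):
keyedStream.timeWindow(Time.seconds(15));                  // tumbling time window
keyedStream.timeWindow(Time.seconds(15), Time.seconds(5)); // sliding time window, 5-second slide
keyedStream.countWindow(10);                               // tumbling count window
keyedStream.countWindow(10, 2);                            // sliding count window, slide of 2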
5.3.2 Window functions
Once a window is opened, a window function is needed to perform the aggregation.
- Incremental aggregation functions: the bucket stores only the running result, updated once per arriving record
  - ReduceFunction, AggregateFunction, ...
- Full window functions
  - ProcessWindowFunction, WindowFunction, ...
package com.zy.flink.window;
import com.zy.flink.entity.SensorReading;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
public class WindowApiTest1_TimeWindow {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
// DataStreamSource sensorReadingDataStreamSource = executionEnvironment.fromCollection(TestUtil.createTestCollection());
SingleOutputStreamOperator<SensorReading> dataStreamSource = executionEnvironment.socketTextStream(
"192.168.164.205", 8888).map(line -> {
String[] splits = line.split(",");
return new SensorReading(splits[0], Long.parseLong(splits[1]), Double.parseDouble(splits[2]));
});
SingleOutputStreamOperator<Integer> resultStream = dataStreamSource.keyBy("id")
.timeWindow(Time.seconds(10))
.aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {
@Override
public Integer createAccumulator() {
return 0;
}
@Override
public Integer add(SensorReading s, Integer integer) {
return integer + 1;
}
@Override
public Integer getResult(Integer integer) {
return integer;
}
@Override
public Integer merge(Integer integer, Integer acc1) {
return integer + acc1;
}
});
resultStream.print();
executionEnvironment.execute();
}
}
5.3.3 Optional APIs
- trigger: defines when the window fires, i.e., when to run the computation and emit the result
- evictor: defines logic for removing certain records from the window
- allowedLateness: allows late data
- sideOutputLateData: routes late data into a side output stream
- getSideOutput: retrieves the side output stream
A sketch chaining these calls follows the list.
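A minimal sketch chaining the optional calls (the tag name is hypothetical; "dataStream" is a DataStream<SensorReading> as above):
OutputTag<SensorReading> lateTag = new OutputTag<SensorReading>("late-data") {};
SingleOutputStreamOperator<SensorReading> result = dataStream
        .keyBy("id")
        .timeWindow(Time.seconds(15))
        .allowedLateness(Time.minutes(1)) // keep windows around one extra minute for late records
        .sideOutputLateData(lateTag)      // anything later still goes to the side output
        .maxBy("tmpperature");
result.getSideOutput(lateTag).print("late"); // retrieve the side output stream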
6. Time Semantics
Flink distinguishes three notions of time:
- Event time: the time at which the event itself occurred, carried by the business data
- Ingestion time: the time at which the data enters Flink
- Processing time: the local clock time of the operator performing the computation
Setting event time:
executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
6.3 Watermark
Watermarks are used to handle delayed, out-of-order data: when network or distributed processing causes records to arrive out of order, watermarks balance waiting time against result completeness; a sketch follows.
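A minimal sketch of assigning event-time timestamps with a bounded-out-of-orderness watermark (pre-1.12 API; the getter name on SensorReading is an assumption):
dataStream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
            @Override
            public long extractTimestamp(SensorReading element) {
                return element.getTimestamp(); // assumed epoch-millis getter on the entity
            }
        });
The 2-second bound means results wait at most 2 extra seconds for stragglers; a larger bound tolerates more disorder at the cost of latency.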
6.4 Watermark propagation, introduction, and configuration