
Table of contents
- Flink quick start
- Section 1: Word count example (batch data)
- 1.1 Requirement
- 1.2 Code implementation
- Java program
- Scala program
- Section 2: Word count example (streaming data)
- 2.1 Requirement
- 2.2 Code implementation
- Scala program
- Java program
Through a word count example, this article gets you up and running with Flink for both stream processing (Streaming) and batch processing (Batch).
Section 1: Word count example (batch data)
1.1 Requirement
Count how many times each word appears in a file and write the result to an output file.
Steps:
1. Read the data source
2. Process the data
   a. Split each line of the input file on spaces
   b. Pair each word with a 1
   c. Group by word (put identical words together)
   d. Accumulate within each group (sum the 1s attached to each word)
3. Save the result
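Before bringing Flink in, the steps above can be sketched in plain Java (class and method names here are illustrative, not part of any Flink API):

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountSketch {
    // Follows the steps above: split each line on spaces,
    // pair each word with a 1, group by word, and sum the 1s.
    public static Map<String, Integer> count(String[] lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                if (word.isEmpty()) continue;        // skip blanks from repeated spaces
                counts.merge(word, 1, Integer::sum); // accumulate the 1s per word
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> result = count(new String[]{"hello you", "hello him"});
        System.out.println(result.get("hello")); // 2
        System.out.println(result.get("you"));   // 1
    }
}
```

Flink's batch API performs the same split/pair/group/sum pipeline, but distributed across parallel tasks, as the programs below show.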
1.2 Code implementation
Add the following dependencies to pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.11.1</version>
</dependency>

Java program
package com.lagou;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountJavaBatch {
    public static void main(String[] args) throws Exception {
        String inputPath = "D:\\data\\input\\hello.txt";
        String outputPath = "D:\\data\\output";
        // Get the Flink execution environment
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> text = executionEnvironment.readTextFile(inputPath);
        FlatMapOperator<String, Tuple2<String, Integer>> wordAndOnes = text.flatMap(new SplitClz());
        // (hello 1) (you 1) (hi 1) (him 1)
        UnsortedGrouping<Tuple2<String, Integer>> groupedWordAndOne = wordAndOnes.groupBy(0);
        // (hello 1) (hello 1)
        AggregateOperator<Tuple2<String, Integer>> out = groupedWordAndOne.sum(1); // 1 is the index of the count field
        out.writeAsCsv(outputPath, "\n", " ").setParallelism(1); // parallelism 1 writes a single output file
        executionEnvironment.execute(); // trigger execution explicitly
    }

    static class SplitClz implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            String[] words = s.split(" ");
            for (String word : words) {
                collector.collect(new Tuple2<>(word, 1)); // emit to the downstream operator
            }
        }
    }
}
Original file
Output file
Scala program

import org.apache.flink.api.scala._

object WordCountScalaBatch {
  def main(args: Array[String]): Unit = {
    val inputPath = "D:\\data\\input\\hello.txt"
    val outputPath = "D:\\data\\output"
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val text: DataSet[String] = environment.readTextFile(inputPath)
    val out: AggregateDataSet[(String, Int)] = text.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)
    out.writeAsCsv(outputPath, "\n", " ").setParallelism(1)
    environment.execute("scala batch process")
  }
}
Section 2: Word count example (streaming data)
2.1 Requirement
A socket simulates words being sent in real time. Flink receives the data, aggregates it over a specified time window (e.g. 5 s), recomputing every 1 s, and prints the result computed for each window.
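The per-key running sum the streaming job computes can be imitated with a plain map before looking at the Flink code (a simplified sketch: the real job keys the stream by word and updates managed state per record; the class name here is illustrative):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class RunningWordCount {
    // Per-word running totals, playing the role of Flink's keyed state.
    private final Map<String, Integer> state = new HashMap<>();

    // Process one incoming line and return the updated (word, count)
    // pairs it produced, mimicking keyBy(0).sum(1) on a stream:
    // every record emits a new running total for its word.
    public Map<String, Integer> process(String line) {
        Map<String, Integer> emitted = new LinkedHashMap<>();
        for (String word : line.split(" ")) {
            if (word.isEmpty()) continue;
            int updated = state.merge(word, 1, Integer::sum);
            emitted.put(word, updated);
        }
        return emitted;
    }
}
```

Feeding "hello you" and then "hello him" emits hello=1, you=1 for the first line and hello=2, him=1 for the second, which matches the incremental output the Flink programs below print.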
2.2 Code implementation
Scala program

import org.apache.flink.streaming.api.scala._
object WordCountScalaStream {
def main(args: Array[String]): Unit = {
    // Process streaming data
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment // get the execution environment
val streamData: DataStream[String] = environment.socketTextStream("hadoop102", 7777)
val out = streamData.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
out.print()
environment.execute()
}
}
Java program
package com.lagou;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountJavaStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStream = executionEnvironment.socketTextStream("linux121", 7777);
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")) {
                    collector.collect(new Tuple2<>(word, 1)); // emit (word, 1) for each word
                }
            }
        }).keyBy(0).sum(1); // group by word and keep a running sum
        sum.print();
        executionEnvironment.execute();
    }
}
Before running, check the corresponding option in the IDE run configuration.

# In a console, start netcat listening on port 7777
nc -lp 7777
# then type words and watch the counts appear