![[2] Flink大数据流式处理利剑: 用Flink进行统计的一个简单例子,第1张 [2] Flink大数据流式处理利剑: 用Flink进行统计的一个简单例子,第1张](/aiimages/%5B2%5D+Flink%E5%A4%A7%E6%95%B0%E6%8D%AE%E6%B5%81%E5%BC%8F%E5%A4%84%E7%90%86%E5%88%A9%E5%89%91%3A+%E7%94%A8Flink%E8%BF%9B%E8%A1%8C%E7%BB%9F%E8%AE%A1%E7%9A%84%E4%B8%80%E4%B8%AA%E7%AE%80%E5%8D%95%E4%BE%8B%E5%AD%90.png)
上面一节博客<<[1] Flink大数据流式处理利剑: 简介>>, 整体描述了Flink是什么,能做什么,其主要架构是什么,笔者也提供了一些经典案例的链接在案例参考章节。作为入门,现在笔者就提供一个简单的用Flink来统计字符出现频率的例子来带领大家进入Flink的世界。
下面是具体的实验步骤。
- 第一步安装JDK环境: 请网上搜索
- 第二步,安装Maven
- 第三步,用archetype:generate 生成一个模板代码
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-walkthrough-datastream-java -DarchetypeVersion=1.12.1 -DgroupId=frauddetection -DartifactId=frauddetection -Dversion=0.1 -Dpackage=spendreport -DinteractiveMode=false
执行后,其代码结构如下:
其pom.xml 内容如下:
4.0.0 frauddetection frauddetection0.1 jar Flink Walkthrough DataStream Java https://flink.apache.org UTF-8 1.12.1 1.8 2.11 ${target.java.version} ${target.java.version} 2.12.1 apache.snapshots Apache Development Snapshot Repository https://repository.apache.org/content/repositories/snapshots/ false true org.apache.flink flink-walkthrough-common_${scala.binary.version}${flink.version} org.apache.flink flink-streaming-java_${scala.binary.version}${flink.version} provided org.apache.flink flink-clients_${scala.binary.version}${flink.version} provided org.apache.logging.log4j log4j-slf4j-impl${log4j.version} runtime org.apache.logging.log4j log4j-api${log4j.version} runtime org.apache.logging.log4j log4j-core${log4j.version} runtime org.apache.maven.plugins maven-compiler-plugin3.1 ${target.java.version} ${target.java.version} org.apache.maven.plugins maven-shade-plugin3.0.0 package shade org.apache.flink:force-shading com.google.code.findbugs:jsr305 org.slf4j:* org.apache.logging.log4j:* *:* meta-INF/*.SF meta-INF/*.DSA meta-INF/*.RSA spendreport.FraudDetectionJob org.eclipse.m2e lifecycle-mapping1.0.0 org.apache.maven.plugins maven-shade-plugin[3.0.0,) shade org.apache.maven.plugins maven-compiler-plugin[3.1,) testCompile compile
其main方法所在的类如下:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.sink.alertSink;
import org.apache.flink.walkthrough.common.entity.alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;
public class FraudDetectionJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream transactions = env
.addSource(new TransactionSource())
.name("transactions");
DataStream alerts = transactions.keyBy(Transaction::getAccountId)
.process(new FraudDetector())
.name("fraud-detector");
alerts.addSink(new alertSink()).name("send-alerts");
env.execute("Fraud Detection");
}
}
从上面的Flink的提供的例子来看,其就是三个步骤,添加Source --> 做Transformation --> Sink;
- 因为要做文字统计,所以需要从一个source里面读取文件,可以用netcat 软件进行模拟输入流的发送。以windows为例子, 大家可以到下面的网站去下载 https://eternallybored.org/misc/netcat/
下载完后,把netcat的解压缩后的目录,配置到 *** 作系统的path目录下面,这样就可以直接使用nc命令了
nc -L -p 9999
输入流的格式为一系列用“;”分隔开的字符串流,比如:
aaa;bbbb;aaa;aaa;aaaa;bbbb;bb;bbbb;aaaa;bb;
- 启动Flink的客户端,监听127.0.0.1:9999的文本输入流的发送;其代码如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSourcesource = env.socketTextStream("localhost", 9999);
- 统计上面的aaa,bbbb,bb出现次数的代码如下:
source对象为上面步骤5创建的source流对象。
source.flatMap(new FlatMapFunction() { @Override public void flatMap(String value, Collector out) throws Exception { String[] words = value.split(";"); for(String word : words) { out.collect(word.toLowerCase().trim()); } } }).filter(new FilterFunction () { @Override public boolean filter(String value) throws Exception { return StringUtils.isNotEmpty(value); } }).map(new MapFunction >() { @Override public Tuple2 map(String value) throws Exception { return new Tuple2<>(value, 1); } }).keyBy(new KeySelector , String>() { @Override public String getKey(Tuple2 value) throws Exception { return value.f0; } }).sum(1).print();
- 执行流处理逻辑
注意,env对象为步骤5创建的StreamExecutionEnvironment env
env.execute("StreamingWordCountApp");
- 执行结果如下
在nc里面输入为:
在Flink的客户端的输出如下:
只看key的最后结果;bbbb 最后出现的时候,是3次;aaa最后出现的时候,次数是3次;bb最后出现的次数是2次,都能完全对上。
大家可以根据上面的代码和例子,在自己的电脑上练一练!
参考文献:
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/try-flink/datastream/
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)