[2] Flink大数据流式处理利剑: 用Flink进行统计的一个简单例子

[2] Flink大数据流式处理利剑: 用Flink进行统计的一个简单例子,第1张

[2] Flink大数据流式处理利剑: 用Flink进行统计的一个简单例子

上面一节博客<<[1] Flink大数据流式处理利剑: 简介>>, 整体描述了Flink是什么,能做什么,其主要架构是什么,笔者也提供了一些经典案例的链接在案例参考章节。作为入门,现在笔者就提供一个简单的用Flink来统计字符出现频率的例子来带领大家进入Flink的世界。

下面是具体的实验步骤

  1. 第一步安装JDK环境: 请网上搜索
  2. 第二步,安装Maven
  3. 第三步,用archetype:generate 生成一个模板代码
mvn archetype:generate  -DarchetypeGroupId=org.apache.flink  
-DarchetypeArtifactId=flink-walkthrough-datastream-java    
-DarchetypeVersion=1.12.1   
-DgroupId=frauddetection   
-DartifactId=frauddetection   
-Dversion=0.1    
-Dpackage=spendreport 
-DinteractiveMode=false

执行后,其代码结构如下:

其pom.xml 内容如下:


	4.0.0
	frauddetection
	frauddetection
	0.1
	jar
	Flink Walkthrough DataStream Java
	https://flink.apache.org
	
		UTF-8
		1.12.1
		1.8
		2.11
		${target.java.version}
		${target.java.version}
		2.12.1
	
	
		
			apache.snapshots
			Apache Development Snapshot Repository
			https://repository.apache.org/content/repositories/snapshots/
			
				false
			
			
				true
			
		
	
	
		
			org.apache.flink
			flink-walkthrough-common_${scala.binary.version}
			${flink.version}
		
		
			org.apache.flink
			flink-streaming-java_${scala.binary.version}
			${flink.version}
			provided
		
		
			org.apache.flink
			flink-clients_${scala.binary.version}
			${flink.version}
			provided
		
		
			org.apache.logging.log4j
			log4j-slf4j-impl
			${log4j.version}
			runtime
		
		
			org.apache.logging.log4j
			log4j-api
			${log4j.version}
			runtime
		
		
			org.apache.logging.log4j
			log4j-core
			${log4j.version}
			runtime
		
	

	
		

			
			
				org.apache.maven.plugins
				maven-compiler-plugin
				3.1
				
					${target.java.version}
					${target.java.version}
				
			
			
				org.apache.maven.plugins
				maven-shade-plugin
				3.0.0
				
					
					
						package
						
							shade
						
						
							
								
									org.apache.flink:force-shading
									com.google.code.findbugs:jsr305
									org.slf4j:*
									org.apache.logging.log4j:*
								
							
							
								
									*:*
									
										meta-INF/*.SF
										meta-INF/*.DSA
										meta-INF/*.RSA
									
								
							
							
								
									spendreport.FraudDetectionJob
								
							
						
					
				
			
		

		
			
				
					org.eclipse.m2e
					lifecycle-mapping
					1.0.0
					
						
							
								
									
										org.apache.maven.plugins
										maven-shade-plugin
										[3.0.0,)
										
											shade
										
									
									
										
									
								
								
									
										org.apache.maven.plugins
										maven-compiler-plugin
										[3.1,)
										
											testCompile
											compile
										
									
									
										
									
								
							
						
					
				
			
		
	

其main方法所在的类如下:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.sink.alertSink;
import org.apache.flink.walkthrough.common.entity.alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

public class FraudDetectionJob {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		
		DataStream transactions = env
			.addSource(new TransactionSource())
			.name("transactions");

		DataStream alerts = transactions.keyBy(Transaction::getAccountId)
			.process(new FraudDetector())
			.name("fraud-detector");

		alerts.addSink(new alertSink()).name("send-alerts");

		env.execute("Fraud Detection");
	}
}

从上面的Flink的提供的例子来看,其就是三个步骤,添加Source --> 做Transformation --> Sink;

  1. 因为要做文字统计,所以需要从一个source里面读取文件,可以用netcat 软件进行模拟输入流的发送。以windows为例子, 大家可以到下面的网站去下载 https://eternallybored.org/misc/netcat/

    下载完后,把netcat的解压缩后的目录,配置到 *** 作系统的path目录下面,这样就可以直接使用nc命令了
nc -L -p 9999


输入流的格式为一系列用“;”分隔开的字符串流,比如:

aaa;bbbb;aaa;aaa;aaaa;bbbb;bb;bbbb;aaaa;bb;
  1. 启动Flink的客户端,监听127.0.0.1:9999的文本输入流的发送;其代码如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource source = env.socketTextStream("localhost", 9999);
  1. 统计上面的aaa,bbbb,bb出现次数的代码如下:
    source对象为上面步骤5创建的source流对象。
	 source.flatMap(new FlatMapFunction() {
         @Override
         public void flatMap(String value, Collector out) throws Exception {
             String[] words = value.split(";");
             for(String word : words) {
                 out.collect(word.toLowerCase().trim());
             }
         }
     }).filter(new FilterFunction() {
         @Override
         public boolean filter(String value) throws Exception {
             return StringUtils.isNotEmpty(value);
         }
     }).map(new MapFunction>() {
         @Override
         public Tuple2 map(String value) throws Exception {
             return new Tuple2<>(value, 1);
         }
     }).keyBy(new KeySelector, String>() {
         @Override
         public String getKey(Tuple2 value) throws Exception {
             return value.f0;
         }
     }).sum(1).print();
  1. 执行流处理逻辑
    注意,env对象为步骤5创建的StreamExecutionEnvironment env
 env.execute("StreamingWordCountApp");
  1. 执行结果如下
    在nc里面输入为:

    在Flink的客户端的输出如下:

    只看key的最后结果;bbbb 最后出现的时候,是3次;aaa最后出现的时候,次数是3次;bb最后出现的次数是2次,都能完全对上。

大家可以根据上面的代码和例子,在自己的电脑上练一练!

参考文献:
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/try-flink/datastream/

欢迎分享,转载请注明来源:内存溢出

原文地址:https://www.54852.com/zaji/5682111.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-12-17
下一篇2022-12-17

发表评论

登录后才能评论

评论列表(0条)

    保存