
- 1、读取与保存文件
- 1.1、读取文本文件
- 1.2、读取MySQL中的数据
- 1.3、将数据保存为orc格式
- 2、SparkSQL SQL语法
- 3、SparkSQL DSL语法
读取以下文本文件
1500100001,施笑槐,22,女,文科六班 1500100002,吕金鹏,24,男,文科六班 1500100003,单乐蕊,22,女,理科六班 1500100004,葛德曜,24,男,理科三班 1500100005,宣谷芹,22,女,理科五班 1500100006,边昂雄,21,男,理科二班 1500100007,尚孤风,23,女,文科六班 1500100008,符半双,22,女,理科六班 1500100009,沈德昌,21,男,理科一班 1500100010,羿彦昌,23,男,理科六班
例:
// 创建SparkSession
val spark: SparkSession = SparkSession
.builder()
.appName("readTextFile")
.master("local")
.getOrCreate()
val studentsDF: Dataframe = spark
.read
// 指定列名
.schema("id INT,name STRING,age INT,gender STRING,clazz STRING")
// 指定分隔符
.option("seq", ",")
// 指定文件格式
.format("csv")
// 指定路径
.load("spark/data/stu/students.txt")
// 显示十行
studentsDF.show(10)
1.2、读取MySQL中的数据
例:
// 创建SparkSession
val spark: SparkSession = SparkSession
.builder()
.appName("readMySQL")
.master("local")
.getOrCreate()
val stuJDBC: Dataframe = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://master:3306/student")
.option("dbtable", "student")
.option("user", "root")
.option("password", "123456")
.load()
// 显示前十行
stuJDBC.show(10)
1.3、将数据保存为orc格式
将上述从MySQL中读取的数据保存为orc格式的文件
例:
// 保存文件
stuJDBC.write
// 指定保存的模式
// 指定写入的模式: Append追加、Overwrite覆盖、ErrorIfExists存在就报错、Ignore存在即忽略
.mode(SaveMode.Overwrite)
// 指定输出格式
.format("orc")
.save("spark/data/stuORC/")
2、SparkSQL SQL语法
SparkSQL SQL语法中可以执行SQL语句,并且支持开窗函数,可以将写好的SQL语句放入spark.sql()中执行。在此就不多赘述。
下面将举例一个简单的例子
需求:统计每个班级有多少学生
例:
// 创建SparkSession
val spark: SparkSession = SparkSession
.builder()
.appName("readTextFile")
.master("local")
.getOrCreate()
val studentsDF: Dataframe = spark
.read
// 指定列名
.schema("id INT,name STRING,age INT,gender STRING,clazz STRING")
// 指定分隔符
.option("seq", ",")
// 指定文件格式
.format("csv")
// 指定路径
.load("spark/data/stu/students.txt")
// 创建一个临时视图
studentsDF.createOrReplaceTempView("students")
spark.sql(
"""
|select clazz
| ,count(id) as cnt
|from students
|group by clazz
""".stripMargin).show()
3、SparkSQL DSL语法
DSL含有
- select
- filter
- where
- join
- order by
等等 *** 作
// 创建SparkSession
val spark: SparkSession = SparkSession
.builder()
.appName("readTextFile")
.master("local")
.getOrCreate()
val studentsDF: Dataframe = spark
.read
// 指定列名
.schema("id INT,name STRING,age INT,gender STRING,clazz STRING")
// 指定分隔符
.option("seq", ",")
// 指定文件格式
.format("csv")
// 指定路径
.load("spark/data/stu/students.txt")
// 导入隐式转换
import spark.implicits._
studentsDF
.select($"id", $"name", $"age", $"clazz")
// where筛选出理科班的学生
.where($"clazz" like "理科%")
// filter筛选年龄大于23的学生
.filter(row => {
val age: Int = row.getAs[Int]("age")
age > 23
})
.show()
需求:统计每个班级的人数
// 创建SparkSession
val spark: SparkSession = SparkSession
.builder()
.appName("readTextFile")
.master("local")
.getOrCreate()
val studentsDF: Dataframe = spark
.read
// 指定列名
.schema("id INT,name STRING,age INT,gender STRING,clazz STRING")
// 指定分隔符
.option("seq", ",")
// 指定文件格式
.format("csv")
// 指定路径
.load("spark/data/stu/students.txt")
// 导入隐式转换
import spark.implicits._
studentsDF
.groupBy($"clazz")
.count()
.show()
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)