![[scala、spark]将MYSQL中某个表内某个条件下的数据合并到一个JSON对象中进行输出,第1张 [scala、spark]将MYSQL中某个表内某个条件下的数据合并到一个JSON对象中进行输出,第1张](/aiimages/%5Bscala%E3%80%81spark%5D%E5%B0%86MYSQL%E4%B8%AD%E6%9F%90%E4%B8%AA%E8%A1%A8%E5%86%85%E6%9F%90%E4%B8%AA%E6%9D%A1%E4%BB%B6%E4%B8%8B%E7%9A%84%E6%95%B0%E6%8D%AE%E5%90%88%E5%B9%B6%E5%88%B0%E4%B8%80%E4%B8%AAJSON%E5%AF%B9%E8%B1%A1%E4%B8%AD%E8%BF%9B%E8%A1%8C%E8%BE%93%E5%87%BA.png)
原表中的数据分为7个类别,类别字段名称为:vehicle_type;
每个类别下有若干的元素,每个类别下元素个数相同,这些元素存于一列中,列名为name_en,列内的值为元素的名称;
每个元素有相同的名称的属性若干,每个属性名称作为一列;
现在想要将一个类别下的元素、属性收集到一个JSON中
结构如下:
{
"元素名称1":{
"属性名称":"属性值"
},
"元素名称1":{
"属性名称":"属性值"
},
"元素名称1":{
"属性名称":"属性值"
},
....
}
val df = spark.read.format("jdbc").options(Map("url" -> MYSQL_URL, "dbtable" -> "factor_algo_config")).load()
df.show()
df.cache()
df.checkpoint()
implicit val formats = Serialization.formats(NoTypeHints)
val vehicle_type = df.select("vehicle_type").distinct().collect().map(_.get(0))
vehicle_type.foreach(x => {
val vehicle_df = df.select(df.schema.names.filter(!Array("id", "vehicle_type").contains(_)).map(col(_).cast("string")): _*)
.where(df("vehicle_type") === s"$x")
.na.fill("")
val vehivle_col_names = vehicle_df.schema.names
val vehicle_array_json = vehicle_df.collect().map(x => {
val dic = mutable.Map[String, String]()
(1 until vehivle_col_names.length).map(i => {
dic += (vehivle_col_names.apply(i) -> x.get(i).toString)
})
val res = Map(x.get(0).toString -> dic)
Json(DefaultFormats).write(res)
})
val json_string=vehicle_array_json.mkString(",").replace("},{",",")
val ds = spark.createDataset(Seq(json_string))
val json_df = spark.read.json(ds)
json_df.show()
})
在将Map转为JSON时发现,用scala自带的JSONObject.toString方法时,Map中嵌套的Map仍被打印成"{“a”:Map(“b”:“c”,“d”:“e”)}"
于是使用
import org.json4s._
import org.json4s.jackson.{Json, Serialization}
Json(DefaultFormats).write(res)
顺利的将JSON转为了JSON字符串,
后面将所有的JSON字符串收集到数组中后使用mkString对其进行合并
将合并后的JSON字符串中的},{替换为,达到将多条JSON合并为一条的效果
后面用createDataset解析JSON字符串,将其转为DataSet后再将其转为Dataframe
参考资料Scala-scalaMap转JSON字符串和javaMap
https://blog.csdn.net/zsyoung/article/details/88844702
Spark中json字符串和Dataframe相互转换
https://blog.csdn.net/xuejianbest/article/details/80694073
【scala】Json与Scala类型的相互转换处理
https://blog.csdn.net/lbf_ml/article/details/100534942
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)