pyspark --- 将df按照某一列展开

图 1:pyspark --- 将df按照某一列展开

pyspark --- 将df按照某一列展开

from pyspark import SparkContext, SQLContext, SparkConf
from pyspark.sql import SparkSession
import warnings
from pyspark.sql import functions as fn
from pyspark.sql.types import StructField, LongType

# Build (or reuse) the session first and take the SparkContext from it, so the
# script runs both inside the pyspark shell (where a context already exists)
# and as a standalone job (where `sc` would otherwise be undefined).
ss = (
    SparkSession.builder
    .config('spark.sql.shuffle.partitions', 2000)
    .config('spark.executor.memoryOverhead', 8192)
    .config('spark.driver.memoryOverhead', 8192)
    .config('spark.dynamicAllocation.enabled', 'true')
    .getOrCreate()
)
sc = ss.sparkContext
sqlContext = SQLContext(sc)

# Silence deprecation warnings and keep Spark's own logging down to errors.
warnings.filterwarnings("ignore", category=DeprecationWarning)
sc.setLogLevel('ERROR')

# Sample input with three columns A, B and C; C is an integer count.
data = ss.createDataFrame([
    {'A': 'X', 'B': 'X', 'C': 3}, {'A': 'Y', 'B': 'Y', 'C': 1}, {'A': 'L', 'B': 'L', 'C': 1}
])
data.show()

def row_dealwith(row):
    """Encode one (A, B, C) row as three comma-joined strings of C items each.

    The first two values are repeated ``C`` times and the count itself is
    replaced by ``C`` ones, e.g. ``('X', 'X', 3)`` becomes
    ``('X,X,X', 'X,X,X', '1,1,1')``.

    Args:
        row: An indexable record whose first three items are A, B and the
            integer repeat count C.

    Returns:
        A 3-tuple of strings. Every element is always a ``str`` (also when
        C is 1), so all rows produced by this function share one type per
        column. A count of 0 or less yields three empty strings.
    """
    a, b, c = row[0], row[1], row[2]
    repeats = range(c)
    # No special case for c == 1: returning the raw values there would mix
    # ints and strings in the same column across rows.
    return (
        ','.join(str(a) for _ in repeats),
        ','.join(str(b) for _ in repeats),
        ','.join('1' for _ in repeats),
    )

# The helpers are defined before the statements that call them; a script is
# executed top to bottom, so calling mkdf_tojoin ahead of its `def` would
# raise NameError.
def flat(l):
    """Yield the non-list, non-tuple leaves of a nested list/tuple, in order.

    Args:
        l: An iterable whose items may themselves be lists or tuples, nested
            to any depth.

    Yields:
        Every item that is not a list or tuple, depth first.
    """
    for k in l:
        if isinstance(k, (list, tuple)):
            yield from flat(k)
        else:
            yield k


def mkdf_tojoin(df, ss):
    """Return a copy of ``df`` with a trailing ``tmpid`` row-index column.

    Args:
        df: The DataFrame to index.
        ss: The SparkSession used to build the resulting DataFrame.

    Returns:
        A new DataFrame with the columns of ``df`` followed by a LongType
        column ``tmpid`` holding each row's ``zipWithIndex`` position.
    """
    from pyspark.sql.types import StructType

    # StructType.add appends in place, so build a fresh schema instead of
    # modifying the one that belongs to the input frame.
    schema = StructType(df.schema.fields + [StructField("tmpid", LongType())])
    # zipWithIndex yields (row, index) pairs. Append the index to the row's
    # own fields directly; a recursive flatten would also unpack list- or
    # tuple-valued fields and no longer match the schema width.
    indexed = df.rdd.zipWithIndex().map(lambda pair: tuple(pair[0]) + (pair[1],))
    return ss.createDataFrame(indexed, schema)


# Encode every row as comma-joined strings (see row_dealwith).
newdata = data.rdd.map(row_dealwith).toDF(schema=['A', 'B', 'C'])
newdata.show()

# Split each column on ',' and explode it into one row per item, one
# single-column frame per original column.
dfA = newdata.withColumn('A', fn.explode(fn.split(newdata.A, ','))).select('A')
dfB = newdata.withColumn('B', fn.explode(fn.split(newdata.B, ','))).select('B')
dfC = newdata.withColumn('C', fn.explode(fn.split(newdata.C, ','))).select('C')

# Give every frame a positional index so the columns can be lined up again.
dfA = mkdf_tojoin(dfA, ss)
dfB = mkdf_tojoin(dfB, ss)
dfC = mkdf_tojoin(dfC, ss)

# Re-assemble the three columns on the shared index, then drop the index.
dfres = dfA.join(dfB, on=['tmpid'], how='left').join(dfC, on=['tmpid'], how='left').drop('tmpid')
dfres.show()

欢迎分享,转载请注明来源:内存溢出

原文地址:https://www.54852.com/zaji/5638600.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-12-16
下一篇2022-12-16

发表评论

登录后才能评论

评论列表(0条)

    保存