spark[2]: Partition-related operations (to help understand RDDs)
Setup
# PySpark is the Spark API for Python. We use PySpark to initialize the Spark session and context.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark IBM Cloud Example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# Creating a spark context (used by the RDD examples below)
sc = spark.sparkContext
Creating an RDD
nums = [i for i in range(10)]
rdd = sc.parallelize(nums)
print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitioner: {}".format(rdd.partitioner))
print("Partitions structure: {}".format(rdd.glom().collect()))
Use getNumPartitions() to see how many partitions the RDD has, and glom() to inspect the contents of each partition.
output:
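Note that glom().collect() pulls every element of every partition back to the driver, which is fine for this toy RDD but heavy for real data. As a lighter sketch (reusing the rdd above), you can collect only the per-partition element counts:
# Collect only the size of each partition, not its contents
print("Elements per partition: {}".format(rdd.glom().map(len).collect()))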
Specifying the number of partitions in parallelize()
- 2 partitions
rdd = sc.parallelize(nums, 2)
print("Default parallelism: {}".format(sc.defaultParallelism))
print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitioner: {}".format(rdd.partitioner))
print("Partitions structure: {}".format(rdd.glom().collect()))
output:
- 15 partitions
rdd = sc.parallelize(nums, 15)
print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitioner: {}".format(rdd.partitioner))
print("Partitions structure: {}".format(rdd.glom().collect()))
output:
Using the partitionBy() function
Here the dataset must consist of key/value tuples, because the default partitioner uses the hash of the key to assign elements to partitions.
rdd = sc.parallelize(nums) \
    .map(lambda el: (el, el)) \
    .partitionBy(2) \
    .persist()
print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitioner: {}".format(rdd.partitioner))
print("Partitions structure: {}".format(rdd.glom().collect()))
j = 0
for i in rdd.glom().collect():
    j += 1
    print("partition: " + str(j) + " " + str(i))
output:
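PySpark's partitionBy() uses its portable_hash function as the default partitionFunc, so you can predict on the driver which partition a key should land in. A minimal sketch, assuming the same nums and 2 partitions as above:
from pyspark.rdd import portable_hash

# Predict the partition for each key under the default hash partitioner
for key in nums:
    print("key {} -> partition {}".format(key, portable_hash(key) % 2))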
Explicit partitioning
Prepare the data:
transactions = [
{'name': 'Bob', 'amount': 100, 'country': 'United Kingdom'},
{'name': 'James', 'amount': 15, 'country': 'United Kingdom'},
{'name': 'Marek', 'amount': 51, 'country': 'Poland'},
{'name': 'Johannes', 'amount': 200, 'country': 'Germany'},
{'name': 'Thomas', 'amount': 30, 'country': 'Germany'},
{'name': 'Paul', 'amount': 75, 'country': 'Poland'},
{'name': 'Pierre', 'amount': 120, 'country': 'France'},
{'name': 'Frank', 'amount': 180, 'country': 'France'}
]
To keep all of the data for each country in a single partition (on one node), we use a custom partitioner:
# Dummy implementation assuring that data for each country is in one partition
def country_partitioner(country):
    return hash(country) % (10**7 + 1)
    # return portable_hash(country)
rdd = sc.parallelize(transactions) \
    .map(lambda el: (el['country'], el)) \
    .partitionBy(3, country_partitioner)
print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitioner: {}".format(rdd.partitioner))
print("Partitions structure: {}".format(rdd.glom().collect()))
print("n--n")
for i, j in enumerate(rdd.glom().collect()):
print("npartition: " + str(i+1) + "n"+ str(j))
output:
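One caveat: Python 3 randomizes hash() for strings per interpreter process unless PYTHONHASHSEED is fixed, which is why the commented-out portable_hash variant is the more reliable choice across workers. To verify that every country really sits in a single partition, here is a short sketch (reusing the rdd above) that tags each record with the index of the partition it lives in:
def tag_with_partition(index, iterator):
    # Emit (partition index, country) for every record in this partition
    for country, record in iterator:
        yield (index, country)

print(rdd.mapPartitionsWithIndex(tag_with_partition).distinct().collect())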
Computing with partitions
The mapPartitions() function lets you run a map-style computation on each partition independently.
def sum_sales(iterator):
    yield sum(transaction[1]['amount'] for transaction in iterator)
by_country = sc.parallelize(transactions) \
    .map(lambda el: (el['country'], el)) \
    .partitionBy(5, country_partitioner)
# Sum sales in each partition
sum_amounts = by_country \
    .mapPartitions(sum_sales) \
    .collect()
print("Total sales for each partition: {}".format(sum_amounts))
output:
Each value is the total amount for one country.
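Note that mapPartitions() simply sums whatever shares a partition, so these per-partition totals match per-country totals only as long as no two countries hash into the same partition. For comparison, the same per-country totals can be computed directly, and labelled by country, with reduceByKey; a short sketch using the same transactions list:
# Sum amounts per country key instead of per partition
totals_by_country = sc.parallelize(transactions) \
    .map(lambda el: (el['country'], el['amount'])) \
    .reduceByKey(lambda a, b: a + b) \
    .collect()
print("Total sales per country: {}".format(totals_by_country))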
Inspecting partitions with a DataFrame
df = spark.createDataFrame(transactions)
for i, j in enumerate(df.rdd.glom().collect()):
    print("partition: " + str(i+1) + "\n" + str(j))
output:
The output shows that the data is spread across two partitions.
Using the repartition() function
You can pass a column name directly:
# Repartition by column
df2 = df.repartition(10,"country")
print("nAfter 'repartition()'")
print("Number of partitions: {}".format(df2.rdd.getNumPartitions()))
print("Partitioner: {}".format(df2.rdd.partitioner))
print("Partitions structure: {}".format(df2.rdd.glom().collect()))
output:
Inspect the data in each partition:
for i, j in enumerate(df2.rdd.glom().collect()):
    print("partition: " + str(i+1) + "\n" + str(j))