![Spark[1]: Basic Concepts and the Python API](/aiimages/Spark%5B1%5D%EF%BC%9A%E5%9F%BA%E6%9C%AC%E6%A6%82%E5%BF%B5%E4%B8%8Epython%E6%8E%A5%E5%8F%A3%E4%BD%BF%E7%94%A8.png)
RDD stands for Resilient Distributed Datasets.
An RDD is partitioned according to the number of nodes in the cluster.
1. RDD Transformations: create a new RDD from an existing one. They are lazily evaluated: the results are only computed when evaluated by actions.
For example, map() is a transformation that applies a function to each element of one RDD to produce another RDD.
2. RDD Actions: return a value to the driver program after running a computation.
For example, reduce() is an action that aggregates all the elements of an RDD; see the sketch after this list.
3. DAG: short for Directed Acyclic Graph.
Spark relies on DAGs for fault tolerance: when a node fails, Spark uses the recorded DAG (the lineage of operations) to recompute the lost data and recover.
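A minimal sketch of the transformation/action distinction (my own illustration, not part of the original lab; it assumes PySpark is installed, as set up in the next section):
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# map() is a transformation: it is only recorded, nothing runs yet
squares = sc.parallelize([1, 2, 3, 4]).map(lambda x: x * x)

# reduce() is an action: it triggers the computation and returns a value to the driver
print(squares.reduce(lambda a, b: a + b))  # 1 + 4 + 9 + 16 = 30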
This article covers:
- Creating a SparkContext and a SparkSession
- Creating RDDs
- Using DataFrames and Spark SQL
Setup
# Installing required packages
!pip install pyspark
!pip install findspark

import findspark
findspark.init()

# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the spark context.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
SparkContext is the entry point of a Spark application and provides a number of functions, such as parallelize() for creating RDDs.
SparkSession is required for Spark SQL and DataFrame operations.
Create instances of SparkContext and SparkSession:
# Creating a SparkContext
sc = SparkContext()

# Creating a SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark Dataframes basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
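A quick sanity check (my own addition, not part of the original lab): getOrCreate() attaches to the SparkContext that is already running, so the session above is backed by the sc created earlier.
print(spark.sparkContext is sc)  # expected True: getOrCreate() reuses the existing SparkContext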
Create an RDD and apply transformations
# Create an RDD containing the integers 1 through 29
data = range(1, 30)
xrangeRDD = sc.parallelize(data, 4)

# Transformations (lazy: nothing is computed yet)
subRDD = xrangeRDD.map(lambda x: x - 1)
filteredRDD = subRDD.filter(lambda x: x < 10)
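Because transformations are lazy, nothing has been computed so far. A short continuation (using the RDDs defined above; the expected outputs in the comments are my own annotations) that triggers evaluation with actions and checks the partitioning:
print(xrangeRDD.getNumPartitions())  # 4, as requested in parallelize()
print(filteredRDD.collect())         # [0, 1, 2, ..., 9]
print(filteredRDD.count())           # 10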
Create a DataFrame, query the data in several ways, and finally stop the SparkSession.
# Read the dataset into a spark dataframe using the `read.json()` function
df = spark.read.json("***.json").cache()
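# Note (not part of the original lab): if the JSON file is unavailable, a comparable
# DataFrame can be built from hypothetical in-memory rows instead, e.g.:
# df = spark.createDataFrame([("Michael", 29), ("Andy", 30), ("Justin", 19)], ["name", "age"]).cache()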
# Print the dataframe as well as the data schema
df.show()
df.printSchema()
# Register the Dataframe as a SQL temporary view
df.createTempView("people")
# Select and show basic data columns
df.select("name").show()
df.select(df["name"]).show()
spark.sql("SELECT name FROM people").show()
# Perform basic filtering
df.filter(df["age"] > 21).show()
spark.sql("SELECt age, name FROM people WHERe age > 21").show()
# 在单独col上的 *** 作,新创造一列old,数值为age的3倍
df.withColumn('old', df['age']*3).show()
# Perform basic aggregation of data
df.groupBy("age").count().show()
spark.sql("SELECT age, COUNT(age) as count FROM people GROUP BY age").show()
# close the SparkSession
spark.stop()