自学内容网 自学内容网

Spark RDD 操作实战

Spark RDD 基础

更多spark相关知识请查看官方接口文档

PySpark是Spark的PythonAPI,允许Python调用Spark编程模型。

配置spark环境

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q www-us.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz  
!tar xf spark-2.4.8-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"]="/content/spark-2.4.8-bin-hadoop2.7"
import findspark
findspark.init()

初始化Spark

SparkContext

from pyspark import SparkContext
sc = SparkContext(master = 'local[2]')

SparkContext信息获取

sc.version   #获取SparkContext版本
'2.4.8'
sc.pythonVer   #获取Python版本
'3.7'
sc.master   #要连接的MasterURL
'local[2]'
str(sc.sparkHome)   #Spark在工作节点的安装路径
'None'
str(sc.sparkUser())   #获取SparkContext的Spark用户名
'root'
sc.appName   #返回应用名称
'pyspark-shell'
sc.applicationId   #获取应用程序ID
'local-1623220140497'
sc.defaultParallelism   #返回默认并行级别
2
sc.defaultMinPartitions   #RDD默认最小分区数
2

配置

from pyspark import SparkConf, SparkContext

conf = (SparkConf() \
        .setMaster("local") \
        .setAppName("My app") \
        .set("spark.executor.memory", "1g"))

sc = SparkContext.getOrCreate(conf = conf)

使用Shell

PySpark Shell已经为SparkContext创建了名为 sc 的变量。

$./bin/spark-shell --master local[2]        #命令行启动spark
$./bin/pyspark --master local[4] --py-files code.py     #命令行提交spark脚本任务

用 --master 参数设定 Context 连接到哪个Master 务器,通过传递逗号分隔列表至 --py-files 添加 Python.zip、.egg 或 .py文件到 Runtime 路径。

加载数据

并行集合

rdd = sc.parallelize([('a',7),('a',2),('b',2)])
rdd2 = sc.parallelize([('a',2),('d',1),('b',1)])
rdd3 = sc.parallelize(range(100))
rdd4 = sc.parallelize([("a",["x","y","z"]), ("b",["p", "r"])])

外部数据

使用textFile()函数从HDFS、本地文件或其它支持Hadoop的文件系统里读取文本文件,或使用wholeTextFiles()函数读取目录里文本文件。

textFile = sc.textFile("/my/directory/*.txt")
# 如果是在google colab中可以运行下方代码
# textFile = sc.textFile("sample_data/california_housing_train.csv")
textFile2 = sc.wholeTextFiles("/my/directory/")
# 如果是在google colab中可以运行下方代码
# textFile2 = sc.wholeTextFiles("sample_data/")

提取RDD信息

基础信息

rdd.getNumPartitions()   #列出分区数
2
rdd.count()   #计算RDD实例数量
3
rdd.countByKey()   #按键计算RDD实例数量
defaultdict(int, {'a': 2, 'b': 1})
rdd.countByValue()   #按值计算RDD实例数量
defaultdict(int, {('a', 2): 1, ('a', 7): 1, ('b', 2): 1})
rdd.collectAsMap()   #以字典形式返回键值
{'a': 2, 'b': 2}
rdd3.sum()   #RDD元素求和
4950
sc.parallelize([]).isEmpty()   #检查RDD是否为空
True

汇总

rdd3.max()   #RDD元素的最大值
99
rdd3.min()   #RDD元素的最小值
0
rdd3.mean()   #RDD元素的平均值
49.5
rdd3.stdev()   #RDD元素的标准差
28.86607004772212
rdd3.variance()   #RDD元素的方差
833.25
rdd3.histogram(3)   #分箱(Bin)生成直方图
([0, 33, 66, 99], [33, 33, 34])
rdd3.stats()   #综合统计包括:计数、平均值、标准差、最大值和最小值
(count: 100, mean: 49.5, stdev: 28.86607004772212, max: 99.0, min: 0.0)

应用函数

map与flatmap函数

rdd.map(lambda x: x+(x[1],x[0])).collect()   #对每个RDD元素执行函数
[('a', 7, 7, 'a'), ('a', 2, 2, 'a'), ('b', 2, 2, 'b')]
rdd5=rdd.flatMap(lambda x: x+(x[1],x[0]))   #对每个RDD元素执行函数,并拉平结果
rdd5.collect()
['a', 7, 7, 'a', 'a', 2, 2, 'a', 'b', 2, 2, 'b']
rdd4.flatMapValues(lambda x: x).collect()   #不改变键,对rdd4的每个键值对执行flatMap函数
[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]

选择数据

获取

rdd.collect()   #返回包含所有RDD元素的列表
[('a', 7), ('a', 2), ('b', 2)]
rdd.filter(lambda x: "a" in x) .collect()   #提取前两个RDD元素
[('a', 7), ('a', 2)]
rdd.first()   #提取第一个RDD元素
('a', 7)
rdd5.distinct().collect()   #提取前两个RDD元素
[2, 'b', 'a', 7]

抽样

rdd3.sample(False, 0.15, 81).collect()   #返回rdd3的采样子集
[3, 4, 27, 28, 35, 41, 43, 49, 51, 55, 64, 65, 66, 67, 85, 88, 89, 92]

筛选

 rdd.filter(lambda x: "a" in x) .collect()   #筛选RDD
[('a', 7), ('a', 2)]
rdd5.distinct().collect()   #返回RDD里的唯一值
[2, 'b', 'a', 7]
rdd.keys().collect()   #返回RDD键值对里的键
['a', 'a', 'b']

迭代

foreach函数迭代

def g(x):
    print(x)
rdd.foreach(g)   #为所有RDD应用函数

改变数据形状

Reduce操作

rdd.reduceByKey(lambda x,y : x+y).collect()   #合并每个键的RDD值
[('b', 2), ('a', 9)]
rdd.reduce(lambda a, b: a + b)   #合并RDD的值
('a', 7, 'a', 2, 'b', 2)

分组

# rdd3.groupBy(lambda x: x % 2).mapValues(list).collect()   #返回RDD的分组值
rdd.groupByKey().mapValues(list).collect()   #按键分组RDD
[('b', [2]), ('a', [7, 2])]

聚合

seqOp = (lambda x,y: (x[0]+y,x[1]+1))
combOp = (lambda x,y:(x[0]+y[0],x[1]+y[1]))
add = (lambda x,y:x+y)
rdd3.aggregate((0,0), seqOp, combOp)   #汇总每个分区里的RDD元素,并输出结果
(4950, 100)
rdd.aggregateByKey((0,0), seqOp, combOp).collect()   #汇总每个RDD的键的值
[('b', (2, 1)), ('a', (9, 2))]
rdd3.fold(0, add)   #汇总每个分区里的RDD元素,并输出结果
4950
rdd.foldByKey(0, add).collect()   #合并每个键的值
[('b', 2), ('a', 9)]
# rdd3.keyBy(lambda x: x+x).collect()   #通过执行函数,创建RDD元素的元组

数学运算

RDD运算

rdd.subtract(rdd2).collect()   #返回在rdd2里没有匹配键的rdd键值对
[('b', 2), ('a', 7)]
rdd2.subtractByKey(rdd).collect()   #返回rdd2里的每个(键,值)对,rdd中没有匹配的键
[('d', 1)]
rdd.cartesian(rdd2).collect()   #返回rdd和rdd2的笛卡尔积
[(('a', 7), ('a', 2)),
 (('a', 7), ('d', 1)),
 (('a', 7), ('b', 1)),
 (('a', 2), ('a', 2)),
 (('b', 2), ('a', 2)),
 (('a', 2), ('d', 1)),
 (('a', 2), ('b', 1)),
 (('b', 2), ('d', 1)),
 (('b', 2), ('b', 1))]

排序

RDD排序

rdd2.sortBy(lambda x: x[1]).collect()   #按给定函数排序
[('d', 1), ('b', 1), ('a', 2)]
rdd2.sortByKey() .collect()   #RDD按键排序RDD的键值对
[('a', 2), ('b', 1), ('d', 1)]

重分区

repartition函数

rdd.repartition(4)   #新建一个含4个分区的RDD
MapPartitionsRDD[104] at coalesce at NativeMethodAccessorImpl.java:0
rdd.coalesce(1)   #将RDD中的分区数缩减为1个
CoalescedRDD[105] at coalesce at NativeMethodAccessorImpl.java:0

保存

存储RDD到本地或HDFS

rdd.saveAsTextFile("rdd.txt")
rdd.saveAsHadoopFile("hdfs://namenodehost/parent/child", 'org.apache.hadoop.mapred.TextOutputFormat')

终止SparkContext

停止SparkContext

sc.stop()

执行脚本程序

提交脚本执行

$./bin/spark-submit examples/src/main/python/pi.py

原文地址:https://blog.csdn.net/lyzsun8295/article/details/139096502

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!