spark的共享变量
因为RDD在spark中是分布式存储
1、python中定义的变量仅仅在driver中运行,在excutor中是获取不到值的——广播变量
2、若定义了一个变量进行累加,先分别在driver和excutor中进行累加,但是结果是不会主动返回给driver的——累加器
Broadcast Variables广播变量
-
driver中存放python变量广播到别的excutor中
-
若不使用,就会每个task存放一个
-
不能修改,只能读
-
通过value使用该变量
if __name__ == '__main__':
# 配置环境
os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
# 获取 conf 对象
# setMaster 按照什么模式运行,local bigdata01:7077 yarn
# local[2] 使用2核CPU * 你本地资源有多少核就用多少核
# appName 任务的名字
conf = SparkConf().setMaster("local[*]").setAppName("第一个Spark程序")
# 假如我想设置压缩
# conf.set("spark.eventLog.compression.codec","snappy")
# 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
sc = SparkContext(conf=conf)
fileRdd = sc.textFile("../datas/user.tsv",2)
city_dict = {
1: "北京",
2: "上海",
3: "广州",
4: "深圳",
5: "苏州",
6: "无锡",
7: "重庆",
8: "厦门",
9: "大理",
10: "成都"
}
# 将一个变量广播出去,广播到executor中,不是task中
city_dict_broad = sc.broadcast(city_dict)
# 广播变量
# <class 'pyspark.broadcast.Broadcast'>
print(type(city_dict_broad ))
# <class 'dict'>
print(type(city_dict_broad.value))
def getLine(line):
list01 = line.split(" ")
#cityName = city_dict.get(int(list01[3]))
# 使用广播变量的变量获取数据
cityName = city_dict_broad.value.get(int(list01[3]))
# print(cityName)
return line + " " + cityName
mapRdd = fileRdd.map(getLine)
mapRdd.foreach(print)
# 释放广播变量
city_dict_broad.unpersist()
# 使用完后,记得关闭
sc.stop()
累加器
将所有的excutor中的变量返回到driver中,进行汇总。
否则变量是放在excutor中的,而打印的是driver中,变量值不会改变。
用于修改——汇总
import os
import re
import jieba
# 导入pyspark模块
from pyspark import SparkContext, SparkConf
from pyspark.storagelevel import StorageLevel
"""
------------------------------------------
Description : TODO:
SourceFile : _06SouGou案例
Author : yange
Date : 2024/10/31 星期四
-------------------------------------------
"""
if __name__ == '__main__':
# 配置环境
os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
# 获取 conf 对象
# setMaster 按照什么模式运行,local bigdata01:7077 yarn
# local[2] 使用2核CPU * 你本地资源有多少核就用多少核
# appName 任务的名字
conf = SparkConf().setMaster("local[*]").setAppName("搜索热词案例")
# 假如我想设置压缩
# conf.set("spark.eventLog.compression.codec","snappy")
# 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
sc = SparkContext(conf=conf)
mapRdd = sc.textFile("../../datas/zuoye/sogou.tsv",minPartitions=8) \
.filter(lambda line:len(re.split("\s+",line)) == 6) \
.map(lambda line:(re.split("\s+",line)[0],re.split("\s+",line)[1],re.split("\s+",line)[2][1:-1])).persist(StorageLevel.MEMORY_AND_DISK_2)
# 统计一天每小时点击量并按照点击量降序排序
_sum = 0
def sumTotalLine(tuple1):
global _sum # 把_sum 设置为全局变量
timeStr = tuple1[0] # 10:19:18
if timeStr[0:2] == '10':
_sum += 1
mapRdd.foreach(lambda tuple1:sumTotalLine(tuple1))
print(_sum) # 结果是0
# 使用完后,记得关闭
sc.stop()
上面程序最终结果是:0,因为 sum=0 是在 Driver 端的内存中的,executor 中程序再累加也是无法改变 Driver 端的结果的。下面的则为正确的
import os
import re
import jieba
# 导入pyspark模块
from pyspark import SparkContext, SparkConf
from pyspark.storagelevel import StorageLevel
"""
------------------------------------------
Description : TODO:
SourceFile : _06SouGou案例
Author : yange
Date : 2024/10/31 星期四
-------------------------------------------
"""
if __name__ == '__main__':
# 配置环境
os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
# 获取 conf 对象
# setMaster 按照什么模式运行,local bigdata01:7077 yarn
# local[2] 使用2核CPU * 你本地资源有多少核就用多少核
# appName 任务的名字
conf = SparkConf().setMaster("local[*]").setAppName("搜索热词案例")
# 假如我想设置压缩
# conf.set("spark.eventLog.compression.codec","snappy")
# 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
sc = SparkContext(conf=conf)
accCounter = sc.accumulator(0)
mapRdd = sc.textFile("../../datas/zuoye/sogou.tsv",minPartitions=8) \
.filter(lambda line:len(re.split("\s+",line)) == 6) \
.map(lambda line:(re.split("\s+",line)[0],re.split("\s+",line)[1],re.split("\s+",line)[2][1:-1])).persist(StorageLevel.MEMORY_AND_DISK_2)
# 统计一天每小时点击量并按照点击量降序排序
#_sum = 0
def sumTotalLine(tuple1):
#global _sum # 把_sum 设置为全局变量
timeStr = tuple1[0] # 10:19:18
if timeStr[0:2] == '10':
accCounter.add(1)
mapRdd.foreach(lambda tuple1:sumTotalLine(tuple1))
print(accCounter.value) # 104694
# 假如我不知道累加器这个操作,这个题目怎么做?
print(mapRdd.filter(lambda tuple1: tuple1[0][0:2] == '10').count())
# 使用完后,记得关闭
sc.stop()
原文地址:https://blog.csdn.net/weixin_52642840/article/details/144436920
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!