自学内容网 自学内容网

spark的共享变量

因为RDD在spark中是分布式存储

1、python中定义的变量仅仅在driver中运行,在excutor中是获取不到值的——广播变量

2、若定义了一个变量进行累加,先分别在driver和excutor中进行累加,但是结果是不会主动返回给driver的——累加器

Broadcast Variables广播变量

  • driver中存放python变量广播到别的excutor中

  • 若不使用,就会每个task存放一个

  • 不能修改,只能读

  • 通过value使用该变量

if __name__ == '__main__':
        # 配置环境
        os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'
        # 配置Hadoop的路径,就是前面解压的那个路径
        os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
        # 配置base环境Python解析器的路径
        os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
        os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

        # 获取 conf 对象
        # setMaster  按照什么模式运行,local  bigdata01:7077  yarn
        #  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核
        #  appName 任务的名字
        conf = SparkConf().setMaster("local[*]").setAppName("第一个Spark程序")
        # 假如我想设置压缩
        # conf.set("spark.eventLog.compression.codec","snappy")
        # 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
        sc = SparkContext(conf=conf)

        fileRdd = sc.textFile("../datas/user.tsv",2)
        city_dict = {
                1: "北京",
                2: "上海",
                3: "广州",
                4: "深圳",
                5: "苏州",
                6: "无锡",
                7: "重庆",
                8: "厦门",
                9: "大理",
                10: "成都"
        }
       
        # 将一个变量广播出去,广播到executor中,不是task中
        city_dict_broad = sc.broadcast(city_dict)
        
        # 广播变量
        # <class 'pyspark.broadcast.Broadcast'>
        print(type(city_dict_broad ))
        # <class 'dict'>
        print(type(city_dict_broad.value))
        
        def getLine(line):
                list01 = line.split(" ")
                #cityName = city_dict.get(int(list01[3]))
                # 使用广播变量的变量获取数据
                cityName = city_dict_broad.value.get(int(list01[3]))
                # print(cityName)
                return line + " " + cityName
        mapRdd = fileRdd.map(getLine)
        mapRdd.foreach(print)

        # 释放广播变量
        city_dict_broad.unpersist()
        # 使用完后,记得关闭
        sc.stop()

 累加器

将所有的excutor中的变量返回到driver中,进行汇总。

否则变量是放在excutor中的,而打印的是driver中,变量值不会改变。

用于修改——汇总

import os
import re

import jieba
# 导入pyspark模块
from pyspark import SparkContext, SparkConf
from pyspark.storagelevel import StorageLevel

"""
------------------------------------------
  Description : TODO:
  SourceFile : _06SouGou案例
  Author  : yange
  Date  : 2024/10/31 星期四
-------------------------------------------
"""
if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 获取 conf 对象
    # setMaster  按照什么模式运行,local  bigdata01:7077  yarn
    #  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核
    #  appName 任务的名字
    conf = SparkConf().setMaster("local[*]").setAppName("搜索热词案例")
    # 假如我想设置压缩
    # conf.set("spark.eventLog.compression.codec","snappy")
    # 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
    sc = SparkContext(conf=conf)

    mapRdd = sc.textFile("../../datas/zuoye/sogou.tsv",minPartitions=8) \
     .filter(lambda line:len(re.split("\s+",line)) == 6) \
     .map(lambda line:(re.split("\s+",line)[0],re.split("\s+",line)[1],re.split("\s+",line)[2][1:-1])).persist(StorageLevel.MEMORY_AND_DISK_2)

    # 统计一天每小时点击量并按照点击量降序排序
    _sum = 0
    def sumTotalLine(tuple1):
        global _sum # 把_sum 设置为全局变量
        timeStr = tuple1[0] # 10:19:18
        if timeStr[0:2] == '10':
            _sum += 1

    mapRdd.foreach(lambda tuple1:sumTotalLine(tuple1))
    print(_sum) # 结果是0


    # 使用完后,记得关闭
    sc.stop()

上面程序最终结果是:0,因为 sum=0 是在 Driver 端的内存中的,executor 中程序再累加也是无法改变 Driver 端的结果的。下面的则为正确的

import os
import re

import jieba
# 导入pyspark模块
from pyspark import SparkContext, SparkConf
from pyspark.storagelevel import StorageLevel

"""
------------------------------------------
  Description : TODO:
  SourceFile : _06SouGou案例
  Author  : yange
  Date  : 2024/10/31 星期四
-------------------------------------------
"""
if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 获取 conf 对象
    # setMaster  按照什么模式运行,local  bigdata01:7077  yarn
    #  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核
    #  appName 任务的名字
    conf = SparkConf().setMaster("local[*]").setAppName("搜索热词案例")
    # 假如我想设置压缩
    # conf.set("spark.eventLog.compression.codec","snappy")
    # 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
    sc = SparkContext(conf=conf)
    accCounter = sc.accumulator(0)

    mapRdd = sc.textFile("../../datas/zuoye/sogou.tsv",minPartitions=8) \
     .filter(lambda line:len(re.split("\s+",line)) == 6) \
     .map(lambda line:(re.split("\s+",line)[0],re.split("\s+",line)[1],re.split("\s+",line)[2][1:-1])).persist(StorageLevel.MEMORY_AND_DISK_2)

    # 统计一天每小时点击量并按照点击量降序排序
    #_sum = 0
    def sumTotalLine(tuple1):
        #global _sum # 把_sum 设置为全局变量
        timeStr = tuple1[0] # 10:19:18
        if timeStr[0:2] == '10':
            accCounter.add(1)

    mapRdd.foreach(lambda tuple1:sumTotalLine(tuple1))
    print(accCounter.value) # 104694

    # 假如我不知道累加器这个操作,这个题目怎么做?
    print(mapRdd.filter(lambda tuple1: tuple1[0][0:2] == '10').count())


    # 使用完后,记得关闭
    sc.stop()

 


原文地址:https://blog.csdn.net/weixin_52642840/article/details/144436920

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!