自学内容网 自学内容网

PySpark 数据处理实战:从基础操作到案例分析

        在大数据处理领域,PySpark 作为强大的工具,能够高效地处理大规模数据。本文将通过几个实际案例,详细介绍 PySpark 在数据处理中的应用,包括数据清洗、统计分析等操作,帮助读者深入理解 PySpark 的使用方法和数据处理流程。

一、手机号码流量统计案例

(一)需求分析

        给定一组数据,要求计算每个手机号码的总流量(上行 + 下行),但需排除手机号码不正确以及数据长度不够的数据。数据长度不一致的数据指的是一行数据切割后的列数与其他数据列数不同的数据。

 

(二)代码实现

以下是实现该功能的 PySpark 代码:

import math
import os
import re
from collections.abc import Iterable

# 导入pyspark模块
from pyspark import SparkContext, SparkConf


if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 获取 conf 对象
    # setMaster  按照什么模式运行,local  bigdata01:7077  yarn
    #  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核
    #  appName 任务的名字
    conf = SparkConf().setMaster("local[*]").setAppName("rdd的创建方式")

    sc = SparkContext(conf=conf)

    fileRdd = sc.textFile("../../datas/zuoye/HTTP_20130313143750.dat")
    print(fileRdd.count())
    filterRdd = fileRdd.filter(lambda line: len(re.split("\t+",line)) == 11 and re.fullmatch(r"1[3-9]\d{9}",re.split("\t+",line)[1]) is not None )
    print(filterRdd.count())

    mapRdd = filterRdd.map(lambda line:(re.split("\t+",line)[1],int(re.split("\t+",line)[-3])+int(re.split("\t+",line)[-2])))

    rsRdd = mapRdd.reduceByKey(lambda sum,num:sum+num)

    rsRdd.foreach(lambda x:print(x[0],str(round(x[1]/1024,2))+"MB"))

    # 使用完后,记得关闭
    sc.stop()

 

(三)代码解析

  1. 首先,配置了 PySpark 运行所需的环境变量,包括 JAVA_HOMEHADOOP_HOME 以及 Python 解析器路径。
  2. 通过 SparkConf 设置运行模式为本地(local[*])并指定应用名称,然后创建 SparkContext 对象。
  3. 使用 textFile 读取数据文件,得到 fileRdd
  4. 利用 filter 操作过滤数据,先检查数据长度是否为 11,再通过正则表达式验证手机号码格式是否正确,得到 filterRdd
  5. 对 filterRdd 进行 map 操作,提取手机号码和总流量。
  6. 通过 reduceByKey 按手机号码分组并计算总流量。
  7. 最后,使用 foreach 输出每个手机号码及其对应的总流量(转换为 MB 并保留两位小数)。

 

二、合同数据分析案例

 

(一)需求分析

        给定合同数据文件,包含合同 ID、客户 ID、合同类型、总金额、合同付款类型、注册时间、购买数量、合同签约时间、购买的产品、是否已经交货等字段。需要查询已交货和未交货的数量分别是多少、购买合同的总金额是多少以及分期付款占全部订单的比例。

 

(二)代码实现

以下是实现该功能的 PySpark 代码:

import os
import re

# 导入pyspark模块
from pyspark import SparkContext, SparkConf


class Contract:
    def __init__(self,line):
        # 合同类型, 总金额,合同付款类型,是否已经交货
        tuple1 = re.split(",",line)
        self.contract_type=tuple1[2]
        self.contract_money=int(tuple1[3])
        self.pay_type=tuple1[4]
        self.isDelivery=tuple1[-1]

    def __repr__(self):
        return "合同类型:%s,总金额:%d,合同付款类型:%s,是否已经交货:%s" % (self.contract_type,self.contract_money,self.pay_type,self.isDelivery)


if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 获取 conf 对象
    # setMaster  按照什么模式运行,local  bigdata01:7077  yarn
    #  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核
    #  appName 任务的名字
    conf = SparkConf().setMaster("local[*]").setAppName("合同分析")
    # 假如我想设置压缩
    # conf.set("spark.eventLog.compression.codec","snappy")
    # 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
    sc = SparkContext(conf=conf)
    print(sc)

    mapRdd = sc.textFile("../../datas/zuoye/DEMO_CONTRACT.csv") \
    .filter(lambda line: line.find("合同ID") == -1) \
    .map(lambda line: Contract(line)) \

    """
    1. 已交货和未交货的数量分别是多少
    2. 购买合同的总金额是多少
    3. 分期付款占全部订单的比例
    """
    totalNum = mapRdd.count()
    deliverNum = mapRdd.filter(lambda contract:contract.isDelivery == '是').count()
    print("已交货和未交货的数量分别是:",deliverNum,totalNum-deliverNum)

    gouMaiMoney = mapRdd.filter(lambda contract:contract.contract_type=='购买合同') \
                   .map(lambda contract:contract.contract_money).reduce(lambda sum,money:sum+money)

    gouMaiMoney2 = mapRdd.filter(lambda contract: contract.contract_type == '购买合同') \
        .map(lambda contract: contract.contract_money).sum()
    print("购买合同的总金额是:",gouMaiMoney2)

    # 第三问
    fenQiNum = mapRdd.filter(lambda contract:contract.pay_type=='分期付款').count()
    print("分期付款占全部订单的比例是:",fenQiNum/totalNum)
    # 使用完后,记得关闭
    sc.stop()

 

(三)代码解析

  1. 同样先配置环境变量并创建 SparkContext 对象。
  2. 定义了 Contract 类来封装合同数据的相关字段。
  3. 读取合同数据文件并进行过滤,排除标题行,然后将每行数据映射为 Contract 对象,得到 mapRdd
  4. 对于已交货和未交货数量的统计,先计算总订单数 totalNum,再通过过滤得到已交货订单数 deliverNum,进而得出未交货订单数。
  5. 计算购买合同总金额时,先过滤出购买合同类型的数据,然后提取金额并进行求和操作。
  6. 计算分期付款占比,先统计分期付款订单数 fenQiNum,再除以总订单数 totalNum

 

三、日志分析案例

 

(一)需求分析

  1. 统计热门搜索词 Top10,即统计用户搜索每个词出现的次数,然后降序排序取前 10。
  2. 统计所有用户搜索中最大点击次数、最小点击次数、平均点击次数,也就是计算所有用户在所有搜索过程中的最大、最小和平均点击次数。
  3. 统计一天每小时点击量并按照点击量降序排序,即统计每个小时点击的数据量并按降序排列。

 

(二)jieba分词器

汉语是需要分词的

python语言: Jieba 分词器

Java语言: IK 分词器(好久没更新过了)

安装一下

pip install jieba -i https://pypi.tuna.tsinghua.edu.cn/simple/

没有自定版本,安装的就是最新的版本

使用

语法:jieba.cut(“语句”) / jieba.cut_for_search(“语句”)
全模式:将句子中所有可以组成词的词语都扫描出来, 速度非常快,但可能会出现歧义
jieba.cut("语句", cut_all=True)
精确模式:将句子最精确地按照语义切开,适合文本分析,提取语义中存在的每个词
jieba.cut("语句", cut_all=False)
搜索引擎模式:在精确模式的基础上,对长词再次切分,适合用于搜索引擎分词
jieba.cut_for_search("语句")

测试

import jieba
# 测试一下结巴分词器
str = "中华人民共和国"
list01 = jieba.cut(str, cut_all=True)
# 中华,中华人民,中华人民共和国,华人,人民,人民共和国,共和,共和国
print(",".join(list01))
# 中华人民共和国
list02 = jieba.cut(str, cut_all=False)
for ele in list02:
    print(ele)

# 中华 华人 人民 共和 共和国 中华人民共和国  比全模式少多,比精确模式多,适用于搜索引擎
list03 = jieba.cut_for_search(str)
print(*list03)

(四)代码实现

以下是实现日志分析功能的 PySpark 代码:

import os
import re

# 导入pyspark模块
from pyspark import SparkContext, SparkConf
import jieba


if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = 'C:/Program Files/java/jdk1.8.0_181'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/Linux/hadoop/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
    # 获取 conf 对象
    conf = SparkConf().setMaster("local[*]").setAppName("")
    # 根据配置文件,得到一个 SC 对象,第一个 conf 是形参的名字,第二个 conf 是实参的名字
    sc = SparkContext(conf=conf)
    # print(sc)

    # 清洗数据
    print("===========清洗数据===========")
    fileRdd = sc.textFile("../../datas/sogou/sogou.tsv")
    print(fileRdd.count())
    print(fileRdd.first())
    listRdd = fileRdd.map(lambda line: re.split("\\s+", line))
    filterList = listRdd.filter(lambda l1: len(l1) == 6)
    # 这个结果只获取而来时间 uid 以及热词,热词将左右两边的[] 去掉了
    tupleRdd = filterList.map(lambda l1: (l1[0], l1[1], l1[2][1:-1]))


    # 求热词top10
    print("===========求热词top10===========")
    wordRdd = tupleRdd.flatMap(lambda t1: jieba.cut_for_search(t1[2]))
    filterRdd2 = wordRdd.filter(lambda word: len(word.strip()) != 0 and word != "的").filter(
        lambda word: re.fullmatch("[\u4e00-\u9fa5]+", word) is not None)
    # filterRdd2.foreach(print)
    result = filterRdd2.map(lambda word: (word, 1)).reduceByKey(lambda sum, num: sum + num).sortBy(
        keyfunc=lambda tup: tup[1], ascending=False).take(10)

    for ele in result:
        print(ele)


    # 统计所有用户搜索中最大点击次数、最小点击次数、平均点击次数
    print("===========统计所有用户搜索中最大点击次数、最小点击次数、平均点击次数===========")


    def splitWord(tupl):
        li1 = jieba.cut_for_search(tupl[2])  # 中国 中华 共和国
        li2 = list()
        for word in li1:
            li2.append(((tupl[1], word), 1))
        return li2


    newRdd = tupleRdd.flatMap(splitWord)
    # newRdd.foreach(print)
    reduceByUIDAndWordRdd = newRdd.reduceByKey(lambda sum, num: sum + num)
    # reduceByUIDAndWordRdd.foreach(print)

    valList = reduceByUIDAndWordRdd.values()
    print(f"最大点击次数: {valList.max()}")
    print(f"最小点击次数: {valList.min()}")
    print(f"中位数: {valList.mean()}")  # 中位数
    print(f"平均点击次数: {valList.sum() / valList.count()}")


    # 统计一天每小时点击量并按照点击量降序排序
    print("===========统计一天每小时点击量并按照点击量降序排序===========")
    reductByKeyRDD = tupleRdd.map(lambda tup: (tup[0][0:2], 1)).reduceByKey(lambda sum, num: sum + num)
    sortRdd = reductByKeyRDD.sortBy(keyfunc=lambda tup: tup[1], ascending=False)
    listNum = sortRdd.take(24)
    for ele in listNum:
        print(ele)

    # 使用完后,记得关闭
    sc.stop()

 

(三)代码解析

  1. 配置环境变量后创建 SparkContext 对象。
  2. 定义 getWords 函数,用于将搜索词进行分词并构建 ((用户id,词), 1) 的格式。
  3. 读取日志数据文件,进行数据清洗,排除数据长度不足 6 的行和包含特定违禁词的行,然后提取相关字段得到 mapRdd
  4. 对于热词 Top10 的统计,先对热词进行分词,过滤掉特定词和非中文词,然后映射为 (词, 1) 格式,通过 reduceByKey 统计词频,最后按词频降序排序并取前 10。
  5. 统计最大、最小和平均点击次数时,先通过 flatMap 和 getWords 函数构建 ((用户id,词),点击次数) 格式的数据,过滤掉非中文词和特定词后,通过 reduceByKey 统计点击次数,再获取值并计算相关统计量。
  6. 统计一天每小时点击量时,先提取小时信息并映射为 (小时, 1) 格式,通过 reduceByKey 统计每小时点击量,最后按点击量降序排序并收集结果。

 

四、常见错误及解决方法

        在运行 PySpark 代码读取数据时,可能会遇到 Caused by: java.net.SocketException: Connection reset by peer: socket write error 错误。

Caused by: java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:477)

        原因是连接数过多,一般在本地 Windows 运行 Spark 代码且读取数据过多,或者代码中使用了 take() 算子时容易出现。解决方法有两种:一是将数据量变小一点,只截取一部分进行测试;二是避免使用 take 算子。

 

五、总结

        通过以上三个案例,我们详细展示了 PySpark 在不同数据处理场景下的应用。从手机号码流量统计到合同数据分析,再到日志分析,涵盖了数据过滤、映射、分组求和、排序以及特定数据统计等常见操作。同时,也指出了在实际运行代码过程中可能遇到的错误及解决方法。希望读者能够通过这些案例,深入理解 PySpark 的使用技巧,在大数据处理工作中更加得心应手。


原文地址:https://blog.csdn.net/weixin_64726356/article/details/143647366

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!