PySpark 数据处理实战:从基础操作到案例分析
在大数据处理领域,PySpark 作为强大的工具,能够高效地处理大规模数据。本文将通过几个实际案例,详细介绍 PySpark 在数据处理中的应用,包括数据清洗、统计分析等操作,帮助读者深入理解 PySpark 的使用方法和数据处理流程。
一、手机号码流量统计案例
(一)需求分析
给定一组数据,要求计算每个手机号码的总流量(上行 + 下行),但需排除手机号码不正确以及数据长度不够的数据。数据长度不一致的数据指的是一行数据切割后的列数与其他数据列数不同的数据。
(二)代码实现
以下是实现该功能的 PySpark 代码:
import math
import os
import re
from collections.abc import Iterable
# 导入pyspark模块
from pyspark import SparkContext, SparkConf
if __name__ == '__main__':
# 配置环境
os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
# 获取 conf 对象
# setMaster 按照什么模式运行,local bigdata01:7077 yarn
# local[2] 使用2核CPU * 你本地资源有多少核就用多少核
# appName 任务的名字
conf = SparkConf().setMaster("local[*]").setAppName("rdd的创建方式")
sc = SparkContext(conf=conf)
fileRdd = sc.textFile("../../datas/zuoye/HTTP_20130313143750.dat")
print(fileRdd.count())
filterRdd = fileRdd.filter(lambda line: len(re.split("\t+",line)) == 11 and re.fullmatch(r"1[3-9]\d{9}",re.split("\t+",line)[1]) is not None )
print(filterRdd.count())
mapRdd = filterRdd.map(lambda line:(re.split("\t+",line)[1],int(re.split("\t+",line)[-3])+int(re.split("\t+",line)[-2])))
rsRdd = mapRdd.reduceByKey(lambda sum,num:sum+num)
rsRdd.foreach(lambda x:print(x[0],str(round(x[1]/1024,2))+"MB"))
# 使用完后,记得关闭
sc.stop()
(三)代码解析
- 首先,配置了 PySpark 运行所需的环境变量,包括
JAVA_HOME
、HADOOP_HOME
以及 Python 解析器路径。 - 通过
SparkConf
设置运行模式为本地(local[*]
)并指定应用名称,然后创建SparkContext
对象。 - 使用
textFile
读取数据文件,得到fileRdd
。 - 利用
filter
操作过滤数据,先检查数据长度是否为 11,再通过正则表达式验证手机号码格式是否正确,得到filterRdd
。 - 对
filterRdd
进行map
操作,提取手机号码和总流量。 - 通过
reduceByKey
按手机号码分组并计算总流量。 - 最后,使用
foreach
输出每个手机号码及其对应的总流量(转换为 MB 并保留两位小数)。
二、合同数据分析案例
(一)需求分析
给定合同数据文件,包含合同 ID、客户 ID、合同类型、总金额、合同付款类型、注册时间、购买数量、合同签约时间、购买的产品、是否已经交货等字段。需要查询已交货和未交货的数量分别是多少、购买合同的总金额是多少以及分期付款占全部订单的比例。
(二)代码实现
以下是实现该功能的 PySpark 代码:
import os
import re
# 导入pyspark模块
from pyspark import SparkContext, SparkConf
class Contract:
def __init__(self,line):
# 合同类型, 总金额,合同付款类型,是否已经交货
tuple1 = re.split(",",line)
self.contract_type=tuple1[2]
self.contract_money=int(tuple1[3])
self.pay_type=tuple1[4]
self.isDelivery=tuple1[-1]
def __repr__(self):
return "合同类型:%s,总金额:%d,合同付款类型:%s,是否已经交货:%s" % (self.contract_type,self.contract_money,self.pay_type,self.isDelivery)
if __name__ == '__main__':
# 配置环境
os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
# 获取 conf 对象
# setMaster 按照什么模式运行,local bigdata01:7077 yarn
# local[2] 使用2核CPU * 你本地资源有多少核就用多少核
# appName 任务的名字
conf = SparkConf().setMaster("local[*]").setAppName("合同分析")
# 假如我想设置压缩
# conf.set("spark.eventLog.compression.codec","snappy")
# 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
sc = SparkContext(conf=conf)
print(sc)
mapRdd = sc.textFile("../../datas/zuoye/DEMO_CONTRACT.csv") \
.filter(lambda line: line.find("合同ID") == -1) \
.map(lambda line: Contract(line)) \
"""
1. 已交货和未交货的数量分别是多少
2. 购买合同的总金额是多少
3. 分期付款占全部订单的比例
"""
totalNum = mapRdd.count()
deliverNum = mapRdd.filter(lambda contract:contract.isDelivery == '是').count()
print("已交货和未交货的数量分别是:",deliverNum,totalNum-deliverNum)
gouMaiMoney = mapRdd.filter(lambda contract:contract.contract_type=='购买合同') \
.map(lambda contract:contract.contract_money).reduce(lambda sum,money:sum+money)
gouMaiMoney2 = mapRdd.filter(lambda contract: contract.contract_type == '购买合同') \
.map(lambda contract: contract.contract_money).sum()
print("购买合同的总金额是:",gouMaiMoney2)
# 第三问
fenQiNum = mapRdd.filter(lambda contract:contract.pay_type=='分期付款').count()
print("分期付款占全部订单的比例是:",fenQiNum/totalNum)
# 使用完后,记得关闭
sc.stop()
(三)代码解析
- 同样先配置环境变量并创建
SparkContext
对象。 - 定义了
Contract
类来封装合同数据的相关字段。 - 读取合同数据文件并进行过滤,排除标题行,然后将每行数据映射为
Contract
对象,得到mapRdd
。 - 对于已交货和未交货数量的统计,先计算总订单数
totalNum
,再通过过滤得到已交货订单数deliverNum
,进而得出未交货订单数。 - 计算购买合同总金额时,先过滤出购买合同类型的数据,然后提取金额并进行求和操作。
- 计算分期付款占比,先统计分期付款订单数
fenQiNum
,再除以总订单数totalNum
。
三、日志分析案例
(一)需求分析
- 统计热门搜索词 Top10,即统计用户搜索每个词出现的次数,然后降序排序取前 10。
- 统计所有用户搜索中最大点击次数、最小点击次数、平均点击次数,也就是计算所有用户在所有搜索过程中的最大、最小和平均点击次数。
- 统计一天每小时点击量并按照点击量降序排序,即统计每个小时点击的数据量并按降序排列。
(二)jieba分词器
汉语是需要分词的
python语言: Jieba 分词器
Java语言: IK 分词器(好久没更新过了)
安装一下
pip install jieba -i https://pypi.tuna.tsinghua.edu.cn/simple/
没有自定版本,安装的就是最新的版本
使用
语法:jieba.cut(“语句”) / jieba.cut_for_search(“语句”)
全模式:将句子中所有可以组成词的词语都扫描出来, 速度非常快,但可能会出现歧义
jieba.cut("语句", cut_all=True)
精确模式:将句子最精确地按照语义切开,适合文本分析,提取语义中存在的每个词
jieba.cut("语句", cut_all=False)
搜索引擎模式:在精确模式的基础上,对长词再次切分,适合用于搜索引擎分词
jieba.cut_for_search("语句")
测试
import jieba
# 测试一下结巴分词器
str = "中华人民共和国"
list01 = jieba.cut(str, cut_all=True)
# 中华,中华人民,中华人民共和国,华人,人民,人民共和国,共和,共和国
print(",".join(list01))
# 中华人民共和国
list02 = jieba.cut(str, cut_all=False)
for ele in list02:
print(ele)
# 中华 华人 人民 共和 共和国 中华人民共和国 比全模式少多,比精确模式多,适用于搜索引擎
list03 = jieba.cut_for_search(str)
print(*list03)
(四)代码实现
以下是实现日志分析功能的 PySpark 代码:
import os
import re
# 导入pyspark模块
from pyspark import SparkContext, SparkConf
import jieba
if __name__ == '__main__':
# 配置环境
os.environ['JAVA_HOME'] = 'C:/Program Files/java/jdk1.8.0_181'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/Linux/hadoop/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
# 获取 conf 对象
conf = SparkConf().setMaster("local[*]").setAppName("")
# 根据配置文件,得到一个 SC 对象,第一个 conf 是形参的名字,第二个 conf 是实参的名字
sc = SparkContext(conf=conf)
# print(sc)
# 清洗数据
print("===========清洗数据===========")
fileRdd = sc.textFile("../../datas/sogou/sogou.tsv")
print(fileRdd.count())
print(fileRdd.first())
listRdd = fileRdd.map(lambda line: re.split("\\s+", line))
filterList = listRdd.filter(lambda l1: len(l1) == 6)
# 这个结果只获取而来时间 uid 以及热词,热词将左右两边的[] 去掉了
tupleRdd = filterList.map(lambda l1: (l1[0], l1[1], l1[2][1:-1]))
# 求热词top10
print("===========求热词top10===========")
wordRdd = tupleRdd.flatMap(lambda t1: jieba.cut_for_search(t1[2]))
filterRdd2 = wordRdd.filter(lambda word: len(word.strip()) != 0 and word != "的").filter(
lambda word: re.fullmatch("[\u4e00-\u9fa5]+", word) is not None)
# filterRdd2.foreach(print)
result = filterRdd2.map(lambda word: (word, 1)).reduceByKey(lambda sum, num: sum + num).sortBy(
keyfunc=lambda tup: tup[1], ascending=False).take(10)
for ele in result:
print(ele)
# 统计所有用户搜索中最大点击次数、最小点击次数、平均点击次数
print("===========统计所有用户搜索中最大点击次数、最小点击次数、平均点击次数===========")
def splitWord(tupl):
li1 = jieba.cut_for_search(tupl[2]) # 中国 中华 共和国
li2 = list()
for word in li1:
li2.append(((tupl[1], word), 1))
return li2
newRdd = tupleRdd.flatMap(splitWord)
# newRdd.foreach(print)
reduceByUIDAndWordRdd = newRdd.reduceByKey(lambda sum, num: sum + num)
# reduceByUIDAndWordRdd.foreach(print)
valList = reduceByUIDAndWordRdd.values()
print(f"最大点击次数: {valList.max()}")
print(f"最小点击次数: {valList.min()}")
print(f"中位数: {valList.mean()}") # 中位数
print(f"平均点击次数: {valList.sum() / valList.count()}")
# 统计一天每小时点击量并按照点击量降序排序
print("===========统计一天每小时点击量并按照点击量降序排序===========")
reductByKeyRDD = tupleRdd.map(lambda tup: (tup[0][0:2], 1)).reduceByKey(lambda sum, num: sum + num)
sortRdd = reductByKeyRDD.sortBy(keyfunc=lambda tup: tup[1], ascending=False)
listNum = sortRdd.take(24)
for ele in listNum:
print(ele)
# 使用完后,记得关闭
sc.stop()
(三)代码解析
- 配置环境变量后创建
SparkContext
对象。 - 定义
getWords
函数,用于将搜索词进行分词并构建((用户id,词), 1)
的格式。 - 读取日志数据文件,进行数据清洗,排除数据长度不足 6 的行和包含特定违禁词的行,然后提取相关字段得到
mapRdd
。 - 对于热词 Top10 的统计,先对热词进行分词,过滤掉特定词和非中文词,然后映射为
(词, 1)
格式,通过reduceByKey
统计词频,最后按词频降序排序并取前 10。 - 统计最大、最小和平均点击次数时,先通过
flatMap
和getWords
函数构建((用户id,词),点击次数)
格式的数据,过滤掉非中文词和特定词后,通过reduceByKey
统计点击次数,再获取值并计算相关统计量。 - 统计一天每小时点击量时,先提取小时信息并映射为
(小时, 1)
格式,通过reduceByKey
统计每小时点击量,最后按点击量降序排序并收集结果。
四、常见错误及解决方法
在运行 PySpark 代码读取数据时,可能会遇到 Caused by: java.net.SocketException: Connection reset by peer: socket write error
错误。
Caused by: java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:477)
原因是连接数过多,一般在本地 Windows 运行 Spark 代码且读取数据过多,或者代码中使用了 take()
算子时容易出现。解决方法有两种:一是将数据量变小一点,只截取一部分进行测试;二是避免使用 take
算子。
五、总结
通过以上三个案例,我们详细展示了 PySpark 在不同数据处理场景下的应用。从手机号码流量统计到合同数据分析,再到日志分析,涵盖了数据过滤、映射、分组求和、排序以及特定数据统计等常见操作。同时,也指出了在实际运行代码过程中可能遇到的错误及解决方法。希望读者能够通过这些案例,深入理解 PySpark 的使用技巧,在大数据处理工作中更加得心应手。
原文地址:https://blog.csdn.net/weixin_64726356/article/details/143647366
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!