PySpark3.4.4_基于StreamingContext实现网络字节流统计分析
网络字节流与嵌套字节流的区别
-
概念解释
- 网络嵌套字节流:
- 在网络编程的情境下,网络嵌套字节流通常是指将字节流(字节序列)以一种分层或者包含的方式进行组织,用于在网络传输过程中更好地处理数据。例如,在一个复杂的网络协议栈中,高层协议的数据单元(往往也是字节流形式)可以嵌套在底层协议的字节流之中。这就好比包裹的嵌套,外层包裹可能包含了内层包裹的相关信息以及内层包裹本身。以 HTTP 协议在 TCP/IP 协议之上传输为例,HTTP 消息(本身是字节流)被嵌套在 TCP 的字节流中进行传输。TCP 协议负责将 HTTP 消息切割成合适的片段(字节流形式),加上 TCP 头信息(也是字节流),然后通过网络发送。接收端的 TCP 协议先处理接收到的字节流,提取出 HTTP 消息的字节流部分,再交给上层的 HTTP 协议处理。
- 套字节流:
- 这个概念不是很常见,如果理解为 “包裹字节流” 的意思,和网络嵌套字节流有相似之处。不过,“套字节流” 可能更强调简单的封装形式,即将一个字节流作为另一个字节流的一部分进行简单包装。比如,在加密通信中,原始的字节流(如要传输的文件内容字节流)被加密算法处理后,会生成一个新的字节流,这个新字节流可以看作是原始字节流被 “套” 上了一层加密后的字节流。它可能没有像网络嵌套字节流那样涉及复杂的网络协议层次关系。
- 网络嵌套字节流:
-
应用场景区别
- 网络嵌套字节流:
- 广泛应用于网络通信的各个层次。在构建网络服务器和客户端应用时,不同层次的网络协议交互都涉及网络嵌套字节流。例如,在电子邮件传输(SMTP、POP3 等协议)中,邮件内容字节流被嵌套在相应的协议字节流中在网络上传输。它主要用于保证数据在不同网络环境和协议间的正确传递和解析,确保数据能够从源端的应用层通过层层协议封装,经过网络传输,最终在目的端的应用层被正确还原。
- 套字节流:
- 更多地用于数据安全和简单的数据封装场景。如在数字签名的应用中,消息的字节流被 “套” 上签名信息的字节流,用于验证消息的来源和完整性。或者在数据存储中,为了区分不同类型的数据,将数据字节流 “套” 上一个标识头字节流进行存储,方便后续读取和分类处理。
- 网络嵌套字节流:
-
处理方式区别
- 网络嵌套字节流:
- 需要严格按照网络协议栈的规则进行处理。在发送端,数据从高层协议开始,一层一层地进行字节流的嵌套和封装,添加每层协议所需的头部、尾部等信息。在接收端,则是相反的过程,从最外层的协议字节流开始,逐步解包和解析,根据每层协议的规范提取出内层协议的字节流,直到最终得到应用层的数据字节流。这需要对各种网络协议的格式、功能和交互流程有深入的了解。
- 套字节流:
- 处理相对简单,主要关注封装和提取两个操作。在封装时,根据具体的需求添加包裹字节流(如加密后的字节流添加到原始字节流外层)。在提取时,按照预先定义的规则(如加密算法对应的解密规则、数据标识头的解析规则等)去除外层字节流,获取内部的原始字节流或者所需的数据。
- 网络嵌套字节流:
PySpark代码开发
需要在ubuntu环境下或windows环境下,提前安装好spark执行环境
软件说明:
- spark 3.4.4
- python 3.9.20
- java jdk1.8.0_431
代码说明
DataSourceSoket.py 用于模拟生成实时字节流数据的脚本
# coding:utf8
import random
from socket import socket
server = socket()
server.bind(('localhost', 9999))
server.listen(1)
while True:
# 为了方便识别,输出一个"I'm waiting the connect ..."
print("I'm waiting the connect ...")
conn, addr = server.accept()
print("Connected by {0}".format(addr))
print(f"Connected by {addr}")
# 输出发送数据
# 自定义10条中文数据在一个数据容器里,并随机选取一条中文数据集输出
# 步骤1:创建一个列表作为数据容器
data_container = []
# 步骤2:向列表中添加10条不同的中文数据
chinese_data = [
"你好,世界",
"今天天气真好",
"学习是一件快乐的事",
"分享知识,传递快乐",
"探索未知的世界",
"坚持就是胜利",
"努力不懈,梦想终会实现",
"失败乃成功之母",
"平凡造就非凡",
"相信自己,你是最棒的",
"I like Spark",
"I like Flink",
"I like Hadoop"
]
data_container.extend(chinese_data)
# 步骤3:使用random.choice()随机选择并输出一条数据
random_item = random.choice(data_container)
print(random_item)
conn.sendall(random_item.encode())
conn.close()
print("Connection closed")
pysparkStreamingNetwordCountCN.py SparkStreaming处理实时数据流
# coding:utf8
from __future__ import print_function
import os
import sys
import jieba
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# 设置环境变量,确保指向正确的 Java 解释器
os.environ['JAVA_HOME'] = '/opt/HadoopEco/jdk1.8.0_431' # 替换为你的 JDK 8 安装路径
os.environ['SPARK_HOME'] = '/opt/HadoopEco/spark-3.4.4-bin-without-hadoop'
# 加载停用词表
def load_stopwords(file_path):
"""
从指定文件或文件夹中加载停用词列表。
参数:
file_path (str): 停用词文件或文件夹的路径。
返回:
set: 包含停用词的集合。
"""
stopwords = set()
try:
if os.path.isfile(file_path):
with open(file_path, 'r', encoding='utf-8') as f:
stopwords.update(line.strip() for line in f)
elif os.path.isdir(file_path):
for filename in os.listdir(file_path):
file_full_path = os.path.join(file_path, filename)
if os.path.isfile(file_full_path):
with open(file_full_path, 'r', encoding='utf-8') as f:
stopwords.update(line.strip() for line in f)
else:
print(f"Error: The path {file_path} is neither a file nor a directory.")
except FileNotFoundError:
print(f"Error: The file or directory {file_path} does not exist.")
except PermissionError:
print(f"Error: Permission denied for the file or directory {file_path}.")
except Exception as e:
print(f"An unexpected error occurred: {e}")
return stopwords
# 替换为你的停用词表路径或文件夹路径
stopwords = load_stopwords(sys.argv[3]) # 或 'path/to/stopwords_folder'
def sparkstreamingnetworkcount():
global sc, ssc, lines
sc = SparkContext(appName="PythonStreamingNetworkWordCount")
ssc = StreamingContext(sc, 10)
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
def split_words(line):
try:
# 使用 jieba 进行中文分词
chinese_words = jieba.lcut(line.strip())
# 使用空格进行英文分词
english_words = line.strip().split(" ")
# 合并分词结果并过滤掉空字符串
words = set(chinese_words + english_words) - {''}
# 过滤掉停用词
filtered_words = [word.lower() for word in words if word not in stopwords]
return filtered_words
except Exception as e:
print(f"Error processing line: {line}, Error: {e}", file=sys.stderr)
return []
counts = lines.flatMap(split_words).map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
if __name__ == "__main__":
if len(sys.argv) != 4:
print("Usage: networkcount.py <hostname> <port> <stopwords>", file=sys.stderr)
exit(-1)
sparkstreamingnetworkcount()
运行时的运行参数配置
运行结果如下
DataSourceSoket.py
pysparkStreamingNetwordCountCN.py 运行结果
注意事项:
1. 需要先启动 DataSourceSocket.py, 在启动 pysparkStreamingNetwordCountCN.py
原文地址:https://blog.csdn.net/pblh123/article/details/144286851
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!