写端口-tcp udp不同方式发包和接包
最近一直在学习网络编程,今天把 socket部分做一个总结。
Python 的socket库可以实现不同协议不同地址的发包和收包,无奈资料很少,官方例子有限,大神博客很少提及, 经过一番尝试后,总结以下几点用法以便大家以后使用。
client端 import socket import time import random import string # 定义一些常量,变量名可以变。自己好记就行 buffer = 1024 # 发送缓冲区大小,这里是1KB data_size = 128 # 每个UDP数据包的大小 bandwidth = 1 # 目标带宽,1 Mbps PACKETS_PER_SECOND = bandwidth * 1024 * 1024 / (8 * data_size) # 计算每秒需要发送的数据包数量 # 目标服务器的IP和端口 SERVER_IP = '127.0.0.1' # 请替换为实际服务器IP SERVER_PORT = 1245 # 请替换为实际端口 def generate_random_data(size): """生成随机数据,生成往UDP包中放的随机值""" return ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(size)).encode() def udp_main(): # 创建UDP套接字 client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) print(f"Starting UDP bandwidth test to {SERVER_IP}:{SERVER_PORT} aiming for {bandwidth} Mbps...") start_time = time.time() packets_sent = 0 try: while True: # 生成随机数据 data = generate_random_data(data_size) # 发送数据包 client_socket.sendto(data, (SERVER_IP, SERVER_PORT)) #包计数 packets_sent += 1 # 控制发送速率以达到目标带宽 elapsed_time = time.time() - start_time if elapsed_time >= 1: #实际带宽,怎么算需要记一下 actual_bandwidth = (packets_sent * data_size * 8) / (elapsed_time * 1024 * 1024) print(f"Actual bandwidth: {actual_bandwidth:.2f} Mbps") #重新计算包 packets_sent = 0 start_time = time.time() time.sleep(1 / PACKETS_PER_SECOND) # 等待,控制发送频率 except KeyboardInterrupt: print("\nBandwidth test interrupted by user.") finally: client_socket.close() def tcp_main(): client_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM) client_socket.connect((SERVER_IP, SERVER_PORT)) start_time = time.time() packets_sent = 0 try: while True: data = generate_random_data(data_size) client_socket.send(data) packets_sent += 1 elapsed_time = time.time() - start_time if elapsed_time >= 1: client_socket.send("q".encode()) packets_sent = 0 start_time = time.time() acutalwidth = (packets_sent * data_size * 8) / (elapsed_time * 1024 * 1024) print(f"Actual bandwidth: {acutalwidth:.2f} Mbps") time.sleep(1/PACKETS_PER_SECOND) except KeyboardInterrupt: print("\nBandwidth test interrupted by user.") finally: client_socket.close() if __name__ == "__main__": tcp_main(
server端
import socket def Server_Socket(): serverip = "0.0.0.0" serverport = 8080 #1、2 创建UDP的socket server_socket = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) server_socket.bind((serverip,serverport)) print(f"udp server {serverip}:{serverport}") try: while True: #3、接收客户端发来的值 data,addr = server_socket.recvfrom(1024) print(f"recv from {addr}:{data.decode('utf-8')}") #print的内容不用背 except KeyboardInterrupt: print("^C received, shutting down the server") #无所谓不用背具体详情 finally: server_socket.close() def TCP_socket(): serverip = "127.0.0.1" serverport = 1245 server_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM) server_socket.bind((serverip,serverport)) print(f"start listening:") server_socket.listen(1024) print(f"tcp socket: {serverip}:{serverport}") try: dataconn,addr = server_socket.accept() print(f"client{dataconn}:{addr}") while True: data = dataconn.recv(1024) print(f"recv from {addr},recv data:{data.decode('utf-8')}") if data.decode('utf-8') == "q": break dataconn.send("已送达".encode('utf-8')) except KeyboardInterrupt: print("^C received, shutting down the server") finally: server_socket.close() if __name__ == "__main__": TCP_socket()
原文地址:https://blog.csdn.net/u013184378/article/details/142844219
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!