自学内容网 自学内容网

python构建一个带有服务器的双向视频通话Demo

server.py

import socket  
import threading  
import struct  
import pickle  
import time  

class MediaServer:  
    def __init__(self, host='0.0.0.0', port=5000):  
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  
        self.server_socket.bind((host, port))  
        self.server_socket.listen(2)  
        self.PASSWORD = "123456"  
        
        self.clients = []  
        self.client_lock = threading.Lock()  
        self.running = True  
        
        print(f"服务器启动在 {host}:{port}")  

    def handle_client(self, client_socket, client_address):  
        """处理客户端连接"""  
        try:  
            # 密码验证  
            password = client_socket.recv(1024).decode()  
            if password != self.PASSWORD:  
                client_socket.send("验证失败".encode())  
                return  
            client_socket.send("验证成功".encode())  
            
            with self.client_lock:  
                self.clients.append(client_socket)  
                # 如果有两个客户端连接,发送配对成功消息  
                if len(self.clients) == 2:  
                    for client in self.clients:  
                        data = pickle.dumps({"type": "connection_status", "status": "paired"})  
                        client.sendall(struct.pack("L", len(data)) + data)  
            
            # 开始转发数据  
            while self.running:  
                try:  
                    # 接收数据大小  
                    packed_size = client_socket.recv(struct.calcsize("L"))  
                    if not packed_size:  
                        break  
                    msg_size = struct.unpack("L", packed_size)[0]  
                    
                    # 接收数据  
                    data = b""  
                    while len(data) < msg_size:  
                        packet = client_socket.recv(msg_size - len(data))  
                        if not packet:  
                            break  
                        data += packet  
                    
                    if not data:  
                        break  
                    
                    # 转发给另一个客户端  
                    with self.client_lock:  
                        for other_client in self.clients:  
                            if other_client != client_socket:  
                                try:  
                                    other_client.sendall(packed_size + data)  
                                except:  
                                    pass  
                                    
                except Exception as e:  
                    print(f"处理数据时出错: {e}")  
                    break  
                    
        except Exception as e:  
            print(f"客户端处理错误: {e}")  
        finally:  
            # 清理断开的客户端  
            with self.client_lock:  
                if client_socket in self.clients:  
                    self.clients.remove(client_socket)  
                    # 通知其他客户端连接断开  
                    for other_client in self.clients:  
                        try:  
                            data = pickle.dumps({"type": "connection_status", "status": "disconnected"})  
                            other_client.sendall(struct.pack("L", len(data)) + data)  
                        except:  
                            pass  
            
            client_socket.close()  
            print(f"客户端 {client_address} 断开连接")  

    def run(self):  
        """运行服务器"""  
        try:  
            while self.running:  
                client_socket, client_address = self.server_socket.accept()  
                print(f"新客户端连接: {client_address}")  
                
                # 为新客户端创建处理线程  
                client_thread = threading.Thread(  
                    target=self.handle_client,  
                    args=(client_socket, client_address)  
                )  
                client_thread.daemon = True  
                client_thread.start()  
                
        except Exception as e:  
            print(f"服务器错误: {e}")  
        finally:  
            self.running = False  
            self.server_socket.close()  
            print("服务器已关闭")  

if __name__ == '__main__':  
    server = MediaServer()  
    try:  
        server.run()  
    except KeyboardInterrupt:  
        print("服务器正在关闭...")  
        server.running = False

client.py

import socket  
import threading  
import struct  
import pickle  
import numpy as np  
import pyaudio  
import time  
import io  
from PIL import Image, ImageDraw, ImageTk  
import tkinter as tk  
from tkinter import ttk, messagebox  

class MediaClient:  
    def __init__(self, host='127.0.0.1', port=5000):  
        self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  
        self.PASSWORD = "123456"  
        self.running = True  
        self.peer_connected = False  
        
        # 视频设置  
        self.frame_width = 640  
        self.frame_height = 480  
        self.fps = 20  
        self.frame_interval = 1.0 / self.fps  
        self.last_frame_time = 0  
        
        # 音频设置  
        self.CHUNK = 2048  
        self.FORMAT = pyaudio.paInt16  
        self.CHANNELS = 1  
        self.RATE = 44100  
        
        # 创建GUI窗口  
        self.setup_gui()  
        
        # 初始化设备  
        self.init_devices()  
        
        # 连接服务器  
        try:  
            print(f"正在连接到服务器 {host}:{port}...")  
            self.client_socket.connect((host, port))  
            self.client_socket.settimeout(1.0)  
            
            # 验证密码  
            self.client_socket.send(self.PASSWORD.encode())  
            response = self.client_socket.recv(1024).decode()  
            if response != "验证成功":  
                raise Exception("密码验证失败")  
            print("连接成功!等待对方连接...")  
            
        except Exception as e:  
            print(f"连接错误: {e}")  
            self.running = False  
            raise  

    def setup_gui(self):  
        """设置GUI界面"""  
        self.root = tk.Tk()  
        self.root.title("视频聊天")  
        self.root.protocol("WM_DELETE_WINDOW", self.on_closing)  

        # 创建状态标签  
        self.status_label = ttk.Label(self.root, text="等待连接...")  
        self.status_label.pack(pady=5)  

        # 创建视频显示标签  
        self.video_label = ttk.Label(self.root)  
        self.video_label.pack(padx=10, pady=10)  

        # 创建退出按钮  
        self.quit_button = ttk.Button(self.root, text="退出", command=self.on_closing)  
        self.quit_button.pack(pady=5)  

    def update_status(self, text):  
        """更新状态显示"""  
        self.root.after(0, lambda: self.status_label.config(text=text))  

    def on_closing(self):  
        """窗口关闭处理"""  
        if messagebox.askokcancel("退出", "确定要退出吗?"):  
            self.running = False  
            self.root.quit()  
            self.root.destroy()  

    def init_devices(self):  
        """初始化设备"""  
        # 初始化音频  
        self.audio = pyaudio.PyAudio()  
        
        # 检查麦克风  
        self.has_mic = False  
        try:  
            test_stream = self.audio.open(  
                format=self.FORMAT,  
                channels=self.CHANNELS,  
                rate=self.RATE,  
                input=True,  
                frames_per_buffer=self.CHUNK  
            )  
            test_stream.close()  
            self.has_mic = True  
            print("检测到麦克风")  
        except:  
            print("未检测到麦克风,将禁用音频输入")  

        # 检查扬声器  
        self.has_speaker = False  
        try:  
            test_stream = self.audio.open(  
                format=self.FORMAT,  
                channels=self.CHANNELS,  
                rate=self.RATE,  
                output=True,  
                frames_per_buffer=self.CHUNK  
            )  
            test_stream.close()  
            self.has_speaker = True  
            print("检测到扬声器")  
        except:  
            print("未检测到扬声器,将禁用音频输出")  

        # 检查摄像头  
        self.has_camera = False  
        try:  
            import cv2  
            self.cv2 = cv2  
            self.camera = self.cv2.VideoCapture(0)  
            if self.camera.isOpened():  
                self.has_camera = True  
                print("检测到摄像头")  
            else:  
                raise Exception("摄像头打开失败")  
        except:  
            print("未检测到摄像头,将使用黑色画面")  
            self.create_dummy_frame()  

    def create_dummy_frame(self):  
        """创建一个带有文字的黑色画面"""  
        image = Image.new('RGB', (self.frame_width, self.frame_height), color='black')  
        draw = ImageDraw.Draw(image)  
        text = "No Camera Available"  
        draw.text((self.frame_width//3, self.frame_height//2), text, fill='white')  
        self.dummy_frame = np.array(image)  

    def get_video_frame(self):  
        """获取视频帧"""  
        if self.has_camera:  
            ret, frame = self.camera.read()  
            if ret:  
                frame = self.cv2.resize(frame, (self.frame_width, self.frame_height))  
                return frame  
        return self.dummy_frame.copy()  

    def encode_frame(self, frame):  
        """编码视频帧"""  
        if isinstance(frame, np.ndarray):  
            if self.has_camera:  
                frame = self.cv2.cvtColor(frame, self.cv2.COLOR_BGR2RGB)  
            image = Image.fromarray(frame)  
        else:  
            image = frame  

        # 压缩图像  
        image = image.resize((self.frame_width // 2, self.frame_height // 2))  
        
        with io.BytesIO() as bio:  
            image.save(bio, format='JPEG', quality=40)  
            buffer = np.frombuffer(bio.getvalue(), dtype=np.uint8)  
        return buffer  

    def update_video_display(self, frame_data):  
        """更新视频显示"""  
        try:  
            image = Image.open(io.BytesIO(frame_data))  
            image = image.resize((self.frame_width, self.frame_height))  
            photo = ImageTk.PhotoImage(image)  
            self.video_label.configure(image=photo)  
            self.video_label.image = photo  
        except Exception as e:  
            print(f"显示错误: {e}")  

    def send_media(self):  
        """发送视频和音频数据"""  
        if self.has_mic:  
            audio_stream = self.audio.open(  
                format=self.FORMAT,  
                channels=self.CHANNELS,  
                rate=self.RATE,  
                input=True,  
                frames_per_buffer=self.CHUNK  
            )  

        while self.running:  
            try:  
                if not self.peer_connected:  
                    time.sleep(0.5)  
                    continue  

                current_time = time.time()  
                
                # 控制视频帧率  
                if current_time - self.last_frame_time >= self.frame_interval:  
                    frame = self.get_video_frame()  
                    buffer = self.encode_frame(frame)  
                    data = pickle.dumps({"type": "video", "data": buffer})  
                    self.client_socket.sendall(struct.pack("L", len(data)) + data)  
                    self.last_frame_time = current_time  

                # 发送音频  
                if self.has_mic:  
                    audio_data = audio_stream.read(self.CHUNK, exception_on_overflow=False)  
                    data = pickle.dumps({"type": "audio", "data": audio_data})  
                    self.client_socket.sendall(struct.pack("L", len(data)) + data)  

                time.sleep(0.001)  

            except Exception as e:  
                print(f"发送错误: {e}")  
                break  

        if self.has_mic:  
            audio_stream.stop_stream()  
            audio_stream.close()  

    def receive_media(self):  
        """接收视频和音频数据"""  
        if self.has_speaker:  
            audio_stream = self.audio.open(  
                format=self.FORMAT,  
                channels=self.CHANNELS,  
                rate=self.RATE,  
                output=True,  
                frames_per_buffer=self.CHUNK  
            )  

        while self.running:  
            try:  
                # 接收数据大小  
                packed_size = self.client_socket.recv(struct.calcsize("L"))  
                if not packed_size:  
                    break  
                msg_size = struct.unpack("L", packed_size)[0]  

                # 接收数据  
                data = b""  
                while len(data) < msg_size:  
                    packet = self.client_socket.recv(msg_size - len(data))  
                    if not packet:  
                        break  
                    data += packet  

                # 解析数据  
                received_data = pickle.loads(data)  
                data_type = received_data["type"]  
                
                if data_type == "connection_status":  
                    status = received_data["status"]  
                    if status == "paired":  
                        self.peer_connected = True  
                        self.update_status("对方已连接")  
                        print("对方已连接")  
                    elif status == "disconnected":  
                        self.peer_connected = False  
                        self.update_status("对方已断开连接")  
                        print("对方已断开连接")  
                
                elif data_type == "video":  
                    frame_data = received_data["data"]  
                    self.root.after(0, self.update_video_display, frame_data)  
                
                elif data_type == "audio" and self.has_speaker:  
                    audio_data = received_data["data"]  
                    audio_stream.write(audio_data)  

            except socket.timeout:  
                continue  
            except Exception as e:  
                print(f"接收错误: {e}")  
                break  

        if self.has_speaker:  
            audio_stream.stop_stream()  
            audio_stream.close()  

    def run(self):  
        """运行客户端"""  
        try:  
            # 创建发送和接收线程  
            send_thread = threading.Thread(target=self.send_media)  
            receive_thread = threading.Thread(target=self.receive_media)  
            
            send_thread.daemon = True  
            receive_thread.daemon = True  
            
            send_thread.start()  
            receive_thread.start()  
            
            # 运行GUI主循环  
            self.root.mainloop()  
            
        except Exception as e:  
            print(f"运行错误: {e}")  
        finally:  
            self.running = False  
            if self.has_camera:  
                self.camera.release()  
            self.audio.terminate()  
            self.client_socket.close()  
            print("已关闭所有连接和设备")  

if __name__ == '__main__':  
    try:  
        client = MediaClient()  
        client.run()  
    except Exception as e:  
        print(f"错误: {e}")

开发耗时两小时,有一些想不到的坑,在此记录一下,帮助有需要的人。


原文地址:https://blog.csdn.net/u011471253/article/details/143630345

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!