自学内容网 自学内容网

flask websocket服务搭建,flask-sock 和 flask-socketio

flask websocket服务搭建,flask-sock 和 flask-socketio

首先说下socket、socketio 和 websockets 之间的区别

socket是网络上运行的两个程序之间的双向通信链路的一个端点。这是一个非常低级的东西,其他一切都是在 TCP 套接字之上实现的。 WebSocket 是 Web 的标准通信协议。它允许在客户端和服务器之间建立全双工通信通道。 Socket.IO 是一种基于 HTTP 和 WebSocket 构建的通信协议,提供自动重新连接、基于事件的通知等额外功能。 Flask-SocketIO 是 Socket.IO 服务器端协议的实现,作为 Flask 扩展。

两个主流框架flask-sock 和 flask-socketio

flask-sock是原生的websocket协议,flask-socketio是socketio协议,需要根据你自己的需求选择合适的框架,如果用socketio那么客户端建议也用socketio库进行连接

websocket收发的都是原生的数据包

而socketio在原生的数据包上还加了协议数据比如状态码和监听事件等

特性WebSocketSocket.IO
连接协议基于WebSocket协议封装的WebSocket协议,提供更多功能
数据包结构包含Opcode, Payload Length, Mask, Payload Data包含Event name和Data
功能定义标准的双向通信,没有内建的重连或事件支持支持事件、重连、自动重试、命名空间等功能
发送和接收方式通过raw WebSocket send和onmessage直接发送和接收封装的数据通过emit和on方法,使用事件名来识别
支持的传输方式只支持WebSocket支持多个传输方式(WebSocket, Polling等)

安装两个库

pip install flask-socketio flask-sock

flask-sock例子

服务端example

from flask import Flask, render_template
from flask_sock import Sock

app = Flask(__name__)
app.config['SOCK_SERVER_OPTIONS'] = {'ping_interval': 25}

sock = Sock(app)


@app.route('/')
def index():
    return render_template('index.html')


@sock.route('/echo')
def echo(ws):
    while True:
        data = ws.receive()
        if data == 'close':
            break
        ws.send(data)


if __name__ == '__main__':
    app.run()

客户端example

from flask import Flask, render_template
from flask_sock import Sock

app = Flask(__name__)
app.config['SOCK_SERVER_OPTIONS'] = {'ping_interval': 25}

sock = Sock(app)


@app.route('/')
def index():
    return render_template('index.html')


@sock.route('/echo')
def echo(ws):
    while True:
        data = ws.receive()
        if data == 'close':
            break
        ws.send(data)


if __name__ == '__main__':
    app.run()

flask-socketio例子

服务端example

from flask import Flask, render_template
from flask_socketio import SocketIO, emit

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your_secret_key'  # 设置一个密钥用于会话、Cookie等

socketio = SocketIO(app)

@app.route('/')
def index():
    return render_template('index.html')

@socketio.on('message')
def handle_message(message):
    print('Received message: ' + message)
    emit('message', message)  # Echo the received message back

@socketio.on('close')
def handle_close():
    print('Closing connection')

if __name__ == '__main__':
    socketio.run(app)

客户端example

import time
import socketio

# 创建 SocketIO 客户端实例
sio = socketio.Client()

@sio.event
def connect():
    print("Connection established")

@sio.event
def message(data):
    print(f"Message received: {data}")

@sio.event
def disconnect():
    print("Disconnected from server")

# 连接到服务器
sio.connect('http://localhost:5000')

# 发送消息
sio.send("Hello, Socket.IO!")

# 等待一些时间以接收消息
time.sleep(1)

# 发送关闭消息并断开连接
sio.disconnect()

如果要用websocket格式来测试这个socketio接口

import websockets
import asyncio
import json
import time
import random

async def test_socketio():
    # Socket.IO 在 WebSocket URL 上添加特定的路径和参数
    uri = "ws://localhost:5000/socket.io/?EIO=4&transport=websocket"
    
    async with websockets.connect(uri) as ws:
        print("Connected to server")
        
        # 发送 Socket.IO 握手消息
        await ws.send("40")  # Socket.IO v4 握手消息
        
        # 发送消息
        message = {
            "type": "message",
            "data": "Hello, Socket.IO!"
        }
        
        # Socket.IO 消息格式: "42" + JSON数组 [event, data]
        socketio_msg = f'42{json.dumps(["message", message])}'
        await ws.send(socketio_msg)
        
        # 接收消息
        try:
            while True:
                response = await asyncio.wait_for(ws.recv(), timeout=2)
                print(f"Received: {response}")
                
                # 如果是 PING 消息(2),回应 PONG(3)
                if response.startswith("2"):
                    await ws.send("3")
                    continue
                
                # 解析常规消息
                if response.startswith("42"):
                    try:
                        data = json.loads(response[2:])
                        print(f"Parsed message: {data}")
                    except json.JSONDecodeError:
                        print("Failed to parse message")
        
        except asyncio.TimeoutError:
            print("No more messages")
        
        # 发送关闭消息
        await ws.send("41")  # Socket.IO 关闭消息

if __name__ == "__main__":
    asyncio.run(test_socketio())

原文地址:https://blog.csdn.net/stephen147/article/details/143428614

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!