flask websocket服务搭建,flask-sock 和 flask-socketio
flask websocket服务搭建,flask-sock 和 flask-socketio
首先说下socket、socketio 和 websockets 之间的区别
socket是网络上运行的两个程序之间的双向通信链路的一个端点。这是一个非常低级的东西,其他一切都是在 TCP 套接字之上实现的。 WebSocket 是 Web 的标准通信协议。它允许在客户端和服务器之间建立全双工通信通道。 Socket.IO 是一种基于 HTTP 和 WebSocket 构建的通信协议,提供自动重新连接、基于事件的通知等额外功能。 Flask-SocketIO 是 Socket.IO 服务器端协议的实现,作为 Flask 扩展。
两个主流框架flask-sock 和 flask-socketio
flask-sock是原生的websocket协议,flask-socketio是socketio协议,需要根据你自己的需求选择合适的框架,如果用socketio那么客户端建议也用socketio库进行连接
websocket收发的都是原生的数据包
而socketio在原生的数据包上还加了协议数据比如状态码和监听事件等
特性 | WebSocket | Socket.IO |
---|---|---|
连接协议 | 基于WebSocket协议 | 封装的WebSocket协议,提供更多功能 |
数据包结构 | 包含Opcode, Payload Length, Mask, Payload Data | 包含Event name和Data |
功能 | 定义标准的双向通信,没有内建的重连或事件支持 | 支持事件、重连、自动重试、命名空间等功能 |
发送和接收方式 | 通过raw WebSocket send和onmessage直接发送和接收封装的数据 | 通过emit和on方法,使用事件名来识别 |
支持的传输方式 | 只支持WebSocket | 支持多个传输方式(WebSocket, Polling等) |
安装两个库
pip install flask-socketio flask-sock
flask-sock例子
服务端example
from flask import Flask, render_template
from flask_sock import Sock
app = Flask(__name__)
app.config['SOCK_SERVER_OPTIONS'] = {'ping_interval': 25}
sock = Sock(app)
@app.route('/')
def index():
return render_template('index.html')
@sock.route('/echo')
def echo(ws):
while True:
data = ws.receive()
if data == 'close':
break
ws.send(data)
if __name__ == '__main__':
app.run()
客户端example
from flask import Flask, render_template
from flask_sock import Sock
app = Flask(__name__)
app.config['SOCK_SERVER_OPTIONS'] = {'ping_interval': 25}
sock = Sock(app)
@app.route('/')
def index():
return render_template('index.html')
@sock.route('/echo')
def echo(ws):
while True:
data = ws.receive()
if data == 'close':
break
ws.send(data)
if __name__ == '__main__':
app.run()
flask-socketio例子
服务端example
from flask import Flask, render_template
from flask_socketio import SocketIO, emit
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your_secret_key' # 设置一个密钥用于会话、Cookie等
socketio = SocketIO(app)
@app.route('/')
def index():
return render_template('index.html')
@socketio.on('message')
def handle_message(message):
print('Received message: ' + message)
emit('message', message) # Echo the received message back
@socketio.on('close')
def handle_close():
print('Closing connection')
if __name__ == '__main__':
socketio.run(app)
客户端example
import time
import socketio
# 创建 SocketIO 客户端实例
sio = socketio.Client()
@sio.event
def connect():
print("Connection established")
@sio.event
def message(data):
print(f"Message received: {data}")
@sio.event
def disconnect():
print("Disconnected from server")
# 连接到服务器
sio.connect('http://localhost:5000')
# 发送消息
sio.send("Hello, Socket.IO!")
# 等待一些时间以接收消息
time.sleep(1)
# 发送关闭消息并断开连接
sio.disconnect()
如果要用websocket格式来测试这个socketio接口
import websockets
import asyncio
import json
import time
import random
async def test_socketio():
# Socket.IO 在 WebSocket URL 上添加特定的路径和参数
uri = "ws://localhost:5000/socket.io/?EIO=4&transport=websocket"
async with websockets.connect(uri) as ws:
print("Connected to server")
# 发送 Socket.IO 握手消息
await ws.send("40") # Socket.IO v4 握手消息
# 发送消息
message = {
"type": "message",
"data": "Hello, Socket.IO!"
}
# Socket.IO 消息格式: "42" + JSON数组 [event, data]
socketio_msg = f'42{json.dumps(["message", message])}'
await ws.send(socketio_msg)
# 接收消息
try:
while True:
response = await asyncio.wait_for(ws.recv(), timeout=2)
print(f"Received: {response}")
# 如果是 PING 消息(2),回应 PONG(3)
if response.startswith("2"):
await ws.send("3")
continue
# 解析常规消息
if response.startswith("42"):
try:
data = json.loads(response[2:])
print(f"Parsed message: {data}")
except json.JSONDecodeError:
print("Failed to parse message")
except asyncio.TimeoutError:
print("No more messages")
# 发送关闭消息
await ws.send("41") # Socket.IO 关闭消息
if __name__ == "__main__":
asyncio.run(test_socketio())
原文地址:https://blog.csdn.net/stephen147/article/details/143428614
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!