自学内容网 自学内容网

Flask 与 SocketIO 正确初始化及最佳实践调试

1、问题

我使用Flask和Flask-SocketIO 来做 Websocket 链接。前期正常使用,但是后期布置修改什么导致Websocket连接失败。排查需求,才发现初始化不正常导致。

SocketIO 和 Flask 应用的初始化顺序和引用循环的问题

2、环境

python-engineio==4.11.1
python-socketio==5.12.0
Flask-SocketIO==5.3.6

3、正常初始化【单文件】

from flask import Flask, render_template
from flask_socketio import SocketIO

app = Flask(__name__)
socketio = SocketIO(app)  # 初始化 SocketIO

# 默认路由,用于渲染 HTML 页面
@app.route('/')
def index():
    return render_template('index.html')

# 出现消息后,率先执行此处
@socketio.on("message", namespace="/ws")
def socket(message):
    print(f"接收到消息: {message}")
    for i in range(1, 10):
        socketio.sleep(1)
        print(f"发送消息: {i}")
        socketio.emit("response",           # 绑定通信
                      {"data": i},           # 返回socket数据
                      namespace="/ws")


# 当websocket连接成功时,自动触发connect默认方法
@socketio.on("connect", namespace="/ws")
def connect():
    print("链接建立成功..")


# 当websocket连接失败时,自动触发disconnect默认方法
@socketio.on("disconnect", namespace="/ws")
def disconnect():
    print("链接建立失败..")

if __name__ == '__main__':
    socketio.run(app, debug=True, host='0.0.0.0', port=5000,allow_unsafe_werkzeug=True)

4、多文件初始化【两个文件】

主要是 manage.py 和 init.py 两个文件。
在这里插入图片描述
app/init.py

from flask import Flask
from flask_socketio import SocketIO
from flasgger import Swagger
from flask_cors import CORS
from flask_migrate import Migrate
from app.config import config
from app.extension import db

# 创建一个全局的 socketio 对象,但不立即初始化
socketio = SocketIO()

# 其他 swagger 配置保持不变...

def create_app(DevelopmentConfig=None):
    if DevelopmentConfig is None:
        DevelopmentConfig = 'development'

    app = Flask(__name__)
    app.config['SECRET_KEY'] = '123456'
    
    # 加载配置项
    app.config.from_object(config.get(DevelopmentConfig))
    
    from app.api import config_blueprint
    # 注册蓝图
    config_blueprint(app)
    
    # 数据库配置初始化
    config_extensions(app)
    
    # 数据库迁移相关代码...
    from app.api.models.EsModels import sysEsLogs,sysCretitLogs
    from app.api.models.NetModels import sysNetLogs
    from app.api.models.UpsModels import sysUpsLogs
    migrate = Migrate(app, db)
    
    # Swagger初始化
    Swagger(app, config=swagger_config, template=swagger_template)
    
    # CORS配置
    CORS(app, resources={r'/*': {'origins': '*'}}, supports_credentials=True)
    
    # 初始化 SocketIO
    socketio.init_app(app, cors_allowed_origins="*")
    
    return app

socket_events.py

from app import socketio

# Socket.IO 事件处理
@socketio.on("message", namespace="/ws")
def socket(message):
    print(f"接收到消息: {message}")
    for i in range(1, 10):
        socketio.sleep(1)
        print(f"发送消息: {i}")
        socketio.emit("response",
                     {"data": i},
                     namespace="/ws")

@socketio.on("connect", namespace="/ws")
def connect():
    print("链接建立成功..")

@socketio.on("disconnect", namespace="/ws")
def disconnect():
    print("链接建立失败..") 

manage.py

# -*- coding: utf-8 -*-
# @Time    : 2024/5/2 12:01
# @Author  : 南宫乘风
# @Email   : 1794748404@qq.com
# @File    : manage.py
# @Software: PyCharm
import atexit
import os
import sys

from apscheduler.schedulers.background import BackgroundScheduler

from app import create_app, socketio
from app.common.util.LogHandler import log
from app.crontab.NetCronTab import NetworkMonitor
from app.crontab.UpsCronTab import UPSMonitor
from app.socket_events import *
# 默认为开发环境,按需求修改 production  development
config_name = 'development'

app = create_app(config_name)
# 解决中文乱码
# app.json.ensure_ascii = False

# from skywalking import agent, config
# config.init(agent_collector_backend_services='192.168.82.105:11800', agent_name='python-flask@devTenant',agent_instance_name="python-flask")
# agent.start()

# 数据库迁移
def start_scheduler():
    # 创建一个后台调度器
    scheduler = BackgroundScheduler(timezone="Asia/Shanghai")
    from app.crontab.EsCronTab import EsIndexCron
    scheduler.add_job(func=EsIndexCron, trigger='interval', minutes=1)
    log.info("EsIndexCron 定时任务启动成功")


    # scheduler.add_job(func=send_alert, trigger="interval", seconds=20)
    # 启动调度器
    scheduler.start()
    atexit.register(lambda: scheduler.shutdown())


if __name__ == '__main__':
    # 获取当前文件的绝对路径
    current_file = os.path.abspath(__file__)
    base_dir = os.path.dirname(current_file)
    # 将项目目录添加到 sys.path
    if base_dir not in sys.path:
        sys.path.append(base_dir)
    if not app.debug or app.testing:
        start_scheduler()
        log.info("定时任务启动成功")
    socketio.run(app, debug=True, use_reloader=True, host='0.0.0.0', port=5001)
    # use_reloader=False, threaded=True,

在这里插入图片描述

在postman输入地址和监听事件

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述


原文地址:https://blog.csdn.net/heian_99/article/details/144831258

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!