自学内容网 自学内容网

使用Dify访问数据库(mysql)

1、在本地搭建数据库访问的服务,并使用ngrok暴露到公网。

#sql_tools.py

from flask import Flask, request, jsonify
import mysql.connector

# 数据库连接配置
config = {
    'user': 'your_username',
    'password': 'your_password',
    'host': 'localhost',
    'database': 'your_database',
    'raise_on_warnings': True
}

# 初始化Flask应用
app = Flask(__name__)

# 连接数据库
def connect_to_database():
    try:
        conn = mysql.connector.connect(**config)
        print("Connected to MySQL database")
        return conn
    except mysql.connector.Error as err:
        print(f"Error: {err}")
        return None

# 执行SQL查询
def execute_query(conn, sql):
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        if sql.strip().lower().startswith("select"):
            # 如果是查询操作,返回结果
            result = cursor.fetchall()
            return result
        else:
            # 如果是插入、更新、删除操作,提交事务并返回受影响的行数
            conn.commit()
            return cursor.rowcount
    except mysql.connector.Error as err:
        print(f"Error executing SQL: {err}")
        return None
    finally:
        cursor.close()

# HTTP接口:执行SQL
@app.route('/execute', methods=['POST'])
def execute_sql():
    # 获取请求中的SQL语句
    data = request.json
    if not data or 'sql' not in data:
        return jsonify({"error": "SQL statement is required"}), 400

    sql = data['sql']
    conn = connect_to_database()
    if not conn:
        return jsonify({"error": "Failed to connect to database"}), 500

    # 执行SQL
    result = execute_query(conn, sql)
    conn.close()

    if result is None:
        return jsonify({"error": "Failed to execute SQL"}), 500

    # 返回结果
    return jsonify({"result": result})

# 启动Flask应用
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=3000)

2、创建知识库,导入表结构描述。

3、创建数据库访问工作流。

代码执行:


import requests
def main(sql: str) -> dict:
    # 定义API的URL
    url = "https://xxx.ngrok-free.app/execute"

    # 构造请求体
    payload = {
        "sql": sql
    }

    # 发送POST请求
    try:
        response = requests.post(url, json=payload)
    
        # 检查响应状态码
        if response.status_code == 200:
            # 解析响应数据
            result = response.json()
            return {
                "result": f"{result}"
            }
        else:
            return {
                "result": f"请求失败,状态码:{response.status_code},{response.json()}"
            }
    except requests.exceptions.RequestException as e:
        return {
            "result": f"请求异常:{e}"
        }

4、创建数据库智能体


原文地址:https://blog.csdn.net/wxl781227/article/details/145211105

免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!