本地化部署 私有化大语言模型
本地化部署 私有化大语言模型
本地化部署 私有化大语言模型
本文介绍了如何使用 Gradio 构建一个基于深度学习的文件上传与知识库查询系统。通过结合 FAISS 和 Sentence-Transformers,系统可以高效地处理和检索大量文档,并基于用户的问题生成回答。通过 Gradio 提供了一个简单直观的用户界面,便于用户上传文件和与智能助理进行对话。
Anaconda 环境
搭建运行
创建 Anaconda 新环境
打开 Terminal(终端)
打开工程位置:cd D:\Unity\ChatGLM\PrivateAgent\Private-Agent (这是我的位置)
下载环境依赖:pip install -r requirements.txt
下面是 requirements.txt 完整代码
# basic requirements
protobuf>=3.20.0
transformers>=4.41.0
tokenizers>=0.19.1
cpm_kernels>=1.0.11
torch>=2.5.1
gradio>=5.6.0
sentencepiece>=0.2.0
sentence_transformers>=3.3.1
accelerate>=1.1.1
streamlit>=1.40.1
fastapi>=0.115.5
loguru~=0.7.2
mdtex2html>=1.3.0
latex2mathml>=3.77.0
jupyter_client>=8.6.3
faiss-cpu>=1.9.0
peft>=0.13.2
pandas>=2.2.3
python-docx>=1.1.2
python-pptx>=1.0.2
openpyxl>=3.1.5
pymupdf>=1.24.14
frontend>=0.0.3
tools>=0.1.9
这是下载完的样子
下载 CUDA:
方法 1 :pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
方法 2 :conda install pytorch torchvision torchaudio pytorch-cuda=11.8 -c pytorch -c nvidia
这是 CUDA 下载好的样子
运行试试:python web_demo_Agent.py
可用性为 True 就是加载上了,我的显卡是 RTX 4080 16G显存
我这里加载的模型是:ChatGlm3-6B和ChatGlm3-6B-32K
到这里就可以访问语言模型了
代码概述
本系统基于以下几项关键技术:
Gradio:构建交互式Web界面的Python库。
深度学习模型:使用 transformers 和 peft 库加载和应用预训练的自然语言处理模型,如ChatGLM-6B。
FAISS:用于高效地进行相似度检索和文档索引。
Sentence-Transformers:将文档转换为向量形式,以便于在FAISS中进行高效检索。
多种文件格式支持:支持 .txt, .pdf, .docx, .pptx, .xlsx 等多种文件格式的上传与处理。
环境配置
安装依赖
确保安装所需的库:
gradio、torch、sentence-transformers、faiss-cpu、pandas、python-docx
python-pptx、openpyxl、PyMuPDF
大家如果创建完 requirements.txt 文件的话,直接执行 pip install -r requirements.txt 就行。
CUDA 环境配置
如果你使用GPU加速,确保CUDA环境已经配置好。可以通过以下代码检查CUDA的可用性:
import torch
print("CUDA 可用性:", torch.cuda.is_available())
print("当前设备数量:", torch.cuda.device_count())
if torch.cuda.is_available():
print("设备名称:", torch.cuda.get_device_name(0))
系统设计与实现
文件处理与加载
系统支持从多种文件格式中加载文本数据,包括 .txt, .pdf, .docx, .pptx, .xlsx 等。
每种文件格式的加载都通过不同的库来解析:
.txt 文件使用标准的 Python 文件操作读取。
.pdf 使用 PyMuPDF 解析文本。
.docx 使用 python-docx 解析段落。
.pptx 使用 python-pptx 解析幻灯片内容。
.xlsx 使用 pandas 读取 Excel 文件内容。
这些文件的文本内容随后会进行清洗,去除无用空白和换行,并添加到文档集合中。
文档索引构建
为了在大量文档中快速进行检索,系统使用 FAISS 来构建文档的向量索引。
通过 Sentence-Transformers 模型将文档转化为向量,并将这些向量添加到FAISS索引中。
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
embedder = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
# 向量索引添加
def build_index(docs):
embeddings = embedder.encode(docs)
dimension = embeddings.shape[1]
index = faiss.IndexFlatL2(dimension)
index.add(np.array(embeddings))
return index
模型加载与推理
本系统支持加载多个深度学习模型,使用 transformers 库来加载预训练的 Causal LM 模型
(例如 ChatGLM-6B)。在用户提出问题时,系统会根据当前选择的模型生成回复。
from transformers import AutoTokenizer, AutoModelForCausalLM
# 模型加载方法
def load_model_and_tokenizer(model_dir):
model = AutoModelForCausalLM.from_pretrained(model_dir)
tokenizer = AutoTokenizer.from_pretrained(model_dir)
return model, tokenizer
文件上传与索引更新
用户上传文件后,系统会将其内容提取并增量更新文档索引,确保检索到最新的资料。
import shutil
# 文件更新方法
def upload_file(file_path):
if not os.path.exists(file_path):
raise FileNotFoundError(f"文件 {file_path} 不存在。")
shutil.copy(file_path, './reference_docs/')
new_documents = load_documents_from_folder('./reference_docs')
incrementally_update_index(new_documents)
实时对话与文档检索
用户在聊天过程中输入问题,系统会根据问题生成上下文,并从已加载的文档中检索相关内容。
然后,将检索到的文档与问题一起传递给语言模型生成回答。
# 资料库问答
def search_docs(query, top_k=3):
query_vector = embedder.encode([query])
distances, indices = index.search(query_vector, top_k)
return [documents[i] for i in indices[0]]
Gradio 前端设计
前端使用 Gradio 来实现交互界面,用户可以通过文本框输入问题,上传文件,选择不同的模型
调整生成文本的参数(如温度、最大生成长度等)。
import gradio as gr
# Gradio UI 构建
with gr.Blocks() as demo:
user_input = gr.Textbox(show_label=False, placeholder="请输入您的问题...")
submit_btn = gr.Button("发送")
chatbot = gr.Chatbot(value=[{"role": "assistant", "content": "您好,我是您的智能助理!"}])
submit_btn.click(predict, inputs=[user_input], outputs=[chatbot])
主要功能
1. 上传文件:用户可以上传 .txt, .pdf, .docx, .pptx, .xlsx 文件系统会自动处理并更新知识库索引。
2. 文档检索:用户提出问题时,系统会从文档库中检索相关资料,并结合用户问题生成合理的回答。
3. 模型切换:支持动态切换不同的深度学习模型,用户可以根据需求选择不同的推理模型。
4. 实时对话:系统会根据历史对话生成新的回答,并提供流式响应。
完整代码
import os
import gradio as gr
import torch
from threading import Thread
from typing import Union, Tuple
from pathlib import Path
from peft import AutoPeftModelForCausalLM, PeftModelForCausalLM
from transformers import (
AutoModelForCausalLM,
AutoTokenizer,
PreTrainedModel,
PreTrainedTokenizer,
PreTrainedTokenizerFast,
StoppingCriteria,
StoppingCriteriaList,
TextIteratorStreamer
)
import numpy as np
from sentence_transformers import SentenceTransformer
import faiss
import pandas as pd
import docx
import pptx
import fitz # PyMuPDF for PDF reading
from openpyxl import load_workbook
import logging
import shutil
import asyncio
import gc
from transformers import AutoModelForCausalLM, AutoTokenizer
ModelType = Union[PreTrainedModel, PeftModelForCausalLM]
TokenizerType = Union[PreTrainedTokenizer, PreTrainedTokenizerFast]
# 默认模型路径列表,用户可以根据需要添加更多模型路径
MODEL_PATHS = {
'ChatGLM3-6B': 'model/ChatGlm3-6B',
'ChatGlm3-6B-32K': 'model/ChatGlm3-6B-32K'
}
TOKENIZER_PATH = os.environ.get("TOKENIZER_PATH", '')
# 固定的资料路径
REFERENCE_FOLDER = './reference_docs' # 指定参考资料文件夹路径
# 默认的招呼语
DEFAULT_GREETING = "您好,我是您的智能助理,有什么可以帮助您的吗?"
# 加载身份描述文件
IDENTITY_FILE = './reference_docs/identity.txt'
# 检查 CUDA 是否可用
print("CUDA 可用性:", torch.cuda.is_available())
print("当前设备数量:", torch.cuda.device_count())
if torch.cuda.is_available():
print("设备名称:", torch.cuda.get_device_name(0))
# 加载 SentenceTransformer 嵌入模型
embedder = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
# 初始化全局变量
documents = [] # 存储所有加载的文档内容
index = None # FAISS 索引对象
# 处理和规范化文件路径
def _resolve_path(path: Union[str, Path]) -> Path:
return Path(path).expanduser().resolve()
# 模型加载函数
def load_model_and_tokenizer(model_dir: Union[str, Path], trust_remote_code: bool = True) -> Tuple[AutoModelForCausalLM, AutoTokenizer]:
model_dir = Path(model_dir).resolve() # 解析路径
if (model_dir / 'adapter_config.json').exists():
model = AutoPeftModelForCausalLM.from_pretrained(
model_dir, trust_remote_code=trust_remote_code, device_map='auto'
)
tokenizer_dir = model.peft_config['default'].base_model_name_or_path
else:
model = AutoModelForCausalLM.from_pretrained(
model_dir, trust_remote_code=trust_remote_code, device_map='auto'
)
tokenizer_dir = model_dir
# 加载 tokenizer
tokenizer = AutoTokenizer.from_pretrained(
tokenizer_dir, trust_remote_code=trust_remote_code
)
return model, tokenizer
# 初始加载默认模型
current_model_name = 'ChatGLM3-6B'
# model token 加载
model, tokenizer = load_model_and_tokenizer(MODEL_PATHS[current_model_name], trust_remote_code=True)
# 将模型量化到 INT8(仅在模型支持量化时有效)
#model = torch.quantization.quantize_dynamic(
# model, # 量化的模型
# {torch.nn.Linear}, # 量化的层类型,通常是 Linear 层
# dtype=torch.qint8 # 量化为 INT8
#)
# 转换为半精度(FP16),如果需要
# model = model.half()
# 评估模式
model = model.cuda().eval()
# 量化为 INT8、转为 FP16、将模型转移到 GPU、设置为评估模式
# model = model.quantize(8).half().cuda().eval()
# Token 停止
class StopOnTokens(StoppingCriteria):
def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool:
stop_ids = [0, 2]
for stop_id in stop_ids:
if input_ids[0][-1] == stop_id:
return True
return False
# 文本解析函数
def parse_text(text):
lines = text.split("\n")
lines = [line for line in lines if line != ""]
count = 0
for i, line in enumerate(lines):
if "```" in line:
count += 1
items = line.split('`')
if count % 2 == 1:
lines[i] = f'<pre><code class="language-{items[-1]}">'
else:
lines[i] = f'<br></code></pre>'
else:
if i > 0:
if count % 2 == 1:
line = line.replace("`", "\`")
line = line.replace("<", "<")
line = line.replace(">", ">")
line = line.replace(" ", " ")
line = line.replace("*", "*")
line = line.replace("_", "_")
line = line.replace("-", "-")
line = line.replace(".", ".")
line = line.replace("!", "!")
line = line.replace("(", "(")
line = line.replace(")", ")")
line = line.replace("$", "$")
lines[i] = "<br>" + line
text = "".join(lines)
return text
# 数据清理函数
def clean_text(text):
"""清洗文本,去除多余的空格和换行"""
if not text or not isinstance(text, str):
print("无效文本,跳过处理")
return None
cleaned = " ".join(text.split()).strip()
# print(f"清理前: {text[:50]}... 清理后: {cleaned[:50]}...")
return cleaned if len(cleaned) > 5 else None # 最小长度为5
# 索引构建
def build_index(docs):
global index
if not docs or len(docs) == 0:
print("没有文档可以索引。")
return
embeddings = embedder.encode(docs)
dimension = embeddings.shape[1]
if index is None:
index = faiss.IndexFlatL2(dimension)
else:
index.reset()
index.add(np.array(embeddings))
print(f"索引已构建,共包含 {index.ntotal} 个文档。")
# 资料加载
def load_documents_from_folder(folder_path):
"""从文件夹加载所有文档内容,支持递归子文件夹"""
contents = []
for root, _, files in os.walk(folder_path):
for file in files:
file_path = os.path.join(root, file)
ext = os.path.splitext(file_path)[-1].lower()
try:
if ext == ".txt":
with open(file_path, "r", encoding="utf-8") as f:
content = clean_text(f.read())
if content:
contents.append(content)
print(f"成功加载TXT文件: {file_path}")
else:
print(f"TXT文件内容为空或被过滤: {file_path}")
elif ext == ".pdf":
with fitz.Document(file_path) as doc:
for page in doc:
content = clean_text(page.get_text())
if content:
contents.append(content)
print(f"成功加载PDF文件: {file_path}")
elif ext == ".docx":
doc_file = docx.Document(file_path)
for para in doc_file.paragraphs:
content = clean_text(para.text)
if content:
contents.append(content)
print(f"成功加载DOCX文件: {file_path}")
elif ext == ".pptx":
prs = pptx.Presentation(file_path)
for slide in prs.slides:
for shape in slide.shapes:
if hasattr(shape, "text"):
content = clean_text(shape.text)
if content:
contents.append(content)
print(f"成功加载PPTX文件: {file_path}")
elif ext in [".xls", ".xlsx"]:
xls = pd.ExcelFile(file_path)
for sheet_name in xls.sheet_names:
df = xls.parse(sheet_name)
for col in df.columns:
content = clean_text(df[col].dropna().astype(str).str.cat(sep='\n'))
if content:
contents.append(content)
print(f"成功加载EXCEL文件: {file_path}")
except Exception as e:
logging.error(f"无法加载文件 {file_path},错误信息:{e}", exc_info=True)
print(f"加载文档数量: {len(contents)}")
return contents
# 资料初始化
def initialize_reference_documents():
"""初始化加载参考资料文件夹内的所有文档"""
global documents, index
if not os.path.exists(REFERENCE_FOLDER):
os.makedirs(REFERENCE_FOLDER)
# 加载文档
documents = load_documents_from_folder(REFERENCE_FOLDER)
# 如果文档加载成功,构建索引
if documents:
build_index(documents)
print(f"初始化完成,共加载了 {len(documents)} 个文档。")
else:
print("未找到任何文档,请检查参考文件夹。")
return len(documents)
# 文件刷新
def refresh_documents(chat_history):
"""刷新文件夹内容"""
global documents, index
try:
documents = load_documents_from_folder(REFERENCE_FOLDER)
build_index(documents)
message = f"文档已刷新,共索引了 {len(documents)} 个文档。"
chat_history.append({"role": "assistant", "content": message})
return chat_history
except Exception as e:
chat_history.append({"role": "assistant", "content": f"刷新资料库失败:{e}"})
return chat_history
# 增量更新索引的函数
def incrementally_update_index(new_documents):
global documents, index
# 仅将新文档的嵌入向量添加到索引
new_embeddings = [embedder.encode([doc]) for doc in new_documents] # 假设你有一个 `embedder`
index.add(np.array(new_embeddings))
print(f"成功更新索引,添加了 {len(new_documents)} 个文档的嵌入向量。")
# 修改文件上传函数,使用增量索引更新
def upload_file(file_path, chat_history):
global documents, index
print(f"开始上传文件:{file_path}")
if not file_path:
chat_history.append({"role": "assistant", "content": "没有选择文件,请重新上传!"})
return chat_history
if not os.path.exists(file_path):
chat_history.append({"role": "assistant", "content": "文件不存在,请检查文件路径!"})
return chat_history
try:
file_name = os.path.basename(file_path)
save_path = os.path.join(REFERENCE_FOLDER, file_name)
if not os.path.exists(REFERENCE_FOLDER):
os.makedirs(REFERENCE_FOLDER)
if os.path.exists(save_path):
chat_history.append({"role": "assistant", "content": f"文件 {file_name} 已存在,正在重命名..."})
base_name, ext = os.path.splitext(file_name)
counter = 1
while os.path.exists(save_path):
new_file_name = f"{base_name}_{counter}{ext}"
save_path = os.path.join(REFERENCE_FOLDER, new_file_name)
counter += 1
shutil.copy(file_path, save_path)
print(f"文件已复制到 {save_path},大小为 {os.path.getsize(save_path)} 字节")
# 加载新文件并增量更新索引
new_documents = load_documents_from_folder(REFERENCE_FOLDER)
incrementally_update_index(new_documents)
chat_history.append({"role": "assistant", "content": f"文件 {file_name} 上传成功并已保存!"})
return chat_history
except FileNotFoundError:
chat_history.append({"role": "assistant", "content": "文件没有找到,请检查文件路径!"})
except PermissionError:
chat_history.append({"role": "assistant", "content": "没有权限访问文件,请检查文件权限!"})
except Exception as e:
chat_history.append({"role": "assistant", "content": f"上传过程中发生错误:{e}"})
# 资料库检索
def search_docs(query, top_k=3, distance_threshold=15.0):
"""从资料库检索相关文档"""
if not index or not documents:
print("索引或文档未初始化。")
return ["还没有加载文件。请初始化或刷新参考文件夹。"]
query_vector = embedder.encode([query])
distances, indices = index.search(query_vector, top_k)
print(f"检索结果索引: {indices}, 距离: {distances}")
results = []
for i, dist in zip(indices[0], distances[0]):
if i < len(documents):
results.append((documents[i], dist))
if not results:
print("未找到任何相关内容。")
return ["未找到与查询相关的内容,请确保资料库中有相关内容。"]
results = sorted(results, key=lambda x: x[1])
filtered_results = [doc for doc, dist in results if dist < distance_threshold]
if not filtered_results:
print("距离超过阈值,返回最近的文档。")
return [doc for doc, _ in results[:top_k]]
return filtered_results
# 读取 identity.txt 的内容
def load_identity(file_path):
if not os.path.exists(file_path):
print(f"身份文件未找到:{file_path}")
return "我是您的智能助理,很高兴为您服务。"
try:
with open(file_path, 'r', encoding='utf-8') as f:
identity_content = f.read()
return identity_content.strip()
except Exception as e:
print(f"无法读取身份文件:{e}")
return "我是您的智能助理,很高兴为您服务。"
# 身份文件 读取
identity_content = load_identity(IDENTITY_FILE)
# 模型预测
def predict(chat_history, max_length, top_p, temperature):
"""处理用户输入,生成回答"""
stop = StopOnTokens()
global identity_content
if not chat_history:
# 如果 chat_history 为空,提示用户输入
chat_history = [{"role": "assistant", "content": "请先输入您的问题。"}]
yield chat_history, chat_history
return
# 获取用户的最新输入
user_input = chat_history[-1]['content']
# 检索相关文档
related_docs = search_docs(user_input, top_k=3, distance_threshold=15.0)
# 确保 related_docs 格式正确
if not related_docs:
related_docs = ["未找到相关资料"]
if "未找到" in related_docs[0]:
# 更新 chat_history,添加助手的回复
chat_history.append({"role": "assistant", "content": related_docs[0]})
yield chat_history, chat_history
return
# 构建上下文并限制长度
context = "\n".join(related_docs[:3]) # 仅使用前3个相关文档
if len(context) > 1000:
context = context[:1000] + "..."
# 将身份描述添加到上下文
if identity_content:
identity_prompt = f"助手的自我介绍:\n{identity_content}\n\n"
else:
identity_prompt = ""
# 构造完整输入
full_input = f"{identity_prompt}以下是相关资料内容:\n{context}\n\n问题:{user_input}\n回答:"
messages = chat_history[:-1] # 不包含最后一个用户消息
messages.append({"role": "user", "content": full_input})
# 添加一个空的助手回复到 chat_history
chat_history.append({"role": "assistant", "content": ""})
print("\n\n====conversation====\n", messages)
# 生成模型输入
model_inputs = tokenizer.apply_chat_template(
messages,
add_generation_prompt=True,
tokenize=True,
return_tensors="pt"
).to(next(model.parameters()).device)
streamer = TextIteratorStreamer(tokenizer, timeout=60, skip_prompt=True, skip_special_tokens=True)
generate_kwargs = {
"input_ids": model_inputs,
"streamer": streamer,
"max_new_tokens": max_length,
"do_sample": True,
"top_p": top_p,
"temperature": temperature,
"stopping_criteria": StoppingCriteriaList([stop]),
"repetition_penalty": 1.2,
}
# 启动生成线程
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(model.generate, **generate_kwargs)
# 实时返回生成的内容
partial_response = ""
for new_token in streamer:
try:
if new_token != '':
partial_response += new_token
chat_history[-1]['content'] = partial_response
yield chat_history, chat_history
except Exception as e:
print(f"流式生成时出错:{e}")
break
# 模型切换
def update_model( model_name):
global model, tokenizer, current_model_name
current_model_name = model_name
return [{"role": "assistant", "content": f"模型切换中:{current_model_name}"}]
# 清空缓存并卸载模型
def clear_previous_model():
global model, tokenizer
if model:
del model
if tokenizer:
del tokenizer
torch.cuda.empty_cache() # 清理 GPU 显存
import gc
gc.collect() # 强制垃圾回收
# 同步模型切换
def update_modelswit(model_name):
clear_previous_model() # 清空上一个模型
global model, tokenizer
current_model_name = model_name
# 加载新模型
model, tokenizer = load_model_and_tokenizer(MODEL_PATHS[model_name], trust_remote_code=True)
model = model.to('cuda') # 将模型移动到 GPU
model = model.eval() # 设置为推理模式
# 初始化参考资料
initialize_reference_documents()
current_model_name = model_name
# 返回模型切换后的状态
return [{"role": "assistant", "content": f"模型已切换为:{current_model_name}"}]
# 清空对话
def reset_state():
return [{"role": "assistant", "content": DEFAULT_GREETING}], [{"role": "assistant", "content": DEFAULT_GREETING}]
# 初始化参考资料
initialize_reference_documents()
# 构建 UI
with gr.Blocks() as demo:
# 自定义按钮的样式 #FF7617
gr.HTML("""
<style>
.my-btn {
background-color: #F26D14 !important;
color: white !important;
border-radius: 5px !important;
font-size: 16px !important;
padding: 10px 20px !important;
}
.my-btn:hover {
background-color: #E15B10 !important;
}
.my-btn:active {
background-color: #D14A00 !important;
}
.submitBtn {
background-color: #F26D14 !important;
color: white !important;
border-radius: 5px !important;
font-size: 16px !important;
padding: 10px 20px !important;
}
.submitBtn:hover {
background-color: #E15B10 !important;
}
.submitBtn:active {
background-color: #D14A00 !important;
}
</style>
""")
gr.HTML(f"""<h1 align="center">Maddie 私有化智能体</h1>
<h3 align="center">欢迎使用智能知识库,请上传文档并提出问题!</h3>""")
chatbot = gr.Chatbot(value=[{"role": "assistant", "content": DEFAULT_GREETING}], type="messages")
with gr.Row():
with gr.Column(scale=4): # 保持原比例,输入框和对话历史不变
# 输入框
user_input = gr.Textbox(
show_label=False,
placeholder="输入您的问题...",
lines=5, # 保持原来行数
container=False,
elem_id="user_input_box"
)
# 发送按钮和上传按钮保持原大小
submitBtn = gr.Button("发送",elem_classes=["submitBtn"], elem_id="submit_btn")
with gr.Column(scale=1):
# 文件上传组件,减小宽度
file_upload = gr.File(
label="上传文件",
file_types=['.txt', '.pdf', '.docx', '.xlsx', '.pptx'],
type="filepath",
elem_id="file_upload",
scale=0.7 # 缩小文件上传组件的宽度
)
upload_button = gr.Button("上传文件",elem_classes=["my-btn"])
# refresh_button = gr.Button("刷新资料库")
with gr.Column(scale=1): # 右侧的控件部分保持较小比例
model_selector = gr.Dropdown(
list(MODEL_PATHS.keys()),
label="选择模型",
# value=current_model_name,
scale=0.8 # 窄一点的下拉框
)
modelswitch = gr.Button("模型切换",elem_classes=["my-btn"], size="sm")
max_length = gr.Slider(0, 32768, value=8192, step=1.0, label="最大长度", interactive=True)
top_p = gr.Slider(0, 1, value=0.8, step=0.01, label="Top P", interactive=True)
temperature = gr.Slider(0.01, 1, value=0.6, step=0.01, label="温度", interactive=True)
emptyBtn = gr.Button("清空对话",elem_classes=["my-btn"])
# 初始对话历史
history = gr.State([{"role": "assistant", "content": DEFAULT_GREETING}])
# 示例消息解析逻辑
def parse_text(text):
return text.strip() # 简单解析,可以按需扩展
# 发送消息逻辑
def send_message(input_text, chat_history):
chat_history.append({"role": "user", "content": parse_text(input_text)})
return "", chat_history
user_input.submit(
send_message,
inputs=[user_input, history],
outputs=[user_input, chatbot]
)
# 点击按钮发送消息
submitBtn.click(
send_message,
inputs=[user_input, history],
outputs=[user_input, history]
).then(
predict,
inputs=[history, max_length, top_p, temperature],
outputs=[chatbot, history]
)
# 清空对话
emptyBtn.click(reset_state, outputs=[chatbot, history], queue=False)
# 模型选择
model_selector.change(update_model, inputs=[model_selector], outputs=[chatbot])
# 模型切换
modelswitch.click(update_modelswit, inputs=[model_selector], outputs=[chatbot])
# 上传文件并刷新资料库
upload_button.click(
lambda file_path, chat_history: (
upload_file(file_path, chat_history) # 上传文件
) if file_path else (
chat_history # 如果没有选择文件,返回当前对话
),
inputs=[file_upload, chatbot],
outputs=[chatbot]
).then(
refresh_documents, # 上传成功后刷新资料库
inputs=[chatbot],
outputs=[chatbot]
)
# 监听回车事件 (Enter)
gr.HTML("""
<script>
window.addEventListener('load', function() {
// 获取用户输入框和按钮元素
const inputBox = document.querySelector("#user_input_box"); // 通过 ID 获取输入框
const submitButton = document.querySelector("#submit_btn"); // 通过 ID 获取按钮
// 监听回车键事件
inputBox.addEventListener("keydown", function(event) {
// 检查是否按下了回车键
if (event.key === "Enter") {
event.preventDefault(); // 阻止默认的换行行为
submitButton.click(); // 模拟点击提交按钮
}
});
});
</script>
""")
# 刷新资料库
# refresh_button.click(refresh_documents, inputs=[chatbot], outputs=[chatbot])
# 启动服务
demo.queue()
demo.launch(server_name="127.0.0.1", server_port=7870, inbrowser=True, share=True)
功能说明
运行示例
运行成功后在浏览器键入:127.0.0.1:7870 可以打开测试网页,当然也会自动打开。
文件上传
点击上传文件可以选择你要添加进资料库中的文件。
点击上传文件可以更新资料库以及索引
如果成功就会显示这样的提示。
资料库对话
正常问答就可以访问资料库。
模型切换
点击模型切换下拉菜单 可以查看模型列表。目前是手动添加。
如需更改注意环境兼容性以及在代码中重新配置模型路径。
# 默认模型路径列表,用户可以根据需要添加更多模型路径
MODEL_PATHS = {
'ChatGLM3-6B': 'model/ChatGlm3-6B',
'ChatGlm3-6B-32K': 'model/ChatGlm3-6B-32K'
}
选择好模型之后,对话框会提示正在切换的模型。
点击模型切换按钮后台会显示加载模型进度。
模型加载完毕也会有相关提示。
模型对话
目前切换完模型之后,资料库索引有一点问题。Gradio 对话显示时好时坏的,暂时还没找到原因。
后期有时间再补充吧,如果哪位大佬修改好了可以踢我就一脚,我去观摩观摩。
项目文件介绍
model:存放模型文件的文件夹。
reference_docs:资料库文件夹,所有参考资料都在这里。
static:用户生成的文档或数据比如结果文件、日志文件、分析报告等,基本上用不到。
PrivateAgent.py:就是执行文件。
requirements.txt:依赖加载文件。
requirements02.txt:依赖加载文件备份。
执行命令.txt:放了一些常用的命令。
执行命令合集
//打开项目盘
D:
//打开根目录
cd D:\Unity\ChatGLM\PrivateAgent\Private-Agent
//依赖安装
pip install -r requirements.txt
//卸载当前版本的 PyTorch:
pip uninstall torch torchvision torchaudio -y
//安装 CUDA
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
conda install pytorch torchvision torchaudio pytorch-cuda=11.8 -c pytorch -c nvidia
//运行程序
python PrivateAgent.py
//查看 Python 版本
python --version
//显卡 CUDA 版本查询
nvidia-smi
//查看当前平台支持的版本
pip debug --verbose
//查看 **依赖 版本 例如:pip show pytorch
pip show **
暂时先这样吧,如果实在看不明白就留言,看到我会回复的。希望这个教程对您有帮助!
路漫漫其修远兮,与君共勉。
原文地址:https://blog.csdn.net/weixin_43925843/article/details/143643941
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!