自学内容网 自学内容网

将大模型生成数据存入Excel,并用增量的方式存入Excel

将大模型生成数据存入Excel,并用增量的方式存入Excel

1. 需求与要解决的问题

首先就是大模型对话特别耗时,所以通过需要异步执行。
其次是中间对话会有终端或像死锁的那种情况,循环不再继续,所以要增量存到Excel。
最后就是多线程并发,与并发线程数量的控制,因为模型不支持太多线程并发执行。

2. 代码

@Async
    @Override
    public void generateMilitaryDescription() {
        log.info("开始执行 generateMilitaryDescription 方法");

        String outputPath = "/Users/fanzhen/Documents/tuijian-java-copy/src/main/java/com/landinn/common/client/military_descriptions.xlsx";
        XSSFWorkbook workbook;
        XSSFSheet sheet;
        int[] rowIndex = {1}; // 行索引初始化

        // 用于存储已存在的 golaxy_node_id 和 golaxy_vocab_id
        Set<String> existingVocabIds = new HashSet<>();

        // 检查文件是否存在
        File file = new File(outputPath);
        if (file.exists()) {
            try (FileInputStream fileIn = new FileInputStream(file)) {
                workbook = new XSSFWorkbook(fileIn); // 读取现有文件
                sheet = workbook.getSheetAt(0); // 获取第一个工作表
                rowIndex[0] = sheet.getLastRowNum() + 1; // 计算新数据的起始行
                log.info("检测到现有文件,从第 {} 行开始追加", rowIndex[0]);

                // 将现有的golaxy_vocab_id 存入集合
                for (int i = 1; i <= sheet.getLastRowNum(); i++) { // 从第 1 行开始读取数据
                    Row row = sheet.getRow(i);
                    if (row != null) {
                        Cell vocabIdCell = row.getCell(1); // 第 2 列为 golaxy_vocab_id
                        if (vocabIdCell != null && vocabIdCell.getCellType() == CellType.STRING) {
                            existingVocabIds.add(vocabIdCell.getStringCellValue());
                        }
                    }
                }
            } catch (IOException e) {
                log.error("读取现有文件时出错: {}", e.getMessage(), e);
                return;
            }
        } else {
            // 如果文件不存在,则创建新的文件和工作表
            workbook = new XSSFWorkbook();
            sheet = workbook.createSheet("军事描述");

            // 创建标题行
            Row headerRow = sheet.createRow(0);
            headerRow.createCell(0).setCellValue("golaxy_node_id");
            headerRow.createCell(1).setCellValue("golaxy_vocab_id");
            headerRow.createCell(2).setCellValue("node_name");
            headerRow.createCell(3).setCellValue("description");
            headerRow.createCell(4).setCellValue("created_at");
            headerRow.createCell(5).setCellValue("updated_at");
            log.info("未检测到文件,创建新文件");
        }

        // 查询未删除的节点
        List<TechniqueTreeNode> techniqueTreeNodes = techniqueTreeNodeMapper.selectAll();
        log.info("共检索到 {} 条 TechniqueTreeNode 数据", techniqueTreeNodes.size());

        // 创建线程池,限制最高并发为 2
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,  // 核心线程数
                2,  // 最大线程数
                90,
                TimeUnit.SECONDS,
                new SynchronousQueue<>(), // 不允许任务排队,任务必须直接交给线程处理
                new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝的任务由调用线程执行
        );

        Semaphore semaphore = new Semaphore(2); // 限制并发任务数

        for (TechniqueTreeNode node : techniqueTreeNodes) {
            // 将 golaxy_node_id 和 golaxy_vocab_id 转为字符串
            String golaxyNodeId = String.valueOf(node.getGolaxyNodeId());
            String golaxyVocabId = String.valueOf(node.getGolaxyVocabId());

            // 检查是否已存在
            if (existingVocabIds.contains(golaxyVocabId)) {
                log.info("golaxy_vocab_id '{}' 已存在,跳过该节点", golaxyVocabId);
                continue;
            }

            executor.submit(() -> {
                try {
                    semaphore.acquire(); // 获取许可
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                try {
                    String nodeName = node.getNodeName();
                    log.info("正在处理节点名称:{}", nodeName);

                    String threadName = Thread.currentThread().getName();
                    log.info("线程 {} 开始处理节点 '{}'", threadName, nodeName);

                    long startTime = System.currentTimeMillis();
                    String resultRes = chatModelClient.chatModel(nodeName + "的发展,对军事装备会带来哪些影响?请分别按照军事应用场景列出并详细描述。");
                    String result = "\n # 军事领域应用前景 \n" + resultRes;

                    long endTime = System.currentTimeMillis();

                    log.info("线程 {} 调用模型生成对话完成,耗时:{} 秒", threadName, (endTime - startTime) / 1000.0);
                    log.info("节点 '{}' 的描述结果:{}", nodeName, result);

                    // 写入 Excel 数据并立即保存到文件
                    synchronized (sheet) {
                        Row row = sheet.createRow(rowIndex[0]++);

                        // 创建每个单元格并设置为文本格式
                        Cell cell0 = row.createCell(0);
                        cell0.setCellValue(golaxyNodeId);

                        Cell cell1 = row.createCell(1);
                        cell1.setCellValue(golaxyVocabId);

                        Cell cell2 = row.createCell(2);
                        cell2.setCellValue(nodeName);

                        Cell cell3 = row.createCell(3);
                        cell3.setCellValue(result);

                        // 设置 created_at 和 updated_at 为文本格式
                        Cell cell4 = row.createCell(4);
                        cell4.setCellValue(DateUtil.format(node.getCreatedAt(), "yyyy-MM-dd HH:mm:ss"));

                        Cell cell5 = row.createCell(5);
                        cell5.setCellValue(DateUtil.format(node.getUpdatedAt(), "yyyy-MM-dd HH:mm:ss"));

                        // 增量保存文件
                        try (FileOutputStream fileOut = new FileOutputStream(outputPath)) {
                            workbook.write(fileOut);
                            log.info("Excel 文件已增量更新至 {}", outputPath);
                        } catch (IOException e) {
                            log.error("增量更新文件时出错:{}", e.getMessage(), e);
                        }
                    }

                    log.info("节点 '{}' 处理成功", nodeName);
                } catch (Exception e) {
                    log.error("处理节点 '{}' 时发生错误:{}", node.getNodeName(), e.getMessage(), e);
                } finally {
                    semaphore.release(); // 释放许可
                }
            });
        }

        executor.shutdown();
        try {
            if (!executor.awaitTermination(90, TimeUnit.SECONDS)) {
                log.warn("线程池未在超时时间内关闭");
            }
        } catch (InterruptedException e) {
            log.error("等待线程池关闭时发生错误:{}", e.getMessage(), e);
            Thread.currentThread().interrupt();
        }

        log.info("generateMilitaryDescription 方法执行完成");
    }

3. 部分代码分析

这段代码防止重复生成数据存到Excel,用Set<String> existingVocabIds存储已在Excel中的数据(golaxy_vocab_id是唯一id)。

// 用于存储已存在的golaxy_vocab_id
        Set<String> existingVocabIds = new HashSet<>();

        // 检查文件是否存在
        File file = new File(outputPath);
        if (file.exists()) {
            try (FileInputStream fileIn = new FileInputStream(file)) {
                workbook = new XSSFWorkbook(fileIn); // 读取现有文件
                sheet = workbook.getSheetAt(0); // 获取第一个工作表
                rowIndex[0] = sheet.getLastRowNum() + 1; // 计算新数据的起始行
                log.info("检测到现有文件,从第 {} 行开始追加", rowIndex[0]);

                // 将现有的 golaxy_node_id 和 golaxy_vocab_id 存入集合
                for (int i = 1; i <= sheet.getLastRowNum(); i++) { // 从第 1 行开始读取数据
                    Row row = sheet.getRow(i);
                    if (row != null) {
                        Cell vocabIdCell = row.getCell(1); // 第 2 列为 golaxy_vocab_id
                        if (vocabIdCell != null && vocabIdCell.getCellType() == CellType.STRING) {
                            existingVocabIds.add(vocabIdCell.getStringCellValue());
                        }
                    }
                }
            } catch (IOException e) {
                log.error("读取现有文件时出错: {}", e.getMessage(), e);
                return;
            }
        }

线程的并发限制

// 创建线程池,限制最高并发为 2
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,  // 核心线程数
                2,  // 最大线程数
                90,
                TimeUnit.SECONDS,
                new SynchronousQueue<>(), // 不允许任务排队,任务必须直接交给线程处理
                new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝的任务由调用线程执行
        );

        Semaphore semaphore = new Semaphore(2); // 限制并发任务数

 semaphore.acquire(); // 获取许可
semaphore.release(); // 释放许可

原文地址:https://blog.csdn.net/Blue_Pepsi_Cola/article/details/143883758

免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!