自学内容网 自学内容网

文件下载时利用redis的队列模式顺序下载文件,防止多文件任务下载导致OOM

1、controller层控制


@Resource
    private RedissonClient redissonClient;
@Slf4j
@Service
public class CustomerSettlementExportServiceImpl implements ICustomerSettlementExportService {
/**
     * 文件加入队列顺序导出
     *
     * @param pubFileExportList 参数
     * @return 结果
     */
    public AjaxResult pubFileExport(List<PubFileExport> pubFileExportList) {
        if (CollectionUtils.isEmpty(pubFileExportList)) {
            return AjaxResult.error("客户信息不能为空!");
        }
        RQueue<String> queue = redissonClient.getQueue("downloadQueue:file:export");
        for (PubFileExport pubFileExport : pubFileExportList) {
            try {
                queue.add(JSONObject.toJSONString(pubFileExport));
            } catch (Exception e) {
                log.error("加入队列失败=", e);
                return AjaxResult.error("添加导出任务到队列失败!");
            }
        }
        return AjaxResult.success();
    }
}

以下代码是通过定时任务拉取队列进行文件下载

@Slf4j
@Service
public class FileDownloadServiceImpl implements IFileDownloadService {

    @Resource
    private ICustomerSettlementExportService customerSettlementExportService;

    @Resource
    private IPubFileExportService pubFileExportService;

    @Resource
    private RedisTemplate<String, String> redisTemplate;

    @Resource
    private RedissonClient redissonClient;

    @Scheduled(fixedRate = 10000)
    public void processDownloadsInit() {
        // 提前过滤掉特定 IP 地址
        if (isLocalIp()) {
            return;
        }
        RLock lock = null;
        try {
            lock = redissonClient.getLock(BmsConstants.QUEUE_KEY_LOCK);
            boolean isOrderLock = lock.tryLock(10, TimeUnit.SECONDS);
            if (!isOrderLock) {
                // 当前任务未执行完,不能获取锁
                log.error("未能获取分布式锁,当前任务未执行完或有其他实例正在执行");
                return;
            }
            // 阻塞获取队列中的任务
            String exportTask = redisTemplate.opsForList().rightPop(BmsConstants.QUEUE_KEY, 10, TimeUnit.SECONDS);
            if (exportTask != null) {
                log.info("队列文件下载开始=" + DateUtils.getTime());
                handleDownload(exportTask);
                log.info("队列文件下载结束=" + DateUtils.getTime());
            }
        } catch (Exception e) {
            log.error("队列文件下载异常=", e);
        } finally {
            safeUnlock(lock);
        }
    }

    /**
     * 执行队列文件下载
     *
     * @param exportTask 导出参数
     */
    private void handleDownload(String exportTask) {
        try {
            PubFileExport pubFileExport = JSONObject.parseObject(exportTask, PubFileExport.class);
            // 实际的文件下载逻辑
            if (pubFileExport.getExportStartTime() == null
                    || pubFileExport.getExportEndTime() == null
                    || StringUtils.isBlank(pubFileExport.getClientCode())) {
                log.error("客户数据文件下载的参数为空请检查!");
                return;
            }
            AjaxResult ajaxResult = customerSettlementExportService.exportSheets(pubFileExport);
            pubFileExportService.updateExportAllExcel(ajaxResult, pubFileExport.getGid(), pubFileExport.getFinalUrl());
        } catch (Exception e) {
            log.error("执行队列文件下载异常=", e);
        }
    }

    /**
     * 检查是否为本地 IP 地址
     *
     * @return 是否为本地 IP 地址
     */
    private boolean isLocalIp() {
        try {
            InetAddress localAddress = InetAddress.getLocalHost();
            String ip = localAddress.getHostAddress();
            return ip.startsWith("192.168");
        } catch (Exception e) {
            log.error("获取本地 IP 地址异常=", e);
            return false;
        }
    }

    /**
     * 安全释放锁
     *
     * @param lock 锁对象
     */
    private void safeUnlock(RLock lock) {
        if (lock != null && lock.isLocked() && lock.isHeldByCurrentThread()) {
            try {
                lock.unlock();
            } catch (Exception e) {
                log.error("释放锁异常=", e);
            }
        }
    }

}

3、解析下代码

这段代码实现了文件下载任务的队列管理和处理。
主要功能包括:
任务入队:将多个文件导出任务加入Redis队列。
定时任务处理:每隔10秒从队列中取出任务并执行文件下载。
分布式锁控制:确保同一时间只有一个实例处理任务,避免重复执行。
本地IP过滤:防止本地IP地址触发任务。

控制流图(CFG)

flowchart TD
A[开始] --> B{是否为本地IP}
B -->|是| E[结束]
B -->|否| C[获取分布式锁]
C --> D{是否获取成功}
D -->|否| F[记录日志并结束]
D -->|是| G[从队列取任务]
G --> H{是否有任务}
H -->|否| I[释放锁并结束]
H -->|是| J[处理下载任务]
J --> K[记录开始时间]
K --> L[调用下载逻辑]
L --> M[记录结束时间]
M --> N[释放锁并结束]

详细说明

任务入队 (pubFileExport 方法):
检查任务列表是否为空。
将每个任务序列化后加入Redis队列。
定时任务处理 (processDownloadsInit 方法):
检查是否为本地IP地址,如果是则直接返回。
尝试获取分布式锁,如果失败则记录日志并返回。
从Redis队列中取出任务,如果有任务则调用处理方法。
记录任务开始和结束时间,并释放锁。
处理下载任务 (handleDownload 方法):
反序列化任务参数。
检查参数是否完整,如果不完整则记录错误并返回。
调用客户结算导出服务进行文件下载,并更新导出状态。
辅助方法:
isLocalIp:检查当前IP是否为本地IP。
safeUnlock:安全释放分布式锁。


原文地址:https://blog.csdn.net/weixin_44372802/article/details/145206163

免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!