自学内容网

【代码片】文件多线程读取下载+异步上传云存储

1、文件读取下载转存cos业务方法

    /**
     * Transfers every material returned by the mapper for the given time window:
     * one download/upload task is submitted per distinct md5, the outcome of that
     * task is copied onto all rows sharing the md5, and the rows are then written
     * back in a single batch update.
     *
     * @param startTime start of the time window passed to the mapper query
     * @param endTime   end of the time window passed to the mapper query
     * @return {@code true} when there was nothing to do or the batch update was issued;
     *         {@code false} when the wait was interrupted or an unexpected exception occurred
     */
    public Boolean transferStorageMaterial(Long startTime, Long endTime) {
        // Query the rows in the time window that still have to be transferred.
        List<MaterialPrepareBo> materialPrepareList = materialVideoMapper.selectVideoNon2CosChuangliang(startTime, endTime);
        if (CollectionUtils.isEmpty(materialPrepareList)) {
            log.info("[Chuangliang素材转存]:查无待转存的素材, type:{}", type);
            return true;
        }
        // Fixed-size pool dedicated to this run; it is shut down in the finally block.
        ExecutorService executorService = Executors.newFixedThreadPool(12);
        try {
            // One representative row per md5 (first occurrence wins).
            Map<String, MaterialPrepareBo> toHandleMd5Map = materialPrepareList.stream()
                    .collect(Collectors.toMap(MaterialPrepareBo::getMd5, materialPrepareBo -> materialPrepareBo, (existing, replacement) -> existing));

            // Distinct md5 values, so the same file is not downloaded/uploaded twice.
            List<String> toHandleDistinctMd5List = materialPrepareList.stream().map(MaterialPrepareBo::getMd5).distinct().collect(Collectors.toList());

            List<Future<?>> futures = new ArrayList<>(toHandleDistinctMd5List.size());
            for (String md5 : toHandleDistinctMd5List) {
                MaterialPrepareBo materialPrepareBo = toHandleMd5Map.get(md5);
                futures.add(executorService.submit(() -> {
                    try {
                        appgrowingMaterialService.batchDownloadAndUploadFile(materialPrepareBo);
                    } catch (Exception e) {
                        // The failure is recorded on the representative row itself, which is
                        // what the result loop below reads from.
                        materialPrepareBo.setErrorMsg("executorService error");
                        log.error("executorService error, md5: {}", md5, e);
                    }
                }));
            }
            // Wait for every task; Future.get() also makes the tasks' writes visible here.
            for (Future<?> future : futures) {
                try {
                    future.get();
                } catch (InterruptedException e) {
                    // Restore the flag and stop: results would be incomplete.
                    Thread.currentThread().interrupt();
                    log.error("executorService future.get interrupted", e);
                    return false;
                } catch (Exception e) {
                    log.error("executorService future.get error", e);
                }
            }

            // Copy the per-md5 outcome onto every row that shares the md5.
            for (MaterialPrepareBo materialPrepareBo : materialPrepareList) {
                String md5 = materialPrepareBo.getMd5();
                if (StringUtils.isEmpty(md5)) {
                    materialPrepareBo.setCosUrl(CommonConstant.FAILED_MSG);
                    materialPrepareBo.setErrorMsg("file md5 StringUtils.isEmpty is true");
                    continue;
                }
                // Read from the representative row rather than from a "succeeded" map:
                // a task that threw never registered itself there, which used to cause
                // a NullPointerException here and skipped the batch update entirely.
                MaterialPrepareBo temp = toHandleMd5Map.get(md5);
                materialPrepareBo.setCosPosterUrl(temp.getCosPosterUrl());
                materialPrepareBo.setCosUrl(temp.getCosUrl());
                materialPrepareBo.setErrorMsg(temp.getErrorMsg());
            }
            // Mainly persists errorMsg; per the original note, cosUrl is updated asynchronously.
            materialVideoMapper.batchUpdateVideoCosUrl(materialPrepareList);
            return true;
        } catch (Exception e) {
            log.error("[Chuangliang素材转存]:上传文件异常", e);
        } finally {
            executorService.shutdown();
        }
        return false;
    }

2、文件读取下载和上传业务方法

    /**
     * Downloads the material's source file to a local temporary file and hands its
     * bytes to {@code asyncDoUpload}. The temporary file is deleted afterwards.
     * On failure a description is stored via {@code materialPrepareBo.setErrorMsg}.
     *
     * @param materialPrepareBo the material to transfer; its URL, material id and type are read
     * @return {@code CommonConstant.SUCCESS} when the upload was handed off,
     *         {@code CommonConstant.FAILED_MSG} otherwise
     */
    @Override
    public String batchDownloadAndUploadFile( MaterialPrepareBo materialPrepareBo){
        log.info("[创量素材]转存cos素材materialId: {}", materialPrepareBo.getMaterialId());
        String url = materialPrepareBo.getUrl();
        String path = "chuangliang/video/result";
        Path downloadedFile = null;
        try {
            // Look for the extension in the URL without its query string / fragment,
            // so signed URLs such as ".../a.mp4?sign=..." yield "mp4".
            int end = url.length();
            int queryIndex = url.indexOf('?');
            if (queryIndex >= 0) {
                end = Math.min(end, queryIndex);
            }
            int fragmentIndex = url.indexOf('#');
            if (fragmentIndex >= 0) {
                end = Math.min(end, fragmentIndex);
            }
            String urlPath = url.substring(0, end);
            int lastDotIndex = urlPath.lastIndexOf('.');
            String fileType = lastDotIndex > 0 ? urlPath.substring(lastDotIndex + 1) : "";
            // A '/' after the last dot means the dot belonged to the host or a directory,
            // i.e. the resource has no usable extension.
            if (fileType.isEmpty() || fileType.indexOf('/') >= 0) {
                log.error("Resource Unknown file type! url: {}",url);
                materialPrepareBo.setErrorMsg("Resource Unknown file type!");
                return CommonConstant.FAILED_MSG;
            }
            String todayPath = path + DateUtils.getTodayYMD();
            // Random local/remote file name that keeps the original extension.
            String fileName = UUIDUtils.getUUID(false,true) + "." + fileType;
            // Use the platform temp directory instead of a hard-coded Windows drive.
            downloadedFile = Paths.get(System.getProperty("java.io.tmpdir"), fileName);
            // Download to the local file.
            boolean result = downloadFile(url, downloadedFile.toString());
            if (!result){
                log.error("Resource downloaded fail! url: {}",url);
                materialPrepareBo.setErrorMsg("素材流读取失败");
                return CommonConstant.FAILED_MSG;
            }
            // Upload. Note: the whole file is read into memory before the hand-off.
            asyncDoUpload(Files.readAllBytes(downloadedFile), fileName, todayPath, materialPrepareBo.getMaterialId(), materialPrepareBo.getType(), type);
            return CommonConstant.SUCCESS;
        } catch (Exception e) {
            materialPrepareBo.setErrorMsg("下载和上传文件失败");
            log.error("下载和上传文件失败, url: {}", url, e);
        }finally {
            // Remove the local temporary file.
            if (Objects.nonNull(downloadedFile)) {
                try {
                    deleteLocalFile(downloadedFile);
                } catch (IOException e) {
                    log.warn("删除临时文件失败, 文件: {}", downloadedFile, e);
                }
            }
        }
        return CommonConstant.FAILED_MSG;
    }

3、文件同步读取下载

    /**
     * Copies the content behind {@code fileUrl} into the local file {@code destination}.
     *
     * @param fileUrl     URL of the resource to download
     * @param destination path of the local file to write
     * @return {@code true} when the whole stream was copied, {@code false} when an
     *         {@link IOException} occurred (it is logged, not rethrown)
     * @throws IOException declared for interface compatibility; I/O failures are
     *                     reported through the return value
     */
    private boolean downloadFile(String fileUrl, String destination) throws IOException {
        try {
            // Bound connect/read waits so a stalled server cannot block a worker thread forever.
            java.net.URLConnection connection = new URL(fileUrl).openConnection();
            connection.setConnectTimeout(10_000);
            connection.setReadTimeout(60_000);
            try (BufferedInputStream in = new BufferedInputStream(connection.getInputStream());
                 FileOutputStream fos = new FileOutputStream(destination)) {
                long start = System.currentTimeMillis();
                byte[] dataBuffer = new byte[8192];
                int bytesRead;
                while ((bytesRead = in.read(dataBuffer, 0, dataBuffer.length)) != -1) {
                    fos.write(dataBuffer, 0, bytesRead);
                }
                log.info("素材流读取耗时:{},  文件地址: {}", System.currentTimeMillis() - start, fileUrl);
                return true;
            }
        } catch (IOException e) {
            // Pass the exception as the last argument so the stack trace is kept.
            log.error("素材流读取失败:{}", e.getMessage(), e);
            return false;
        }
    }

4、文件异步上传cos

    /**
     * Uploads the given bytes to the upload service, retrying up to three times with a
     * one-second pause between attempts. On success the returned URL is stored via
     * {@code materialVideoMapper.updateCosUrlByMaterialId}.
     *
     * @param resource   file content to upload
     * @param fileName   file name sent with the multipart part
     * @param path       value of the {@code path} form parameter
     * @param materialId id of the material whose COS URL is updated on success
     * @return the URL reported by the upload service, or an empty string when every
     *         attempt failed or the thread was interrupted
     */
    @Async
    public String asyncDoUpload(byte[] resource, String fileName, String path, Long materialId) {
        final int maxAttempts = 3;
        final long retryDelayMillis = 1000L;
        long start = System.currentTimeMillis();

        List<UploadUtils.Bytes> bytes = new ArrayList<>();
        bytes.add(new UploadUtils.Bytes("file", fileName, resource));
        HashMap<String, String> postParam = new HashMap<>();
        postParam.put("path", path);
        postParam.put("bucketName", BUCKET_NAME);

        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                Map<String, Object> resultMap = UploadUtils.uploadByteByHTTP(bytes, uploadDomain.concat(UPLOAD_URL), postParam);
                log.info("调用上传文件服务耗时:{}, url :{} ,返回结果为:{} ", System.currentTimeMillis() - start, uploadDomain.concat(UPLOAD_URL), resultMap);
                if (resultMap == null || resultMap.isEmpty()) {
                    log.info("文件上传返回结果为空,上传失败");
                } else {
                    // String.valueOf tolerates a missing statusCode entry.
                    String statusCode = String.valueOf(resultMap.get("statusCode"));
                    if (String.valueOf(HttpStatus.OK.value()).equals(statusCode)) {
                        CosUploadDto data = JsonUtils.fromJson(resultMap.get("data").toString(), CosUploadDto.class);
                        materialVideoMapper.updateCosUrlByMaterialId(data.getData(), materialId);
                        return data.getData();
                    }
                    log.error("调用上传服务http状态异常!状态码:{}", statusCode);
                }
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Do not keep retrying on an interrupted thread.
                    Thread.currentThread().interrupt();
                    log.error("文件上传被中断", e);
                    return "";
                }
                log.error("上传文件时发生异常:", e);
            }
            // Pause before the next attempt; no point in waiting after the last one.
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(retryDelayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.error("文件上传被中断", e);
                    return "";
                }
            }
        }
        log.error("文件上传失败,已达到最大重试次数");
        return "";
    }

5、自定义线程池

    /**
     * Declares the {@code TranslatePool} executor bean, sized from the
     * {@code corePoolSize}, {@code maxPoolSize}, {@code keepAlive} and
     * {@code queueCapacity} settings of the enclosing configuration.
     *
     * @return the configured task executor
     */
    @Bean(name = "TranslatePool")
    public ThreadPoolTaskExecutor getThreadPool() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        // Rejected tasks are executed by the submitting thread instead of being dropped.
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        pool.setQueueCapacity(queueCapacity);
        pool.setKeepAliveSeconds(keepAlive);
        pool.setMaxPoolSize(maxPoolSize);
        pool.setCorePoolSize(corePoolSize);
        return pool;
    }

6、线程池任务单个提交

            // 待上传集合-根据md5去重复,防止重复上传
            List<String> toHandleDistinctMd5List = todoHandleList.stream().map(MaterialPrepareBo::getMd5).distinct().collect(Collectors.toList());
            // 待上传集合-记录文件上传结果(此处主要用于记录失败结果,成功的数据会异步更新)
            ConcurrentHashMap<String, MaterialPrepareBo> md5CosUrlMap = new ConcurrentHashMap(toHandleDistinctMd5List.size());
            for (String md5 : toHandleDistinctMd5List) {
                MaterialPrepareBo materialPrepareBo = toHandleMd5Map.get(md5);
                executorService.submit(() -> {
                    try {
                        appgrowingMaterialService.batchDownloadAndUploadFile(materialPrepareBo.getUrl(), type, materialPrepareBo);
                        md5CosUrlMap.put(md5, materialPrepareBo);
                    } catch (Exception e) {
                        materialPrepareBo.setErrorMsg("executorService error");
                        log.error("executorService error: {}", e);
                    }
                });
            }
            executorService.awaitTermination(Long.MIN_VALUE, TimeUnit.MICROSECONDS);

参考文章:文件多线程读取下载+异步上传云存储
Powered By niaonao


原文地址:https://blog.csdn.net/niaonao/article/details/136472866

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!