【代码片】文件多线程读取下载+异步上传云存储
1、文件读取下载转存cos业务方法
/**
 * Transfers the materials selected for the given time window to cloud storage.
 *
 * <p>Materials are de-duplicated by md5 so that each distinct file is downloaded and
 * uploaded once; the outcome of the representative record is then copied onto every
 * record sharing that md5, and the whole list is written back in one batch update.
 * Records without an md5 are never submitted for transfer and are marked as failed.
 *
 * @param startTime start of the time window passed to the query
 * @param endTime   end of the time window passed to the query
 * @return {@code true} when there was nothing to do or the batch update was issued;
 *         {@code false} when an unexpected exception aborted the run
 */
public Boolean transferStorageMaterial(Long startTime, Long endTime) {
    // Query the materials pending transfer within the time window.
    List<MaterialPrepareBo> materialPrepareList = materialVideoMapper.selectVideoNon2CosChuangliang(startTime, endTime);
    if (CollectionUtils.isEmpty(materialPrepareList)) {
        log.info("[Chuangliang素材转存]:查无待转存的素材, type:{}", type);
        return true;
    }
    // Fixed-size pool dedicated to this run; it is shut down in the finally block.
    ExecutorService executorService = Executors.newFixedThreadPool(12);
    try {
        // One representative record per md5 (the first one encountered wins).
        // Records without an md5 are excluded: they are marked as failed below, and a
        // null key would make ConcurrentHashMap.put throw inside the worker.
        Map<String, MaterialPrepareBo> toHandleMd5Map = materialPrepareList.stream()
                .filter(bo -> !StringUtils.isEmpty(bo.getMd5()))
                .collect(Collectors.toMap(MaterialPrepareBo::getMd5, bo -> bo, (existing, replacement) -> existing));
        // Distinct md5 values in encounter order, so that no file is uploaded twice.
        List<String> toHandleDistinctMd5List = materialPrepareList.stream()
                .map(MaterialPrepareBo::getMd5)
                .filter(md5 -> !StringUtils.isEmpty(md5))
                .distinct()
                .collect(Collectors.toList());
        // Outcome per md5. Mainly carries failure details; per the original notes the
        // COS url of a successful upload is persisted asynchronously.
        ConcurrentHashMap<String, MaterialPrepareBo> md5CosUrlMap = new ConcurrentHashMap<>(toHandleDistinctMd5List.size());
        List<Future<?>> futures = new ArrayList<>(toHandleDistinctMd5List.size());
        for (String md5 : toHandleDistinctMd5List) {
            MaterialPrepareBo materialPrepareBo = toHandleMd5Map.get(md5);
            futures.add(executorService.submit(() -> {
                try {
                    appgrowingMaterialService.batchDownloadAndUploadFile(materialPrepareBo);
                } catch (Exception e) {
                    materialPrepareBo.setErrorMsg("executorService error");
                    log.error("executorService error", e);
                }
                // Record the outcome for failures too, so the result pass below always
                // finds an entry for every submitted md5.
                md5CosUrlMap.put(md5, materialPrepareBo);
            }));
        }
        // Wait for all tasks to finish.
        for (Future<?> future : futures) {
            try {
                future.get();
            } catch (InterruptedException e) {
                // Preserve the interrupt and stop waiting; records whose task has not
                // reported a result are flagged in the result pass below.
                Thread.currentThread().interrupt();
                log.error("executorService future.get interrupted", e);
                break;
            } catch (Exception e) {
                log.error("executorService future.get error", e);
            }
        }
        // Copy the per-md5 outcome onto every record, including duplicates.
        for (MaterialPrepareBo materialPrepareBo : materialPrepareList) {
            String md5 = materialPrepareBo.getMd5();
            if (StringUtils.isEmpty(md5)) {
                materialPrepareBo.setCosUrl(CommonConstant.FAILED_MSG);
                materialPrepareBo.setErrorMsg("file md5 StringUtils.isEmpty is true");
                continue;
            }
            MaterialPrepareBo temp = md5CosUrlMap.get(md5);
            if (temp == null) {
                // No outcome was recorded (task never completed); do not dereference.
                materialPrepareBo.setErrorMsg("executorService error");
                continue;
            }
            materialPrepareBo.setCosPosterUrl(temp.getCosPosterUrl());
            materialPrepareBo.setCosUrl(temp.getCosUrl());
            materialPrepareBo.setErrorMsg(temp.getErrorMsg());
        }
        // Mainly persists errorMsg; per the original notes cosUrl is updated asynchronously.
        materialVideoMapper.batchUpdateVideoCosUrl(materialPrepareList);
        return true;
    } catch (Exception e) {
        log.error("[Chuangliang素材转存]:上传文件异常", e);
    } finally {
        executorService.shutdown();
    }
    return false;
}
2、文件读取下载和上传业务方法
/**
 * Downloads the material's source file to a local temporary file and hands its bytes
 * to {@code asyncDoUpload}. The temporary file is always deleted before returning.
 *
 * <p>On failure a description is stored on the record via {@code setErrorMsg}.
 *
 * @param materialPrepareBo record describing the material; its url, material id and
 *                          type are read, and its error message is written on failure
 * @return {@code CommonConstant.SUCCESS} once the upload has been handed off,
 *         {@code CommonConstant.FAILED_MSG} otherwise
 */
@Override
public String batchDownloadAndUploadFile( MaterialPrepareBo materialPrepareBo){
    log.info("[创量素材]转存cos素材materialId: {}", materialPrepareBo.getMaterialId());
    String url = materialPrepareBo.getUrl();
    String path = "chuangliang/video/result";
    Path downloadedFile = null;
    try {
        // Derive the extension from the URL path only, so a query string, a fragment
        // or a dot in the host name cannot leak into the local file name.
        String fileType = extractFileType(url);
        if (fileType == null) {
            log.error("Resource Unknown file type! url: {}",url);
            materialPrepareBo.setErrorMsg("Resource Unknown file type!");
            return CommonConstant.FAILED_MSG;
        }
        String todayPath = path + DateUtils.getTodayYMD();
        // Random local file name that keeps the original extension.
        String fileName = UUIDUtils.getUUID(false,true) + "." + fileType;
        // Use the platform temp directory instead of a hard-coded Windows drive.
        downloadedFile = Paths.get(System.getProperty("java.io.tmpdir"), fileName);
        // Download to the local file.
        boolean result = downloadFile(url, downloadedFile.toString());
        if (!result){
            log.error("Resource downloaded fail! url: {}",url);
            materialPrepareBo.setErrorMsg("素材流读取失败");
            return CommonConstant.FAILED_MSG;
        }
        // Upload the file. The bytes are read into memory first, so deleting the
        // temporary file in the finally block does not affect the upload.
        // NOTE(review): the asyncDoUpload definition shown next to this method takes four
        // parameters; confirm that the six-argument overload called here exists.
        // NOTE(review): if asyncDoUpload is declared on this same bean, this direct call
        // bypasses the Spring proxy and @Async has no effect (runs synchronously) - confirm.
        asyncDoUpload(Files.readAllBytes(downloadedFile), fileName, todayPath, materialPrepareBo.getMaterialId(), materialPrepareBo.getType(), type);
        return CommonConstant.SUCCESS;
    } catch (Exception e) {
        materialPrepareBo.setErrorMsg("下载和上传文件失败");
        log.error("下载和上传文件失败", e);
    }finally {
        // Remove the local temporary file.
        if (Objects.nonNull(downloadedFile)) {
            try {
                deleteLocalFile(downloadedFile);
            } catch (IOException e) {
                log.warn("删除临时文件失败", e);
            }
        }
    }
    return CommonConstant.FAILED_MSG;
}

/**
 * Extracts the file extension from the path part of a URL.
 *
 * @param url resource URL, may be {@code null}
 * @return the text after the last '.' of the last path segment, or {@code null} when
 *         the URL is {@code null} or its last path segment has no non-empty extension
 */
private static String extractFileType(String url) {
    if (url == null) {
        return null;
    }
    // Cut off the query string and fragment, whichever comes first.
    int end = url.length();
    int queryIndex = url.indexOf('?');
    if (queryIndex >= 0) {
        end = Math.min(end, queryIndex);
    }
    int fragmentIndex = url.indexOf('#');
    if (fragmentIndex >= 0) {
        end = Math.min(end, fragmentIndex);
    }
    String pathPart = url.substring(0, end);
    int lastSlashIndex = pathPart.lastIndexOf('/');
    int lastDotIndex = pathPart.lastIndexOf('.');
    // The dot must belong to the last segment and be followed by at least one character.
    if (lastDotIndex <= lastSlashIndex || lastDotIndex == pathPart.length() - 1) {
        return null;
    }
    return pathPart.substring(lastDotIndex + 1);
}
3、文件同步读取下载
/**
 * Downloads the resource at {@code fileUrl} into the local file {@code destination}.
 *
 * <p>Connect and read timeouts are applied so that an unresponsive server cannot block
 * the calling thread indefinitely. I/O failures are logged and reported through the
 * return value rather than propagated.
 *
 * @param fileUrl     URL of the resource to download
 * @param destination path of the local file to write
 * @return {@code true} when the whole stream was copied, {@code false} on any I/O error
 * @throws IOException kept in the signature for compatibility with existing callers
 */
private boolean downloadFile(String fileUrl, String destination) throws IOException {
    final int connectTimeoutMillis = 10_000;
    // Applies to each blocking read, not to the download as a whole.
    final int readTimeoutMillis = 60_000;
    try {
        java.net.URLConnection connection = new URL(fileUrl).openConnection();
        connection.setConnectTimeout(connectTimeoutMillis);
        connection.setReadTimeout(readTimeoutMillis);
        try (BufferedInputStream in = new BufferedInputStream(connection.getInputStream());
             FileOutputStream fos = new FileOutputStream(destination)) {
            long start = System.currentTimeMillis();
            byte[] dataBuffer = new byte[1024];
            int bytesRead;
            while ((bytesRead = in.read(dataBuffer, 0, dataBuffer.length)) != -1) {
                fos.write(dataBuffer, 0, bytesRead);
            }
            log.info("素材流读取耗时:{}, 文件地址: {}", System.currentTimeMillis() - start, fileUrl);
            return true;
        }
    } catch (IOException e) {
        // Pass the exception as the last argument so the stack trace is logged too.
        log.error("素材流读取失败:{}", e.getMessage(), e);
        return false;
    }
}
4、文件异步上传cos
/**
 * Uploads a file to the upload service and stores the returned address for the
 * material, retrying up to three times with a one-second pause between attempts.
 *
 * <p>An attempt counts as failed when the service returns an empty result, a status
 * code other than 200, or when any exception is raised. If the thread is interrupted
 * while pausing, the interrupt flag is restored and retrying stops.
 *
 * <p>NOTE(review): Spring documents void and Future as the return types for
 * {@code @Async} methods; callers going through the proxy should not rely on the
 * returned String - confirm.
 *
 * @param resource   file content
 * @param fileName   file name sent with the multipart part named "file"
 * @param path       value sent as the "path" form parameter
 * @param materialId id of the material whose COS url is updated on success
 * @return the value of {@code data.getData()} from the service response on success,
 *         or an empty string when all attempts failed or the thread was interrupted
 */
@Async
public String asyncDoUpload(byte[] resource, String fileName, String path, Long materialId) {
    final int maxAttempts = 3;
    final long retryDelayMillis = 1000L;
    long start = System.currentTimeMillis();
    List<UploadUtils.Bytes> bytes = new ArrayList<>();
    bytes.add(new UploadUtils.Bytes("file", fileName, resource));
    HashMap<String, String> postParam = new HashMap<>();
    postParam.put("path", path);
    postParam.put("bucketName", BUCKET_NAME);
    String uploadUrl = uploadDomain.concat(UPLOAD_URL);
    for (int currentAttempt = 1; currentAttempt <= maxAttempts; currentAttempt++) {
        try {
            Map<String, Object> resultMap = UploadUtils.uploadByteByHTTP(bytes, uploadUrl, postParam);
            log.info("调用上传文件服务耗时:{}, url :{} ,返回结果为:{} ", System.currentTimeMillis() - start, uploadUrl, resultMap);
            if (resultMap == null || resultMap.isEmpty()) {
                log.info("文件上传返回结果为空,上传失败");
            } else if (!String.valueOf(HttpStatus.OK.value()).equals(String.valueOf(resultMap.get("statusCode")))) {
                log.error("调用上传服务http状态异常!状态码:{}", resultMap.get("statusCode"));
            } else {
                CosUploadDto data = JsonUtils.fromJson(resultMap.get("data").toString(), CosUploadDto.class);
                materialVideoMapper.updateCosUrlByMaterialId(data.getData(), materialId);
                return data.getData();
            }
        } catch (Exception e) {
            // Keep the original message text and attach the stack trace.
            log.error("上传文件时发生异常:" + e, e);
        }
        // Pause before the next attempt; there is nothing to wait for after the last one.
        if (currentAttempt < maxAttempts) {
            try {
                Thread.sleep(retryDelayMillis);
            } catch (InterruptedException e) {
                // Restore the flag and stop retrying instead of looping on an interrupted thread.
                Thread.currentThread().interrupt();
                log.warn("文件上传重试等待被中断,停止重试");
                return "";
            }
        }
    }
    log.error("文件上传失败,已达到最大重试次数");
    return "";
}
5、自定义线程池
/**
 * Declares the task executor bean named "TranslatePool".
 *
 * <p>Pool sizes, keep-alive time and queue capacity are taken from the enclosing
 * configuration's fields.
 *
 * @return the configured executor
 */
@Bean(name = "TranslatePool")
public ThreadPoolTaskExecutor getThreadPool() {
    ThreadPoolTaskExecutor translatePool = new ThreadPoolTaskExecutor();
    // When the pool rejects a task, CallerRunsPolicy runs it on the submitting thread
    // instead of discarding it.
    translatePool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    translatePool.setQueueCapacity(queueCapacity);
    translatePool.setKeepAliveSeconds(keepAlive);
    translatePool.setMaxPoolSize(maxPoolSize);
    translatePool.setCorePoolSize(corePoolSize);
    return translatePool;
}
6、线程池任务单个提交
// 待上传集合-根据md5去重复,防止重复上传
List<String> toHandleDistinctMd5List = todoHandleList.stream().map(MaterialPrepareBo::getMd5).distinct().collect(Collectors.toList());
// 待上传集合-记录文件上传结果(此处主要用于记录失败结果,成功的数据会异步更新)
ConcurrentHashMap<String, MaterialPrepareBo> md5CosUrlMap = new ConcurrentHashMap(toHandleDistinctMd5List.size());
for (String md5 : toHandleDistinctMd5List) {
MaterialPrepareBo materialPrepareBo = toHandleMd5Map.get(md5);
executorService.submit(() -> {
try {
appgrowingMaterialService.batchDownloadAndUploadFile(materialPrepareBo.getUrl(), type, materialPrepareBo);
md5CosUrlMap.put(md5, materialPrepareBo);
} catch (Exception e) {
materialPrepareBo.setErrorMsg("executorService error");
log.error("executorService error: {}", e);
}
});
}
executorService.awaitTermination(Long.MIN_VALUE, TimeUnit.MICROSECONDS);
参考文章:文件多线程读取下载+异步上传云存储
Powered By niaonao
原文地址:https://blog.csdn.net/niaonao/article/details/136472866
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!