RxJava加Retrofit文件分段上传实现详解
作者:Chavin 发布时间:2022-03-02 05:36:10
前言
本文基于 RxJava 和 Retrofit 库,设计并实现了一种用于大文件分块上传的工具,并对其进行了全面的拆解分析。抛砖引玉,对同样有处理文件分块上传诉求的读者,可能会起到一定的启发作用。
文章主体由四部分构成:
首先分析问题,问题拆解为:多线程分段读取文件、构建和发出文件片段上传请求
基于 JDK 随机读取文件的类库,设计本地多线程分段读取文件的单元
基于 Retrofit 设计由文件片段构建上传的网络请求
从上述设计演变而来的完整代码实现
  另外,在文章提供的完整代码中,还附了一段由 PHP 编写,用来接收多线程分段数据的服务端接口实现,其中处理了因客户端都线程上传片段,导致服务端接收的文件片段无序,故需在适当时机合并分块构成目标文件。
受限于笔者的开发经验与理论理解,文章的思路和代码难免可能有偏颇,对于有改进和优化的部分,欢迎大家讨论区提出。
问题拆解
要完成文件分段上传到服务端,第一步是分段读取本地文件。通常分段是为了多线程同时执行上传,提高设备计算和网络资源利用率,减少上传时间优化体验,这样即需要一个支持多线程的文件分段读取工具。由于文件可能超过设备内存大小,在读取这类超大文件时需要控制最大读取量防止内存溢出。此时文件已从磁盘数据转换为内存中的字节数据,只需要将这些内存数据传给服务端即可。这样问题被分成 3 个子问题:
分段读取文件到内存中
控制多线程数量
将文件片段传给服务端
问题 1 很好解决,利用 Java 的 RandomAccessFile
可对文件的随机读取的特性,即可按需读取文件片段到内存中。
问题 2 相对复杂一点,但如果有阅读过 JDK 中线程池源码的读者,就会发现这个问题的和控制线程池中线程数量其实是类似的。
问题 3 就不复杂了,Retrofit 基于 OKhttp ,OkHttp是很容易基于字节数组构建 multipart/form-data
请求的。
分块并发读取文件
根据上述对问题 1、2 的拆解,可将读取抽象为一个文件读取器,构建时传入文件对象和分段大小以及最大并发数,以及分段数据的回调。当外部启动读取时将根据文件大小和配置的分段大小构建若干个 Task 用于读取对应片段的数据。
public BlockReader(@NotNull File file, @NotNull BlockCallback callback, int poolSize, int blockSize) {
mFile = file;
mCallback = callback;
mPoolSize = poolSize;
mBlockSize = blockSize;
}
public void start(@Nullable BlockFilter filter) {
Observable.empty().observeOn(Schedulers.computation()).doOnComplete(() -> {
long length = mFile.length();
for (long offset = 0; offset < length; offset += mBlockSize) {
if (null != filter && filter.ignore(offset)) {
continue;
}
mQueue.offer(new ReadTask(offset));
}
for (int i = 0; i < Math.min(mPoolSize, mQueue.size()); i++) {
Observable.empty().observeOn(Schedulers.io()).doOnComplete(this::schedule).subscribe();
}
}).subscribe();
}
多线程调度部分,可通过加锁和记录状态变量统计当前正运行的线程数,则可控制字节数组数,这样就相当于控制住了最大内存占用。
private void schedule() {
if (mRunning.get() >= mPoolSize) {
return;
}
ReadTask task;
synchronized (mQueue) {
if (mRunning.get() >= mPoolSize) {
return;
}
task = mQueue.poll();
if (null != task) {
mRunning.incrementAndGet();
}
}
if (null != task) {
task.run();
}
}
最后是文件随机读取,直接调用 RandomAccessFile
的 API 即可:
private class ReadTask implements Action {
@Override
public void run() {
try (RandomAccessFile raf = new RandomAccessFile(mFile, RAF_MODE);
ByteArrayOutputStream out = new ByteArrayOutputStream(mBlockSize)) {
raf.seek(mOffset);
byte[] buf = new byte[DEF_BLOCK_SIZE];
long cnt = 0;
for (int bytes = raf.read(buf); bytes != -1 && cnt < mBlockSize; bytes = raf.read(buf)) {
out.write(buf, 0, bytes);
cnt += bytes;
}
out.flush();
mCallback.onFinished(mOffset, out.toByteArray());
} catch (IOException e) {
mCallback.onFinished(mOffset, null);
} finally {
mRunning.decrementAndGet();
schedule();
}
}
}
文件片段上传
上传部分则使用 Retrofit 提供的注解和 OKHttp 的类库构建请求。但值得一提的是需要在磁盘IO线程同步完成网络IO,这样可以避免网络IO速度落后磁盘IO太多而导致任务堆积造成内存溢出。
public interface BlockUploader {
@POST("test/upload.php")
@Multipart
Single<Response<ResponseBody>> upload(@Header("filename") String filename,
@Header("total") long total,
@Header("offset") long offset,
@Part List<MultipartBody.Part> body);
}
private static void syncUpload(String fileName, long fileLength, long offset, byte[] bytes) {
RequestBody data = RequestBody.create(MediaType.parse("application/octet-stream"), bytes);
MultipartBody body = new MultipartBody.Builder()
.addFormDataPart("file", fileName, data)
.setType(MultipartBody.FORM)
.build();
retrofit.create(BlockUploader.class).upload(fileName, fileLength, offset, body.parts()).subscribe(resp -> {
if (resp.isSuccessful()) {
System.out.println("✓ offset: " + offset + " upload succeed " + resp.code());
} else {
System.out.println("✗ offset: " + offset + " upload failed " + resp.code());
}
}, throwable -> {
System.out.println("! offset: " + offset + " upload failed");
});
}
完整代码
为控制篇幅,完整代码请移步 Github,服务端部分处理形如:
来源:https://juejin.cn/post/7183887127992598585
猜你喜欢
- tcp客户端示例#include <errno.h> #include <sys/socket.h> #includ
- 本文实例讲述了C#实现导入CSV文件到Excel工作簿的方法。分享给大家供大家参考。具体如下:你必须在项目中添加对 Microsoft.Of
- 本文以新建的CUDA的.cu程序来进行说明,同样也适用于C程序。一,发现问题1,首先我们在vs2019中创建了工程以后(我所创建的工程名称为
- 微信平台开放后倒是挺火的,许多第三方应用都想试下,毕竟可以利用微信建立起来的关系链来拓展自己的应用还是挺不错的,可以节约很多在社交方面的开销
- package com.robin;import java.io.File;import java.io.FileInputStream;i
- 1、CyclicBarrier:一个同步辅助类,用于协调多个子线程,让多个子线程在这个屏障前等待,直到所有子线程都到达了这个屏障时,再一起继
- 本文为大家整理了C#图片切割、图片压缩、缩略图生成的实现代码,大家可以收藏,方便以后使用,具体内容如下/// 图片切割函数 /// <
- 1.文件页面编码导致的乱码。每一个文件(java,js,jsp,html等)都有其本身的编码格式,文件中的代码在一种编码中显示正常,在另外一
- 一、在pom.xml中配置jetty插件: <build> <plugins> <p
- 如果想你写的程序随系统开机一起启动的话,那么你可以照下面这个方法来做。 RunWhenStart(false, Applicati
- 1. System.Char 字符char 是 System.Char 的别名。System.Char 占两个字节,16个二进制位。Syst
- 这篇文章主要介绍了SpringBoot文件访问映射如何实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要
- 最近项目上的一个上传文件功能,项目是MVC+EF+LigerUI 来做的,贴出来大家一起分享下1、页面需要引用这个JS 和 CSS<s
- 简介方案对比本处列举表示类型或状态的常用方法的对比。法1:使用数字表示(不推荐)//1:支付宝支付;2:微信支付;3:银行卡支付privat
- 1.根据单个分隔字符用split截取例如string st="GT123_1";string[] sArray=st.s
- 1.本系统和现在有的考试系统有以下几种优势:a.和现在有的系统比较起来,本系统有科目、章节、老师、学生、班级等信息的管理,还有批阅试卷查看已
- 本文实例讲述了Java实现指定线程执行顺序的三种方式。分享给大家供大家参考,具体如下:方法一:通过共享对象锁加上可见变量来实现。public
- 目录1、二分查找算法思想2、二分查找图示说明3、二分查找优缺点3、java代码实现3.1 使用递归实现3.1 不使用递归实现(while循环
- 本文介绍了Maven构建自己的第一个Java后台的方法,分享给大家,具体如下:1.知识后顾关于如何运用Maven构建自己的第一个项目,上期我
- 平时写项目的时候,java之父叫我们多打日志,我们通常使用traceId和requestId来保存完整请求的链路日志,例如市面上的skywa