RxJava+Retrofit+OkHttp实现多文件下载之断点续传
作者:wzgiceman 发布时间:2023-07-19 22:32:35
背景
断点续传下载一直是移动开发中必不可少的一项重要的技术,同样的Rxjava和Retrofit的结合让这个技术解决起来更加的灵活,我们完全可以封装一个适合自的下载框架,简单而且安全!
效果
实现
下载和之前的http请求可以相互独立,所以我们单独给download建立一个工程moudel处理
1.创建service接口
和以前一样,先写接口
注意:Streaming是判断是否写入内存的标示,如果小文件可以考虑不写,一般情况必须写;下载地址需要通过@url动态指定(不适固定的),@head标签是指定下载的起始位置(断点续传的位置)
/*断点续传下载接口*/
@Streaming/*大文件需要加入这个判断,防止下载过程中写入到内存中*/
@GET
Observable<ResponseBody> download(@Header("RANGE") String start, @Url String url);
2.复写ResponseBody
和之前的上传封装一样,下载更加的需要进度,所以我们同样覆盖ResponseBody类,写入进度监听回调
/**
* 自定义进度的body
* @author wzg
*/
public class DownloadResponseBody extends ResponseBody {
private ResponseBody responseBody;
private DownloadProgressListener progressListener;
private BufferedSource bufferedSource;
public DownloadResponseBody(ResponseBody responseBody, DownloadProgressListener progressListener) {
this.responseBody = responseBody;
this.progressListener = progressListener;
}
@Override
public BufferedSource source() {
if (bufferedSource == null) {
bufferedSource = Okio.buffer(source(responseBody.source()));
}
return bufferedSource;
}
private Source source(Source source) {
return new ForwardingSource(source) {
long totalBytesRead = 0L;
@Override
public long read(Buffer sink, long byteCount) throws IOException {
long bytesRead = super.read(sink, byteCount);
// read() returns the number of bytes read, or -1 if this source is exhausted.
totalBytesRead += bytesRead != -1 ? bytesRead : 0;
if (null != progressListener) {
progressListener.update(totalBytesRead, responseBody.contentLength(), bytesRead == -1);
}
return bytesRead;
}
};
}
}
3.自定义进度回调接口
/**
* 成功回调处理
* Created by WZG on 2016/10/20.
*/
public interface DownloadProgressListener {
/**
* 下载进度
* @param read
* @param count
* @param done
*/
void update(long read, long count, boolean done);
}
4.复写Interceptor
复写Interceptor,可以将我们的监听回调通过okhttp的client方法addInterceptor自动加载我们的监听回调和ResponseBody
/**
* 成功回调处理
* Created by WZG on 2016/10/20.
*/
public class DownloadInterceptor implements Interceptor {
private DownloadProgressListener listener;
public DownloadInterceptor(DownloadProgressListener listener) {
this.listener = listener;
}
@Override
public Response intercept(Chain chain) throws IOException {
Response originalResponse = chain.proceed(chain.request());
return originalResponse.newBuilder()
.body(new DownloadResponseBody(originalResponse.body(), listener))
.build();
}
}
5.封装请求downinfo数据
这个类中的数据可自由扩展,用户自己选择需要保持到数据库中的数据,可以自由选择需要数据库第三方框架,demo采用greenDao框架存储数据
public class DownInfo {
/*存储位置*/
private String savePath;
/*下载url*/
private String url;
/*基础url*/
private String baseUrl;
/*文件总长度*/
private long countLength;
/*下载长度*/
private long readLength;
/*下载唯一的HttpService*/
private HttpService service;
/*回调监听*/
private HttpProgressOnNextListener listener;
/*超时设置*/
private int DEFAULT_TIMEOUT = 6;
/*下载状态*/
private DownState state;
}
6.DownState状态封装
很简单,和大多数封装框架一样
public enum DownState {
START,
DOWN,
PAUSE,
STOP,
ERROR,
FINISH,
}
7.请求HttpProgressOnNextListener回调封装类
注意:这里和DownloadProgressListener不同,这里是下载这个过程中的监听回调,DownloadProgressListener只是进度的监听
通过抽象类,可以自由选择需要覆盖的类,不需要完全覆盖!更加灵活
/**
* 下载过程中的回调处理
* Created by WZG on 2016/10/20.
*/
public abstract class HttpProgressOnNextListener<T> {
/**
* 成功后回调方法
* @param t
*/
public abstract void onNext(T t);
/**
* 开始下载
*/
public abstract void onStart();
/**
* 完成下载
*/
public abstract void onComplete();
/**
* 下载进度
* @param readLength
* @param countLength
*/
public abstract void updateProgress(long readLength, long countLength);
/**
* 失败或者错误方法
* 主动调用,更加灵活
* @param e
*/
public void onError(Throwable e){
}
/**
* 暂停下载
*/
public void onPuase(){
}
/**
* 停止下载销毁
*/
public void onStop(){
}
}
8.封装回调Subscriber
准备的工作做完,需要将回调和传入回调的信息统一封装到sub中,统一判断;和封装二的原理一样,我们通过自定义Subscriber来提前处理返回的数据,让用户字需要关系成功和失败以及向关心的数据,避免重复多余的代码出现在处理类中
sub需要继承DownloadProgressListener,和自带的回调一起组成我们需要的回调结果
传入DownInfo数据,通过回调设置DownInfo的不同状态,保存状态
通过RxAndroid将进度回调指定到主线程中(如果不需要进度最好去掉该处理避免主线程处理负担)
update进度回调在断点续传使用时,需要手动判断断点后加载的长度,因为指定断点下载长度下载后总长度=(物理长度-起始下载长度)
/**
* 用于在Http请求开始时,自动显示一个ProgressDialog
* 在Http请求结束是,关闭ProgressDialog
* 调用者自己对请求数据进行处理
* Created by WZG on 2016/7/16.
*/
public class ProgressDownSubscriber<T> extends Subscriber<T> implements DownloadProgressListener {
//弱引用结果回调
private WeakReference<HttpProgressOnNextListener> mSubscriberOnNextListener;
/*下载数据*/
private DownInfo downInfo;
public ProgressDownSubscriber(DownInfo downInfo) {
this.mSubscriberOnNextListener = new WeakReference<>(downInfo.getListener());
this.downInfo=downInfo;
}
/**
* 订阅开始时调用
* 显示ProgressDialog
*/
@Override
public void onStart() {
if(mSubscriberOnNextListener.get()!=null){
mSubscriberOnNextListener.get().onStart();
}
downInfo.setState(DownState.START);
}
/**
* 完成,隐藏ProgressDialog
*/
@Override
public void onCompleted() {
if(mSubscriberOnNextListener.get()!=null){
mSubscriberOnNextListener.get().onComplete();
}
downInfo.setState(DownState.FINISH);
}
/**
* 对错误进行统一处理
* 隐藏ProgressDialog
*
* @param e
*/
@Override
public void onError(Throwable e) {
/*停止下载*/
HttpDownManager.getInstance().stopDown(downInfo);
if(mSubscriberOnNextListener.get()!=null){
mSubscriberOnNextListener.get().onError(e);
}
downInfo.setState(DownState.ERROR);
}
/**
* 将onNext方法中的返回结果交给Activity或Fragment自己处理
*
* @param t 创建Subscriber时的泛型类型
*/
@Override
public void onNext(T t) {
if (mSubscriberOnNextListener.get() != null) {
mSubscriberOnNextListener.get().onNext(t);
}
}
@Override
public void update(long read, long count, boolean done) {
if(downInfo.getCountLength()>count){
read=downInfo.getCountLength()-count+read;
}else{
downInfo.setCountLength(count);
}
downInfo.setReadLength(read);
if (mSubscriberOnNextListener.get() != null) {
/*接受进度消息,造成UI阻塞,如果不需要显示进度可去掉实现逻辑,减少压力*/
rx.Observable.just(read).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
/*如果暂停或者停止状态延迟,不需要继续发送回调,影响显示*/
if(downInfo.getState()==DownState.PAUSE||downInfo.getState()==DownState.STOP)return;
downInfo.setState(DownState.DOWN);
mSubscriberOnNextListener.get().updateProgress(aLong,downInfo.getCountLength());
}
});
}
}
}
9.下载管理类封装HttpDownManager
单利获取
/**
* 获取单例
* @return
*/
public static HttpDownManager getInstance() {
if (INSTANCE == null) {
synchronized (HttpDownManager.class) {
if (INSTANCE == null) {
INSTANCE = new HttpDownManager();
}
}
}
return INSTANCE;
}
因为单利所以需要记录正在下载的数据和回到sub
/*回调sub队列*/
private HashMap<String,ProgressDownSubscriber> subMap;
/*单利对象*/
private volatile static HttpDownManager INSTANCE;
private HttpDownManager(){
downInfos=new HashSet<>();
subMap=new HashMap<>();
}
开始下载需要记录下载的service避免每次都重复创建,然后请求sercie接口,得到ResponseBody数据后将数据流写入到本地文件中(6.0系统后需要提前申请权限)
/**
* 开始下载
*/
public void startDown(DownInfo info){
/*正在下载不处理*/
if(info==null||subMap.get(info.getUrl())!=null){
return;
}
/*添加回调处理类*/
ProgressDownSubscriber subscriber=new ProgressDownSubscriber(info);
/*记录回调sub*/
subMap.put(info.getUrl(),subscriber);
/*获取service,多次请求公用一个sercie*/
HttpService httpService;
if(downInfos.contains(info)){
httpService=info.getService();
}else{
DownloadInterceptor interceptor = new DownloadInterceptor(subscriber);
OkHttpClient.Builder builder = new OkHttpClient.Builder();
//手动创建一个OkHttpClient并设置超时时间
builder.connectTimeout(info.getConnectionTime(), TimeUnit.SECONDS);
builder.addInterceptor(interceptor);
Retrofit retrofit = new Retrofit.Builder()
.client(builder.build())
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.baseUrl(info.getBaseUrl())
.build();
httpService= retrofit.create(HttpService.class);
info.setService(httpService);
}
/*得到rx对象-上一次下載的位置開始下載*/
httpService.download("bytes=" + info.getReadLength() + "-",info.getUrl())
/*指定线程*/
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
/*失败后的retry配置*/
.retryWhen(new RetryWhenNetworkException())
/*读取下载写入文件*/
.map(new Func1<ResponseBody, DownInfo>() {
@Override
public DownInfo call(ResponseBody responseBody) {
try {
writeCache(responseBody,new File(info.getSavePath()),info);
} catch (IOException e) {
/*失败抛出异常*/
throw new HttpTimeException(e.getMessage());
}
return info;
}
})
/*回调线程*/
.observeOn(AndroidSchedulers.mainThread())
/*数据回调*/
.subscribe(subscriber);
}
写入文件
注意:一开始调用进度回调是第一次写入在进度回调之前,所以需要判断一次DownInfo是否获取到下载总长度,没有这选择当前ResponseBody 读取长度为总长度
/**
* 写入文件
* @param file
* @param info
* @throws IOException
*/
public void writeCache(ResponseBody responseBody,File file,DownInfo info) throws IOException{
if (!file.getParentFile().exists())
file.getParentFile().mkdirs();
long allLength;
if (info.getCountLength()==0){
allLength=responseBody.contentLength();
}else{
allLength=info.getCountLength();
}
FileChannel channelOut = null;
RandomAccessFile randomAccessFile = null;
randomAccessFile = new RandomAccessFile(file, "rwd");
channelOut = randomAccessFile.getChannel();
MappedByteBuffer mappedBuffer = channelOut.map(FileChannel.MapMode.READ_WRITE,
info.getReadLength(),allLength-info.getReadLength());
byte[] buffer = new byte[1024*8];
int len;
int record = 0;
while ((len = responseBody.byteStream().read(buffer)) != -1) {
mappedBuffer.put(buffer, 0, len);
record += len;
}
responseBody.byteStream().close();
if (channelOut != null) {
channelOut.close();
}
if (randomAccessFile != null) {
randomAccessFile.close();
}
}
停止下载
调用 subscriber.unsubscribe()解除监听,然后remove记录的下载数据和sub回调,并且设置下载状态(同步数据库自己添加)
/**
* 停止下载
*/
public void stopDown(DownInfo info){
if(info==null)return;
info.setState(DownState.STOP);
info.getListener().onStop();
if(subMap.containsKey(info.getUrl())) {
ProgressDownSubscriber subscriber=subMap.get(info.getUrl());
subscriber.unsubscribe();
subMap.remove(info.getUrl());
}
/*同步数据库*/
}
暂停下载
原理和停止下载原理一样
/**
* 暂停下载
* @param info
*/
public void pause(DownInfo info){
if(info==null)return;
info.setState(DownState.PAUSE);
info.getListener().onPuase();
if(subMap.containsKey(info.getUrl())){
ProgressDownSubscriber subscriber=subMap.get(info.getUrl());
subscriber.unsubscribe();
subMap.remove(info.getUrl());
}
/*这里需要讲info信息写入到数据中,可自由扩展,用自己项目的数据库*/
}
暂停全部和停止全部下载任务
/**
* 停止全部下载
*/
public void stopAllDown(){
for (DownInfo downInfo : downInfos) {
stopDown(downInfo);
}
subMap.clear();
downInfos.clear();
}
/**
* 暂停全部下载
*/
public void pauseAll(){
for (DownInfo downInfo : downInfos) {
pause(downInfo);
}
subMap.clear();
downInfos.clear();
}
整合代码HttpDownManager
同样使用了封装二中的retry处理和运行时异常自定义处理封装(不复述了)
补充
有同学说不知道数据库这块怎么替换,所以我加入了greenDao框架去优化数据库存储,在实际运用中可以将这块的逻辑替换成你项目的数据库框架(之前用的都是realm,这回正好练练手)
只需要替换DbUtil的方法即可
总结
到此我们的Rxjava+ReTrofit+okHttp深入浅出-封装就基本完成了,已经可以完全胜任开发和学习的全部工作,如果后续再使用过程中有任何问题欢迎留言给我,会一直维护!
1.Retrofit+Rxjava+okhttp基本使用方法
2.统一处理请求数据格式
3.统一的ProgressDialog和回调Subscriber处理
4.取消http请求
5.预处理http请求
6.返回数据的统一判断
7.失败后的retry封装处理
8.RxLifecycle管理生命周期,防止泄露
9.文件上传和文件下载(支持多文件断点续传)
源码:传送门-全部封装源码
来源:http://blog.csdn.net/wzgiceman/article/details/52911503


猜你喜欢
- 前言不久之前,部门进行了一次代码评审。代码整体比较简单,该吹B的地方都已经吹过了,无非是些if else的老问题而已。当翻到一段定时任务的一
- RESTful 一种软件架构风格,设计风格而不是标准,只是提供了一组设计原则和约束条件。它主要用于客户端和服务器交互类的软件。基于这个风格设
- 前言:前面总结学习了图片的使用以及Lru算法,今天来学习一下比较优秀的图片缓存开源框架。技术本身就要不断的更迭,从最初的自己使用SoftRe
- 本人工作有一个月多了。对于android很多东西,都有了新的了解或者说真正的掌握。为了让更多的像我这样的小白少走弯路,所以我会坚持将我在工作
- 目录为什么要用Geometry数据做图标?怎么获取Geometry数据?如何使用Geometry数据相信大家在阅读WPF相关GitHub开源
- 一.服务器端获取Session对象依赖于客户端携带的Cookie中的JSESSIONID数据。如果用户把浏览器的隐私级别调到最高,这时浏览器
- 我就废话不多说了,大家还是直接看代码吧~Caused by: java.net.SocketException: Software caus
- JAVA并发总览核心问题并不是程序的漏洞导致的,而是操作系统底层机制导致的原子性:可见性问题:改的是缓存,但是缓存对另一个线程不可见有序性问
- 前言每种语言都会有字符串的操作,因为字符串是我们平常开发使用频率最高的一种类型。今天我们来聊一下Java的字符串操作及在某些具体方法中与C#
- 首先打开 Visual Studio Installer 可以看到vs2022 只支持安装4.6及以上的版本,如图所示。那么该如何安装4.6
- 前言在阅读本篇文章时请关注如下问题:1.什么是三层架构?2.为什么使用三层架构?3.三层与以往使用的两层相比有什么不同?它的优势在哪里?4.
- C#字符集编码的使用ASCII:西欧字符集GB2312:国家简体中文字符集,兼容ASCII。BIG5:统一繁体字编码GBK:它是GB2312
- 前言:平时打开手机的应用时,会跳出来3秒钟的广告后,再进入应用。今天我们就来简单实现一下引导页的功能。1、首先,新建一个activity页面
- 本文为大家分享了Java多线程实现Runnable方式的具体方法,供大家参考,具体内容如下(一)步骤 1.定义实现Runnable
- 前言大家好,我是小郭,前面我们学习了利用Semaphore来防止多线程同时操作一个资源,通常我们都会利用并行来优化性能,但是对于串行化的业务
- 本文实例讲述了C#实现图片切割的方法。分享给大家供大家参考,具体如下:图片切割就是把一幅大图片按用户要求切割成多幅小图片。dotnet环境下
- 0.引言死信队列是消息队列中非常重要的概念,同时我们需要业务场景中都需要延迟发送的概念,比如12306中的30分钟后未支付订单取消。那么本期
- 依赖配置结合前面的内容,这里我们要嵌入数据库的操作,这里以操作MySQL为例整合Mybatis,首先需要在原来的基础上添加以下依赖<!
- Java停止线程的逻辑(协同、通知)在Java程序中,我们想要停止一个线程可以通过interrupt方法进行停止。但是当我们调用interr
- 推荐激活教程IntelliJ IDEA 2020最新激活码(亲测有效,可激活至 2089 年)最新idea2021注册码永久激活(激活到21