软件编程
位置:首页>> 软件编程>> Android编程>> RxJava+Retrofit+OkHttp实现多文件下载之断点续传

RxJava+Retrofit+OkHttp实现多文件下载之断点续传

作者:wzgiceman  发布时间:2023-07-19 22:32:35 

标签:RxJava,Retrofit,OkHttp

背景

断点续传下载一直是移动开发中必不可少的一项重要的技术,同样的Rxjava和Retrofit的结合让这个技术解决起来更加的灵活,我们完全可以封装一个适合自的下载框架,简单而且安全!

效果

RxJava+Retrofit+OkHttp实现多文件下载之断点续传

实现

下载和之前的http请求可以相互独立,所以我们单独给download建立一个工程moudel处理

1.创建service接口

和以前一样,先写接口

注意:Streaming是判断是否写入内存的标示,如果小文件可以考虑不写,一般情况必须写;下载地址需要通过@url动态指定(不适固定的),@head标签是指定下载的起始位置(断点续传的位置)


/*断点续传下载接口*/
 @Streaming/*大文件需要加入这个判断,防止下载过程中写入到内存中*/
 @GET
 Observable<ResponseBody> download(@Header("RANGE") String start, @Url String url);

2.复写ResponseBody

和之前的上传封装一样,下载更加的需要进度,所以我们同样覆盖ResponseBody类,写入进度监听回调


/**
* 自定义进度的body
* @author wzg
*/
public class DownloadResponseBody extends ResponseBody {
 private ResponseBody responseBody;
 private DownloadProgressListener progressListener;
 private BufferedSource bufferedSource;

public DownloadResponseBody(ResponseBody responseBody, DownloadProgressListener progressListener) {
   this.responseBody = responseBody;
   this.progressListener = progressListener;
 }

@Override
 public BufferedSource source() {
   if (bufferedSource == null) {
     bufferedSource = Okio.buffer(source(responseBody.source()));
   }
   return bufferedSource;
 }

private Source source(Source source) {
   return new ForwardingSource(source) {
     long totalBytesRead = 0L;
     @Override
     public long read(Buffer sink, long byteCount) throws IOException {
       long bytesRead = super.read(sink, byteCount);
       // read() returns the number of bytes read, or -1 if this source is exhausted.
       totalBytesRead += bytesRead != -1 ? bytesRead : 0;
       if (null != progressListener) {
         progressListener.update(totalBytesRead, responseBody.contentLength(), bytesRead == -1);
       }
       return bytesRead;
     }
   };
 }
}

3.自定义进度回调接口


/**
* 成功回调处理
* Created by WZG on 2016/10/20.
*/
public interface DownloadProgressListener {
 /**
  * 下载进度
  * @param read
  * @param count
  * @param done
  */
 void update(long read, long count, boolean done);
}

4.复写Interceptor

复写Interceptor,可以将我们的监听回调通过okhttp的client方法addInterceptor自动加载我们的监听回调和ResponseBody


/**
* 成功回调处理
* Created by WZG on 2016/10/20.
*/
public class DownloadInterceptor implements Interceptor {

private DownloadProgressListener listener;

public DownloadInterceptor(DownloadProgressListener listener) {
   this.listener = listener;
 }

@Override
 public Response intercept(Chain chain) throws IOException {
   Response originalResponse = chain.proceed(chain.request());

return originalResponse.newBuilder()
       .body(new DownloadResponseBody(originalResponse.body(), listener))
       .build();
 }
}

5.封装请求downinfo数据

这个类中的数据可自由扩展,用户自己选择需要保持到数据库中的数据,可以自由选择需要数据库第三方框架,demo采用greenDao框架存储数据


public class DownInfo {
 /*存储位置*/
 private String savePath;
 /*下载url*/
 private String url;
 /*基础url*/
 private String baseUrl;
 /*文件总长度*/
 private long countLength;
 /*下载长度*/
 private long readLength;
 /*下载唯一的HttpService*/
 private HttpService service;
 /*回调监听*/
 private HttpProgressOnNextListener listener;
 /*超时设置*/
 private int DEFAULT_TIMEOUT = 6;
 /*下载状态*/
 private DownState state;
 }

6.DownState状态封装

很简单,和大多数封装框架一样


public enum DownState {
 START,
 DOWN,
 PAUSE,
 STOP,
 ERROR,
 FINISH,
}

7.请求HttpProgressOnNextListener回调封装类

注意:这里和DownloadProgressListener不同,这里是下载这个过程中的监听回调,DownloadProgressListener只是进度的监听

通过抽象类,可以自由选择需要覆盖的类,不需要完全覆盖!更加灵活


/**
* 下载过程中的回调处理
* Created by WZG on 2016/10/20.
*/
public abstract class HttpProgressOnNextListener<T> {
 /**
  * 成功后回调方法
  * @param t
  */
 public abstract void onNext(T t);

/**
  * 开始下载
  */
 public abstract void onStart();

/**
  * 完成下载
  */
 public abstract void onComplete();

/**
  * 下载进度
  * @param readLength
  * @param countLength
  */
 public abstract void updateProgress(long readLength, long countLength);

/**
  * 失败或者错误方法
  * 主动调用,更加灵活
  * @param e
  */
  public void onError(Throwable e){

}

/**
  * 暂停下载
  */
 public void onPuase(){

}

/**
  * 停止下载销毁
  */
 public void onStop(){

}
}

8.封装回调Subscriber

准备的工作做完,需要将回调和传入回调的信息统一封装到sub中,统一判断;和封装二的原理一样,我们通过自定义Subscriber来提前处理返回的数据,让用户字需要关系成功和失败以及向关心的数据,避免重复多余的代码出现在处理类中

  1. sub需要继承DownloadProgressListener,和自带的回调一起组成我们需要的回调结果

  2. 传入DownInfo数据,通过回调设置DownInfo的不同状态,保存状态

  3. 通过RxAndroid将进度回调指定到主线程中(如果不需要进度最好去掉该处理避免主线程处理负担)

  4. update进度回调在断点续传使用时,需要手动判断断点后加载的长度,因为指定断点下载长度下载后总长度=(物理长度-起始下载长度)


/**
* 用于在Http请求开始时,自动显示一个ProgressDialog
* 在Http请求结束是,关闭ProgressDialog
* 调用者自己对请求数据进行处理
* Created by WZG on 2016/7/16.
*/
public class ProgressDownSubscriber<T> extends Subscriber<T> implements DownloadProgressListener {
 //弱引用结果回调
 private WeakReference<HttpProgressOnNextListener> mSubscriberOnNextListener;
 /*下载数据*/
 private DownInfo downInfo;

public ProgressDownSubscriber(DownInfo downInfo) {
   this.mSubscriberOnNextListener = new WeakReference<>(downInfo.getListener());
   this.downInfo=downInfo;
 }

/**
  * 订阅开始时调用
  * 显示ProgressDialog
  */
 @Override
 public void onStart() {
   if(mSubscriberOnNextListener.get()!=null){
     mSubscriberOnNextListener.get().onStart();
   }
   downInfo.setState(DownState.START);
 }

/**
  * 完成,隐藏ProgressDialog
  */
 @Override
 public void onCompleted() {
   if(mSubscriberOnNextListener.get()!=null){
     mSubscriberOnNextListener.get().onComplete();
   }
   downInfo.setState(DownState.FINISH);
 }

/**
  * 对错误进行统一处理
  * 隐藏ProgressDialog
  *
  * @param e
  */
 @Override
 public void onError(Throwable e) {
   /*停止下载*/
   HttpDownManager.getInstance().stopDown(downInfo);
   if(mSubscriberOnNextListener.get()!=null){
     mSubscriberOnNextListener.get().onError(e);
   }
   downInfo.setState(DownState.ERROR);
 }

/**
  * 将onNext方法中的返回结果交给Activity或Fragment自己处理
  *
  * @param t 创建Subscriber时的泛型类型
  */
 @Override
 public void onNext(T t) {
   if (mSubscriberOnNextListener.get() != null) {
     mSubscriberOnNextListener.get().onNext(t);
   }
 }

@Override
 public void update(long read, long count, boolean done) {
   if(downInfo.getCountLength()>count){
     read=downInfo.getCountLength()-count+read;
   }else{
     downInfo.setCountLength(count);
   }
   downInfo.setReadLength(read);
   if (mSubscriberOnNextListener.get() != null) {
     /*接受进度消息,造成UI阻塞,如果不需要显示进度可去掉实现逻辑,减少压力*/
     rx.Observable.just(read).observeOn(AndroidSchedulers.mainThread())
         .subscribe(new Action1<Long>() {
       @Override
       public void call(Long aLong) {
          /*如果暂停或者停止状态延迟,不需要继续发送回调,影响显示*/
         if(downInfo.getState()==DownState.PAUSE||downInfo.getState()==DownState.STOP)return;
         downInfo.setState(DownState.DOWN);
         mSubscriberOnNextListener.get().updateProgress(aLong,downInfo.getCountLength());
       }
     });
   }
 }

}

9.下载管理类封装HttpDownManager

单利获取


/**
  * 获取单例
  * @return
  */
 public static HttpDownManager getInstance() {
   if (INSTANCE == null) {
     synchronized (HttpDownManager.class) {
       if (INSTANCE == null) {
         INSTANCE = new HttpDownManager();
       }
     }
   }
   return INSTANCE;
 }

因为单利所以需要记录正在下载的数据和回到sub


/*回调sub队列*/
 private HashMap<String,ProgressDownSubscriber> subMap;
 /*单利对象*/
 private volatile static HttpDownManager INSTANCE;

private HttpDownManager(){
   downInfos=new HashSet<>();
   subMap=new HashMap<>();
 }

开始下载需要记录下载的service避免每次都重复创建,然后请求sercie接口,得到ResponseBody数据后将数据流写入到本地文件中(6.0系统后需要提前申请权限)


/**
  * 开始下载
  */
 public void startDown(DownInfo info){
   /*正在下载不处理*/
   if(info==null||subMap.get(info.getUrl())!=null){
     return;
   }
   /*添加回调处理类*/
   ProgressDownSubscriber subscriber=new ProgressDownSubscriber(info);
   /*记录回调sub*/
   subMap.put(info.getUrl(),subscriber);
   /*获取service,多次请求公用一个sercie*/
   HttpService httpService;
   if(downInfos.contains(info)){
     httpService=info.getService();
   }else{
     DownloadInterceptor interceptor = new DownloadInterceptor(subscriber);
     OkHttpClient.Builder builder = new OkHttpClient.Builder();
     //手动创建一个OkHttpClient并设置超时时间
     builder.connectTimeout(info.getConnectionTime(), TimeUnit.SECONDS);
     builder.addInterceptor(interceptor);

Retrofit retrofit = new Retrofit.Builder()
         .client(builder.build())
         .addConverterFactory(GsonConverterFactory.create())
         .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
         .baseUrl(info.getBaseUrl())
         .build();
     httpService= retrofit.create(HttpService.class);
     info.setService(httpService);
   }
   /*得到rx对象-上一次下載的位置開始下載*/
   httpService.download("bytes=" + info.getReadLength() + "-",info.getUrl())
       /*指定线程*/
       .subscribeOn(Schedulers.io())
       .unsubscribeOn(Schedulers.io())
         /*失败后的retry配置*/
       .retryWhen(new RetryWhenNetworkException())
       /*读取下载写入文件*/
       .map(new Func1<ResponseBody, DownInfo>() {
         @Override
         public DownInfo call(ResponseBody responseBody) {
           try {
             writeCache(responseBody,new File(info.getSavePath()),info);
           } catch (IOException e) {
             /*失败抛出异常*/
             throw new HttpTimeException(e.getMessage());
           }
           return info;
         }
       })
       /*回调线程*/
       .observeOn(AndroidSchedulers.mainThread())
       /*数据回调*/
       .subscribe(subscriber);

}

写入文件

注意:一开始调用进度回调是第一次写入在进度回调之前,所以需要判断一次DownInfo是否获取到下载总长度,没有这选择当前ResponseBody 读取长度为总长度


 /**
  * 写入文件
  * @param file
  * @param info
  * @throws IOException
  */
 public void writeCache(ResponseBody responseBody,File file,DownInfo info) throws IOException{
   if (!file.getParentFile().exists())
     file.getParentFile().mkdirs();
   long allLength;
   if (info.getCountLength()==0){
     allLength=responseBody.contentLength();
   }else{
     allLength=info.getCountLength();
   }
     FileChannel channelOut = null;
     RandomAccessFile randomAccessFile = null;
     randomAccessFile = new RandomAccessFile(file, "rwd");
     channelOut = randomAccessFile.getChannel();
     MappedByteBuffer mappedBuffer = channelOut.map(FileChannel.MapMode.READ_WRITE,
         info.getReadLength(),allLength-info.getReadLength());
     byte[] buffer = new byte[1024*8];
     int len;
     int record = 0;
     while ((len = responseBody.byteStream().read(buffer)) != -1) {
       mappedBuffer.put(buffer, 0, len);
       record += len;
     }
     responseBody.byteStream().close();
       if (channelOut != null) {
         channelOut.close();
       }
       if (randomAccessFile != null) {
         randomAccessFile.close();
       }
 }

停止下载

调用 subscriber.unsubscribe()解除监听,然后remove记录的下载数据和sub回调,并且设置下载状态(同步数据库自己添加)


/**
  * 停止下载
  */
 public void stopDown(DownInfo info){
   if(info==null)return;
   info.setState(DownState.STOP);
   info.getListener().onStop();
   if(subMap.containsKey(info.getUrl())) {
     ProgressDownSubscriber subscriber=subMap.get(info.getUrl());
     subscriber.unsubscribe();
     subMap.remove(info.getUrl());
   }
   /*同步数据库*/
 }

暂停下载

原理和停止下载原理一样


/**
  * 暂停下载
  * @param info
  */
 public void pause(DownInfo info){
   if(info==null)return;
   info.setState(DownState.PAUSE);
   info.getListener().onPuase();
   if(subMap.containsKey(info.getUrl())){
     ProgressDownSubscriber subscriber=subMap.get(info.getUrl());
     subscriber.unsubscribe();
     subMap.remove(info.getUrl());
   }
   /*这里需要讲info信息写入到数据中,可自由扩展,用自己项目的数据库*/
 }

暂停全部和停止全部下载任务
 


/**
  * 停止全部下载
  */
 public void stopAllDown(){
   for (DownInfo downInfo : downInfos) {
     stopDown(downInfo);
   }
   subMap.clear();
   downInfos.clear();
 }

/**
  * 暂停全部下载
  */
 public void pauseAll(){
   for (DownInfo downInfo : downInfos) {
     pause(downInfo);
   }
   subMap.clear();
   downInfos.clear();
 }

整合代码HttpDownManager

同样使用了封装二中的retry处理和运行时异常自定义处理封装(不复述了)

补充

有同学说不知道数据库这块怎么替换,所以我加入了greenDao框架去优化数据库存储,在实际运用中可以将这块的逻辑替换成你项目的数据库框架(之前用的都是realm,这回正好练练手)

只需要替换DbUtil的方法即可

总结

到此我们的Rxjava+ReTrofit+okHttp深入浅出-封装就基本完成了,已经可以完全胜任开发和学习的全部工作,如果后续再使用过程中有任何问题欢迎留言给我,会一直维护!

    1.Retrofit+Rxjava+okhttp基本使用方法
    2.统一处理请求数据格式
    3.统一的ProgressDialog和回调Subscriber处理
    4.取消http请求
    5.预处理http请求
    6.返回数据的统一判断
    7.失败后的retry封装处理
    8.RxLifecycle管理生命周期,防止泄露
    9.文件上传和文件下载(支持多文件断点续传)

源码:传送门-全部封装源码

来源:http://blog.csdn.net/wzgiceman/article/details/52911503

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com