软件编程
位置:首页>> 软件编程>> Android编程>> Android Rxjava3 使用场景详解

Android Rxjava3 使用场景详解

作者:香草可乐  发布时间:2023-08-06 08:58:50 

标签:Android,Rxjava3

一、Rxjava使用场景

为了模拟实际场景,从wanandroid网站找了二个接口,如下:(对Wanandroid表示感谢!)

public interface ApiServer {

/**
    * 接口一
    * 获取文章列表
    * @return
    */
   @GET("article/list/1/json")
   Observable<BaseResponse<ArticleListResp>> getArticleList();

/**
    * 接口二
    * 获取热词
    * @return
    */
   @GET("hotkey/json")
   Observable<BaseResponse<List<HotKeyResp.DataBean>>> getHotKey();

}

1、多任务嵌套回调

场景:比如调用接口一有回调后才能调用接口二,如果接口一调用失败不再调用接口二。下面是二种写法。

写法一,代码如下:

//为了看清楚代码,没有使用lambda简化
//接口一
Observable<BaseResponse<ArticleListResp>> articleList = ApiManager.getInstance().getApiService().getArticleList();
//接口二
Observable<BaseResponse<List<HotKeyResp.DataBean>>> hotKey = ApiManager.getInstance().getApiService().getHotKey();
Observable.just(articleList)
       .subscribeOn(Schedulers.io())
       .observeOn(AndroidSchedulers.mainThread())
       .map(new Function<Observable<BaseResponse<ArticleListResp>>, Observable<BaseResponse<List<HotKeyResp.DataBean>>>>() {
           @Override
           public Observable<BaseResponse<List<HotKeyResp.DataBean>>> apply(Observable<BaseResponse<ArticleListResp>> baseResponseObservable) throws Throwable {
              //处理第一个请求返回的数据
               if(baseResponseObservable!=null) mTv.setText(baseResponseObservable.blockingSingle().toString());
               return hotKey;   //发起第二次网络请求
           }
       }).subscribeOn(Schedulers.io())
       .observeOn(AndroidSchedulers.mainThread())
       .subscribe(new Consumer<Observable<BaseResponse<List<HotKeyResp.DataBean>>>>() {
           @Override
           public void accept(Observable<BaseResponse<List<HotKeyResp.DataBean>>> baseResponseObservable) throws Throwable {
               //处理第二次网络请求的结果
               if(baseResponseObservable!=null) mTvTwo.setText(baseResponseObservable.blockingSingle().toString());
           }
       }, new Consumer<Throwable>() {
           @Override
           public void accept(Throwable throwable) throws Throwable {
               //异常的处理:比如Dialog的Dismiss,缺省页展示等
               //注意:如果第一个网络请求异常,整个事件会中断,不会执行第二个网络请求,如果多个请求同理
               //但是请求成功的还是能正常处理
               LogUtil.e(throwable.toString());
           }
       });

写法二,代码如下:

//为了看清楚代码,没有使用lambda简化
       //接口一
       Observable<BaseResponse<ArticleListResp>> articleList = ApiManager.getInstance().getApiService().getArticleList();
       //接口二
       Observable<BaseResponse<List<HotKeyResp.DataBean>>> hotKey = ApiManager.getInstance().getApiService().getHotKey();
       //请求第一个
       articleList.subscribeOn(Schedulers.io())
               .observeOn(AndroidSchedulers.mainThread())
               .doOnNext(new Consumer<BaseResponse<ArticleListResp>>() {
                   @Override
                   public void accept(BaseResponse<ArticleListResp> articleListRespBaseResponse) throws Throwable {
                       //处理第一个网络请求的结果
                       if(articleListRespBaseResponse!=null) mTv.setText(articleListRespBaseResponse.toString());
                   }
               }).observeOn(Schedulers.io())
               .flatMap(new Function<BaseResponse<ArticleListResp>, ObservableSource<BaseResponse<List<HotKeyResp.DataBean>>>>() {
                   @Override
                   public ObservableSource<BaseResponse<List<HotKeyResp.DataBean>>> apply(BaseResponse<ArticleListResp> articleListRespBaseResponse) throws Throwable {
                       return hotKey;   //将第一个网络请求转换为第二个网络请求
                   }
               }).observeOn(AndroidSchedulers.mainThread())
               .subscribe(new Consumer<BaseResponse<List<HotKeyResp.DataBean>>>() {
                   @Override
                   public void accept(BaseResponse<List<HotKeyResp.DataBean>> listBaseResponse) throws Throwable {
                       //处理第二次网络请求的结果
                       if(listBaseResponse!=null) mTvTwo.setText(listBaseResponse.toString());
                   }
               }, new Consumer<Throwable>() {
                   @Override
                   public void accept(Throwable throwable) throws Throwable {
                       //注意:如果第一个网络请求异常,整个事件会中断,不会执行第二个网络请求,多个请求同理
                       //但是在异常前面已经成功的网络请求还是能正常处理
                       //异常的处理:比如Dialog的Dismiss,缺省页展示等
                       LogUtil.e(throwable.toString());
                   }
               });

注意异常处理和线程切换,其他细节代码和注释比较详细。

2、多任务合并处理

场景:接口一和接口二返回数据后一起处理。
代码如下:

private void zipRequest() {
   //为了看清楚代码,没有使用lambda简化
   //接口一
   Observable<BaseResponse<ArticleListResp>> articleList = ApiManager.getInstance().getApiService().getArticleList();
   //接口二
   Observable<BaseResponse<List<HotKeyResp.DataBean>>> hotKey = ApiManager.getInstance().getApiService().getHotKey();
   Observable.zip(articleList, hotKey, this::combiNotification)  //传入方法定义合并规则
           .subscribeOn(Schedulers.io())
           .observeOn(AndroidSchedulers.mainThread())
           .subscribe(new Observer<String>() {
               @Override
               public void onSubscribe(@NonNull Disposable d) {

}

@Override
               public void onNext(@NonNull String msg) {
                   if(!TextUtils.isEmpty(msg)){
                       mTv.setText(msg);
                   }
               }

@Override
               public void onError(@NonNull Throwable e) {

}

@Override
               public void onComplete() {

}
           });

}

//合并的规则,以及定义合并的返回值
public String combiNotification(BaseResponse<ArticleListResp> articleListRespBaseResponse, BaseResponse<List<HotKeyResp.DataBean>> hotkeyResponse) {
   //比如这里取二个接口数据toString返回
   if (articleListRespBaseResponse != null && hotkeyResponse != null) {
       return articleListRespBaseResponse.toString() + hotkeyResponse.toString();
   }
   return null;
}

3、轮询

场景一:轮询固定的次数(间隔一定的时间),可以提前退出轮询,也可以等轮询到指定次数后自动退出,每次轮询必须等上一次轮询有结果后才能开始下一次轮询。

特别注意repeatWhen操作符,只有在repeatWhen的Function方法中发射onNext事件,重复(repeat)才能触发,发射onError或者onComplite都会结束重复(repeat),基于这一点,通过flatMap操作符将事件转化为延迟一定时间的onNext事件,就达到了延时轮询的目的。至于onNext事件发射的什么不重要。

延伸:retryWhen的Function方法发射onError事件才会重试(retry)。

takeUntil操作符可以定义一定的条件,当达到条件时自动结束整个事件的目的,事件结束时会回调subscribe。

代码如下:

/**
* 轮询
* @param pollingTimes 轮询的次数
*/
private void timedPolling(int pollingTimes) {
   AtomicInteger times = new AtomicInteger();
   Observable<BaseResponse<ArticleListResp>> articleList = ApiManager.getInstance().getApiService().getArticleList();
   articleList.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
       @Override
       public ObservableSource<?> apply(Observable<Object> objectObservable) throws Throwable {
           return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {  //转换事件
               @Override
               public ObservableSource<?> apply(Object o) throws Throwable {
                   //这里发射延时的onNext事件,触发repeat动作,发射的0不会回调到下面的subscribe
                   return Observable.just(0).delay(2, TimeUnit.SECONDS);  
               }
           });
       }
   }).subscribeOn(Schedulers.io())
           .observeOn(AndroidSchedulers.mainThread())
           //takeUntil定义了二个结束条件:前面是达到了轮询的次数,后面是网络请求返回了成功,当然也可以写成代码块做其他的返回判断
           .takeUntil(response -> times.incrementAndGet() >= pollingTimes || response.getErrorCode() == 0)
           .subscribe(new Observer<BaseResponse<ArticleListResp>>() {
               @Override
               public void onSubscribe(@NonNull Disposable d) {

}

@Override
               public void onNext(@NonNull BaseResponse<ArticleListResp> articleListRespBaseResponse) {

}

@Override
               public void onError(@NonNull Throwable e) {

}

@Override
               public void onComplete() {

}
           });
}

如果想改成不限制次数的也比较简单。

场景二:轮询固定的次数(间隔一定的时间),可以提前退出轮询,也可以等轮询到指定次数后自动退出,这里的轮询不关心上次请求的结果。
代码如下:

/**
* 轮询一定的次数
* @param pollTimes 轮询次数
*/
private void timedPolling(int pollTimes) {
   //网络请求
   Observable<BaseResponse<ArticleListResp>> articleList = ApiManager.getInstance().getApiService().getArticleList();
   //返回值用于取消轮询
   mSubscribe = Observable.intervalRange(0, pollTimes, 0, 2000, TimeUnit.MILLISECONDS)
           .flatMap(new Function<Long, ObservableSource<BaseResponse<ArticleListResp>>>() {
               @Override
               public ObservableSource<BaseResponse<ArticleListResp>> apply(Long aLong) throws Throwable {
                   return articleList;  //转换事件
               }
           }).subscribeOn(Schedulers.io())
           .observeOn(AndroidSchedulers.mainThread())
           .subscribe(new Consumer<BaseResponse<ArticleListResp>>() {
               @Override
               public void accept(BaseResponse<ArticleListResp> listBaseResponse) throws Throwable {
                   //如果满足了退出轮询的条件,可以调用下面的方法退出轮询
                   //mSubscribe.dispose();
               }
           });
}

思路是定时发射事件,然后将事件转化为网络请求。同理可以写出不限次数的轮询。

场景三:不限次数轮询(间隔一定的时间),不关心上次请求的结果。

假如接口返回的code为0时需要取消轮询,代码如下:

Observable<BaseResponse<ArticleListResp>> articleList = ApiManager.getInstance().getApiService().getArticleList();
//返回值用于取消轮询
mSubscribe = Observable.interval(0, 2000, TimeUnit.MILLISECONDS)
       .flatMap(new Function<Long, ObservableSource<BaseResponse<ArticleListResp>>>() {
           @Override
           public ObservableSource<BaseResponse<ArticleListResp>> apply(Long aLong) throws Throwable {
               return articleList;
           }
       })
       .takeUntil(response -> response.getErrorCode() == 0)  //使用takeUntil自动取消发射
       .subscribeOn(Schedulers.io())
       .observeOn(AndroidSchedulers.mainThread())
       .subscribe(new Consumer<BaseResponse<ArticleListResp>>() {
           @Override
           public void accept(BaseResponse<ArticleListResp> articleListRespBaseResponse) throws Throwable {
               //处理回调
           }
       }, new Consumer<Throwable>() {
           @Override
           public void accept(Throwable throwable) throws Throwable {
              //处理异常
           }
       });

如果是其他取消条件,也可以写在代码块里:

.takeUntil(response -> {
   //处理接口数据,然后判断是返回true还是false,true:停止发射,false:继续发射
   return false;
})  //使用takeUntil自动取消发射

不管何种轮询,注意在OnDestroy中取消。

4、其他小场景

1)倒计时

验证码的倒计时功能,代码如下:

/**
* 倒计时
* @param countDownSeconds 倒计时的秒数
*/
private void countDown(int countDownSeconds) {
   Observable.intervalRange(0, countDownSeconds, 0, 1000, TimeUnit.MILLISECONDS)
           .map(new Function<Long, String>() {
               @Override
               public String apply(Long aLong) throws Throwable {
                   return (countDownSeconds - aLong) + "s后重新获取";
               }
           }).observeOn(AndroidSchedulers.mainThread())
           .subscribe(new Observer<String>() {
               @Override
               public void onSubscribe(@NonNull Disposable d) {
                   mTv.setEnabled(false);
               }

@Override
               public void onNext(@NonNull String s) {
                   mTv.setText(s);
               }

@Override
               public void onError(@NonNull Throwable e) {
                   mTv.setEnabled(true);
                   mTv.setText("获取验证码");
               }

@Override
               public void onComplete() {
                   mTv.setText("获取验证码");
                   mTv.setEnabled(true);
               }
           });
}

效果

Android Rxjava3 使用场景详解

2)打字机效果

几行代码实现打字机效果:


@RequiresApi(api = Build.VERSION_CODES.M)  //6.0
public class DaziView extends View {
   private TextPaint mTextPaint;
   private StaticLayout mStaticLayout;

public DaziView(Context context) {
       super(context,null);
   }

public DaziView(Context context, @Nullable AttributeSet attrs) {
       super(context, attrs);
       initTextPaint();
   }

/**
    * 初始化画笔
    */
   private void initTextPaint() {
       mTextPaint = new TextPaint(Paint.ANTI_ALIAS_FLAG);
       mTextPaint.setTextSize(48);
       mTextPaint.setColor(Color.parseColor("#000000"));
   }

/**
    * 绘制
    * @param content
    */
 public void drawText(String content){
       if(!TextUtils.isEmpty(content)){
           Observable.intervalRange(0,content.length()+1,0,150, TimeUnit.MILLISECONDS)
                   .subscribe(new Consumer<Long>() {
               @Override
               public void accept(Long aLong) throws Throwable {
               //动态改变文本长度
                   mStaticLayout = StaticLayout.Builder.obtain(content, 0, aLong.intValue(), mTextPaint, getWidth())
                           .build();
                   invalidate();
               }
           });
       }
 }

@Override
   protected void onDraw(Canvas canvas) {
       super.onDraw(canvas);
       //绘制文本
       mStaticLayout.draw(canvas);
   }
}

Android Rxjava3 使用场景详解

文本

<string name="dazi_content">\u3000\u3000你好,这是一个打字机,这是一个打字机这是一个打字机这是一个打字机。\n\u3000\u3000换行空格继续打印。</string>

二、结合Rxbinding的使用场景

RxBinding 提供的绑定能够将任何 Android View 事件转换为 Observable。

一旦将 View 事件转换为 Observable ,将发射数据流形式的 UI 事件,我们就可以订阅这个数据流,这与订阅其他 Observable 方式相同。

引入下面的库:

implementation 'com.jakewharton.rxbinding4:rxbinding:4.0.0'

1、点击事件防抖

点击事件的写法:

RxView.clicks(button)   //button为控件
       .subscribe(new Consumer<Unit>() {
           @Override
           public void accept(Unit unit) throws Throwable {
               //点击事件
           }
       });

长点击事件的写法:

RxView.longClicks(button)
       .subscribe(new Consumer<Unit>() {
           @Override
           public void accept(Unit unit) throws Throwable {
               //长点击自动响应,不需要等放开手指
           }
       });

点击防抖事件的写法:

RxView.clicks(button)
       .throttleFirst(1000, TimeUnit.MILLISECONDS)   //一秒以内第一次点击事件有效
       .subscribe(new Consumer<Unit>() {
           @Override
           public void accept(Unit unit) throws Throwable {
               //点击事件
           }
       });

2、输入搜索优化

RxTextView.textChanges(editText)  //传入EditText控件
       .debounce(1000,TimeUnit.MILLISECONDS)  //一秒内没有新的事件时,取最后一次事件发射
       .skip(1)    //跳过第一次EditText的空内容
       .subscribeOn(AndroidSchedulers.mainThread())
       .subscribe(new Consumer<CharSequence>() {
           @Override
           public void accept(CharSequence charSequence) throws Throwable {
               //EditText的内容
           }
       }, new Consumer<Throwable>() {
           @Override
           public void accept(Throwable throwable) throws Throwable {

}
       });

3、联合判断

combineLatest 操作符将多个 Observable 发射的事件组装起来,然后再发射组装后的新事件。

Observable<CharSequence> observableEdittext = RxTextView.textChanges(editText).skip(1);
Observable<CharSequence> observableEdittextTwo =   RxTextView.textChanges(editText_two).skip(1);

Observable.combineLatest(observableEdittext, observableEdittextTwo, new BiFunction<CharSequence, CharSequence, Boolean>() {
    @Override
    public Boolean apply(CharSequence charSequence, CharSequence charSequence2) throws Throwable {
        if(!TextUtils.isEmpty(charSequence)&&!TextUtils.isEmpty(charSequence2)){
            return true;
        }
        return false;
    }
}).subscribe(new Consumer<Boolean>() {
    @Override
    public void accept(Boolean aBoolean) throws Throwable {
            //TODO 其他处理
    }
});

三、防泄漏

1、Observable.unsubscribeOn

Observable<Integer> just = Observable.just(0);
just.subscribeOn(Schedulers.io()).unsubscribeOn(Schedulers.io());  //取消事件,防止泄漏

2、disposable.dispose

这个比较常用。

3、CompositeDisposable

对订阅事件统一管理

CompositeDisposable compositeDisposable = new CompositeDisposable();
compositeDisposable.add(disposableOne);
compositeDisposable.add(disposableTwo);
compositeDisposable.clear();

参考了以下文章,表示感谢:

最适合使用 RxJava 处理的四种场景

Android RxJava应用:网络请求轮询(有条件)

Rxjava3文档级教程三: 实战演练

来源:https://juejin.cn/post/7083120734003953695

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com