在Android中使用WebSocket实现消息通信的方法详解
作者:黄林晴 发布时间:2022-06-10 06:26:18
前言
消息推送功能可以说移动APP不可缺少的功能之一,一般简单的推送我们可以使用第三方推送的SDK,比如极光推送、信鸽推送等,但是对于消息聊天这种及时性有要求的或者三方推送不满足业务需求的,我们就需要使用WebSocket实现消息推送功能。
基本流程
WebSocket是什么,这里就不做介绍了,我们这里使用的开源框架是https://github.com/TakahikoKawasaki/nv-websocket-client
基于开源协议我们封装实现WebSocket的连接、注册、心跳、消息分发、超时任务功能,基本流程如下:
连接功能
首先我们新建一个项目,在build.grade中添加配置
compile 'com.neovisionaries:nv-websocket-client:2.2'
新建websocket管理类WsManger
public class WsManager {
private volatile static WsManager wsManger;
private WsManager() {
}
public static WsManager getWsManger() {
if (wsManger == null) {
synchronized (WsManager.class) {
if (wsManger == null) {
wsManger = new WsManager();
}
}
}
return wsManger;
}
}
接下来添加连接方法,我们将webSocket的状态分为三种,新建WsStatue枚举类对应起来
public enum WsStatus {
/**
* 连接成功
*/
CONNECT_SUCCESS,
/**
* 连接失败
*/
CONNECT_FAIL,
/**
* 正在连接
*/
CONNECTING;
}
连接方法如下所示:
/**
* 连接方法 这里要判断是否登录 此处省略
*/
public void connect() {
//WEB_SOCKET_API 是连接的url地址,
// CONNECT_TIMEOUT是连接的超时时间 这里是 5秒
try {
ws = new WebSocketFactory().createSocket(WEB_SOCKET_API, CONNECT_TIMEOUT)
//设置帧队列最大值为5
.setFrameQueueSize(5)
//设置不允许服务端关闭连接却未发送关闭帧
.setMissingCloseFrameAllowed(false)
//添加回调监听
.addListener(new WsListener())
//异步连接
.connectAsynchronously();
} catch (IOException e) {
e.printStackTrace();
}
setStatus(WsStatus.CONNECTING);
}
调用连接方法后 我们来看连接的回调 也就是WsListener
/**
* websocket回调事件
*/
private class WsListener extends WebSocketAdapter {
@Override
public void onConnected(WebSocket websocket, Map<String, List<String>> headers) throws Exception {
Log.d(TAG, "onConnected: 连接成功");
}
@Override
public void onConnectError(WebSocket websocket, WebSocketException exception) throws Exception {
Log.d(TAG, "onConnectError: 连接失败");
}
@Override
public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame,
WebSocketFrame clientCloseFrame,
boolean closedByServer) throws Exception {
Log.d(TAG, "onDisconnected: 断开连接");
}
@Override
public void onTextMessage(WebSocket websocket, String text) throws Exception {
Log.d(TAG, "onTextMessage: 收到消息:" + text);
}
}
下面我们调用连接方法
WsManager.getWsManger().connect();
运行项目我们可以看到如下打印:
此处我们要做的处理是,如果收到连接失败或者断开连接的回调 需要重新连接,我们重新调用一次连接方法即可,并且如果超过三次重连失败,我们在业务中可以通过调用接口来获取数据,避免数据丢失,此处细节省略。
协议封装
此处协议如下所示:
{
"action":"",
"requestChild":{
"clientType":"",
"id":""
}
}
心跳、发送请求都属于客户端主动发送请求,对于请求结果我们分为成功和失败以及超时,发送超时我们是收不到服务器任何回复的,所以我们需要在发送之后将发送放在超时任务队列中,如果请求成功将任务从超时队列中移除,超时从超时队列中获取任务重新请求。
超时任务队列中回调有成功、失败、超时。
我们按照上述协议,新增对应实体类,采用Builder设计模式
public class Request {
/**
* 行为
*/
private String action;
/**
* 请求体
*/
private RequestChild req;
/**
* 请求次数
*/
private transient int reqCount;
/**
* 超时的时间
*/
private transient int timeOut;
public Request() {
}
public Request(String action, int reqCount, int timeOut, RequestChild req) {
this.action = action;
this.req = req;
this.reqCount = reqCount;
this.timeOut = timeOut;
}
public static class Builder {
//action 请求类型
private String action;
//请求子类数据 按照具体业务划分
private RequestChild req;
//请求次数 便于重试
private int reqCount;
//超时时间
private int timeOut;
public Builder action(String action) {
this.action = action;
return this;
}
public Builder req(RequestChild req) {
this.req = req;
return this;
}
public Builder reqCount(int reqCount) {
this.reqCount = reqCount;
return this;
}
public Builder timeOut(int timeOut) {
this.timeOut = timeOut;
return this;
}
public Request build() {
return new Request(action, reqCount, timeOut, req);
}
}
}
public class RequestChild {
/**
* 设备类型
*/
private String clientType;
/**
* 用于用户注册的id
*/
private String id;
public RequestChild(String clientType, String id) {
this.clientType = clientType;
this.id = id;
}
public RequestChild() {
}
public static class Builder {
private String clientType;
private String id;
public RequestChild.Builder setClientType(String clientType) {
this.clientType = clientType;
return this;
}
public RequestChild.Builder setId(String id) {
this.id = id;
return this;
}
public RequestChild build() {
return new RequestChild(clientType, id);
}
}
}
我们添加一个发送请求的方法如下:
/**
* 发送请求
*
* @param request 请求体
* @param reqCount 请求次数
* @param requestListern 请求回调
*/
private void senRequest(Request request, final int reqCount, final RequestListern requestListern) {
if (!isNetConnect()) {
requestListern.requestFailed("网络未连接");
return;
}
}
请求回调如下所示
public interface RequestListern {
/**
* 请求成功
*/
void requestSuccess();
/**
* 请求失败
*
* @param message 请求失败消息提示
*/
void requestFailed(String message);
}
接着我们要把请求放在超时队列中,新建超时任务类,对应的分别是请求参数、请求回调、任务调度
public class TimeOutTask {
/**
* 请求主体
*/
private Request request;
/**
* 通用返回
*/
private RequestCallBack requestCallBack;
/**
* r任务
*/
private ScheduledFuture scheduledFuture;
public TimeOutTask(Request request,
RequestCallBack requestCallBack,
ScheduledFuture scheduledFuture) {
this.request = request;
this.requestCallBack = requestCallBack;
this.scheduledFuture = scheduledFuture;
}
public ScheduledFuture getScheduledFuture() {
return scheduledFuture;
}
public void setScheduledFuture(ScheduledFuture scheduledFuture) {
this.scheduledFuture = scheduledFuture;
}
public Request getRequest() {
return request;
}
public void setRequest(Request request) {
this.request = request;
}
public RequestCallBack getRequestCallBack() {
return requestCallBack;
}
public void setRequestCallBack(RequestCallBack requestCallBack) {
this.requestCallBack = requestCallBack;
}
}
RequestCallBack是超时任务的回调,只是比请求回调多了个超时,因为超时的处理机制是一样的,所以这里我们没必要将超时回调到请求中
public interface RequestCallBack {
/**
* 请求成功
*/
void requestSuccess();
/**
* 请求失败
*
* @param request 请求体
* @param message 请求失败的消息
*/
void requestFailed(String message, Request request);
/**
* 请求超时
*
* @param request 请求体
*/
void timeOut(Request request);
}
/**
* 添加超时任务
*/
private ScheduledFuture enqueueTimeout(final Request request, final long timeout) {
Log.d(TAG, " " + "enqueueTimeout: 添加超时任务类型为:" + request.getAction());
return executor.schedule(new Runnable() {
@Override
public void run() {
TimeOutTask timeoutTask = callbacks.remove(request.getAction());
if (timeoutTask != null) {
timeoutTask.getRequestCallBack().timeOut(timeoutTask.getRequest());
}
}
}, timeout, TimeUnit.MILLISECONDS);
}
超时任务的方法是通过任务调度定时调用,请求成功后我们会把超时任务移除,当到了超时时间时,任务还存在就说明任务超时了。
每次的任务我们以action为键值存在hashMap中
private Map<String, CallbackWrapper> callbacks = new HashMap<>();
将任务放入超时任务代码如下所示:
final ScheduledFuture timeoutTask = enqueueTimeout(request, request.getTimeOut());
final RequestCallBack requestCallBack = new RequestCallBack() {
@Override
public void requestSuccess() {
requestListern.requestSuccess();
}
@Override
public void requestFailed(String message, Request request) {
requestListern.requestFailed(message);
}
@Override
public void timeOut(Request request) {
timeOutHanlder(request);
}
};
callbacks.put(request.getAction(),
new CallbackWrapper(request, requestCallBack, timeoutTask));
一般而言,任务超时都是由于连接原因导致,所以我们这里可以尝试重试一次,如果还是超时,通过 timeOutHanlder(request);方法 进行重新连接,重连代码和连接代码一样,这里就省略了,做好这步操作,我们就可以发送消息了。
/**
* 超时任务
*/
private void timeOutHanlder(Request requset) {
setStatus(WsStatus.CONNECT_FAIL);
//这里假装有重连
Log.d(TAG, "timeOutHanlder: 请求超时 准备重连");
}
到这里我们的流程基本可以走通了。
心跳
首先我们要了解下心跳的作用是什么,心跳是在连接成功后,通过固定的间隔时间向服务器发送询问,当前是否还在线,有很多人说心跳失败我们就重连,成功就继续心跳,但是这里要注意的是,我们一般是收不到心跳失败回调的,心跳也是向服务器发送数据,所以我们要将所有的主动请求都放在超时任务队列中,
所以对websocket来说 请求结果有三种:成功、失败、超时,对于用户 只有成功、失败即可。
至于心跳、注册等请求发送的数据是什么,这就得看我们与服务端定的协议是什么样了,通常来说 分为action 和 requestBody,协议格式我们再第二步已经封装好了,这里我们以心跳任务为例验证上面的封装。
/**
* 心跳
*/
void keepAlive() {
Request request = new Request.Builder()
.reqCount(0)
.timeOut(REQUEST_TIMEOUT)
.action(ACTION_KEEPALIVE).build();
WsManager.getWsManger().senRequest(request, request.getReqCount() + 1, new RequestListern() {
@Override
public void requestSuccess() {
Log.d(TAG, "requestSuccess: 心跳发送成功了");
}
@Override
public void requestFailed(String message) {
}
});
}
我们每间隔10s中开启一次心跳任务
/**
* 开始心跳
*/
public void startKeepAlive() {
mHandler.postDelayed(mKeepAliveTask, HEART_BEAT_RATE);
}
/**
* 心跳任务
*/
private Runnable mKeepAliveTask = new Runnable() {
@Override
public void run() {
keepAlive();
mHandler.removeCallbacks(mKeepAliveTask);
mHandler.postDelayed(mKeepAliveTask, HEART_BEAT_RATE);
}
};
为了便于操作演示,在主页面上加个按钮 ,点击按钮调用startKeepAlive方法,运行如下所示:
我们可以看到心跳返回的statue是300 不成功,5秒之后走到了请求超时的方法中,所以如果状态返回成功的话,我们需要回调给调用者
/**
* 处理 任务回调
*
* @param action 请求类型
*/
void disPatchCallbackWarp(String action, boolean isSuccess) {
CallbackWrapper callBackWarp = callbacks.remove(action);
if (callBackWarp == null) {
Logger.d(TAG+" "+ "disPatchCallbackWarp: 任务队列为空");
} else {
callBackWarp.getScheduledFuture().cancel(true);
if (isSuccess) {
callBackWarp.getRequestCallBack().requestSuccess();
} else {
callBackWarp.getRequestCallBack().requestFailed("", new Request());
}
}
}
这样调用者才知道成功或失败。
发送其他消息与心跳一样,只是请求参数不同而已,修改Request参数即可。这样我们根据协议和业务就实现一个比较规范的webSocket消息推送流程了。
来源:https://blog.csdn.net/huangliniqng/article/details/105338690
猜你喜欢
- 背景Springboot 默认把异常的处理集中到一个ModelAndView中了,但项目的实际过程中,这样做,并不能满足我们的要求。具体的自
- DataHub 类似于传统大数据解决方案中 Kafka 的角色,提供了一个数据队列功能。DataHub 除了供了一个缓冲的队列作用。同时由于
- 一、点睛邻接矩阵通常采用一个一维数组存储图中节点的信息,采用一个二维数组存储图中节点之间的邻接关系。邻接矩阵可以用来表示无向图、有向图和网。
- 前言: Java 8已经公布有一段时间了,种种迹象表明Java 8是一个有重大改变的发行版。在Java Code Geeks上已经有很多介绍
- 第一个Lambda表达式在Lambda出现之前,如果我们需要写一个多线程可能需要下面这种方式:Runnable runnable = new
- 一、问题描述有时候,我们会遇到在遍历List集合的过程中删除数据的情况。看着自己写的代码,感觉完全没有问题,但就是达不到预期的效果,这是为什
- private static void ExecuteSqlTransaction(string connectionString)&nbs
- 引言最近在写一个 Mybatis 代码自动生成插件,用的是Mybatis来扩展,其中有一个需求就是 生成javaMapper文件和 xmlM
- static修饰符是java里面非常常用的一个东西,用法也非常多。然而,在kotlin里竟然没有这个东西!那该如何替代呢?本文就总结了下ja
- 概览1. 基于链表的可选有界阻塞队列。根据FIFO的出入队顺序,从队列头部检索和获取元素,在队列尾部插入新元素。2. 当作为有界阻塞队列,在
- 日志输出是所有系统必备的,很多开发人员可能因为常常使用log4j而忽视了JDK logging模块,两者之间是否有联系?是怎样的联系?JDK
- 前言请看下面几个问题Spring为什么不推荐使用@Autowired 注解?为什么推荐使用@Resource 代替&nb
- 集合定义集合,集合是java中提供的一种容器,可以用来存储多个数据。特点:数组的长度是固定的。集合的长度是可变的。集合中存储的元素必须是引用
- 本文为大家分享了使用栈的迷宫算法java版,主要考察栈的使用,供大家参考,具体内容如下主要思路如下: do { if(当前位置可通过) {
- 网上看了很多篇文章关于如何配置mybatis的logback日志的,复杂的简单的都有,但是有用的没几个,耽误了很多时间。通过对logback
- 一、泛型的概念1.1 基础案例泛型在Java中的应用非常广泛,最常见则是在集合容器中,先看下基础用法:public class Generi
- 摘要:这个问题算是老生常谈了,我也是一段时间没弄过了,所以感觉有些忘了,就记录一下。一、后端通过shiro在session中存储数据://
- 问题描述:因为领导的一个需求,需要用到使用resultMap,很久没使用了,结果就除了点意外。就记录下这个问题准备两个类:author(作者
- 大多数网站会设置用户权限,如过滤非法用户,用户不登录时不能进行访问,或者设置访问的权限,如部分内容仅对VIP开放等等,这些权限的控制都可以用
- 本文实例讲述了C#实现客户端弹出消息框封装类。分享给大家供大家参考。具体如下:asp.net在服务器端运行,是不能在服务器端弹出对话框的,但