软件编程
位置:首页>> 软件编程>> Android编程>> Android6.0 消息机制原理解析

Android6.0 消息机制原理解析

作者:kc58236582  发布时间:2023-08-06 12:19:44 

标签:Android,消息机制

消息都是存放在一个消息队列中去,而消息循环线程就是围绕这个消息队列进入一个无限循环的,直到线程退出。如果队列中有消息,消息循环线程就会把它取出来,并分发给相应的Handler进行处理;如果队列中没有消息,消息循环线程就会进入空闲等待状态,等待下一个消息的到来。在编写Android应用程序时,当程序执行的任务比较繁重时,为了不阻塞UI主线程而导致ANR的发生,我们通常的做法的创建一个子线程来完成特定的任务。在创建子线程时,有两种选择,一种通过创建Thread对象来创建一个无消息循环的子线程;还有一种就是创建一个带有消息循环的子线程,而创建带有消息循环的子线程由于两种实现方法,一种是直接利用Android给我们封装好的HandlerThread类来直接生成一个带有消息循环的线程对象,另一种方法是在实现线程的run()方法内使用以下方式启动一个消息循环: 

一、消息机制使用 

通常消息都是有一个消息线程和一个Handler组成,下面我们看PowerManagerService中的一个消息Handler:        


mHandlerThread = new ServiceThread(TAG,
       Process.THREAD_PRIORITY_DISPLAY, false /*allowIo*/);
   mHandlerThread.start();
   mHandler = new PowerManagerHandler(mHandlerThread.getLooper());

这里的ServiceThread就是一个HandlerThread,创建Handler的时候,必须把HandlerThread的looper传进去,否则就是默认当前线程的looper。 

而每个handler,大致如下:


  private final class PowerManagerHandler extends Handler {
   public PowerManagerHandler(Looper looper) {
     super(looper, null, true /*async*/);
   }

@Override
   public void handleMessage(Message msg) {
     switch (msg.what) {
       case MSG_USER_ACTIVITY_TIMEOUT:
         handleUserActivityTimeout();
         break;
       case MSG_SANDMAN:
         handleSandman();
         break;
       case MSG_SCREEN_BRIGHTNESS_BOOST_TIMEOUT:
         handleScreenBrightnessBoostTimeout();
         break;
       case MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT:
         checkWakeLockAquireTooLong();
         Message m = mHandler.obtainMessage(MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT);
         m.setAsynchronous(true);
         mHandler.sendMessageDelayed(m, WAKE_LOCK_ACQUIRE_TOO_LONG_TIMEOUT);
         break;
     }
   }
 }

二、消息机制原理
那我们先来看下HandlerThread的主函数run函数: 


public void run() {
   mTid = Process.myTid();
   Looper.prepare();
   synchronized (this) {
     mLooper = Looper.myLooper();//赋值后notifyall,主要是getLooper函数返回的是mLooper
     notifyAll();
   }
   Process.setThreadPriority(mPriority);
   onLooperPrepared();
   Looper.loop();
   mTid = -1;
 }

再来看看Lopper的prepare函数,最后新建了一个Looper对象,并且放在线程的局部变量中。


public static void prepare() {
   prepare(true);
 }

private static void prepare(boolean quitAllowed) {
   if (sThreadLocal.get() != null) {
     throw new RuntimeException("Only one Looper may be created per thread");
   }
   sThreadLocal.set(new Looper(quitAllowed));
 }

Looper的构造函数中创建了MessageQueue


  private Looper(boolean quitAllowed) {
   mQueue = new MessageQueue(quitAllowed);
   mThread = Thread.currentThread();
 }

我们再来看下MessageQueue的构造函数,其中nativeInit是一个native方法,并且把返回值保存在mPtr显然是用long型变量保存的指针


MessageQueue(boolean quitAllowed) {
   mQuitAllowed = quitAllowed;
   mPtr = nativeInit();
 }

native函数中主要创建了NativeMessageQueue对象,并且把指针变量返回了。


static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
 NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
 if (!nativeMessageQueue) {
   jniThrowRuntimeException(env, "Unable to allocate native queue");
   return 0;
 }

nativeMessageQueue->incStrong(env);
 return reinterpret_cast<jlong>(nativeMessageQueue);
}

NativeMessageQueue构造函数就是获取mLooper,如果没有就是新建一个Looper 


NativeMessageQueue::NativeMessageQueue() :
   mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
 mLooper = Looper::getForThread();
 if (mLooper == NULL) {
   mLooper = new Looper(false);
   Looper::setForThread(mLooper);
 }
}

然后我们再看下Looper的构造函数,显示调用了eventfd创建了一个fd,eventfd它的主要是用于进程或者线程间的通信,我们可以看下这篇博客eventfd介绍


Looper::Looper(bool allowNonCallbacks) :
   mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
   mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
   mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
 mWakeEventFd = eventfd(0, EFD_NONBLOCK);
 LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd. errno=%d", errno);

AutoMutex _l(mLock);
 rebuildEpollLocked();
}

2.1 c层创建epoll 

我们再来看下rebuildEpollLocked函数,创建了epoll,并且把mWakeEventFd加入epoll,而且把mRequests的fd也加入epoll


void Looper::rebuildEpollLocked() {
 // Close old epoll instance if we have one.
 if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
   ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
   close(mEpollFd);
 }

// Allocate the new epoll instance and register the wake pipe.
 mEpollFd = epoll_create(EPOLL_SIZE_HINT);
 LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);

struct epoll_event eventItem;
 memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
 eventItem.events = EPOLLIN;
 eventItem.data.fd = mWakeEventFd;
 int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
 LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance. errno=%d",
     errno);

for (size_t i = 0; i < mRequests.size(); i++) {
   const Request& request = mRequests.valueAt(i);
   struct epoll_event eventItem;
   request.initEventItem(&eventItem);

int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
   if (epollResult < 0) {
     ALOGE("Error adding epoll events for fd %d while rebuilding epoll set, errno=%d",
         request.fd, errno);
   }
 }
}

继续回到HandlerThread的run函数,我们继续分析Looper的loop函数


public void run() {
   mTid = Process.myTid();
   Looper.prepare();
   synchronized (this) {
     mLooper = Looper.myLooper();
     notifyAll();
   }
   Process.setThreadPriority(mPriority);
   onLooperPrepared();
   Looper.loop();
   mTid = -1;
 }

我们看看Looper的loop函数:


public static void loop() {
   final Looper me = myLooper();
   if (me == null) {
     throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
   }
   final MessageQueue queue = me.mQueue;//得到Looper的mQueue

// Make sure the identity of this thread is that of the local process,
   // and keep track of what that identity token actually is.
   Binder.clearCallingIdentity();
   final long ident = Binder.clearCallingIdentity();

for (;;) {
     Message msg = queue.next(); // might block这个函数会阻塞,阻塞主要是epoll_wait
     if (msg == null) {
       // No message indicates that the message queue is quitting.
       return;
     }

// This must be in a local variable, in case a UI event sets the logger
     Printer logging = me.mLogging;//自己打的打印
     if (logging != null) {
       logging.println(">>>>> Dispatching to " + msg.target + " " +
           msg.callback + ": " + msg.what);
     }

msg.target.dispatchMessage(msg);

if (logging != null) {
       logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
     }

// Make sure that during the course of dispatching the
     // identity of the thread wasn't corrupted.
     final long newIdent = Binder.clearCallingIdentity();
     if (ident != newIdent) {
       Log.wtf(TAG, "Thread identity changed from 0x"
           + Long.toHexString(ident) + " to 0x"
           + Long.toHexString(newIdent) + " while dispatching to "
           + msg.target.getClass().getName() + " "
           + msg.callback + " what=" + msg.what);
     }

msg.recycleUnchecked();
   }
 }

MessageQueue类的next函数主要是调用了nativePollOnce函数,后面就是从消息队列中取出一个Message


Message next() {
   // Return here if the message loop has already quit and been disposed.
   // This can happen if the application tries to restart a looper after quit
   // which is not supported.
   final long ptr = mPtr;//之前保留的指针
   if (ptr == 0) {
     return null;
   }

int pendingIdleHandlerCount = -1; // -1 only during first iteration
   int nextPollTimeoutMillis = 0;
   for (;;) {
     if (nextPollTimeoutMillis != 0) {
       Binder.flushPendingCommands();
     }

nativePollOnce(ptr, nextPollTimeoutMillis);

下面我们主要看下nativePollOnce这个native函数,把之前的指针强制转换成NativeMessageQueue,然后调用其pollOnce函数


static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
   jlong ptr, jint timeoutMillis) {
 NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
 nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

2.2 c层epoll_wait阻塞 

pollOnce函数,这个函数前面的while一般都没有只是处理了indent大于0的情况,这种情况一般没有,所以我们可以直接看pollInner函数


int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
 int result = 0;
 for (;;) {
   while (mResponseIndex < mResponses.size()) {
     const Response& response = mResponses.itemAt(mResponseIndex++);
     int ident = response.request.ident;
     if (ident >= 0) {
       int fd = response.request.fd;
       int events = response.events;
       void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
       ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
           "fd=%d, events=0x%x, data=%p",
           this, ident, fd, events, data);
#endif
       if (outFd != NULL) *outFd = fd;
       if (outEvents != NULL) *outEvents = events;
       if (outData != NULL) *outData = data;
       return ident;
     }
   }

if (result != 0) {
#if DEBUG_POLL_AND_WAKE
     ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
     if (outFd != NULL) *outFd = 0;
     if (outEvents != NULL) *outEvents = 0;
     if (outData != NULL) *outData = NULL;
     return result;
   }

result = pollInner(timeoutMillis);
 }
}

pollInner函数主要就是调用epoll_wait阻塞,并且java层会计算每次阻塞的时间传到c层,等待有mWakeEventFd或者之前addFd的fd有事件过来,才会epoll_wait返回。 



int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
 ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif

// Adjust the timeout based on when the next message is due.
 if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
   nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
   int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
   if (messageTimeoutMillis >= 0
       && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
     timeoutMillis = messageTimeoutMillis;
   }
#if DEBUG_POLL_AND_WAKE
   ALOGD("%p ~ pollOnce - next message in %" PRId64 "ns, adjusted timeout: timeoutMillis=%d",
       this, mNextMessageUptime - now, timeoutMillis);
#endif
 }

// Poll.
 int result = POLL_WAKE;
 mResponses.clear();//清空mResponses
 mResponseIndex = 0;

// We are about to idle.
 mPolling = true;

struct epoll_event eventItems[EPOLL_MAX_EVENTS];
 int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);//epoll_wait主要线程阻塞在这,这个阻塞的时间也是有java层传过来的

// No longer idling.
 mPolling = false;

// Acquire lock.
 mLock.lock();

// Rebuild epoll set if needed.
 if (mEpollRebuildRequired) {
   mEpollRebuildRequired = false;
   rebuildEpollLocked();
   goto Done;
 }

// Check for poll error.
 if (eventCount < 0) {
   if (errno == EINTR) {
     goto Done;
   }
   ALOGW("Poll failed with an unexpected error, errno=%d", errno);
   result = POLL_ERROR;
   goto Done;
 }

// Check for poll timeout.
 if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
   ALOGD("%p ~ pollOnce - timeout", this);
#endif
   result = POLL_TIMEOUT;
   goto Done;
 }

// Handle all events.
#if DEBUG_POLL_AND_WAKE
 ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif

for (int i = 0; i < eventCount; i++) {
   int fd = eventItems[i].data.fd;
   uint32_t epollEvents = eventItems[i].events;
   if (fd == mWakeEventFd) {//通知唤醒线程的事件
     if (epollEvents & EPOLLIN) {
       awoken();
     } else {
       ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
     }
   } else {
     ssize_t requestIndex = mRequests.indexOfKey(fd);//之前addFd的事件
     if (requestIndex >= 0) {
       int events = 0;
       if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
       if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
       if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
       if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
       pushResponse(events, mRequests.valueAt(requestIndex));//放在mResponses中
     } else {
       ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
           "no longer registered.", epollEvents, fd);
     }
   }
 }
Done: ;

// Invoke pending message callbacks.
 mNextMessageUptime = LLONG_MAX;
 while (mMessageEnvelopes.size() != 0) {// 这块主要是c层的消息,java层的消息是自己管理的
   nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
   const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
   if (messageEnvelope.uptime <= now) {
     // Remove the envelope from the list.
     // We keep a strong reference to the handler until the call to handleMessage
     // finishes. Then we drop it so that the handler can be deleted *before*
     // we reacquire our lock.
     { // obtain handler
       sp<MessageHandler> handler = messageEnvelope.handler;
       Message message = messageEnvelope.message;
       mMessageEnvelopes.removeAt(0);
       mSendingMessage = true;
       mLock.unlock();

#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
       ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
           this, handler.get(), message.what);
#endif
       handler->handleMessage(message);
     } // release handler

mLock.lock();
     mSendingMessage = false;
     result = POLL_CALLBACK;
   } else {
     // The last message left at the head of the queue determines the next wakeup time.
     mNextMessageUptime = messageEnvelope.uptime;
     break;
   }
 }

// Release lock.
 mLock.unlock();

// Invoke all response callbacks.
 for (size_t i = 0; i < mResponses.size(); i++) {//这是之前addFd的事件的处理,主要是遍历mResponses,然后调用其回调
   Response& response = mResponses.editItemAt(i);
   if (response.request.ident == POLL_CALLBACK) {
     int fd = response.request.fd;
     int events = response.events;
     void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
     ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
         this, response.request.callback.get(), fd, events, data);
#endif
     // Invoke the callback. Note that the file descriptor may be closed by
     // the callback (and potentially even reused) before the function returns so
     // we need to be a little careful when removing the file descriptor afterwards.
     int callbackResult = response.request.callback->handleEvent(fd, events, data);
     if (callbackResult == 0) {
       removeFd(fd, response.request.seq);
     }

// Clear the callback reference in the response structure promptly because we
     // will not clear the response vector itself until the next poll.
     response.request.callback.clear();
     result = POLL_CALLBACK;
   }
 }
 return result;
}

继续分析Looper的loop函数,可以增加自己的打印来调试代码,之前调用Message的target的dispatchMessage来分配消息


    for (;;) {
     Message msg = queue.next(); // might block
     if (msg == null) {
       // No message indicates that the message queue is quitting.
       return;
     }

// This must be in a local variable, in case a UI event sets the logger
     Printer logging = me.mLogging;//自己的打印
     if (logging != null) {
       logging.println(">>>>> Dispatching to " + msg.target + " " +
           msg.callback + ": " + msg.what);
     }

msg.target.dispatchMessage(msg);

if (logging != null) {
       logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
     }

// Make sure that during the course of dispatching the
     // identity of the thread wasn't corrupted.
     final long newIdent = Binder.clearCallingIdentity();
     if (ident != newIdent) {
       Log.wtf(TAG, "Thread identity changed from 0x"
           + Long.toHexString(ident) + " to 0x"
           + Long.toHexString(newIdent) + " while dispatching to "
           + msg.target.getClass().getName() + " "
           + msg.callback + " what=" + msg.what);
     }

msg.recycleUnchecked();
   }
 }

2.3 增加调试打印 

我们先来看自己添加打印,可以通过Lopper的setMessageLogging函数来打印


public void setMessageLogging(@Nullable Printer printer) {
   mLogging = printer;
 }
Printer就是一个interface

public interface Printer {
 /**
  * Write a line of text to the output. There is no need to terminate
  * the given string with a newline.
  */
 void println(String x);
}

2.4 java层消息分发处理 

再来看消息的分发,先是调用Handler的obtainMessage函数               


Message msg = mHandler.obtainMessage(MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT);
msg.setAsynchronous(true);
mHandler.sendMessageDelayed(msg, WAKE_LOCK_ACQUIRE_TOO_LONG_TIMEOUT);

先看obtainMessage调用了Message的obtain函数


public final Message obtainMessage(int what)
 {
   return Message.obtain(this, what);
 }

Message的obtain函数就是新建一个Message,然后其target就是设置成其Handler


public static Message obtain(Handler h, int what) {
   Message m = obtain();//就是新建一个Message
   m.target = h;
   m.what = what;

return m;
 }

我们再联系之前分发消息 

msg.target.dispatchMessage(msg);最后就是调用Handler的dispatchMessage函数,最后在Handler中,最后会根据不同的情况对消息进行处理。


  public void dispatchMessage(Message msg) {
   if (msg.callback != null) {
     handleCallback(msg);//这种就是用post形式发送,带Runnable的
   } else {
     if (mCallback != null) {//这种是handler传参的时候就是传入了mCallback回调了
       if (mCallback.handleMessage(msg)) {
         return;
       }
     }
     handleMessage(msg);//最后就是在自己实现的handleMessage处理
   }
 }

2.3 java层 消息发送 

我们再看下java层的消息发送,主要也是调用Handler的sendMessage post之类函数,最终都会调用下面这个函数


  public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
   MessageQueue queue = mQueue;
   if (queue == null) {
     RuntimeException e = new RuntimeException(
         this + " sendMessageAtTime() called with no mQueue");
     Log.w("Looper", e.getMessage(), e);
     return false;
   }
   return enqueueMessage(queue, msg, uptimeMillis);
 }

我们再来看java层发送消息最终都会调用enqueueMessage函数


private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
   msg.target = this;
   if (mAsynchronous) {
     msg.setAsynchronous(true);
   }
   return queue.enqueueMessage(msg, uptimeMillis);
 }

最终在enqueueMessage中,把消息加入消息队列,然后需要的话就调用c层的nativeWake函数


boolean enqueueMessage(Message msg, long when) {
   if (msg.target == null) {
     throw new IllegalArgumentException("Message must have a target.");
   }
   if (msg.isInUse()) {
     throw new IllegalStateException(msg + " This message is already in use.");
   }

synchronized (this) {
     if (mQuitting) {
       IllegalStateException e = new IllegalStateException(
           msg.target + " sending message to a Handler on a dead thread");
       Log.w(TAG, e.getMessage(), e);
       msg.recycle();
       return false;
     }

msg.markInUse();
     msg.when = when;
     Message p = mMessages;
     boolean needWake;
     if (p == null || when == 0 || when < p.when) {
       // New head, wake up the event queue if blocked.
       msg.next = p;
       mMessages = msg;
       needWake = mBlocked;
     } else {
       // Inserted within the middle of the queue. Usually we don't have to wake
       // up the event queue unless there is a barrier at the head of the queue
       // and the message is the earliest asynchronous message in the queue.
       needWake = mBlocked && p.target == null && msg.isAsynchronous();
       Message prev;
       for (;;) {
         prev = p;
         p = p.next;
         if (p == null || when < p.when) {
           break;
         }
         if (needWake && p.isAsynchronous()) {
           needWake = false;
         }
       }
       msg.next = p; // invariant: p == prev.next
       prev.next = msg;
     }

// We can assume mPtr != 0 because mQuitting is false.
     if (needWake) {
       nativeWake(mPtr);
     }
   }
   return true;
 }

我们看下这个native方法,最后也是调用了Looper的wake函数


static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
 NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
 nativeMessageQueue->wake();
}
void NativeMessageQueue::wake() {
 mLooper->wake();
}

Looper类的wake,函数只是往mWakeEventfd中写了一些内容,这个fd只是通知而已,类似pipe,最后会把epoll_wait唤醒,线程就不阻塞了继续先发送c层消息,然后处理之前addFd的事件,然后处理java层的消息。 


void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
 ALOGD("%p ~ wake", this);
#endif

uint64_t inc = 1;
 ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
 if (nWrite != sizeof(uint64_t)) {
   if (errno != EAGAIN) {
     ALOGW("Could not write wake signal, errno=%d", errno);
   }
 }
}

2.4 c层发送消息 

在c层也是可以发送消息的,主要是调用Looper的sendMessageAtTime函数,参数有有一个handler是一个回调,我们把消息放在mMessageEnvelopes中。


void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
   const Message& message) {
#if DEBUG_CALLBACKS
 ALOGD("%p ~ sendMessageAtTime - uptime=%" PRId64 ", handler=%p, what=%d",
     this, uptime, handler.get(), message.what);
#endif

size_t i = 0;
 { // acquire lock
   AutoMutex _l(mLock);

size_t messageCount = mMessageEnvelopes.size();
   while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
     i += 1;
   }

MessageEnvelope messageEnvelope(uptime, handler, message);
   mMessageEnvelopes.insertAt(messageEnvelope, i, 1);

// Optimization: If the Looper is currently sending a message, then we can skip
   // the call to wake() because the next thing the Looper will do after processing
   // messages is to decide when the next wakeup time should be. In fact, it does
   // not even matter whether this code is running on the Looper thread.
   if (mSendingMessage) {
     return;
   }
 } // release lock

// Wake the poll loop only when we enqueue a new message at the head.
 if (i == 0) {
   wake();
 }
}

当在pollOnce中,在epoll_wait之后,会遍历mMessageEnvelopes中的消息,然后调用其handler的handleMessage函数


  while (mMessageEnvelopes.size() != 0) {
   nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
   const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
   if (messageEnvelope.uptime <= now) {
     // Remove the envelope from the list.
     // We keep a strong reference to the handler until the call to handleMessage
     // finishes. Then we drop it so that the handler can be deleted *before*
     // we reacquire our lock.
     { // obtain handler
       sp<MessageHandler> handler = messageEnvelope.handler;
       Message message = messageEnvelope.message;
       mMessageEnvelopes.removeAt(0);
       mSendingMessage = true;
       mLock.unlock();

#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
       ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
           this, handler.get(), message.what);
#endif
       handler->handleMessage(message);
     } // release handler

mLock.lock();
     mSendingMessage = false;
     result = POLL_CALLBACK;
   } else {
     // The last message left at the head of the queue determines the next wakeup time.
     mNextMessageUptime = messageEnvelope.uptime;
     break;
   }
 }

有一个Looper_test.cpp文件,里面介绍了很多Looper的使用方法,我们来看下


  sp<StubMessageHandler> handler = new StubMessageHandler();
 mLooper->sendMessageAtTime(now + ms2ns(100), handler, Message(MSG_TEST1));
StubMessageHandler继承MessageHandler就必须实现handleMessage方法

class StubMessageHandler : public MessageHandler {
public:
 Vector<Message> messages;

virtual void handleMessage(const Message& message) {
   messages.push(message);
 }
};

我们再顺便看下Message和MessageHandler类


struct Message {
 Message() : what(0) { }
 Message(int what) : what(what) { }

/* The message type. (interpretation is left up to the handler) */
 int what;
};

/**
* Interface for a Looper message handler.
*
* The Looper holds a strong reference to the message handler whenever it has
* a message to deliver to it. Make sure to call Looper::removeMessages
* to remove any pending messages destined for the handler so that the handler
* can be destroyed.
*/
class MessageHandler : public virtual RefBase {
protected:
 virtual ~MessageHandler() { }

public:
 /**
  * Handles a message.
  */
 virtual void handleMessage(const Message& message) = 0;
};

2.5 c层addFd 

我们也可以在Looper.cpp的addFd中增加fd放入线程epoll中,当fd有数据来我们也可以处理相应的数据,下面我们先来看下addFd函数,我们注意其中有一个callBack回调


int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {
 return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data);
}

int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
#if DEBUG_CALLBACKS
 ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
     events, callback.get(), data);
#endif

if (!callback.get()) {
   if (! mAllowNonCallbacks) {
     ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
     return -1;
   }

if (ident < 0) {
     ALOGE("Invalid attempt to set NULL callback with ident < 0.");
     return -1;
   }
 } else {
   ident = POLL_CALLBACK;
 }

{ // acquire lock
   AutoMutex _l(mLock);

Request request;
   request.fd = fd;
   request.ident = ident;
   request.events = events;
   request.seq = mNextRequestSeq++;
   request.callback = callback;
   request.data = data;
   if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1

struct epoll_event eventItem;
   request.initEventItem(&eventItem);

ssize_t requestIndex = mRequests.indexOfKey(fd);
   if (requestIndex < 0) {
     int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);//加入epoll
     if (epollResult < 0) {
       ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);
       return -1;
     }
     mRequests.add(fd, request);//放入mRequests中
   } else {
     int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);//更新
     if (epollResult < 0) {
       if (errno == ENOENT) {
         // Tolerate ENOENT because it means that an older file descriptor was
         // closed before its callback was unregistered and meanwhile a new
         // file descriptor with the same number has been created and is now
         // being registered for the first time. This error may occur naturally
         // when a callback has the side-effect of closing the file descriptor
         // before returning and unregistering itself. Callback sequence number
         // checks further ensure that the race is benign.
         //
         // Unfortunately due to kernel limitations we need to rebuild the epoll
         // set from scratch because it may contain an old file handle that we are
         // now unable to remove since its file descriptor is no longer valid.
         // No such problem would have occurred if we were using the poll system
         // call instead, but that approach carries others disadvantages.
#if DEBUG_CALLBACKS
         ALOGD("%p ~ addFd - EPOLL_CTL_MOD failed due to file descriptor "
             "being recycled, falling back on EPOLL_CTL_ADD, errno=%d",
             this, errno);
#endif
         epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
         if (epollResult < 0) {
           ALOGE("Error modifying or adding epoll events for fd %d, errno=%d",
               fd, errno);
           return -1;
         }
         scheduleEpollRebuildLocked();
       } else {
         ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);
         return -1;
       }
     }
     mRequests.replaceValueAt(requestIndex, request);
   }
 } // release lock
 return 1;
}

在pollOnce函数中,我们先寻找mRequests中匹配的fd,然后在pushResponse中新建一个Response,然后把Response和Request匹配起来。


    } else {
     ssize_t requestIndex = mRequests.indexOfKey(fd);
     if (requestIndex >= 0) {
       int events = 0;
       if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
       if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
       if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
       if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
       pushResponse(events, mRequests.valueAt(requestIndex));
     } else {
       ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
           "no longer registered.", epollEvents, fd);
     }
   }

下面我们就会遍历mResponses中的Response,然后调用其request中的回调


  for (size_t i = 0; i < mResponses.size(); i++) {
   Response& response = mResponses.editItemAt(i);
   if (response.request.ident == POLL_CALLBACK) {
     int fd = response.request.fd;
     int events = response.events;
     void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
     ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
         this, response.request.callback.get(), fd, events, data);
#endif
     // Invoke the callback. Note that the file descriptor may be closed by
     // the callback (and potentially even reused) before the function returns so
     // we need to be a little careful when removing the file descriptor afterwards.
     int callbackResult = response.request.callback->handleEvent(fd, events, data);
     if (callbackResult == 0) {
       removeFd(fd, response.request.seq);
     }

// Clear the callback reference in the response structure promptly because we
     // will not clear the response vector itself until the next poll.
     response.request.callback.clear();
     result = POLL_CALLBACK;
   }
 }

同样我们再来看看Looper_test.cpp是如何使用的?


  Pipe pipe;
 StubCallbackHandler handler(true);

handler.setCallback(mLooper, pipe.receiveFd, Looper::EVENT_INPUT);

我们看下handler的setCallback函数


class CallbackHandler {
public:
 void setCallback(const sp<Looper>& looper, int fd, int events) {
   looper->addFd(fd, 0, events, staticHandler, this);//就是调用了looper的addFd函数,并且回调
 }

protected:
 virtual ~CallbackHandler() { }

virtual int handler(int fd, int events) = 0;

private:
 static int staticHandler(int fd, int events, void* data) {//这个就是回调函数
   return static_cast<CallbackHandler*>(data)->handler(fd, events);
 }
};

class StubCallbackHandler : public CallbackHandler {
public:
 int nextResult;
 int callbackCount;

int fd;
 int events;

StubCallbackHandler(int nextResult) : nextResult(nextResult),
     callbackCount(0), fd(-1), events(-1) {
 }

protected:
 virtual int handler(int fd, int events) {//这个是通过回调函数再调到这里的
   callbackCount += 1;
   this->fd = fd;
   this->events = events;
   return nextResult;
 }
};

我们结合Looper的addFd一起来看,当callback是有的,我们新建一个SimpleLooperCallback


int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {
 return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data);
}

这里的Looper_callbackFunc是一个typedef
typedef int (*Looper_callbackFunc)(int fd, int events, void* data);

我们再来看SimpleLooperCallback


class SimpleLooperCallback : public LooperCallback {
protected:
 virtual ~SimpleLooperCallback();

public:
 SimpleLooperCallback(Looper_callbackFunc callback);
 virtual int handleEvent(int fd, int events, void* data);

private:
 Looper_callbackFunc mCallback;
};SimpleLooperCallback::SimpleLooperCallback(Looper_callbackFunc callback) :
   mCallback(callback) {
}

SimpleLooperCallback::~SimpleLooperCallback() {
}

int SimpleLooperCallback::handleEvent(int fd, int events, void* data) {
 return mCallback(fd, events, data);
}

最后我们是调用callback->handleEvent(fd, events, data),而callback就是SimpleLooperCallback,这里的data,之前传进来的就是CallbackHandler 的this指针
 因此最后就是调用了staticHandler,而data->handler,就是this->handler,最后是虚函数就调用到了StubCallbackHandler 的handler函数中了。 

当然我们也可以不用这么复杂,直接使用第二个addFd函数,当然callBack我们需要自己定义一个类来实现LooperCallBack类就行了,这样就简单多了。
 int addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data);

2.6 java层addFd 

一直以为只能在c层的Looper中才能addFd,原来在java层也通过jni做了这个功能。 

我们可以在MessageQueue中的addOnFileDescriptorEventListener来实现这个功能


  public void addOnFileDescriptorEventListener(@NonNull FileDescriptor fd,
     @OnFileDescriptorEventListener.Events int events,
     @NonNull OnFileDescriptorEventListener listener) {
   if (fd == null) {
     throw new IllegalArgumentException("fd must not be null");
   }
   if (listener == null) {
     throw new IllegalArgumentException("listener must not be null");
   }

synchronized (this) {
     updateOnFileDescriptorEventListenerLocked(fd, events, listener);
   }
 }

我们再来看看OnFileDescriptorEventListener 这个回调


  public interface OnFileDescriptorEventListener {
   public static final int EVENT_INPUT = 1 << 0;
   public static final int EVENT_OUTPUT = 1 << 1;
   public static final int EVENT_ERROR = 1 << 2;

/** @hide */
   @Retention(RetentionPolicy.SOURCE)
   @IntDef(flag=true, value={EVENT_INPUT, EVENT_OUTPUT, EVENT_ERROR})
   public @interface Events {}

@Events int onFileDescriptorEvents(@NonNull FileDescriptor fd, @Events int events);
 }

接着调用了updateOnFileDescriptorEventListenerLocked函数


private void updateOnFileDescriptorEventListenerLocked(FileDescriptor fd, int events,
     OnFileDescriptorEventListener listener) {
   final int fdNum = fd.getInt$();

int index = -1;
   FileDescriptorRecord record = null;
   if (mFileDescriptorRecords != null) {
     index = mFileDescriptorRecords.indexOfKey(fdNum);
     if (index >= 0) {
       record = mFileDescriptorRecords.valueAt(index);
       if (record != null && record.mEvents == events) {
         return;
       }
     }
   }

if (events != 0) {
     events |= OnFileDescriptorEventListener.EVENT_ERROR;
     if (record == null) {
       if (mFileDescriptorRecords == null) {
         mFileDescriptorRecords = new SparseArray<FileDescriptorRecord>();
       }
       record = new FileDescriptorRecord(fd, events, listener);//fd保存在FileDescriptorRecord对象
       mFileDescriptorRecords.put(fdNum, record);//mFileDescriptorRecords然后保存在
     } else {
       record.mListener = listener;
       record.mEvents = events;
       record.mSeq += 1;
     }
     nativeSetFileDescriptorEvents(mPtr, fdNum, events);//调用native函数
   } else if (record != null) {
     record.mEvents = 0;
     mFileDescriptorRecords.removeAt(index);
   }
 }

native最后调用了NativeMessageQueue的setFileDescriptorEvents函数 


static void android_os_MessageQueue_nativeSetFileDescriptorEvents(JNIEnv* env, jclass clazz,
   jlong ptr, jint fd, jint events) {
 NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
 nativeMessageQueue->setFileDescriptorEvents(fd, events);
}

setFileDescriptorEvents函数,这个addFd就是调用的第二个addFd,因此我们可以肯定NativeMessageQueue继承了LooperCallback


void NativeMessageQueue::setFileDescriptorEvents(int fd, int events) {
 if (events) {
   int looperEvents = 0;
   if (events & CALLBACK_EVENT_INPUT) {
     looperEvents |= Looper::EVENT_INPUT;
   }
   if (events & CALLBACK_EVENT_OUTPUT) {
     looperEvents |= Looper::EVENT_OUTPUT;
   }
   mLooper->addFd(fd, Looper::POLL_CALLBACK, looperEvents, this,
       reinterpret_cast<void*>(events));
 } else {
   mLooper->removeFd(fd);
 }
}

果然是,需要实现handleEvent函数


class NativeMessageQueue : public MessageQueue, public LooperCallback {
public:
 NativeMessageQueue();
 virtual ~NativeMessageQueue();

virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj);

void pollOnce(JNIEnv* env, jobject obj, int timeoutMillis);
 void wake();
 void setFileDescriptorEvents(int fd, int events);

virtual int handleEvent(int fd, int events, void* data);

handleEvent就是在looper中epoll_wait之后,当我们增加的fd有数据就会调用这个函数


int NativeMessageQueue::handleEvent(int fd, int looperEvents, void* data) {
 int events = 0;
 if (looperEvents & Looper::EVENT_INPUT) {
   events |= CALLBACK_EVENT_INPUT;
 }
 if (looperEvents & Looper::EVENT_OUTPUT) {
   events |= CALLBACK_EVENT_OUTPUT;
 }
 if (looperEvents & (Looper::EVENT_ERROR | Looper::EVENT_HANGUP | Looper::EVENT_INVALID)) {
   events |= CALLBACK_EVENT_ERROR;
 }
 int oldWatchedEvents = reinterpret_cast<intptr_t>(data);
 int newWatchedEvents = mPollEnv->CallIntMethod(mPollObj,
     gMessageQueueClassInfo.dispatchEvents, fd, events); //调用回调
 if (!newWatchedEvents) {
   return 0; // unregister the fd
 }
 if (newWatchedEvents != oldWatchedEvents) {
   setFileDescriptorEvents(fd, newWatchedEvents);
 }
 return 1;
}

最后在java的MessageQueue中的dispatchEvents就是在jni层反调过来的,然后调用之前注册的回调函数


// Called from native code.
 private int dispatchEvents(int fd, int events) {
   // Get the file descriptor record and any state that might change.
   final FileDescriptorRecord record;
   final int oldWatchedEvents;
   final OnFileDescriptorEventListener listener;
   final int seq;
   synchronized (this) {
     record = mFileDescriptorRecords.get(fd);//通过fd得到FileDescriptorRecord
     if (record == null) {
       return 0; // spurious, no listener registered
     }

oldWatchedEvents = record.mEvents;
     events &= oldWatchedEvents; // filter events based on current watched set
     if (events == 0) {
       return oldWatchedEvents; // spurious, watched events changed
     }

listener = record.mListener;
     seq = record.mSeq;
   }

// Invoke the listener outside of the lock.
   int newWatchedEvents = listener.onFileDescriptorEvents(//listener回调
       record.mDescriptor, events);
   if (newWatchedEvents != 0) {
     newWatchedEvents |= OnFileDescriptorEventListener.EVENT_ERROR;
   }

// Update the file descriptor record if the listener changed the set of
   // events to watch and the listener itself hasn't been updated since.
   if (newWatchedEvents != oldWatchedEvents) {
     synchronized (this) {
       int index = mFileDescriptorRecords.indexOfKey(fd);
       if (index >= 0 && mFileDescriptorRecords.valueAt(index) == record
           && record.mSeq == seq) {
         record.mEvents = newWatchedEvents;
         if (newWatchedEvents == 0) {
           mFileDescriptorRecords.removeAt(index);
         }
       }
     }
   }

// Return the new set of events to watch for native code to take care of.
   return newWatchedEvents;
 }

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com