Android6.0 消息机制原理解析

消息都是存放在一个消息队列中去,而消息循环线程就是围绕这个消息队列进入一个无限循环的,直到线程退出。如果队列中有消息,消息循环线程就会把它取出来,并分发给相应的Handler进行处理;如果队列中没有消息,消息循环线程就会进入空闲等待状态,等待下一个消息的到来。在编写Android应用程序时,当程序执行的任务比较繁重时,为了不阻塞UI主线程而导致ANR的发生,我们通常的做法的创建一个子线程来完成特定的任务。在创建子线程时,有两种选择,一种通过创建Thread对象来创建一个无消息循环的子线程;还有一种就是创建一个带有消息循环的子线程,而创建带有消息循环的子线程由于两种实现方法,一种是直接利用Android给我们封装好的HandlerThread类来直接生成一个带有消息循环的线程对象,另一种方法是在实现线程的run()方法内使用以下方式启动一个消息循环: 

一、消息机制使用 

通常消息都是有一个消息线程和一个Handler组成,下面我们看PowerManagerService中的一个消息Handler:        

mHandlerThread = new ServiceThread(TAG,

Process.THREAD_PRIORITY_DISPLAY, false /*allowIo*/);

mHandlerThread.start();

mHandler = new PowerManagerHandler(mHandlerThread.getLooper());

这里的ServiceThread就是一个HandlerThread,创建Handler的时候,必须把HandlerThread的looper传进去,否则就是默认当前线程的looper。 

而每个handler,大致如下:

private final class PowerManagerHandler extends Handler {

public PowerManagerHandler(Looper looper) {

super(looper, null, true /*async*/);

}

@Override

public void handleMessage(Message msg) {

switch (msg.what) {

case MSG_USER_ACTIVITY_TIMEOUT:

handleUserActivityTimeout();

break;

case MSG_SANDMAN:

handleSandman();

break;

case MSG_SCREEN_BRIGHTNESS_BOOST_TIMEOUT:

handleScreenBrightnessBoostTimeout();

break;

case MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT:

checkWakeLockAquireTooLong();

Message m = mHandler.obtainMessage(MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT);

m.setAsynchronous(true);

mHandler.sendMessageDelayed(m, WAKE_LOCK_ACQUIRE_TOO_LONG_TIMEOUT);

break;

}

}

}

二、消息机制原理

那我们先来看下HandlerThread的主函数run函数: 

public void run() {

mTid = Process.myTid();

Looper.prepare();

synchronized (this) {

mLooper = Looper.myLooper();//赋值后notifyall,主要是getLooper函数返回的是mLooper

notifyAll();

}

Process.setThreadPriority(mPriority);

onLooperPrepared();

Looper.loop();

mTid = -1;

}

再来看看Lopper的prepare函数,最后新建了一个Looper对象,并且放在线程的局部变量中。

public static void prepare() {

prepare(true);

}

private static void prepare(boolean quitAllowed) {

if (sThreadLocal.get() != null) {

throw new RuntimeException("Only one Looper may be created per thread");

}

sThreadLocal.set(new Looper(quitAllowed));

}

Looper的构造函数中创建了MessageQueue

private Looper(boolean quitAllowed) {

mQueue = new MessageQueue(quitAllowed);

mThread = Thread.currentThread();

}

我们再来看下MessageQueue的构造函数,其中nativeInit是一个native方法,并且把返回值保存在mPtr显然是用long型变量保存的指针

MessageQueue(boolean quitAllowed) {

mQuitAllowed = quitAllowed;

mPtr = nativeInit();

}

native函数中主要创建了NativeMessageQueue对象,并且把指针变量返回了。

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {

NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();

if (!nativeMessageQueue) {

jniThrowRuntimeException(env, "Unable to allocate native queue");

return 0;

}

nativeMessageQueue->incStrong(env);

return reinterpret_cast<jlong>(nativeMessageQueue);

}

NativeMessageQueue构造函数就是获取mLooper,如果没有就是新建一个Looper 

NativeMessageQueue::NativeMessageQueue() :

mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {

mLooper = Looper::getForThread();

if (mLooper == NULL) {

mLooper = new Looper(false);

Looper::setForThread(mLooper);

}

}

然后我们再看下Looper的构造函数,显示调用了eventfd创建了一个fd,eventfd它的主要是用于进程或者线程间的通信,我们可以看下这篇博客eventfd介绍

Looper::Looper(bool allowNonCallbacks) :

mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),

mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),

mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {

mWakeEventFd = eventfd(0, EFD_NONBLOCK);

LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd. errno=%d", errno);

AutoMutex _l(mLock);

rebuildEpollLocked();

}

2.1 c层创建epoll 

我们再来看下rebuildEpollLocked函数,创建了epoll,并且把mWakeEventFd加入epoll,而且把mRequests的fd也加入epoll

void Looper::rebuildEpollLocked() {

// Close old epoll instance if we have one.

if (mEpollFd >= 0) {

#if DEBUG_CALLBACKS

ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);

#endif

close(mEpollFd);

}

// Allocate the new epoll instance and register the wake pipe.

mEpollFd = epoll_create(EPOLL_SIZE_HINT);

LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);

struct epoll_event eventItem;

memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union

eventItem.events = EPOLLIN;

eventItem.data.fd = mWakeEventFd;

int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);

LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance. errno=%d",

errno);

for (size_t i = 0; i < mRequests.size(); i++) {

const Request& request = mRequests.valueAt(i);

struct epoll_event eventItem;

request.initEventItem(&eventItem);

int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);

if (epollResult < 0) {

ALOGE("Error adding epoll events for fd %d while rebuilding epoll set, errno=%d",

request.fd, errno);

}

}

}

继续回到HandlerThread的run函数,我们继续分析Looper的loop函数

public void run() {

mTid = Process.myTid();

Looper.prepare();

synchronized (this) {

mLooper = Looper.myLooper();

notifyAll();

}

Process.setThreadPriority(mPriority);

onLooperPrepared();

Looper.loop();

mTid = -1;

}

我们看看Looper的loop函数:

public static void loop() {

final Looper me = myLooper();

if (me == null) {

throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");

}

final MessageQueue queue = me.mQueue;//得到Looper的mQueue

// Make sure the identity of this thread is that of the local process,

// and keep track of what that identity token actually is.

Binder.clearCallingIdentity();

final long ident = Binder.clearCallingIdentity();

for (;;) {

Message msg = queue.next(); // might block这个函数会阻塞,阻塞主要是epoll_wait

if (msg == null) {

// No message indicates that the message queue is quitting.

return;

}

// This must be in a local variable, in case a UI event sets the logger

Printer logging = me.mLogging;//自己打的打印

if (logging != null) {

logging.println(">>>>> Dispatching to " + msg.target + " " +

msg.callback + ": " + msg.what);

}

msg.target.dispatchMessage(msg);

if (logging != null) {

logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);

}

// Make sure that during the course of dispatching the

// identity of the thread wasn't corrupted.

final long newIdent = Binder.clearCallingIdentity();

if (ident != newIdent) {

Log.wtf(TAG, "Thread identity changed from 0x"

+ Long.toHexString(ident) + " to 0x"

+ Long.toHexString(newIdent) + " while dispatching to "

+ msg.target.getClass().getName() + " "

+ msg.callback + " what=" + msg.what);

}

msg.recycleUnchecked();

}

}

MessageQueue类的next函数主要是调用了nativePollOnce函数,后面就是从消息队列中取出一个Message

Message next() {

// Return here if the message loop has already quit and been disposed.

// This can happen if the application tries to restart a looper after quit

// which is not supported.

final long ptr = mPtr;//之前保留的指针

if (ptr == 0) {

return null;

}

int pendingIdleHandlerCount = -1; // -1 only during first iteration

int nextPollTimeoutMillis = 0;

for (;;) {

if (nextPollTimeoutMillis != 0) {

Binder.flushPendingCommands();

}

nativePollOnce(ptr, nextPollTimeoutMillis);

下面我们主要看下nativePollOnce这个native函数,把之前的指针强制转换成NativeMessageQueue,然后调用其pollOnce函数

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,

jlong ptr, jint timeoutMillis) {

NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);

nativeMessageQueue->pollOnce(env, obj, timeoutMillis);

}

2.2 c层epoll_wait阻塞 

pollOnce函数,这个函数前面的while一般都没有只是处理了indent大于0的情况,这种情况一般没有,所以我们可以直接看pollInner函数

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {

int result = 0;

for (;;) {

while (mResponseIndex < mResponses.size()) {

const Response& response = mResponses.itemAt(mResponseIndex++);

int ident = response.request.ident;

if (ident >= 0) {

int fd = response.request.fd;

int events = response.events;

void* data = response.request.data;

#if DEBUG_POLL_AND_WAKE

ALOGD("%p ~ pollOnce - returning signalled identifier %d: "

"fd=%d, events=0x%x, data=%p",

this, ident, fd, events, data);

#endif

if (outFd != NULL) *outFd = fd;

if (outEvents != NULL) *outEvents = events;

if (outData != NULL) *outData = data;

return ident;

}

}

if (result != 0) {

#if DEBUG_POLL_AND_WAKE

ALOGD("%p ~ pollOnce - returning result %d", this, result);

#endif

if (outFd != NULL) *outFd = 0;

if (outEvents != NULL) *outEvents = 0;

if (outData != NULL) *outData = NULL;

return result;

}

result = pollInner(timeoutMillis);

}

}

pollInner函数主要就是调用epoll_wait阻塞,并且java层会计算每次阻塞的时间传到c层,等待有mWakeEventFd或者之前addFd的fd有事件过来,才会epoll_wait返回。 

int Looper::pollInner(int timeoutMillis) {

#if DEBUG_POLL_AND_WAKE

ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);

#endif

// Adjust the timeout based on when the next message is due.

if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {

nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);

int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);

if (messageTimeoutMillis >= 0

&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {

timeoutMillis = messageTimeoutMillis;

}

#if DEBUG_POLL_AND_WAKE

ALOGD("%p ~ pollOnce - next message in %" PRId64 "ns, adjusted timeout: timeoutMillis=%d",

this, mNextMessageUptime - now, timeoutMillis);

#endif

}

// Poll.

int result = POLL_WAKE;

mResponses.clear();//清空mResponses

mResponseIndex = 0;

// We are about to idle.

mPolling = true;

struct epoll_event eventItems[EPOLL_MAX_EVENTS];

int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);//epoll_wait主要线程阻塞在这,这个阻塞的时间也是有java层传过来的

// No longer idling.

mPolling = false;

// Acquire lock.

mLock.lock();

// Rebuild epoll set if needed.

if (mEpollRebuildRequired) {

mEpollRebuildRequired = false;

rebuildEpollLocked();

goto Done;

}

// Check for poll error.

if (eventCount < 0) {

if (errno == EINTR) {

goto Done;

}

ALOGW("Poll failed with an unexpected error, errno=%d", errno);

result = POLL_ERROR;

goto Done;

}

// Check for poll timeout.

if (eventCount == 0) {

#if DEBUG_POLL_AND_WAKE

ALOGD("%p ~ pollOnce - timeout", this);

#endif

result = POLL_TIMEOUT;

goto Done;

}

// Handle all events.

#if DEBUG_POLL_AND_WAKE

ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);

#endif

for (int i = 0; i < eventCount; i++) {

int fd = eventItems[i].data.fd;

uint32_t epollEvents = eventItems[i].events;

if (fd == mWakeEventFd) {//通知唤醒线程的事件

if (epollEvents & EPOLLIN) {

awoken();

} else {

ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);

}

} else {

ssize_t requestIndex = mRequests.indexOfKey(fd);//之前addFd的事件

if (requestIndex >= 0) {

int events = 0;

if (epollEvents & EPOLLIN) events |= EVENT_INPUT;

if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;

if (epollEvents & EPOLLERR) events |= EVENT_ERROR;

if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;

pushResponse(events, mRequests.valueAt(requestIndex));//放在mResponses中

} else {

ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "

"no longer registered.", epollEvents, fd);

}

}

}

Done: ;

// Invoke pending message callbacks.

mNextMessageUptime = LLONG_MAX;

while (mMessageEnvelopes.size() != 0) {// 这块主要是c层的消息,java层的消息是自己管理的

nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);

const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);

if (messageEnvelope.uptime <= now) {

// Remove the envelope from the list.

// We keep a span reference to the handler until the call to handleMessage

// finishes. Then we drop it so that the handler can be deleted *before*

// we reacquire our lock.

{ // obtain handler

sp<MessageHandler> handler = messageEnvelope.handler;

Message message = messageEnvelope.message;

mMessageEnvelopes.removeAt(0);

mSendingMessage = true;

mLock.unlock();

#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS

ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",

this, handler.get(), message.what);

#endif

handler->handleMessage(message);

} // release handler

mLock.lock();

mSendingMessage = false;

result = POLL_CALLBACK;

} else {

// The last message left at the head of the queue determines the next wakeup time.

mNextMessageUptime = messageEnvelope.uptime;

break;

}

}

// Release lock.

mLock.unlock();

// Invoke all response callbacks.

for (size_t i = 0; i < mResponses.size(); i++) {//这是之前addFd的事件的处理,主要是遍历mResponses,然后调用其回调

Response& response = mResponses.editItemAt(i);

if (response.request.ident == POLL_CALLBACK) {

int fd = response.request.fd;

int events = response.events;

void* data = response.request.data;

#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS

ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",

this, response.request.callback.get(), fd, events, data);

#endif

// Invoke the callback. Note that the file descriptor may be closed by

// the callback (and potentially even reused) before the function returns so

// we need to be a little careful when removing the file descriptor afterwards.

int callbackResult = response.request.callback->handleEvent(fd, events, data);

if (callbackResult == 0) {

removeFd(fd, response.request.seq);

}

// Clear the callback reference in the response structure promptly because we

// will not clear the response vector itself until the next poll.

response.request.callback.clear();

result = POLL_CALLBACK;

}

}

return result;

}

继续分析Looper的loop函数,可以增加自己的打印来调试代码,之前调用Message的target的dispatchMessage来分配消息

for (;;) {

Message msg = queue.next(); // might block

if (msg == null) {

// No message indicates that the message queue is quitting.

return;

}

// This must be in a local variable, in case a UI event sets the logger

Printer logging = me.mLogging;//自己的打印

if (logging != null) {

logging.println(">>>>> Dispatching to " + msg.target + " " +

msg.callback + ": " + msg.what);

}

msg.target.dispatchMessage(msg);

if (logging != null) {

logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);

}

// Make sure that during the course of dispatching the

// identity of the thread wasn't corrupted.

final long newIdent = Binder.clearCallingIdentity();

if (ident != newIdent) {

Log.wtf(TAG, "Thread identity changed from 0x"

+ Long.toHexString(ident) + " to 0x"

+ Long.toHexString(newIdent) + " while dispatching to "

+ msg.target.getClass().getName() + " "

+ msg.callback + " what=" + msg.what);

}

msg.recycleUnchecked();

}

}

2.3 增加调试打印 

我们先来看自己添加打印,可以通过Lopper的setMessageLogging函数来打印

public void setMessageLogging(@Nullable Printer printer) {

mLogging = printer;

}

Printer就是一个interface

public interface Printer {

/**

* Write a line of text to the output. There is no need to terminate

* the given string with a newline.

*/

void println(String x);

}

2.4 java层消息分发处理 

再来看消息的分发,先是调用Handler的obtainMessage函数               

Message msg = mHandler.obtainMessage(MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT);

msg.setAsynchronous(true);

mHandler.sendMessageDelayed(msg, WAKE_LOCK_ACQUIRE_TOO_LONG_TIMEOUT);

先看obtainMessage调用了Message的obtain函数

public final Message obtainMessage(int what)

{

return Message.obtain(this, what);

}

Message的obtain函数就是新建一个Message,然后其target就是设置成其Handler

public static Message obtain(Handler h, int what) {

Message m = obtain();//就是新建一个Message

m.target = h;

m.what = what;

return m;

}

我们再联系之前分发消息 

msg.target.dispatchMessage(msg);最后就是调用Handler的dispatchMessage函数,最后在Handler中,最后会根据不同的情况对消息进行处理。

public void dispatchMessage(Message msg) {

if (msg.callback != null) {

handleCallback(msg);//这种就是用post形式发送,带Runnable的

} else {

if (mCallback != null) {//这种是handler传参的时候就是传入了mCallback回调了

if (mCallback.handleMessage(msg)) {

return;

}

}

handleMessage(msg);//最后就是在自己实现的handleMessage处理

}

}

2.3 java层 消息发送 

我们再看下java层的消息发送,主要也是调用Handler的sendMessage post之类函数,最终都会调用下面这个函数

public boolean sendMessageAtTime(Message msg, long uptimeMillis) {

MessageQueue queue = mQueue;

if (queue == null) {

RuntimeException e = new RuntimeException(

this + " sendMessageAtTime() called with no mQueue");

Log.w("Looper", e.getMessage(), e);

return false;

}

return enqueueMessage(queue, msg, uptimeMillis);

}

我们再来看java层发送消息最终都会调用enqueueMessage函数

private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {

msg.target = this;

if (mAsynchronous) {

msg.setAsynchronous(true);

}

return queue.enqueueMessage(msg, uptimeMillis);

}

最终在enqueueMessage中,把消息加入消息队列,然后需要的话就调用c层的nativeWake函数

boolean enqueueMessage(Message msg, long when) {

if (msg.target == null) {

throw new IllegalArgumentException("Message must have a target.");

}

if (msg.isInUse()) {

throw new IllegalStateException(msg + " This message is already in use.");

}

synchronized (this) {

if (mQuitting) {

IllegalStateException e = new IllegalStateException(

msg.target + " sending message to a Handler on a dead thread");

Log.w(TAG, e.getMessage(), e);

msg.recycle();

return false;

}

msg.markInUse();

msg.when = when;

Message p = mMessages;

boolean needWake;

if (p == null || when == 0 || when < p.when) {

// New head, wake up the event queue if blocked.

msg.next = p;

mMessages = msg;

needWake = mBlocked;

} else {

// Inserted within the middle of the queue. Usually we don't have to wake

// up the event queue unless there is a barrier at the head of the queue

// and the message is the earliest asynchronous message in the queue.

needWake = mBlocked && p.target == null && msg.isAsynchronous();

Message prev;

for (;;) {

prev = p;

p = p.next;

if (p == null || when < p.when) {

break;

}

if (needWake && p.isAsynchronous()) {

needWake = false;

}

}

msg.next = p; // invariant: p == prev.next

prev.next = msg;

}

// We can assume mPtr != 0 because mQuitting is false.

if (needWake) {

nativeWake(mPtr);

}

}

return true;

}

我们看下这个native方法,最后也是调用了Looper的wake函数

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {

NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);

nativeMessageQueue->wake();

}

void NativeMessageQueue::wake() {

mLooper->wake();

}

Looper类的wake,函数只是往mWakeEventfd中写了一些内容,这个fd只是通知而已,类似pipe,最后会把epoll_wait唤醒,线程就不阻塞了继续先发送c层消息,然后处理之前addFd的事件,然后处理java层的消息。 

void Looper::wake() {

#if DEBUG_POLL_AND_WAKE

ALOGD("%p ~ wake", this);

#endif

uint64_t inc = 1;

ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));

if (nWrite != sizeof(uint64_t)) {

if (errno != EAGAIN) {

ALOGW("Could not write wake signal, errno=%d", errno);

}

}

}

2.4 c层发送消息 

在c层也是可以发送消息的,主要是调用Looper的sendMessageAtTime函数,参数有有一个handler是一个回调,我们把消息放在mMessageEnvelopes中。

void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,

const Message& message) {

#if DEBUG_CALLBACKS

ALOGD("%p ~ sendMessageAtTime - uptime=%" PRId64 ", handler=%p, what=%d",

this, uptime, handler.get(), message.what);

#endif

size_t i = 0;

{ // acquire lock

AutoMutex _l(mLock);

size_t messageCount = mMessageEnvelopes.size();

while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {

i += 1;

}

MessageEnvelope messageEnvelope(uptime, handler, message);

mMessageEnvelopes.insertAt(messageEnvelope, i, 1);

// Optimization: If the Looper is currently sending a message, then we can skip

// the call to wake() because the next thing the Looper will do after processing

// messages is to decide when the next wakeup time should be. In fact, it does

// not even matter whether this code is running on the Looper thread.

if (mSendingMessage) {

return;

}

} // release lock

// Wake the poll loop only when we enqueue a new message at the head.

if (i == 0) {

wake();

}

}

当在pollOnce中,在epoll_wait之后,会遍历mMessageEnvelopes中的消息,然后调用其handler的handleMessage函数

while (mMessageEnvelopes.size() != 0) {

nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);

const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);

if (messageEnvelope.uptime <= now) {

// Remove the envelope from the list.

// We keep a span reference to the handler until the call to handleMessage

// finishes. Then we drop it so that the handler can be deleted *before*

// we reacquire our lock.

{ // obtain handler

sp<MessageHandler> handler = messageEnvelope.handler;

Message message = messageEnvelope.message;

mMessageEnvelopes.removeAt(0);

mSendingMessage = true;

mLock.unlock();

#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS

ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",

this, handler.get(), message.what);

#endif

handler->handleMessage(message);

} // release handler

mLock.lock();

mSendingMessage = false;

result = POLL_CALLBACK;

} else {

// The last message left at the head of the queue determines the next wakeup time.

mNextMessageUptime = messageEnvelope.uptime;

break;

}

}

有一个Looper_test.cpp文件,里面介绍了很多Looper的使用方法,我们来看下

sp<StubMessageHandler> handler = new StubMessageHandler();

mLooper->sendMessageAtTime(now + ms2ns(100), handler, Message(MSG_TEST1));

StubMessageHandler继承MessageHandler就必须实现handleMessage方法

class StubMessageHandler : public MessageHandler {

public:

Vector<Message> messages;

virtual void handleMessage(const Message& message) {

messages.push(message);

}

};

我们再顺便看下Message和MessageHandler类

struct Message {

Message() : what(0) { }

Message(int what) : what(what) { }

/* The message type. (interpretation is left up to the handler) */

int what;

};

/**

* Interface for a Looper message handler.

*

* The Looper holds a span reference to the message handler whenever it has

* a message to deliver to it. Make sure to call Looper::removeMessages

* to remove any pending messages destined for the handler so that the handler

* can be destroyed.

*/

class MessageHandler : public virtual RefBase {

protected:

virtual ~MessageHandler() { }

public:

/**

* Handles a message.

*/

virtual void handleMessage(const Message& message) = 0;

};

2.5 c层addFd 

我们也可以在Looper.cpp的addFd中增加fd放入线程epoll中,当fd有数据来我们也可以处理相应的数据,下面我们先来看下addFd函数,我们注意其中有一个callBack回调

int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {

return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data);

}

int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {

#if DEBUG_CALLBACKS

ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,

events, callback.get(), data);

#endif

if (!callback.get()) {

if (! mAllowNonCallbacks) {

ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");

return -1;

}

if (ident < 0) {

ALOGE("Invalid attempt to set NULL callback with ident < 0.");

return -1;

}

} else {

ident = POLL_CALLBACK;

}

{ // acquire lock

AutoMutex _l(mLock);

Request request;

request.fd = fd;

request.ident = ident;

request.events = events;

request.seq = mNextRequestSeq++;

request.callback = callback;

request.data = data;

if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1

struct epoll_event eventItem;

request.initEventItem(&eventItem);

ssize_t requestIndex = mRequests.indexOfKey(fd);

if (requestIndex < 0) {

int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);//加入epoll

if (epollResult < 0) {

ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);

return -1;

}

mRequests.add(fd, request);//放入mRequests中

} else {

int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);//更新

if (epollResult < 0) {

if (errno == ENOENT) {

// Tolerate ENOENT because it means that an older file descriptor was

// closed before its callback was unregistered and meanwhile a new

// file descriptor with the same number has been created and is now

// being registered for the first time. This error may occur naturally

// when a callback has the side-effect of closing the file descriptor

// before returning and unregistering itself. Callback sequence number

// checks further ensure that the race is benign.

//

// Unfortunately due to kernel limitations we need to rebuild the epoll

// set from scratch because it may contain an old file handle that we are

// now unable to remove since its file descriptor is no longer valid.

// No such problem would have occurred if we were using the poll system

// call instead, but that approach carries others disadvantages.

#if DEBUG_CALLBACKS

ALOGD("%p ~ addFd - EPOLL_CTL_MOD failed due to file descriptor "

"being recycled, falling back on EPOLL_CTL_ADD, errno=%d",

this, errno);

#endif

epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);

if (epollResult < 0) {

ALOGE("Error modifying or adding epoll events for fd %d, errno=%d",

fd, errno);

return -1;

}

scheduleEpollRebuildLocked();

} else {

ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);

return -1;

}

}

mRequests.replaceValueAt(requestIndex, request);

}

} // release lock

return 1;

}

在pollOnce函数中,我们先寻找mRequests中匹配的fd,然后在pushResponse中新建一个Response,然后把Response和Request匹配起来。

} else {

ssize_t requestIndex = mRequests.indexOfKey(fd);

if (requestIndex >= 0) {

int events = 0;

if (epollEvents & EPOLLIN) events |= EVENT_INPUT;

if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;

if (epollEvents & EPOLLERR) events |= EVENT_ERROR;

if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;

pushResponse(events, mRequests.valueAt(requestIndex));

} else {

ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "

"no longer registered.", epollEvents, fd);

}

}

下面我们就会遍历mResponses中的Response,然后调用其request中的回调

for (size_t i = 0; i < mResponses.size(); i++) {

Response& response = mResponses.editItemAt(i);

if (response.request.ident == POLL_CALLBACK) {

int fd = response.request.fd;

int events = response.events;

void* data = response.request.data;

#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS

ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",

this, response.request.callback.get(), fd, events, data);

#endif

// Invoke the callback. Note that the file descriptor may be closed by

// the callback (and potentially even reused) before the function returns so

// we need to be a little careful when removing the file descriptor afterwards.

int callbackResult = response.request.callback->handleEvent(fd, events, data);

if (callbackResult == 0) {

removeFd(fd, response.request.seq);

}

// Clear the callback reference in the response structure promptly because we

// will not clear the response vector itself until the next poll.

response.request.callback.clear();

result = POLL_CALLBACK;

}

}

同样我们再来看看Looper_test.cpp是如何使用的?

Pipe pipe;

StubCallbackHandler handler(true);

handler.setCallback(mLooper, pipe.receiveFd, Looper::EVENT_INPUT);

我们看下handler的setCallback函数

class CallbackHandler {

public:

void setCallback(const sp<Looper>& looper, int fd, int events) {

looper->addFd(fd, 0, events, staticHandler, this);//就是调用了looper的addFd函数,并且回调

}

protected:

virtual ~CallbackHandler() { }

virtual int handler(int fd, int events) = 0;

private:

static int staticHandler(int fd, int events, void* data) {//这个就是回调函数

return static_cast<CallbackHandler*>(data)->handler(fd, events);

}

};

class StubCallbackHandler : public CallbackHandler {

public:

int nextResult;

int callbackCount;

int fd;

int events;

StubCallbackHandler(int nextResult) : nextResult(nextResult),

callbackCount(0), fd(-1), events(-1) {

}

protected:

virtual int handler(int fd, int events) {//这个是通过回调函数再调到这里的

callbackCount += 1;

this->fd = fd;

this->events = events;

return nextResult;

}

};

我们结合Looper的addFd一起来看,当callback是有的,我们新建一个SimpleLooperCallback

int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {

return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data);

}

这里的Looper_callbackFunc是一个typedef

typedef int (*Looper_callbackFunc)(int fd, int events, void* data);

我们再来看SimpleLooperCallback

class SimpleLooperCallback : public LooperCallback {

protected:

virtual ~SimpleLooperCallback();

public:

SimpleLooperCallback(Looper_callbackFunc callback);

virtual int handleEvent(int fd, int events, void* data);

private:

Looper_callbackFunc mCallback;

};SimpleLooperCallback::SimpleLooperCallback(Looper_callbackFunc callback) :

mCallback(callback) {

}

SimpleLooperCallback::~SimpleLooperCallback() {

}

int SimpleLooperCallback::handleEvent(int fd, int events, void* data) {

return mCallback(fd, events, data);

}

最后我们是调用callback->handleEvent(fd, events, data),而callback就是SimpleLooperCallback,这里的data,之前传进来的就是CallbackHandler 的this指针

 因此最后就是调用了staticHandler,而data->handler,就是this->handler,最后是虚函数就调用到了StubCallbackHandler 的handler函数中了。 

当然我们也可以不用这么复杂,直接使用第二个addFd函数,当然callBack我们需要自己定义一个类来实现LooperCallBack类就行了,这样就简单多了。

 int addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data);

2.6 java层addFd 

一直以为只能在c层的Looper中才能addFd,原来在java层也通过jni做了这个功能。 

我们可以在MessageQueue中的addOnFileDescriptorEventListener来实现这个功能

public void addOnFileDescriptorEventListener(@NonNull FileDescriptor fd,

@OnFileDescriptorEventListener.Events int events,

@NonNull OnFileDescriptorEventListener listener) {

if (fd == null) {

throw new IllegalArgumentException("fd must not be null");

}

if (listener == null) {

throw new IllegalArgumentException("listener must not be null");

}

synchronized (this) {

updateOnFileDescriptorEventListenerLocked(fd, events, listener);

}

}

我们再来看看OnFileDescriptorEventListener 这个回调

public interface OnFileDescriptorEventListener {

public static final int EVENT_INPUT = 1 << 0;

public static final int EVENT_OUTPUT = 1 << 1;

public static final int EVENT_ERROR = 1 << 2;

/** @hide */

@Retention(RetentionPolicy.SOURCE)

@IntDef(flag=true, value={EVENT_INPUT, EVENT_OUTPUT, EVENT_ERROR})

public @interface Events {}

@Events int onFileDescriptorEvents(@NonNull FileDescriptor fd, @Events int events);

}

接着调用了updateOnFileDescriptorEventListenerLocked函数

private void updateOnFileDescriptorEventListenerLocked(FileDescriptor fd, int events,

OnFileDescriptorEventListener listener) {

final int fdNum = fd.getInt$();

int index = -1;

FileDescriptorRecord record = null;

if (mFileDescriptorRecords != null) {

index = mFileDescriptorRecords.indexOfKey(fdNum);

if (index >= 0) {

record = mFileDescriptorRecords.valueAt(index);

if (record != null && record.mEvents == events) {

return;

}

}

}

if (events != 0) {

events |= OnFileDescriptorEventListener.EVENT_ERROR;

if (record == null) {

if (mFileDescriptorRecords == null) {

mFileDescriptorRecords = new SparseArray<FileDescriptorRecord>();

}

record = new FileDescriptorRecord(fd, events, listener);//fd保存在FileDescriptorRecord对象

mFileDescriptorRecords.put(fdNum, record);//mFileDescriptorRecords然后保存在

} else {

record.mListener = listener;

record.mEvents = events;

record.mSeq += 1;

}

nativeSetFileDescriptorEvents(mPtr, fdNum, events);//调用native函数

} else if (record != null) {

record.mEvents = 0;

mFileDescriptorRecords.removeAt(index);

}

}

native最后调用了NativeMessageQueue的setFileDescriptorEvents函数 

static void android_os_MessageQueue_nativeSetFileDescriptorEvents(JNIEnv* env, jclass clazz,

jlong ptr, jint fd, jint events) {

NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);

nativeMessageQueue->setFileDescriptorEvents(fd, events);

}

setFileDescriptorEvents函数,这个addFd就是调用的第二个addFd,因此我们可以肯定NativeMessageQueue继承了LooperCallback

void NativeMessageQueue::setFileDescriptorEvents(int fd, int events) {

if (events) {

int looperEvents = 0;

if (events & CALLBACK_EVENT_INPUT) {

looperEvents |= Looper::EVENT_INPUT;

}

if (events & CALLBACK_EVENT_OUTPUT) {

looperEvents |= Looper::EVENT_OUTPUT;

}

mLooper->addFd(fd, Looper::POLL_CALLBACK, looperEvents, this,

reinterpret_cast<void*>(events));

} else {

mLooper->removeFd(fd);

}

}

果然是,需要实现handleEvent函数

class NativeMessageQueue : public MessageQueue, public LooperCallback {

public:

NativeMessageQueue();

virtual ~NativeMessageQueue();

virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj);

void pollOnce(JNIEnv* env, jobject obj, int timeoutMillis);

void wake();

void setFileDescriptorEvents(int fd, int events);

virtual int handleEvent(int fd, int events, void* data);

handleEvent就是在looper中epoll_wait之后,当我们增加的fd有数据就会调用这个函数

int NativeMessageQueue::handleEvent(int fd, int looperEvents, void* data) {

int events = 0;

if (looperEvents & Looper::EVENT_INPUT) {

events |= CALLBACK_EVENT_INPUT;

}

if (looperEvents & Looper::EVENT_OUTPUT) {

events |= CALLBACK_EVENT_OUTPUT;

}

if (looperEvents & (Looper::EVENT_ERROR | Looper::EVENT_HANGUP | Looper::EVENT_INVALID)) {

events |= CALLBACK_EVENT_ERROR;

}

int oldWatchedEvents = reinterpret_cast<intptr_t>(data);

int newWatchedEvents = mPollEnv->CallIntMethod(mPollObj,

gMessageQueueClassInfo.dispatchEvents, fd, events); //调用回调

if (!newWatchedEvents) {

return 0; // unregister the fd

}

if (newWatchedEvents != oldWatchedEvents) {

setFileDescriptorEvents(fd, newWatchedEvents);

}

return 1;

}

最后在java的MessageQueue中的dispatchEvents就是在jni层反调过来的,然后调用之前注册的回调函数

// Called from native code.

private int dispatchEvents(int fd, int events) {

// Get the file descriptor record and any state that might change.

final FileDescriptorRecord record;

final int oldWatchedEvents;

final OnFileDescriptorEventListener listener;

final int seq;

synchronized (this) {

record = mFileDescriptorRecords.get(fd);//通过fd得到FileDescriptorRecord

if (record == null) {

return 0; // spurious, no listener registered

}

oldWatchedEvents = record.mEvents;

events &= oldWatchedEvents; // filter events based on current watched set

if (events == 0) {

return oldWatchedEvents; // spurious, watched events changed

}

listener = record.mListener;

seq = record.mSeq;

}

// Invoke the listener outside of the lock.

int newWatchedEvents = listener.onFileDescriptorEvents(//listener回调

record.mDescriptor, events);

if (newWatchedEvents != 0) {

newWatchedEvents |= OnFileDescriptorEventListener.EVENT_ERROR;

}

// Update the file descriptor record if the listener changed the set of

// events to watch and the listener itself hasn't been updated since.

if (newWatchedEvents != oldWatchedEvents) {

synchronized (this) {

int index = mFileDescriptorRecords.indexOfKey(fd);

if (index >= 0 && mFileDescriptorRecords.valueAt(index) == record

&& record.mSeq == seq) {

record.mEvents = newWatchedEvents;

if (newWatchedEvents == 0) {

mFileDescriptorRecords.removeAt(index);

}

}

}

}

// Return the new set of events to watch for native code to take care of.

return newWatchedEvents;

}

以上是 Android6.0 消息机制原理解析 的全部内容, 来源链接: utcz.com/z/352260.html

回到顶部