OKio源码分析

本篇文章主要分析Okio读写流程以及超时检测机制。首先会介绍Okio中几个重要的类,然后提供一段用Okio api 实现读写文件代码,根据这段代码进行整体读写流程分析,以及分析Okio为什么比直接使用Java io 高效,最后介绍了在读写时Okio如何进行超时检测。

1.OKio介绍

Okio作为Okhttp底层io库,它补充了java.io和java.nio的不足,使访问、存储和处理数据更加容易。

Okio中几个重要的类介绍

  • <font color='red'> ByteString </font> 是不可变的字节序列。对于字符数据,最基本的就是String。而ByteString就像是String的兄弟一般,它使得将二进制数据作为一个变量值变得容易。这个类很聪明:它知道如何将自己编码和解码为十六进制、base64和utf-8。
  • <font color='red'> Segment </font> Segment在Okio中作为数据缓冲的载体,一个Segment的数据缓冲大小为8192,即8k。每一个Segment都有前驱和后继结点,也就是说Sement是一个双向链表链表,准确的来说是一个双向循环链表。读取数据从Segment头结点读取写数据从Segment尾结点写。Okio中引入池的概念也就是源码中SegmentPool的实现。SegmentPool负责Segment创建和销毁,SegmentPool最大可以缓存8个Segment。
  • <font color='red'> Buffer </font> 是一个可变的字节序列。像Arraylist一样。得益于它的底层由Segment实现因此你不需要预先设置缓冲区的大小,
    当你将数据从一个缓冲区移动到另一个缓冲区时,它会重新分配Segment的持有关系,而不是跨Segment复制数据。其中Buffer实现了BufferedSource和BufferedSink,同时具读写功能。
  • <font color='red'> Sources </font> 类似于java中的InputStream,Source作为Okio中读取数据的顶层接口只提供了简单的api

    long read(Buffer sink, long byteCount) throws IOException;

    Timeout timeout();

    void close() throws IOException;

    更多读取api由它的子接口BufferedSource提供,实现类为RealBufferdSource,底层InputStream->Buffer,然后基于Buffer的读取。

  • <font color='red'> Sink </font> 类似于java中的OutPutStream,Sink作为Okio中写入数据的顶层接口也只提供了简单的api

    void write(Buffer source, long byteCount) throws IOException;

    void flush() throws IOException;

    Timeout timeout();

    void close() throws IOException;

    更多写入api由它的子接口BufferedSink提供,
    实现类为RealBufferedSink,底层将数据写入到Buffer,再由Buffer写入到OutPutStream中。

这里省略了GzipSource,GzipSink,HashingSink,HashingSource...等其他实现Source和Sink的类,只关注主流程。

根据前面介绍和UML图得知,数据的读写在RealBufferedSource和RealBufferedSink中实现

2.Okio读写流程

作为一个简单切入点,这里提供一段Okio实现的输入流写入到指定文件的代码。

  /***

* 将字节输入流写入到指定文件中

* @return true 写入成功,false 写入失败

*/

fun copy(inputStream: InputStream, dest: File): Boolean {

val source = Okio.buffer(Okio.source(inputStream))

val sink = Okio.buffer(Okio.sink(dest))

val buffer = Buffer()

return try {

var length = source.read(buffer, 8192L)

while (-1L != length) {

sink.write(buffer, length)

sink.flush()

length = source.read(buffer, 8192L)

}

true

} catch (e: Exception) {

e.printStackTrace()

false

} finally {

source.close()

sink.close()

}

}

Okio.source(inputStream)实现了对InputStream的包装,将InputStream包装在Source对象中并返回。

private static Source source(final InputStream in, final Timeout timeout) {

...

return new Source() {

@Override public long read(Buffer sink, long byteCount) throws IOException {

...

if (byteCount == 0) return 0;

try {

//超时检查

timeout.throwIfReached();

//从SegmentPool中获取Segment

Segment tail = sink.writableSegment(1);

//根据Segment中可用大小计算最大可以往Segment中写多少字节

int maxToCopy = (int)

Math.min(byteCount, Segment.SIZE - tail.limit);

//从inputStream中将数据写到Segment中

int bytesRead = in.read(tail.data, tail.limit, maxToCopy);

//如果读完则返回

if (bytesRead == -1) return -1;

//追加已经写入的数据量,用于下次将数据从limit位置开始写入,也就是limit之前都是写入的数据

tail.limit += bytesRead;

//修正buffer中存储的字节数量

sink.size += bytesRead;

return bytesRead;

} catch (AssertionError e) {

if (isAndroidGetsocknameError(e)) throw new IOException(e);

throw e;

}

}

@Override public void close() throws IOException {

in.close();

}

@Override public Timeout timeout() {

return timeout;

}

@Override public String toString() {

return "source(" + in + ")";

}

};

}

//Buffer#writableSegment

Segment writableSegment(int minimumCapacity) {

...

//1

if (head == null) {

head = SegmentPool.take();

return head.next = head.prev = head;

}

//2

Segment tail = head.prev;

if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {

tail = tail.push(SegmentPool.take());

}

return tail;

}

  1. 如果链表头为空,则从SegmentPool中获取新节点并指向head结点返回。
  2. 因为是双向循环列表所以head.prev始终获取的是尾结点(当链表长度为1时指向自己),如果tail结点存储数据的容量已满或者tail.owner为false(即该sement不能追加写入)则从SementPool中获取新节点插入到该结点尾部,并返回新节点。

Okio.buffer(Okio.source(src))等价于Okio.buffer(source),将source包装在RealBufferedSource(source)内并返回。

val source = Okio.buffer(Okio.source(src))

source.read(buffer,8192L)

执行read((buffer,8192L)),实际是调用的RealBufferedSource.read(buffer,byteCount)。而RealBufferedSource.read(buffer,byteCount)内部又会调用被包装的Source的read(buffer,length)。即我们上面分析过的读数据代码。

//RealBufferedSource#read(buffer,8192L)

public long read(Buffer sink, long byteCount) throws IOException {

if (buffer.size == 0) {

//调用被包装的Source。

//即从InputStream中读取Segment.SIZE个字节写入到buffer中

long read =

source.read(buffer,Segment.SIZE);

//如果未读到数据则返回

if (read == -1) return -1;

}

//读到数据,此时数据保存在buffer中,接下来将buffer写入到sink中。即将一个缓冲区写到另一个缓冲区。

long toRead = Math.min(byteCount, buffer.size);

return buffer.read(sink, toRead);

}

//Buffer#read(sink,toRead)

public long read(Buffer sink, long byteCount) {

if (size == 0) return -1L;

if (byteCount > size) byteCount = size;

//Okio高效的地方就是buffer#write()的实现,后面会详细分析。这里先理解为将buffer中的数据写入到外部传递进来的sink中

sink.write(this, byteCount);

return byteCount;

}

Okio.sink(dest)将File转换为OutPutStream,然后包装在Sink对象中。

 public static Sink sink(OutputStream out) {

return sink(out, new Timeout());

}

private static Sink sink(final OutputStream out, final Timeout timeout) {

...

return new Sink() {

@Override public void write(Buffer source, long byteCount) throws IOException {

checkOffsetAndCount(source.size, 0, byteCount);

while (byteCount > 0) {

//超时检测

timeout.throwIfReached();

//获取链表头结点

Segment head = source.head;

//计算一次可以读多少字节

int toCopy = (int) Math.min(byteCount, head.limit - head.pos);

//从head中读取数据写入到OutPutStream中

out.write(head.data, head.pos, toCopy);

//修正head读到哪个位置,下次继续从pos位置开始读

head.pos += toCopy;

//递减直到byteCount=0退出循环即表示本次写完

byteCount -= toCopy;

//修正buffer中存储的字节大小

source.size -= toCopy;

//如果该Segment已经读完

if (head.pos == head.limit) {

//从链表中删除head并将head的下个结点赋值给head

source.head = head.pop();

//回收head结点

SegmentPool.recycle(head);

}

}

}

@Override public void flush() throws IOException {

out.flush();

}

@Override public void close() throws IOException {

out.close();

}

@Override public Timeout timeout() {

return timeout;

}

@Override public String toString() {

return "sink(" + out + ")";

}

};

}

Okio.buffer(Okio.sink(dest))等价于Okio.buffer(Sink),将Sink包装在RealBufferedSink(Sink)内并返回。

val sink = Okio.buffer(Okio.sink(dest))

sink.write(buffer, length)

sink.flush()

执行write(buffer, length)和flush()其实调用的是RealBufferedSink的write(buffer,length)和flush()。RealBufferedSink的write(buffer,length)和flush()最终会调用被包装的Sink的write(buffer,length)和flush()。

//RealBufferedSink#write(Buffer source, long byteCount)

public void write(Buffer source, long byteCount)

throws IOException {

//将source中的数据写入到buffer中,Okio高效的地方就是buffer#write的实现

buffer.write(source, byteCount);

emitCompleteSegments();

}

public BufferedSink emitCompleteSegments() throws IOException {

long byteCount = buffer.completeSegmentByteCount();

//检查缓冲区是否被写满,写满则将数据写入到OutPutStream中。未写满则等到下次写满或调用flush或close时将数据写入到OutPutStream中,起到一个缓冲作用。

if (byteCount > 0) {

//调用被包装的Sink#write(buffer, byteCount)

//将buffer中的数据写入到OutPutStream中

sink.write(buffer, byteCount);

}

return this;

}

public long completeSegmentByteCount() {

long result = size;

if (result == 0) return 0;

Segment tail = head.prev;

if (tail.limit < Segment.SIZE && tail.owner) {

result -= tail.limit - tail.pos;

}

return result;

}

至此Okio的输入到输出基本流程已分析完。根据源码分析可知Okio就是对Java io的一个封装和优化,底层还是使用的InputStream和OutputStream。既然和Java io底层使用一样方式读和写,那么它优势体现在哪里呢?有人可能会说他体现在api的简洁上,结构清晰,链式编程,调用方便。说的对,这算是它的优势,而这优势并不能说服我抛弃Java io而使用它,其实你也可以基于java io封装一套链式编程。它和直接使用Java io的最大优势并不api的简洁上,而是io流拷贝的效率上以及对内存的复用上,下节中会详细介绍。

3.Okio为什么比直接使用Java io更有优势

上节中我们提到Okio和直接使用Java io的最大优势并不api的简洁上,而是io流拷贝的效率上以及对内存的复用上。在说这两个优势之前我们先看看直接使用Java io和Okio从输入到输出都经过哪些步骤。

  • Java io

InputStream --> BufferedInputStream --> 临时byte数组 --> BufferedOutPutStream --> OutPutStream

由此可见Java io从输入到输出流程中出现了临时byte数组。意味着从BufferedInputStream->临时byte数组拷贝一次数据,从临时byte数组->BufferedOutPutStream再拷贝一次数据。

  • Okio

InputStream --> inBuffer --> 临时buffer --> outBuffer --> OutPutStream

看起来中间部分和Java io步骤一样。实则不然,Java io我们刚才说过经历两次拷贝,而Okio中间部分inBuffer ->临时buffer->outBuffer 其实不完全是数据的拷贝。在分析buffer->buffer时我们会详细描述为什么不完全是数据拷贝。buffer->buffer 定义在 Buffer#write(Buffer source, long byteCount) 方法中。根据wirte方法注释看出Okio在实现 buffer->buffer 有两个指标。

此处引入该博主对注释的翻译

  • 不要浪费CPU

不要浪费CPU即不要到处复制数据,从将整个Segments从一个缓冲区重新分配到另一个缓冲区。

  • 不要浪费内存

Segment作为一个不可变量,缓冲区中除了头节点和尾节点的片段以外,相邻的片段,至少应该保证50%以上的数据负载量(指的是Segment中的data数据, Okio认为data数据量在50%以上才算是被有效利用的)。由于头结点中需要读取消耗字节数据,而尾节点中需要写入产生字节数据,因此头结点和尾节点是不能保持不变性的。

  • 在缓冲区之间移动片段

    在将一个缓冲区写入另一个缓冲区时,我们更喜欢重新分配整个段,将字节复制到最紧凑的形式。假设我们有一个缓冲区,其中的片段负载为[91%,61%],如果我们要在这上面附加一个负载量为[72%]的单一片段,这样将产生的结果为[91%,61%,72%]。这期间不会进行任何的字节复制操作。(即空间换时间,牺牲内存,提供速度)

    再假设,我们有一个缓冲区负载量为:[100%,2%],并且我们希望将其附加到一个负载量为[99%,3%]的缓冲区中。这个操作将产生以下部分:[100%、2%、99%、3%],也就是说,我们不会花时间去复制字节来提高内存的使用效率,如变成[100%,100%,4%]这样。(即这种情况下Okio不会采取时间换空间的策略,因为太浪费CPU)

    在合并缓冲区时,当相邻缓冲区的合并级别不超过100%时,我们将压缩相邻缓冲区。例如,当我们在[100%,40%]基础上附加[30%,80%]时,结果将会是[100%,70%,80%]。(也就是中间相邻的负载为40%和30%的两个Segment将会被合并为一个负载为70%的Segment)

  • 分割片段

    有时我们只想将source buffer中的一部分写入到sink buffer当中,例如,给定一个sink为 [51%,91%],现在我们想要将一个source为[92%,82%]的前30%写入到这个sink buffer当中。为了简化,我们首先将source buffer转换为等效缓冲区[30%,62%,82%](即拆分Segment),然后移动source的头结点Segment即可,最终生成sink[51%,91%,30%]和source[62%,82%]。

根据上面注释的定义,我们可知在进行buffer数据转移时,根据不同策略执行不同操作以达到CPU和内存之间的平衡,那么来看下buffer转移的代码实现。

//Buffer#write

public void write(Buffer source, long byteCount) {

while (byteCount > 0) {

//如果复制的数据量比原缓冲区已有数据量小

if (byteCount < (source.head.limit - source.head.pos)) {

//获取目标缓冲区尾结点

Segment tail = head != null ? head.prev : null;

//如果目标缓冲区尾结点不为空,并且是数据拥有者即可以追加数据并且目标缓冲区可以存下该数据

if (tail != null && tail.owner

&& (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) {

//如将[10%]追加到[20%],直接拷贝。最终结果[30%]

//将原缓冲区拷贝到目标缓冲区

source.head.writeTo(tail, (int) byteCount);

source.size -= byteCount;

size += byteCount;

return;

} else {

//如果目标缓冲区尾结点为空即目标缓冲区为空缓冲区 或者不为空但是的空间不足,或者不是持有者,这时就需要把原缓冲区的头结点分割为两个Segment,

//然后将原缓冲区的头指针更新为分割后的第一个Segment, 如[92%, 82%]变成[30%, 62%, 82%]这样

source.head = source.head.split((int) byteCount);

}

}

// 从原缓冲区的链表中移除头结点, 并加入到目标缓冲区的尾结点

Segment segmentToMove = source.head;

long movedByteCount = segmentToMove.limit - segmentToMove.pos;

source.head = segmentToMove.pop();

//如果目标缓冲区为空,则创建链表并将原缓冲区的链表头结点赋值给目标缓冲区结点的头结点

if (head == null) {

head = segmentToMove;

head.next = head.prev = head;

} else {

//目标缓冲区不为空,则向目标缓冲区链表追加原缓冲区结点的头结点。并尝试合并,如[60%,20%]追加[10%]。那么目标缓冲区结点为[60%,20%,10%]。然后合并后为[60%,30%]。

//合并成功回收多余结点以节省空间

Segment tail = head.prev;

tail = tail.push(segmentToMove);

tail.compact();

}

source.size -= movedByteCount;

size += movedByteCount;

byteCount -= movedByteCount;

}

}

根据上面源码分析以及注释来回答为什么Okio比Java io高效。

Java中读写数据一般为了高效我们引入BufferedInputStream和BufferedOutPutStream。这里以BufferedInputStream读写磁盘文件为例分析。在BufferedInputStream中当一次读取的字节数大于缓冲区大小会摒弃缓冲区,直接从磁盘中读取。
如果一次读取的字节数小于缓冲区大小,则先从磁盘中读取缓冲区大小个字节(BufferedInputStream中默认定义为8k)。然后每次从缓冲区读取设置的读取数量。直到缓冲区读完。然后再从磁盘中读取...直到整个磁盘数据读完

而Okio读取时不管你读取的字节长度是否大于缓冲区大小。直接读取8k数据到缓冲区,然后根据你设置的读取大小和当前缓冲区已有数据大小做比较取最小值来进行数据转移。

举个例子

比如数据16K,读取一次到临时变量:

读取大小设置为4k

  • Okio经历0次拷贝,inBuffer->临时buffer,只是分割inBuffer数据,将分割后的数据赋值给临时buffer,只是指针的修改
  • 而Java io读取一次到临时变量经历1次拷贝,即buffer->临时byte数组。

读取大小设置为8k

  • Okio经历0次拷贝,inBuffer->临时buffer,只是指针的修改
  • 而Java io读取一次到临时变量经历0次拷贝,因为大于等于缓冲区大小则直接从磁盘读取即InputStream->临时byte数组。

读取大小设置为16k

  • Okio经历0次拷贝,inBuffer->临时buffer,只是指针的
    修改。但是经历两次read即经历两次指针修改。
  • 而Java io读取一次到临时变量经历0次拷贝,因为大于等于缓冲区大小则直接从磁盘读取即InputStream->临时byte数组。经历一次read,但是浪费内存。

从上面举例说明中可以看出Okio在CPU和内存做了很好的权衡,超过8k就只读8k,减少一次性加载到内存的数据。
没超过8k,数据的复制也只是修改链表指针。

小结:
Okio比直接使用Java io高效得益于它底层对缓冲区的实现结构,将数据的缓冲区定义为链表结构是为了更好从缓冲区到缓冲区数据的移动,即不浪费CPU(不到处复制数据),在内存方面它引入SegmentPool来复用Segment。毕竟直接开辟一个8k的byte[]还是很浪费的。以及对缓冲区链表结点的数据进行压缩处理减少不必要的内存开销。

4.Okio的超时检测

超时机制分为同步检测和异步检测机制,先从简单的开始。
下面以读取数据检测超时为例进行说明。

4.1 同步检测

通过前面分析,调用read()时其实调用了RealBufferedSource#read()。而RealBufferedSource#read()又会调用被包装的Source,即Okio#source()创建的Source的read()。

//Okio#source()返回的Source

private static Source source(final InputStream in, final Timeout timeout) {

...

return new Source() {

public long read(Buffer sink, long byteCount) throws IOException {

...

try {

//超时检测

timeout.throwIfReached();

...

int bytesRead = in.read(tail.data, tail.limit, maxToCopy);

...

return bytesRead;

} catch (AssertionError e) {

if (isAndroidGetsocknameError(e)) throw new IOException(e);

throw e;

}

}

...

};

}

//Timeout

public class Timeout {

private boolean hasDeadline;

private long deadlineNanoTime;

private long timeoutNanos;

public Timeout() {

}

public Timeout deadlineNanoTime(long deadlineNanoTime) {

this.hasDeadline = true;

this.deadlineNanoTime = deadlineNanoTime;

return this;

}

public void throwIfReached() throws IOException {

if (Thread.interrupted()) {

throw new InterruptedIOException("thread interrupted");

}

if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {

throw new InterruptedIOException("deadline reached");

}

}

}

根据代码分析在每次调用read()都会调用timeout#throwIfReached(),结合Timeout类中定义,当调用deadlineNanoTime()设置截止时间,hasDeadline,deadlineNanoTime会被赋值。即throwIfReached()的调用才会起到检查超时作用,也就是同步检测超时机制就是根据时间的流逝来判断是否超时。

4.2 异步检测

异步检测Okio用在对Socket输入流的读取和输出流的写入检测,这里仅以输入流检测为例进行说明。先对异步检测整体设计描述,让我们对整体上有一个
宏观上的认识;不至于在分析源码时抓不住重点,最后再对代码实现进行详细分析。

异步检测整体设计如下:

  • 结构上使用单链表作为检测超时的结构,将超时时间封装到结点中,按照超时时间的升序插入到链表中。也就是马上要过期的结点为头结点的下个结点。而头结点在这里起到了看门狗的作用。所以头节点保持不变。
  • 当开始从socket#inputStream中读取时,启动一个监视线程(Watchdog)。不断的获取头结点的下个结点即被监视的结点并判断是否为空,为空则等待60S,60S后如果还为空则退出监视器;不为空则取出该结点存储的超时时间判断是否超时。如果没超时则等待该结点存储的超时时间,时间到后或者被链表插入操作唤醒,则会走一遍流程;如果超时了则删除该结点,并关闭socket。
  • 整个过程如果没有发生超时,则在读取完后删除被监视的结点。直到监视线程wait()等待设置的时间后发现没有需要监视的结点了,然后退出整个监视线程。或者还在wait()中时,又read()了一次,即链表中添加了新的被监视结点。这时wait被唤醒,唤醒后开始监视新的结点。

下面对代码进行分析,从以Socket创建Source开始

//Okio.source(socket)

public static Source source(Socket socket) throws IOException {

//第一步 创建AsyncTimeout并包装socket,包装是为了在超时是调用timedOut()来关闭socket。AsyncTimeout是Timeout的子类

AsyncTimeout timeout = timeout(socket);

//第二步 创建source并包装socket.getInputStream()

Source source = source(socket.getInputStream(), timeout);

//第三步 创建source并包装第二步的source,也就是当外部调用source.read时,其实调用的是第二步的read,

而第三步的包装是为了在第二步read时多加一层监视

return timeout.source(source);

}

//创建AsyncTimeout并包装socket,AsyncTimeout是Timeout的子类

private static AsyncTimeout timeout(final Socket socket) {

return new AsyncTimeout() {

@Override protected IOException newTimeoutException(@Nullable IOException cause) {

...

}

@Override protected void timedOut() {

try {

socket.close();

} catch (Exception e) {

...

}

}

};

}

//第二步 创建source并包装socket.getInputStream()

private static Source source(final InputStream in, final Timeout timeout) {

...

return new Source() {

@Override public long read(Buffer sink, long byteCount) throws IOException {

...

try {

...

int bytesRead = in.read(tail.data, tail.limit, maxToCopy);

...

return bytesRead;

} catch (AssertionError e) {

if (isAndroidGetsocknameError(e)) throw new IOException(e);

throw e;

}

}

...

};

}

//第三步 创建source并包装第二步的source,也就是当外部调用source.read时,

//其实调用的是第二步的read,

//而第三步的包装是为了在read时多加一层超时检测

public final Source source(final Source source) {

return new Source() {

@Override public long read(Buffer sink, long byteCount) throws IOException {

boolean throwOnTimeout = false;

//在enter方法内部启动超时检测

enter();

try {

long result = source.read(sink, byteCount);

throwOnTimeout = true;

return result;

} catch (IOException e) {

throw exit(e);

} finally {

//read执行完毕删除被监视的结点

exit(throwOnTimeout);

}

}

@Override public void close() throws IOException {

boolean throwOnTimeout = false;

try {

source.close();

throwOnTimeout = true;

} catch (IOException e) {

throw exit(e);

} finally {

exit(throwOnTimeout);

}

}

@Override public Timeout timeout() {

return AsyncTimeout.this;

}

...

};

}

public final void enter() {

if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");

long timeoutNanos = timeoutNanos();

boolean hasDeadline = hasDeadline();

if (timeoutNanos == 0 && !hasDeadline) {

return;

}

inQueue = true;

scheduleTimeout(this, timeoutNanos, hasDeadline);

}

private static synchronized void scheduleTimeout(

AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {

if (head == null) {

head = new AsyncTimeout();

//启动监视器

new Watchdog().start();

}

long now = System.nanoTime();

...

node.timeoutAt = now + timeoutNanos;

//按照超时时间升序插入到链表中,头结点后的结点就是即将超时的节点

//还有多长时间就超时了

long remainingNanos = node.remainingNanos(now);

for (AsyncTimeout prev = head; true; prev = prev.next) {

if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {

node.next = prev.next;

prev.next = node;

if (prev == head) {

//如果是插入到头节点后,那么唤醒监视器

AsyncTimeout.class.notify();

}

break;

}

}

}

private static final class Watchdog extends Thread {

Watchdog() {

super("Okio Watchdog");

setDaemon(true);

}

public void run() {

while (true) {

try {

AsyncTimeout timedOut;

synchronized (AsyncTimeout.class) {

timedOut = awaitTimeout();

if (timedOut == null) continue;

if (timedOut == head) {

head = null;

return;

}

}

//表示读取超时关闭socket

timedOut.timedOut();

} catch (InterruptedException ignored) {

}

}

}

}

static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException {

//根据头结点(看门狗)获取被监视的结点

AsyncTimeout node = head.next;

//如果不存在被监视的结点则等待IDLE_TIMEOUT_MILLIS秒(60s)。

//等待后还是不存在并且时间已经超过了60s,则返回头节点(看门狗)然后退出监视器。

//等待后不为空即存在被监视的结点,则返回null,继续下次循环,循环后node!=null。

if (node == null) {

long startNanos = System.nanoTime();

AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);

return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS

? head // The idle timeout elapsed.

: null; // The situation has changed.

}

//还有多长时间过期

long waitNanos = node.remainingNanos(System.nanoTime());

//还没有超时,等待

if (waitNanos > 0) {

long waitMillis = waitNanos / 1000000L;

waitNanos -= (waitMillis * 1000000L);

AsyncTimeout.class.wait(waitMillis, (int) waitNanos);

//如果是等待后,则说明超时了,下次循环waitNanos<=0。走下面代码

//如果是被notifyAll后,说明有新结点的插入,则需要重新判断

return null;

}

//被监视的结点已超时,删除被被监视的结点,并返回用于外部调用该结点的timedOut()来关闭数据流。

head.next = node.next;

node.next = null;

return node;

}

以上是 OKio源码分析 的全部内容, 来源链接: utcz.com/z/267547.html

回到顶部