Java学习笔记16Netty缓冲区ByteBuf详解
Java学习笔记16-Netty缓冲区ByteBuf详解
Netty自己的ByteBuf
ByteBuf是为解决ByteBuffer的问题和满足网络应用程序开发人员的日常需求而设计的。
JDK ByteBuffer的缺点:
- 无法动态扩容:长度是固定的,不能动态扩展和收缩,当数据大于ByteBuffer容量时,会发生索引越界异常。
- API使用复杂:读写的时候需要手工调用flip()和rewind()等方法,使用时需要非常谨慎的使用这些api,否则很容易出现错误。
ByteBuf做了哪些增强:
- API操作便捷性
- 动态扩容
- 多种ByteBuf实现
- 高效的零拷贝机制
ByteBuf操作
ByteBuf三个重要属性:readerIndex读取位置、writerIndex写入位置、capacity容量
提供了两个指针变量来支持顺序读和写操作,分别是readerIndex和writerIndex
常用方法定义
随机访问索引 getByte
顺序读 read*
顺序写 write*
清除已读内容 discardReadBytes
清除缓冲区 clear
搜索操作
标记和重置
引用计数和释放
discardable bytes readable bytes writable bytes
已读可丢弃区域
可读区域
待写区域
0<= readerIndex
<= writerIndex
<= capacity
ByteBuf动态扩容
capacity默认值:256字节;最大值:Integer.MAX_VALUE(2GB)
write*方法调用时,通过AbstractByteBuf.ensureWritable0进行检查。
容量计算方法:AbstractByteBufAllocator.calculateNewCapacity(新capacity的最小要求,capacity最大值)
根据新capacity的最小值要求,对应有两套计算方法:
没超过4M:从64字节开始,每次增加1倍,直至计算出来的newCapacity满足新容量的最小要求。
示例:当前大小256,已写250,继续写10字节数据,需要的容量最小要求是261,则新容量是64*2*2*2=512
超过4M:新容量 = 新容量最小要求 / 4M * 4M +4M
示例:当前大小3M,已写3M,继续写2M数据,需要的容量最小要求是5M,则新容量是9M(不能超过最大值)。
4M的来源:一个固定的阀值AbstractByteBufAllocator.CALCULATE_THRESHOLD
选择合适的ByteBuf实现
了解核心的:3个纬度的划分方式,8种具体实现
堆内/堆外 是否池化 访问方式 具体实现类 备注
unpool
safe
UnpooledHeapByteBuf
数组实现
heap堆内
unsafe
UnpooledUnsafeHeapByteBuf
Unsafe类直接操作内存
pool
safe
PooledHeapByteBuf
~
unsafe
PooledUnsafeHeapByteBuf
~
unpool
safe
UnpooledDirectByteBuf
NIO DirectByteBuffer
direct堆外
unsafe
UnpooledUnsafeDirectByteBuf
~
pool
safe
PooledDirectByteBuf
~
unsafe
PooledUnsafeDirectByteBuf
~
在使用中,都是通过ByteBufAllocator分配器进行申请,同时分配器具备有内存管理的功能
Unsafe的实现
unsafe意味着不安全的操作。但是更底层的操作会带来性能的提升和特殊功能,Netty中会尽力使用unsafe。
Java语言很重要的特性是“一次编写到处运行”,所以它针对底层的内存或者其他操作,做了很多封装。
而unsafe提供了一系列我们操作底层的方法,可能会导致不兼容或者不可知的异常。
Info.仅返回一些低级的内存信息 Objects.提供用于操作对象及其字段的方法 Classes.提供用于操作类及其静态字段的方法
addressSize
allocateInstance
staticFieldOffset
pageSize
objectFieldOffset
defineClass
defineAnonymousClass
ensureClassInitialized
Synchronization.低级的同步原语 Memory.直接访问内存方法 Arrays.操作数组
monitorEnter
allocateMemory
arrayBaseOffset
tryMonitorEnter
copyMemory
arrayIndexScale
monitorExit
freeMemory
compareAndSwapInt
getAddress
putOrderedInt
getInt
putInt
PooledByteBuf对象、内存复用
PoolThreadCache:PooledByteBufAllocator实例维护的一个线程变量。
多种分类的MemoryRegionCache数组用作内存缓存,MemoryRegionCache内部是链表,队列里面存Chunk。
PoolChunk里面维护了内存引用,内存复用的做法就是把buf的memory指向chunk的memory。
PooledByteBufAllocator.ioBuffer运作过程梳理:
EventLoop - Thread --allocate--> Arena(负责buf分配管理) -->
创建或复用ByteBuf对象
PooledByteBuf stack
RECYCLER ---->
buffer
cache
尝试从对应的缓存 复用内存空间
PoolThreadCache TINY_MR_CACHE * 32 Q[512]
SMALL_MR_CACHE * 4 Q[256]
NORMAL_MR_CACHE * 3 Q[64]
无缓存时,从内存中申请 直接向内存申请 unpool
零拷贝机制
Netty的零拷贝机制,是一种应用层的实现。和底层JVM、操作系统内存机制并无过多关联。
CompositeByteBuf,将多个ByteBuf合并为一个逻辑上的ByteBuf,避免了各个ByteBuf之间的拷贝。
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
ByteBuf newBuffer = compositeByteBuf.addComponents(true, buffer1, buffer2);
wrappedBuffer()方法,将byte[]数组包装成ByteBuf对象。
ByteBuf newBuffer = Unpooled.wrappedBuffer(new byte[]{1, 2, 3, 4, 5});
slice()方法。将一个ByteBuf对象切分成多个ByteBuf对象。
ByteBuf buffer1 = Unpooled.wrappedBuffer("hello".getBytes());
ByteBuf newBuffer = buffer1.slice(1, 2);
ByteBuf测试代码
import io.netty.buffer.ByteBuf;import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import java.util.Arrays;
/**
* @Author: Wenx
* @Description:
* @Date: Created in 2019/11/25 22:31
* @Modified By:
*/
public class ByteBufDemo {
public static void main(String[] args) {
apiTest();
compositeTest();
wrapTest();
sliceTest();
}
public static void apiTest() {
// +-------------------+------------------+------------------+
// | discardable bytes | readable bytes | writable bytes |
// | | (CONTENT) | |
// +-------------------+------------------+------------------+
// | | | |
// 0 <= readerIndex <= writerIndex <= capacity
// 1.创建一个非池化的ByteBuf,大小为10个字节
ByteBuf buf = Unpooled.buffer(10);
//ByteBuf buf = Unpooled.directBuffer(10);
println("1.原始ByteBuf为", buf);
// 2.写入一段内容
byte[] bytes = {1, 2, 3, 4, 5};
buf.writeBytes(bytes);
print("2.写入的bytes为", bytes);
println("写入内容后ByteBuf为", buf);
// 3.读取一段内容
byte b1 = buf.readByte();
byte b2 = buf.readByte();
print("3.读取的bytes为", new byte[]{b1, b2});
println("读取内容后ByteBuf为", buf);
// 4.将读取的内容丢弃
buf.discardReadBytes();
println("4.将读取的内容丢弃后ByteBuf为", buf);
// 5.清空读写指针
buf.clear();
println("5.清空读写指针后ByteBuf为", buf);
// 6.再次写入一段内容,比第一段内容少
byte[] bytes2 = {1, 2, 3};
buf.writeBytes(bytes2);
print("6.写入的bytes为", bytes2);
println("写入内容后ByteBuf为", buf);
// 7.将ByteBuf清零
buf.setZero(0, buf.capacity());
println("7.将内容清零后ByteBuf为", buf);
// 8.再次写入一段超过容量的内容
byte[] bytes3 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
buf.writeBytes(bytes3);
print("8.写入的bytes为", bytes3);
println("写入内容后ByteBuf为", buf);
// 随机访问索引 getByte
// 顺序读 read*
// 顺序写 write*
// 清除已读内容 discardReadBytes
// 清除缓冲区 clear
// 搜索操作
// 标记和重置
// 完整代码示例:参考
// 搜索操作 读取指定位置 buf.getByte(1);
}
public static void compositeTest() {
ByteBuf buffer1 = Unpooled.buffer(3);
buffer1.writeByte(1);
ByteBuf buffer2 = Unpooled.buffer(3);
buffer2.writeByte(4);
print("buffer1为", buffer1);
print("buffer2为", buffer2);
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
ByteBuf newBuffer = compositeByteBuf.addComponents(true, buffer1, buffer2);
println("CompositeByteBuf为", newBuffer);
}
public static void wrapTest() {
byte[] arr = {1, 2, 3, 4, 5};
ByteBuf newBuffer = Unpooled.wrappedBuffer(arr);
print("byte[]为", arr);
print("wrappedBuffer为", newBuffer);
print("newBuffer.getByte(4)为", newBuffer.getByte(4));
arr[4] = 6;
println("byte[4] = 6; 后newBuffer.getByte(4)为", newBuffer.getByte(4));
}
public static void sliceTest() {
ByteBuf oldBuffer = Unpooled.wrappedBuffer("hello".getBytes());
ByteBuf newBuffer = oldBuffer.slice(1, 2);
print("oldBuffer为", oldBuffer);
print("oldBuffer.slice(1, 2); 为", newBuffer);
print("newBuffer.getByte(0)为", newBuffer.getByte(0));
print("newBuffer.getByte(1)为", newBuffer.getByte(1));
// 新buf中原buf的引用
ByteBuf buf = newBuffer.unwrap();
print("newBuffer.unwrap()为", buf);
print("buf.getByte(0)为", buf.getByte(0));
print("buf.getByte(1)为", buf.getByte(1));
print("buf.getByte(2)为", buf.getByte(2));
print("buf.getByte(3)为", buf.getByte(3));
print("buf.getByte(4)为", buf.getByte(4));
}
private static void print(String str, byte b) {
System.out.println(String.format("%s==========>%s", str, b));
}
private static void print(String str, byte[] bytes) {
System.out.println(String.format("%s==========>%s", str, Arrays.toString(bytes)));
}
private static void print(String str, ByteBuf buf) {
print(str, buf, "");
}
private static void print(String before, ByteBuf buf, String after) {
byte[] bytes;
if (buf.hasArray()) {
bytes = buf.array();
} else {
int capacity = buf.capacity();
bytes = new byte[capacity];
for (int i = 0; i < buf.capacity(); i++) {
bytes[i] = buf.getByte(i);
}
}
System.out.println(String.format("%s==========>%s(ridx:%s, widx: %s, cap: %s)%s", before, Arrays.toString(bytes), buf.readerIndex(), buf.writerIndex(), buf.capacity(), after));
}
private static void println(String str, byte b) {
System.out.println(String.format("%s==========>%s
", str, b));
}
private static void println(String str, ByteBuf buf) {
print(str, buf, "
");
}
}
以上是 Java学习笔记16Netty缓冲区ByteBuf详解 的全部内容, 来源链接: utcz.com/z/511202.html