Java高并发程序设计学习笔记(十一):Jetty分析

java

new Server()
初始化线程池
QueuedThreadPool
execute()方法
BlockingQueue
初始化ServerConnector
初始化ScheduledExecutorScheduler
初始化ByteBufferPool
ArrayByteBufferPool
结构
acquire
release
例外处理
总结
维护ConnectionFactory
取得可用CPU数量
更新acceptor数量
创建acceptor线程组
初始化ServerConnectorManager
保存selector线程数量
设置port
关联Sever和Connector
Server.start()
设置启动状态
启动过程doStart()
注册ShutdownMonitor
获取化线程池
设置selector数量
计算所需的所有线程数量
如果大于默认的200则中断程序
维护Bean
启动QueuedThreadPool
创建线程
设置线程的属性
启动线程 thread.start();
启动WebAppContext
启动Connector
取得ConnectionFactory
创建selector线程并启动
创建Acceptor线程
设置线程名字
设置优先级
将自己放入_acceptors数组
监听端口
没有Acceptor的情况
启动完毕
Http请求
Accept成功
设置为非阻塞模式
配置Socket
正式处理
选择可用的ManagedSelector线程
ManagedSelector处理
提交任务
请求处理
ManagedSelector.run()
select()
runChanges();
select()
处理SelectionKey

本次笔记是对jetty,一个servlet容器,一个httpServer,内部的实现,因为代码量量比较多,不会从架构的方向介绍,作为一个servlet容器,不会介绍如何使用jsp,而是通过多线程的方向,从代码触发,看究竟是如何提高并发量的。
综上本次笔记并没有对jetty具体是如何用的做介绍,主要是学习jetty高并发的处理手段。
new Server()
public Server(@Name("port")int port) {
this((ThreadPool)null);
ServerConnector connector=new ServerConnector(this); connector.setPort(port);
setConnectors(new Connector[]{connector});
}
1
2
3
4
5
初始化线程池
public Server(@Name("threadpool") ThreadPool pool) {
_threadPool=pool!=null?pool:new QueuedThreadPool(); addBean(_threadPool);
setServer(this);
}
1
2
3
4
QueuedThreadPool
并不是初始化jdk的线程池,而是new自己的QueuedThreadPool,QueuedThreadPool实现了SizedThreadPool

execute()方法
@Override
public void execute(Runnable job) {
if (!isRunning() || !_jobs.offer(job)) {
LOG.warn("{} rejected {}", this, job);
throw new RejectedExecutionException(job.toString()); }
else {
// Make sure there is at least one thread executing the job. if (getThreads() == 0)
startThreads(1);
}
}

1
2
3
4
5
6
7
8
9
10
11
这里的核心是将任务压入队列 : _jobs.offer(job)

BlockingQueue
这里的_jobs是BlockingQueue,保存线程池执行的所有的任务。
将任务推入
BlockingQueue org.eclipse.jetty.util.thread.QueuedThreadPool._jobs

BlockingQueue不是一个高性能的,所以execute不太可能被非常频繁的调用。

初始化ServerConnector
处理一些http的连接以及NIO、Selector一些的东西。
HTTP connector using NIO ByteChannels and Selectors
继承自 AbstractConnector

初始化ScheduledExecutorScheduler
调度器
based on JDK’s {@link ScheduledThreadPoolExecutor}.
有些任务是需要每隔一段时间执行一次的,比如每一分钟需要检查一些东西,诸如此类的任务就是由schedule来调度的。

初始化ByteBufferPool
ByteBuffer是一个数组,分为两种_indirect ByteBuffer(分配在堆当中)和_directByteBuffer(分配在内存当中),相当于一个对象池,这其中的byteBuffer实际上是可以复用的。对象池的好处就是可以减少gc,减少new。在java中new是经过了绝对的优化的,性能还是比较高,关键是回收,新生代的回收会更加的频繁。如果自己写一个线程池,还不如通过new、gc的方式,因为线程池必须是线程安全的,所以好多线程在用,拿到对象和归还对象的时候必须是线程安全的,如果简单的加synchronize,除非是特别大的超级对象,性能可能比new,好一点,否则,如果用synchronize构造的线程池性能还不如new出来的。

ArrayByteBufferPool
普通对象池中的对象都是对立的,我去拿任何一个对象都是一样的,都是等价的,但是ByteBufferPool不同,因为你可能需要2k的bytebuffer,也可能需要一个2m的bytebuffer

public ArrayByteBufferPool(int minSize, int increment, int maxSize)
public ArrayByteBufferPool() {
this(0,1024,64*1024);
}
_direct=new Bucket[maxSize/increment];
_indirect=new Bucket[maxSize/increment];
1
2
3
4
5
6
minSize其实大小(最小的容量),increment增量,maxSize最大的大小

结构
Bucket理解为篮子,一个篮子里面只放一中大小的buffer
因为1-63k不可能每一个都有,也许系统中只需要32k,这个时候1k、2k等等都不需要,所以有一个延迟加载的功能
_direct Bucket数组 _indirect Bucket数组
为每一个大小,新建一个Bucket 但不初始化ByteBuffer

int size=0;
for (int i=0;i<_direct.length;i++) {
size+=_inc;
_direct[i]=new Bucket(size);
_indirect[i]=new Bucket(size);
}
1
2
3
4
5
6
一个Bucekt存放大小相同的所有的ByteBuffer
_size
bytebuffer大小
_queue
public final Queue _queue= new ConcurrentLinkedQueue<>();
初始化ByteBuPool的时候所有的Bucket都创建一遍,但是Bucket里面的Queue里面的内容都是延迟加载的

acquire
请求线程池。

public ByteBuffer acquire(int size, boolean direct)
1
size多大的内存,direct直接内存还是堆。
取得合适的Bucket 每个Bucket的大小不同,这里找到最合适的,取最接近的Buket大小

Bucket bucket = bucketFor(size,direct);
1
从Bucket中取得ByteBuffer

ByteBuffer buffer = bucket==null?null:bucket._queue.poll();
1
不存在则新建

if (buffer == null) {
int capacity = bucket==null?size:bucket._size;
buffer = direct ? BufferUtil.allocateDirect(capacity) : BufferUtil.allocate(capacity);
}

1
2
3
4
5
release
用完了释放掉,还给线程池。

public void release(ByteBuffer buffer) {
if (buffer!=null) {
Bucket bucket = bucketFor(buffer.capacity(),buffer.isDirect());
if (bucket!=null)
{
BufferUtil.clear(buffer);
bucket._queue.offer(buffer);
}
}
}
1
2
3
4
5
6
7
8
9
10
取得合适的Bucket

Bucket bucket = bucketFor(buffer.capacity(),buffer.isDirect());
1
清空Buffer

BufferUtil.clear(buffer);
1
归还Pool

bucket._queue.offer(buffer);
1
例外处理
如果申请的ByteBuffer过大或者过小,无法在POOL中满足,则可以申请成功,但无法归还给POOL。
加入目前只有1-64k的bytebuffer,但是我需要120k大小的bytebuffer,就需要申请。
可以看到上面的require没有的话会创建,但是因为无法归还,所以最后会被gc

总结
通过学习ByteBufferPool,首先是对于所有的对象池都应该无锁的,如果有锁,还不如new出来,第二个就是bucket,因为bytebuffer大小是不确定的,所做的相应的处理。

维护ConnectionFactory
HttpConnectionFactory
用于创建连接, 比如Accept后,需要创建一个表示连接的对象

取得可用CPU数量
int cores = Runtime.getRuntime().availableProcessors();
1
我的系统中应该使用多少accept线程,应该使用多少个selector线程,都要由cores算出来的,这也就是对于高并发的程序来讲,你必须自适应cpu的数量,否则在2核的cpu上和64核的cpu上,完全没有办法很好的协调,

更新acceptor数量
根据cpu的数量更新acceptor数量

if (acceptors < 0)
acceptors=Math.max(1, Math.min(4,cores/8));

1
2
3
它认为accept的数量应该是比较小的,上面可以发现不会超过四个。

创建acceptor线程组
_acceptors = new Thread[acceptors];
1
初始化ServerConnectorManager
继承自 SelectorManager

_manager = new ServerConnectorManager(getExecutor(), getScheduler(), selectors>0?selectors:Math.max(1,Math.min(4,Runtime.getRuntime().availableProcessors()/2)));
1
保存selector线程数量
Math.min(4,Runtime.getRuntime().availableProcessors()/2))
1
最多也不超过四个

设置port
connector.setPort(port);
1
关联Sever和Connector
setConnectors(new Connector[]{connector});
1
Server.start()
org.eclipse.jetty.server.Server
启动web服务器

WebAppContext context = new WebAppContext();
context.setContextPath("/");
context.setResourceBase("./web/"); context.setClassLoader(Thread.currentThread().getContextClassLoader()); server.setHandler(context);
server.start();
1
2
3
4
设置启动状态
AbstractLifeCycle
private void setStarting() {
if (LOG.isDebugEnabled()) LOG.debug("starting {}",this);
_state = __STARTING;
for (Listener listener : _listeners)
listener.lifeCycleStarting(this);
}
1
2
3
4
5
6
7
启动过程doStart()
Server
启动整个server

protected void doStart() throws Exception
{
//If the Server should be stopped when the jvm exits, register //with the shutdown handler thread.
if (getStopAtShutdown())
ShutdownThread.register(this);
//Register the Server with the handler thread for receiving //remote stop commands
ShutdownMonitor.register(this);
//Start a thread waiting to receive "stop" commands. ShutdownMonitor.getInstance().start(); // initialize
LOG.info("jetty-" + getVersion()); HttpGenerator.setJettyVersion(HttpConfiguration.SERVER_VERSION); MultiException mex=new MultiException();
// check size of thread pool
SizedThreadPool pool = getBean(SizedThreadPool.class); int max=pool==null?-1:pool.getMaxThreads();
int selectors=0;
int acceptors=0;
if (mex.size()==0)
{
for (Connector connector : _connectors) {
if (connector instanceof AbstractConnector) acceptors+=((AbstractConnector)connector).getAcceptors();
if (connector instanceof ServerConnector) selectors+=((ServerConnector)connector).getSelectorManager().getSelectorCount();
}
}
int needed=1+selectors+acceptors; if (max>0 && needed>max)
throw new IllegalStateException(String.format("Insufficient threads: max=%d < needed(acceptors=%d + selectors=%d + request=1)",max,acceptors,selectors));
try {
super.doStart();
}
catch(Throwable e) {
mex.add(e); }
// start connectors last
for (Connector connector : _connectors) {
try {
connector.start();
}catch(Throwable e) {
mex.add(e);
}
}
if (isDumpAfterStart()) dumpStdErr();
mex.ifExceptionThrow();
LOG.info(String.format("Started @%dms",Uptime.getUptime()));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
注册ShutdownMonitor
远程控制接口

//Register the Server with the handler thread for receiving //remote stop commands
ShutdownMonitor.register(this);
//Start a thread waiting to receive "stop" commands. ShutdownMonitor.getInstance().start(); // initialize
1
2
3
允许远程的将jetty的server关掉。

获取化线程池
// check size of thread pool
SizedThreadPool pool = getBean(SizedThreadPool.class);
1
2
QueuedThreadPool

设置selector数量
根据Connector数量进行累计 大部分情况下,只有一个ServerConnector,因为每个connector都有selector

for (Connector connector : _connectors) {
if (connector instanceof AbstractConnector) acceptors+=((AbstractConnector)connector).getAcceptors();
if (connector instanceof ServerConnector) selectors+=((ServerConnector)connector).getSelectorManager().getSelectorCount();
}
1
2
3
4
累计所有Connector的需求

计算所需的所有线程数量
int needed=1+selectors+acceptors;

如果大于默认的200则中断程序
if (max>0 && needed>max)
throw new IllegalStateException(String.format("Insufficient threads: max=%d < needed(acceptors=%d
+ selectors=%d + request=1)",max,acceptors,selectors));
1
2
3
因为如果光accept和selector线程都超过200说明性能很差了,没有必要再继续往下跑了。

维护Bean
启动QueuedThreadPool
doStart()
startThreads()建立需要的线程

创建线程
Thread thread = newThread(_runnable);
_runnable _jobs中取任务并执行

设置线程的属性
thread.setDaemon(isDaemon()); thread.setPriority(getThreadsPriority()); thread.setName(_name + “-” + thread.getId()); _threads.add(thread);

启动线程 thread.start();
启动WebAppContext
如果需要使用,在此处启动,去做servlet规范所做的内容,这里就不展开讲了。

启动Connector
取得ConnectionFactory
_defaultConnectionFactory = getConnectionFactory(_defaultProtocol);
1
创建selector线程并启动
for (int i = 0; i < _selectors.length; i++) {
ManagedSelector selector = newSelector(i); _selectors[i] = selector;
selector.start();
execute(new NonBlockingThread(selector));
}
1
2
3
4
5
newSelector()

protected ManagedSelector newSelector(int id) {
return new ManagedSelector(id);
}
1
2
3
创建Acceptor线程
//创建几个accept线程
_stopping=new CountDownLatch(_acceptors.length);
for (int i = 0; i < _acceptors.length; i++)
{
Acceptor a = new Acceptor(i);
addBean(a);
//执行
getExecutor().execute(a);
}
1
2
3
4
5
6
7
8
9
Acceptor

设置线程名字

final Thread thread = Thread.currentThread();
String name=thread.getName();
_name=String.format("%s-acceptor-%d@%x-
%s",name,_acceptor,hashCode(),AbstractConnector.this.toString()); thread.setName(_name);
1
2
3
4
5
设置优先级
将自己放入_acceptors数组
synchronized (AbstractConnector.this) {
_acceptors[_acceptor] = thread;
}

1
2
3
4
监听端口
try {
while (isAccepting()) {
try {
accept(_acceptor); }
catch (Throwable e) {
if (isAccepting())
LOG.warn(e);
else LOG.ignore(e);
}
}
} finally {
thread.setName(name);
if (_acceptorPriorityDelta!=0) thread.setPriority(priority);
synchronized (AbstractConnector.this) {
_acceptors[_acceptor] = null;
}
CountDownLatch stopping=_stopping;
if (stopping!=null)
stopping.countDown();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ServerConnector.accept()

public void accept(int acceptorID) throws IOException {
ServerSocketChannel serverChannel = _acceptChannel; if (serverChannel != null && serverChannel.isOpen()) {
SocketChannel channel =serverChannel.accept();
accepted(channel);
}
}
1
2
3
4
5
6
在accept的地方等待

没有Acceptor的情况
channle默认是blocking的
如果acceptor数量为0,没有安排线程专门进行accept,则设置为非阻塞模式 若是非0,有专门线程进行accept,因此,为阻塞模式

protected void doStart() throws Exception {
super.doStart();
if (getAcceptors()==0) {
_acceptChannel.configureBlocking(false);
_manager.acceptor(_acceptChannel);
}
}
1
2
3
4
5
6
7
启动完毕
Http请求
Accept成功
private void accepted(SocketChannel channel) throws IOException {
channel.configureBlocking(false);
Socket socket = channel.socket();
configure(socket);
_manager.accept(channel);
}
1
2
3
4
5
6
设置为非阻塞模式
channel.configureBlocking(false);
1
配置Socket
Socket socket = channel.socket();
configure(socket);
1
2
正式处理
SelectorManager _manager;
_manager.accept(channel);
1
2
选择可用的ManagedSelector线程
private ManagedSelector chooseSelector() {

//The ++ increment here not atomic, but does not matter,
// so long as the value chages sometimes,then connections will
//be distributed over the available selectors
long s = _selectorIndex++;
int index = (int)(s % getSelectorCount());
return _selectors[index];
}
1
2
3
4
5
6
7
8
9
这里有一个很有意思的注释,++并不是原子操作,但是不影响,因为这里只需要++就可以了,并不需要每次的数字搜不一样。

ManagedSelector处理
ManagedSelector 是一个线程 封装了Selector 的使用

提交任务
selector.submit(selector.new Accept(channel, attachment));
1
提交这个处理任务到ManagedSelector:

private final Queue<Runnable> _changes = new ConcurrentArrayQueue<>(); _changes.offer(change);
1
任务提交到了_chages这个队列中,ConcurrentArrayQueue这个队列是jetty中自己实现的。
ConcurrentArrayQueue
与ConcurrentLinkedQueue相似的性能,但直接保存元素 而不是node,因此需要更少的对象,更少的GC
因为ConcurrentLinkedQueue是一个链表,所以要有node,并且指向下一个node,而ConcurrentArrayQueue不是一个链表,所以需要更少的对象,更少的GC

请求处理
ManagedSelector.run()
while (isRunning())
select();
1
2
select()
发现有任务就执行

runChanges();
参见: 提交任务

private void runChanges() {
Runnable change;
while ((change = _changes.poll()) != null)
runChange(change);
}
1
2
3
4
5
将_changes中的任务拿出来做执行。
runChange()
change.run();

Accept.run

SelectionKey key = channel.register(_selector, 0, attachment);
EndPoint endpoint = createEndPoint(channel, key);
key.attach(endpoint);
1
2
3
注册selector并将endpoint传给后面。

select()
runChanges实质上是将selector和channel连接到一起,接下来就是等待read或者write准备好

int selected = _selector.select();
1
处理SelectionKey
当发现任何一个selector可用的时候就会处理SelectionKey

Set<SelectionKey> selectedKeys = _selector.selectedKeys(); for (SelectionKey key : selectedKeys)
{
if (key.isValid()) {
processKey(key);
}
else {
if (debug)
LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
Object attachment = key.attachment();
if (attachment instanceof EndPoint)
((EndPoint)attachment).close();
}
}
selectedKeys.clear();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
processKey()

private void processKey(SelectionKey key) {
Object attachment = key.attachment(); try
{
if (attachment instanceof SelectableEndPoint) {
((SelectableEndPoint)attachment).onSelected();
}
else if (key.isConnectable()) {
processConnect(key, (Connect)attachment);
}
else if (key.isAcceptable())
{
processAccept(key);
} else {
throw new IllegalStateException(); }
}
catch (CancelledKeyException x) {
LOG.debug("Ignoring cancelled key for channel {}", key.channel());
if (attachment instanceof EndPoint)
closeNoExceptions((EndPoint)attachment);
}
catch (Throwable x) {
LOG.warn("Could not process key for channel " + key.channel(), x);
if (attachment instanceof EndPoint)
closeNoExceptions((EndPoint)attachment);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
onSelected()

@Override
public void onSelected() {
assert _selector.isSelectorThread();
int oldInterestOps = _key.interestOps();
int readyOps = _key.readyOps();
int newInterestOps = oldInterestOps & ~readyOps; setKeyInterests(oldInterestOps, newInterestOps);
updateLocalInterests(readyOps, false);
if (_key.isReadable())
getFillInterest().fillable(); if (_key.isWritable())
getWriteFlusher().completeWrite();
}
1
2
3
4
5
6
7
8
9
10
11
会使用新的线程进行HTTP业务处理 (提交到线程池)
---------------------
作者:Leesin Dong
来源:CSDN
原文:https://blog.csdn.net/dataiyangu/article/details/87894253
版权声明:本文为博主原创文章,转载请附上博文链接!

以上是 Java高并发程序设计学习笔记(十一):Jetty分析 的全部内容, 来源链接: utcz.com/z/392251.html

回到顶部