Hadoop源码 – ipc.Server - java20130722

java

Hadoop源码 – ipc.Server

2013-04-16 18:15 

java20130722 

阅读(214) 

评论(0) 

编辑 

收藏 

举报

1、前言

昨天分析了ipc包下的RPC、Client类,今天来分析下ipc.Server。Server类因为是Hadoop自己使用,所以代码结构以及流程都很清晰,可以清楚的看到实例化、停止、运行等过程。

2、Server类结构

上面是Server的五个内部类,分别介绍一下:

1)Call

用以存储客户端发来的请求,这个请求会放入一个BlockQueue中;

2)Listener

监听类,用以监听客户端发来的请求。同时Listener下面还有一个静态类,Listener.Reader,当监听器监听到用户请求,便用让Reader读取用户请求。

3)Responder

响应RPC请求类,请求处理完毕,由Responder发送给请求客户端。

4)Connection

连接类,真正的客户端请求读取逻辑在这个类中。

5)Handler

请求(blockQueueCall)处理类,会循环阻塞读取callQueue中的call对象,并对其进行操作。

3、Server初始化

第一篇博客说了,Server的初始化入口在RPC.getServer中,getServer其实是调用的RPC.Server静态类中的构造方法,我们看看Namenode创建RPCServer的方法和RPC.Server构造方法代码:

  1. private void initialize(Configuration

    conf) throws IOException {  

  2.     …  

  3.     this.serviceRpcServer = RPC.getServer(this,

    dnSocketAddr.getHostName(),   

  4.           dnSocketAddr.getPort(), serviceHandlerCount,  

  5.           false, conf, namesystem.getDelegationTokenSecretManager());  

  6.     this.serviceRpcServer.start(); //

    运行服务器  

  7. }  

  1. public Server(Object instance, Configuration conf, String bindAddress,  int port,  

  2.                   int numHandlers, boolean verbose,   

  3.                   SecretManager<? extends TokenIdentifier>

    secretManager)   

  4.         throws IOException {  

  5.       super(bindAddress, port, Invocation.class,

    numHandlers, conf,  

  6.           classNameBase(instance.getClass().getName()), secretManager);  

  7.       this.instance = instance;  

  8.       this.verbose = verbose;  

  9.     }  

该方法调用了父类的构造方法,如下:

  1. protected Server(String bindAddress, int port,   

  2.                   Class<? extends Writable> paramClass, inthandlerCount,   

  3.                   Configuration conf, String serverName, SecretManager<? extends TokenIdentifier>

    secretManager)   

  4.     throws IOException {  

  5.     this.bindAddress = bindAddress;  

  6.     this.conf = conf;  

  7.     this.port = port;  

  8.     this.paramClass = paramClass;  

  9.     this.handlerCount = handlerCount;  

  10.     this.socketSendBufferSize = 0;  

  11.     this.maxQueueSize = handlerCount * conf.getInt(  

  12.                                 IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,  

  13.                                 IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);  

  14.     this.maxRespSize = conf.getInt(IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,  

  15.                                    IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);  

  16.     this.readThreads = conf.getInt(  

  17.         IPC_SERVER_RPC_READ_THREADS_KEY,  

  18.         IPC_SERVER_RPC_READ_THREADS_DEFAULT);  

  19.     this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize);   

  20.     this.maxIdleTime =2*conf.getInt("ipc.client.connection.maxidletime", 1000);  

  21.     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);  

  22.     this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);  

  23.     this.secretManager = (SecretManager<TokenIdentifier>)

    secretManager;  

  24.     this.authorize =   

  25.       conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);  

  26.     this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();  

  27.       

  28.     // Start the listener here and let it bind to the port  

  29.     listener = new Listener();  

  30.     this.port = listener.getAddress().getPort();      

  31.     this.rpcMetrics = RpcInstrumentation.create(serverName,this.port);  

  32.     this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay",false);  

  33.  

  34.     // Create the responder here  

  35.     responder = new Responder();  

  36.       

  37.     if (isSecurityEnabled) {  

  38.       SaslRpcServer.init(conf);  

  39.     }  

  40.   }  

不难看出,父类的构造方法就初始化了一些配置和变量。

4、Server运行

在上面第一段代码中,还有一句RpcServer.start()的方法,在调用构造函数初始化一些变量之后,Server就可以正式运行起来了:

  1. public synchronized void start()

    {  

  2.     responder.start();  

  3.     listener.start();  

  4.     handlers = new Handler[handlerCount];  

  5.       

  6.     for (int i

    = 0; i < handlerCount; i++) {  

  7.       handlers[i] = new Handler(i);  

  8.       handlers[i].start();  

  9.     }  

  10.   }  

responder、listener、handlers三个对象的线程均阻塞了,前两个阻塞在selector.select()方法上,handler阻塞在callQueue.take()方法,都在等待客户端请求。Responder设置了超时时间,为15分钟。而listener还开启了Reader线程,该线程也阻塞了。

4、Server接受请求流程

1)监听到请求

Listener监听到请求,获得所有请求的SelectionKey,执行doAccept(key)方法,该方法将所有的连接对象放入list中,并将connection对象与key绑定,以供reader使用。初始化玩所有的conne对象之后,就可以激活Reader线程了。

  1. void doAccept(SelectionKey key) throws IOException, 

    OutOfMemoryError {  

  2.       Connection c = null;  

  3.       ServerSocketChannel server = (ServerSocketChannel) key.channel();  

  4.       SocketChannel channel;  

  5.       while ((channel = server.accept()) != null)

    {  

  6.         channel.configureBlocking(false);  

  7.         channel.socket().setTcpNoDelay(tcpNoDelay);  

  8.         Reader reader = getReader();  

  9.         try {  

  10.           reader.startAdd();  // 激活readSelector,设置adding为true  

  11.           SelectionKey readKey = reader.registerChannel(channel);  

  12.           c = new Connection(readKey, channel, System.currentTimeMillis());  

  13.           readKey.attach(c);  

  14.           synchronized (connectionList) {  

  15.             connectionList.add(numConnections, c);  

  16.             numConnections++;  

  17.           }  

  18.           …           

  19.         } finally {  

  20.           reader.finishAdd(); // add完毕,设置adding为false,Reader开始工作  

  21.         }  

  22.     }  

  23. }  

2)接收请求

Reader的run方法和Listener基本一致,也是获得所有的SelectionKey,再执行doRead(key)方法。该方法获得key中绑定的connection,并执行conection的readAndProcess()方法:

  1. void doRead(SelectionKey key) throws InterruptedException

    {  

  2.       int count = 0;  

  3.       Connection c = (Connection)key.attachment(); // 获得连接对象  

  4.       if (c == null)

    {  

  5.         return;    

  6.       }  

  7.       c.setLastContact(System.currentTimeMillis());  

  8.         

  9.       try {  

  10.         count = c.readAndProcess(); // 接受并处理请求  

  11.       } catch (InterruptedException ieo) {  

  12.         …  

  13.       }  

  14.       if (count < 0)

    {  

  15.         …  

  16.         closeConnection(c);  

  17.         c = null;  

  18.       }  

  19.       else {  

  20.         c.setLastContact(System.currentTimeMillis());  

  21.       }  

  22.     }  

  1. public int readAndProcess() throws IOException,

    InterruptedException {  

  2. // 一次最多读取一次RPC请求,如果头没读完,继续迭代直到  

  3. // 读完所有请求数据    

  4. while (true)

    {   

  5.         int count = -1;  

  6.         if (dataLengthBuffer.remaining() > 0)

    {  

  7.           count = channelRead(channel, dataLengthBuffer);         

  8.           …  

  9.         if (!rpcHeaderRead) {  

  10.           //读取请求头.  

  11.           if (rpcHeaderBuffer == null)

    {  

  12.             rpcHeaderBuffer = ByteBuffer.allocate(2);  

  13.           }  

  14.           count = channelRead(channel, rpcHeaderBuffer);  

  15.           if (count < 0 ||

    rpcHeaderBuffer.remaining() > 0) {  

  16.             return count;  

  17.           }  

  18.           // 读取请求版本号  

  19.           int version = rpcHeaderBuffer.get(0);  

  20.           byte[] method = new byte[]

    {rpcHeaderBuffer.get(1)};  

  21.           authMethod = AuthMethod.read(new DataInputStream(  

  22.               new ByteArrayInputStream(method)));  

  23.           dataLengthBuffer.flip();            

  24.           …  

  25.           dataLengthBuffer.clear();  

  26.           …  

  27.             

  28.           rpcHeaderBuffer = null;  

  29.           rpcHeaderRead = true;  

  30.           continue;  

  31.         }   

  32.         …  

  33.           data = ByteBuffer.allocate(dataLength);  

  34.         }  

  35.           

  36.         // 读取请求  

  37.         count = channelRead(channel, data);  

  38.           

  39.         if (data.remaining() == 0)

    {  

  40.           …  

  41.           if (useSasl) {  

  42.             saslReadAndProcess(data.array());  

  43.           } else {  

  44.             // 执行RPC请求,先解析header请求,下次循环解析param请求  

  45.             processOneRpc(data.array());  

  46.           }  

  47.           …  

  48.         }   

  49.         return count;  

  50.       }  

  51.     }  

3)获得call请求

在Connection中解析param请求中,解析了请求数据,并构造Call对象,将其加入callQueue。

  1. private void processData(byte[]

    buf) throws  IOException, InterruptedException {  

  2.       DataInputStream dis =  

  3.         new DataInputStream(new ByteArrayInputStream(buf));  

  4.       int id = dis.readInt();         //

    读取请求id  

  5.         …  

  6.  

  7.       Writable param = ReflectionUtils.newInstance(paramClass, conf);// 获取参数,paramClass是参数的实体类,在构造Server对象的时候传入 

  8.       param.readFields(dis);          

  9.           

  10.       Call call = new Call(id, param, this);  

  11.       callQueue.put(call);              // 添加进阻塞队列,不过队列有max限制,有可能也会阻塞  

  12.       incRpcCount();   

  13.     }  

4)处理call对象

Connection给callQueue添加了call对象,阻塞的Handler可以继续运行了,拿出一个call对象,并调用RPC.Call方法

  1. // 关键代码  

  2. while (running) {  

  3. final Call call = callQueue.take(); //

    弹出call对象  

  4. CurCall.set(call);  

  5.      value = call(call.connection.protocol, call.param,   

  6.                            call.timestamp); // 调用RPC.Server中的call  

  7.      CurCall.set(null);  

  8.  

  9. synchronized (call.connection.responseQueue) {  

  10.          setupResponse(buf, call,   

  11.                         (error == null) ? Status.SUCCESS

    : Status.ERROR,   

  12.                         value, errorClass, error);  

  13.          …  

  14.          responder.doRespond(call);  

  15.      }  

  16. }  

5)响应请求

上面代码中的setupResponse将call的id和状态发送回去,再设置了call中的response:ByteBuffer,之后就开始responder.doRespond(call)了,processResponse以及Responder.run()没太弄明白,就先不说了。

  1. void doRespond(Call call) throws IOException

    {  

  2.       synchronized (call.connection.responseQueue)

    {  

  3.         // 这行没懂  

  4.         call.connection.responseQueue.addLast(call);  

  5.         if (call.connection.responseQueue.size()

    == 1) {  

  6.           // 返回响应结果,并激活writeSelector  

  7.           processResponse(call.connection.responseQueue,true);  

  8.         }  

  9.       }  

  10.     }  

6、总结

Server用的标准的Java TCP/IP NIO通信,同时请求的超时使用基于BlockingQueue以及wait/notify机制实现。使用的模式是reactor模式,关于nio和reactor可以参考这个博客。

对于服务器端接收多个连接请求的需求,Server采用Listener来监听连接的事件,并用Listener.Reader来监听网络流读以及Responder监听写的事件,当有实际的网络流读写时间发生之后,解析了请求Call之后,添加进阻塞队列,并交由多个Handlers来处理请求。

这个方法比TCP/IP BIO好处就是可接受很多的连接,而这些连接只在真实的请求时才会创建线程处理,称之为一请求一处理。但是,连接上的请求发送非常频繁时,TCP/IP NIO的方法并不会带来太大的优势。

但是Hadoop实际场景中,通常是服务器端支持大量的连接数(Namenode连上几千个Datanode),但是连接发送的请求并不会太多(heartbeat、blockreport都有较长间隔)。这样就造成了Hadoop不适合实时的、多请求的运算,带来的代价是模型、实现简单,但是这也为以后的扩展埋下了祸根。

P.S.: 以上分析基于稳定版0.20.203.0rc1。

以上是 Hadoop源码 – ipc.Server - java20130722 的全部内容, 来源链接: utcz.com/z/391422.html

回到顶部