node.js的http.createServer过程深入解析

下面是nodejs创建一个服务器的代码。接下来我们一起分析这个过程。

var http = require('http');

http.createServer(function (request, response) {

response.end('Hello World

');

}).listen(9297);

首先我们去到lib/http.js模块看一下这个函数的代码。

function createServer(requestListener) {

return new Server(requestListener);

}

只是对_http_server.js做了些封装。我们继续往下看。

function Server(requestListener) {

if (!(this instanceof Server)) return new Server(requestListener);

net.Server.call(this, { allowHalfOpen: true });

// 收到http请求时执行的回调

if (requestListener) {

this.on('request', requestListener);

}

this.httpAllowHalfOpen = false;

// 建立tcp连接的回调

this.on('connection', connectionListener);

this.timeout = 2 * 60 * 1000;

this.keepAliveTimeout = 5000;

this._pendingResponseData = 0;

this.maxHeadersCount = null;

}

util.inherits(Server, net.Server);

发现_http_server.js也没有太多逻辑,继续看lib/net.js下的代码。

function Server(options, connectionListener) {

if (!(this instanceof Server))

return new Server(options, connectionListener);

EventEmitter.call(this);

// connectionListener在http.js处理过了

if (typeof options === 'function') {

connectionListener = options;

options = {};

this.on('connection', connectionListener);

} else if (options == null || typeof options === 'object') {

options = options || {};

if (typeof connectionListener === 'function') {

this.on('connection', connectionListener);

}

} else {

throw new errors.TypeError('ERR_INVALID_ARG_TYPE',

'options',

'Object',

options);

}

this._connections = 0;

......

this[async_id_symbol] = -1;

this._handle = null;

this._usingWorkers = false;

this._workers = [];

this._unref = false;

this.allowHalfOpen = options.allowHalfOpen || false;

this.pauseOnConnect = !!options.pauseOnConnect;

}

至此http.createServer就执行结束了,我们发现这个过程还没有涉及到很多逻辑,并且还是停留到js层面。接下来我们继续分析listen函数的过程。该函数是net模块提供的。我们只看关键的代码。

Server.prototype.listen = function(...args) {

// 处理入参,根据文档我们知道listen可以接收好几个参数,我们这里是只传了端口号9297

var normalized = normalizeArgs(args);

// normalized = [{port: 9297}, null];

var options = normalized[0];

var cb = normalized[1];

// 第一次listen的时候会创建,如果非空说明已经listen过

if (this._handle) {

throw new errors.Error('ERR_SERVER_ALREADY_LISTEN');

}

......

listenInCluster(this, null, options.port | 0, 4,

backlog, undefined, options.exclusive);

}

function listenInCluster() {

...

server._listen2(address, port, addressType, backlog, fd);

}

_listen2 = setupListenHandle = function() {

......

this._handle = createServerHandle(...);

this._handle.listen(backlog || 511);

}

function createServerHandle() {

handle = new TCP(TCPConstants.SERVER);

handle.bind(address, port);

}

到这我们终于看到了tcp连接的内容,每一个服务器新建一个handle并且保存他,该handle是一个TCP对象。然后执行bind和listen函数。接下来我们就看一下TCP类的代码。TCP是C++提供的类。对应的文件是tcp_wrap.cc。我们看看new TCP的时候发生了什么。

void TCPWrap::New(const FunctionCallbackInfo<Value>& args) {

// This constructor should not be exposed to public javascript.

// Therefore we assert that we are not trying to call this as a

// normal function.

CHECK(args.IsConstructCall());

CHECK(args[0]->IsInt32());

Environment* env = Environment::GetCurrent(args);

int type_value = args[0].As<Int32>()->Value();

TCPWrap::SocketType type = static_cast<TCPWrap::SocketType>(type_value);

ProviderType provider;

switch (type) {

case SOCKET:

provider = PROVIDER_TCPWRAP;

break;

case SERVER:

provider = PROVIDER_TCPSERVERWRAP;

break;

default:

UNREACHABLE();

}

new TCPWrap(env, args.This(), provider);

}

TCPWrap::TCPWrap(Environment* env, Local<Object> object, ProviderType provider)

: ConnectionWrap(env, object, provider) {

int r = uv_tcp_init(env->event_loop(), &handle_);

CHECK_EQ(r, 0);

}

我们看到,new TCP的时候其实是执行libuv的uv_tcp_init函数,初始化一个uv_tcp_t的结构体。首先我们先看一下uv_tcp_t结构体的结构。

uv_tcp_t

uv_tcp_t

// 初始化一个tcp流的结构体

int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* tcp) {

// 未指定未指定协议

return uv_tcp_init_ex(loop, tcp, AF_UNSPEC);

}

int uv_tcp_init_ex(uv_loop_t* loop, uv_tcp_t* tcp, unsigned int flags) {

int domain;

/* Use the lower 8 bits for the domain */

// 低八位是domain

domain = flags & 0xFF;

if (domain != AF_INET && domain != AF_INET6 && domain != AF_UNSPEC)

return UV_EINVAL;

// 除了第八位的其他位是flags

if (flags & ~0xFF)

return UV_EINVAL;

uv__stream_init(loop, (uv_stream_t*)tcp, UV_TCP);

/* If anything fails beyond this point we need to remove the handle from

* the handle queue, since it was added by uv__handle_init in uv_stream_init.

*/

if (domain != AF_UNSPEC) {

int err = maybe_new_socket(tcp, domain, 0);

if (err) {

// 出错则把该handle移除loop队列

QUEUE_REMOVE(&tcp->handle_queue);

return err;

}

}

return 0;

}

我们接着看uv__stream_init做了什么事情。

void uv__stream_init(uv_loop_t* loop,

uv_stream_t* stream,

uv_handle_type type) {

int err;

uv__handle_init(loop, (uv_handle_t*)stream, type);

stream->read_cb = NULL;

stream->alloc_cb = NULL;

stream->close_cb = NULL;

stream->connection_cb = NULL;

stream->connect_req = NULL;

stream->shutdown_req = NULL;

stream->accepted_fd = -1;

stream->queued_fds = NULL;

stream->delayed_error = 0;

QUEUE_INIT(&stream->write_queue);

QUEUE_INIT(&stream->write_completed_queue);

stream->write_queue_size = 0;

if (loop->emfile_fd == -1) {

err = uv__open_cloexec("/dev/null", O_RDONLY);

if (err < 0)

/* In the rare case that "/dev/null" isn't mounted open "/"

* instead.

*/

err = uv__open_cloexec("/", O_RDONLY);

if (err >= 0)

loop->emfile_fd = err;

}

#if defined(__APPLE__)

stream->select = NULL;

#endif /* defined(__APPLE_) */

// 初始化io观察者

uv__io_init(&stream->io_watcher, uv__stream_io, -1);

}

void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd) {

assert(cb != NULL);

assert(fd >= -1);

// 初始化队列,回调,需要监听的fd

QUEUE_INIT(&w->pending_queue);

QUEUE_INIT(&w->watcher_queue);

w->cb = cb;

w->fd = fd;

w->events = 0;

w->pevents = 0;

#if defined(UV_HAVE_KQUEUE)

w->rcount = 0;

w->wcount = 0;

#endif /* defined(UV_HAVE_KQUEUE) */

}

从代码可以知道,只是对uv_tcp_t结构体做了一些初始化操作。到这,new TCP的逻辑就执行完毕了。接下来就是继续分类nodejs里调用bind和listen的逻辑。nodejs的bind对应libuv的函数是uv__tcp_bind,listen对应的是uv_tcp_listen。

先看一个bind的核心代码。

/* Cannot set IPv6-only mode on non-IPv6 socket. */

if ((flags & UV_TCP_IPV6ONLY) && addr->sa_family != AF_INET6)

return UV_EINVAL;

// 获取一个socket并且设置某些标记

err = maybe_new_socket(tcp, addr->sa_family, 0);

if (err)

return err;

on = 1;

// 设置在端口可重用

if (setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)))

return UV__ERR(errno);

bind(tcp->io_watcher.fd, addr, addrlen) && errno != EADDRINUSE

static int maybe_new_socket(uv_tcp_t* handle, int domain, unsigned long flags) {

struct sockaddr_storage saddr;

socklen_t slen;

if (domain == AF_UNSPEC) {

handle->flags |= flags;

return 0;

}

return new_socket(handle, domain, flags);

}

static int new_socket(uv_tcp_t* handle, int domain, unsigned long flags) {

struct sockaddr_storage saddr;

socklen_t slen;

int sockfd;

int err;

// 获取一个socket

err = uv__socket(domain, SOCK_STREAM, 0);

if (err < 0)

return err;

sockfd = err;

// 设置选项和保存socket的文件描述符到io观察者中

err = uv__stream_open((uv_stream_t*) handle, sockfd, flags);

if (err) {

uv__close(sockfd);

return err;

}

...

return 0;

}

int uv__stream_open(uv_stream_t* stream, int fd, int flags) {

if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))

return UV_EBUSY;

assert(fd >= 0);

stream->flags |= flags;

if (stream->type == UV_TCP) {

if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))

return UV__ERR(errno);

/* TODO Use delay the user passed in. */

if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&

uv__tcp_keepalive(fd, 1, 60)) {

return UV__ERR(errno);

}

}

...

// 保存socket对应的文件描述符到io观察者中,libuv会在io poll阶段监听该文件描述符

stream->io_watcher.fd = fd;

return 0;

}

上面的一系列操作主要是新建一个socket文件描述符,设置一些flag,然后把文件描述符保存到IO观察者中,libuv在poll IO阶段会监听该文件描述符,如果有事件到来,会执行设置的回调函数,该函数是在uv__stream_init里设置的uv__stream_io。最后执行bind函数进行绑定操作。最后我们来分析一下listen函数。首先看下tcp_wrapper.cc的代码。

void TCPWrap::Listen(const FunctionCallbackInfo<Value>& args) {

TCPWrap* wrap;

ASSIGN_OR_RETURN_UNWRAP(&wrap,

args.Holder(),

args.GetReturnValue().Set(UV_EBADF));

int backlog = args[0]->Int32Value();

int err = uv_listen(reinterpret_cast<uv_stream_t*>(&wrap->handle_),

backlog,

OnConnection);

args.GetReturnValue().Set(err);

}

代码中有个很重要的地方就是OnConnection函数,nodejs给listen函数设置了一个回调函数OnConnection,该函数在IO观察者里保存的文件描述符有连接到来时会被调用。OnConnection函数是在connection_wrap.cc定义的,tcp_wrapper继承了connection_wrap。下面我们先看一下uv_listen。该函数调用了uv_tcp_listen。该函数的核心代码如下。

if (listen(tcp->io_watcher.fd, backlog))

return UV__ERR(errno);

// cb即OnConnection

tcp->connection_cb = cb;

tcp->flags |= UV_HANDLE_BOUND;

// 有连接到来时的libuv层回调,覆盖了uv_stream_init时设置的值

tcp->io_watcher.cb = uv__server_io;

// 注册事件

uv__io_start(tcp->loop, &tcp->io_watcher, POLLIN);

在libuv的poll IO阶段,epoll_wait会监听到到来的连接,然后调用uv__server_io。下面是该函数的核心代码。

// 继续注册事件,等待连接

uv__io_start(stream->loop, &stream->io_watcher, POLLIN);

err = uv__accept(uv__stream_fd(stream));

// 保存连接对应的socket

stream->accepted_fd = err;

// 执行nodejs层回调

stream->connection_cb(stream, 0);

libuv会摘下一个连接,得到对应的socket。然后执行nodejs层的回调,这时候我们来看一下OnConnection的代码。

OnConnection(uv_stream_t* handle,int status)

if (status == 0) {

// 新建一个uv_tcp_t结构体

Local<Object> client_obj = WrapType::Instantiate(env, wrap_data, WrapType::SOCKET);

WrapType* wrap;

ASSIGN_OR_RETURN_UNWRAP(&wrap, client_obj);

uv_stream_t* client_handle = reinterpret_cast<uv_stream_t*>(&wrap->handle_);

// uv_accept返回0表示成功

if (uv_accept(handle, client_handle))

return;

argv[1] = client_obj;

}

// 执行上层的回调,该回调是net.js设置的onconnection

wrap_data->MakeCallback(env->onconnection_string(), arraysize(argv), argv);

OnConnection新建了一个uv_tcp_t结构体。代表这个连接。然后调用uv_accept。

int uv_accept(uv_stream_t* server, uv_stream_t* client) {

...

// 新建的uv_tcp_t结构体关联accept_fd,注册读写事件

uv__stream_open(client, server->accepted_fd, UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);

...

}

最后执行nodejs的回调。

function onconnection(err, clientHandle) {

var handle = this;

var self = handle.owner;

if (err) {

self.emit('error', errnoException(err, 'accept'));

return;

}

if (self.maxConnections && self._connections >= self.maxConnections) {

clientHandle.close();

return;

}

var socket = new Socket({

handle: clientHandle,

allowHalfOpen: self.allowHalfOpen,

pauseOnCreate: self.pauseOnConnect

});

socket.readable = socket.writable = true;

self._connections++;

socket.server = self;

socket._server = self;

DTRACE_NET_SERVER_CONNECTION(socket);

LTTNG_NET_SERVER_CONNECTION(socket);

COUNTER_NET_SERVER_CONNECTION(socket);

// 触发_http_server.js里设置的connectionListener回调

self.emit('connection', socket);

}

listen函数总体的逻辑就是把socket设置为可监听,然后注册事件,等待连接的到来,连接到来的时候,调用accept获取新建立的连接,tcp_wrapper.cc的回调新建一个uv_tcp_t结构体,代表新的连接,然后设置可读写事件,并且设置回调为uv__stream_io,等待数据的到来。最后执行_http_server.js设置的回调connectionListener。至此,服务器启动并且接收连接的过程就完成了。接下来就是对用户数据的读写。当用户传来数据时,处理数据的函数是uv__stream_io。后面继续解析数据的读写。

总结

以上是 node.js的http.createServer过程深入解析 的全部内容, 来源链接: utcz.com/z/318576.html

回到顶部