I/O多路复用

编程

Linux下实现I/O复用的系统调用方式主要:select、poll、epoll。

I/O多路复用

Linux下实现I/O复用的系统调用方式主要:select、poll、epoll。

select

系统调用

select系统调用可在一段指定时间内,监听文件描述符上的可读、可写和异常等事件,判断发生的事件需要轮询。

#include <sys/select.h>

//select监听文件描述符事件

//nfds: 被监听文件描述符中最大值+1

//readfds: 可读事件对应的文件描述符集,对应位置1;会被内核修改,返回时无事件的置0。

//writefds: 可写事件对应的文件描述符集,对应位置1;会被内核修改,返回时无事件的置0。

//exceptfds:异常事件对应的文件描述符集,对应位置1;会被内核修改,返回时无事件的置0。

//timeout

//return: 返回就绪文件描述符中最大值+1

int select(int nfds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, struct timeval* timeout);

//fd_set比特向量操作

FD_ZERO(fd_set *fdset); //清楚fdset中所有比特位

FD_SET(int fd, fd_set *fdset); //设置fdset中比特位fd

FD_CLR(int fd, fd_set *fdset); //清除fdset中比特位fd

FD_ISSET(int fd, fd_set *fdset); //测试fdset中比特位fd是否被设置,用于判断是否有事件发生

select事件类型

  • socket可读

    select可读(readfds)

    • socket内核接收缓存区字节数大于或等于低水位标记SO_RCVLOWAT。读操作返回读取字节数。
    • 读关闭:socket通信的对方关闭连接。读操作返回0。
    • 错误:socket上有未处理的错误。可以使用getsockopt读取和清楚错误。
    • 连接请求:监听socket上有新的连接请求。

  • socket可写(writefds)

    • 可写:socket内核发送缓存区中的可用字节数大于或等于低水位标记SO_SNDLOWAT。写操作返回实际写字节数。
    • 写关闭:写操作被关闭,对写关闭的socket执行写操作将触发一个SIGPIPE信号。
    • 错误:socket上有未处理的错误。可以使用getsockopt读取和清楚错误。
    • 连接结果:socket使用非阻塞的connect连接成功或者失败(超时)之后。

  • socket异常(exceptfds)

    • 网络程序中,select能处理的异常情况只有一种,即socket上接收到带外数据。

poll

系统调用

poll系统调用与select类似,需要轮询判断监听文件描述符集上的事件,但没有监听文件描述符数量限制。

#include <poll.h>

//poll监听文件描述符上指定的事件

//fds: pollfd结构类型的数组

//nfds: pollfd数组长度

//return: 返回就绪文件描述符中最大值+1

int poll(struct pollfd* fds, nfds_t nfds, int timeout);

//pollfd结构体,描述文件描述符上的可读、可写、异常等事件

struct pollfd{

int fd; //文件描述符

short events; //注册的事件,一系列事件的按位或

short revents; //实际发生的事件,由内核填充,一些列事件的按位或

}

poll事件类型

事件

描述

是否可作为输入

是否可作为输出

POLLIN

普通或优先数据可读

POLLRDNORM

普通数据可读

POLLRDBAND

优先级带数据可读(linux不支持)

POLLPRI

高优先级数据可读,如TCP带外数据

POLLOUT

普通或优先数据可写

POLLWRNORM

普通数据可写

POLLWRBAND

优先级带数据可写

POLLRDHUB

TCP连接被对方关闭,或者对方关闭了写操作。由GNU引入

POLLERR

错误

POLLHUB

挂起。比如管道的写端被关闭,读端描述符上将收到POLLHUB事件

POLLNVAL

文件描述符没有打开

epoll

epoll相比select更加高效,主要体现在:

  • epoll使用内核事件表维护监听的文件描述符集,避免频繁的用户空间与内核空间频繁的拷贝开销。
  • epoll直接返回发生事件的文件描述符即,避免了用户程序轮询的开销。
  • epoll内部通过注册回调函数的方式,监听特定文件描述符上的时间,避免了内核轮询开销。
  • epoll提供了高效的边沿触发模式,边沿触发带来编程上的复杂性。

LT与ET模式

使用epoll监听文件描述符时,有两种事件触发模式:

  • 水平触发(LT):epoll默认使用水平触发。
  • 边沿触发(ET):往epoll内核事件表这种注册一个文件描述符上的EPOLLET时,将进行边沿触发。相比较水平触发,边沿触发更加高效。

系统调用

epoll在内核中使用事件表(红黑树)维护监听的文件描述符,并支持对事件表的增删改。

  • 创建事件表:epoll_create,返回一个内核事件表的文件描述符。
  • 操作事件表:epoll_ctl,对事件表进行增、删、改。
  • 监听事件表:epoll_wait,监听事件表上的事件。

#include <sys/epoll.h>

//epoll事件结构体

strcut epoll_event{

_uint32_t events; //epoll事件

epoll_data_t data; //用户数据,包含监听的文件描述符信息

}

//用户数据联合体

typedef union epoll_data{

void* ptr; //指定与fd相关的用户数据,用户数据包含监听的文件描述符

int fd; //监听的文件描述符fd(常用)

uint32_t u32;

uint64_t u64;

} epoll_data_t;

//创建epoll内核事件表

//size: 提示内核需要的事件表多大

int epoll_create(int size);

//操作epoll内核事件表

//epfd: epoll内核事件表

//op: 参数指定操作,包括增(EPOLL_CTL_ADD)、删(EPOLL_CTL_DEL)、改(EPOLL_CTL_MOD)

//fd: 被操作的文件描述符

//event:事件

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

//监听事件表上文件描述符的事件

//epfd: epoll内核事件表

//events: 就绪事件数组,由内核修改

//maxevents:最多监听事件数

//timeout:

//return: 就绪文件描述符个数

int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);

事件类型

epoll支持的时间类型和poll基本相同,表示epoll时间类型的宏实在poll对应的宏前加上‘E’,如EPOLLIN。

示例

服务端

服务端代码功能:

  1. 创建socket
  2. 为socket绑定固定地址和端口
  3. 监听套接字等待客户端连接
  4. 接收客户端连接,获取连接socket
  5. 使用select、poll或epoll监听连接socket的可读事件
  6. 连接socket可读事件发生时,从连接socket中读取数据

select

#include <sys/types.h>

#include <sys/socket.h>

#include <netinet/in.h>

#include <arpa/inet.h>

#include <assert.h>

#include <stdio.h>

#include <unistd.h>

#include <errno.h>

#include <string.h>

#include <fcntl.h>

#include <stdlib.h>

int main(int argc, char *argv[])

{

if (argc <= 2)

{

printf("usage: %s ip_address port_number

", basename(argv[0]));

return 1;

}

const char *ip = argv[1];

int port = atoi(argv[2]);

printf("ip is %s and port is %d

", ip, port);

int ret = 0;

struct sockaddr_in address;

bzero(&address, sizeof(address));

address.sin_family = AF_INET;

inet_pton(AF_INET, ip, &address.sin_addr);

address.sin_port = htons(port);

int listenfd = socket(PF_INET, SOCK_STREAM, 0);

assert(listenfd >= 0);

ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));

assert(ret != -1);

ret = listen(listenfd, 5);

assert(ret != -1);

struct sockaddr_in client_address;

socklen_t client_addrlength = sizeof(client_address);

int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);

if (connfd < 0)

{

printf("errno is: %d

", errno);

close(listenfd);

}

char remote_addr[INET_ADDRSTRLEN];

printf("connected with ip: %s and port: %d

", inet_ntop(AF_INET, &client_address.sin_addr, remote_addr, INET_ADDRSTRLEN), ntohs(client_address.sin_port));

char buf[1024];

fd_set read_fds;

fd_set exception_fds;

FD_ZERO(&read_fds);

FD_ZERO(&exception_fds);

int nReuseAddr = 1;

setsockopt(connfd, SOL_SOCKET, SO_OOBINLINE, &nReuseAddr, sizeof(nReuseAddr));

while (1)

{

memset(buf, "", sizeof(buf));

FD_SET(connfd, &read_fds);

FD_SET(connfd, &exception_fds);

ret = select(connfd + 1, &read_fds, NULL, &exception_fds, NULL);

printf("select one

");

if (ret < 0)

{

printf("selection failure

");

break;

}

if (FD_ISSET(connfd, &read_fds))

{

ret = recv(connfd, buf, sizeof(buf) - 1, 0);

if (ret <= 0)

{

printf("can not read

");

break;

}

printf("get %d bytes of normal data: %s

", ret, buf);

}

else if (FD_ISSET(connfd, &exception_fds))

{

ret = recv(connfd, buf, sizeof(buf) - 1, MSG_OOB);

if (ret <= 0)

{

printf("can not read

");

break;

}

printf("get %d bytes of oob data: %s

", ret, buf);

}

}

close(connfd);

close(listenfd);

return 0;

}

poll

#include <sys/types.h>

#include <sys/socket.h>

#include <netinet/in.h>

#include <arpa/inet.h>

#include <assert.h>

#include <stdio.h>

#include <unistd.h>

#include <errno.h>

#include <string.h>

#include <fcntl.h>

#include <stdlib.h>

#include <poll.h>

int main(int argc, char *argv[])

{

if (argc <= 2)

{

printf("usage: %s ip_address port_number

", basename(argv[0]));

return 1;

}

const char *ip = argv[1];

int port = atoi(argv[2]);

printf("ip is %s and port is %d

", ip, port);

int ret = 0;

struct sockaddr_in address;

bzero(&address, sizeof(address));

address.sin_family = AF_INET;

inet_pton(AF_INET, ip, &address.sin_addr);

address.sin_port = htons(port);

int listenfd = socket(PF_INET, SOCK_STREAM, 0);

assert(listenfd >= 0);

ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));

assert(ret != -1);

ret = listen(listenfd, 5);

assert(ret != -1);

struct sockaddr_in client_address;

socklen_t client_addrlength = sizeof(client_address);

int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);

if (connfd < 0)

{

printf("errno is: %d

", errno);

close(listenfd);

}

char remote_addr[INET_ADDRSTRLEN];

printf("connected with ip: %s and port: %d

", inet_ntop(AF_INET, &client_address.sin_addr, remote_addr, INET_ADDRSTRLEN), ntohs(client_address.sin_port));

char buf[1024];

pollfd poll_fds[1];

bzero(poll_fds, sizeof(poll_fds));

poll_fds[0].fd = connfd;

poll_fds[0].events = POLLIN;

int nReuseAddr = 1;

setsockopt(connfd, SOL_SOCKET, SO_OOBINLINE, &nReuseAddr, sizeof(nReuseAddr));

while (1)

{

memset(buf, "", sizeof(buf));

ret = poll(poll_fds, 1, -1);

printf("select one

");

if (ret < 0)

{

printf("selection failure

");

break;

}

if (poll_fds[0].events == POLLIN)

{

ret = recv(connfd, buf, sizeof(buf) - 1, 0);

if (ret <= 0)

{

printf("can not read

");

break;

}

printf("get %d bytes of normal data: %s

", ret, buf);

}

}

close(connfd);

close(listenfd);

return 0;

}

epoll

#include <sys/types.h>

#include <sys/socket.h>

#include <netinet/in.h>

#include <arpa/inet.h>

#include <assert.h>

#include <stdio.h>

#include <unistd.h>

#include <errno.h>

#include <string.h>

#include <fcntl.h>

#include <stdlib.h>

#include <sys/epoll.h>

int main(int argc, char *argv[])

{

if (argc <= 2)

{

printf("usage: %s ip_address port_number

", basename(argv[0]));

return 1;

}

const char *ip = argv[1];

int port = atoi(argv[2]);

printf("ip is %s and port is %d

", ip, port);

int ret = 0;

struct sockaddr_in address;

bzero(&address, sizeof(address));

address.sin_family = AF_INET;

inet_pton(AF_INET, ip, &address.sin_addr);

address.sin_port = htons(port);

int listenfd = socket(PF_INET, SOCK_STREAM, 0);

assert(listenfd >= 0);

ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));

assert(ret != -1);

ret = listen(listenfd, 5);

assert(ret != -1);

struct sockaddr_in client_address;

socklen_t client_addrlength = sizeof(client_address);

int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);

if (connfd < 0)

{

printf("errno is: %d

", errno);

close(listenfd);

}

char remote_addr[INET_ADDRSTRLEN];

printf("connected with ip: %s and port: %d

", inet_ntop(AF_INET, &client_address.sin_addr, remote_addr, INET_ADDRSTRLEN), ntohs(client_address.sin_port));

char buf[1024];

//创建内核事件表

int epfd = epoll_create(10);

//注册事件

epoll_event event;

event.data.fd = connfd;

event.events = EPOLLIN;

epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &event);

//就绪事件数组

epoll_event events[10];

int nReuseAddr = 1;

setsockopt(connfd, SOL_SOCKET, SO_OOBINLINE, &nReuseAddr, sizeof(nReuseAddr));

while (1)

{

memset(buf, "", sizeof(buf));

ret = epoll_wait(epfd, events, 1, -1);

printf("select one

");

if (ret < 0)

{

printf("errno: %d

", errno);

printf("selection failure

");

break;

}

if (events[0].events == EPOLLIN)

{

ret = recv(connfd, buf, sizeof(buf) - 1, 0);

if (ret <= 0)

{

printf("can not read

");

break;

}

printf("get %d bytes of normal data: %s

", ret, buf);

}

}

close(connfd);

close(listenfd);

return 0;

}

客户端

客户端代码功能:

  1. 创建套接字
  2. 请求连接
  3. 发送数据
  4. 关闭连接。

#include <sys/socket.h>

#include <netinet/in.h>

#include <arpa/inet.h>

#include <assert.h>

#include <stdio.h>

#include <unistd.h>

#include <string.h>

#include <stdlib.h>

int main(int argc, char *argv[])

{

if (argc <= 2)

{

printf("usage: %s ip_address port_number

", basename(argv[0]));

return 1;

}

const char *ip = argv[1];

int port = atoi(argv[2]);

struct sockaddr_in server_address;

bzero(&server_address, sizeof(server_address));

server_address.sin_family = AF_INET;

inet_pton(AF_INET, ip, &server_address.sin_addr);

server_address.sin_port = htons(port);

int sockfd = socket(PF_INET, SOCK_STREAM, 0);

assert(sockfd >= 0);

if (connect(sockfd, (struct sockaddr *)&server_address, sizeof(server_address)) < 0)

{

printf("connection failed

");

}

else

{

printf("send oob data out

");

const char *oob_data = "abc";

const char *normal_data = "123";

send(sockfd, normal_data, strlen(normal_data), 0);

send(sockfd, oob_data, strlen(oob_data), MSG_OOB);

send(sockfd, normal_data, strlen(normal_data), 0);

}

close(sockfd);

return 0;

}

以上是 I/O多路复用 的全部内容, 来源链接: utcz.com/z/520422.html

回到顶部