linux下epoll和select的区别总结

前言

最近一直在忙面试笔试的问题, 好久没有写博客了, 以后会慢慢的将一些算法题解题思路也放到博客上来, 避免做完而不思考总结的情况出现. 今天总结的是epoll和select, 这是linux下两个非常重要的IO多路复用的函数, 在面试中也非常容易遇到, 让我们来深入了解一下.

select()函数

函数原型是如下:

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout)

  • nfds表示要监测的文件描述符的范围, 监测范围在0到最大文件描述符之间, 因此此值为最大描述符+1.

  • readfds表示包含状态变成可读而触发select返回的文件描述符集合

  • writefds表示包含状态变成可写而触发select返回的文件描述符集合

  • exceptfds表示包含状态变成发生特殊异常而触发select返回的文件描述符集合

  • timeout指定超时时间, 原型为:

    struct timeval{
    long tv_sec;//seconds
    long tv_usec;//microseconds
    };

  • 返回值: 准备就绪的描述符数目, 若超时返回0, 若出错返回-1.

说明一下最后一个参数, 它指定了愿意等待的时间长度, 单位为秒和微秒. 有以下三种情况:

  1. timeout == NULL
    这时会永远等待. 如果捕捉到一个信号则中断此无限期等待. 当所指定的描述符中的一个准备好或者捕捉到一个信号则返回. 如果捕捉到一个信号, 则select返回-1, errno设置为EINTR.
  2. timeout->tv_sec == 0 && timeout->tv_usec == 0
    根本不等待. 测试所有指定的描述符并立即返回. 这是轮询系统找到多个描述符状态而不阻塞select函数的方法.
  3. timeout->tv_sec != 0 && timeout->tv_usec != 0
    等待指定的秒数和微秒数. 当指定的描述符之一已经准备好, 或当指定的时间值已经超过就立刻返回.如果在超时到期时还没有一个描述符准备好, 则返回值是0.(如果系统不提供微秒级的精度, 则timeout->tv_usec值取最近的支持值). 与第一种情况一样, 这种等待可以被捕捉到的信号中断.
    中间三个参数readfds, writefds和exceptfds是指向描述符集的指针. 这3个描述符集说明了我们关心的可读,可写或处于异常状态的描述符集合.每个描述符集储存在一个fd_set数据类型中. 这个数据类型是由实现选择的, 它可以为每一个可能的描述符保持一位. 我们可以认为它只是一个很大的数组.

下面是一段伪代码, 演示了select的用法.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
int sockfd;
fd_set rfds;//定义fd_set集合来保存要监听的Socket
struct timeval tv;//定义一个时间变量来保存超时时间
FD_ZERO(&rfds);//调用select函数之前先把集合清零

...//创建套接字, 绑定, 监听, 等待连接

FD_SET(sockfd, &rfds);//把要监听的文件描述符加入到集合中

tv.tv_sec = 1;
tv.tv_usec = 0;//设置select等待的最大时间为1s

int res = select(sockfd + 1, &rfds, NULL, NULL, &tv);

if(ret < 0)//说明出错, 例如所有指定的描述符都没准备好的情况下捕捉到一个信号
{
perror("select error");
}
else if(ret == 0)//超时返回
{
cout<<"timeout"<<endl;
}
else//正常返回, 是三个描述符集中已准备好的描述符数之和
{
cout<<"ret = "<<ret<<endl;//这里肯定返回是1
}

if(FD_ISSET(sockfd, &rfds))//判断描述符是否真的可读, 第一个参数就是我们要判断的描述符
{
read(...);//读取sockfd文件描述符的数据
}

select有以下几个缺点:

  1. 监听的描述符存在上限, 大部分情况下最多1024, 虽然也可以通过修改头文件之后再重新编译内核来改变这个数字, 但是这个治标不治本.
  2. 每次调用select会将fd_set从用户空间拷贝到内核空间,当有描述符准备好了之后又会将fd_set从内核空间拷贝到用户空间, 如果我们监听的描述符非常多, 这显然是一种对CPU资源的浪费.
  3. 线性扫描所有fd_set, 判断有没有准备好. 如果没有, 将会重复循环, 不断轮询, 效率随着监听描述符的数量增加而下降.

epoll()函数

epoll支持管道, FIFO, 套接字和POSIX消息队列, 终端, 设备等等, 但是不支持普通文件. 操作函数有:

int epoll_create(int size) / int epoll_create1(int flags)
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event)
int epoll_wait(int epfd, struct epoll_event* event, int maxevents, int timeout)

创建epoll监听池

int epoll_create(int size);
创建一个epoll监听池, 返回该池的描述符, size用来告诉内核监听的描述符的数目一共多大. 这个参数与select()中的第一个参数不一样, 这里没有加1在里面. 但是在2.6.8之后的linux内核中这个参数已经无效, 内核不再使用这个参数作为监听总数的设计值.

创建好epoll监听池之后, 该函数返回epoll的描述符. 所以和文件描述符一样, 使用完之后要记得close另外还有一个创建的函数

int epoll_create1(int flags);

当flag为0时, 这个函数和上面的函数作用一样. 如果我们使用epoll_create1(EPOLLCLOEXEC)表示返回的epoll的fd具有执行后关闭的特性.

添加要监听的事件到epoll池中或从池中删除, 修改事件

int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);

epfd表示epoll池的fd, op表示操作, 可以是EPOLL_CTL_ADD(增加监听事件), 或EPOLL_CTL_MOD(修改监听事件), 或者EPOLL_CTL_DEL(删除监听事件)
fd: 监听的文件描述符
event: 监听的事件
epoll_event结构体原型为:

1
2
3
4
5
struct epoll_event
{
uint32_t events;//epoll events
epoll_data_t data;//user data variable
};

events的取值可以是(可以是下面取值的集合, 即通过位或者合并)
EPOLLIN: 触发该事件, 表示对应的文件描述符上有可读数据.
EPOLLOUT: 触发该事件, 表示对应的文件描述符上可以写数据
EPOLLPRI: 表示对应的文件描述符上有紧急的数据可读
EPOLLERR: 表示对应的文件描述符发生错误
EPOLLHUP: 表示对应的文件描述符被挂断
EPOLLET: 将epoll设置为边缘触发, 这是相对于水平触发而言的, 后面再说
EPOLLONESHOT: 只监听一次事件, 当监听完这次事件后, 如果还需要监听这个Socket的话, 就需要把这个socket加入回epoll的队列中去.

epoll_data_t是一个联合体, 原型为:

1
2
3
4
5
6
7
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;

设置操作举例:

1
2
3
4
5
6
7
struct epoll_event ev;
//设置与要处理事件相关的文件描述符
ev.data.fd = listenfd;
//设置要处理的事件类型
ev.events = EPOLLIN | EPOLLET;
//增加epoll事件
epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);
等待监听事件的发生

int epoll_wait(int epfd, struct epoll_event *event, int maxevents, int timeout);

epfd: epoll池的fd.
events: 从内核得到的触发事件的集合, 调用者可根据这个参数得到被触发的事件.
maxevents: 告诉内核events有多少个.
timeout: 等待超时时间, 毫秒单位, 设置为0表示不等待立即返回, -1则是永久等待.
返回值: 函数执行成功返回需要处理的事件的数目, 被触发的事件存在events数组中, 超时返回0, 失败返回-1;
注意, epoll_wait()等待注册的fd的事件发生, 若发生了则将fd和相应事件放到参数events数组里面, 并将epfd中的事件类型清空, 注意只是清空事件类型, 并不是删除fd, 所以如果我们下次还要监听这个事件, 需要用epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev)重新设置fd的事件类型. 注意, 这次我们使用的是EPOLL_CTL_MOD, 而不是EPOLL_CTL_ADD了.

下面是epoll的一个使用例子, 创建两个管道文件并且打开, 通过epoll机制去监测这两个管道是否有数据可读, 根据epoll的原理, 只要有一个管道可以读写, 那么epoll就会返回. 值得注意的一点是, 打开管道的时候, 读端以只读方式打开, 写端以只写方式打开, 任意一端打开的时候要求对方必须存在, 否则会阻塞在打开操作中. 例如, 读端以O_RDONLY方式打开管道时若发现另一端没有另外进程以O_WRONLY打开, 那么就会堵塞在打开操作中, 直到有进程使用O_WRONLY方式打开.下面是一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
int main(void)
{
int fifo_fd1, fifo_fd2;
int ep_fd;
struct epoll_event event;
struct epoll_event *ret_events = NULL;
int cnt;
int i;
char c;

mkfifo("./test1", 0666);
mkfifo("./test2", 0666);

fifo_fd1 = open("./test1", O_RDONLY);
fifo_fd2 = open("./test2", O_RDONLY);

/* 需要有进程以只写的方式打开fifo1、fifo2后才能执行于此 */
printf("监测fifo1和fifo2中...\n");

/* 创建epoll池 */
ep_fd = epoll_create1(0);

/* 将检测事件加入epoll池中 */
event.events = EPOLLIN | EPOLLET; /* 监测fifo1可读,且以边沿方式触发 */
event.data.fd = fifo_fd1;
epoll_ctl(ep_fd, EPOLL_CTL_ADD, fifo_fd1, &event);

event.events = EPOLLIN | EPOLLET; /* 监测fifo2可读,且以边沿方式触发 */
event.data.fd = fifo_fd2;
epoll_ctl(ep_fd, EPOLL_CTL_ADD, fifo_fd2, &event);

/* ret_events用于存放被触发的事件 */
ret_events = malloc(sizeof(struct epoll_event) * 100);

/* 阻塞等待监测事件触发 */
cnt = epoll_wait(ep_fd, ret_events, 100, -1);
printf("cnt = %d\n", cnt);

/* 判断监测事件 */
for (i = 0; i < cnt; i++)
{
if (ret_events[i].events & EPOLLIN)
{
read(ret_events[i].data.fd, &c, 1);
printf("fd = %d, recv data = %c\n", ret_events[i].data.fd, c);
}
}

free(ret_events);
close(ep_fd); /* 注意关闭epoll池的描述符 */
close(fifo_fd2);
close(fifo_fd1);

return 0;
}

下面是两个管道的进程代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
int main(void)
{
int fd;
char c;

fd = open("./test1", O_WRONLY);

printf("w1: pls input char: \n");
scanf("%c", &c);

write(fd, &c, 1);

close(fd);

return 0;
}

fifo2的代码同上, 只是将打开的文件由test1改为test2.
大家可以测试一下, 我在这里就展示测试结果了, 这里的epoll使用很简单, 一旦读到之后就会返回.

epoll的工作原理
  1. 调用epoll_create时,做了以下事情:

    • 内核帮我们在epoll文件系统里建了个file结点;
    • 在内核cache里建了个红黑树用于存储以后epoll_ctl传来的socket;
    • 建立一个list链表,用于存储准备就绪的事件.
  2. 调用epoll_ctl时,做了以下事情:

    • 把socket放到epoll文件系统里file对象对应的红黑树上;
    • 给内核中断处理程序注册一个回调函数,告诉内核,如果这个句柄的中断到了,就把它放到准备就绪list链表里.
  3. 调用epoll_wait时,做了以下事情:

    • 观察list链表里有没有数据。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。而且,通常情况下即使我们要监控百万计的句柄,大多一次也只返回很少量的准备就绪句柄而已,所以,epoll_wait仅需要从内核态copy少量的句柄到用户态而已.
两种触发方式的区别

LT模式下,只要一个描述符上的事件一次没有处理完,会在以后调用epoll_wait时重复返回这个描述符,而ET模式仅在第一次返回, 之后就不会再返回了.

在实现上, 当一个socket描述符上有事件时,内核会把该描述符插入上面所说的准备就绪list链表,这时我们调用epoll_wait,会把准备就绪的socket拷贝到用户态内存,然后清空准备就绪list链表,最后,epoll_wait检查这些socket,如果是LT模式,并且这些socket上确实有未处理的事件时,又把该句柄放回到刚刚清空的准备就绪链表。所以,LT模式的描述符,只要它上面还有事件,epoll_wait每次都会返回.

epoll的优点
  1. 本身没有最大并发连接的限制, 这比select是个巨大的飞跃.
  2. 效率提升:只有活跃的socket才会主动的去调用callback函数;
  3. 省去不必要的内存拷贝:epoll通过内核与用户空间mmap同一块内存实现。

总结: 在高并发, 活跃连接少的情况下epoll是远远优于select的, 如果在并发量小且活跃连接多的情况下, select就不一定劣于epoll的, 我们要在理解原理的基础上具体情况具体分析.