上一篇学习总结了并发的初步知识,学习了select实现并发服务器的两点限制以及poll函数的使用。本篇博文将学习epoll相关知识。
1.epoll
- 头文件:#include <sys/epoll.h>
- int epoll_create(int size);
- int epoll_create1(int flags);
- int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
- int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
(1)epoll_create和epoll_create1函数功能相同,用于创建一个epoll句柄实例。
- 原型:int epoll_create(int size);
- 功能:创建一个epoll的句柄
- 参数:
- size:用来告诉内核这个监听的数目一共有多大。这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值。
- flags:若指定为EPOLL_CLOEXEC则与open函数的O_CLOEXEC标志类似,若进程被替换时文件描述符会被关闭。若指定为0,则与epoll_create函数一样。
- 返回值:成功返回一个非负文件描述符;失败返回-1
需要注意的是:当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
(2)epoll_ctl函数,可以将一个I/O文件描述符添加到epoll来管理。
- 原型:int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event );
- 功能:epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。
- 参数:
- epfd:epoll实例句柄,epoll_create()的返回值
- op:表示动作,用三个宏来表示,EPOLL_CTL_ADD:注册新的fd到epfd中;EPOLL_CTL_MOD:修改已经注册的fd的监听事件;EPOLL_CTL_DEL:从epfd中删除一个fd;
- fd:需要监听的fd
- event:告诉内核需要监听什么事件,struct epoll_event结构如下:
- 返回值:成功返回0;失败返回-1
1 2 3 4 5 6 7 8 9 10 11 12 |
struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ }; //其中 typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t; |
其中,epoll_event.events可以是以下几个宏的集合:
1 2 3 4 5 6 7 |
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭); EPOLLOUT:表示对应的文件描述符可以写; EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来); EPOLLERR:表示对应的文件描述符发生错误; EPOLLHUP:表示对应的文件描述符被挂断; EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。 EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。 |
(3)epoll_wait函数,等待事件。
- 原型:int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
- 函数功能:等待事件的产生,类似于select()调用。
- 参数:
- epfd:epoll实例句柄,epoll_create()的返回值
- events:结构体指针,用来从内核得到事件的集合
- maxevents:表示每次能处理的最大事件数,告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size
- timeout:超时时间(毫秒,0会立即返回,-1永久阻塞直到事件产生)
- 返回值:返回需要处理的事件个数,返回0表示超时;失败返回-1
(4)使用epoll函数改进回射客户/服务器的服务器端
该修改程序与前面的服务器端代码的整体框架无异。主要修改在main主函数内,使用epoll替代前面的select。
1)首先定义了一个EventList动态数组:
typedef std::vector<struct epoll_event> EventList;
2)之后的创建套接字(sockt函数)、绑定套接字(bind函数)、监听套接字(listen函数)这些“老步骤”与前面的服务器设计一样。
3)然后需要创建epoll实例,使用epoll_create1函数:
1 2 |
int epollfd; epollfd = epoll_create1(EPOLL_CLOEXEC);//flags参数如前所述 |
4)接下来要将感兴趣的文件描述符listenfd加入epoll来管理,使用epoll_ctl函数:
1 2 3 4 |
struct epoll_event event; event.data.fd = listenfd; event.events = EPOLLIN | EPOLLET; epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &event); |
其中,我们感兴趣的套接口listenfd,对它感兴趣的事件是EPOLLIN | EPOLLET(EPOLLET表示以边缘的方式触发)。
4)下面就需要去检测哪些I/O产生了感兴趣事件,使用epoll_wait函数:
1 2 3 4 |
EventList events(16); ... nready = epoll_wait(epollfd, &*events.begin(), static_cast<int>(events.size()), -1) ... |
其中,第一行代码定义了一个events动态数组,数组大小为16,当容量不够时,动态数组可以增大。&*events.begin()为解引用迭代器后再取地址,即动态数组的首元素地址。
5)epoll_wait函数成功返回后(返回感兴趣事件个数),就可以遍历事件进行处理了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
for (int i = 0; i < nready; ++i) { if (events[i].data.fd == listenfd) { conn = accept(listenfd, (struct sockaddr*)&peeraddr, &peerlen); //此处进行accept返回处理 //accept成功返回后,就多了一个以连接套接字conn,将其加入clients数组中 clients.push_back(conn); activate_nonblock(conn); //此时就要将以连接套接字conn加入 event.data.fd = conn event.events = EPOLLIN | EPOLLET; epoll_ctl(epollfd, EPOLL_CTL_ADD, conn, &event); } } |
其中,clients保存客户端以连接套接字(std::vector<int> clients);activate_nonblock(conn);是将conn设置为非阻塞模式,该函数在前一篇博文中已经封装过了(由fcntl函数实现的);
在获得已连接套接字之后,下一次我们也需要监听它可读事件,因此也要为该套接字准备一个事件:
event.data.fd = conn;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, conn, &event);
6)在这之后,集合中就不仅仅只有listenfd套接口了。第二次循环时,若不是listenfd产生的事件,就接着判断是不是conn套接字产生了可读事件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
else if (events[i].events & EPOLLIN) { conn = events[i].data.fd; if (conn < 0) continue; char recvbuf[1024] = {0}; int ret = readline(conn, recvbuf, 1024); //此处对readline的返回进行处理 if (ret == -1) ERR_EXIT("readline"); if (ret == 0) { printf("client clise\n"); close(conn); event = events[i]; epoll_ctl(epollfd, EPOLL_CTL_DEL, conn, &event); clients.erase(std::remove(clients.begin(), clients.end(), conn), clients.end()); } fputs(recvbuf, stdout); writen(conn, recvbuf, strlen(recvbuf)); } |
其中,clients.erase(remove(clients.begin(), clients.end(), conn), clients.end());表示先将数组中值为conn的元素移到数组尾部,然后再删除尾部所有值为conn的元素。
服务端echosrv.c完整源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
#include<unistd.h> #include<sys/types.h> #include<sys/socket.h> #include<netinet/in.h> #include<arpa/inet.h> #include<stdlib.h> #include<stdio.h> #include<errno.h> #include<string.h> #include<signal.h> #include <sys/epoll.h> #include <sys/wait.h> #include <fcntl.h> #include <vector> #include <algorithm> #define ERR_EXIT(m)\ do\ {\ perror(m);\ exit(EXIT_FAILURE);\ }while (0) ssize_t readn(int fd, void *buf, size_t count) { size_t nleft = count; ssize_t nread; char *bufp = (char*)buf; while(nleft > 0){ if((nread = read(fd,bufp, nleft)) < 0){ if (errno == EINTR) continue; return -1; } else if (nread ==0) return count - nleft; bufp += nread; nleft -= nread; } return count; } ssize_t writen(int fd, const void *buf, size_t count) { size_t nleft = count; ssize_t nwritten; char *bufp = (char*)buf; while(nleft> 0){ if((nwritten = write(fd, bufp, nleft)) < 0){ if(errno == EINTR) continue; return -1; } else if(nwritten ==0) continue; bufp += nwritten; nleft -= nwritten; } return count; } ssize_t recv_peek(int sockfd, void *buf, size_t len) { while(1){ int ret = recv(sockfd, buf ,len, MSG_PEEK); if (ret == -1 && errno == EINTR) continue; return ret; } } size_t readline(int sockfd, void *buf, size_t maxline) { int ret; int nread; char *bufp = (char*)buf; int nleft = maxline; while(1){ ret = recv_peek(sockfd, bufp, nleft); if(ret < 0) return ret; else if (ret == 0) return ret; nread = ret; int i; for (i=0; i<nread; i++){ if (bufp[i] == '\n'){ ret = readn(sockfd, bufp, i+1); if (ret != i+1) exit(EXIT_FAILURE); return ret; } } if (nread > nleft) exit(EXIT_FAILURE); nleft -= nread; ret = readn(sockfd, bufp, nread); if (ret != nread) exit(EXIT_FAILURE); bufp += nread; } return -1; } void activate_nonblock(int fd) { int ret; int flags = fcntl(fd, F_GETFL); if (flags == -1) ERR_EXIT("fcntl"); flags |= O_NONBLOCK; ret = fcntl(fd, F_SETFL, flags); if (ret == -1) ERR_EXIT("fcntl"); } typedef std::vector<struct epoll_event> EventList; int main(void) { int count = 0; int listenfd; if ((listenfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) ERR_EXIT("socket"); struct sockaddr_in servaddr; memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(5581); servaddr.sin_addr.s_addr = htonl(INADDR_ANY); int on = 1; if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) ERR_EXIT("setsockopt"); if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) ERR_EXIT("bind"); if (listen(listenfd, SOMAXCONN) < 0) ERR_EXIT("listen"); std::vector<int> clients; int epollfd; epollfd = epoll_create1(EPOLL_CLOEXEC); struct epoll_event event; event.data.fd = listenfd; event.events = EPOLLIN | EPOLLET; epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &event); EventList events(16); struct sockaddr_in peeraddr; socklen_t peerlen; int conn; int i; int nready; while (1) { nready = epoll_wait(epollfd, &*events.begin(), static_cast<int>(events.size()), -1); if (nready == -1) { if (errno == EINTR) continue; ERR_EXIT("epoll_wait"); } if (nready == 0) continue; if ((size_t)nready == events.size()) events.resize(events.size() * 2); for (i = 0; i < nready; i++) { if (events[i].data.fd == listenfd) { peerlen = sizeof(peeraddr); conn = accept(listenfd, (struct sockaddr *)&peeraddr, &peerlen); if (conn == -1) ERR_EXIT("accept"); printf("ip=%s port=%d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port)); printf("count = %d\n", ++count); clients.push_back(conn); activate_nonblock(conn); event.data.fd = conn; event.events = EPOLLIN | EPOLLET; epoll_ctl(epollfd, EPOLL_CTL_ADD, conn, &event); } else if (events[i].events & EPOLLIN) { conn = events[i].data.fd; if (conn < 0) continue; char recvbuf[1024] = {0}; int ret = readline(conn, recvbuf, 1024); if (ret == -1) ERR_EXIT("readline"); if (ret == 0) { printf("client close\n"); close(conn); event = events[i]; epoll_ctl(epollfd, EPOLL_CTL_DEL, conn, &event); clients.erase(std::remove(clients.begin(), clients.end(), conn), clients.end()); } fputs(recvbuf, stdout); writen(conn, recvbuf, strlen(recvbuf)); } } } return 0; } |
客户端可以使用之前的代码,此处就不贴了。
2.epoll与select、poll区别
(1)相比于select与poll,epoll最大的好处在于它不会随着监听fd数目的增长而降低效率。
(2)内核中select与poll的实现是采用轮询来处理的,轮询的fd数目越多,自然耗时越多。
(3)epoll的实现是基于回调的,如果fd有期望的事件发生就通过回调函数将其加入epoll就绪队列中,也就是说它只关心“活跃”的fd,与fd数目无关。
(4)内核/用户空间内存拷贝问题,如何让内核把fd消息通知给用户空间呢?在这个问题上select/poll采取了内存拷贝方法。而epoll采用了共享内存的方式。
(5)epoll不仅会告诉应用程序有I/O事件到来,还会告诉应用程序相关的信息,这些信息是应用程序填充的,因此根据这些信息应用程序就能直接定位到事件,而不必遍历整个fd集合。
3.epoll LT/ET模式
- EPOLLLT:水平/电平触发
- EPOLLET:边沿触发
通常认为边沿触发的效率更高。
(1)EPOLLLT
1)此模式下,只要这个fd还有数据可读,每次 epoll_wait都会返回它的事件,提醒用户程序去操作。
2)完全靠kernel epoll驱动,应用程序只需要处理从epoll_wait返回的fds,这些fds我们认为它们处于就绪状态。
(2)EPOLLET
1)此模式下,系统仅仅通知应用程序哪些fds变成了就绪状态,一旦fd变成就绪状态,epoll将不再关注这个fd的任何状态信息(从epoll队列移除),直到应用程序通过读写操作触发EAGAIN状态,epoll认为这个fd又变为空闲状态,那么epoll又重新关注这个fd的状态变化(重新加入epoll队列)。
2)随着epoll_wait的返回,队列中的fds是在减少的,所以在大并发的系统中,EPOLLET更有优势。但是对程序员的要求也更高。