如果一个IO调用不是在调用它之后立即返回,而是在它完成了所有操作或者超时之后才返回,那么它就是一个典型的同步IO。比如,客户端通过调用connect()函数向服务端发起TCP连接请求时,只有当双方完成了三次握手或者超时之后,connect()函数才会返回。
1.阻塞调用
在初识socket编程时,我们编写的第一个回射客户/服务器模型就是一个阻塞调用的例子(详见http://dulishu.top/linux-socket02/),其中客户端代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <arpa/inet.h> #define ERR_EXIT(m)\ do\ {\ perror(m);\ exit(EXIT_FAILURE);\ }while(0) int main(void) { int sock; if((sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) ERR_EXIT("socket"); struct sockaddr_in servaddr; memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htonl(512119); servaddr.sin_addr.s_addr = inet_addr("127.0.0.1"); if (connect(sock, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0) ERR_EXIT("connect"); char sendbuf[1024] = {0}; char recvbuf[1024] = {0}; while (fgets(sendbuf, sizeof(sendbuf), stdin) != NULL) { write(sock, sendbuf, strlen(sendbuf)); read(sock, recvbuf, sizeof(recvbuf)); fputs(recvbuf, stdout); memset(recvbuf, 0, sizeof(recvbuf)); } close(sock); return 0; } |
其中,所有的网络调用都是阻塞调用。比如,connect()函数在建立连接之后才会返回(不报错、不超时的情况下);write()函数在将输出数据刷新到内核的写缓冲区之后才会返回;read()函数在收到数据之后或TCP连接关闭之后才会返回。
阻塞IO编程和非阻塞IO编程没有绝对的好坏之分。如果你的应用程序在那个时候(阻塞时)没有其他的事情要做,那么阻塞IO会是一个不错的选择;但是,如果在那个时候,你还有其他的请求或者其他的任务需要去做,那么阻塞IO就不适合你了。比如,处理多连接时:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
/* This won't work. */ char buf[1024]; int i, n; while (i_still_want_to_read()) { for (i=0; i<n_sockets; ++i) { n = recv(fd[i], buf, sizeof(buf), 0); if (n==0) handle_close(fd[i]); else if (n<0) handle_error(fd[i], errno); else handle_input(fd[i], buf, n); } } |
其中,如果fd[1]有可读事件产生,而fd[0]没有可读事件。那么,fd[0]将一直阻塞,进而导致fd[1]无法及时处理它的可读事件。
针对上述问题,你可以选择使用多进程或多线程的方案来解决。比如,为每个连接创建一个进程或线程。使用多进程的方式实现多客户连接的示例可以参考这篇文章:
2.fcntl
然而,进程的创建(甚至是线程的创建)有的时候也是很昂贵的。使用fcntl()函数可以使你的socket不再阻塞:
fcntl(fd, F_SETFL, O_NONBLOCK);
使用fcntl使socket不再阻塞之后,当你再次通过该socket进行read(或其他网络调用函数)调用时,要么它立刻读取到数据并返回;要么立即返回一个错误码(比如EAGAIN),表示暂时无法操作。
此时,上面的代码片段可以稍作修改:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
/* This will work, but the performance will be unforgivably bad. */ int i, n; char buf[1024]; for (i=0; i < n_sockets; ++i) fcntl(fd[i], F_SETFL, O_NONBLOCK); while (i_still_want_to_read()) { for (i=0; i < n_sockets; ++i) { n = recv(fd[i], buf, sizeof(buf), 0); if (n == 0) { handle_close(fd[i]); } else if (n < 0) { if (errno == EAGAIN) ; /* The kernel didn't have any data for us to read. */ else handle_error(fd[i], errno); } else { handle_input(fd[i], buf, n); } } } |
3.select
再深入考虑上面的代码片段。虽然,我们使每个socket都不再阻塞,解决了1中提及的问题。然而,又带来了新的问题:如果所有的socket都没有可读事件,循环将一直进行,浪费CPU的时间片;不管某个或某些连接是否产生了可读事件,你都将会为每一个连接执行一个内核调用。
所以,一个比较理想的方案是:让内核在某个或某些socket产生了可读事件的时候,再去通知用户;同时,也让内核明确的告诉用户,是哪个或哪些socket产生了这些事件。这样一来,不仅减少了循环所占用的CPU时间,还减少了内核调用的次数。
对此,比较古老的一种做法是使用select()函数。该函数提供了三个套接字集合,分别用于读、写、异常事件。在调用select()函数之后,它会一直等待直到其中一个套接字集合中的套接字产生了感兴趣事件,然后修改集合,等待用户处理。
在初识socket编程时,我们也编写了一个基于select()的回射客户/服务器模型,见如下代码片段(详见:http://dulishu.top/linux-socket-13/、http://dulishu.top/socket-select/):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
int client[FD_SETSIZE]; int maxi = 0; int i; for( i = 0; i<FD_SETSIZE;i++) client[i] = -1; int nready; int maxfd = listenfd; fd_set rset; fd_set allset; FD_ZERO(&rset); FD_ZERO(&allset); FD_SET(listenfd, &allset); while(1){ rset = allset; nready = select(maxfd+1, &rset, NULL, NULL, NULL); if ( nready == -1){ if(errno == EINTR) continue; ERR_EXIT("slelct"); } if (nready ==0) continue; if(FD_ISSET(listenfd, &rset)){ peerlen = sizeof(peeraddr); conn = accept(listenfd, (struct sockaddr*)&peeraddr, &peerlen); if (conn == -1) ERR_EXIT("accept"); for ( i = 0; i<FD_SETSIZE; i++){ if(client[i] < 0){ client[i] = conn; if(i > maxi) maxi = i; break; } } if (i==FD_SETSIZE){//没有找到空闲位置 fprintf(stderr, "too many clients\n"); exit(EXIT_FAILURE); } printf("ip = %s port = %d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port)); FD_SET(conn, &allset); if (conn > maxfd) maxfd = conn; if( -- nready <= 0)//处理一个事件后就减1 continue; } for (i = 0; i<=maxi; i++){ conn = client[i]; if (conn == -1) continue; if (FD_ISSET(conn, &rset)){ char recvbuf[1024] = {0}; int ret = readline(conn, recvbuf,1024); if(ret == -1) ERR_EXIT("readline"); else if(ret ==0 ){ printf("client close\n"); FD_CLR(conn, &allset); client[i] = -1; } fputs(recvbuf, stdout); writen(conn, recvbuf, strlen(recvbuf)); if (--nready <= 0) break; } } } |
4.select()替代品
虽然select()函数解决了3中提及的问题。但是,它也带来了一些其他的问题。
从上面的代码片段可以观察出:每当产生感兴趣事件之后,用户都需要去遍历一遍所有的socket,socket数量越多,遍历的次数越多,耗时就越久。这也是为什么在上面的代码中,我们将maxi尽可能压缩小的原因。
所以,当socket数量较大时,select()的性能也很堪忧呀。
对此,不同的操作系统提供了不同的select()替代品。主要有:poll(), epoll(), kqueue(), evports, /dev/poll等等。不幸的是,这些select()替代品没有一个通用的标准,比如,linux的epolll(), BSDs的kqeue(),Solaris的evports等,进而导致移植性低的问题。
5.libevent
对于不同平台之间的select()替换品没有一个统一标准,导致移植性低的问题。libevent提供了解决方案,它将这些select()替代品封装起来,提供了一致的接口供用户使用,实现了跨平台。
初识libevent,我们先实现一个简单的示例,来直观的感受一下libevent的魅力:客户端发送一条消息,服务端接收消息后将消息中的小写字母转换为大写字母,并发送给客户端,客户端再打印输出。
(1)服务端echosrv.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
#include <stdio.h> #include <unistd.h> #include <string.h> #include <errno.h> #include <assert.h> #include <stdlib.h> #include <ctype.h> /*For sockaddr_in*/ #include <netinet/in.h> /*For socket functions*/ #include <sys/socket.h> /*For fcntl*/ #include <fcntl.h> #include <event2/event.h> #define MAX_LINE 16384 void do_read(evutil_socket_t fd, short events, void *arg); void do_write(evutil_socket_t fd, short events, void *arg); struct fd_state{ char buffer[MAX_LINE]; size_t buffer_used; size_t n_written; size_t write_upto; struct event *read_event; struct event *write_event; }; struct fd_state* alloc_fd_state(struct event_base *base, evutil_socket_t fd) { struct fd_state *state = malloc(sizeof(struct fd_state)); if(!state) return NULL; state->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state); if(!state->read_event){ free(state); return NULL; } state->write_event = event_new(base, fd, EV_WRITE|EV_PERSIST, do_write, state); if(!state->write_event){ event_free(state->read_event); free(state); return NULL; } state->buffer_used = state->n_written = state->write_upto = 0; assert(state->write_event); return state; } void free_fd_state(struct fd_state *state) { event_free(state->read_event); event_free(state->write_event); free(state); } void do_read(evutil_socket_t fd, short events, void *arg) { struct fd_state *state = arg; char buf[1024]; int i; ssize_t result; while(1){ assert(state->write_event); result = recv(fd, buf, sizeof(buf), 0); if(result <= 0) break; for(i=0; i<result; ++i){ if(state->buffer_used < sizeof(state->buffer)) state->buffer[state->buffer_used++] = toupper(buf[i]); if(buf[i] == '\n'){ assert(state->write_event); event_add(state->write_event, NULL); state->write_upto = state->buffer_used; } } } if(result == 0){ free_fd_state(state); } else if(result < 0){ if(errno == EAGAIN) return; perror("recv"); free_fd_state(state); } } void do_write(evutil_socket_t fd, short events, void *arg) { struct fd_state *state = arg; while(state->n_written < state->write_upto){ ssize_t result = send(fd, state->buffer + state->n_written, state->write_upto - state->n_written, 0); if(result < 0){ if(errno == EAGAIN) return; free_fd_state(state); return; } assert(result != 0); state->n_written += result; } if(state->n_written == state->buffer_used) state->n_written = state->write_upto = state->buffer_used = 1; event_del(state->write_event); } void do_accept(evutil_socket_t listener, short event, void *arg) { struct event_base *base = arg; struct sockaddr_in ss; socklen_t slen = sizeof(ss); int fd = accept(listener, (struct sockaddr*)&ss, &slen); if(fd < 0){ perror("accept"); } else if(fd > FD_SETSIZE){ close(fd); } else{ struct fd_state *state; evutil_make_socket_nonblocking(fd); state = alloc_fd_state(base, fd); assert(state); assert(state->write_event); event_add(state->read_event, NULL); } } void run() { evutil_socket_t listener; struct sockaddr_in sin; struct event_base *base; struct event *listener_event; base = event_base_new(); if(!base) return; sin.sin_family = AF_INET; sin.sin_port = htons(6666); sin.sin_addr.s_addr = 0; listener = socket(AF_INET, SOCK_STREAM, 0); evutil_make_socket_nonblocking(listener); int one = 1; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); if(bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0){ perror("bind"); return; } if(listen(listener, 16) < 0){ perror("listener"); return; } listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base); event_add(listener_event, NULL); event_base_dispatch(base); } int main(int argc, char *argv[]) { setvbuf(stdout, NULL, _IONBF, 0); run(); return 0; } |
(2)客户端echocli.c
客户端未使用Libevent库,编写比较简单:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <netinet/in.h> #include <sys/socket.h> #include <arpa/inet.h> int main() { int fd; if((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0){ perror("socket."); return -1; } struct sockaddr_in sin; sin.sin_family = AF_INET; sin.sin_port = htons(6666); sin.sin_addr.s_addr = inet_addr("127.0.0.1"); if(connect(fd, (struct sockaddr*)&sin, sizeof(sin)) < 0){ perror("connect."); return -1; } char sendbuf[1024] = {0}; char recvbuf[1024] = {0}; while(fgets(sendbuf, sizeof(sendbuf), stdin) != NULL){ write(fd, sendbuf, strlen(sendbuf)); read(fd, recvbuf, sizeof(recvbuf)); fputs(recvbuf, stdout); memset(recvbuf, 0, sizeof(recvbuf)); } close(fd); return 0; } |
(3)编译、运行
#服务端
1 2 |
[root@localhost 01]# gcc -Wall -g echosrv.c -o echosrv -levent [root@localhost 01]# ./echosrv |
#客户端
1 2 3 4 |
[root@localhost 01]# gcc -Wall -g echocli.c -o echocli [root@localhost 01]# ./echocli hello world! HELLO WORLD! |
6.其他
bufferevent的相关接口,在代码中没有体现,而是使用了一个自定义的fd_state结构体来管理读写缓冲区。对于bufferevent的内容,后续介绍。
参考:
http://www.wangafu.net/~nickm/libevent-book/01_intro.html
拜读了,多多学习总是好的!
谢谢