前一篇我们介绍了select相关内容,本次我们将继续深入学习select函数,学习读、写、异常事件发生的条件以及用select改进回射服务器程序。
1.select函数
int select(int nfds, fd_set *readfds, fd_set *writefds, fs_set *excepfds, struct timeval *timeout)
ndfs:是最大描述符+1
(1)select相当于一个中心管理器,能够统一管理多个I/O,一旦其中一个或多个I/O产生了我们感兴趣的事件,就会被select检测到。并返回I/O事件发生的个数。
(2)通常将用select实现的服务器,称为并发服务器。当select检测到事件后,一个个的遍历事件,然后依次处理之。这种服务器是并发的但不是并行的。
(3)并发不代表并行,用select是无法实现并行的,它无法充分利用多核CPU。(对单核CPU来说,是没有并行可言的)
(4)对此,可以使用多个进程或线程使用select,达到“并行”的目的。
2.读、写、异常事件发生条件
- 可读:
- 套接口缓冲区有数据可读
- 连接的读一半关闭,即接收FIN段,读操作将返回0
- 如果是监听套接口,已完成连接队列不为空时
- 套接口上发生了一个错误待处理,错误可以通过getsockopt指定SO_ERROR选项来获取
- 可写:
- 套接口发送缓冲区有空间容纳数据
- 连接的写一半关闭。即收到RST段之后,再次调用write操作。
- 套接口上发生了一个错误待处理,错误可以通过getsockopt指定SO_ERROR选项来获取
- 异常:
- 套接口存在带外数据
3.用select改进回射服务器
(1)回想之前服务器端实现并发的设计机制:fork()一个进程出来,子进程处理已连接套接口的通信模块echo_srv;父进程负责监听套接口的监听连接。
(2)此处用select取代上述的并发设计。只需要一个进程即可实现:select可以统一管理监听套接口发生的事件和已连接套接口发生的事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
int client[FD_SETSIZE]; //select限制:最多能处理FD_SETSIZE个文件描述符 int maxi = 0; //最大不空闲位置,为了是减少后续的遍历次数,不需要每次都遍历到FD_SETSIZE int i; for(i=0; i<FD_SETSIZE; ++i) client[i] = -1; //等于-1表示空闲的 int nready; //检测到的事件个数 int maxfd = listenfd; fd_set rset; fd_set allset; FD_ZERO(&rset); FD_ZERO(&allset); FD_SET(listenfd, &allset); while(1) { rset = allset; nready = select(maxfd+1, &rset, NULL, NULL, NULL); if (nready == -1) { if (errno == EINTR) continue; ERR_EXIT("select"); } if (nready == 0) continue; if (FD_ISSET(listenfd, &rset)) { peerlen = sizeof(peeraddr); conn = accept(listenfd, (struct sockaddr*)&peeraddr, &peerlen); if (conn == -1) ERR_EXIT("accept"); //前一个版本中每个进程中都有一个conn,但是这里,每次accept之后 //conn会被覆盖,所以需要一个数组来保存conn for (i = 0; i < FD_SETSIZE; ++i)//将conn保存到某一个空闲的位置 { if (client[i] < 0) { client[i] = conn; if (i > maxi) maxi = i; break; } } //没有找到空闲的位置 if (i == FD_SETSIZE) { fprintf(stderr, "too many clinets\n"); exit(EXIT_FAILURE); } //否则就打印对方的ip和端口信息 printf("ip=%s port=%d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port)); FD_SET(conn, &allset); //第一次循环时,只有一个listenfd套接口,此后conn便加入allset内 if (conn > maxfd) //更新maxfd maxfd = conn; if (--nready <=0) //检测的事件已经处理完了 continue; } //已连接套接口产生事件,以下处理已连接套接口 for(i = 0; i<=maxi; ++i) { conn = client[i]; if (conn == -1) //等于-1表示空闲的 continue; if (FD_ISSET(conn, &rset)) { char recvbuf[1024] = {0}; int ret = readline(conn, recvbuf, 1024); if (ret == -1) ERR_EXIT("readline"); else if (ret == 0) { printf("client close\n"); FD_CLR(conn, &allset); //对方关闭,就可以将conn从allset中清除 client[i] = -1; //并将该位置置为空闲 } fputs(recvbuf, stdout); writen(conn, recvbuf, strlen(recvbuf)); if (--nready <= 0) break; } } } |
(3)完整echosrv.c源码
|
#include<unistd.h> #include<sys/types.h> #include<sys/socket.h> #include<netinet/in.h> #include<arpa/inet.h> #include<stdlib.h> #include<stdio.h> #include<errno.h> #include<string.h> #include<signal.h> #define ERR_EXIT(m)\ do\ {\ perror(m);\ exit(EXIT_FAILURE);\ }while (0) ssize_t readn(int fd, void *buf, size_t count) { size_t nleft = count; ssize_t nread; char *bufp = (char*)buf; while(nleft > 0){ if((nread = read(fd,bufp, nleft)) < 0){ if (errno == EINTR) continue; return -1; } else if (nread ==0) return count - nleft; bufp += nread; nleft -= nread; } return count; } ssize_t writen(int fd, const void *buf, size_t count) { size_t nleft = count; ssize_t nwritten; char *bufp = (char*)buf; while(nleft> 0){ if((nwritten = write(fd, bufp, nleft)) < 0){ if(errno == EINTR) continue; return -1; } else if(nwritten ==0) continue; bufp += nwritten; nleft -= nwritten; } return count; } ssize_t recv_peek(int sockfd, void *buf, size_t len) { while(1){ int ret = recv(sockfd, buf ,len, MSG_PEEK); if (ret == -1 && errno == EINTR) continue; return ret; } } size_t readline(int sockfd, void *buf, size_t maxline) { int ret; int nread; char *bufp = buf; int nleft = maxline; while(1){ ret = recv_peek(sockfd, bufp, nleft); if(ret < 0) return ret; else if (ret == 0) return ret; nread = ret; int i; for (i=0; i<nread; i++){ if (bufp[i] == '\n'){ ret = readn(sockfd, bufp, i+1); if (ret != i+1) exit(EXIT_FAILURE); return ret; } } if (nread > nleft) exit(EXIT_FAILURE); nleft -= nread; ret = readn(sockfd, bufp, nread); if (ret != nread) exit(EXIT_FAILURE); bufp += nread; } return -1; } void echo_srv(int conn) { char recvbuf[1024]; while(1){ memset(recvbuf, 0, sizeof(recvbuf)); int ret = readline(conn, recvbuf,1024); if(ret == -1) ERR_EXIT("readline"); else if(ret ==0 ){ printf("client close\n"); break; } fputs(recvbuf, stdout); writen(conn, recvbuf, strlen(recvbuf)); } } int main (void) { signal(SIGCHLD, SIG_IGN); int listenfd; if((listenfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) ERR_EXIT("socket"); struct sockaddr_in servaddr; memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(5581); servaddr.sin_addr.s_addr = htonl(INADDR_ANY); int on = 1; if(setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) ERR_EXIT("setsockopt"); if(bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) ERR_EXIT("bind"); if(listen(listenfd, SOMAXCONN) < 0) ERR_EXIT("liten"); struct sockaddr_in peeraddr; socklen_t peerlen = sizeof(peeraddr); int conn; /* pid_t pid; while(1) { if((conn = accept(listenfd, (struct sockaddr*) &peeraddr, &peerlen)) < 0) ERR_EXIT("accept"); printf("ip=%s port=%d\n", inet_ntoa(peeraddr.sin_addr),ntohs(peeraddr.sin_port)); pid = fork(); if(pid == -1) ERR_EXIT("fork"); if(pid == 0) { close(listenfd); echo_srv(conn); exit(EXIT_SUCCESS); } else close(conn); } */ int client[FD_SETSIZE]; int maxi = 0; int i; for( i = 0; i<FD_SETSIZE;i++) client[i] = -1; int nready; int maxfd = listenfd; fd_set rset; fd_set allset; FD_ZERO(&rset); FD_ZERO(&allset); FD_SET(listenfd, &allset); while(1){ rset = allset; nready = select(maxfd+1, &rset, NULL, NULL, NULL); if ( nready == -1){ if(errno == EINTR) continue; ERR_EXIT("slelct"); } if (nready ==0) continue; if(FD_ISSET(listenfd, &rset)){ peerlen = sizeof(peeraddr); conn = accept(listenfd, (struct sockaddr*)&peeraddr, &peerlen); if (conn == -1) ERR_EXIT("accept"); for ( i = 0; i<FD_SETSIZE; i++){ if(client[i] < 0){ client[i] = conn; if(i > maxi) maxi = i; break; } } if (i==FD_SETSIZE){//没有找到空闲位置 fprintf(stderr, "too many clients\n"); exit(EXIT_FAILURE); } printf("ip = %s port = %d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port)); FD_SET(conn, &allset); if (conn > maxfd) maxfd = conn; if( -- nready <= 0)//处理一个事件后就减1 continue; } for (i = 0; i<=maxi; i++){ conn = client[i]; if (conn == -1) continue; if (FD_ISSET(conn, &rset)){ char recvbuf[1024] = {0}; int ret = readline(conn, recvbuf,1024); if(ret == -1) ERR_EXIT("readline"); else if(ret ==0 ){ printf("client close\n"); FD_CLR(conn, &allset); client[i] = -1; } fputs(recvbuf, stdout); writen(conn, recvbuf, strlen(recvbuf)); if (--nready <= 0) break; } } } return 0; } |
(5)客户端源码可参考上一篇博文——用select改进后的客户端。
小结:
(1)用select改进后的回射客户/服务器模型较多进程回射客户/服务器有了一定的性能提升。
(2)本质思想,用select来管理多个套接口是否产生事件。要管理的套接口有两类:一类是监听套接口,另一类是已连接套接口。
(3)初始状态,只有一个套接口感兴趣——监听套接口,我们就将监听套接口放入集合中。然后调用select判定是否是监听套接口在集合中,若是,表明监听套接口产生了可读事件——这意味着对等方已经连接过来了。accept不再阻塞–>进行返回–>获得一个已连接套接口。并将该已连接套接口放入集合,以便下次我们可以关心其产生感兴趣事件。
(由于我们是用单进程的方式来实现,若需要维护与多个客户端的连接,就需要将该已连接套接口保存到一个数组中。)