1:Epoll事件有两种模型:
ET: (边沿触发) ==》缓冲区状态发生变化时,触发一次
LT:(水平触发) ==》有数据可读,读事件一直触发 有空间可写,写事件一直触发。
使用时,不指定事件模型,则默认是水平触发。
2:ET模型
ET边缘触发模型,涉及以下问题:
1:ET模式下,accept如果多个客户端同时触发,只返回一次的话,有丢失。
==》处理应该用while循环,一次性处理完。
if(epoll_events[i].data.fd == sockfd)
{
//因为是et模式,所以这里要用while
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
while((connfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len))>=0)
{
int oldSocketFlag = fcntl(connfd, F_GETFL, 0);
int newSocketFlag = oldSocketFlag | O_NONBLOCK;
if (fcntl(connfd, F_SETFD, newSocketFlag) == -1)
{
close(connfd);
printf("fcntl set nonblock error. fd [%d] \n", connfd);
continue;
}
{
struct epoll_event client_fd_event;
client_fd_event.data.fd = connfd;
client_fd_event.events = EPOLLIN | EPOLLOUT;
client_fd_event.events |= EPOLLET; //ET
if(epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &client_fd_event) == -1)
{
printf("add clientfd to epoll error \n");
close(connfd);
continue;
}
printf("new client accept, client fd is %d. \n",connfd);
}
}
}
2: ET模式下,如果大量数据同时到达,只触发一次,取一次可能有数据丢失。 应该用while循环一次取完。
if (events[i].events & EPOLLIN)
{
n = 0;
// 一直读直到返回0或者 errno = EAGAIN
while ((nread = read(fd, buf + n, BUFSIZ-1)) > 0)
{
n += nread;
}
if (nread == -1 && errno != EAGAIN)
{
perror("read error");
}
ev.data.fd = fd;
ev.events = events[i].events | EPOLLOUT;
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}
如果是可写,这里应该根据业务逻辑做一定的处理:
ssize_t socket_write(int sockfd, const char* buffer, size_t buflen)
{
ssize_t tmp;
size_t total = buflen;
const char* p = buffer;
while(1)
{
tmp = write(sockfd, p, total);
if(tmp < 0)
{
// 当send收到信号时,可以继续写,但这里返回-1.
if(errno == EINTR)
return -1;
// 当socket是非阻塞时,如返回此错误,表示写缓冲队列已满,
// 在这里做延时后再重试.
if(errno == EAGAIN)
{
usleep(1000);
continue;
}
return -1;
}
if((size_t)tmp == total)
return buflen;
total -= tmp;
p += tmp;
}
return tmp;//返回已写字节数
}
3: LT模型
默认情况下,epoll模型为LT模型,如果有数据,会一直触发。
LT模型相对来说代码实现简单,但是cpu执行效率相对ET较差。
同时,LT模型,在accept和接受事件是不必担心的,但是可写事件会一直触发。
思考:可以把accept用LT模型,但是相关的连接fd用et模型。
同时,可读与可写的业务逻辑一般是可读,recv后,读完数据,更改事件为监听可写。
4:LT样例代码。
epoll_lt.c ==>会发现 可写事件一直触发。
//业务逻辑可以优化,在读完后监听可写
//epoll默认时lt模式,
//lt模式有一直触发的问题,需要写完移除,或者读完移除
//lt模式代码简单,可以根据业务读取固定的字节,直到读完为止
/****************************************
默认就是lt模式: 水平触发
需要处理,写完移除,读完移除
https://cloud.tencent.com/developer/article/1636224
*****************************************/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define EPOLL_SIZE 1024
int main(int argc, char* argv[])
{
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if(sockfd <0){
printf("create listen socket error \n");
return -1;
}
//设置ip和端口可重用 设置非阻塞
int on = 1;
setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof(on));
setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, (char*)&on, sizeof(on));
//设置非阻塞
int oldSocketFlag = fcntl(sockfd, F_GETFL, 0);
int newSocketFlag = oldSocketFlag | O_NONBLOCK;
if (fcntl(sockfd, F_SETFL, newSocketFlag) == -1)
{
close(sockfd);
printf("set nonblock error. \n");
return -1;
}
//初始化服务器
struct sockaddr_in bind_addr;
bind_addr.sin_family = AF_INET;
bind_addr.sin_addr.s_addr = htonl(INADDR_ANY);
bind_addr.sin_port = htons(6666);
//绑定端口
if(bind(sockfd, (struct sockaddr*)&bind_addr, sizeof(bind_addr)) < 0)
{
printf("bind sockfd error \n");
close(sockfd);
return -1;
}
//启动监听
if(listen(sockfd, SOMAXCONN) < 0)//内核内规定的最大连接数
{
printf("listen sockfd error \n");
close(sockfd);
return -1;
}
//创建epoll
int epfd = epoll_create(1);
if(epfd == -1)
{
printf("create epoll fd error . \n");
close(sockfd);
return -1;
}
//设置相关参数,默认lt,添加fd到epoll中
struct epoll_event listen_fd_event;
listen_fd_event.data.fd = sockfd;
listen_fd_event.events = EPOLLIN; //默认是LT
if(epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &listen_fd_event) == -1)
{
printf("epoll ctl listen fd error. \n");
close(sockfd);
return -1;
}
//这里是主要的逻辑区
struct epoll_event epoll_events[EPOLL_SIZE];
int nready;
while(1)
{
nready = epoll_wait(epfd, epoll_events, EPOLL_SIZE, 1000);
if(nready < 0)
{
if (errno == EINTR)// 信号被中断
continue;
printf("epoll_wait error. \n");
break;
}else if(nready == 0) // 超时,继续
{
continue;
}
//开始处理响应的事件,这里是LT
for(int i =0; i { if(epoll_events[i].events & EPOLLIN) { //accept判断,默认是LT if(epoll_events[i].data.fd == sockfd) { //accept接收 以及设置非阻塞,放入epoll中 struct sockaddr_in client_addr; socklen_t client_len = sizeof(client_addr); int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len); if(clientfd == -1) { printf("accept error %s \n ", strerror(errno)); continue; } //设置非阻塞 int oldSocketFlag = fcntl(clientfd, F_GETFL, 0); int newSocketFlag = oldSocketFlag | O_NONBLOCK; if (fcntl(clientfd, F_SETFD, newSocketFlag) == -1) { close(clientfd); printf("fcntl set nonblock error. fd [%d] \n", clientfd); continue; } //加入epoll 监听读和写事件 { struct epoll_event client_fd_event; client_fd_event.data.fd = clientfd; client_fd_event.events = EPOLLIN | EPOLLOUT; if(epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &client_fd_event) == -1) { printf("add clientfd to epoll error \n"); close(clientfd); continue; } printf("new client accept, client fd is %d. \n",clientfd); } } else { printf("clinetfd [%d], recv data :\n", epoll_events[i].data.fd); //连接的客户发来数据 char recvbuff[1024] = {0}; int recvsize = recv(epoll_events[i].data.fd, recvbuff, 1024, 0); if(recvsize == 0)// 关闭连接 { if(epoll_ctl(epfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, 0) == -1) { printf("client disconnection error from epoll \n"); close(epoll_events[i].data.fd); continue; } printf("client disconnected,clientfd is [%d] \n", epoll_events[i].data.fd); close(epoll_events[i].data.fd); }else if(recvsize < 0) //出错情况下也是移除 { if (errno == EWOULDBLOCK && errno == EINTR) //不做处理 { continue; } if(epoll_ctl(epfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, 0) == -1) { printf("recv client data error.del epoll error \n"); close(epoll_events[i].data.fd); continue; } printf("recv client data error, clientfd is [%d] \n", epoll_events[i].data.fd); close(epoll_events[i].data.fd); }else { //正常接收到的数据 printf("recv client[%d] data success [%s]. \n", epoll_events[i].data.fd, recvbuff); } } }else if(epoll_events[i].events & EPOLLOUT) { if(epoll_events[i].data.fd == sockfd) { continue; } //只处理客户端的连接 会一直触发 //还需要接受 没法删除,这应该适合接收后发送的逻辑 printf("EPOLLOUT send buff. \n"); }else if(epoll_events[i].events & EPOLLERR) //给已经关闭的端口 { //应该关闭移除该端口 if(epoll_ctl(epfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, 0) == -1) { printf("recv client data error.del epoll error \n"); close(epoll_events[i].data.fd); continue; } printf("epoll error . EPOLLERR \n"); close(epoll_events[i].data.fd); } } } close(sockfd); close(epfd); return 0; } 5:ET样例代码 /************************************************* 使用et,一定要设置成非阻塞 1:处理accept 2:处理发送和接受,发送 **************************************************/ #include #include #include #include #include #include #include #include #include #include #define EPOLL_SIZE 1024 int main(int argc, char* argv[]) { int sockfd = socket(AF_INET, SOCK_STREAM, 0); if(sockfd <0){ printf("create listen socket error \n"); return -1; } //设置ip和端口可重用 设置非阻塞 int on = 1; setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof(on)); setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, (char*)&on, sizeof(on)); //设置非阻塞 int oldSocketFlag = fcntl(sockfd, F_GETFL, 0); int newSocketFlag = oldSocketFlag | O_NONBLOCK; if (fcntl(sockfd, F_SETFL, newSocketFlag) == -1) { close(sockfd); printf("set nonblock error. \n"); return -1; } //初始化服务器 struct sockaddr_in bind_addr; bind_addr.sin_family = AF_INET; bind_addr.sin_addr.s_addr = htonl(INADDR_ANY); bind_addr.sin_port = htons(6666); //绑定端口 if(bind(sockfd, (struct sockaddr*)&bind_addr, sizeof(bind_addr)) < 0) { printf("bind sockfd error \n"); close(sockfd); return -1; } //启动监听 if(listen(sockfd, SOMAXCONN) < 0)//内核内规定的最大连接数 { printf("listen sockfd error \n"); close(sockfd); return -1; } //创建epoll int epfd = epoll_create(1); if(epfd == -1) { printf("create epoll fd error . \n"); close(sockfd); return -1; } //设置相关参数,默认lt,添加fd到epoll中 //这里用的是ET struct epoll_event listen_fd_event; listen_fd_event.data.fd = sockfd; listen_fd_event.events = EPOLLIN; listen_fd_event.events |= EPOLLET; //ET if(epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &listen_fd_event) == -1) { printf("epoll ctl listen fd error. \n"); close(sockfd); return -1; } //ET模式下的处理,一定要是非阻塞的。 //et模式处理时 要进行循环达到一次接受完,阻塞的话最后一次就会阻塞住 struct epoll_event epoll_events[EPOLL_SIZE]; int nready; while(1) { nready = epoll_wait(epfd, epoll_events, EPOLL_SIZE, 1000); if(nready < 0) { if (errno == EINTR)// 信号被中断 continue; printf("epoll_wait error. \n"); break; }else if(nready == 0) // 超时,继续 { continue; } int connfd = -1; for(int i =0; i { if(epoll_events[i].events & EPOLLIN) //有可读事件 { if(epoll_events[i].data.fd == sockfd) { //因为是et模式,所以这里要用while struct sockaddr_in client_addr; socklen_t client_len = sizeof(client_addr); while((connfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len))>=0) { int oldSocketFlag = fcntl(connfd, F_GETFL, 0); int newSocketFlag = oldSocketFlag | O_NONBLOCK; if (fcntl(connfd, F_SETFD, newSocketFlag) == -1) { close(connfd); printf("fcntl set nonblock error. fd [%d] \n", connfd); continue; } { struct epoll_event client_fd_event; client_fd_event.data.fd = connfd; client_fd_event.events = EPOLLIN | EPOLLOUT; client_fd_event.events |= EPOLLET; //ET if(epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &client_fd_event) == -1) { printf("add clientfd to epoll error \n"); close(connfd); continue; } printf("new client accept, client fd is %d. \n",connfd); } } }else { printf("clinetfd [%d], recv data :\n", epoll_events[i].data.fd); //开始接受 char recvbuff[1024] = {0}; int recvsize = -1; //一次性读完 while((recvsize = recv(epoll_events[i].data.fd, recvbuff, 1024, 0))>0) { printf("recvbuff:[%s] recvsize:[%d] \n", recvbuff, recvsize); } if(recvsize == 0) { if(epoll_ctl(epfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, 0) == -1) { printf("client disconnection error from epoll \n"); close(epoll_events[i].data.fd); continue; } printf("client disconnected,clientfd is [%d] \n", epoll_events[i].data.fd); close(epoll_events[i].data.fd); }else if(recvsize < 0) { if (errno == EWOULDBLOCK && errno == EINTR) //不做处理 { continue; } if(epoll_ctl(epfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, 0) == -1) { printf("recv client data error.del epoll error \n"); close(epoll_events[i].data.fd); continue; } printf("recv client data error, clientfd is [%d] \n", epoll_events[i].data.fd); close(epoll_events[i].data.fd); } } }else if(epoll_events[i].events & EPOLLOUT) { if(epoll_events[i].data.fd == sockfd) { continue; } //这里et,应该只触发一次 printf("EPOLLOUT send buff\n "); }else if(epoll_events[i].events & EPOLLERR) //给已经关闭的端口 { //应该关闭移除该端口 if(epoll_ctl(epfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, 0) == -1) { printf("recv client data error.del epoll error \n"); close(epoll_events[i].data.fd); continue; } printf("epoll error . EPOLLERR \n"); close(epoll_events[i].data.fd); } } } close(sockfd); close(epfd); return 0; } 6:思考优化, 如何适应业务逻辑实现相关得到功能,recv后才开始监听可写事件。 相关的参数优化,相关的tcp状态变化等。