1. 主页 > 2018世界杯阿根廷 >

Epoll事件ET和LT模型分析

1:Epoll事件有两种模型:

​ ET: (边沿触发) ==》缓冲区状态发生变化时,触发一次

​ LT:(水平触发) ==》有数据可读,读事件一直触发 有空间可写,写事件一直触发。

使用时,不指定事件模型,则默认是水平触发。

2:ET模型

ET边缘触发模型,涉及以下问题:

1:ET模式下,accept如果多个客户端同时触发,只返回一次的话,有丢失。

​ ==》处理应该用while循环,一次性处理完。

if(epoll_events[i].data.fd == sockfd)

{

//因为是et模式,所以这里要用while

struct sockaddr_in client_addr;

socklen_t client_len = sizeof(client_addr);

while((connfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len))>=0)

{

int oldSocketFlag = fcntl(connfd, F_GETFL, 0);

int newSocketFlag = oldSocketFlag | O_NONBLOCK;

if (fcntl(connfd, F_SETFD, newSocketFlag) == -1)

{

close(connfd);

printf("fcntl set nonblock error. fd [%d] \n", connfd);

continue;

}

{

struct epoll_event client_fd_event;

client_fd_event.data.fd = connfd;

client_fd_event.events = EPOLLIN | EPOLLOUT;

client_fd_event.events |= EPOLLET; //ET

if(epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &client_fd_event) == -1)

{

printf("add clientfd to epoll error \n");

close(connfd);

continue;

}

printf("new client accept, client fd is %d. \n",connfd);

}

}

}

2: ET模式下,如果大量数据同时到达,只触发一次,取一次可能有数据丢失。 应该用while循环一次取完。

if (events[i].events & EPOLLIN)

{

n = 0;

// 一直读直到返回0或者 errno = EAGAIN

while ((nread = read(fd, buf + n, BUFSIZ-1)) > 0)

{

n += nread;

}

if (nread == -1 && errno != EAGAIN)

{

perror("read error");

}

ev.data.fd = fd;

ev.events = events[i].events | EPOLLOUT;

epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);

}

如果是可写,这里应该根据业务逻辑做一定的处理:

ssize_t socket_write(int sockfd, const char* buffer, size_t buflen)

{

ssize_t tmp;

size_t total = buflen;

const char* p = buffer;

while(1)

{

tmp = write(sockfd, p, total);

if(tmp < 0)

{

// 当send收到信号时,可以继续写,但这里返回-1.

if(errno == EINTR)

return -1;

// 当socket是非阻塞时,如返回此错误,表示写缓冲队列已满,

// 在这里做延时后再重试.

if(errno == EAGAIN)

{

usleep(1000);

continue;

}

return -1;

}

if((size_t)tmp == total)

return buflen;

total -= tmp;

p += tmp;

}

return tmp;//返回已写字节数

}

3: LT模型

默认情况下,epoll模型为LT模型,如果有数据,会一直触发。

LT模型相对来说代码实现简单,但是cpu执行效率相对ET较差。

同时,LT模型,在accept和接受事件是不必担心的,但是可写事件会一直触发。

思考:可以把accept用LT模型,但是相关的连接fd用et模型。

​ 同时,可读与可写的业务逻辑一般是可读,recv后,读完数据,更改事件为监听可写。

4:LT样例代码。

epoll_lt.c ==>会发现 可写事件一直触发。

//业务逻辑可以优化,在读完后监听可写

//epoll默认时lt模式,

//lt模式有一直触发的问题,需要写完移除,或者读完移除

//lt模式代码简单,可以根据业务读取固定的字节,直到读完为止

/****************************************

默认就是lt模式: 水平触发

需要处理,写完移除,读完移除

https://cloud.tencent.com/developer/article/1636224

*****************************************/

#include

#include

#include

#include

#include

#include

#include

#include

#include

#include

#define EPOLL_SIZE 1024

int main(int argc, char* argv[])

{

int sockfd = socket(AF_INET, SOCK_STREAM, 0);

if(sockfd <0){

printf("create listen socket error \n");

return -1;

}

//设置ip和端口可重用 设置非阻塞

int on = 1;

setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof(on));

setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, (char*)&on, sizeof(on));

//设置非阻塞

int oldSocketFlag = fcntl(sockfd, F_GETFL, 0);

int newSocketFlag = oldSocketFlag | O_NONBLOCK;

if (fcntl(sockfd, F_SETFL, newSocketFlag) == -1)

{

close(sockfd);

printf("set nonblock error. \n");

return -1;

}

//初始化服务器

struct sockaddr_in bind_addr;

bind_addr.sin_family = AF_INET;

bind_addr.sin_addr.s_addr = htonl(INADDR_ANY);

bind_addr.sin_port = htons(6666);

//绑定端口

if(bind(sockfd, (struct sockaddr*)&bind_addr, sizeof(bind_addr)) < 0)

{

printf("bind sockfd error \n");

close(sockfd);

return -1;

}

//启动监听

if(listen(sockfd, SOMAXCONN) < 0)//内核内规定的最大连接数

{

printf("listen sockfd error \n");

close(sockfd);

return -1;

}

//创建epoll

int epfd = epoll_create(1);

if(epfd == -1)

{

printf("create epoll fd error . \n");

close(sockfd);

return -1;

}

//设置相关参数,默认lt,添加fd到epoll中

struct epoll_event listen_fd_event;

listen_fd_event.data.fd = sockfd;

listen_fd_event.events = EPOLLIN; //默认是LT

if(epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &listen_fd_event) == -1)

{

printf("epoll ctl listen fd error. \n");

close(sockfd);

return -1;

}

//这里是主要的逻辑区

struct epoll_event epoll_events[EPOLL_SIZE];

int nready;

while(1)

{

nready = epoll_wait(epfd, epoll_events, EPOLL_SIZE, 1000);

if(nready < 0)

{

if (errno == EINTR)// 信号被中断

continue;

printf("epoll_wait error. \n");

break;

}else if(nready == 0) // 超时,继续

{

continue;

}

//开始处理响应的事件,这里是LT

for(int i =0; i

{

if(epoll_events[i].events & EPOLLIN)

{

//accept判断,默认是LT

if(epoll_events[i].data.fd == sockfd)

{

//accept接收 以及设置非阻塞,放入epoll中

struct sockaddr_in client_addr;

socklen_t client_len = sizeof(client_addr);

int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len);

if(clientfd == -1)

{

printf("accept error %s \n ", strerror(errno));

continue;

}

//设置非阻塞

int oldSocketFlag = fcntl(clientfd, F_GETFL, 0);

int newSocketFlag = oldSocketFlag | O_NONBLOCK;

if (fcntl(clientfd, F_SETFD, newSocketFlag) == -1)

{

close(clientfd);

printf("fcntl set nonblock error. fd [%d] \n", clientfd);

continue;

}

//加入epoll 监听读和写事件

{

struct epoll_event client_fd_event;

client_fd_event.data.fd = clientfd;

client_fd_event.events = EPOLLIN | EPOLLOUT;

if(epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &client_fd_event) == -1)

{

printf("add clientfd to epoll error \n");

close(clientfd);

continue;

}

printf("new client accept, client fd is %d. \n",clientfd);

}

}

else

{

printf("clinetfd [%d], recv data :\n", epoll_events[i].data.fd);

//连接的客户发来数据

char recvbuff[1024] = {0};

int recvsize = recv(epoll_events[i].data.fd, recvbuff, 1024, 0);

if(recvsize == 0)// 关闭连接

{

if(epoll_ctl(epfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, 0) == -1)

{

printf("client disconnection error from epoll \n");

close(epoll_events[i].data.fd);

continue;

}

printf("client disconnected,clientfd is [%d] \n", epoll_events[i].data.fd);

close(epoll_events[i].data.fd);

}else if(recvsize < 0) //出错情况下也是移除

{

if (errno == EWOULDBLOCK && errno == EINTR) //不做处理

{

continue;

}

if(epoll_ctl(epfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, 0) == -1)

{

printf("recv client data error.del epoll error \n");

close(epoll_events[i].data.fd);

continue;

}

printf("recv client data error, clientfd is [%d] \n", epoll_events[i].data.fd);

close(epoll_events[i].data.fd);

}else

{

//正常接收到的数据

printf("recv client[%d] data success [%s]. \n", epoll_events[i].data.fd, recvbuff);

}

}

}else if(epoll_events[i].events & EPOLLOUT)

{

if(epoll_events[i].data.fd == sockfd)

{

continue;

}

//只处理客户端的连接 会一直触发

//还需要接受 没法删除,这应该适合接收后发送的逻辑

printf("EPOLLOUT send buff. \n");

}else if(epoll_events[i].events & EPOLLERR) //给已经关闭的端口

{

//应该关闭移除该端口

if(epoll_ctl(epfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, 0) == -1)

{

printf("recv client data error.del epoll error \n");

close(epoll_events[i].data.fd);

continue;

}

printf("epoll error . EPOLLERR \n");

close(epoll_events[i].data.fd);

}

}

}

close(sockfd);

close(epfd);

return 0;

}

5:ET样例代码

/*************************************************

使用et,一定要设置成非阻塞

1:处理accept

2:处理发送和接受,发送

**************************************************/

#include

#include

#include

#include

#include

#include

#include

#include

#include

#include

#define EPOLL_SIZE 1024

int main(int argc, char* argv[])

{

int sockfd = socket(AF_INET, SOCK_STREAM, 0);

if(sockfd <0){

printf("create listen socket error \n");

return -1;

}

//设置ip和端口可重用 设置非阻塞

int on = 1;

setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof(on));

setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, (char*)&on, sizeof(on));

//设置非阻塞

int oldSocketFlag = fcntl(sockfd, F_GETFL, 0);

int newSocketFlag = oldSocketFlag | O_NONBLOCK;

if (fcntl(sockfd, F_SETFL, newSocketFlag) == -1)

{

close(sockfd);

printf("set nonblock error. \n");

return -1;

}

//初始化服务器

struct sockaddr_in bind_addr;

bind_addr.sin_family = AF_INET;

bind_addr.sin_addr.s_addr = htonl(INADDR_ANY);

bind_addr.sin_port = htons(6666);

//绑定端口

if(bind(sockfd, (struct sockaddr*)&bind_addr, sizeof(bind_addr)) < 0)

{

printf("bind sockfd error \n");

close(sockfd);

return -1;

}

//启动监听

if(listen(sockfd, SOMAXCONN) < 0)//内核内规定的最大连接数

{

printf("listen sockfd error \n");

close(sockfd);

return -1;

}

//创建epoll

int epfd = epoll_create(1);

if(epfd == -1)

{

printf("create epoll fd error . \n");

close(sockfd);

return -1;

}

//设置相关参数,默认lt,添加fd到epoll中

//这里用的是ET

struct epoll_event listen_fd_event;

listen_fd_event.data.fd = sockfd;

listen_fd_event.events = EPOLLIN;

listen_fd_event.events |= EPOLLET; //ET

if(epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &listen_fd_event) == -1)

{

printf("epoll ctl listen fd error. \n");

close(sockfd);

return -1;

}

//ET模式下的处理,一定要是非阻塞的。

//et模式处理时 要进行循环达到一次接受完,阻塞的话最后一次就会阻塞住

struct epoll_event epoll_events[EPOLL_SIZE];

int nready;

while(1)

{

nready = epoll_wait(epfd, epoll_events, EPOLL_SIZE, 1000);

if(nready < 0)

{

if (errno == EINTR)// 信号被中断

continue;

printf("epoll_wait error. \n");

break;

}else if(nready == 0) // 超时,继续

{

continue;

}

int connfd = -1;

for(int i =0; i

{

if(epoll_events[i].events & EPOLLIN) //有可读事件

{

if(epoll_events[i].data.fd == sockfd)

{

//因为是et模式,所以这里要用while

struct sockaddr_in client_addr;

socklen_t client_len = sizeof(client_addr);

while((connfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len))>=0)

{

int oldSocketFlag = fcntl(connfd, F_GETFL, 0);

int newSocketFlag = oldSocketFlag | O_NONBLOCK;

if (fcntl(connfd, F_SETFD, newSocketFlag) == -1)

{

close(connfd);

printf("fcntl set nonblock error. fd [%d] \n", connfd);

continue;

}

{

struct epoll_event client_fd_event;

client_fd_event.data.fd = connfd;

client_fd_event.events = EPOLLIN | EPOLLOUT;

client_fd_event.events |= EPOLLET; //ET

if(epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &client_fd_event) == -1)

{

printf("add clientfd to epoll error \n");

close(connfd);

continue;

}

printf("new client accept, client fd is %d. \n",connfd);

}

}

}else

{

printf("clinetfd [%d], recv data :\n", epoll_events[i].data.fd);

//开始接受

char recvbuff[1024] = {0};

int recvsize = -1;

//一次性读完

while((recvsize = recv(epoll_events[i].data.fd, recvbuff, 1024, 0))>0)

{

printf("recvbuff:[%s] recvsize:[%d] \n", recvbuff, recvsize);

}

if(recvsize == 0)

{

if(epoll_ctl(epfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, 0) == -1)

{

printf("client disconnection error from epoll \n");

close(epoll_events[i].data.fd);

continue;

}

printf("client disconnected,clientfd is [%d] \n", epoll_events[i].data.fd);

close(epoll_events[i].data.fd);

}else if(recvsize < 0)

{

if (errno == EWOULDBLOCK && errno == EINTR) //不做处理

{

continue;

}

if(epoll_ctl(epfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, 0) == -1)

{

printf("recv client data error.del epoll error \n");

close(epoll_events[i].data.fd);

continue;

}

printf("recv client data error, clientfd is [%d] \n", epoll_events[i].data.fd);

close(epoll_events[i].data.fd);

}

}

}else if(epoll_events[i].events & EPOLLOUT)

{

if(epoll_events[i].data.fd == sockfd)

{

continue;

}

//这里et,应该只触发一次

printf("EPOLLOUT send buff\n ");

}else if(epoll_events[i].events & EPOLLERR) //给已经关闭的端口

{

//应该关闭移除该端口

if(epoll_ctl(epfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, 0) == -1)

{

printf("recv client data error.del epoll error \n");

close(epoll_events[i].data.fd);

continue;

}

printf("epoll error . EPOLLERR \n");

close(epoll_events[i].data.fd);

}

}

}

close(sockfd);

close(epfd);

return 0;

}

6:思考优化,

如何适应业务逻辑实现相关得到功能,recv后才开始监听可写事件。

相关的参数优化,相关的tcp状态变化等。