epoll 惊群效应实测

惊群效应

惊群简单来说就是多个进程或者线程在等待同一个事件,当事件发生时,所有线程和进程都会被内核唤醒。唤醒后通常只有一个进程获得了该事件并进行处理,其他进程发现获取事件失败后又继续进入了等待状态,在一定程度上降低了系统性能。
常见的惊群问题有两种:
Accept 惊群问题:多个阻塞在 accept 上的进程同时被唤醒。该问题已在 Linux 2.6 内核中解决,本文不再讨论
Epoll 惊群问题:虽然 accept 惊群问题已被内核解决,但当同一个 fd 被多个 epoll 监听时,该 fd 变为可读仍会让每个 epoll 都触发读事件,从而唤醒所有等待的进程

epoll 惊群测试

测试思路

  • 主进程创建socket
  • 子进程把该 socket 注册到各自的 epoll 中监听可读事件;epoll 需要在 fork 之后创建,否则多个进程会共用同一个 epoll,进程无法识别其他进程产生的 fd
  • 注册listen fd的可读状态,并触发accept
  • 观察输出信息
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>


/*
 * Compile-time configuration of the test server.
 *
 * These are enum constants rather than `const int` objects because a
 * `const int` is not an integer constant expression in C, so it cannot size
 * a file-scope array such as g_pids.
 */
enum {
    MAX_PROC_NUM = 4,   /* number of worker processes forked by main */
    MAX_EVENTS = 128,   /* capacity of the array handed to epoll_wait */
    PORT = 8081,        /* TCP port the server binds to */
    BUF_SIZE = 1024,    /* size of each worker's read buffer */
    CLIENT_SIZE = 128   /* listen backlog; also the epoll_create size hint */
};

/* Return values of fork() for each worker slot; 0 means "not forked". */
int g_pids[MAX_PROC_NUM] = {0};

/*
 * Signal handler: kill every forked worker, then terminate the calling
 * process.
 *
 * signo: number of the delivered signal (unused).
 *
 * Only kill() and _exit() are called; both are async-signal-safe.
 */
void sig_handler(int signo) {
    int i;

    (void)signo;

    for (i = 0; i < MAX_PROC_NUM; i++) {
        /*
         * Skip slots that do not hold a real child PID: kill(0, ...) would
         * signal the whole process group and kill(-1, ...) every process the
         * caller is allowed to signal.
         */
        if (g_pids[i] > 0) {
            kill(g_pids[i], SIGKILL);
        }
    }

    /* Installing a handler suppresses the default termination, so exit here. */
    _exit(0);
}


/*
 * Worker event loop.
 *
 * Creates an epoll instance private to this process, registers the listening
 * socket on it, then loops forever: new connections are accepted and added
 * to the epoll set; for a readable client socket one message is read, echoed
 * back, and the connection is closed.
 *
 * listenfd: listening socket. accept() is called once per readiness event,
 *           so the socket should be non-blocking, otherwise a worker that
 *           loses the race for a connection blocks inside accept().
 *
 * Returns -1 if the epoll instance cannot be created or the listening socket
 * cannot be registered; otherwise it does not return.
 */
int child_procedure(int listenfd) {
    struct epoll_event ev = {0};
    struct epoll_event events[MAX_EVENTS];
    char buf[BUF_SIZE];
    int pid = getpid();
    int epfd;
    int cnt;
    int i;

    /* epoll_create() signals failure with -1; 0 is a valid descriptor. */
    epfd = epoll_create(CLIENT_SIZE + 1);
    if (epfd < 0) {
        printf("Create epoll failed\n");
        return -1;
    }

    ev.events = EPOLLIN | EPOLLET;
    ev.data.fd = listenfd;

    if (epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev) < 0) {
        printf("Add ev failed\n");
        close(epfd);
        return -1;
    }

    printf("Epoll init finished, process pid: %d\n", pid);

    while (1) {
        /*
         * Block until something is ready. A zero timeout would make every
         * worker spin at full CPU instead of sleeping until woken.
         */
        cnt = epoll_wait(epfd, events, MAX_EVENTS, -1);
        if (cnt <= 0) {
            continue;
        }

        for (i = 0; i < cnt; i++) {
            int fd = events[i].data.fd;

            if (fd == listenfd) {
                /* New connection request. */
                int newfd;
                int flags;

                printf("Process %d receive a connection request\n", pid);
                newfd = accept(listenfd, NULL, NULL);
                if (newfd < 0) {
                    printf("Process %d accept failed\n", pid);
                    continue;
                }

                /* File status flags are read with F_GETFL, not F_GETFD. */
                flags = fcntl(newfd, F_GETFL, 0);
                if (flags < 0 || fcntl(newfd, F_SETFL, flags | O_NONBLOCK) < 0) {
                    close(newfd);
                    continue;
                }

                ev.events = EPOLLIN | EPOLLET;
                ev.data.fd = newfd;
                if (epoll_ctl(epfd, EPOLL_CTL_ADD, newfd, &ev) < 0) {
                    /* Nobody would ever service this socket; drop it. */
                    close(newfd);
                }
            } else {
                ssize_t n = read(fd, buf, BUF_SIZE);

                printf("Process %d receive a msg, length %d\n", pid, (int)n);

                /*
                 * Echo only on a successful read: passing -1 to write()
                 * would be converted to a huge unsigned length.
                 */
                if (n > 0 && write(fd, buf, (size_t)n) < 0) {
                    printf("Process %d write failed\n", pid);
                }

                /* Deregister while the descriptor is still valid, then close. */
                epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
                close(fd);
            }
        }
    }

    return 0;
}

/*
 * Entry point of the epoll thundering-herd test.
 *
 * Creates a non-blocking TCP listening socket on PORT, forks MAX_PROC_NUM
 * workers that each run child_procedure() on that socket, then keeps the
 * parent alive with sig_handler installed for SIGINT and SIGTERM.
 *
 * Returns -1 if the socket cannot be set up or a worker cannot be forked;
 * a worker returns whatever child_procedure() returns.
 */
int main(void)
{
    struct sockaddr_in servaddr = {0};
    int listenfd;
    int flags;
    int is_child = 0;
    int i;

    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0) {
        printf("Create socket failed\n");
        return -1;
    }

    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(PORT);

    /* File status flags are read with F_GETFL, not F_GETFD. */
    flags = fcntl(listenfd, F_GETFL, 0);
    if (flags < 0 || fcntl(listenfd, F_SETFL, flags | O_NONBLOCK) < 0) {
        printf("Set nonblock failed\n");
        close(listenfd);
        return -1;
    }

    if (bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) == -1) {
        printf("bind error\n");
        close(listenfd);
        return -1;
    }

    if (listen(listenfd, CLIENT_SIZE) == -1) {
        printf("Listen failed\n");
        close(listenfd);
        return -1;
    }

    for (i = 0; i < MAX_PROC_NUM; i++) {
        g_pids[i] = fork();
        if (g_pids[i] == 0) {
            /* Child process: stop forking. */
            is_child = 1;
            break;
        }
        if (g_pids[i] < 0) {
            int j;

            /* Do not leave a partial set of workers behind. */
            printf("fork failed\n");
            for (j = 0; j < i; j++) {
                kill(g_pids[j], SIGKILL);
            }
            close(listenfd);
            return -1;
        }
    }

    if (is_child) {
        /* Child process: run the worker loop. */
        return child_procedure(listenfd);
    }

    /*
     * Parent only: install the handler that kills the workers. It is
     * registered after fork() so the workers do not inherit it. SIGKILL
     * cannot be caught, so SIGTERM is handled instead.
     */
    signal(SIGINT, sig_handler);
    signal(SIGTERM, sig_handler);

    /* Parent: sleep until a signal arrives. */
    while (1) {
        pause();
    }

    return 0;
}



测试结果

Epoll init finished, process pid: 20628
Epoll init finished, process pid: 20629
Epoll init finished, process pid: 20630
Epoll init finished, process pid: 20631
Process 20630 receive a connection request
Process 20631 receive a connection request
Process 20628 receive a connection request
Process 20629 receive a connection request
Process 20630 receive a msg, length 169
Process 20631 accept failed
Process 20628 accept failed
Process 20629 accept failed

结果显示,fork 了 4 个进程,4 个进程都因收到可读事件而被唤醒,但实际只有 1 个进程能成功 accept 该连接

惊群问题改进

将同一个 listen fd 添加到多个 epoll 中,一个典型的应用就是 nginx。nginx 增加了一把锁,同一时刻只有一个进程处于 wait 状态,这样就保证了同一个可读事件不会触发给多个进程。为了减少加锁的时间,nginx 先将事件放入队列,处理完 accept 后立即解锁,收发包并不占用这把全局锁。这把锁不单单用于解决惊群问题,还是进程间负载均衡的重要一环
详细可参考:https://blog.csdn.net/initphp/article/details/52266844

0 条评论

What is 7 + 8 ?
Please leave these two fields as-is:
不答对这道小题,是不给通过的哦

昵称

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据

沙发空缺中,还不快抢~