首页 > 解决方案 > Linux epoll 系统调用,等待数据可用

问题描述

使用简单的父子程序测试 Linux 系统调用epoll 。

预期行为

当孩子每秒写一个 no 时,父母应该从管道中读取它并每秒写一个 no 到 stdout。

实际行为

父母等到孩子写完所有的号码,然后从管道中读取所有数据并写入标准输出。通过对父母进行 strace 验证。它在 epoll_wait 中阻塞。

请查看 github 中的 README 更多信息

家长

#define _GNU_SOURCE
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <error.h>
#include <errno.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/epoll.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#define NAMED_FIFO "aFifo"

static void set_nonblocking(int fd) {
  int flags = fcntl(fd, F_GETFL, 0);
  if (flags == -1) {
    perror("fcntl()");
    return;
  }
  if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
    perror("fcntl()");
  }
}

void errExit(char *msg) {
  perror(msg);
  exit(-1);
}

void printArgs(char **argv,char **env) {
  for(int i=0;argv[i];i++)
    printf("argv[%d]=%s\n",i,argv[i]);

  for(int i=0;env[i];i++)
    printf("env[%d]=%s\n",i,env[i]);
}

void PrintNos(short int max,char *name) {
  int fifo_fd,rVal;
  int bSize=2;
  char buffer[bSize];

  fifo_fd = open(NAMED_FIFO,O_RDONLY);
  if(fifo_fd<0)
    errExit("open");

  for(short int i=0;i<max;i++) {
    rVal = read(fifo_fd,buffer,bSize);
    if(rVal != bSize)
      errExit("read");
    printf("%03d\n",i);
  }
}

int main(int argc, char *argv[],char *env[]) {
  //int pipe_fds_child_stdin[2] ;
  int pipe_fds_child_stdout[2] ;
  pid_t child_id ;

  //if( pipe(pipe_fds_child_stdin) < 0 )
  //  errExit("pipe");

  if( pipe(pipe_fds_child_stdout) < 0 )
    errExit("pipe");

  child_id = fork();

  if( child_id > 0 ) {
    const int MAX_POLL_FDS = 2;
    const int BUF_SIZE = 4;

    size_t readSize;
    char buf[BUF_SIZE];
    int status;

    int epoll_fd;
    int nfds ;
    struct epoll_event e_e, e_events[MAX_POLL_FDS];

    memset(e_events,'\0',sizeof(e_events));
    memset(&e_e,'\0',sizeof(e_e));
    //close(pipe_fds_child_stdin[0]);
    close(pipe_fds_child_stdout[1]);

    epoll_fd = epoll_create1(0);
    if(epoll_fd < 0)
      errExit("epoll_create1");

    e_e.data.fd = pipe_fds_child_stdout[0];
    e_e.events  = EPOLLIN;

    if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pipe_fds_child_stdout[0], &e_e) < 0)
      errExit("epoll_ctl");

    while(1) {
      nfds = epoll_wait(epoll_fd, e_events,MAX_POLL_FDS,-1);
      if( nfds < 0)
        errExit("epoll_wait");

      for(int i=0;i<nfds;i++) {
        if( e_events[i].data.fd == pipe_fds_child_stdout[0]) {
          if( e_events[i].events & EPOLLIN) {
            readSize = read(pipe_fds_child_stdout[0],buf,BUF_SIZE);
            if( readSize == BUF_SIZE ) {
              write(STDOUT_FILENO,buf,BUF_SIZE);
            } else if(readSize == 0) { // eof
              errExit("readSize=0");
            } else {
              errExit("read");
            }
          } else if( e_events[i].events & EPOLLHUP) {
            printf("got EPOLLHUP on pipefd\n");
            wait(&status);
            exit(0);
          } else {
            errExit("Unexpected event flag returned by epoll_wait on waited fd");
          }
        } else  {
          errExit("epoll_wait returned non-awaited fd");
        }
      }
    }
  } else if( child_id == 0 ) {
    close(0);
    close(1);
    //close(pipe_fds_child_stdin[1]);
    close(pipe_fds_child_stdout[0]);

    //dup2(pipe_fds_child_stdin[0],0);
    dup2(pipe_fds_child_stdout[1],1);

    execvpe(argv[1],&(argv[1]),env);
    //PrintNos(100,"P");
    //errExit("execvp");
  } else {
    errExit("fork");
  }
}

孩子

import sys
import time
import os
#f=open("aFifo",'r')
for x in range(10):
    #try:
    #    val = f.read(2)
    #except Exception as e:
    #    raise 
    time.sleep(1)
    print(f'{x:03d}')

标签: pythonclinuxsystem-callsepoll

解决方案


这是由于 python 缓冲,可以通过将 -u 选项传递给 python 来禁用它。


经过大量搜索和研究,了解到这是由于管道缓冲区造成的。尽管客户端写入,但它位于管道缓冲区中。只有在管道缓冲区已满后,内核才会在该描述符上发送就绪事件。最小值是页面大小,内核不允许设置低于该值。但是可以增加。通过从 epoll 更改为 poll/select 来解决这个问题。更改为轮询/选择后,行为相同。尽管数据在管道中可用,但阻塞。

import fcntl
import os

F_SETPIPE_SZ=1031
fds = os.pipe()
for i in range(5):
    print(fcntl.fcntl(fds[0],F_SETPIPE_SZ,64))


$ python3.7 pipePageSize.py 
4096
4096

这是修改后的客户端。服务器也进行了适当的更改。

import time

pageSize=1024*8

for x in range(100):
    time.sleep(0.5)
    print(f'{x:-{pageSize}d}')

推荐阅读