python - Linux epoll 系统调用,等待数据可用
问题描述
使用简单的父子程序测试 Linux 系统调用epoll 。
预期行为
当孩子每秒写一个 no 时,父母应该从管道中读取它并每秒写一个 no 到 stdout。
实际行为
父母等到孩子写完所有的号码,然后从管道中读取所有数据并写入标准输出。通过对父母进行 strace 验证。它在 epoll_wait 中阻塞。
家长
#define _GNU_SOURCE
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <error.h>
#include <errno.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#define NAMED_FIFO "aFifo"
static void set_nonblocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1) {
perror("fcntl()");
return;
}
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
perror("fcntl()");
}
}
void errExit(char *msg) {
perror(msg);
exit(-1);
}
void printArgs(char **argv,char **env) {
for(int i=0;argv[i];i++)
printf("argv[%d]=%s\n",i,argv[i]);
for(int i=0;env[i];i++)
printf("env[%d]=%s\n",i,env[i]);
}
void PrintNos(short int max,char *name) {
int fifo_fd,rVal;
int bSize=2;
char buffer[bSize];
fifo_fd = open(NAMED_FIFO,O_RDONLY);
if(fifo_fd<0)
errExit("open");
for(short int i=0;i<max;i++) {
rVal = read(fifo_fd,buffer,bSize);
if(rVal != bSize)
errExit("read");
printf("%03d\n",i);
}
}
int main(int argc, char *argv[],char *env[]) {
//int pipe_fds_child_stdin[2] ;
int pipe_fds_child_stdout[2] ;
pid_t child_id ;
//if( pipe(pipe_fds_child_stdin) < 0 )
// errExit("pipe");
if( pipe(pipe_fds_child_stdout) < 0 )
errExit("pipe");
child_id = fork();
if( child_id > 0 ) {
const int MAX_POLL_FDS = 2;
const int BUF_SIZE = 4;
size_t readSize;
char buf[BUF_SIZE];
int status;
int epoll_fd;
int nfds ;
struct epoll_event e_e, e_events[MAX_POLL_FDS];
memset(e_events,'\0',sizeof(e_events));
memset(&e_e,'\0',sizeof(e_e));
//close(pipe_fds_child_stdin[0]);
close(pipe_fds_child_stdout[1]);
epoll_fd = epoll_create1(0);
if(epoll_fd < 0)
errExit("epoll_create1");
e_e.data.fd = pipe_fds_child_stdout[0];
e_e.events = EPOLLIN;
if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pipe_fds_child_stdout[0], &e_e) < 0)
errExit("epoll_ctl");
while(1) {
nfds = epoll_wait(epoll_fd, e_events,MAX_POLL_FDS,-1);
if( nfds < 0)
errExit("epoll_wait");
for(int i=0;i<nfds;i++) {
if( e_events[i].data.fd == pipe_fds_child_stdout[0]) {
if( e_events[i].events & EPOLLIN) {
readSize = read(pipe_fds_child_stdout[0],buf,BUF_SIZE);
if( readSize == BUF_SIZE ) {
write(STDOUT_FILENO,buf,BUF_SIZE);
} else if(readSize == 0) { // eof
errExit("readSize=0");
} else {
errExit("read");
}
} else if( e_events[i].events & EPOLLHUP) {
printf("got EPOLLHUP on pipefd\n");
wait(&status);
exit(0);
} else {
errExit("Unexpected event flag returned by epoll_wait on waited fd");
}
} else {
errExit("epoll_wait returned non-awaited fd");
}
}
}
} else if( child_id == 0 ) {
close(0);
close(1);
//close(pipe_fds_child_stdin[1]);
close(pipe_fds_child_stdout[0]);
//dup2(pipe_fds_child_stdin[0],0);
dup2(pipe_fds_child_stdout[1],1);
execvpe(argv[1],&(argv[1]),env);
//PrintNos(100,"P");
//errExit("execvp");
} else {
errExit("fork");
}
}
孩子
import sys
import time
import os
#f=open("aFifo",'r')
for x in range(10):
#try:
# val = f.read(2)
#except Exception as e:
# raise
time.sleep(1)
print(f'{x:03d}')
解决方案
这是由于 python 缓冲,可以通过将 -u 选项传递给 python 来禁用它。
经过大量搜索和研究,了解到这是由于管道缓冲区造成的。尽管客户端写入,但它位于管道缓冲区中。只有在管道缓冲区已满后,内核才会在该描述符上发送就绪事件。最小值是页面大小,内核不允许设置低于该值。但是可以增加。通过从 epoll 更改为 poll/select 来解决这个问题。更改为轮询/选择后,行为相同。尽管数据在管道中可用,但阻塞。
import fcntl
import os
F_SETPIPE_SZ=1031
fds = os.pipe()
for i in range(5):
print(fcntl.fcntl(fds[0],F_SETPIPE_SZ,64))
$ python3.7 pipePageSize.py
4096
4096
这是修改后的客户端。服务器也进行了适当的更改。
import time
pageSize=1024*8
for x in range(100):
time.sleep(0.5)
print(f'{x:-{pageSize}d}')
推荐阅读
- apache-spark - PySpark UDF 用于转换 UTM 错误预期零参数用于构造 ClassDict(用于 numpy.dtype)
- javascript - 如何在deck.gl的iconlayer中使用svg
- bash - 将先前输入的命令插入文件
- c++ - 你如何打包 GCC 进行分发?
- php - 如何在php中将两个或三个数组连接在一起?
- python - get_currency_exchange_daily 的 alphavantage XAU
- mysql - 如何使用 msql 数据库在 VB.Net 中使用组合框过滤数据
- python - Opencv MedianBlur 是如何工作的?
- javascript - Word js:matchCase 搜索选项未按预期工作
- python - 如何为 AWS RDS 凭证创建配置文件并将其导入我的 AWS Lambda API?