c - SOCK_SEQPACKET Unix Socket 上的空包
问题描述
我在 Unix 套接字上使用 SOCK_SEQPACKET 类型。
我用来阅读的代码是经典的
ssize_t recv_size = recv(sd, buffer, sizeof(buffer), 0);
if (recv_size < 0) {
handle_error("recv", errno);
} else if (recv_size > 0) {
handle_packet(buffer, recv_size);
} else {
// recv_size == 0 => peer closed socket.
handle_end_of_stream();
}
虽然这很好用,但我注意到它无法区分套接字闭包和大小为 0 的消息。换句话说,如果在另一端我发出如下调用序列:
send(sd, "hello", strlen("hello"), 0);
send(sd, "", 0, 0);
send(sd, "world", strlen("world"), 0);
…读者只会接收"hello"
到带有套接字闭包的第二条消息并对其做出反应,"world"
完全错过了该消息。
我想知道是否有某种方法可以消除这两种情况之间的歧义。
解决方案
正如我在评论中提到的,零长度 seqpackets(以及零长度数据报)的行为可能很奇怪,通常被误认为是断开连接;因此,我绝对建议不要出于任何目的使用零长度的 seqpackets 或数据报。
为了说明主要问题并探索细节,我创建了两个测试程序。首先是receive.c,它侦听 Unix 域 seqpacket 套接字,接受一个连接,并描述它接收到的内容:
#define _POSIX_C_SOURCE 200809L
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <signal.h>
#include <string.h>
#include <poll.h>
#include <time.h>
#include <stdio.h>
#include <errno.h>
static volatile sig_atomic_t done = 0;
static void handle_done(int signum)
{
done = 1;
}
static int install_done(int signum)
{
struct sigaction act;
memset(&act, 0, sizeof act);
sigemptyset(&act.sa_mask);
act.sa_handler = handle_done;
act.sa_flags = 0;
if (sigaction(signum, &act, NULL) == -1)
return errno;
return 0;
}
static inline unsigned int digit(const int c)
{
switch (c) {
case '0': return 0;
case '1': return 1;
case '2': return 2;
case '3': return 3;
case '4': return 4;
case '5': return 5;
case '6': return 6;
case '7': return 7;
case '8': return 8;
case '9': return 9;
case 'A': case 'a': return 10;
case 'B': case 'b': return 11;
case 'C': case 'c': return 12;
case 'D': case 'd': return 13;
case 'E': case 'e': return 14;
case 'F': case 'f': return 15;
default: return 16;
}
}
static inline unsigned int octbyte(const char *src)
{
if (src) {
const unsigned int o0 = digit(src[0]);
if (o0 < 4) {
const unsigned int o1 = digit(src[1]);
if (o1 < 8) {
const unsigned int o2 = digit(src[2]);
if (o2 < 8)
return o0*64 + o1*8 + o2;
}
}
}
return 256;
}
static inline unsigned int hexbyte(const char *src)
{
if (src) {
const unsigned int hi = digit(src[0]);
if (hi < 16) {
const unsigned int lo = digit(src[1]);
if (lo < 16)
return 16*hi + lo;
}
}
return 256;
}
size_t set_unix_path(const char *src, struct sockaddr_un *addr)
{
char *dst = addr->sun_path;
char *const end = addr->sun_path + sizeof (addr->sun_path) - 1;
unsigned int byte;
if (!src || !addr)
return 0;
memset(addr, 0, sizeof *addr);
addr->sun_family = AF_UNIX;
while (*src && dst < end)
if (*src == '\\')
switch (*(++src)) {
case '0':
byte = octbyte(src);
if (byte < 256) {
*(dst++) = byte;
src += 3;
} else {
*(dst++) = '\0';
src++;
}
break;
case '1':
byte = octbyte(src);
if (byte < 256) {
*(dst++) = byte;
src += 3;
} else
*(dst++) = '\\';
break;
case '2':
byte = octbyte(src);
if (byte < 256) {
*(dst++) = byte;
src += 3;
} else
*(dst++) = '\\';
break;
case '3':
byte = octbyte(src);
if (byte < 256) {
*(dst++) = byte;
src += 3;
} else
*(dst++) = '\\';
break;
case 'x':
byte = hexbyte(src + 1);
if (byte < 256) {
*(dst++) = byte;
src += 3;
} else
*(dst++) = '\\';
break;
case 'a': *(dst++) = '\a'; src++; break;
case 'b': *(dst++) = '\b'; src++; break;
case 't': *(dst++) = '\t'; src++; break;
case 'n': *(dst++) = '\n'; src++; break;
case 'v': *(dst++) = '\v'; src++; break;
case 'f': *(dst++) = '\f'; src++; break;
case 'r': *(dst++) = '\r'; src++; break;
case '\\': *(dst++) = '\\'; src++; break;
default: *(dst++) = '\\';
}
else
*(dst++) = *(src++);
*(dst++) = '\0';
return (size_t)(dst - (char *)addr);
}
int main(int argc, char *argv[])
{
struct sockaddr_un addr, conn;
socklen_t addrlen, connlen;
int socketfd, connfd;
if (argc != 2 || !argv[1][0] || !strcmp(argv[1], "-h") || !strcmp(argv[1], "--help")) {
fprintf(stderr, "\n");
fprintf(stderr, "Usage: %s [ -h | --help ]\n", argv[0]);
fprintf(stderr, " %s SOCKET_PATH\n", argv[0]);
fprintf(stderr, "\n");
return EXIT_FAILURE;
}
if (install_done(SIGINT) ||
install_done(SIGHUP) ||
install_done(SIGTERM)) {
fprintf(stderr, "Cannot install signal handlers: %s.\n", strerror(errno));
return EXIT_FAILURE;
}
socketfd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
if (socketfd == -1) {
fprintf(stderr, "Cannot create an Unix domain seqpacket socket: %s.\n", strerror(errno));
return EXIT_FAILURE;
}
addrlen = set_unix_path(argv[1], &addr);
if (bind(socketfd, (const struct sockaddr *)&addr, addrlen) == -1) {
fprintf(stderr, "Cannot bind to %s: %s.\n", argv[1], strerror(errno));
close(socketfd);
return EXIT_FAILURE;
}
if (listen(socketfd, 1) == -1) {
fprintf(stderr, "Cannot listen for incoming connections: %s.\n", strerror(errno));
close(socketfd);
return EXIT_FAILURE;
}
memset(&conn, 0, sizeof conn);
connlen = sizeof conn;
connfd = accept(socketfd, (struct sockaddr *)&conn, &connlen);
if (connfd == -1) {
close(socketfd);
fprintf(stderr, "Canceled.\n");
return EXIT_SUCCESS;
}
if (connlen > 0)
fprintf(stderr, "Connected, peer address size is %d.\n", (int)connlen);
else
fprintf(stderr, "Connected; no peer address.\n");
while (!done) {
char buffer[65536];
ssize_t n;
int r;
n = recv(connfd, buffer, sizeof buffer, 0);
if (n > 0)
fprintf(stderr, "Received %zd bytes.\n", n);
else
if (n == 0) {
struct pollfd fds[1];
fds[0].fd = connfd;
fds[0].events = 0;
fds[0].revents = 0;
r = poll(fds, 1, 0);
if (r > 0 && (fds[0].revents & POLLHUP)) {
fprintf(stderr, "Disconnected (revents = %d).\n", fds[0].revents);
break;
} else
if (r > 0)
fprintf(stderr, "recv() == 0, poll() == %d, revents == %d\n", r, fds[0].revents);
else
if (r == 0)
fprintf(stderr, "Received a zero-byte seqpacket.\n");
else
fprintf(stderr, "recv() == 0, poll() == %d, revents == %d\n", r, fds[0].revents);
}
}
close(connfd);
close(socketfd);
return EXIT_SUCCESS;
}
您可以使用例如编译上述内容gcc -Wall -O2 receive.c -o receive
。要运行,给它一个 Unix 域地址来监听。在 Linux 中,您可以通过添加地址来使用抽象命名空间\0
;例如,通过运行./receive '\0example'
. 否则,套接字地址将在文件系统中可见,并且您需要在使用相同的套接字地址再次rm
运行之前将其删除(就像它是一个文件一样,使用 ) 。./receive
我们还需要一个实用程序来发送 seqpackets。下面的send.c非常相似(重用了许多相同的代码)。您指定要连接的 Unix 域地址,以及一个或多个 seqpacket 长度。您还可以指定以毫秒为单位的延迟(只需添加一个-
; 即,负整数是以毫秒为单位的延迟):
#define _POSIX_C_SOURCE 200809L
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <signal.h>
#include <string.h>
#include <poll.h>
#include <time.h>
#include <stdio.h>
#include <errno.h>
static volatile sig_atomic_t done = 0;
static void handle_done(int signum)
{
done = 1;
}
static int install_done(int signum)
{
struct sigaction act;
memset(&act, 0, sizeof act);
sigemptyset(&act.sa_mask);
act.sa_handler = handle_done;
act.sa_flags = 0;
if (sigaction(signum, &act, NULL) == -1)
return errno;
return 0;
}
static inline unsigned int digit(const int c)
{
switch (c) {
case '0': return 0;
case '1': return 1;
case '2': return 2;
case '3': return 3;
case '4': return 4;
case '5': return 5;
case '6': return 6;
case '7': return 7;
case '8': return 8;
case '9': return 9;
case 'A': case 'a': return 10;
case 'B': case 'b': return 11;
case 'C': case 'c': return 12;
case 'D': case 'd': return 13;
case 'E': case 'e': return 14;
case 'F': case 'f': return 15;
default: return 16;
}
}
static inline unsigned int octbyte(const char *src)
{
if (src) {
const unsigned int o0 = digit(src[0]);
if (o0 < 4) {
const unsigned int o1 = digit(src[1]);
if (o1 < 8) {
const unsigned int o2 = digit(src[2]);
if (o2 < 8)
return o0*64 + o1*8 + o2;
}
}
}
return 256;
}
static inline unsigned int hexbyte(const char *src)
{
if (src) {
const unsigned int hi = digit(src[0]);
if (hi < 16) {
const unsigned int lo = digit(src[1]);
if (lo < 16)
return 16*hi + lo;
}
}
return 256;
}
size_t set_unix_path(const char *src, struct sockaddr_un *addr)
{
char *dst = addr->sun_path;
char *const end = addr->sun_path + sizeof (addr->sun_path) - 1;
unsigned int byte;
if (!src || !addr)
return 0;
memset(addr, 0, sizeof *addr);
addr->sun_family = AF_UNIX;
while (*src && dst < end)
if (*src == '\\')
switch (*(++src)) {
case '0':
byte = octbyte(src);
if (byte < 256) {
*(dst++) = byte;
src += 3;
} else {
*(dst++) = '\0';
src++;
}
break;
case '1':
byte = octbyte(src);
if (byte < 256) {
*(dst++) = byte;
src += 3;
} else
*(dst++) = '\\';
break;
case '2':
byte = octbyte(src);
if (byte < 256) {
*(dst++) = byte;
src += 3;
} else
*(dst++) = '\\';
break;
case '3':
byte = octbyte(src);
if (byte < 256) {
*(dst++) = byte;
src += 3;
} else
*(dst++) = '\\';
break;
case 'x':
byte = hexbyte(src + 1);
if (byte < 256) {
*(dst++) = byte;
src += 3;
} else
*(dst++) = '\\';
break;
case 'a': *(dst++) = '\a'; src++; break;
case 'b': *(dst++) = '\b'; src++; break;
case 't': *(dst++) = '\t'; src++; break;
case 'n': *(dst++) = '\n'; src++; break;
case 'v': *(dst++) = '\v'; src++; break;
case 'f': *(dst++) = '\f'; src++; break;
case 'r': *(dst++) = '\r'; src++; break;
case '\\': *(dst++) = '\\'; src++; break;
default: *(dst++) = '\\';
}
else
*(dst++) = *(src++);
*(dst++) = '\0';
return (size_t)(dst - (char *)addr);
}
static inline long sleep_ms(const long ms)
{
struct timespec t;
if (ms > 0) {
t.tv_sec = ms / 1000;
t.tv_nsec = (ms % 1000) * 1000000;
if (nanosleep(&t, &t) == -1 && errno == EINTR)
return 1000 * (unsigned long)(t.tv_sec)
+ (unsigned long)(t.tv_nsec / 1000000);
return 0;
} else
return ms;
}
static int parse_long(const char *src, long *dst)
{
const char *end = src;
long val;
if (!src || !*src)
return errno = EINVAL;
errno = 0;
val = strtol(src, (char **)&end, 0);
if (errno)
return errno;
if (end == src)
return errno = EINVAL;
while (*end == '\t' || *end == '\n' || *end == '\v' ||
*end == '\f' || *end == '\r' || *end == ' ')
end++;
if (*end)
return errno = EINVAL;
if (dst)
*dst = val;
return 0;
}
int main(int argc, char *argv[])
{
char buffer[65536];
struct sockaddr_un conn;
socklen_t connlen;
int connfd, arg;
ssize_t n;
long val, left;
if (argc < 3 || !argv[1][0] || !strcmp(argv[1], "-h") || !strcmp(argv[1], "--help")) {
fprintf(stderr, "\n");
fprintf(stderr, "Usage: %s [ -h | --help ]\n", argv[0]);
fprintf(stderr, " %s SOCKET_PATH [ LEN | -MS ] ...\n", argv[0]);
fprintf(stderr, "\n");
fprintf(stderr, "All arguments except the first one, SOCKET_PATH, are integers.\n");
fprintf(stderr, "A positive integer causes a seqpacket of that length to be sent,\n");
fprintf(stderr, "a negative value causes a delay (magnitude in milliseconds).\n");
fprintf(stderr, "\n");
return EXIT_FAILURE;
}
if (install_done(SIGINT) ||
install_done(SIGHUP) ||
install_done(SIGTERM)) {
fprintf(stderr, "Cannot install signal handlers: %s.\n", strerror(errno));
return EXIT_FAILURE;
}
/* Fill buffer with some data. Anything works. */
{
size_t i = sizeof buffer;
while (i-->0)
buffer[i] = (i*i) ^ i;
}
connfd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
if (connfd == -1) {
fprintf(stderr, "Cannot create an Unix domain seqpacket socket: %s.\n", strerror(errno));
return EXIT_FAILURE;
}
connlen = set_unix_path(argv[1], &conn);
if (connect(connfd, (const struct sockaddr *)&conn, connlen) == -1) {
fprintf(stderr, "Cannot connect to %s: %s.\n", argv[1], strerror(errno));
close(connfd);
return EXIT_FAILURE;
}
/* To avoid output affecting the timing, fully buffer stdout. */
setvbuf(stdout, NULL, _IOFBF, 65536);
for (arg = 2; arg < argc; arg++)
if (parse_long(argv[arg], &val)) {
fprintf(stderr, "%s: Not an integer.\n", argv[arg]);
close(connfd);
return EXIT_FAILURE;
} else
if (val > (long)sizeof buffer) {
fprintf(stderr, "%s: Seqpacket size too large. Current limit is %zu.\n", argv[arg], sizeof buffer);
close(connfd);
return EXIT_FAILURE;
} else
if (val >= 0) {
n = send(connfd, buffer, (size_t)val, 0);
if (n == (ssize_t)val)
printf("Sent %ld-byte seqpacket successfully.\n", val);
else
if (n != (ssize_t)val && n >= 0)
fprintf(stderr, "Sent %zd bytes of a %ld-byte seqpacket.\n", n, val);
else
if (n < -1) {
fprintf(stderr, "C library bug: send() returned %zd.\n", n);
close(connfd);
return EXIT_FAILURE;
} else
if (n == -1) {
fprintf(stderr, "Send failed: %s.\n", strerror(errno));
close(connfd);
return EXIT_FAILURE;
}
} else {
left = sleep_ms(-val);
if (left)
fprintf(stderr, "Slept %ld milliseconds (out of %ld ms).\n", -val-left, -val);
else
printf("Slept %ld milliseconds.\n", -val);
}
if (close(connfd) == -1) {
fprintf(stderr, "Error closing connection: %s.\n", strerror(errno));
return EXIT_FAILURE;
}
printf("All done, connection closed.\n");
fflush(stdout);
return EXIT_SUCCESS;
}
使用例如编译它gcc -Wall -O2 send.c -o send
。
对于测试,我建议您使用两个终端窗口。一个跑send
,另一个跑receive
。为简单起见,我将并排显示相应的命令和输出。运行它的机器是 Core i5 7200U 笔记本电脑 (HP EliteBook 830),运行 Ubuntu 16.04.4 LTS,64 位 Linux 内核版本 4.15.0-24-generic,并使用 GCC-5.4.0 20160609 (5.4 .0-6ubuntu1~16.04.10) 和上述命令 ( gcc -Wall -O2
)。
当我们在最终发送之前使用一个小的延迟时,一切似乎都正常:
$ ./send '\0example' 1 0 3 0 0 -1 6 │ $ ./receive '\0example'
│ Connected, peer address size is 2.
Sent 1-byte seqpacket successfully. │ Received 1 bytes.
Sent 0-byte seqpacket successfully. │ Received a zero-byte seqpacket.
Sent 3-byte seqpacket successfully. │ Received 3 bytes.
Sent 0-byte seqpacket successfully. │ Received a zero-byte seqpacket.
Sent 0-byte seqpacket successfully. │ Received a zero-byte seqpacket.
Slept 1 milliseconds. │
Sent 6-byte seqpacket successfully. │ Received 6 bytes.
All done, connection closed. │ Disconnected (revents = 16).
但是,当发送者发送最后几个 seqpackets(从零长度的一个开始)时,中间没有任何延迟,我观察到:
$ ./send '\0example' 1 0 3 0 0 6 │ ./receive '\0example'
│ Connected, peer address size is 2.
Sent 1-byte seqpacket successfully. │ Received 1 bytes.
Sent 0-byte seqpacket successfully. │ Received a zero-byte seqpacket.
Sent 3-byte seqpacket successfully. │ Received 3 bytes.
Sent 0-byte seqpacket successfully. │
Sent 0-byte seqpacket successfully. │
Sent 6-byte seqpacket successfully. │
All done, connection closed. │ Disconnected (revents = 16).
查看两个零字节 seqpacket 和 6 字节 seqpacket 是如何丢失的(因为poll()
返回revents == POLLHUP
.(POLLHUP
== 0x0010 == 16,因此两次都没有设置其他标志。)
我个人不确定这是否是一个错误。在我看来,这只是表明使用零长度的 seqpackets 是有问题的,应该避免。
(上面的peer地址长度为2,因为sender没有绑定任何地址,因此使用了一个未命名的Unix域socket地址(如man unix
man page中描述的)。我觉得不重要,但是我留下了以防万一。)
讨论了手头问题的一种可能解决方案,通过MSG_EOR
(因为recvmsg()
应该添加MSG_EOR
到结构中的msg_flags
字段msghdr
;并且由于它“应该为所有 seqpackets 设置”,即使是零长度的,这将是一种可靠的方法在 2007 年 5 月的 Linux 内核邮件列表(和 linux-netdev 列表)中检测来自输入端/读取端关闭/断开的零长度 seqpackets。(Marc.info 的存档线程在这里)但是,MSG_EOR
根据最初的发布者 Sam Kumar 的说法,在 Linux Unix 域 seqpacket 套接字中,没有设置也没有通过。讨论没有任何结果。当我读到它时,没有人知道预期的行为应该是什么。
查看Unix 域套接字的 Linux 内核更改日志,自该线程以来也没有任何相关更改(截至 2018 年 7 月 23 日)。
以上程序是一口气写的,没有复习;因此,他们很容易在其中包含错误或思想。如果您注意到任何或获得非常不同的结果(但请注意基于时间的效果有时难以复制),请在评论中告诉我,以便我检查并在必要时进行修复。
推荐阅读
- python - Django 异常
- powershell - PowerShell - 基于比较 2 个对象并查找不匹配项
- anylogic - 从多个来源中选择输入
- python - pandas pct_change(),按国家分组数据索引日期给出不正确的结果
- c# - 如何使用 Firebase、C# Visual Studio WINFORMS 登录
- r - 滚动期回报:1 年、3 年和 5 年业绩的数量
- r - 在R中的for循环中使用双括号与单括号时出错
- java - 移除元素列表
在恒定时间内在 HashMap 内部 - mysql - MySQL选择列的值作为
- android - 在android中将日期格式转换为不同的日期格式