首页 > 解决方案 > SOCK_SEQPACKET Unix Socket 上的空包

问题描述

我在 Unix 套接字上使用 SOCK_SEQPACKET 类型。

我用来读取数据的代码是经典写法:

// Read one packet and dispatch on the value recv() returned.
ssize_t recv_size = recv(sd, buffer, sizeof(buffer), 0);
if (recv_size < 0) {
    handle_error("recv", errno);
} else if (recv_size > 0) {
    handle_packet(buffer, recv_size);
} else {
    // recv_size == 0 => peer closed socket.
    // NOTE: a zero-length packet also ends up in this branch.
    handle_end_of_stream();
}

虽然这很好用,但我注意到它无法区分套接字关闭和大小为 0 的消息。换句话说,如果另一端发出如下调用序列:

// Three packets; the second one has zero length.
send(sd, "hello", strlen("hello"), 0);
send(sd, "", 0, 0);
send(sd, "world", strlen("world"), 0);

……接收端只会收到 "hello",然后把第二条消息当作套接字关闭来处理,从而完全错过 "world" 这条消息。

我想知道是否有某种方法可以消除这两种情况之间的歧义。

标签: c posix unix-socket

解决方案


正如我在评论中提到的,零长度 seqpackets(以及零长度数据报)的行为可能很奇怪,通常被误认为是断开连接;因此,我绝对建议不要出于任何目的使用零长度的 seqpackets 或数据报。

为了说明主要问题并探索细节,我创建了两个测试程序。首先是receive.c,它侦听 Unix 域 seqpacket 套接字,接受一个连接,并描述它接收到的内容:

#define  _POSIX_C_SOURCE 200809L
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <signal.h>
#include <string.h>
#include <poll.h>
#include <time.h>
#include <stdio.h>
#include <errno.h>

/* Becomes nonzero once handle_done() has run for any signal. */
static volatile sig_atomic_t  done = 0;

/* Signal handler: records that a signal arrived, nothing else. */
static void handle_done(int signum)
{
    (void)signum;   /* Which signal it was does not matter. */
    done = 1;
}

/*
 * Make handle_done() the handler for signal signum.
 *
 * Returns 0 on success, otherwise the errno value left by sigaction().
 */
static int install_done(int signum)
{
    struct sigaction  action;

    memset(&action, 0, sizeof action);
    action.sa_handler = handle_done;
    action.sa_flags = 0;            /* No flags; in particular SA_RESTART is not requested. */
    sigemptyset(&action.sa_mask);   /* Block no additional signals while the handler runs. */

    return (sigaction(signum, &action, NULL) == -1) ? errno : 0;
}

/*
 * Value of the hexadecimal digit character c (either letter case):
 * 0..15 for a valid digit, 16 for anything else.
 */
static inline unsigned int digit(const int c)
{
    static const char  lower[] = "0123456789abcdef";
    static const char  upper[] = "0123456789ABCDEF";
    unsigned int       value;

    /* Only the 16 digit positions are scanned, never the terminating NUL. */
    for (value = 0; value < 16; value++)
        if (c == lower[value] || c == upper[value])
            return value;

    return 16;
}

/*
 * Decode exactly three octal digits at src into a byte value.
 *
 * The first digit must be 0..3 so that the result fits in 0..255.
 * Returns 256 if src is NULL or the three characters are not such digits.
 * A character is only read after the one before it was accepted, so
 * the scan never runs past the string's terminating NUL.
 */
static inline unsigned int octbyte(const char *src)
{
    unsigned int  d0, d1, d2;

    if (!src)
        return 256;

    d0 = digit(src[0]);
    if (d0 >= 4)
        return 256;

    d1 = digit(src[1]);
    if (d1 >= 8)
        return 256;

    d2 = digit(src[2]);
    if (d2 >= 8)
        return 256;

    return 64*d0 + 8*d1 + d2;
}

/*
 * Decode exactly two hexadecimal digits at src into a byte value 0..255.
 *
 * Returns 256 if src is NULL or either character is not a hex digit.
 * The second character is only read after the first was accepted.
 */
static inline unsigned int hexbyte(const char *src)
{
    unsigned int  high, low;

    if (!src)
        return 256;

    high = digit(src[0]);
    if (high >= 16)
        return 256;

    low = digit(src[1]);
    if (low >= 16)
        return 256;

    return 16*high + low;
}

/*
 * Build an AF_UNIX socket address in *addr from the escaped text in src.
 *
 * src is copied into addr->sun_path with these backslash escapes decoded:
 *   \NNN   three octal digits, the first being 0..3 (one byte, 0..255)
 *   \0     a NUL byte, when not followed by two more octal digits
 *   \xHH   two hexadecimal digits (one byte)
 *   \a \b \t \n \v \f \r \\   the usual C meanings
 * A backslash that starts none of the above is copied literally.
 * The path is truncated so that a terminating NUL always fits in
 * sun_path; that NUL is always written and counted in the result.
 *
 * Returns the number of bytes from the start of *addr up to and
 * including the terminating NUL (the length main() passes on to
 * bind()/connect()), or 0 if src or addr is NULL.
 */
size_t set_unix_path(const char *src, struct sockaddr_un *addr)
{
    char         *dst;
    char         *end;
    unsigned int  byte;

    /* Validate first: no pointer may be derived from addr while it can be NULL. */
    if (!src || !addr)
        return 0;

    memset(addr, 0, sizeof *addr);
    addr->sun_family = AF_UNIX;

    dst = addr->sun_path;
    end = addr->sun_path + sizeof (addr->sun_path) - 1;  /* Last byte, kept for the NUL. */

    /* Each iteration stores exactly one byte, so dst never passes end. */
    while (*src && dst < end) {
        if (*src != '\\') {
            *(dst++) = *(src++);
            continue;
        }

        switch (*(++src)) {
        case '0': case '1': case '2': case '3':
            byte = octbyte(src);
            if (byte < 256) {
                *(dst++) = byte;
                src += 3;
            } else if (*src == '0') {
                /* A lone "\0" is a NUL byte. */
                *(dst++) = '\0';
                src++;
            } else {
                /* Not an escape: keep the backslash; the digit is copied next round. */
                *(dst++) = '\\';
            }
            break;
        case 'x':
            byte = hexbyte(src + 1);
            if (byte < 256) {
                *(dst++) = byte;
                src += 3;   /* Skip 'x' and both hex digits. */
            } else {
                *(dst++) = '\\';
            }
            break;
        case 'a':  *(dst++) = '\a'; src++; break;
        case 'b':  *(dst++) = '\b'; src++; break;
        case 't':  *(dst++) = '\t'; src++; break;
        case 'n':  *(dst++) = '\n'; src++; break;
        case 'v':  *(dst++) = '\v'; src++; break;
        case 'f':  *(dst++) = '\f'; src++; break;
        case 'r':  *(dst++) = '\r'; src++; break;
        case '\\': *(dst++) = '\\'; src++; break;
        default:
            /* Unknown escape or trailing backslash: keep the backslash itself. */
            *(dst++) = '\\';
            break;
        }
    }

    *(dst++) = '\0';

    return (size_t)(dst - (char *)addr);
}


/*
 * receive: listen on the Unix domain seqpacket socket named by argv[1],
 * accept one connection, and describe on standard error what each recv()
 * on it returns.
 *
 * The loop ends when the peer has disconnected, when recv() fails with an
 * error other than EINTR, or when SIGINT, SIGHUP, or SIGTERM was caught.
 * Exits with EXIT_FAILURE on usage, setup, or receive errors.
 */
int main(int argc, char *argv[])
{
    struct sockaddr_un  addr, conn;
    socklen_t           addrlen, connlen;
    int                 socketfd, connfd;
    int                 status = EXIT_SUCCESS;

    if (argc != 2 || !argv[1][0] || !strcmp(argv[1], "-h") || !strcmp(argv[1], "--help")) {
        fprintf(stderr, "\n");
        fprintf(stderr, "Usage: %s [ -h | --help ]\n", argv[0]);
        fprintf(stderr, "       %s SOCKET_PATH\n", argv[0]);
        fprintf(stderr, "\n");
        return EXIT_FAILURE;
    }

    if (install_done(SIGINT) ||
        install_done(SIGHUP) ||
        install_done(SIGTERM)) {
        fprintf(stderr, "Cannot install signal handlers: %s.\n", strerror(errno));
        return EXIT_FAILURE;
    }

    socketfd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
    if (socketfd == -1) {
        fprintf(stderr, "Cannot create an Unix domain seqpacket socket: %s.\n", strerror(errno));
        return EXIT_FAILURE;
    }

    addrlen = set_unix_path(argv[1], &addr);
    if (bind(socketfd, (const struct sockaddr *)&addr, addrlen) == -1) {
        fprintf(stderr, "Cannot bind to %s: %s.\n", argv[1], strerror(errno));
        close(socketfd);
        return EXIT_FAILURE;
    }

    if (listen(socketfd, 1) == -1) {
        fprintf(stderr, "Cannot listen for incoming connections: %s.\n", strerror(errno));
        close(socketfd);
        return EXIT_FAILURE;
    }

    memset(&conn, 0, sizeof conn);
    connlen = sizeof conn;
    connfd = accept(socketfd, (struct sockaddr *)&conn, &connlen);
    if (connfd == -1) {
        close(socketfd);
        fprintf(stderr, "Canceled.\n");
        return EXIT_SUCCESS;
    }

    if (connlen > 0)
        fprintf(stderr, "Connected, peer address size is %d.\n", (int)connlen);
    else
        fprintf(stderr, "Connected; no peer address.\n");

    while (!done) {
        char     buffer[65536];
        ssize_t  n;

        n = recv(connfd, buffer, sizeof buffer, 0);
        if (n > 0) {
            fprintf(stderr, "Received %zd bytes.\n", n);
        } else if (n == 0) {
            struct pollfd  fds[1];
            int            r;

            /* recv() == 0 is ambiguous, so ask poll() whether the peer hung up.
               No events are requested; POLLHUP is reported regardless. */
            fds[0].fd = connfd;
            fds[0].events = 0;
            fds[0].revents = 0;
            r = poll(fds, 1, 0);
            if (r > 0 && (fds[0].revents & POLLHUP)) {
                fprintf(stderr, "Disconnected (revents = %d).\n", fds[0].revents);
                break;
            } else if (r == 0) {
                fprintf(stderr, "Received a zero-byte seqpacket.\n");
            } else {
                fprintf(stderr, "recv() == 0, poll() == %d, revents == %d\n", r, fds[0].revents);
            }
        } else if (errno != EINTR) {
            /* A real receive error: report it and stop instead of retrying forever. */
            fprintf(stderr, "Receive failed: %s.\n", strerror(errno));
            status = EXIT_FAILURE;
            break;
        }
        /* On EINTR a signal interrupted recv(); the loop condition rechecks done. */
    }

    close(connfd);
    close(socketfd);
    return status;
}

您可以使用例如 gcc -Wall -O2 receive.c -o receive 来编译上述程序。运行时,给它一个要监听的 Unix 域地址。在 Linux 中,您可以在地址前加上 \0 来使用抽象命名空间;例如,运行 ./receive '\0example'。否则,套接字地址将在文件系统中可见,并且在使用相同的套接字地址再次运行 ./receive 之前,您需要先将其删除(就像删除文件一样,使用 rm)。

我们还需要一个实用程序来发送 seqpackets。下面的send.c非常相似(重用了许多相同的代码)。您指定要连接的 Unix 域地址,以及一个或多个 seqpacket 长度。您还可以指定以毫秒为单位的延迟(只需添加一个-; 即,负整数是以毫秒为单位的延迟):

#define  _POSIX_C_SOURCE 200809L
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <signal.h>
#include <string.h>
#include <poll.h>
#include <time.h>
#include <stdio.h>
#include <errno.h>

/* Becomes nonzero once handle_done() has run for any signal. */
static volatile sig_atomic_t  done = 0;

/* Signal handler: records that a signal arrived, nothing else. */
static void handle_done(int signum)
{
    (void)signum;   /* Which signal it was does not matter. */
    done = 1;
}

/*
 * Make handle_done() the handler for signal signum.
 *
 * Returns 0 on success, otherwise the errno value left by sigaction().
 */
static int install_done(int signum)
{
    struct sigaction  action;

    memset(&action, 0, sizeof action);
    action.sa_handler = handle_done;
    action.sa_flags = 0;            /* No flags; in particular SA_RESTART is not requested. */
    sigemptyset(&action.sa_mask);   /* Block no additional signals while the handler runs. */

    return (sigaction(signum, &action, NULL) == -1) ? errno : 0;
}

/*
 * Value of the hexadecimal digit character c (either letter case):
 * 0..15 for a valid digit, 16 for anything else.
 */
static inline unsigned int digit(const int c)
{
    static const char  lower[] = "0123456789abcdef";
    static const char  upper[] = "0123456789ABCDEF";
    unsigned int       value;

    /* Only the 16 digit positions are scanned, never the terminating NUL. */
    for (value = 0; value < 16; value++)
        if (c == lower[value] || c == upper[value])
            return value;

    return 16;
}

/*
 * Decode exactly three octal digits at src into a byte value.
 *
 * The first digit must be 0..3 so that the result fits in 0..255.
 * Returns 256 if src is NULL or the three characters are not such digits.
 * A character is only read after the one before it was accepted, so
 * the scan never runs past the string's terminating NUL.
 */
static inline unsigned int octbyte(const char *src)
{
    unsigned int  d0, d1, d2;

    if (!src)
        return 256;

    d0 = digit(src[0]);
    if (d0 >= 4)
        return 256;

    d1 = digit(src[1]);
    if (d1 >= 8)
        return 256;

    d2 = digit(src[2]);
    if (d2 >= 8)
        return 256;

    return 64*d0 + 8*d1 + d2;
}

/*
 * Decode exactly two hexadecimal digits at src into a byte value 0..255.
 *
 * Returns 256 if src is NULL or either character is not a hex digit.
 * The second character is only read after the first was accepted.
 */
static inline unsigned int hexbyte(const char *src)
{
    unsigned int  high, low;

    if (!src)
        return 256;

    high = digit(src[0]);
    if (high >= 16)
        return 256;

    low = digit(src[1]);
    if (low >= 16)
        return 256;

    return 16*high + low;
}

/*
 * Build an AF_UNIX socket address in *addr from the escaped text in src.
 *
 * src is copied into addr->sun_path with these backslash escapes decoded:
 *   \NNN   three octal digits, the first being 0..3 (one byte, 0..255)
 *   \0     a NUL byte, when not followed by two more octal digits
 *   \xHH   two hexadecimal digits (one byte)
 *   \a \b \t \n \v \f \r \\   the usual C meanings
 * A backslash that starts none of the above is copied literally.
 * The path is truncated so that a terminating NUL always fits in
 * sun_path; that NUL is always written and counted in the result.
 *
 * Returns the number of bytes from the start of *addr up to and
 * including the terminating NUL (the length main() passes on to
 * bind()/connect()), or 0 if src or addr is NULL.
 */
size_t set_unix_path(const char *src, struct sockaddr_un *addr)
{
    char         *dst;
    char         *end;
    unsigned int  byte;

    /* Validate first: no pointer may be derived from addr while it can be NULL. */
    if (!src || !addr)
        return 0;

    memset(addr, 0, sizeof *addr);
    addr->sun_family = AF_UNIX;

    dst = addr->sun_path;
    end = addr->sun_path + sizeof (addr->sun_path) - 1;  /* Last byte, kept for the NUL. */

    /* Each iteration stores exactly one byte, so dst never passes end. */
    while (*src && dst < end) {
        if (*src != '\\') {
            *(dst++) = *(src++);
            continue;
        }

        switch (*(++src)) {
        case '0': case '1': case '2': case '3':
            byte = octbyte(src);
            if (byte < 256) {
                *(dst++) = byte;
                src += 3;
            } else if (*src == '0') {
                /* A lone "\0" is a NUL byte. */
                *(dst++) = '\0';
                src++;
            } else {
                /* Not an escape: keep the backslash; the digit is copied next round. */
                *(dst++) = '\\';
            }
            break;
        case 'x':
            byte = hexbyte(src + 1);
            if (byte < 256) {
                *(dst++) = byte;
                src += 3;   /* Skip 'x' and both hex digits. */
            } else {
                *(dst++) = '\\';
            }
            break;
        case 'a':  *(dst++) = '\a'; src++; break;
        case 'b':  *(dst++) = '\b'; src++; break;
        case 't':  *(dst++) = '\t'; src++; break;
        case 'n':  *(dst++) = '\n'; src++; break;
        case 'v':  *(dst++) = '\v'; src++; break;
        case 'f':  *(dst++) = '\f'; src++; break;
        case 'r':  *(dst++) = '\r'; src++; break;
        case '\\': *(dst++) = '\\'; src++; break;
        default:
            /* Unknown escape or trailing backslash: keep the backslash itself. */
            *(dst++) = '\\';
            break;
        }
    }

    *(dst++) = '\0';

    return (size_t)(dst - (char *)addr);
}

/*
 * Sleep for ms milliseconds using nanosleep().
 *
 * Returns 0 after a full sleep (or when nanosleep() fails for a reason
 * other than EINTR), the remaining time in whole milliseconds when a
 * signal interrupted the sleep, and ms itself, without sleeping, when
 * ms is zero or negative.
 */
static inline long sleep_ms(const long ms)
{
    struct timespec  ts;

    if (ms <= 0)
        return ms;

    ts.tv_sec = ms / 1000;
    ts.tv_nsec = (ms % 1000) * 1000000;

    /* The same structure receives the unslept remainder on interruption. */
    if (nanosleep(&ts, &ts) != -1 || errno != EINTR)
        return 0;

    return 1000 * (unsigned long)(ts.tv_sec)
         + (unsigned long)(ts.tv_nsec / 1000000);
}

/*
 * Parse the whole string src as a long, with strtol() base 0 (so
 * decimal, 0x-prefixed hexadecimal, and 0-prefixed octal are accepted).
 * Trailing whitespace is allowed; any other trailing character is not.
 *
 * On success stores the value in *dst (when dst is not NULL) and returns 0.
 * On failure returns a nonzero errno code, which is also left in errno:
 * EINVAL for NULL, empty, or malformed input, or whatever strtol() set
 * (for example on overflow). *dst is not modified on failure.
 */
static int parse_long(const char *src, long *dst)
{
    char *tail = NULL;
    long  value;

    if (!src || !*src)
        return errno = EINVAL;

    errno = 0;
    value = strtol(src, &tail, 0);
    if (errno)
        return errno;

    /* Nothing was consumed: src does not start with a number. */
    if (tail == src)
        return errno = EINVAL;

    /* Skip trailing whitespace; anything left over is garbage. */
    tail += strspn(tail, "\t\n\v\f\r ");
    if (*tail)
        return errno = EINVAL;

    if (dst)
        *dst = value;

    return 0;
}

int main(int argc, char *argv[])
{
    char                buffer[65536];
    struct sockaddr_un  conn;
    socklen_t           connlen;
    int                 connfd, arg;
    ssize_t             n;
    long                val, left;

    if (argc < 3 || !argv[1][0] || !strcmp(argv[1], "-h") || !strcmp(argv[1], "--help")) {
        fprintf(stderr, "\n");
        fprintf(stderr, "Usage: %s [ -h | --help ]\n", argv[0]);
        fprintf(stderr, "       %s SOCKET_PATH [ LEN | -MS ] ...\n", argv[0]);
        fprintf(stderr, "\n");
        fprintf(stderr, "All arguments except the first one, SOCKET_PATH, are integers.\n");
        fprintf(stderr, "A positive integer causes a seqpacket of that length to be sent,\n");
        fprintf(stderr, "a negative value causes a delay (magnitude in milliseconds).\n");
        fprintf(stderr, "\n");
        return EXIT_FAILURE;
    }

    if (install_done(SIGINT) ||
        install_done(SIGHUP) ||
        install_done(SIGTERM)) {
        fprintf(stderr, "Cannot install signal handlers: %s.\n", strerror(errno));
        return EXIT_FAILURE;
    }

    /* Fill buffer with some data. Anything works. */
    {
        size_t  i = sizeof buffer;
        while (i-->0)
            buffer[i] = (i*i) ^ i;
    }

    connfd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
    if (connfd == -1) {
        fprintf(stderr, "Cannot create an Unix domain seqpacket socket: %s.\n", strerror(errno));
        return EXIT_FAILURE;
    }

    connlen = set_unix_path(argv[1], &conn);
    if (connect(connfd, (const struct sockaddr *)&conn, connlen) == -1) {
        fprintf(stderr, "Cannot connect to %s: %s.\n", argv[1], strerror(errno));
        close(connfd);
        return EXIT_FAILURE;
    }

    /* To avoid output affecting the timing, fully buffer stdout. */
    setvbuf(stdout, NULL, _IOFBF, 65536);

    for (arg = 2; arg < argc; arg++)
        if (parse_long(argv[arg], &val)) {
            fprintf(stderr, "%s: Not an integer.\n", argv[arg]);
            close(connfd);
            return EXIT_FAILURE;
        } else
        if (val > (long)sizeof buffer) {
            fprintf(stderr, "%s: Seqpacket size too large. Current limit is %zu.\n", argv[arg], sizeof buffer);
            close(connfd);
            return EXIT_FAILURE;
        } else
        if (val >= 0) {
            n = send(connfd, buffer, (size_t)val, 0);
            if (n == (ssize_t)val)
                printf("Sent %ld-byte seqpacket successfully.\n", val);
            else
            if (n != (ssize_t)val && n >= 0)
                fprintf(stderr, "Sent %zd bytes of a %ld-byte seqpacket.\n", n, val);
            else
            if (n < -1) {
                fprintf(stderr, "C library bug: send() returned %zd.\n", n);
                close(connfd);
                return EXIT_FAILURE;
            } else
            if (n == -1) {
                fprintf(stderr, "Send failed: %s.\n", strerror(errno));
                close(connfd);
                return EXIT_FAILURE;
            }
        } else {
            left = sleep_ms(-val);
            if (left)
                fprintf(stderr, "Slept %ld milliseconds (out of %ld ms).\n", -val-left, -val);
            else
                printf("Slept %ld milliseconds.\n", -val);
        }

    if (close(connfd) == -1) {
        fprintf(stderr, "Error closing connection: %s.\n", strerror(errno));
        return EXIT_FAILURE;
    }

    printf("All done, connection closed.\n"); 
    fflush(stdout);

    return EXIT_SUCCESS;
}

例如,可以使用 gcc -Wall -O2 send.c -o send 来编译它。

对于测试,我建议您使用两个终端窗口:一个运行 send,另一个运行 receive。为简单起见,我将并排显示相应的命令和输出。运行它的机器是 Core i5 7200U 笔记本电脑 (HP EliteBook 830),运行 Ubuntu 16.04.4 LTS,64 位 Linux 内核版本 4.15.0-24-generic,并使用 GCC-5.4.0 20160609 (5.4.0-6ubuntu1~16.04.10) 和上述命令 (gcc -Wall -O2) 编译。

当我们在最终发送之前使用一个小的延迟时,一切似乎都正常:

$ ./send '\0example' 1 0 3 0 0 -1 6   │   $ ./receive '\0example'
                                      │   Connected, peer address size is 2.
Sent 1-byte seqpacket successfully.   │   Received 1 bytes.
Sent 0-byte seqpacket successfully.   │   Received a zero-byte seqpacket.
Sent 3-byte seqpacket successfully.   │   Received 3 bytes.
Sent 0-byte seqpacket successfully.   │   Received a zero-byte seqpacket.
Sent 0-byte seqpacket successfully.   │   Received a zero-byte seqpacket.
Slept 1 milliseconds.                 │  
Sent 6-byte seqpacket successfully.   │   Received 6 bytes.
All done, connection closed.          │   Disconnected (revents = 16).

但是,当发送者发送最后几个 seqpackets(从零长度的一个开始)时,中间没有任何延迟,我观察到:

$ ./send '\0example' 1 0 3 0 0 6      │   ./receive '\0example'
                                      │   Connected, peer address size is 2.
Sent 1-byte seqpacket successfully.   │   Received 1 bytes.
Sent 0-byte seqpacket successfully.   │   Received a zero-byte seqpacket.
Sent 3-byte seqpacket successfully.   │   Received 3 bytes.
Sent 0-byte seqpacket successfully.   │   
Sent 0-byte seqpacket successfully.   │   
Sent 6-byte seqpacket successfully.   │   
All done, connection closed.          │  Disconnected (revents = 16).

请注意两个零字节 seqpacket 和那个 6 字节 seqpacket 是如何丢失的(因为 poll() 返回了 revents == POLLHUP)。(POLLHUP == 0x0010 == 16,因此两次都没有设置其他标志。)

我个人不确定这是否是一个错误。在我看来,这只是表明使用零长度的 seqpackets 是有问题的,应该避免。

(上面的对端 (peer) 地址长度为 2,因为发送端没有绑定任何地址,因此使用的是未命名的 Unix 域套接字地址(如 man unix 手册页中所述)。我觉得这并不重要,但以防万一还是保留了这条输出。)

2007 年 5 月,Linux 内核邮件列表(以及 linux-netdev 列表)上讨论过针对手头问题的一种可能解决方案:利用 MSG_EOR(因为 recvmsg() 应该把 MSG_EOR 添加到 msghdr 结构的 msg_flags 字段中;并且由于它“应该为所有 seqpackets 设置”,即使是零长度的,这将是区分零长度 seqpacket 与输入端/读取端关闭/断开的可靠方法)。(该线程的存档在 Marc.info 上。)但是,根据最初的发帖者 Sam Kumar 的说法,在 Linux 的 Unix 域 seqpacket 套接字中,MSG_EOR 既不会被设置,也不会被传递。讨论没有得出任何结论。按我的理解,没有人确定预期的行为应该是什么。

查看Unix 域套接字的 Linux 内核更改日志,自该线程以来也没有任何相关更改(截至 2018 年 7 月 23 日)。


以上程序是一口气写成的,没有经过审查;因此,其中很可能包含错误或考虑不周之处。如果您发现任何问题,或者得到了非常不同的结果(但请注意,基于时间的效果有时难以复现),请在评论中告诉我,以便我检查并在必要时进行修复。


推荐阅读