首页 > 解决方案 > 为什么Linux C串行只读在高CPU下工作?

问题描述

我正在使用以下代码从串行端口(在阻塞模式下)读取



    int fd;

    int res;
    res = open_port(O_RDONLY | O_NOCTTY, &fd);


    while (num_bytes_read < C_BUFFER_ENTRY_SIZE)
    {
        num_buf_byte_remaining = C_BUFFER_ENTRY_SIZE - num_bytes_read;      
        rc = read(fd, &buf[num_bytes_read], num_buf_byte_remaining);

        if (rc > 0)
        {
            num_bytes_read += rc;
        }

        if (rc == -1)
        {
            printf("Read error %s\n",strerror(errno));
        }
    }

我正在使用的端口是 /dev/ttyUSB0 (串行端口),它使用以下配置进行配置。

    // BAUDRATE B3000000
int open_port(int flags, int *fd)
{
    struct termios options;
    *fd = open("/dev/ttyUSB0", flags | O_NOCTTY);

    if(*fd < 0){
        printf("Error opening port\n");
        return 1;
    } else{
        printf("Port handle is %d\n", *fd);
    }

    options.c_cflag = BAUDRATE | CS8 | CLOCAL | CREAD;
    options.c_iflag = 0;
    options.c_oflag = 0;
    options.c_lflag = 0;
    options.c_cc[VTIME] = 0;
    options.c_cc[VMIN] = 200;
    tcflush(*fd, TCIFLUSH);
    tcsetattr(*fd, TCSANOW, &options);

    return 0;
}

通过此串行链路传输的数据量非常高(大约 1-2Mbps),因此读取速度需要很快。

我看到的行为是,当系统正常运行时,读取循环非常频繁地丢失数据。但是,当我使用所有错误增加加载时nice -n 20 cat /dev/zero > /dev/null突然停止(我有一个标头 + 计数 + CRCpacket 结构,因此可以检查损坏和连续性)。

  1. 为什么增加 CPU 负载实际上会使这个循环的性能更好?
  2. 我是否需要更改终端设置以提高性能?(我尝试将 VMIN 设置为 0)

编辑

根据下面的评论,我将扩展上述内容。bufdata 变量是从串口读取的数据放置的地方。这个 while 循环一直持续到 buf 中有 C_BUFFER_ENTRY_SIZE (4096) 个字节。

在 while 循环中断后(因为字节数等于 4906)buf被添加到要从第二个线程读取的队列中。队列受互斥锁和条件变量保护,见下文。

/// function is after above while 1 loop
add_to_queue(buf,num_bytes_read);

// ring buff struct
typedef struct
{
    pthread_mutex_t mutex;
    pthread_cond_t fill;
    pthread_cond_t empty;
    int s_icm_idx; // InComing Message index
    int s_ogm_idx; // OutGoing Message index
    int s_entry_count;
    int s_entry_count_hi;
    int used_bytes[10000];
    ring_buffer_data_t buffer[10000];
}ring_buffer_t;

typedef struct
{
    char data[4096];
}ring_buffer_data_t;


///
int add_to_queue(void* a_data, int a_bytes)
{
    int ret_val = 0;

    if (a_bytes > C_BUFFER_ENTRY_SIZE)
    {
        return 2;
    }

    pthread_mutex_lock(&(circ_buf.mutex));

    while (circ_buf.s_entry_count == 10000) 
    {
        ret_val = 1;
        pthread_cond_wait(&(circ_buf.empty), &(circ_buf.mutex));
    }

    /* Put the data on the queue, set how many bytes of the queue */
    /* are used and then incoming message index and the queue     */
    /* entry counter.                                             */
    memcpy(&(circ_buf.buffer[circ_buf.s_icm_idx]), a_data, a_bytes);
    circ_buf.used_bytes[circ_buf.s_icm_idx] = a_bytes;

    circ_buf.s_icm_idx = (circ_buf.s_icm_idx + 1) % 10000;
    crc_buf.s_entry_count++;

    pthread_cond_signal(&(circ_buf.fill));
    pthread_mutex_unlock(&(circ_buf.mutex));

    return ret_val;
}

第二个线程从这个队列中读取数据,然后使用一个简单的状态机解析每个 4906 字节的段。即循环遍历字节,直到找到标头字节,然后检查idx + 5以获取 CRC,如果 crc 有效,则在整个 6 个字节上执行 CRC,然后读取消息。

我看到从串行读取的损坏数据由 CRC 失败指示。当我加载 CPU 时,CRC 检查不会失败。而且因为标头和 CRC 之间的 4 个字节是一个计数,我可以看到没有消息被丢弃(连续计数)。

我相信当负载低时不是 CRC 检查错误。

编辑2:

完整的可编译代码

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <fcntl.h>
#include <termio.h>
#include <linux/serial.h>
#include <assert.h>
#include <string.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <sys/types.h>

#include <time.h>

//The width and depth of the queue, ie the max size message that can be placed on it.
#define C_BUFFER_ENTRY_SIZE 4096
#define C_FIFO_ENTRIES_MAX 10000

typedef struct
{
    char data[C_BUFFER_ENTRY_SIZE];
}ring_buffer_data_t;


typedef struct
{
    pthread_mutex_t mutex;
    pthread_cond_t fill;
    pthread_cond_t empty;

    int s_icm_idx; // InComing Message index
    int s_ogm_idx; // OutGoing Message index
    int s_entry_count;
    int s_entry_count_hi;
    int used_bytes[C_FIFO_ENTRIES_MAX];
    ring_buffer_data_t buffer[C_FIFO_ENTRIES_MAX];
} ring_buffer_t;


static ring_buffer_t s_rb = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, 0, 0, 0, {0}, {{{0}}}};
const uint8_t CRC_LUT[256] = {0, 94, 188, 226, 97, 63, 221, 131, 194, 156, 126, 32, 163, 253, 31, 65, 157, 195, 33, 127, 252, 162, 64, 30, 95, 1, 227, 189, 62, 96, 130, 220, 35, 125, 159, 193, 66, 28, 254, 160, 225, 191, 93, 3, 128, 222, 60, 98, 190, 224, 2, 92, 223, 129, 99, 61, 124, 34, 192, 158, 29, 67, 161, 255, 70, 24, 250, 164, 39, 121, 155, 197, 132, 218, 56, 102, 229, 187, 89, 7, 219, 133, 103, 57, 186, 228, 6, 88, 25, 71, 165, 251, 120, 38, 196, 154, 101, 59, 217, 135, 4, 90, 184, 230, 167, 249, 27, 69, 198, 152, 122, 36, 248, 166, 68, 26, 153, 199, 37, 123, 58, 100, 134, 216, 91, 5, 231, 185, 140, 210, 48, 110, 237, 179, 81, 15, 78, 16, 242, 172, 47, 113, 147, 205, 17, 79, 173, 243, 112, 46, 204, 146, 211, 141, 111, 49, 178, 236, 14, 80, 175, 241, 19, 77, 206, 144, 114, 44, 109, 51, 209, 143, 12, 82, 176, 238, 50, 108, 142, 208, 83, 13, 239, 177, 240, 174, 76, 18, 145, 207, 45, 115, 202, 148, 118, 40, 171, 245, 23, 73, 8, 86, 180, 234, 105, 55, 213, 139, 87, 9, 235, 181, 54, 104, 138, 212, 149, 203, 41, 119, 244, 170, 72, 22, 233, 183, 85, 11, 136, 214, 52, 106, 43, 117, 151, 201, 74, 20, 246, 168, 116, 42, 200, 150, 21, 75, 169, 247, 182, 232, 10, 84, 215, 137, 107, 53};
int id;

FILE *log_fh;

void *dequeue_and_log(void* fd)
{
    #define HDR_START 'X'

    // FILE *fd;

    //fd = fopen("log.log", "wb");
    uint8_t parse_buffer[2 * C_BUFFER_ENTRY_SIZE] = {0};
    int bytes_to_write = 0;
    while (1)
    {
        pthread_mutex_lock(&(s_rb.mutex));

        while (s_rb.s_entry_count == 0)
        {
            pthread_cond_wait(&(s_rb.fill), &(s_rb.mutex));
        }
        // Get the incoming data from the buffer
        bytes_to_write = s_rb.used_bytes[s_rb.s_ogm_idx];
        memcpy(parse_buffer, &(s_rb.buffer[s_rb.s_ogm_idx]),bytes_to_write);

        s_rb.s_ogm_idx = (s_rb.s_ogm_idx + 1) % C_FIFO_ENTRIES_MAX;
        s_rb.s_entry_count--;

        pthread_cond_signal(&(s_rb.empty));
        pthread_mutex_unlock(&(s_rb.mutex));
        // binary write
        fwrite(parse_buffer,bytes_to_write,1,(FILE*)fd);
    }
}

int add_to_queue(void* a_data, int a_bytes)
{
    int ret_val = 0;

    if (a_bytes > C_BUFFER_ENTRY_SIZE)
    {
        printf("add_to_queue oversized send %i\n", a_bytes);
        return 2;
    }

    pthread_mutex_lock(&(s_rb.mutex));  

    while (s_rb.s_entry_count == C_FIFO_ENTRIES_MAX) 
    {
        ret_val = 1;
        printf("Queue is FULL\n");
        pthread_cond_wait(&(s_rb.empty), &(s_rb.mutex));
    }

    /* Put the data on the queue, set how many bytes of the queue */
    /* are used and then incoming message index and the queue     */
    /* entry counter.                                             */
    memcpy(&(s_rb.buffer[s_rb.s_icm_idx]), a_data, a_bytes);
    s_rb.used_bytes[s_rb.s_icm_idx] = a_bytes;

    s_rb.s_icm_idx = (s_rb.s_icm_idx + 1) % C_FIFO_ENTRIES_MAX;
    s_rb.s_entry_count++;

    pthread_cond_signal(&(s_rb.fill));
    pthread_mutex_unlock(&(s_rb.mutex));

    return ret_val;
}

int open_port(char *port_name, int flags, int *fd)
{
    struct termios options;

    //*fd = open(port_name, O_RDWR | O_NOCTTY);
    *fd = open(port_name, flags | O_NOCTTY);

    if(*fd < 0){
        printf("Error opening port\n");
        return 1;
    } else{
        printf("Port handle is %d\n", *fd);
    }

    //Assume port has already been setup by receiver
    #if(1)
    options.c_cflag = B3000000 | CS8 | CLOCAL | CREAD;
    options.c_iflag = 0;
    options.c_oflag = 0;
    options.c_lflag = 0;
    options.c_cc[VTIME] = 0;
    options.c_cc[VMIN] = 200;
    tcflush(*fd, TCIFLUSH);
    tcsetattr(*fd, TCSANOW, &options);
    #endif

    return 0;
}


int main(int argc, char **argv)
{
    int fd;
    int res;
    uint8_t buf[C_BUFFER_ENTRY_SIZE];
    int rc = 0;

    pthread_t p_parse;
    setbuf(stdout, NULL);

    int num_bytes_read = 0;
    int num_buf_byte_remaining = 0;

    #define HB_PRINT_INTERVAL 200

    unsigned long int heartbeat_seq_count = 0;
    unsigned long int heartbeat_seq_count_last_print = 0;

    pthread_mutex_init(&(s_rb.mutex), NULL);

    log_fh = fopen(argv[2], "wb");

    res = open_port(argv[1], O_RDONLY, &fd);
    pthread_create(&p_parse, NULL, dequeue_and_log, (void *)log_fh);

    while(1){
        heartbeat_seq_count++;

        if (heartbeat_seq_count - heartbeat_seq_count_last_print >= HB_PRINT_INTERVAL)
        {
            printf("Sequence ID %10ld\n", heartbeat_seq_count);             
            heartbeat_seq_count_last_print = heartbeat_seq_count;
        }

        num_bytes_read = 0;
        while (num_bytes_read < C_BUFFER_ENTRY_SIZE)
        {
            num_buf_byte_remaining = C_BUFFER_ENTRY_SIZE - num_bytes_read;      
            rc = read(fd, &buf[num_bytes_read], num_buf_byte_remaining);


            if (rc > 0)
            {
                num_bytes_read += rc;
            }
            else if (rc == -1)
            {
                printf("Read error %s\n",strerror(errno));
            }
            else 
            {}
        }

        //add data to logging queue
        add_to_queue(buf, num_bytes_read);
    }

    fclose(log_fh);
    return 0;
}

  1. 从串行读取添加到队列
  2. 第二个线程从队列中读取并写入磁盘

python 脚本 post 处理输出文件以检查它是否损坏了数据。这在测试完成后在单独的 PC 上运行,因此与负载无关。我知道这个脚本可以很好地解析数据。

我已经测试过三件事,无法解释

  1. VMIN = 200 且无后台负载 = 二进制文件中的数据损坏
  2. VMIN = 0 且无后台加载 = NO二进制文件中的数据损坏
  3. VMIN = 200 和后台负载 = NO二进制文件中的损坏数据

将 VMIN 设置为零会增加 CPU 负载(因为读取循环运行得更快,因为读取函数非常频繁地返回 0)。因此,我不确定是否实际上是 VMIN 导致了问题,或者更多的是增加 CPU 负载会使读取行为正常。

重申一下,它不是解码功能。在上述没有损坏数据的情况下,该功能可以完美地从文件中运行。

标签: clinuxserial-port

解决方案


由于您的代码无法在您无法阻止的常见情况下工作,为什么“为什么 [它] 只能在高 CPU 下工作?” 没关系。花大量时间和精力找出“为什么?”可能很有趣,但我认为您将不得不更改代码,因为当 CPU 负载下降时任何停止工作的东西都是,IMO,waaaaay脆弱的信任可以在任何时候工作。

首先,线程在您的系统上是否有用?如果只有一个 CPU 一次只能运行一个线程,那么创建多个线程将适得其反。有没有试过简单的单线程解决方案,实际发现行不通?

如果您尝试过单线程解决方案但它不起作用,我首先要注意的是,您当前发布的代码正在做大量不需要做的额外工作,而且它可能会与单个线程竞争当这根本没有多大帮助时锁定。

因此,消除您对数据的无关复制以及您正在做的所有不必要的簿记。

您也可能只使用一个互斥锁和条件变量就存在很多争用。没有必要不读,因为日志线程正在做某事,或者日志线程没有处理,因为读线程正在做一些簿记。您几乎肯定会从更精细的锁定粒度中受益。

我会做这样的事情:

#define CHUNK_SIZE ( 4 * 1024 )
#define NUM_BUFFERS 16

struct dataStruct
{
    sem_t full;
    sem_t empty;
    ssize_t count;
    char data[ CHUNK_SIZE ]
};

struct dataStruct dataArray[ NUM_BUFFERS ];

void initDataArray( void )
{
    for ( int ii = 0; ii < NUM_BUFFERS; ii++ )
    {
        // buffers all start empty
        sem_init( &( dataArray[ ii ].full ), 0, 0 );
        sem_init( &( dataArray[ ii ].empty ), 0, 1 );
    }
}

void *readPortThread( void *arg )
{
    unsigned currBuffer = 0;

    // get portFD from arg
    int portFD = ???
    for ( ;; )
    {
        sem_wait( &( dataArray[ currBuffer  ].empty ) );

        // probably should loop and read more, and don't
        // infinite loop on any error (hint...)
        dataArray[ currBuffer  ].count = read( portFD, 
            dataArray[ currBuffer  ].data,
            sizeof( dataArray[ currBuffer  ].data ) );
        sem_post( &( dataArray[ currBuffer  ].full ) );
        currBuffer++;
        currBuffer  %= NUM_BUFFERS;
    }
    return( NULL );
}

void *processData( char *data, ssize_t count )
{
    ...
}

void *logDataThread( void *arg )
{
    for ( ;; )
    {
        sem_wait( &( dataArray[ currBuffer  ].full ) );

        processData( dataArray[ currBuffer  ].data,
            dataArray[ currBuffer  ].count );

        sem_post( &( dataArray[ currBuffer  ].empty ) );
        currBuffer++;
        currBuffer  %= NUM_BUFFERS;
    }
    return( NULL );
}

请注意更精细的锁定粒度,以及完全没有多余的数据复制。正确的标题、所有错误检查和完整的实现都留作练习......

您必须进行测试才能找到CHUNK_SIZE和的最佳值NUM_BUFFERS。动态设置缓冲区的数量也是一个很好的改进。

int open_port(int flags, int *fd)OT:你的函数中不需要任何间接。只需返回fd值 - 它本身就足够了open()

int open_port(int flags )
{
    struct termios options;
    int fd = open("/dev/ttyUSB0", flags | O_NOCTTY);

    if(fd < 0){
        printf("Error opening port\n");
        return fd;
    }

    // no need for else - the above if statement returns
    printf("Port handle is %d\n", fd);

    // did you set **ALL** the fields???
    memset( options, 0, sizeof( options ) );

    options.c_cflag = BAUDRATE | CS8 | CLOCAL | CREAD;
    options.c_iflag = 0;
    options.c_oflag = 0;
    options.c_lflag = 0;
    options.c_cc[VTIME] = 0;
    options.c_cc[VMIN] = 200;
    tcflush(fd, TCIFLUSH);
    tcsetattr(fd, TCSANOW, &options);

    return fd;
}

推荐阅读