首页 > 解决方案 > C:多生产者、多消费者有界队列

问题描述

我尝试(更好地尝试)使用以下接口实现循环缓冲区:

ring_buffer *ring_buffer_create(int capacity, int element_size);
void ring_buffer_destroy(ring_buffer *buffer)

const void *ring_buffer_read_acquire(ring_buffer *buffer, ring_buffer_loc *loc);
void ring_buffer_read_finish(ring_buffer *buffer, ring_buffer_loc loc);

void *ring_buffer_write_acquire(ring_buffer *buffer, ring_buffer_loc *loc);
void ring_buffer_write_finish(ring_buffer *buffer, ring_buffer_loc loc);

应该可以同时(甚至并行)读取/写入多个元素。例如:

ring_buffer *buffer = ring_buffer_create(10, sizeof(int));

/* Write a single element */
ring_buffer_loc loc0;
int *i0 = ring_buffer_write_acquire(buffer, &loc);
*i0 = 42; // this could be a big data structure and way more expensive
ring_buffer_write_finish(buffer, loc0);

/* Write "concurrently" */
ring_buffer_loc loc1, loc2;
int *i1 = ring_buffer_write_acquire(buffer, &loc);
int *i2 = ring_buffer_write_acquire(buffer, &loc);
*i1 = 1729;
*i2 = 314;
ring_buffer_write_finish(buffer, loc1);
ring_buffer_write_finish(buffer, loc2);

所有“获取”功能都应该被阻塞,直到可以操作为止。

到目前为止,一切都很好。我认为这很简单,所以我从一个基于互斥锁的干净实现开始。但很快我就发现这对于我的用例来说太慢了(每秒 100'000 次写入和读取),所以我切换到自旋锁等。

我的实现变得非常混乱,在某个时候(现在),我开始思考为什么不存在像这样具有所需接口的“简单”东西?也许,重新实现这样的东西无论如何都不是一个好主意。

也许有人知道一个具有这样一个接口的实现,如果操作不可能,它会阻塞?我在互联网上找了很长时间,但我找不到适合我的问题的匹配项。也许我想要的界面只是“坏”或“错误”?

不过,我添加了我当前的代码。它基本上为每个“单元格”(=值)分配一个状态,可以是 NONE(未设置;单元格基本上是空的)、WRITING(有人获得了要写入数据的单元格)、READING(有人获得了要读取的单元格)和 SET (单元格有一个可以读取的值)。每个单元都有一个自旋锁,用于更新单元状态。

然后它的工作原理是这样的:当有人获得读取并且当前单元格的状态为“SET”时,可以读取该值(新状态为 READING)并增加读取索引。在所有其他情况下,条件变量用于等待元素可用。当一个元素读取完成时,单元状态变为 NONE,如果有任何写入器在等待,则发送一个条件变量信号。

如果获取单元写入,则同样如此。唯一的区别是该单元需要使用“NONE”状态,并且如果有任何可能的读取器,则会发出信号。

由于某些原因,代码有时会锁定,因此我不得不在条件变量中添加“脏”超时。如果可以解决这个问题,我已经非常高兴了,因为“超时”基本上使代码轮询(相对难看),同时完成了许多上下文切换。也许有人看到了这个错误?“新”代码也有缺点,它有时真的很慢,这对我的应用程序来说就像一个杀手。我附上了“旧”和“新”代码(标记了更改的行)。

感谢你们对我的帮助:)!

#include <stdio.h>
#include <stdlib.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <string.h>
#include <time.h>
#include <assert.h>
#include <pthread.h>
#include <errno.h>
#include <unistd.h>


typedef int ring_buffer_loc;

enum t_ring_buffer_cell_state
{
    NONE        = 0,
    WRITING     = 1,
    READING     = 2,
    SET         = 3
};

typedef struct {
    char *buffer;                   // data
    atomic_int_fast8_t *states;     // state per cell
    pthread_spinlock_t *locks;      // lock per cell
    int capacity;
    int element_size;

    pthread_spinlock_t read_i_lock;
    int read_i;
    pthread_spinlock_t write_i_lock;
    int write_i;

    pthread_spinlock_t waiting_readers_lock;
    int waiting_readers;
    pthread_spinlock_t waiting_writers_lock;
    int waiting_writers;

    pthread_mutex_t value_written_lock;
    pthread_mutex_t value_read_lock;
    pthread_cond_t value_written;
    pthread_cond_t value_read;
} ring_buffer;

ring_buffer *ring_buffer_create(int capacity, int element_size)
{
    ring_buffer *res = calloc(1, sizeof(ring_buffer));
    res->buffer = calloc(capacity, element_size);
    res->states = calloc(capacity, sizeof(*res->states));
    res->locks = malloc(capacity * sizeof(*res->locks));
    for (int i = 0; i < capacity; ++i) {
        pthread_spin_init(&res->locks[i], PTHREAD_PROCESS_PRIVATE);
    }
    pthread_spin_init(&res->write_i_lock, PTHREAD_PROCESS_PRIVATE);
    pthread_spin_init(&res->read_i_lock, PTHREAD_PROCESS_PRIVATE);
    pthread_spin_init(&res->waiting_readers_lock, PTHREAD_PROCESS_PRIVATE);
    pthread_spin_init(&res->waiting_writers_lock, PTHREAD_PROCESS_PRIVATE);
    res->capacity = capacity;
    res->element_size = element_size;
    return res;
}

void ring_buffer_destroy(ring_buffer *buffer)
{
    free(buffer->buffer);
    free(buffer->states);
    free(buffer);
}

static inline void ring_buffer_inc_index(ring_buffer *buffer, int *index)
{
    *index = (*index + 1) % buffer->capacity;
}

void timespec_now_plus_ms(struct timespec *result, long ms_to_add)
{
    const int one_second_us = 1000 * 1000 * 1000;
    timespec_get(result, TIME_UTC);
    const long nsec = result->tv_nsec + ms_to_add * 1000 * 1000;
    result->tv_sec  += nsec / one_second_us;
    result->tv_nsec += nsec % one_second_us;
}

const void *ring_buffer_read_acquire(ring_buffer *buffer, ring_buffer_loc *loc)
{
    bool is_waiting = false;
start:
    pthread_spin_lock(&buffer->read_i_lock);
    const int read_i = buffer->read_i;
    pthread_spinlock_t *cell_lock = &buffer->locks[read_i];
    pthread_spin_lock(cell_lock);

    const int state = buffer->states[read_i];
    if (state == NONE || state == WRITING || state == READING) {
        if (!is_waiting) {
            is_waiting = true;
            pthread_spin_lock(&buffer->waiting_readers_lock);
            ++buffer->waiting_readers;
            pthread_mutex_lock(&buffer->value_written_lock);
            pthread_spin_unlock(&buffer->waiting_readers_lock);
        } else {
            pthread_mutex_lock(&buffer->value_written_lock);
        }

        pthread_spin_unlock(cell_lock);
        pthread_spin_unlock(&buffer->read_i_lock);

        // "new" code:
        // struct timespec ts;
        // do {
        //     timespec_now_plus_ms(&ts, 50);
        // } while (pthread_cond_timedwait(&buffer->value_written, &buffer->value_written_lock, &ts) == ETIMEDOUT && buffer->states[read_i] == state);
        // pthread_mutex_unlock(&buffer->value_written_lock);
        // "old" code (which hangs quite often):
        pthread_cond_wait(&buffer->value_written, &buffer->value_written_lock);
        pthread_mutex_unlock(&buffer->value_written_lock);

        goto start;
    } else if (state == SET) {
        if (is_waiting) {
            pthread_spin_lock(&buffer->waiting_readers_lock);
            --buffer->waiting_readers;
            assert(buffer->waiting_readers >= 0);
            pthread_spin_unlock(&buffer->waiting_readers_lock);
        }

        buffer->states[read_i] = READING;
        ring_buffer_inc_index(buffer, &buffer->read_i);
        pthread_spin_unlock(&buffer->read_i_lock);
        pthread_spin_unlock(cell_lock);
        *loc = read_i;
        return &buffer->buffer[read_i * buffer->element_size];
    } else {
        printf("unknown state!\n");
        exit(1);
    }
}

void ring_buffer_read_finish(ring_buffer *buffer, ring_buffer_loc loc)
{
    pthread_spinlock_t *cell_lock = &buffer->locks[loc];
    pthread_spin_lock(cell_lock);
    buffer->states[loc] = NONE;
    pthread_spin_unlock(cell_lock);

    pthread_spin_lock(&buffer->waiting_writers_lock);
    if (buffer->waiting_writers > 0) {
        pthread_cond_signal(&buffer->value_read);
    }
    pthread_spin_unlock(&buffer->waiting_writers_lock);
}

void *ring_buffer_write_acquire(ring_buffer *buffer, ring_buffer_loc *loc)
{
    bool is_waiting = false;
start:
    pthread_spin_lock(&buffer->write_i_lock);
    const int write_i = buffer->write_i;
    pthread_spinlock_t *cell_lock = &buffer->locks[write_i];
    pthread_spin_lock(cell_lock);

    const int state = buffer->states[write_i];
    if (state == SET || state == READING || state == WRITING) {
        if (!is_waiting) {
            is_waiting = true;
            pthread_spin_lock(&buffer->waiting_writers_lock);
            ++buffer->waiting_writers;
            pthread_mutex_lock(&buffer->value_read_lock);
            pthread_spin_unlock(&buffer->waiting_writers_lock);
        } else {
            pthread_mutex_lock(&buffer->value_read_lock);
        }

        pthread_spin_unlock(cell_lock);
        pthread_spin_unlock(&buffer->write_i_lock);

        // "new" code:
        // struct timespec ts;
        // do {
        //     timespec_now_plus_ms(&ts, 5);
        // } while (pthread_cond_timedwait(&buffer->value_read, &buffer->value_read_lock, &ts) == ETIMEDOUT && buffer->states[write_i] == state);
        // pthread_mutex_unlock(&buffer->value_read_lock);
        // "old" code (which hangs quite often):
        pthread_cond_wait(&buffer->value_read, &buffer->value_read_lock);
        pthread_mutex_unlock(&buffer->value_read_lock);

        goto start;
    } else if (state == NONE) {
        if (is_waiting) {
            pthread_spin_lock(&buffer->waiting_writers_lock);
            --buffer->waiting_writers;
            assert(buffer->waiting_writers >= 0);
            pthread_spin_unlock(&buffer->waiting_writers_lock);
        }

        buffer->states[write_i] = WRITING;
        ring_buffer_inc_index(buffer, &buffer->write_i);
        pthread_spin_unlock(&buffer->write_i_lock);
        pthread_spin_unlock(cell_lock);
        *loc = write_i;
        return &buffer->buffer[write_i * buffer->element_size];
    } else {
        printf("unknown state!\n");
        exit(1);
    }
}

void ring_buffer_write_finish(ring_buffer *buffer, ring_buffer_loc loc)
{
    pthread_spinlock_t *cell_lock = &buffer->locks[loc];
    pthread_spin_lock(cell_lock);
    buffer->states[loc] = SET;
    pthread_spin_unlock(cell_lock);

    pthread_spin_lock(&buffer->waiting_readers_lock);
    if (buffer->waiting_readers > 0) {
        pthread_cond_signal(&buffer->value_written);
    }
    pthread_spin_unlock(&buffer->waiting_readers_lock);
}

/* just for debugging */
void ring_buffer_dump(const ring_buffer *buffer)
{
    printf("RingBuffer\n");
    printf(" Capacity:     %d\n", buffer->capacity);
    printf(" Element size: %d\n", buffer->element_size);
    printf(" Read index:   %d\n", buffer->read_i);
    printf(" Write index:  %d\n", buffer->write_i);
    printf(" Cells:\n");
    for (int i = 0; i < buffer->capacity; ++i) {
        printf(" [%d]: STATE = ", i);
        switch (buffer->states[i]) {
            case NONE:
                printf("NONE");
                break;
            case WRITING:
                printf("WRITING");
                break;
            case READING:
                printf("READING");
                break;
            case SET:
                printf("SET");
                break;
        }
        printf("\n");
    }
    printf("\n");
}



/*
 * Test run
 */

struct write_read_n_conf {
    ring_buffer *buffer;
    int n;
};
 
static void *producer_thread(void *arg)
{
    struct write_read_n_conf conf = *(struct write_read_n_conf *)arg;
    for (int i = 0; i < conf.n; ++i) {
        ring_buffer_loc loc;
        int *value = ring_buffer_write_acquire(conf.buffer, &loc);
        *value = i;
        ring_buffer_write_finish(conf.buffer, loc);

        if (i % 1000 == 0) {
            printf("%d / %d\n", i, conf.n);
        }
    }
    return NULL;
}

static void *consumer_thread(void *arg)
{
    struct write_read_n_conf conf = *(struct write_read_n_conf *)arg;
    int tmp;
    bool ok = true;
    for (int i = 0; i < conf.n; ++i) {
        ring_buffer_loc loc;
        const int *value = ring_buffer_read_acquire(conf.buffer, &loc);
        tmp = *value;
        ring_buffer_read_finish(conf.buffer, loc);

        ok = ok && (tmp == i);
    }
    printf("ok = %d\n", ok);
    return (void *)ok;
}

void write_read_n_parallel(int n)
{
    ring_buffer *buffer = ring_buffer_create(50, sizeof(int));
    struct write_read_n_conf conf = {
        .buffer = buffer,
        .n      = n
    };

    pthread_t consumer;
    pthread_t producer;

    pthread_create(&consumer, NULL, consumer_thread, &conf);
    pthread_create(&producer, NULL, producer_thread, &conf);

    pthread_join(producer, NULL);

    void *res;
    pthread_join(consumer, &res); // hacky way to pass a bool: res == NULL means false, and otherwise true
    assert(res != NULL);
}   

int main() {
    write_read_n_parallel(10000000);
}

标签: cparallel-processingpthreadslockingcircular-buffer

解决方案


推荐阅读