首页 > 解决方案 > 如何从几个线程中异步读取相同的文件字节?

问题描述

我正在尝试使用带有多个线程的 aio_read 逐字节读取带有 aio.h 的文件。但我不知道我是否走在正确的轨道上,因为互联网上没有那么多东西可以阅读。

我刚刚创建了一个worker函数来将它传递给我的线程。作为传递给线程的参数,我创建了一个名为的结构thread_arguments,并在其中传递了一些必要的参数,aiocb例如打开、offset打开和。file_pathbuffer sizepriority

我可以从头到尾成功地读取一个线程的文件。但是当涉及到从几个线程中读取文件作为块时,我无法做到。而且我什至不确定是否可以在aio->reqprio不使用信号量或互斥锁的情况下做到这一点。(试图同时从几个线程中打开一个文件?)

如何从几个线程中异步读取几个字节?

假设文件包含“foobarquax”,我们有三个使用 aio 库的线程。

然后第一个应该读“foo”,

第二个应该是“bar”和

最后一个应该异步读取“quax”。

您可以在此处查看有关使用多个线程运行它的问题的屏幕截图

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <aio.h>
#include <string.h>
#include <fcntl.h> // open -> file descriptor O_RDONLY, O_WRONLY, O_RDWR
#include <errno.h>
#include <unistd.h>

typedef struct thread_args {
    char *source_path;
    char *destination_path;
    long int buffer_size;
    long int buffer_size_last; // buffer size for the last thread in case there is a remainder.
    long int offset;
    int priority;
} t_args;


void *worker(void *thread_args) {


    t_args *arguments = (t_args *) thread_args;


    struct aiocb *aiocbRead;

    aiocbRead = calloc(1, sizeof(struct aiocb));

    aiocbRead->aio_fildes = open(arguments->source_path, O_RDONLY);

    if (aiocbRead->aio_fildes == -1) {
        printf("Error");

    }

    printf("opened on descriptor %d\n", aiocbRead->aio_fildes);

    aiocbRead->aio_buf = malloc(sizeof(arguments->buffer_size));
    aiocbRead->aio_offset = arguments->offset;
    aiocbRead->aio_nbytes = (size_t) arguments->buffer_size;
    aiocbRead->aio_reqprio = arguments->priority;

    int s = aio_read(aiocbRead);

    if (s == -1) {
        printf("There was an error");
    }

    while (aio_error(aiocbRead) == EINPROGRESS) {}

    printf("Bytes read %ld", aio_return(aiocbRead));

    close(aiocbRead->aio_fildes);

    for (int i = 0; i < arguments->buffer_size ; ++i) {
        printf("%c\n", (*((char *) aiocbRead->aio_buf + i)));
    }
}

// Returns a random alphabetic character
char getrandomChar() {


    int letterTypeFlag = (rand() % 2);

    if (letterTypeFlag)
        return (char) ('a' + (rand() % 26));
    else
        return (char) ('A' + (rand() % 26));
}

void createRandomFile(char *source, int numberofBytes) {

    FILE *fp = fopen(source, "w");

    for (int i = 0; i < numberofBytes; i++) {
        fprintf(fp, "%c", getrandomChar());
    }

    fclose(fp);

}

int main(int argc, char *argv[]) {

    char *source_path = argv[1];
    char *destination_path = argv[2];
    long int nthreads = strtol(argv[3], NULL, 10);


    // Set the seed.
    srand(time(NULL));

    // Get random number of bytes to write to create the random file.
    int numberofBytes = 10 + (rand() % 100000001);

    // Create a random filled file at the source path.
    createRandomFile(source_path, 100);

    // Calculate the payload for each thread.
    long int payload = 100 / nthreads;
    long int payloadLast = payload + 100 % nthreads;

    // Create a thread argument to pass to pthread.
    t_args *thread_arguments = (t_args *) malloc(nthreads * sizeof(t_args));

    for (int l = 0; l < nthreads; ++l) {

        // Set arguments in the struct.
        (&thread_arguments)[l]->source_path = source_path;
        (&thread_arguments)[l]->destination_path = destination_path;
        (&thread_arguments)[l]->buffer_size = payload;
        (&thread_arguments)[l]->buffer_size_last = payloadLast;
        (&thread_arguments)[l]->offset = l * payload;
        (&thread_arguments)[l]->priority = l;

    }


    pthread_t tID[nthreads];

    // Create pthreads.
    for (int i = 0; i < nthreads; ++i) {
        pthread_create(&tID[i], NULL, worker, (void *) &thread_arguments[i]);
    }

    // Wait for pthreads to be done.
    for (int j = 0; j < nthreads; ++j) {
        pthread_join(tID[j], NULL);
    }

    free(thread_arguments);

    return 0;
}

如果我只是从一个线程调用它,则此代码读取成功,但如果我将它用于多个线程,这是我想要的,则它不起作用。

标签: casynchronousposix

解决方案


推荐阅读