首页 > 解决方案 > c语言中读取文件大小的线程(互斥问题)

问题描述

我有一个学校的项目,我们将用线程制作一个程序,该程序将计算目录中文件的大小。我们需要使用 lstat 来获取文件的块大小。我的解决方案是创建所有线程,主线程将遍历目录并将找到的每个文件添加到队列中。然后线程可以从队列中获取文件并使用 lstat 获取块大小。这张照片显示了我做这个时的想法。 在此处输入图像描述

该程序的工作是我所期望的。但是我遇到的问题是,对于我添加到程序中的每个线程,我添加的每个线程完成 get 所需的时间都更长。因此,一个线程需要 5 秒,而 2 个线程需要 7 秒。

我将展示我编写的一些代码。我想我想念如何在这里使用互斥锁,因为互斥锁块是添加到队列的主要线程,并且所有线程在添加到队列时都会被阻塞。因此,如果您看到我在互斥锁中想念的东西,请告诉我:)。

/* If -j flag is used and it's more then 1 thread */
if(flag_j == 1 && threads > 1) {

    /* Create each thread */
    createThreads(th, threads);

    /* Make a task of each file and add task to queue */
    makeTask(argv);

    /* Tell that no more file will be added to the queue */
    pthread_mutex_lock(&mutex);
    endThreads = 1;
    pthread_mutex_unlock(&mutex);

    /* Join thread's */
    joinThreads(th, threads);
}

/**
 * Going to create all the thread's that was given
 * with the -j flag at start.
 * @param th An array of all the thread's.
 * @param threads The number of thread's.
 * 
 */
void createThreads(pthread_t th[], int threads) {
    for(int i = 0; i < threads; i++) {
        int *a = malloc(sizeof(a));
        *a = i;
        /* pthread create */
        if(pthread_create(&th[i], NULL, &getQueue, a) != 0) {
            perror("Failed to create thread");
            exit(EXIT_FAILURE);
        }    
    }
}


/**
 * Each thread will get a task from the queue to execute until
 * the queue is empty and the conditional variable is set to 1.
 * @param arg In this case it's NULL.
 * 
*/
void *getQueue(void *arg) {

    free(arg);
    while(1) {
        Task task;
    
        /* Lock */
        pthread_mutex_lock(&mutex);
        if(queueCount > 0) {
            /* Get the first task in queue */
            task = queue[0];
            /* Move all the task in the queue forward one step */
            for (int i = 0; i < queueCount - 1; i++)
            {
                queue[i] = queue[i + 1];
            }
            queueCount--;
            /* execute the task */
            pthread_mutex_unlock(&mutex);
            execute(&task);
        } else if ( endThreads == 1) {
            pthread_mutex_unlock(&mutex);
            break;
        } else {
            pthread_mutex_unlock(&mutex);
        }
    
    }

    /* The thread will just resturn NULL */
    return arg;
}


/**
 * Find the correct struct in the list of arguments.
 * Then add the size of the file to that struct and
 * add the struct back in to the list.
 * @param task Is the task that's need to be executed.
 * 
*/
void execute(Task* task) {

    int i = 0;
    char *path = malloc(1024*sizeof(path) + 1);

    /* Get the first name of the path */
    while (task->file[i] != '\0')
    {   
        /* Break when it find's the first "/" */
        if (task->file[i] == '/' && i != 0)
        {
             break;
        }
    
        path[i] = task->file[i];
        i++;
    }

    path[i] = '\0';


    pthread_mutex_lock(&mutex1);
    for (int j = 0; j < numOfArg; j++)
    {
        Arguments entity;
        /* Get the first struct in listOfArg */
        entity = listOfArg[0];

        /* Move all the struct's one step forward */
        for (int k = 0; k < numOfArg - 1; k++){
            listOfArg[k] = listOfArg[k + 1];
        }
    
        /* Check if the struct name is the same name as the path */
        if (strcmp(path, entity.name) == 0)
        {
            /* Add the size of the file to the struct size */
            entity.size += getFileSize(task->file);
            /* Add the struct back to the list */
            listOfArg[numOfArg - 1] = entity;
            break;
        } else {
            /* Add the struct back to the list */
            listOfArg[numOfArg - 1] = entity;
        }   
    }

    /* Get the size of the file and add it to the total size */
    totalSize += getFileSize(task->file);
    pthread_mutex_unlock(&mutex1);
    free(path);
}

/**
 * Will check if the argument is a directory or an file.
 * If it is a directory it's going to open and get the file's.
 * If it is a file it will make a Task and add the file to it
 * and add the task to the queue.
 * @param argv An array of the argument's.
 * 
*/
void makeTask(char *argv[]) {
    while(argv[optind] != NULL) {
        if (isDirectory(argv[optind]))
        {
            filesDir(argv[optind]);
        } else {
            Task t;
            t.file[0] = '\0';
            strcpy(t.file, argv[optind]);
            addTask(t);
        }
    
        optind++;
    }
}

/**
 * Will add the task to the queue of task for the thread's
 * to execute.
 * @param task The task to add to the queue of task's.
 * 
*/
void addTask(Task task) {
    pthread_mutex_lock(&mutex);
    queue[queueCount] = task;
    queueCount++;
    pthread_mutex_unlock(&mutex);
}

标签: cmultithreadingthread-safetythreadpoolmutex

解决方案


这些评论表达了对多线程可以加速哪些操作不能加速的有效担忧,但它们似乎在很大程度上已经锁定了对您的观察的一种合理解释,而没有考虑其他的。I/O 考虑因素限制并行化可实现的收益,但操作系统会采取措施尝试减少慢速 I/O 的影响,例如预读以及数据和元数据缓存。因此,在这种情况下,I/O 瓶颈不一定足以完全克服并行计算的所有加速。你同学的观察在这里不能打折扣,尽管假设他们的程序是正确的也不一定安全。

事实上,您的代码还有许多其他问题会导致多线程性能和扩展性不佳,包括

  • 忙着等待。当getQueue()检查队列并发现它为空时,它会解锁互斥锁,然后立即尝试重新获取它。尽管其他线程在等待互斥体,例如试图填充队列的线程,但重新获取很有可能成功。添加的工作线程越多,主线程 runningmakeTask()对任务进行排队的难度就越大。

    这类问题通常在条件变量的帮助下解决。

  • 非常低效的队列实现。每次从队列中删除一个元素时,队列的整个尾部都会向上移动,以便头部位于索引 0 处。所有这些都必须在保持互斥锁锁定的情况下执行,从而减少了整体工作的比例可以并行运行。有多种性能更好的替代方案——例如,循环队列或链表。

  • 中出现第二个临界区execute()。目前尚不清楚所有这些操作listOfArg是关于什么的,但它也限制了并行执行的程序的比例。并且它与那些涉及操作任务队列的效率问题相似。

该程序还表现出一些可能对加速和缩放没有太大影响的一般低效率,例如通过循环而不是 via 一次围绕一个元素移动数组的附加实例memmove(),手动滚动字符串操作而不是使用内置,优化的库函数,例如strchr(),并执行动态内存分配,其中自动分配会更好。


推荐阅读