首页 > 解决方案 > POSIX 线程的并行程序比串行程序慢

问题描述

我知道这个问题是重复的,但我找不到与我的代码类似的任何其他主题。

问题陈述如下:

有一个包含 16,000 行的 CSV 文件。该程序的串行版本正在提取价格(SalePrice是 CSV 中的列标题)高于使用命令行参数提供给程序的特定值(阈值)的那些行,并计算它们的均值标准导数,这将是用于进一步计算。

对于并行版本,这个较大的 CSV 文件分为 4 个 CSV 文件。每个线程都分配给一个 CSV 文件,并且应该执行相同的计算(计算价格高于我的代码中名为阈值的特定值的行的平均值和标准差)。

由于数据足够大,我不认为这是因为多线程开销。

如果有人能帮我找出是什么部分减慢了我的并行版本,我将不胜感激?

#include <iostream>
#include <fstream>
#include <vector>
#include <math.h>
#include <iomanip> 
#include <pthread.h>
#include <stdio.h>
#include <time.h>
#include <sys/stat.h>
#include <unistd.h>

using namespace std;

#define COMMA ','
#define EMPTY_STR ""
#define FILENAME "dataset.csv"
#define CLASSIFIER "GrLivArea"
#define SALE_PRICE "SalePrice"

const int MAX_THREAD_NUMBERS = 20; 

int NUMBER_OF_THREADS;
int threshold;
int expensive_cnt[MAX_THREAD_NUMBERS];
vector<string> lines;
string head;
double _std;
long sum[MAX_THREAD_NUMBERS];
long ps[MAX_THREAD_NUMBERS];
long sumsq[MAX_THREAD_NUMBERS];
double mean;
int total_items;
int total_expensive_cnt;

struct Item
{
    int x;
    bool category;
};

vector<Item> items[MAX_THREAD_NUMBERS];

int getColNum(const string& head, const string& key)
{
    int cnt = 0;
    string cur = EMPTY_STR;
    for (int i = 0 ; i < head.size() ; i++)
    {
        if (head[i] == COMMA)
        {
            if (cur == key)
                return cnt;
            cnt++;
            cur = EMPTY_STR;
        }   
        else
        cur += head[i];
    }
    if (cur == key)
        return cnt;
    return -1;
}

vector<int> separateByComma(string s)
{
    vector<int> res;
    string cur = EMPTY_STR;
    for (int i = 0 ; i < s.size() ; i++)
        if (s[i] == COMMA)
        {
            res.push_back(stoi(cur));
            cur = EMPTY_STR;
        }
        else
        cur += s[i];

    res.push_back(stoi(cur));
    return res;
}

void* calcSums(void* tid)
{
    long thread_id = (long)tid;

    string filename = "dataset_" + to_string(thread_id) + ".csv"; 
    ifstream fin(filename);

    string head;
    fin >> head;
    int classifierColNum = getColNum(head, CLASSIFIER);
    if (classifierColNum == -1)
    {
        printf("NO GrLivArea FOUND IN HEAD OF CSV\n");
        exit(-1);
    }

    int priceColNum = getColNum(head, SALE_PRICE);
    if (priceColNum == -1)
    {
        printf("NO SalePrice FOUND IN HEAD OF CSV\n");
        exit(-1);
    }


    string line;
    while (fin >> line)
    {
        vector<int> cur = separateByComma(line);

        bool category = (cur[priceColNum] >= threshold);

        Item item{cur[classifierColNum], category};

        if (category)
        {
            sum[thread_id] += item.x;
            sumsq[thread_id] += (item.x * item.x);
            expensive_cnt[thread_id]++;
        }

        items[thread_id].push_back(item);
    }


    fin.close();
    pthread_exit(NULL);
}


void calcMeanSTD()
{
    string line;
    for (int i = 0 ; ; i++)
    {
        struct stat buffer;   
        string name = "dataset_" + to_string(i) + ".csv"; 
        if (!(stat (name.c_str(), &buffer) == 0))
            break;
        
        NUMBER_OF_THREADS++;
    }
  

    pthread_t threads[NUMBER_OF_THREADS];
    int return_code;
    for (long tid = 0 ; tid < NUMBER_OF_THREADS ; tid++)
    {
        return_code = pthread_create(&threads[tid], NULL, calcSums, (void*)tid);

        if (return_code)
        {
            printf("ERROR; return code from pthread_create() is %d\n", return_code);
            exit(-1);
        }
    }

    for (long tid = 0 ; tid < NUMBER_OF_THREADS ; tid++)
    {
        return_code = pthread_join(threads[tid], NULL);
        if (return_code)
        {
            printf("ERROR; return code from pthread_join() is %d\n", return_code);
            exit(-1);
        }
    }

    double total_sum = 0;
    double total_sum_sq = 0;
    total_expensive_cnt = 0;
    total_items = 0;
    for (int i = 0 ; i < NUMBER_OF_THREADS ; i++)
    {
        total_sum += sum[i];
        total_sum_sq += sumsq[i];

        total_expensive_cnt += expensive_cnt[i];
        total_items += items[i].size();
    }

    mean = total_sum / total_expensive_cnt;
    _std = sqrt((total_sum_sq - ((total_sum * total_sum) / (total_expensive_cnt))) / (total_expensive_cnt));

}


int main(int argc, char *argv[])
{
    threshold = atoi(argv[1]);          

    calcMeanSTD();                

    cout << mean << " " << _std << endl;

    return 0;
}

如果有任何部分无法理解,请告诉我。

以下是一些运行时值: Read CSV (Serial): 0.043268s Calculations (Serial): 0.000151s 在此处的多线程版本中,精确的时间计算并不容易,因为计算和文件读取是在同一个 while 循环中完成的,这在此处不可分离。还有很多线程切换。无论如何,他们的总和大约是:0.14587s

可以看出,从文件中读取所需的时间几乎是进行数学计算的 300 倍。

标签: c++linuxparallel-processingpthreads

解决方案


感谢评论中的答案,我发现了发生了什么:

我尝试增加 CSV 文件中的行数,以查看并行化是否有效。

具有 1000000 行的 CSV 文件的运行时值为:

并行:real 0m0.558s user 0m2.173s sys 0m0.020s 串行:real 0m1.834s user 0m1.818s sys 0m0.016s

由于我使用 4 个线程,我预计 1.834 除以 0.558 接近 4,实际上是 3.28,并且足够公平。

较小 CSV 文件的运行时值没有显示这些结果,这似乎是因为我的代码中的简单数学计算。

这段代码的瓶颈是我从 CSV 文件中读取的部分。这部分似乎是串行的,因为它是从磁盘读取的。

此代码中还有一个错误共享的问题,当这些位置共享相同的缓存行映射时,由于不同的线程更新不同的内存位置而导致缓存争用。这个问题有很多解决方案,例如,我可以在这些数组中引入填充,以确保多个线程访问的元素不共享缓存行。或者,更简单地说,使用线程局部变量而不是数组,最后只更新一次数组元素。


推荐阅读