首页 > 解决方案 > PThread 的分布式计算不工作

问题描述

我有一个用 C 语言编写的程序,用于我正在上课的课程。目标是获取一个数组并使用 10 个不同的线程计算某个字符串的所有实例。在这种情况下,我们试图计算文件中有多少个“是”。应该是55。

我当前的逻辑是将数组拆分为每个单词,然后单独处理每个单词并包含一个测试以查看它是否应该在当前线程或另一个线程上处理。

任何帮助将不胜感激。除了:num_substring、allowedOnThread 和计数器之外,我的导师还提供了所有代码。我做了这些。

我对C很陌生。

我目前有:

#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>

#define MAX 10240
#define NUM_THREADS  10

int n1,n2;
char *s1,*s2;
FILE *fp;
int countArray[NUM_THREADS]={0};

int total = 0;

//read input file and generate string s1/s2 and length n1/n2
int readf(FILE *fp)
{
    if((fp=fopen("strings.txt", "r"))==NULL){
        printf("ERROR: can't open string.txt!\n");
        return 0;
    }
    s1=(char *)malloc(sizeof(char)*MAX);
    if(s1==NULL){
        printf("ERROR: Out of memory!\n");
        return -1;
    }
    s2=(char *)malloc(sizeof(char)*MAX);
    if(s1==NULL){
        printf("ERROR: Out of memory\n");
        return -1;
    }
    /*read s1 s2 from the file*/
    s1=fgets(s1, MAX, fp);
    s2=fgets(s2, MAX, fp);
    n1=strlen(s1);  /*length of s1*/
    n2=strlen(s2)-1; /*length of s2*/

    if(s1==NULL || s2==NULL || n1<n2)  /*when error exit*/
        return -1;
    return 0;
}

int num_substring(int t) {
//add your logic here
//1, how to distribute different parts of string s1 into different threads
//2, how to sum up the total number of substring from all threads
    
    char *str = s1; // This is what we will need to process for the string
    
    char *token = strtok(str, " ");
    
    int count = 0;
    
    int index = 0;
    while(token != NULL) {
        // Determine if it should be on the thread
        
        if(allowedOnThread(t, index) == 1){
            count = count + counter(token);
        }else{
            return count;
        }
        index++;
        token = strtok(NULL, " ");
    }

    return count;
}

int allowedOnThread(int thread, int index) {
    int threadMultiplier = n1 / NUM_THREADS;
    
    // Check range
    int min = thread * threadMultiplier;
    int max = (thread * threadMultiplier) + threadMultiplier;
    
    if(thread >= 1){
        min = (thread * threadMultiplier) + 1;
    }
    
    if(thread + 1 == NUM_THREADS){
        max = n1 + 1;
    }
    
    if(min <= index && index <= max) {
        return 1;
    }
    
    
    return 0;
}

int counter(char *str){
    int i,j,k;
    int count;

    int complete = 0;
    
    for (i = 0; i <= (n1-n2); i++){   
        count=0;
        for(j = i,k = 0; k < n2; j++,k++){  /*search for the next string of size of n2*/  
            if (*(str+j)!=*(s2+k)){
                break;
            }else{
                count++;
            }

            if(count==n2){  
                complete++;
            }                       
        }
    }
    
    total = total + complete;
    
    return complete;
}


void *calSubStringThread(void *threadid){
    long tid = (long)threadid;
    printf("This is thread %ld, ", tid);
    int num = num_substring(tid);
    printf("find num of is: %d\n", num);
    pthread_exit(NULL);
}

int main(int argc, char *argv[])
{
    pthread_t threads[NUM_THREADS];
    int t, rc;

    readf(fp);
    
    for(t=0; t<NUM_THREADS; t++){
        rc = pthread_create(&threads[t], NULL, calSubStringThread, (void *) (size_t)t);
        if (rc){
            printf("ERROR; return code from pthread_create() is %d\n", rc);
            exit(-1);
        }
    }

    for(t=0; t<NUM_THREADS; t++){
        pthread_join(threads[t], NULL);
    }

    printf("The number of substrings is: %d\n", total);
    return 1;
}

输入文件:

Thss is an apple. That ss a pear. That ss an orange. That ss a kiwi fruit. Thss ss an avocado. There ss a peach on the tree. Thss ss a banana. That ss a berry. That ss cherry. That ss a haw. Thss ss a lemon. There ss a hickory on the tree. Thss ss an apple. That ss a pear. That ss an orange. That ss a kiwi fruit. Thss ss an avocado. There ss a peach on the tree. Thss ss a banana. That ss a berry. That ss cherry. That ss a haw. Thss ss a lemon. There ss a hickory on the tree. Thss ss an apple. That ss a pear. This is an orange. That ss a kiwi fruit. Thss ss an avocado. There ss a peach on the tree. Thss ss a banana. That ss a berry. That ss ss. That ss a haw. Thss ss a lemon. There ss a hickory on the tree. Thss ss an apple. That ss a pear. That ss an orange. That ss a kiwi fruit. Thss ss an avocado. There ss a peach on the tree. Thss ss a banana. That ss a berry. That ss cherry. That ss a haw. Thss ss a lemon. There ss a hickory on the tree. Thss ss an apple. That ss a pear. That ss an orange. That is a kiwi fruit. This is an avocado. There ss a peach on the tree. Thss ss a banana. That ss a berry. That ss cherry. That ss a haw. Thss ss a lemon. There ss a hickory on the tree. Thss ss an apple. That ss a pear. That ss an orange. That ss a kiwi fruit. Thss ss an avocado. There ss a peach on the tree. Thss ss a banana. That ss a berry. That ss cherry. That ss a haw. Thss ss a lemon. There ss a hickory on the tree. Thss ss an apple. That ss a pear. That ss an orange. That ss a kiwi fruit. Thss ss an avocado. There ss a peach on the tree. This is a banana. This is a berry. That ss cherry. That ss a haw. Thss ss a lemon. There ss a hickory on the tree. Thss ss an apple. That ss a pear. That ss an orange. That ss a kiwi fruit. Thss ss an avocado. There ss a peach on the tree. Thss ss a banana. That ss a berry. That ss cherry. That ss a haw. Thss ss a lemon. There ss a hickory on the tree. Thss ss an apple. That ss a pear. That ss an orange. That ssss a kiwi fruit. Thss ss an avocado. There ss a peach on the tree. This is a banana. This is a berry. That is cherry. That ss a haw. Thss ss a lemon. There ss a hickory on the tree. Thss ss an apple. That ss a pear. That ss an orange. That ss a kiwi fruit. Thss ss an avocado. There ss a peach on the tree. Thss ss a banana. That ss a berry. That ss cherry. That ss a haw. Thss ss a lemon. There ss a hickory on the tree. Thss ss an apple. That ss a pear. That ss an orange. That ss a kiwi fruit. Thss ss an avocado. There ss a peach on the tree. This is a banana. This is a berry. This is cherry. That ss a haw. Thss ss ss a lemon. There ss a hickory on the tree. Thss ss an apple. That ss a pear. That ss an orange. That ss a kiwi fruit. Thss ssss an avocado. There ss a peach on the tree. Thss ss a banana. That ss a berry. That ss cherry. That ss a haw. Thss ss a lemon. There ss a hickory on the tree. Thss ss an apple. That ss a pear. That ss an orange. That ss a kiwi fruit. Thss ss an avocado. There ss a peach on the tree. This is a banana. This is a berry. This is cherry. That is a haw. Thss ss a lemon. There ss a hickory on the tree. Thss ss an apple. That ss a pear. That ss an orange. That ss a kiwi fruit. Thss ss an avocado. There ss a peach on the tree. Thss ss a banana. That ss a berry. That ss cherry. That ss a haw. Thss ss a lemon. There ss a hickory on the tree. Thss ss an apple. That ss a pear. That ss an orange. That ss a kiwi fruit. Thss ss ss an avocado. There ss a peach on the tree. This is a banana. This is a berry. This is cherry. This is a haw. Thss ss a lemon. There ss a hickory on the tree. Thss ss an apple. That ss a pear. That ss an orange. That ss a kiwi fruit. Thss ss an avocado. There ss a peach on the tree. Thss ss a banana. That ss a berry. That ss cherry. That ss a haw. Thss ss a lemon. There ss a hickory on the tree. Thss ss an apple. That ss a pear. That ss an orange. That ss a kiwi fruit. Thss ss an avocado. There ss a peach on the tree This is a banana. This is a berry. This is cherry. This is a haw. Thss is a lemon. There ss a hickory on the tree. Thss ss an apple. That ss a pear. That ss an orange. That ss a kiwi fruit. Thss ss an avocado. There ss a peach on the tree. Thss ss a banana. That ss a berry. That ss cherry. That ss a haw. Thss ss a lemon. There ss a hickory on the tree. Thss ss an apple. That ss a pear. That ss an orange. That ss a kiwi fruit. Thss ss an avocado. There ss a peach on the tree. This is a banana. This is a berry. This is cherry. This is a haw. This is a lemon. There ss a hickory on the tree. Thss ss an apple. That ss a pear. That ss an orange. That ss a kiwi fruit. Thss ss an avocado. There ss a peach on the tree. Thss ss a banana. That ss a berry. That ss cherry. That ss a haw. Thss ss a lemon. There ss a hickory on the
is

提前谢谢你。我已经为此工作了好几个小时。

标签: arraysclinuxpthreadsdistributed-computing

解决方案


这是我的热门评论的开头。

我不得不稍微重构你的代码。

我还必须修改一些提供的功能以允许调试printf

根据您想要的结果55,而不是strtok,您可以/应该使用strstr. 这是我让计数正确的唯一方法。

我添加了一个互斥锁,因此更新total不会被线程冲突破坏。

关键是我提到的allowedOnThread使用 a的重写。struct计算每个线程的每个段的开始和结束偏移量,调整前后空格,以便单词不会在中间被切掉。


听到的是重构的代码。它是注释的。它允许每个线程单独计算其范围。

代码给出了正确的答案,但在end每个段上停止似乎是正确的,但我可能会仔细检查。

#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <stdarg.h>
#include <stdatomic.h>

#define MAX 10240
#define NUM_THREADS  10

int n1, n2;
char *s1, *s2;
FILE *fp;
int countArray[NUM_THREADS] = { 0 };

pthread_mutex_t mutex;

int total = 0;

const char *delims = ",. \t\n";

struct range {
    size_t off;                         // starting offset
    size_t end;                         // ending offset (one past last char)
};

__thread int curtid;
__thread FILE *logxf;
#ifdef DEBUG
#define dbgprt(_fmt...) \
    _dbgprt(_fmt)
#else
#define dbgprt(_fmt...) \
    do { \
    } while (0)
#endif

#define prt(_lvl) \
    __attribute__((__format__(__printf__,_lvl,_lvl + 1)))

void prt(1)
_dbgprt(const char *fmt,...)
{
    char *bp;
    char buf[1000];
    va_list ap;

    if (logxf == NULL) {
        char logf[100];
        sprintf(logf,"log%2.2d",curtid);
        logxf = fopen(logf,"w");
        setlinebuf(logxf);
    }

    fprintf(logxf,"[%d] ",curtid);

    va_start(ap,fmt);
    bp += vfprintf(logxf,fmt,ap);
    va_end(ap);
}

//read input file and generate string s1/s2 and length n1/n2
int
readf(FILE * fp)
{
    if ((fp = fopen("strings.txt", "r")) == NULL) {
        printf("ERROR: can't open string.txt!\n");
        return 0;
    }
    s1 = (char *) malloc(sizeof(char) * MAX);
    if (s1 == NULL) {
        printf("ERROR: Out of memory!\n");
        return -1;
    }
    s2 = (char *) malloc(sizeof(char) * MAX);
    if (s1 == NULL) {
        printf("ERROR: Out of memory\n");
        return -1;
    }
    // read s1 s2 from the file

    s1 = fgets(s1, MAX, fp);
    s2 = fgets(s2, MAX, fp);
    // length of s1
    n1 = strlen(s1);
    // length of s2
    n2 = strlen(s2) - 1;

    // when error exit
    if (s1 == NULL || s2 == NULL || n1 < n2)
        return -1;
    return 0;
}

size_t
skip_to_delim(size_t off,const char *tag)
{
    char *str;

    dbgprt("skip_to_delim: ENTER off=%zu tag=%s\n",off,tag);

    str = &s1[off];
    off += strcspn(str,delims);

    dbgprt("skip_to_delim: EXIT off=%zu\n",off);

    return off;
}

int
allowedOnThread(int thread, struct range *seg)
{
    int threadMultiplier = n1 / NUM_THREADS;

    dbgprt("allowedOnThread: ENTER thread=%d\n",thread);

    // get starting offset
    do {
        seg->off = threadMultiplier * thread;

        // first thread always starts at offset 0
        if (thread == 0)
            break;

        // skip past a word and stop on a delimiter
        seg->off = skip_to_delim(seg->off,"off");
    } while (0);

    // get ending offset/length
    do {
        if (thread == (NUM_THREADS - 1)) {
            seg->end = n1;
            break;
        }

        // scan at least the amount we're allocated
        seg->end = seg->off + threadMultiplier;

        // skip past a word and stop on a delimiter
        seg->end = skip_to_delim(seg->end,"end");
    } while (0);

    dbgprt("allowedOnThread: EXIT thread=%d off=%zu end=%zu\n",
        thread,seg->off,seg->end);

    return 0;
}

int
num_substring(int t)
{
    //add your logic here
    //1, how to distribute different parts of string s1 into different threads
    //2, how to sum up the total number of substring from all threads

    dbgprt("num_substring: ENTER\n");

    struct range seg;
    allowedOnThread(t,&seg);

    char *str = &s1[seg.off];
    char *end = &s1[seg.end];

    char *token = str;
    size_t count = 0;

    // NOTE/FIXME -- this should be double checked to ensure that we're not
    // double counting by going beyond our range
    while (1) {
        // look for a substring match of s2 in s1
        token = strstr(token,s2);
        if (token == NULL)
            break;

        // don't intrude on next thread's segment
        if (token >= end)
            break;

        // advance the count
        count += 1;

        // point to start of next possible match point for s2
        token += n2;

        // stop when we go beyond the end of our thread's area
        if (token >= end)
            break;
    }

    // add to global count (under thread lock)
    pthread_mutex_lock(&mutex);
    total += count;
    pthread_mutex_unlock(&mutex);

    dbgprt("num_substring: EXIT count=%zu\n",count);

    return count;
}

void *
calSubStringThread(void *threadid)
{
    long tid = (long) threadid;

    curtid = tid + 1;

    dbgprt("calSubstringThread: ENTER\n");

    int num = num_substring(tid);

    dbgprt("calSubstringThread: EXIT num=%d\n",num);

    pthread_exit(NULL);
}

// docheck -- check with non-threaded algorithm
void
docheck(void)
{
    size_t count = 0;

    char *token = s1;
    while (1) {
        token = strstr(token,s2);
        if (token == NULL)
            break;

        count += 1;

        token += n2;
    }

    printf("docheck: count=%zu\n",count);
}

int
main(int argc, char *argv[])
{
    pthread_t threads[NUM_THREADS];
    int t, rc;

    pthread_mutex_init(&mutex,NULL);

    readf(fp);

    // get rid of newline
    s2[n2] = 0;
    dbgprt("main: s2='%s'\n",s2);

    docheck();

    for (t = 0; t < NUM_THREADS; t++) {
        rc = pthread_create(&threads[t], NULL, calSubStringThread,
            (void *) (size_t) t);
        if (rc) {
            printf("ERROR; return code from pthread_create() is %d\n", rc);
            exit(-1);
        }
    }

    for (t = 0; t < NUM_THREADS; t++) {
        pthread_join(threads[t], NULL);
    }

    printf("The number of substrings is: %d\n", total);

    return 0;
}

这是调试日志输出。(请注意,我根据ENTER/EXIT消息手动缩进了日志)。

==> log00 <==
[0] main: s2='is'

==> log01 <==
[1] calSubstringThread: ENTER
  [1] num_substring: ENTER
    [1] allowedOnThread: ENTER thread=0
      [1] skip_to_delim: ENTER off=479 tag=end
      [1] skip_to_delim: EXIT off=479
    [1] allowedOnThread: EXIT thread=0 off=0 end=479
  [1] num_substring: EXIT count=1
[1] calSubstringThread: EXIT num=1

==> log02 <==
[2] calSubstringThread: ENTER
  [2] num_substring: ENTER
    [2] allowedOnThread: ENTER thread=1
      [2] skip_to_delim: ENTER off=479 tag=off
      [2] skip_to_delim: EXIT off=479
      [2] skip_to_delim: ENTER off=958 tag=end
      [2] skip_to_delim: EXIT off=960
    [2] allowedOnThread: EXIT thread=1 off=479 end=960
  [2] num_substring: EXIT count=2
[2] calSubstringThread: EXIT num=2

==> log03 <==
[3] calSubstringThread: ENTER
  [3] num_substring: ENTER
    [3] allowedOnThread: ENTER thread=2
      [3] skip_to_delim: ENTER off=958 tag=off
      [3] skip_to_delim: EXIT off=960
      [3] skip_to_delim: ENTER off=1439 tag=end
      [3] skip_to_delim: EXIT off=1440
    [3] allowedOnThread: EXIT thread=2 off=960 end=1440
  [3] num_substring: EXIT count=3
[3] calSubstringThread: EXIT num=3

==> log04 <==
[4] calSubstringThread: ENTER
  [4] num_substring: ENTER
    [4] allowedOnThread: ENTER thread=3
      [4] skip_to_delim: ENTER off=1437 tag=off
      [4] skip_to_delim: EXIT off=1440
      [4] skip_to_delim: ENTER off=1919 tag=end
      [4] skip_to_delim: EXIT off=1920
    [4] allowedOnThread: EXIT thread=3 off=1440 end=1920
  [4] num_substring: EXIT count=4
[4] calSubstringThread: EXIT num=4

==> log05 <==
[5] calSubstringThread: ENTER
  [5] num_substring: ENTER
    [5] allowedOnThread: ENTER thread=4
      [5] skip_to_delim: ENTER off=1916 tag=off
      [5] skip_to_delim: EXIT off=1920
      [5] skip_to_delim: ENTER off=2399 tag=end
      [5] skip_to_delim: EXIT off=2402
    [5] allowedOnThread: EXIT thread=4 off=1920 end=2402
  [5] num_substring: EXIT count=5
[5] calSubstringThread: EXIT num=5

==> log06 <==
[6] calSubstringThread: ENTER
  [6] num_substring: ENTER
    [6] allowedOnThread: ENTER thread=5
      [6] skip_to_delim: ENTER off=2395 tag=off
      [6] skip_to_delim: EXIT off=2396
      [6] skip_to_delim: ENTER off=2875 tag=end
      [6] skip_to_delim: EXIT off=2876
    [6] allowedOnThread: EXIT thread=5 off=2396 end=2876
  [6] num_substring: EXIT count=6
[6] calSubstringThread: EXIT num=6

==> log07 <==
[7] calSubstringThread: ENTER
  [7] num_substring: ENTER
    [7] allowedOnThread: ENTER thread=6
      [7] skip_to_delim: ENTER off=2874 tag=off
      [7] skip_to_delim: EXIT off=2876
      [7] skip_to_delim: ENTER off=3355 tag=end
      [7] skip_to_delim: EXIT off=3356
    [7] allowedOnThread: EXIT thread=6 off=2876 end=3356
  [7] num_substring: EXIT count=7
[7] calSubstringThread: EXIT num=7

==> log08 <==
[8] calSubstringThread: ENTER
  [8] num_substring: ENTER
    [8] allowedOnThread: ENTER thread=7
      [8] skip_to_delim: ENTER off=3353 tag=off
      [8] skip_to_delim: EXIT off=3356
      [8] skip_to_delim: ENTER off=3835 tag=end
      [8] skip_to_delim: EXIT off=3835
    [8] allowedOnThread: EXIT thread=7 off=3356 end=3835
  [8] num_substring: EXIT count=8
[8] calSubstringThread: EXIT num=8

==> log09 <==
[9] calSubstringThread: ENTER
  [9] num_substring: ENTER
    [9] allowedOnThread: ENTER thread=8
      [9] skip_to_delim: ENTER off=3832 tag=off
      [9] skip_to_delim: EXIT off=3832
      [9] skip_to_delim: ENTER off=4311 tag=end
      [9] skip_to_delim: EXIT off=4311
    [9] allowedOnThread: EXIT thread=8 off=3832 end=4311
  [9] num_substring: EXIT count=9
[9] calSubstringThread: EXIT num=9

==> log10 <==
[10] calSubstringThread: ENTER
  [10] num_substring: ENTER
    [10] allowedOnThread: ENTER thread=9
      [10] skip_to_delim: ENTER off=4311 tag=off
      [10] skip_to_delim: EXIT off=4311
    [10] allowedOnThread: EXIT thread=9 off=4311 end=4799
  [10] num_substring: EXIT count=10
[10] calSubstringThread: EXIT num=10

非常感谢您对此进行解释和帮助。我想知道您是否可以向我解释互斥锁的工作原理以及为什么使用它是一种好习惯?- 低音批准

从 C 语法来看,这似乎是一个原子操作:

total += count;

但是,它不是。它实际上是三个操作:

temp = total;
temp += count;
total = temp;

不同的线程将按顺序执行这些。通常(例如 99.44% 的时间),这三个操作将由一个线程执行,而不受另一个线程的干扰。如果我们有两个线程(例如AB),“好的”顺序是线程操作是“很好”排序的:

thread A / cpu 0        thread B / cpu 1
--------------------    ------------------------
tempA = total;
tempA += countA;
total = tempA;
                        tempB = total;
                        tempB += countB;
                        total = tempB;

最终total是:total + countA + countB,这就是我们想要的。

但是,如果两个线程同时在不同的 CPU 上运行,它们可能会穿插这些操作。我们可以有一个序列,例如:

thread A / cpu 0        thread B / cpu 1
--------------------    ------------------------
tempA = total;
                        tempB = total;
tempA += countA;
                        tempB += countB;
total = tempA;
                        total = tempB;

在这种情况下,在序列的末尾, 的最终值total将是:total + countB[这不是我们想要的]。(ie) totalby countA[executed by thread A] 的增量将丢失/丢弃!

在这种情况下,线程正在竞争并且线程“赢得”了比赛B

使用互斥体 [或其他锁定机制或使用原子操作] 将防止这种情况发生。

[松散地] 是两个操作: “pthread_mutex_lock请求”和“授予”。如果互斥锁没有被持有,这些操作会同时发生。如果互斥锁[由另一个线程] 持有,则“授权”会及时推迟。在其他线程完成“释放”(例如)后授予它pthread_mutex_unlock。这是时间线:

thread A / cpu 0        thread B / cpu 1
--------------------    ------------------------
mutex requested
mutex granted
                        mutex requested
tempA = total;
tempA += countA;
total = tempA;
mutex released
                        mutex granted
                        tempB = total;
                        tempB += countB;
                        total = tempB;
                        mutex released

有关更详细的解释,请参阅我的答案:Windows 的 Linux 子系统中的线程差异

另一种保证原子更新的方法是使用stdatomic.h原语。请参阅我的答案:在 c 中使用互斥锁进行多线程处理并一次运行一个线程

另一种解决方案是“票证锁”。请参阅我的回答:C Pthreads - 线程安全队列实现的问题


推荐阅读