I discovered that OpenMP doesn't support while loops (or at least doesn't like them much). It also doesn't like the ' != ' operator.

I have this bit of code.

int count = 1;
#pragma omp parallel for
    while ( fgets(buff, BUFF_SIZE, f) != NULL )
    {
        len = strlen(buff);
        int sequence_counter = segment_read(buff,len,count);
        if (sequence_counter == 1)
        {
            count_of_reads++;
            printf("\n Total No. of reads: %d \n",count_of_reads);
        }
        count++;
    }

Any clues as to how to manage this? I read somewhere (including another post on Stack Overflow) that I can use a pipeline. What is that, and how do I implement it?

  • Provide a link to which stackoverflow post you read that, please. Commented May 29, 2013 at 15:14
  • @Shahbaz, I think he may be referring to this SO post stackoverflow.com/questions/8121077/… Commented May 30, 2013 at 8:29
  • actually ... this one .. stackoverflow.com/questions/7532067/… but thats also relevant ! Commented May 30, 2013 at 9:48

3 Answers

It's too bad people are so quick to select the best answer. Here is my answer.
First, you should read the file into a buffer with something like fread. This is very quick. An example of how to do this can be found here http://www.cplusplus.com/reference/cstdio/fread/

Then you can operate on the buffer in parallel with OpenMP. I have implemented most of this for you. Below is the code. You did not provide the segment_read function so I created a dummy one. I used a few functions from C++ such as std::vector and std::sort but with a little more work you could do this in pure C as well.

Edit: I edited this code and was able to remove the sorting and critical section.

I compiled with g++ foo.cpp -o foo -fopenmp -O3

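#include <stdio.h>
#include <stdlib.h>   // exit, malloc, free (used in main)
#include <omp.h>
#include <vector>

using namespace std;

// Dummy stand-in: the question did not include the real segment_read.
int segment_read(char *buff, const int len, const int count) {
  return 1;
}

void foo(char* buffer, size_t size) {
    int count_of_reads = 0;
    int count = 1;
    std::vector<int> *posa;   // per-thread line-start offsets
    std::vector<int> pos;     // all line-start offsets, in file order
    int nthreads;

    #pragma omp parallel
    {
        const int ithread = omp_get_thread_num();
        #pragma omp single
        {
            nthreads = omp_get_num_threads();
            posa = new vector<int>[nthreads];
            posa[0].push_back(0);
        }

        // get the number of lines and the start offset of each line;
        // schedule(static) gives each thread one contiguous chunk, so
        // posa[0], posa[1], ... are already in file order
        #pragma omp for schedule(static) reduction(+: count)
        for(int i=0; i<(int)size; i++) {
            if(buffer[i] == '\n') { //should add EOF as well to be safe
                count++;
                posa[ithread].push_back(i+1);
            }
        }

        // concatenate the per-thread vectors into one ordered vector
        #pragma omp single
        for(int t=0; t<nthreads; t++)
            pos.insert(pos.end(), posa[t].begin(), posa[t].end());

        #pragma omp for
        for(int i=1; i<count ;i++) {
            const int len = pos[i] - pos[i-1];
            char* buff = &buffer[pos[i-1]];
            const int sequence_counter = segment_read(buff,len,i);
            if (sequence_counter == 1) {
                int reads;   // this thread's snapshot of the counter
                #pragma omp atomic capture
                reads = ++count_of_reads;
                printf("\n Total No. of reads: %d \n",reads);
            }
        }
    }
    delete[] posa;
}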

int main () {
  FILE * pFile;
  long lSize;
  char * buffer;
  size_t result;

  pFile = fopen ( "myfile.txt" , "rb" );
  if (pFile==NULL) {fputs ("File error",stderr); exit (1);}

  // obtain file size:
  fseek (pFile , 0 , SEEK_END);
  lSize = ftell (pFile);
  rewind (pFile);

  // allocate memory to contain the whole file:
  buffer = (char*) malloc (sizeof(char)*lSize);
  if (buffer == NULL) {fputs ("Memory error",stderr); exit (2);}

  // copy the file into the buffer:
  result = fread (buffer,1,lSize,pFile);
  if (result != lSize) {fputs ("Reading error",stderr); exit (3);}

  /* the whole file is now loaded in the memory buffer. */
  foo(buffer, result);
  // terminate


  fclose (pFile);
  free (buffer);
  return 0;
}
8 Comments

i absolutely adore this answer. Sorry for the quick "best answer". most questions seem to get them asap.
I edited the code (again), the inner loop over the threads was wrong to do. The main problem I was struggling with is that the threads come in randomly so segment_read is likely not being called in order. That may not be a problem. The good news is that the variable with the position is filled in order. In other words posa[0] is the vector of the lowest positions and posa[7] (using 8 threads) is the vector with the highest positions. So if you need the positions in order you got them. Originally, I used sort() and a critical section to do this but the latest code does not need that.
That's been my main problem: I wanted the lines to be read sequentially, but some clauses in segment_read might terminate that thread early. Would that have any implications? My whole idea was for the entire segment_read function to run in parallel. I have an 8-core machine, so you can assume 8 segment_reads running on 8 different lines of the file.
The lines are read sequentially within a thread. The threads, however, are run randomly. But as long as you save the results for each thread (like I did in posa) then you can loop over them in the end and get the results sequentially. Threads terminating early won't make any difference except you may be able to make further optimizations (such as trying schedule(dynamic)).
any clues for replacing this part ? posa = new vector<int>[nthreads]; posa[0].push_back(0);

One way to implement "parallel while" in OpenMP is to use a while loop that creates tasks. Here is a general sketch:

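// Sketch only: call foo() from one thread inside a parallel region
// (e.g. within "omp single") so that other threads can run the tasks.
void foo() {
    while( Foo* f = get_next_thing() ) {
#pragma omp task firstprivate(f)
        bar(f);
    }
#pragma omp taskwait
}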

For the specific case of looping over fgets, note that fgets has inherently sequential semantics (it gets the "next" line), so it would need to be called before launching the task. It would also be important for each task to operate on its own copy of the data returned by fgets, so that a call to fgets does not overwrite the buffer being operated on by a previous task.
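As a rough sketch of the fgets case just described: one thread reads the lines in order and gives each task its own heap copy of the line, so the next fgets call cannot overwrite data a task is still using. The segment_read body, the BUFF_SIZE value and the name count_reads_with_tasks are assumptions made for illustration, since the question does not show them.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define BUFF_SIZE 1024  /* assumed value; the question does not give it */

/* Hypothetical stand-in for the question's segment_read:
   returns 1 for any non-empty line. */
static int segment_read(const char *buff, int len, int count) {
    (void)buff; (void)count;
    return len > 0;
}

/* Reads lines sequentially and processes each one in its own task.
   Returns the number of lines for which segment_read returned 1. */
int count_reads_with_tasks(FILE *f) {
    int count_of_reads = 0;
    #pragma omp parallel
    #pragma omp single
    {
        char buff[BUFF_SIZE];
        int count = 1;
        /* fgets stays on one thread: it is inherently sequential */
        while (fgets(buff, BUFF_SIZE, f) != NULL) {
            size_t n = strlen(buff);
            char *line = malloc(n + 1);   /* private copy for the task */
            if (line == NULL) break;
            memcpy(line, buff, n + 1);
            #pragma omp task firstprivate(line, n, count) shared(count_of_reads)
            {
                if (segment_read(line, (int)n, count) == 1) {
                    #pragma omp atomic
                    count_of_reads++;
                }
                free(line);               /* the task owns its copy */
            }
            count++;
        }
        #pragma omp taskwait
    }
    return count_of_reads;
}
```

The sketch uses only pragmas and no omp_* calls, so it also builds without -fopenmp, in which case it simply runs serially. Note that the order in which tasks run is not the order of the lines.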

3 Comments

Could those tasks invoke functions ? i.e. a task calling a function, performing the function on the char array, and return a value ?
Yes, the tasks can invoke functions. Where you have to be careful is in making sure no two concurrent tasks interfere with each other. For example, if I forgot to write "firstprivate(f)" in my example, then by the time bar(f) actually ran, the value of f might have already disappeared or been overwritten by the next iteration.
The way f is declared, it is firstprivate by default and the firstprivate clause in the task construct is redundant.

First, even though it comes very close, OpenMP doesn't magically make your code parallel. It works with for because a for loop has lower and upper bounds that it can understand. OpenMP uses those bounds to divide the work among different threads.

There is no such thing possible with a while loop.
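To illustrate the point about bounds, here is a minimal sketch of a loop that OpenMP can divide, because the trip count n is known before the loop starts. The function count_matches and its arguments are made up for this example.

```c
/* Counts how many of the n entries of v equal target.
   The loop runs from 0 to n, so OpenMP can hand each
   thread its own range of i before any iteration executes. */
int count_matches(const int *v, int n, int target) {
    int matches = 0;
    #pragma omp parallel for reduction(+: matches)
    for (int i = 0; i < n; i++) {
        if (v[i] == target)
            matches++;
    }
    return matches;
}
```

A loop such as while (fgets(...) != NULL) has no such bound: nobody knows how many lines there are until the file has been read to the end.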

Second, how do you expect your task to be parallelized? You are reading from a file, where sequential access will probably give you better performance than parallel access. You might be able to parallelize segment_read itself (depending on its implementation).

Alternatively, you may want to overlap the file read with processing. For that, you need to use lower-level functions such as Unix's open and read. Then do asynchronous reads, meaning you send a read request, process the last block you read, and then wait for the read request to finish. Search for "linux asynchronous io", for example, to read more on this.
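The overlap idea can be sketched without OS-level asynchronous I/O by using two buffers and OpenMP tasks: while one task processes the current block, another task reads the next one. This is a swapped-in technique, not the Unix asynchronous reads mentioned above. BLOCK_SIZE, process_block (which here just counts newlines) and count_lines_overlapped are assumptions made for illustration.

```c
#include <stdio.h>
#include <stddef.h>

#define BLOCK_SIZE 4096  /* assumed block size */

/* Hypothetical processing step: counts newline characters in a block. */
static long process_block(const char *block, size_t n) {
    long lines = 0;
    for (size_t i = 0; i < n; i++)
        if (block[i] == '\n') lines++;
    return lines;
}

/* Reads f block by block, overlapping the read of the next block
   with the processing of the current one. Returns the line count. */
long count_lines_overlapped(FILE *f) {
    long total = 0;
    #pragma omp parallel
    #pragma omp single
    {
        char bufs[2][BLOCK_SIZE];
        int cur = 0;
        size_t n_cur = fread(bufs[cur], 1, BLOCK_SIZE, f);
        while (n_cur > 0) {
            size_t n_next = 0;
            long part = 0;
            int nxt = 1 - cur;
            /* task 1: fetch the next block into the idle buffer */
            #pragma omp task shared(bufs, n_next, f) firstprivate(nxt)
            n_next = fread(bufs[nxt], 1, BLOCK_SIZE, f);
            /* task 2: process the block that is already in memory */
            #pragma omp task shared(bufs, part) firstprivate(cur, n_cur)
            part = process_block(bufs[cur], n_cur);
            /* wait for both before swapping the buffers */
            #pragma omp taskwait
            total += part;
            cur = nxt;
            n_cur = n_next;
        }
    }
    return total;
}
```

Whether this gains anything depends on how expensive the processing is relative to the read; for a cheap step like counting newlines the read will dominate.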

Using a pipe might not actually help you much. That would depend on many internals of the pipe that I'm not very familiar with. However, if you have a big enough memory, you may also want to consider loading the whole data first, then processing it. That way, loading the data is done as fast as possible (sequentially) and then you can parallelize its processing.
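A minimal sketch of the "load first, then process in parallel" approach, assuming the file contents are already in memory: a sequential pass records where each line starts, and a parallel for then hands whole lines to the threads. The segment_read body and the name process_buffer are assumptions made for illustration.

```c
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical stand-in for the question's segment_read:
   returns 1 for lines that contain more than just "\n". */
static int segment_read(const char *buff, int len, int count) {
    (void)buff; (void)count;
    return len > 1;
}

/* buffer holds the whole file (size bytes). Returns the number of
   lines accepted by segment_read, or -1 if allocation fails. */
int process_buffer(const char *buffer, size_t size) {
    /* Pass 1 (sequential): count the lines and record their starts. */
    size_t nlines = 0;
    for (size_t i = 0; i < size; i++)
        if (buffer[i] == '\n') nlines++;
    if (size > 0 && buffer[size - 1] != '\n')
        nlines++;                       /* last line has no newline */

    size_t *start = malloc((nlines + 1) * sizeof *start);
    if (start == NULL) return -1;
    size_t k = 0;
    start[k++] = 0;
    for (size_t i = 0; i < size; i++)
        if (buffer[i] == '\n' && i + 1 < size)
            start[k++] = i + 1;
    start[nlines] = size;               /* end marker */

    /* Pass 2 (parallel): the line count is now a known loop bound. */
    int count_of_reads = 0;
    #pragma omp parallel for schedule(dynamic) reduction(+: count_of_reads)
    for (long i = 0; i < (long)nlines; i++) {
        int len = (int)(start[i + 1] - start[i]);
        if (segment_read(buffer + start[i], len, (int)i + 1) == 1)
            count_of_reads++;
    }
    free(start);
    return count_of_reads;
}
```

Unlike a buffer filled by fgets, the lines passed to segment_read here are not NUL-terminated, so the function has to rely on len rather than strlen.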
