I have a simple, self-contained example of a very rudimentary two-dimensional stencil application that uses OpenMP tasks on dynamically allocated arrays. It reproduces an issue I am running into in a larger problem that is less of a toy.
There are two update steps. In each step, every point of the destination array accumulates three values from the source array: the value at the corresponding location plus those of the upper and lower neighbours. The program runs on a NUMA system with two nodes; each node has 8 cores with 2 hardware threads per core (see the topology below). The array initializations are parallelized, and with OMP_PLACES=threads and OMP_PROC_BIND=spread the data ends up evenly distributed across the nodes' memories via first touch. To avoid data races, I set up dependencies so that a task for a section of the second update step can only be scheduled once the relevant tasks of the first update step for that section and its neighbours have finished. The computation is correct, but it is not NUMA-aware: the affinity clause does not seem to be enough to change the scheduling, as it is only a hint. I am also not sure whether creating all tasks from a single construct is efficient, but as far as I know it is the only way to make all tasks sibling tasks, which is required for the dependencies to apply.
Is there a way in OpenMP to parallelize the task creation under these constraints, or to guide the runtime towards a more NUMA-aware task scheduling? If not, that is also fine; I am just trying to see whether there are options that use OpenMP as intended rather than working against it. I already have a version that only uses worksharing loops (a sketch of it is at the end of the post). This is for research.
Machine Topology
NUMA NODE 0 pus {0-7,16-23}
NUMA NODE 1 pus {8-15,24-31}
Environment Variables
export OMP_PLACES=threads
export OMP_PROC_BIND=spread
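To check that the binding is what I expect, a small snippet along these lines can be used (kept separate from the example below; omp_get_place_num() requires OpenMP 4.5):
#define _GNU_SOURCE // sched_getcpu(3) is glibc-specific (see the man page)
#include <sched.h>
#include <iostream>
#include <omp.h>

int main(){
    //print the place and the cpu each thread is bound to; with OMP_PLACES=threads
    //and OMP_PROC_BIND=spread the threads should be spread over the hardware
    //threads of both NUMA nodes
    #pragma omp parallel
    {
        #pragma omp critical
        std::cout << "thread " << omp_get_thread_num()
                  << " place " << omp_get_place_num()
                  << " cpu " << sched_getcpu() << std::endl;
    }
    return 0;
}
The full example follows.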
#define _GNU_SOURCE // sched_getcpu(3) is glibc-specific (see the man page)
#include <sched.h>
#include <iostream>
#include <omp.h>
#include <math.h>
#include <vector>
#include <string>
typedef double value_type;
int main(int argc, char * argv[]){
    std::size_t rows = 8192;
    std::size_t cols = 8192;
    std::size_t part_rows = 32;
    std::size_t num_threads = 16;
    std::size_t parts = ceil(float(rows)/part_rows);

    value_type * A = (value_type *) malloc(sizeof(value_type)*rows*cols);
    value_type * B = (value_type *) malloc(sizeof(value_type)*rows*cols);
    value_type * C = (value_type *) malloc(sizeof(value_type)*rows*cols);

    //parallel first touch: with the spread binding the pages of A, B and C
    //are distributed evenly across the two nodes' memories
    #pragma omp parallel for schedule(static)
    for (int i = 0; i < rows; ++i)
        for(int j = 0; j<cols; ++j){
            A[i*cols+j] = 1;
            B[i*cols+j] = 1;
            C[i*cols+j] = 0;
        }

    //putasks[pu][h]: number of tasks executed by processing unit pu whose rows
    //lie in the first (h=0) or second (h=1) half of the array, one entry per PU
    std::vector<std::vector<std::size_t>> putasks(32, std::vector<std::size_t>(2,0));
    std::cout << std::endl;

    //all tasks are created by a single thread so that they are siblings
    //and the depend clauses apply
    #pragma omp parallel num_threads(num_threads)
    #pragma omp single
    {
        //first update step: each task updates one block of rows of B from A
        for(int part=0; part<parts; part++){
            std::size_t row = part * part_rows;
            std::size_t current_first_loc = row * cols;
            //first index of the upper part in the array
            std::size_t upper_part_first_loc = part != 0 ? (part-1)*part_rows*cols : current_first_loc;
            //first index of the lower part in the array
            std::size_t lower_part_first_loc = part != parts-1 ? (part+1)*part_rows*cols : current_first_loc;
            std::size_t start = row;
            std::size_t end = part == parts-1 ? rows-1 : start+part_rows;
            if(part==0) start = 1;

            #pragma omp task depend(in: A[current_first_loc], A[upper_part_first_loc], A[lower_part_first_loc])\
                depend(out: B[current_first_loc]) affinity(A[current_first_loc], B[current_first_loc])
            {
                //record on which PU this task ran and which half of the array it touched
                if(end <= ceil(rows/2.0))
                    putasks[sched_getcpu()][0]++;
                else putasks[sched_getcpu()][1]++;
                for(std::size_t i=start; i<end; ++i){
                    for(std::size_t j = 0; j < cols; ++j)
                        B[i*cols+j] += A[i*cols+j] + A[(i-1)*cols+j] + A[(i+1)*cols+j];
                }
            }
        }
        //second update step: a task may only run once the tasks of the first step
        //that produced its section of B and the neighbouring sections have finished
        for(int part=0; part<parts; part++){
            std::size_t row = part * part_rows;
            std::size_t current_first_loc = row * cols;
            std::size_t upper_part_first_loc = part != 0 ? (part-1)*part_rows*cols : current_first_loc;
            std::size_t lower_part_first_loc = part != parts-1 ? (part+1)*part_rows*cols : current_first_loc;
            std::size_t start = row;
            std::size_t end = part == parts-1 ? rows-1 : start+part_rows;
            if(part==0) start = 1;

            #pragma omp task depend(in: B[current_first_loc], B[upper_part_first_loc], B[lower_part_first_loc])\
                depend(out: C[current_first_loc]) affinity(B[current_first_loc], C[current_first_loc])
            {
                if(end <= ceil(rows/2.0))
                    putasks[sched_getcpu()][0]++;
                else putasks[sched_getcpu()][1]++;
                for(std::size_t i=start; i<end; ++i){
                    for(std::size_t j = 0; j < cols; ++j)
                        C[i*cols+j] += B[i*cols+j] + B[(i-1)*cols+j] + B[(i+1)*cols+j];
                }
            }
        }
    }
    //print the result only for small problem sizes
    if(rows <= 16 && cols <= 16)
        for(std::size_t i = 0; i < rows; ++i){
            for(std::size_t j = 0; j < cols; ++j){
                std::cout << C[i*cols+j] << " ";
            }
            std::cout << std::endl;
        }

    //report, per PU, how many tasks of each half of the array it executed
    for(std::size_t i = 0; i<putasks.size(); ++i){
        if(putasks[i][0]!=0 && putasks[i][1]!=0){
            for(std::size_t node = 0; node < putasks[i].size(); ++node){
                std::cout << "pu: " << i << " worked on ";
                std::cout << putasks[i][node]<< " NODE " << node << " tasks" << std::endl;
            }
            std::cout << std::endl;
        }
    }

    free(A);
    free(B);
    free(C);
    return 0;
}
Task Distribution Output Excerpt
pu: 1 worked on 26 NODE 0 tasks
pu: 1 worked on 12 NODE 1 tasks
pu: 2 worked on 27 NODE 0 tasks
pu: 2 worked on 13 NODE 1 tasks
...
pu: 7 worked on 26 NODE 0 tasks
pu: 7 worked on 13 NODE 1 tasks
pu: 8 worked on 10 NODE 0 tasks
pu: 8 worked on 11 NODE 1 tasks
pu: 9 worked on 8 NODE 0 tasks
pu: 9 worked on 14 NODE 1 tasks
...
pu: 15 worked on 8 NODE 0 tasks
pu: 15 worked on 12 NODE 1 tasks
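For reference, the worksharing-loop version I mentioned boils down to one parallel loop per update step in place of the task-creating loops above, using the variables defined there (a minimal sketch of the first step):
//worksharing-only variant of the first update step: the same static schedule is
//used for the initialization and for the update, so each thread mostly works on
//rows whose pages were first touched on its own NUMA node
#pragma omp parallel for schedule(static)
for(std::size_t i = 1; i < rows-1; ++i)
    for(std::size_t j = 0; j < cols; ++j)
        B[i*cols+j] += A[i*cols+j] + A[(i-1)*cols+j] + A[(i+1)*cols+j];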