Hitchhiker's Guide


The Answer is 42

If you're familiar with message passing parallel programming and you're completely familiar with message passing protocols, then you already know the answer is 42, and you can skip ahead to "Don't Panic" for a discussion on using the IBM Parallel Environment for AIX tools. If you're familiar with message passing parallel programming (or if you can just say that five times really fast), but you would like to know more about the IBM Parallel Environment for AIX message passing protocols, look at the information in "Protocols Supported" before skipping ahead to the next chapter.

For the rest of us, this section discusses some of the techniques for creating a parallel program, using message passing, as well as the various advantages and pitfalls associated with each.

This chapter is not intended to be an in-depth tutorial on writing parallel programs. Instead, it's more of an introduction to basic message passing parallel concepts; it provides just enough information to help you understand the material covered in this book. If you want more information about parallel programming concepts, you may find some of the books listed in "Related Non-IBM Publications" helpful.

Good. Now that we've gotten rid of the know-it-alls that skipped to the next section, we can reveal the answer to the question of creating a successful parallel program. No, the answer isn't really 42. That was just a ploy to get everyone else out of our hair. The answer is to start with a working sequential program. Complex sequential programs are difficult enough to get working correctly, without also having to worry about the additional complexity introduced by parallelism and message passing. The bottom line is that it's easier to convert a working serial program to parallel, than it is to create a parallel program from scratch. As you become proficient at creating parallel programs, you'll develop an awareness of which sequential techniques translate better into parallel implementations, and you can then make a point of using these techniques in your sequential programs. In this chapter, you'll find information on some of the fundamentals of creating parallel programs.

There are two common techniques for turning a sequential program into a parallel program: data decomposition and functional decomposition. Data decomposition has to do with distributing the data that the program is processing among the parallel tasks. Each task does roughly the same thing but on a different set of data. With functional decomposition, the function that the application is performing is distributed among the tasks. Each task operates on the same data but does something different. Most parallel programs don't use data decomposition or functional decomposition exclusively, but rather a mixture of the two, weighted more toward one type or the other. One way to implement either form of decomposition is through the use of message passing.


Message Passing

The message passing model of communication is typically used in distributed memory systems, where each processor node owns private memory and is linked to the others by an interconnection network. In the case of the SP, the switch provides the interconnection network needed for high-speed exchange of messages. With message passing, each task operates exclusively in a private environment; to cooperate and interact with one another, tasks must exchange messages.

The challenge of the message passing model is in reducing message traffic over the interconnection network while ensuring that the correct and updated values of the passed data are promptly available to the tasks when required. Optimizing message traffic is one way of boosting performance.

Synchronization is the act of forcing events to occur at the same time or in a certain order, while taking into account the logical dependence and the order of precedence among the tasks. The message passing model can be described as self-synchronizing because the mechanism of sending and receiving messages involves implicit synchronization points. To put it another way, a message can't be received if it has not already been sent.
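
Here's a minimal sketch of that implicit synchronization (our own two-task example, not from the PE samples; it assumes the job is run with at least two tasks). Task 1 cannot get past its receive until task 0 has actually sent the value.

#include<stdio.h>
#include<mpi.h>
 
int main(int argc, char **argv)
{
  int me, value = 0, tag = 0;
  MPI_Status status;
 
  MPI_Init(&argc,&argv);
  MPI_Comm_rank(MPI_COMM_WORLD,&me);
 
  if(me==0)
    {
      value = 42;                                    /* produce a result...  */
      MPI_Send(&value,1,MPI_INT,1,tag,MPI_COMM_WORLD);
    }
  else if(me==1)
    {
      /* ...which task 1 cannot consume until it has arrived; the receive */
      /* is an implicit synchronization point                             */
      MPI_Recv(&value,1,MPI_INT,0,tag,MPI_COMM_WORLD,&status);
      printf("Task 1 received %d\n",value);
    }
 
  MPI_Finalize();
  return 0;
}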


Data Decomposition

A good technique for parallelizing a sequential application is to look for loops in which each iteration does not depend on any prior iteration (this is also a prerequisite for either unrolling or eliminating loops). An example of a loop that has dependencies on prior iterations is the loop for computing the factorial series; the value calculated by each iteration depends on the value resulting from the previous pass. If the iterations of a loop are independent of one another, they can be performed in parallel, with two or more iterations executing simultaneously.
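
To make the distinction concrete, here is a small sketch of our own (not part of the matrix inversion example that follows); the array names and sizes are arbitrary.

#include<stdio.h>
 
#define N 8
 
int main(void)
{
  double fact[N], a[N], b[N], c[N];
  int i;
 
  /* Dependent loop: each factorial value needs the one computed on the  */
  /* previous pass, so these iterations cannot run in parallel as written */
  fact[0] = 1.0;
  for(i=1;i<N;i++)
    fact[i] = fact[i-1]*i;
 
  /* Independent loops: no iteration reads anything another iteration    */
  /* writes, so the iterations could be divided among parallel tasks     */
  for(i=0;i<N;i++)
    {
      a[i] = i;
      b[i] = 2*i;
    }
  for(i=0;i<N;i++)
    c[i] = a[i] + b[i];
 
  printf("7! = %.0f, c[7] = %.0f\n",fact[7],c[7]);
  return 0;
}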

The C program example below includes a loop with independent iterations. This example doesn't include the routines for computing the coefficient and determinant because they are not part of the parallelization at this point. Note also that this example is incomplete; you can get the entire program by following the directions in "Getting the Books and the Examples Online".

/***********************************************************************
*
* Matrix Inversion Program - serial version
*
* To compile:
* cc -o inverse_serial inverse_serial.c
*
***********************************************************************/
 
#include<stdlib.h>
#include<stdio.h>
#include<assert.h>
#include<errno.h>
 
float determinant(float **matrix,
    int size,
    int * used_rows,
    int * used_cols,
    int depth);
float coefficient(float **matrix,int size, int row, int col);
void print_matrix(FILE * fptr,float ** mat,int rows, int cols);
float test_data[8][8] =  {
    {4.0, 2.0, 4.0, 5.0, 4.0, -2.0, 4.0, 5.0},
    {4.0, 2.0, 4.0, 5.0, 3.0, 9.0, 12.0, 1.0 },
    {3.0, 9.0, -13.0, 15.0, 3.0, 9.0, 12.0, 15.0},
    {3.0, 9.0, 12.0, 15.0, 4.0, 2.0, 7.0, 5.0 },
    {2.0, 4.0, -11.0, 10.0, 2.0, 4.0, 11.0, 10.0 },
    {2.0, 4.0, 11.0, 10.0, 3.0, -5.0, 12.0, 15.0 },
    {1.0, -2.0, 4.0, 10.0, 3.0, 9.0, -12.0, 15.0 } ,
    {1.0, 2.0, 4.0, 10.0, 2.0, -4.0, -11.0, 10.0 } ,
  };
#define ROWS 8
 
int main(int argc, char **argv)
{
 
  float **matrix;
  float **inverse;
  int   rows,i,j;
  float determ;
  int * used_rows, * used_cols;
 
  rows = ROWS;
 
  /* Allocate markers to record rows and columns to be skipped */
  /* during determinant calculation                            */
  used_rows = (int *)    malloc(rows*sizeof(*used_rows));
  used_cols = (int *)    malloc(rows*sizeof(*used_cols));
 
  /* Allocate working copy of matrix and initialize it from static copy */
  matrix  =   (float **) malloc(rows*sizeof(*matrix));
  inverse =   (float **) malloc(rows*sizeof(*inverse));
  for(i=0;i<rows;i++)
    {
      matrix[i]  = (float *) malloc(rows*sizeof(**matrix));
      inverse[i] = (float *) malloc(rows*sizeof(**inverse));
      for(j=0;j<rows;j++)
          matrix[i][j] = test_data[i][j];
    }
 
  /* Compute and print determinant */
  printf("The determinant of\n\n");
  print_matrix(stdout,matrix,rows,rows);
  determ=determinant(matrix,rows,used_rows,used_cols,0);
  printf("\nis %f\n",determ);
  fflush(stdout);
  assert(determ!=0);
 
  for(i=0;i<rows;i++)
    {
      for(j=0;j<rows;j++)
        {
          inverse[j][i] = coefficient(matrix,rows,i,j)/determ;
        }
    }
 
  printf("The inverse is\n\n");
  print_matrix(stdout,inverse,rows,rows);
 
  return 0;
}

Before we talk about parallelizing the algorithm, let's look at what's necessary to create the program with the IBM Parallel Environment for AIX. The example below shows the same program, but it's now aware of PE. You do this by using three calls in the beginning of the routine, and one at the end.

The first of these calls (MPI_Init) initializes the MPI environment and the last call (MPI_Finalize) closes the environment. MPI_Comm_size sets the variable tasks to the total number of parallel tasks running this application, and MPI_Comm_rank sets me to the task ID of the particular instance of the parallel code that invoked it.
Note: MPI_Comm_size actually gets the size of the communicator you pass in, and MPI_COMM_WORLD is a predefined communicator that includes everybody. For more information about these calls, IBM Parallel Environment for AIX: MPI Programming and Subroutine Reference or other MPI publications may be of some help. See "Encyclopedia Galactica".

/************************************************************************
*
* Matrix Inversion Program - serial version enabled for parallel environment
*
* To compile:
* mpcc -g -o inverse_parallel_enabled inverse_parallel_enabled.c
*
************************************************************************/
 
#include<stdlib.h>
#include<stdio.h>
#include<assert.h>
#include<errno.h>
#include<mpi.h>
 
float determinant(float **matrix,int size, int * used_rows, int * used_cols,
                  int depth);
float coefficient(float **matrix,int size, int row, int col);
void print_matrix(FILE * fptr,float ** mat,int rows, int cols);
float test_data[8][8] =  {
      {4.0, 2.0, 4.0, 5.0, 4.0, -2.0, 4.0, 5.0},
      {4.0, 2.0, 4.0, 5.0, 3.0, 9.0, 12.0, 1.0 },
      {3.0, 9.0, -13.0, 15.0, 3.0, 9.0, 12.0, 15.0},
      {3.0, 9.0, 12.0, 15.0, 4.0, 2.0, 7.0, 5.0 },
      {2.0, 4.0, -11.0, 10.0, 2.0, 4.0, 11.0, 10.0 },
      {2.0, 4.0, 11.0, 10.0, 3.0, -5.0, 12.0, 15.0 },
      {1.0, -2.0, 4.0, 10.0, 3.0, 9.0, -12.0, 15.0 } ,
      {1.0, 2.0, 4.0, 10.0, 2.0, -4.0, -11.0, 10.0 } ,
};
#define ROWS 8
 
int me, tasks, tag=0;
 
int main(int argc, char **argv)
{
 
  float **matrix;
  float **inverse;
  int rows,i,j;
  float determ;
  int * used_rows, * used_cols;
 
  MPI_Status status[ROWS];   /* Status of messages */
  MPI_Request req[ROWS];  /* Message IDs */
 
  MPI_Init(&argc,&argv);   /* Initialize MPI */
  MPI_Comm_size(MPI_COMM_WORLD,&tasks); /* How many parallel tasks are there?*/
  MPI_Comm_rank(MPI_COMM_WORLD,&me); /* Who am I? */
 
  rows = ROWS;
 
  /* Allocate markers to record rows and columns to be skipped */
  /* during determinant calculation                            */
  used_rows = (int *)    malloc(rows*sizeof(*used_rows));
  used_cols = (int *)    malloc(rows*sizeof(*used_cols));
 
  /* Allocate working copy of matrix and initialize it from static copy */
  matrix  =   (float **) malloc(rows*sizeof(*matrix));
  inverse =   (float **) malloc(rows*sizeof(*inverse));
  for(i=0;i<rows;i++)
    {
      matrix[i]  = (float *) malloc(rows*sizeof(**matrix));
      inverse[i] = (float *) malloc(rows*sizeof(**inverse));
      for(j=0;j<rows;j++)
      matrix[i][j] = test_data[i][j];
    }
 
  /* Compute and print determinant */
  printf("The determinant of\n\n");
  print_matrix(stdout,matrix,rows,rows);
  determ=determinant(matrix,rows,used_rows,used_cols,0);
  printf("\nis %f\n",determ);
  fflush(stdout);
 
  for(i=0;i<rows;i++)
    {
      for(j=0;j<rows;j++)
        {
          inverse[j][i] = coefficient(matrix,rows,i,j)/determ;
        }
    }
 
  printf("The inverse is\n\n");
  print_matrix(stdout,inverse,rows,rows);
 
  /* Wait for all parallel tasks to get here, then quit */
  MPI_Barrier(MPI_COMM_WORLD);
  MPI_Finalize();
 
  return 0;
}
 
float determinant(float **matrix,int size, int * used_rows, int * used_cols,
                  int depth)
{
  int col1, col2, row1, row2;
  int j,k;
  float total=0;
  int sign = 1;
 
  /* Find the first unused row */
  for(row1=0;row1<size;row1++)
    {
      for(k=0;k<depth;k++)
        {
          if(row1==used_rows[k]) break;
        }
      if(k>=depth)  /* this row is not used */
        break;
    }
  assert(row1<size);
 
  if(depth==(size-2))
    {
      /* There are only 2 unused rows/columns left */
 
      /* Find the second unused row */
      for(row2=row1+1;row2<size;row2++)
        {
          for(k=0;k<depth;k++)
            {
              if(row2==used_rows[k]) break;
            }
          if(k>=depth)  /* this row is not used */
            break;
        }
      assert(row2<size);
 
      /* Find the first unused column */
      for(col1=0;col1<size;col1++)
        {
          for(k=0;k<depth;k++)
            {
              if(col1==used_cols[k]) break;
            }
          if(k>=depth)  /* this column is not used */
            break;
        }
      assert(col1<size);
 
      /* Find the second unused column */
      for(col2=col1+1;col2<size;col2++)
        {
          for(k=0;k<depth;k++)
            {
              if(col2==used_cols[k]) break;
            }
          if(k>=depth)  /* this column is not used */
            break;
        }
      assert(col2<size);
 
      /* Determinant = m11*m22-m12*m21 */
      return matrix[row1][col1]*matrix[row2][col2]
            -matrix[row2][col1]*matrix[row1][col2];
    }
 
  /* There are more than 2 rows/columns in the matrix being processed  */
  /* Compute the determinant as the sum of the product of each element */
  /* in the first row and the determinant of the matrix with its row   */
  /* and column removed                                                */
  total = 0;
 
  used_rows[depth] = row1;
  for(col1=0;col1<size;col1++)
    {
      for(k=0;k<depth;k++)
        {
          if(col1==used_cols[k]) break;
        }
      if(k<depth)  /* This column is used */
        continue;
      used_cols[depth] = col1;
      total += sign*matrix[row1][col1]*determinant(matrix,size,
                                                   used_rows,used_cols,depth+1);
      sign=(sign==1)?-1:1;
    }
  return total;
}
 
void print_matrix(FILE * fptr,float ** mat,int rows, int cols)
{
  int i,j;
  for(i=0;i<rows;i++)
    {
      for(j=0;j<cols;j++)
        {
          fprintf(fptr,"%10.4f ",mat[i][j]);
        }
      fprintf(fptr,"\n");
    }
  fflush(fptr);
}
 
float coefficient(float **matrix,int size, int row, int col)
{
  float coef;
  int * ur, *uc;
 
  ur = malloc(size*sizeof(*ur));
  uc = malloc(size*sizeof(*uc));
  ur[0]=row;
  uc[0]=col;
  coef = (((row+col)%2)?-1:1)*determinant(matrix,size,ur,uc,1);
  free(ur);
  free(uc);
  return coef;
}

This particular example is pretty ridiculous because each parallel task is going to determine the entire inverse matrix, and they're all going to print it out. As we saw in the previous section, the output of all the tasks will be intermixed, so it will be difficult to figure out what the answer really is.

A better approach is to figure out a way to distribute the work among several parallel tasks and collect the results when they're done. In this example, the loop that computes the elements of the inverse matrix simply goes through the elements of the inverse matrix, computes the coefficient, and divides it by the determinant of the matrix. Since there's no relationship between elements of the inverse matrix, they can all be computed in parallel. Keep in mind, however, that every communication call has an associated cost, so you need to balance the benefit of parallelism against the cost of communication. If we were to totally parallelize the inverse matrix element computation, each element would be derived by a separate task. The cost of collecting those individual values back into the inverse matrix would be significant, and might outweigh the benefit of having reduced the computation time by running the job in parallel. So, instead, we're going to compute the elements of each row in parallel, and send the values back one row at a time. This way, we spread some of the communication overhead over several data values. In our case, we'll execute the outer loop of the inverse computation in parallel in this next example.

/*************************************************************************
*
* Matrix Inversion Program - First parallel implementation
*
* To compile:
* mpcc -g -o inverse_parallel inverse_parallel.c
*
*************************************************************************/
 
#include<stdlib.h>
#include<stdio.h>
#include<assert.h>
#include<errno.h>
#include<mpi.h>
float determinant(float **matrix,int size, int * used_rows, 
                  int * used_cols, int depth);
float coefficient(float **matrix,int size, int row, int col);
void print_matrix(FILE * fptr,float ** mat,int rows, int cols);
 
float test_data[8][8] =  {
    {4.0, 2.0, 4.0, 5.0, 4.0, -2.0, 4.0, 5.0},
    {4.0, 2.0, 4.0, 5.0, 3.0, 9.0, 12.0, 1.0 },
    {3.0, 9.0, -13.0, 15.0, 3.0, 9.0, 12.0, 15.0},
    {3.0, 9.0, 12.0, 15.0, 4.0, 2.0, 7.0, 5.0 },
    {2.0, 4.0, -11.0, 10.0, 2.0, 4.0, 11.0, 10.0 },
    {2.0, 4.0, 11.0, 10.0, 3.0, -5.0, 12.0, 15.0 },
    {1.0, -2.0, 4.0, 10.0, 3.0, 9.0, -12.0, 15.0 },
    {1.0, 2.0, 4.0, 10.0, 2.0, -4.0, -11.0, 10.0 } ,
 
};
#define ROWS 8
int me, tasks, tag=0;
 
int main(int argc, char **argv)
{
 
  float **matrix;
  float **inverse;
  int rows,i,j;
  float determ;
  int * used_rows, * used_cols;
 
  MPI_Status status[ROWS];  /* Status of messages */
  MPI_Request req[ROWS];  /* Message IDs */
 
  MPI_Init(&argc,&argv);  /* Initialize MPI */
  MPI_Comm_size(MPI_COMM_WORLD,&tasks);  /* How many parallel tasks are there?*/
  MPI_Comm_rank(MPI_COMM_WORLD,&me);  /* Who am I? */
 
  rows = ROWS;
 
  /* We need exactly one task for each row of the matrix plus one task */
  /* to act as coordinator.  If we don't have this, the last task      */
  /* reports the error (so everybody doesn't put out the same message  */
  if(tasks!=rows+1)
    {
      if(me==tasks-1)
        fprintf(stderr,"%d tasks required for this demo "
                "(one more than the number of rows in the matrix)\n",rows+1);
      exit(-1);
    }
  /* Allocate markers to record rows and columns to be skipped */
  /* during determinant calculation                            */
  used_rows = (int *)    malloc(rows*sizeof(*used_rows));
  used_cols = (int *)    malloc(rows*sizeof(*used_cols));
 
  /* Allocate working copy of matrix and initialize it from static copy */
  matrix = (float **) malloc(rows*sizeof(*matrix));
  for(i=0;i<rows;i++)
   {
    matrix[i] = (float *) malloc(rows*sizeof(**matrix));
    for(j=0;j<rows;j++)
     matrix[i][j] = test_data[i][j];
   }
 
  /* Everyone computes the determinant (to avoid message transmission) */
  determ=determinant(matrix,rows,used_rows,used_cols,0);
 
  if(me==tasks-1)
    {/* The last task acts as coordinator */
     inverse = (float**) malloc(rows*sizeof(*inverse));
     for(i=0;i<rows;i++)
       {
        inverse[i] = (float *) malloc(rows*sizeof(**inverse));
       }
     /* Print the determinant */
     printf("The determinant of\n\n");
     print_matrix(stdout,matrix,rows,rows);
     printf("\nis %f\n",determ);
     /* Collect the rows of the inverse matrix from the other tasks */
     /* First, post a receive from each task into the appropriate row */
     for(i=0;i<rows;i++)
       {
        MPI_Irecv(inverse[i],rows,MPI_FLOAT,i,tag,MPI_COMM_WORLD,&(req[i]));
       }
     /* Then wait for all the receives to complete */
     MPI_Waitall(rows,req,status);
     printf("The inverse is\n\n");
     print_matrix(stdout,inverse,rows,rows);
    }
   else
    {/* All the other tasks compute a row of the inverse matrix */
     int dest = tasks-1;
     float *one_row;
     int size = rows*sizeof(*one_row);
 
     one_row = (float*) malloc(size);
     for(j=0;j<rows;j++)
       {
         one_row[j] = coefficient(matrix,rows,j,me)/determ;
       }
     /* Send the row back to the coordinator */
     MPI_Send(one_row,rows,MPI_FLOAT,dest,tag,MPI_COMM_WORLD);
    }
  /* Wait for all parallel tasks to get here, then quit */
  MPI_Barrier(MPI_COMM_WORLD);
  MPI_Finalize();
  return 0;
}

Functional Decomposition

Parallel servers and data mining applications are examples of functional decomposition. With functional decomposition, the function that the application is performing is distributed among the tasks. Each task operates on the same data but does something different. The sine series algorithm is also an example of functional decomposition. With this algorithm, the work being done by each task is trivial; the cost of distributing data to the parallel tasks could outweigh the value of running the program in parallel, and parallelism would actually increase the total time. Another approach to parallelism is to invoke different functions, each of which processes all of the data simultaneously. This is possible as long as the final or intermediate results of any function are not required by another function. For example, searching a matrix for the largest and smallest values, as well as for a specific value, could be done in parallel.
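
Here's a rough sketch of that last idea (our own example, not from the PE samples); the data values, the target value, and the assignment of roles to ranks are all hypothetical, and it assumes the job is run with at least three tasks. Every task holds the same data, but each performs a different function on it, selected by its rank.

#include<stdio.h>
#include<mpi.h>
 
#define N 8
 
int main(int argc, char **argv)
{
  float data[N] = { 4.0, 2.0, -13.0, 15.0, 3.0, 9.0, 12.0, 5.0 };
  float target = 9.0;
  int me, tasks, i;
 
  MPI_Init(&argc,&argv);
  MPI_Comm_size(MPI_COMM_WORLD,&tasks);
  MPI_Comm_rank(MPI_COMM_WORLD,&me);
 
  if(me==0)             /* task 0 finds the largest value */
    {
      float largest = data[0];
      for(i=1;i<N;i++)
        if(data[i]>largest) largest = data[i];
      printf("largest = %f\n",largest);
    }
  else if(me==1)        /* task 1 finds the smallest value */
    {
      float smallest = data[0];
      for(i=1;i<N;i++)
        if(data[i]<smallest) smallest = data[i];
      printf("smallest = %f\n",smallest);
    }
  else if(me==2)        /* task 2 searches for a specific value */
    {
      for(i=0;i<N;i++)
        if(data[i]==target)
          printf("found %f at index %d\n",target,i);
    }
 
  MPI_Finalize();
  return 0;
}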

This is a simple example, but suppose the elements of the matrix were arrays of polynomial coefficients, and the search involved actually evaluating different polynomial equations using the same coefficients. In this case, it would make sense to evaluate each equation separately.

On a simpler scale, let's look at the series for the sine function:



   sin(x) = x - x^3/3! + x^5/5! - x^7/7! + ...   (the t-th term is (-1)^t x^(2t+1) / (2t+1)!)

The serial approach to solving this problem is to loop through the number of terms desired, accumulating the factorial value and the sine value. When the appropriate number of terms has been computed, the loop exits. The following example does exactly this. In this example, we have an array of values for which we want the sine, and an outer loop would repeat this process for each element of the array. Since we don't want to recompute the factorial each time, we need to allocate an array to hold the factorial values and compute them outside the main loop.

/************************************************************************
*
* Series Evaluation - serial version
*
* To compile:
* cc -o series_serial series_serial.c -lm
*
************************************************************************/
 
#include<stdlib.h>
#include<stdio.h>
#include<math.h>
 
double angle[] = { 0.0, 0.1*M_PI, 0.2*M_PI, 0.3*M_PI, 0.4*M_PI,
       0.5*M_PI, 0.6*M_PI, 0.7*M_PI, 0.8*M_PI, 0.9*M_PI, M_PI };
 
#define TERMS 8
 
int main(int argc, char **argv)
{
  double divisor[TERMS], sine;
  int a, t, angles = sizeof(angle)/sizeof(angle[0]);
 
  /* Initialize denominators of series terms */
  divisor[0] = 1;
  for(t=1;t<TERMS;t++)
    {
      divisor[t] = -2*t*(2*t+1)*divisor[t-1];
    }
 
  /* Compute sine of each angle */
  for(a=0;a<angles;a++)
    {
      sine = 0;
      /* Sum the terms of the series */
      for(t=0;t<TERMS;t++)
        {
          sine += pow(angle[a],(2*t+1))/divisor[t];
        }
      printf("sin(%lf) + %lf\n",angle[a],sine);
        }
}

In a parallel environment, we could assign each term to a different task and accumulate the results on one of the tasks. In fact, that's what the following example does.

/************************************************************************
*
* Series Evaluation - parallel version
*
* To compile:
* mpcc -g -o series_parallel series_parallel.c -lm
*
************************************************************************/
 
#include<stdlib.h>
#include<stdio.h>
#include<math.h>
#include<mpi.h>
 
double angle[] = { 0.0, 0.1*M_PI, 0.2*M_PI, 0.3*M_PI, 0.4*M_PI,
       0.5*M_PI, 0.6*M_PI, 0.7*M_PI, 0.8*M_PI, 0.9*M_PI, M_PI };
 
int main(int argc, char **argv)
{
  double data, divisor, partial, sine;
  int a, t, angles = sizeof(angle)/sizeof(angle[0]);
  int me, tasks, term;
 
  MPI_Init(&argc,&argv);    /* Initialize MPI */
  MPI_Comm_size(MPI_COMM_WORLD,&tasks); /* How many parallel tasks are there?*/
  MPI_Comm_rank(MPI_COMM_WORLD,&me);   /* Who am I? */
 
  term = 2*me+1;   /* Each task computes a term */
  /* Scan the factorial terms through the group members   */
  /* Each member will effectively multiply the product of */
  /* the result of all previous members by its factorial  */
  /* term, resulting in the factorial up to that point    */
  if(me==0)
    data = 1.0;
  else
    data = -(term-1)*term;
  MPI_Scan(&data,&divisor,1,MPI_DOUBLE,MPI_PROD,MPI_COMM_WORLD);
 
  /* Compute sine of each angle */
  for(a=0;a<angles;a++)
    {
      partial = pow(angle[a],term)/divisor;
      /* Pass all the partials back to task 0 and   */
      /* accumulate them with the MPI_SUM operation */
      MPI_Reduce(&partial,&sine,1,MPI_DOUBLE,MPI_SUM,0,MPI_COMM_WORLD);
      /* The first task has the total value */
      if(me==0)
        {
          printf("sin(%lf) + %lf\n",angle[a],sine);
        }
    }
  MPI_Finalize();
}

With this approach, each task i uses its position in the MPI_COMM_WORLD communicator group to compute the value of one term. It first computes its working value as 2i+1 and calculates the factorial of this value. Since (2i+1)! is (2i-1)! x 2i x (2i+1), if each task could get the factorial value computed by the previous task, all it would have to do is multiply it by 2i x (2i+1). Fortunately, MPI provides the capability to do this with the MPI_Scan function. When MPI_Scan is invoked on the first task in a communication group, the result is the input data to MPI_Scan. When MPI_Scan is invoked on subsequent members of the group, the result is obtained by invoking a function on the result of the previous member of the group and its input data.

Note that the MPI standard, as documented in MPI: A Message-Passing Interface Standard, Version 1.1, available from the University of Tennessee, does not specify how the scan function is to be implemented, so a particular implementation does not have to obtain the result from one task and pass it on to the next for processing. This is, however, a convenient way of visualizing the scan function, and the remainder of our discussion will assume this is happening.

In our example, the function invoked is the built-in multiplication function, MPI_PROD. Task 0 (which is computing 1!) sets its result to 1. Task 1 is computing 3!, which it obtains by multiplying 2 x 3 by 1! (the result of Task 0). Task 2 multiplies 3! (the result of Task 1) by 4 x 5 to get 5!. This continues until all the tasks have computed their factorial values. The input data to the MPI_Scan calls is made negative so that the signs of the divisors alternate between plus and minus.
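
The prefix-product behavior is easier to see in isolation. The following stand-alone sketch (ours, not part of the series program) has each task contribute the same -(2i)(2i+1) value the series program uses, and prints the scanned result.

#include<stdio.h>
#include<mpi.h>
 
int main(int argc, char **argv)
{
  double data, divisor;
  int me, term;
 
  MPI_Init(&argc,&argv);
  MPI_Comm_rank(MPI_COMM_WORLD,&me);
 
  /* Task i contributes -(2i)(2i+1); the scan returns the product of the  */
  /* contributions of tasks 0..i, which is the signed divisor             */
  /* (-1)^i (2i+1)! used by the series program                            */
  term = 2*me+1;
  data = (me==0) ? 1.0 : -(double)(term-1)*term;
 
  MPI_Scan(&data,&divisor,1,MPI_DOUBLE,MPI_PROD,MPI_COMM_WORLD);
 
  /* Task 0 prints 1, task 1 prints -6 (-3!), task 2 prints 120 (5!), ... */
  printf("task %d: divisor = %f\n",me,divisor);
 
  MPI_Finalize();
  return 0;
}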

Once the divisor for a term has been computed, the loop through all the angles (θ) can be done. The partial term is computed as:



   partial(i) = θ^(2i+1) / divisor(i),   where divisor(i) = (-1)^i (2i+1)! is the value produced by MPI_Scan on task i

Then MPI_Reduce is called, which is similar to MPI_Scan except that, instead of invoking a function on each task in turn, the tasks send their raw data to Task 0, which invokes the function on all the data values. The function being invoked in the example is MPI_SUM, which just adds the data values from all of the tasks. Task 0 then prints out the result.
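
As with MPI_Scan, a tiny stand-alone sketch (ours; the partial values are made up) may make the reduction easier to see: every task contributes one number, and only the root task ends up with the sum.

#include<stdio.h>
#include<mpi.h>
 
int main(int argc, char **argv)
{
  double partial, sum = 0.0;
  int me, tasks;
 
  MPI_Init(&argc,&argv);
  MPI_Comm_size(MPI_COMM_WORLD,&tasks);
  MPI_Comm_rank(MPI_COMM_WORLD,&me);
 
  partial = me + 1.0;   /* hypothetical partial result for this task */
 
  /* Task 0 receives the sum of all contributions; on the other tasks, */
  /* the sum variable is not set by the reduction                      */
  MPI_Reduce(&partial,&sum,1,MPI_DOUBLE,MPI_SUM,0,MPI_COMM_WORLD);
 
  if(me==0)
    printf("sum of 1..%d = %f\n",tasks,sum);
 
  MPI_Finalize();
  return 0;
}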

Duplication Versus Redundancy

In the parallel matrix inversion example, each task goes through the process of allocating the matrix and copying the initialization data into it. So why doesn't one task do this and send the result to all the other tasks? This example has a trivial initialization process, but in a situation where initialization requires complex, time-consuming calculations, this question is even more important.

In order to understand the answer to this question and, more importantly, to be able to apply that understanding to other applications, you need to stop and consider the application as a whole. If one task of a parallel application takes on the role of initializer, two things happen. First, all of the other tasks must wait for the initializer to complete (assuming that no work can be done until initialization is completed). Second, some sort of communication must occur in order to distribute the results of initialization to all the other tasks. Not only is there nothing for the other tasks to do while one task is doing the initializing; there's also a cost associated with sending the results out. Although replicating the initialization process on each of the parallel tasks seems like unnecessary duplication, it allows the tasks to start processing more quickly because they don't have to wait to receive the data.

So, should all initialization be done in parallel? Not necessarily. You have to keep the big picture in mind when deciding. If the initialization is just computation and setup based on input parameters, each parallel task can initialize independently. Although this seems counter-intuitive at first, because the effort is redundant, for the reasons given above it's the right answer. Eventually you'll get used to it. However, if initialization requires access to system resources that are shared by all the parallel tasks (such as file systems and networks), having each task attempt to obtain the resources will create contention in the system and hinder the initialization process. In this case, it makes sense for one task to access the system resources on behalf of the entire application. In fact, if multiple system resources are required, you could have multiple tasks access each of the resources in parallel. Once the data has been obtained from the resource, you need to decide whether to share the raw data among the tasks and have each task process it, or have one task perform the initialization processing and distribute the results to all the other tasks. You can base this decision on whether the amount of data increases or decreases during the initialization processing. Of course, you want to transmit the smaller amount.

So, the bottom line is that duplicating the same work on all the remote tasks (which is not the same as redundancy, which implies something can be eliminated) is not bad if:

Protocols Supported

To perform data communication, the IBM Parallel Environment for AIX program product interfaces with the Message Passing Subsystem, the communication software that runs on the SP Switch adapters (if the SP Switch is available and specified). The Message Passing Subsystem interfaces with a low level protocol, running in the user space (User Space protocol), which offers a low-latency and high-bandwidth communication path to user applications, running over the SP system's SP Switch. The Message Passing Subsystem also interfaces with the IP layer.

For optimal performance, PE uses the User Space (US) protocol as its default communication path. However, PE also lets you run parallel applications that use the IP interface of the Message Passing Subsystem.

The User Space interface allows user applications to take full advantage of the SP Switch, and should be used whenever communication is a critical issue (for instance, when running a parallel application in a production environment). With LoadLeveler, the User Space interface can be used by more than one process per node at a given time. With the Resource Manager, this interface cannot be used by more than one process per node at a given time (this means that only one PE session can run on a set of nodes, and only one component of the parallel program can run on each node).

The IP interface doesn't perform as well with the Message Passing Subsystem as the US interface does, but it supports multiple processes. This means that although you are still restricted to one process per node, you can have multiple sessions, each with one process. As a result, the IP interface can be used in development or test environments, where more attention is paid to the correctness of the parallel program than to its speed-up, and therefore, more users can work on the same nodes at a given time.

In both cases, data exchange always occurs between processes, without involving the POE Partition Manager daemon.

To Thread or Not to Thread -- Protocol Implications

If you are unfamiliar with POSIX threads, don't try to learn both threads and MPI all at once. Get some experience writing and debugging serial multi-threaded programs first, then tackle parallel multi-threaded programs.

The MPI library exists in both threaded and non-threaded versions. The non-threaded library is included by the mpcc, mpCC, and mpxlf compiler scripts. The non-threaded library is called the signal handling library because it uses AIX signals to ensure that messages are transmitted. The threaded library is included by the mpcc_r, mpCC_r, and mpxlf_r compiler scripts.

It is important to note that while a threaded program has more than one independent instruction stream, all threads share the same address space, file, and environment variables. In addition, all the threads in a threaded MPI program have the same MPI communicators, data types, ranks, and so on.

In a threaded MPI program, the MPI_Init routine must be called before any thread can make an MPI call, and all MPI calls must be completed before MPI_Finalize is called. The principal difference between threaded libraries and non-threaded libraries is that, in a threaded library, more than one blocking call may be in progress at any given time.
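
A rough sketch of that ordering follows (our own example, not from the PE documentation; it assumes exactly two tasks and a threaded compile script such as mpcc_r). MPI_Init runs before any thread is created, each thread issues its own blocking call (so two blocking calls can be outstanding at once), and MPI_Finalize runs only after the threads have been joined.

#include<stdio.h>
#include<pthread.h>
#include<mpi.h>
 
#define NTHREADS 2
 
int me;   /* task rank, shared by the threads of this task */
 
void *exchange(void *arg)
{
  int tag = *(int *)arg;      /* each thread uses its own message tag */
  int value = tag;
  MPI_Status status;
 
  if(me==0)
    MPI_Send(&value,1,MPI_INT,1,tag,MPI_COMM_WORLD);
  else if(me==1)
    {
      /* both receives may block at the same time */
      MPI_Recv(&value,1,MPI_INT,0,tag,MPI_COMM_WORLD,&status);
      printf("task %d, thread tag %d: received %d\n",me,tag,value);
    }
  return NULL;
}
 
int main(int argc, char **argv)
{
  pthread_t thread[NTHREADS];
  int tags[NTHREADS];
  int i;
 
  MPI_Init(&argc,&argv);             /* before any thread makes MPI calls */
  MPI_Comm_rank(MPI_COMM_WORLD,&me);
 
  for(i=0;i<NTHREADS;i++)
    {
      tags[i] = i;
      pthread_create(&thread[i],NULL,exchange,&tags[i]);
    }
  for(i=0;i<NTHREADS;i++)
    pthread_join(thread[i],NULL);    /* all MPI calls complete...         */
 
  MPI_Finalize();                    /* ...before MPI_Finalize            */
  return 0;
}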

The underlying communication subsystem provides thread-dispatching, so that all blocking messages are given a chance to run when a message completes.

Note that the underlying communication subsystem provides a thread-safe message passing environment, but only one SMP processor at a time is actually copying the data between memory and the adapter.

The signal handling library has registered signal handlers for SIGALRM, SIGPIPE (User Space only), and SIGIO. The threaded library creates the following service threads:

The service threads above are terminated when MPI_Finalize is called. These threads are not available to end users, except, however, the packet interrupt thread will call the signal handling function that is registered by the user to handle SIGIO, if any.


Checkpointing and Restarting a Parallel Program

With the help of LoadLeveler, PE provides a mechanism for temporarily saving the state of a parallel program at a specific point (checkpointing), and then later restarting it from the saved state. When a program is checkpointed, the checkpointing function captures the state of the application as well as all data, and saves it in a file. When the program is restarted, the restart function retrieves the application information from the file it saved, and the program then starts running again from the place at which it was saved.

Limitations

When checkpointing a program, there are a few limitations you need to be aware of. You can only checkpoint POE and MPI applications that are submitted under LoadLeveler in batch mode; PE does not support checkpointing of interactive POE applications.

It is important to note that since the checkpointing library is part of LoadLeveler, and only POE batch jobs submitted with LoadLeveler are supported, LoadLeveler is required for checkpointing parallel programs. For more information on checkpointing limitations, see IBM Parallel Environment for AIX: MPI Programming and Subroutine Reference or IBM LoadLeveler for AIX: Using and Administering (SA22-7311).

How Checkpointing Works

Checkpointing occurs when an application calls the PE function mp_chkpt() in each task. Note that a program that calls mp_chkpt() must first be compiled with one of the POE checkpoint compile scripts (mpcc_chkpt, mpCC_chkpt, or mpxlf_chkpt). Before you submit the application, you first need to set the MP_CHECKDIR and MP_CHECKFILE POE environment variables to define the path name of the checkpoint file.

During checkpoint processing, each task executes the application, up to the point of the mp_chkpt() function call. At that point, the state and program data is written to the checkpoint file, which you defined with the MP_CHECKDIR and MP_CHECKFILE environment variables. The tasks then continue to execute the application.

When the application is restarted, the MP_CHECKDIR and MP_CHECKFILE POE environment variables point to the checkpoint file that was saved when the program was stopped. The application can be restarted on either the same or a different set of nodes, but the number of tasks must remain the same. When the restart function restarts a program, it retrieves the program state and data information from the checkpoint file. Note also that the restart function restores file pointers to the points at which the checkpoint occurred, but it does not restore the file content.

Since large data files are often produced as a result of checkpointing a program, you need to consider the amount of available space in your filesystem. You should also consider the type of filesystem. Writing and reading checkpointing files may yield better performance on Journaled File Systems (JFS) or General Parallel File Systems (GPFS) than on Networked File Systems (NFS), Distributed File Systems (DFS), or Andrew File Systems (AFS).

