/*
 * LSU EE 7700-2 Fall 2003
 * Classroom Example Program
 *
 * Parallel Radix Sort using pthreads
 *
 * To compile on Solaris:
 *   cc -mt rsortp.c -o rsortp -lpthread -lrt -fast
 * To compile elsewhere (e.g. Linux/gcc):
 *   cc -O2 -pthread rsortp.c -o rsortp
 *
 * Usage: rsortp [nthreads [million_elements [radix_lg]]]
 *
 * See "sorter" function for radix sort.
 */

#include <stdio.h>
#include <malloc.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <limits.h>
#include <pthread.h>
#include <time.h>

/*
 * Return the current time in seconds as a double.
 * Uses Solaris' CLOCK_HIGHRES when available, otherwise CLOCK_MONOTONIC,
 * otherwise C11 timespec_get, and finally whole-second time().
 * Only differences between two calls are meaningful.
 */
double time_fp(void)
{
  struct timespec tp;
#if defined(CLOCK_HIGHRES)
  clock_gettime(CLOCK_HIGHRES,&tp);
#elif defined(CLOCK_MONOTONIC)
  clock_gettime(CLOCK_MONOTONIC,&tp);
#elif defined(TIME_UTC)
  timespec_get(&tp,TIME_UTC);
#else
  tp.tv_sec = time(NULL);
  tp.tv_nsec = 0;
#endif
  return ((double)tp.tv_sec)+((double)tp.tv_nsec) * 0.000000001;
}

/*
 * Reusable barrier for a fixed number of threads.
 *
 * A generation counter is advanced each time the barrier opens; waiters
 * loop until the generation changes, so spurious wakeups from
 * pthread_cond_wait cannot release a thread early, and a fast thread
 * re-entering the barrier cannot be confused with the previous round.
 * All fields are only accessed while holding mutex.
 */
struct barrier_info {
  pthread_mutex_t mutex;
  pthread_cond_t cond;
  int nprocs;               /* Number of threads that must arrive. */
  int num_in_barrier;       /* Threads arrived in the current round. */
  unsigned long generation; /* Incremented each time the barrier opens. */
};
typedef struct barrier_info Barrier_Info;

typedef struct app_data App_Data;

/* Per-thread state. Thread id owns elements [start, stop) of the array. */
struct thread_data {
  pthread_t tid;
  int start;
  int stop;
  int id;
  int *bins;   /* Histogram / output offsets for this thread's slice. */
  App_Data *app;
};
typedef struct thread_data Thread_Data;

/* State shared by all threads. */
struct app_data {
  int *array, *cpy;   /* Data to sort and a same-sized scratch buffer. */
  int radix_lg;       /* Bits per digit. */
  int bin_count;      /* 1 << radix_lg. */
  int mask;           /* bin_count - 1. */
  int amt;            /* Number of elements. */
  int nprocs;         /* Number of sorting threads. */
  Barrier_Info bi;
  Thread_Data *td;    /* Array of nprocs thread descriptors. */
};

#define debug 0

/*
 * malloc that never returns NULL: prints what failed and exits.
 * A zero-byte request is rounded up to one byte so that a legitimate
 * NULL from malloc(0) is not mistaken for an out-of-memory error.
 */
static void *xmalloc(size_t size, const char *what)
{
  void *p = malloc( size ? size : 1 );
  if( !p )
    {
      fprintf(stderr,"Could not allocate %s (%lu bytes).\n",
              what, (unsigned long) size);
      exit(1);
    }
  return p;
}

/* Initialize bi for use by nprocs threads. Exits on failure. */
void barrier_init(Barrier_Info *bi,int nprocs)
{
  int rv;

  bi->nprocs = nprocs;
  bi->num_in_barrier = 0;
  bi->generation = 0;

  rv = pthread_cond_init(&bi->cond,NULL);
  if( rv )
    {
      fprintf(stderr,"Could not initialize cond (%d)\n",rv);
      exit(1);
    }

  rv = pthread_mutex_init(&bi->mutex,NULL);
  if( rv )
    {
      fprintf(stderr,"Could not initialize mutex (%d)\n",rv);
      exit(1);
    }
}

/*
 * Block until bi->nprocs threads have called barrier(bi).
 * The last thread to arrive resets the count, advances the generation
 * and wakes everyone; the barrier is then ready for the next round.
 */
void barrier(Barrier_Info *bi)
{
  unsigned long my_generation;

  pthread_mutex_lock(&bi->mutex);
  my_generation = bi->generation;

  if( ++bi->num_in_barrier == bi->nprocs )
    {
      bi->num_in_barrier = 0;
      bi->generation++;
      pthread_cond_broadcast(&bi->cond);
    }
  else
    {
      /* Loop guards against spurious wakeups. */
      while( my_generation == bi->generation )
        pthread_cond_wait(&bi->cond,&bi->mutex);
    }

  pthread_mutex_unlock(&bi->mutex);
}

/*
 * Thread body: parallel LSD radix sort of app->array.
 *
 * Each pass handles one radix_lg-bit digit, starting at the least
 * significant bits, and runs three phases separated by barriers:
 *   1. each thread histograms the digit over its own slice [start,stop);
 *   2. thread 0 turns all histograms into output offsets, ordered by
 *      digit first and thread id second, so the permutation is stable
 *      (slices are assigned to threads in increasing index order);
 *   3. each thread scatters its slice into the other buffer.
 * The buffers then swap roles. When the final data ends up in app->cpy,
 * each thread copies its own slice back into app->array.
 *
 * Digits are extracted from the unsigned bit pattern, so negative keys
 * (not produced by main) would sort after positive ones.
 * Assumes 32-bit int. Returns NULL.
 */
void* sorter(void *arg)
{
  Thread_Data *td = (Thread_Data*) arg;
  App_Data *app = td->app;
  int *from = app->array;
  int *to = app->cpy;
  int start = td->start;
  int stop = td->stop;
  unsigned mask = (unsigned) app->mask;
  int radix_lg = app->radix_lg;
  int bin_count = app->bin_count;
  int nprocs = app->nprocs;
  size_t bins_size = (size_t) bin_count * sizeof(int);
  int *bins = (int*) xmalloc( bins_size, "histogram bins" );
  int shift;

  /* Published before the first barrier, so thread 0 sees it. */
  td->bins = bins;

  for(shift=0; shift<32; shift+=radix_lg)
    {
      int i;
      int *swap;

      memset(bins,0,bins_size);

      /*
       * Compute Histogram of Digit Values
       */
      for(i=start; i<stop; i++)
        {
          unsigned digit = ( (unsigned) from[i] >> shift ) & mask;
          bins[digit]++;
        }

      barrier(&app->bi);

      /*
       * Compute Prefix Sum
       */
      if( td->id == 0 )
        {
          /* Note, this could be parallelized. */
          int accumulator = 0;
          int p;
          for(i=0; i<bin_count; i++)
            for(p=0; p<nprocs; p++)
              {
                int this_bin = app->td[p].bins[i];
                app->td[p].bins[i] = accumulator;
                accumulator += this_bin;
              }
        }

      barrier(&app->bi);

      /*
       * Permute Array Elements
       */
      for(i=start; i<stop; i++)
        {
          unsigned digit = ( (unsigned) from[i] >> shift ) & mask;
          to[ bins[digit]++ ] = from[i];
        }

      swap = to;  to = from;  from = swap;

      /* No thread may start the next pass (or the copy-back) until
         every thread has finished scattering. */
      barrier(&app->bi);
    }

  /* After an odd number of passes the result is in app->cpy. The final
     barrier above guarantees it is complete, so each thread copies back
     only its own slice. */
  if( from != app->array && stop > start )
    memcpy(app->array + start, from + start,
           (size_t)(stop - start) * sizeof(from[0]));

  /* Safe: no thread reads another thread's bins after the last barrier. */
  td->bins = NULL;
  free(bins);

  return NULL;
}

/* Report whether array[0..size) is in non-decreasing order. */
void check(const int *array, int size)
{
  int i;
  for(i=1; i<size; i++)
    if( array[i] < array[i-1] )
      {
        fprintf(stderr,"Error, element %d too small: %d %d\n",
                i, array[i-1], array[i] );
        return;
      }
  printf("Array correctly sorted.\n");
}

/*
 * Sort an array of random ints with nprocs threads and report the time.
 *   argv[1]: number of threads (default 8)
 *   argv[2]: array size in millions of elements (default 2^20 elements);
 *            rounded down to a multiple of the thread count
 *   argv[3]: bits per radix digit, 1..30 (default 4)
 */
int main(int argc, char **argv)
{
  int per_child;
  int nprocs = 8;
  int amt;
  int print_amt;
  int i;
  int pos;
  double amtm = ((double)(1 << 20))/1000000;
  double start_time;
  Thread_Data *td;
  App_Data app;
  pthread_attr_t attr;

  app.radix_lg = 4;

  /*
   * Read Command-Line Arguments
   */
  if( argc > 1 ) nprocs = atoi(argv[1]);
  if( argc > 2 ) amtm = atof(argv[2]);
  if( argc > 3 ) app.radix_lg = atoi(argv[3]);

  /* Reject values that would divide by zero, never terminate
     (radix_lg 0), shift out of range, or overflow the int count. */
  if( nprocs < 1 )
    {
      fprintf(stderr,"Thread count must be at least 1 (got %d).\n",nprocs);
      return 1;
    }
  if( !( amtm >= 0 ) || amtm * 1000000 > (double) INT_MAX )
    {
      fprintf(stderr,"Array size out of range (got %g million).\n",amtm);
      return 1;
    }
  if( app.radix_lg < 1 || app.radix_lg > 30 )
    {
      fprintf(stderr,"radix_lg must be in 1..30 (got %d).\n",app.radix_lg);
      return 1;
    }

  per_child = amtm * 1000000 / nprocs;
  amt = per_child * nprocs;
  print_amt = amt < 20 ? amt : 20;

  printf("Running radix sort for %d threads, %d (%d per proc), %d radix_lg.\n",
         nprocs, amt, per_child, app.radix_lg);

  app.nprocs = nprocs;
  app.bin_count = 1 << app.radix_lg;
  app.mask = app.bin_count - 1;
  app.amt = amt;
  app.array = (int*) xmalloc( (size_t) amt * sizeof(app.array[0]), "array" );
  app.cpy = (int*) xmalloc( (size_t) amt * sizeof(app.array[0]), "copy" );

  for(i=0; i<amt; i++) app.array[i] = random();

  start_time = time_fp();

  /*
   * Initialize and start child threads.
   */
  pthread_attr_init(&attr);
  /* Only a scheduling hint; on failure the default scope is used. */
  pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);

  app.td = td = (Thread_Data*) xmalloc( sizeof(*td) * (size_t) nprocs,
                                        "thread data" );

  barrier_init(&app.bi,nprocs);

  for(pos=0, i=0; i<nprocs; i++)
    {
      int rv;
      td[i].id = i;
      td[i].app = &app;
      td[i].bins = NULL;
      td[i].start = pos;
      td[i].stop = pos += per_child;
      rv = pthread_create(&td[i].tid,&attr,sorter,(void*)&td[i]);
      if( rv )
        {
          fprintf(stderr,"Could not create thread, rv %d.\n",rv);
          exit(1);
        }
    }

  pthread_attr_destroy(&attr);

  /*
   * Wait for each thread to finish.
   */
  for(i=0; i<nprocs; i++)
    {
      void *status;
      int rv = pthread_join( td[i].tid, &status );
      if( rv )
        {
          fprintf(stderr,"Could not join thread %d, rv %d.\n",i,rv);
          exit(1);
        }
      if( debug )
        printf("Thread %d returned with status %p.\n", i, status);
    }

  if( debug )
    for(i=0; i<print_amt; i++) printf(" 0x%08x\n",(unsigned) app.array[i]);

  {
    double et = time_fp() - start_time;
    printf("Took %0.3f seconds\n",et);
  }

  check(app.array,amt);

  free(td);
  free(app.cpy);
  free(app.array);

  return 0;
}