From befaee6dd8b2a72de9e0461fe2ec1f36e9f88f3c Mon Sep 17 00:00:00 2001 From: "Field G. Van Zee" Date: Tue, 18 Jul 2017 17:56:00 -0500 Subject: [PATCH] Updated openmp/pthread barriers with GNU atomics. Details: - Updated the non-tree openmp and pthreads barriers defined in bli_thrcomm_openmp.c and bli_thrcomm_pthreads.c to instead call a common implementation in bli_thrcomm.c, bli_thrcomm_barrier_atomic(). This new implementation goes through the same motions as the previous codes, but protects its loads and increments with GNU atomic built-ins. These atomic statements take memory ordering parameters that allow us to specify just enough constraints for the barrier to work as intended on weakly-ordered hardware. The prior implementation was only guaranteed to work on systems with strongly- ordered memory. (Thanks to Devin Matthews for suggesting this change and his crash-course in atomics and memory ordering.) - Removed 'volatile' from structs' barrier field declarations in bli_thrcomm_*.h. - Updated bli_thrcomm_pthread.? files to use renamed struct barrier fields consistent with that of the _openmp.? files. - Updated other bli_thrcomm_* files to rename "communicator" variables to simply "comm". --- frame/thread/bli_thrcomm.c | 56 +++++++++++++++++--- frame/thread/bli_thrcomm.h | 12 +++-- frame/thread/bli_thrcomm_openmp.c | 65 ++++++++++++----------- frame/thread/bli_thrcomm_openmp.h | 9 ++-- frame/thread/bli_thrcomm_pthreads.c | 81 +++++++++++++++-------------- frame/thread/bli_thrcomm_pthreads.h | 11 ++-- frame/thread/bli_thrcomm_single.c | 26 ++++----- 7 files changed, 157 insertions(+), 103 deletions(-) diff --git a/frame/thread/bli_thrcomm.c b/frame/thread/bli_thrcomm.c index f45827efd..dac705cfa 100644 --- a/frame/thread/bli_thrcomm.c +++ b/frame/thread/bli_thrcomm.c @@ -36,19 +36,63 @@ void* bli_thrcomm_bcast ( - thrcomm_t* communicator, + thrcomm_t* comm, dim_t id, void* to_send ) { - if ( communicator == NULL || communicator->n_threads == 1 ) return to_send; + if ( comm == NULL || comm->n_threads == 1 ) return to_send; - if ( id == 0 ) communicator->sent_object = to_send; + if ( id == 0 ) comm->sent_object = to_send; - bli_thrcomm_barrier( communicator, id ); - void* object = communicator->sent_object; - bli_thrcomm_barrier( communicator, id ); + bli_thrcomm_barrier( comm, id ); + void* object = comm->sent_object; + bli_thrcomm_barrier( comm, id ); return object; } +void bli_thrcomm_barrier_atomic( thrcomm_t* comm, dim_t t_id ) +{ + // Return early if the comm is NULL or if there is only one + // thread participating. + if ( comm == NULL || comm->n_threads == 1 ) return; + + // Read the "sense" variable. This variable is akin to a unique ID for + // the current barrier. The first n-1 threads will spin on this variable + // until it changes. The sense variable gets incremented by the last + // thread to enter the barrier, just before it exits. But it turns out + // that you don't need many unique IDs before you can wrap around. In + // fact, if everything else is working, a binary variable is sufficient, + // which is what we do here (i.e., 0 is incremented to 1, which is then + // decremented back to 0, and so forth). + bool_t orig_sense = __atomic_load_n( &comm->barrier_sense, __ATOMIC_RELAXED ); + + // Register ourselves (the current thread) as having arrived by + // incrementing the barrier_threads_arrived variable. We must perform + // this increment (and a subsequent read) atomically. + dim_t my_threads_arrived = + __atomic_add_fetch( &comm->barrier_threads_arrived, 1, __ATOMIC_ACQ_REL ); + + // If the current thread was the last thread to have arrived, then + // it will take actions that effectively ends and resets the barrier. + if ( my_threads_arrived == comm->n_threads ) + { + // Reset the variable tracking the number of threads that have arrived + // to zero (which returns the barrier to the "empty" state. Then + // atomically toggle the barrier sense variable. This will signal to + // the other threads (which are spinning in the branch elow) that it + // is now safe to exit the barrier. + comm->barrier_threads_arrived = 0; + __atomic_fetch_xor( &comm->barrier_sense, 1, __ATOMIC_RELEASE ); + } + else + { + // If the current thread is NOT the last thread to have arrived, then + // it spins on the sense variable until that sense variable changes at + // which time these threads will exit the barrier. + while ( __atomic_load_n( &comm->barrier_sense, __ATOMIC_ACQUIRE ) == orig_sense ) + ; // Empty loop body. + } +} + diff --git a/frame/thread/bli_thrcomm.h b/frame/thread/bli_thrcomm.h index 593f8d7fa..59fbc6576 100644 --- a/frame/thread/bli_thrcomm.h +++ b/frame/thread/bli_thrcomm.h @@ -49,11 +49,13 @@ // Thread communicator prototypes. thrcomm_t* bli_thrcomm_create( dim_t n_threads ); -void bli_thrcomm_free( thrcomm_t* communicator ); -void bli_thrcomm_init( thrcomm_t* communicator, dim_t n_threads ); -void bli_thrcomm_cleanup( thrcomm_t* communicator ); -void bli_thrcomm_barrier( thrcomm_t* communicator, dim_t thread_id ); -void* bli_thrcomm_bcast( thrcomm_t* communicator, dim_t inside_id, void* to_send ); +void bli_thrcomm_free( thrcomm_t* comm ); +void bli_thrcomm_init( thrcomm_t* comm, dim_t n_threads ); +void bli_thrcomm_cleanup( thrcomm_t* comm ); +void bli_thrcomm_barrier( thrcomm_t* comm, dim_t thread_id ); +void* bli_thrcomm_bcast( thrcomm_t* comm, dim_t inside_id, void* to_send ); + +void bli_thrcomm_barrier_atomic( thrcomm_t* comm, dim_t t_id ); #endif diff --git a/frame/thread/bli_thrcomm_openmp.c b/frame/thread/bli_thrcomm_openmp.c index 0882d1659..5777c5b6d 100644 --- a/frame/thread/bli_thrcomm_openmp.c +++ b/frame/thread/bli_thrcomm_openmp.c @@ -44,63 +44,66 @@ thrcomm_t* bli_thrcomm_create( dim_t n_threads ) return comm; } -void bli_thrcomm_free( thrcomm_t* communicator ) +void bli_thrcomm_free( thrcomm_t* comm ) { - if ( communicator == NULL ) return; - bli_thrcomm_cleanup( communicator ); - bli_free_intl( communicator ); + if ( comm == NULL ) return; + bli_thrcomm_cleanup( comm ); + bli_free_intl( comm ); } #ifndef BLIS_TREE_BARRIER -void bli_thrcomm_init( thrcomm_t* communicator, dim_t n_threads) +void bli_thrcomm_init( thrcomm_t* comm, dim_t n_threads) { - if ( communicator == NULL ) return; - communicator->sent_object = NULL; - communicator->n_threads = n_threads; - communicator->barrier_sense = 0; - communicator->barrier_threads_arrived = 0; + if ( comm == NULL ) return; + comm->sent_object = NULL; + comm->n_threads = n_threads; + comm->barrier_sense = 0; + comm->barrier_threads_arrived = 0; } -void bli_thrcomm_cleanup( thrcomm_t* communicator ) +void bli_thrcomm_cleanup( thrcomm_t* comm ) { - if ( communicator == NULL ) return; + if ( comm == NULL ) return; } //'Normal' barrier for openmp //barrier routine taken from art of multicore programming -void bli_thrcomm_barrier( thrcomm_t* communicator, dim_t t_id ) +void bli_thrcomm_barrier( thrcomm_t* comm, dim_t t_id ) { - if( communicator == NULL || communicator->n_threads == 1 ) +#if 0 + if ( comm == NULL || comm->n_threads == 1 ) return; - bool_t my_sense = communicator->barrier_sense; + bool_t my_sense = comm->barrier_sense; dim_t my_threads_arrived; _Pragma( "omp atomic capture" ) - my_threads_arrived = ++(communicator->barrier_threads_arrived); + my_threads_arrived = ++(comm->barrier_threads_arrived); - if ( my_threads_arrived == communicator->n_threads ) + if ( my_threads_arrived == comm->n_threads ) { - communicator->barrier_threads_arrived = 0; - communicator->barrier_sense = !communicator->barrier_sense; + comm->barrier_threads_arrived = 0; + comm->barrier_sense = !comm->barrier_sense; } else { - volatile bool_t* listener = &communicator->barrier_sense; + volatile bool_t* listener = &comm->barrier_sense; while ( *listener == my_sense ) {} } +#endif + bli_thrcomm_barrier_atomic( comm, t_id ); } #else -void bli_thrcomm_init( thrcomm_t* communicator, dim_t n_threads) +void bli_thrcomm_init( thrcomm_t* comm, dim_t n_threads) { - if ( communicator == NULL ) return; - communicator->sent_object = NULL; - communicator->n_threads = n_threads; - communicator->barriers = bli_malloc_intl( sizeof( barrier_t* ) * n_threads ); - bli_thrcomm_tree_barrier_create( n_threads, BLIS_TREE_BARRIER_ARITY, communicator->barriers, 0 ); + if ( comm == NULL ) return; + comm->sent_object = NULL; + comm->n_threads = n_threads; + comm->barriers = bli_malloc_intl( sizeof( barrier_t* ) * n_threads ); + bli_thrcomm_tree_barrier_create( n_threads, BLIS_TREE_BARRIER_ARITY, comm->barriers, 0 ); } //Tree barrier used for Intel Xeon Phi @@ -145,14 +148,14 @@ barrier_t* bli_thrcomm_tree_barrier_create( int num_threads, int arity, barrier_ return me; } -void bli_thrcomm_cleanup( thrcomm_t* communicator ) +void bli_thrcomm_cleanup( thrcomm_t* comm ) { - if ( communicator == NULL ) return; - for ( dim_t i = 0; i < communicator->n_threads; i++ ) + if ( comm == NULL ) return; + for ( dim_t i = 0; i < comm->n_threads; i++ ) { - bli_thrcomm_tree_barrier_free( communicator->barriers[i] ); + bli_thrcomm_tree_barrier_free( comm->barriers[i] ); } - bli_free_intl( communicator->barriers ); + bli_free_intl( comm->barriers ); } void bli_thrcomm_tree_barrier_free( barrier_t* barrier ) diff --git a/frame/thread/bli_thrcomm_openmp.h b/frame/thread/bli_thrcomm_openmp.h index 6808b9772..435845b16 100644 --- a/frame/thread/bli_thrcomm_openmp.h +++ b/frame/thread/bli_thrcomm_openmp.h @@ -60,11 +60,12 @@ struct thrcomm_s #else struct thrcomm_s { - void* sent_object; - dim_t n_threads; + void* sent_object; + dim_t n_threads; - volatile bool_t barrier_sense; - dim_t barrier_threads_arrived; + //volatile bool_t barrier_sense; + bool_t barrier_sense; + dim_t barrier_threads_arrived; }; #endif diff --git a/frame/thread/bli_thrcomm_pthreads.c b/frame/thread/bli_thrcomm_pthreads.c index 230b63905..27fb37e6a 100644 --- a/frame/thread/bli_thrcomm_pthreads.c +++ b/frame/thread/bli_thrcomm_pthreads.c @@ -43,81 +43,84 @@ thrcomm_t* bli_thrcomm_create( dim_t n_threads ) return comm; } -void bli_thrcomm_free( thrcomm_t* communicator ) +void bli_thrcomm_free( thrcomm_t* comm ) { - if ( communicator == NULL ) return; - bli_thrcomm_cleanup( communicator ); - bli_free_intl( communicator ); + if ( comm == NULL ) return; + bli_thrcomm_cleanup( comm ); + bli_free_intl( comm ); } #ifdef BLIS_USE_PTHREAD_BARRIER -void bli_thrcomm_init( thrcomm_t* communicator, dim_t n_threads) +void bli_thrcomm_init( thrcomm_t* comm, dim_t n_threads) { - if ( communicator == NULL ) return; - communicator->sent_object = NULL; - communicator->n_threads = n_threads; - pthread_barrier_init( &communicator->barrier, NULL, n_threads ); + if ( comm == NULL ) return; + comm->sent_object = NULL; + comm->n_threads = n_threads; + pthread_barrier_init( &comm->barrier, NULL, n_threads ); } -void bli_thrcomm_cleanup( thrcomm_t* communicator ) +void bli_thrcomm_cleanup( thrcomm_t* comm ) { - if ( communicator == NULL ) return; - pthread_barrier_destroy( &communicator->barrier ); + if ( comm == NULL ) return; + pthread_barrier_destroy( &comm->barrier ); } -void bli_thrcomm_barrier( thrcomm_t* communicator, dim_t t_id ) +void bli_thrcomm_barrier( thrcomm_t* comm, dim_t t_id ) { - pthread_barrier_wait( &communicator->barrier ); + pthread_barrier_wait( &comm->barrier ); } #else -void bli_thrcomm_init( thrcomm_t* communicator, dim_t n_threads) +void bli_thrcomm_init( thrcomm_t* comm, dim_t n_threads) { - if ( communicator == NULL ) return; - communicator->sent_object = NULL; - communicator->n_threads = n_threads; - communicator->sense = 0; - communicator->threads_arrived = 0; + if ( comm == NULL ) return; + comm->sent_object = NULL; + comm->n_threads = n_threads; + comm->barrier_sense = 0; + comm->barrier_threads_arrived = 0; -#ifdef BLIS_USE_PTHREAD_MUTEX - pthread_mutex_init( &communicator->mutex, NULL ); -#endif +//#ifdef BLIS_USE_PTHREAD_MUTEX +// pthread_mutex_init( &comm->mutex, NULL ); +//#endif } -void bli_thrcomm_cleanup( thrcomm_t* communicator ) +void bli_thrcomm_cleanup( thrcomm_t* comm ) { -#ifdef BLIS_USE_PTHREAD_MUTEX - if ( communicator == NULL ) return; - pthread_mutex_destroy( &communicator->mutex ); -#endif +//#ifdef BLIS_USE_PTHREAD_MUTEX +// if ( comm == NULL ) return; +// pthread_mutex_destroy( &comm->mutex ); +//#endif } -void bli_thrcomm_barrier( thrcomm_t* communicator, dim_t t_id ) +void bli_thrcomm_barrier( thrcomm_t* comm, dim_t t_id ) { - if ( communicator == NULL || communicator->n_threads == 1 ) return; - bool_t my_sense = communicator->sense; +#if 0 + if ( comm == NULL || comm->n_threads == 1 ) return; + bool_t my_sense = comm->sense; dim_t my_threads_arrived; #ifdef BLIS_USE_PTHREAD_MUTEX - pthread_mutex_lock( &communicator->mutex ); - my_threads_arrived = ++(communicator->threads_arrived); - pthread_mutex_unlock( &communicator->mutex ); + pthread_mutex_lock( &comm->mutex ); + my_threads_arrived = ++(comm->threads_arrived); + pthread_mutex_unlock( &comm->mutex ); #else - my_threads_arrived = __sync_add_and_fetch(&(communicator->threads_arrived), 1); + my_threads_arrived = __sync_add_and_fetch(&(comm->threads_arrived), 1); #endif - if ( my_threads_arrived == communicator->n_threads ) + if ( my_threads_arrived == comm->n_threads ) { - communicator->threads_arrived = 0; - communicator->sense = !communicator->sense; + comm->threads_arrived = 0; + comm->sense = !comm->sense; } else { - volatile bool_t* listener = &communicator->sense; + volatile bool_t* listener = &comm->sense; while( *listener == my_sense ) {} } +#endif + bli_thrcomm_barrier_atomic( comm, t_id ); } #endif diff --git a/frame/thread/bli_thrcomm_pthreads.h b/frame/thread/bli_thrcomm_pthreads.h index 1c807772d..286387bcf 100644 --- a/frame/thread/bli_thrcomm_pthreads.h +++ b/frame/thread/bli_thrcomm_pthreads.h @@ -54,12 +54,13 @@ struct thrcomm_s void* sent_object; dim_t n_threads; -#ifdef BLIS_USE_PTHREAD_MUTEX - pthread_mutex_t mutex; -#endif +//#ifdef BLIS_USE_PTHREAD_MUTEX +// pthread_mutex_t mutex; +//#endif - volatile bool_t sense; - volatile dim_t threads_arrived; + //volatile bool_t barrier_sense; + bool_t barrier_sense; + dim_t barrier_threads_arrived; }; #endif diff --git a/frame/thread/bli_thrcomm_single.c b/frame/thread/bli_thrcomm_single.c index c038f59a0..76b48ca95 100644 --- a/frame/thread/bli_thrcomm_single.c +++ b/frame/thread/bli_thrcomm_single.c @@ -44,29 +44,29 @@ thrcomm_t* bli_thrcomm_create( dim_t n_threads ) return comm; } -void bli_thrcomm_free( thrcomm_t* communicator ) +void bli_thrcomm_free( thrcomm_t* comm ) { - if ( communicator == NULL ) return; - bli_thrcomm_cleanup( communicator ); - bli_free_intl( communicator ); + if ( comm == NULL ) return; + bli_thrcomm_cleanup( comm ); + bli_free_intl( comm ); } -void bli_thrcomm_init( thrcomm_t* communicator, dim_t n_threads ) +void bli_thrcomm_init( thrcomm_t* comm, dim_t n_threads ) { - if ( communicator == NULL ) return; + if ( comm == NULL ) return; - communicator->sent_object = NULL; - communicator->n_threads = n_threads; - communicator->barrier_sense = 0; - communicator->barrier_threads_arrived = 0; + comm->sent_object = NULL; + comm->n_threads = n_threads; + comm->barrier_sense = 0; + comm->barrier_threads_arrived = 0; } -void bli_thrcomm_cleanup( thrcomm_t* communicator ) +void bli_thrcomm_cleanup( thrcomm_t* comm ) { - if ( communicator == NULL ) return; + if ( comm == NULL ) return; } -void bli_thrcomm_barrier( thrcomm_t* communicator, dim_t t_id ) +void bli_thrcomm_barrier( thrcomm_t* comm, dim_t t_id ) { return; }