Updated openmp/pthread barriers with GNU atomics.

Details:
- Updated the non-tree openmp and pthreads barriers defined in
  bli_thrcomm_openmp.c and bli_thrcomm_pthreads.c to instead call a common
  implementation in bli_thrcomm.c, bli_thrcomm_barrier_atomic(). This new
  implementation goes through the same motions as the previous codes, but
  protects its loads and increments with GNU atomic built-ins. These atomic
  statements take memory ordering parameters that allow us to specify just
  enough constraints for the barrier to work as intended on weakly-ordered
  hardware. The prior implementation was only guaranteed to work on systems
  with strongly- ordered memory. (Thanks to Devin Matthews for suggesting
  this change and his crash-course in atomics and memory ordering.)
- Removed 'volatile' from structs' barrier field declarations in
  bli_thrcomm_*.h.
- Updated bli_thrcomm_pthread.? files to use renamed struct barrier fields
  consistent with that of the _openmp.? files.
- Updated other bli_thrcomm_* files to rename "communicator" variables to
  simply "comm".
This commit is contained in:
Field G. Van Zee
2017-07-18 17:56:00 -05:00
committed by prangana
parent 8f739cc847
commit befaee6dd8
7 changed files with 157 additions and 103 deletions

View File

@@ -36,19 +36,63 @@
void* bli_thrcomm_bcast
(
thrcomm_t* communicator,
thrcomm_t* comm,
dim_t id,
void* to_send
)
{
if ( communicator == NULL || communicator->n_threads == 1 ) return to_send;
if ( comm == NULL || comm->n_threads == 1 ) return to_send;
if ( id == 0 ) communicator->sent_object = to_send;
if ( id == 0 ) comm->sent_object = to_send;
bli_thrcomm_barrier( communicator, id );
void* object = communicator->sent_object;
bli_thrcomm_barrier( communicator, id );
bli_thrcomm_barrier( comm, id );
void* object = comm->sent_object;
bli_thrcomm_barrier( comm, id );
return object;
}
void bli_thrcomm_barrier_atomic( thrcomm_t* comm, dim_t t_id )
{
// Return early if the comm is NULL or if there is only one
// thread participating.
if ( comm == NULL || comm->n_threads == 1 ) return;
// Read the "sense" variable. This variable is akin to a unique ID for
// the current barrier. The first n-1 threads will spin on this variable
// until it changes. The sense variable gets incremented by the last
// thread to enter the barrier, just before it exits. But it turns out
// that you don't need many unique IDs before you can wrap around. In
// fact, if everything else is working, a binary variable is sufficient,
// which is what we do here (i.e., 0 is incremented to 1, which is then
// decremented back to 0, and so forth).
bool_t orig_sense = __atomic_load_n( &comm->barrier_sense, __ATOMIC_RELAXED );
// Register ourselves (the current thread) as having arrived by
// incrementing the barrier_threads_arrived variable. We must perform
// this increment (and a subsequent read) atomically.
dim_t my_threads_arrived =
__atomic_add_fetch( &comm->barrier_threads_arrived, 1, __ATOMIC_ACQ_REL );
// If the current thread was the last thread to have arrived, then
// it will take actions that effectively ends and resets the barrier.
if ( my_threads_arrived == comm->n_threads )
{
// Reset the variable tracking the number of threads that have arrived
// to zero (which returns the barrier to the "empty" state. Then
// atomically toggle the barrier sense variable. This will signal to
// the other threads (which are spinning in the branch elow) that it
// is now safe to exit the barrier.
comm->barrier_threads_arrived = 0;
__atomic_fetch_xor( &comm->barrier_sense, 1, __ATOMIC_RELEASE );
}
else
{
// If the current thread is NOT the last thread to have arrived, then
// it spins on the sense variable until that sense variable changes at
// which time these threads will exit the barrier.
while ( __atomic_load_n( &comm->barrier_sense, __ATOMIC_ACQUIRE ) == orig_sense )
; // Empty loop body.
}
}

View File

@@ -49,11 +49,13 @@
// Thread communicator prototypes.
thrcomm_t* bli_thrcomm_create( dim_t n_threads );
void bli_thrcomm_free( thrcomm_t* communicator );
void bli_thrcomm_init( thrcomm_t* communicator, dim_t n_threads );
void bli_thrcomm_cleanup( thrcomm_t* communicator );
void bli_thrcomm_barrier( thrcomm_t* communicator, dim_t thread_id );
void* bli_thrcomm_bcast( thrcomm_t* communicator, dim_t inside_id, void* to_send );
void bli_thrcomm_free( thrcomm_t* comm );
void bli_thrcomm_init( thrcomm_t* comm, dim_t n_threads );
void bli_thrcomm_cleanup( thrcomm_t* comm );
void bli_thrcomm_barrier( thrcomm_t* comm, dim_t thread_id );
void* bli_thrcomm_bcast( thrcomm_t* comm, dim_t inside_id, void* to_send );
void bli_thrcomm_barrier_atomic( thrcomm_t* comm, dim_t t_id );
#endif

View File

@@ -44,63 +44,66 @@ thrcomm_t* bli_thrcomm_create( dim_t n_threads )
return comm;
}
void bli_thrcomm_free( thrcomm_t* communicator )
void bli_thrcomm_free( thrcomm_t* comm )
{
if ( communicator == NULL ) return;
bli_thrcomm_cleanup( communicator );
bli_free_intl( communicator );
if ( comm == NULL ) return;
bli_thrcomm_cleanup( comm );
bli_free_intl( comm );
}
#ifndef BLIS_TREE_BARRIER
void bli_thrcomm_init( thrcomm_t* communicator, dim_t n_threads)
void bli_thrcomm_init( thrcomm_t* comm, dim_t n_threads)
{
if ( communicator == NULL ) return;
communicator->sent_object = NULL;
communicator->n_threads = n_threads;
communicator->barrier_sense = 0;
communicator->barrier_threads_arrived = 0;
if ( comm == NULL ) return;
comm->sent_object = NULL;
comm->n_threads = n_threads;
comm->barrier_sense = 0;
comm->barrier_threads_arrived = 0;
}
void bli_thrcomm_cleanup( thrcomm_t* communicator )
void bli_thrcomm_cleanup( thrcomm_t* comm )
{
if ( communicator == NULL ) return;
if ( comm == NULL ) return;
}
//'Normal' barrier for openmp
//barrier routine taken from art of multicore programming
void bli_thrcomm_barrier( thrcomm_t* communicator, dim_t t_id )
void bli_thrcomm_barrier( thrcomm_t* comm, dim_t t_id )
{
if( communicator == NULL || communicator->n_threads == 1 )
#if 0
if ( comm == NULL || comm->n_threads == 1 )
return;
bool_t my_sense = communicator->barrier_sense;
bool_t my_sense = comm->barrier_sense;
dim_t my_threads_arrived;
_Pragma( "omp atomic capture" )
my_threads_arrived = ++(communicator->barrier_threads_arrived);
my_threads_arrived = ++(comm->barrier_threads_arrived);
if ( my_threads_arrived == communicator->n_threads )
if ( my_threads_arrived == comm->n_threads )
{
communicator->barrier_threads_arrived = 0;
communicator->barrier_sense = !communicator->barrier_sense;
comm->barrier_threads_arrived = 0;
comm->barrier_sense = !comm->barrier_sense;
}
else
{
volatile bool_t* listener = &communicator->barrier_sense;
volatile bool_t* listener = &comm->barrier_sense;
while ( *listener == my_sense ) {}
}
#endif
bli_thrcomm_barrier_atomic( comm, t_id );
}
#else
void bli_thrcomm_init( thrcomm_t* communicator, dim_t n_threads)
void bli_thrcomm_init( thrcomm_t* comm, dim_t n_threads)
{
if ( communicator == NULL ) return;
communicator->sent_object = NULL;
communicator->n_threads = n_threads;
communicator->barriers = bli_malloc_intl( sizeof( barrier_t* ) * n_threads );
bli_thrcomm_tree_barrier_create( n_threads, BLIS_TREE_BARRIER_ARITY, communicator->barriers, 0 );
if ( comm == NULL ) return;
comm->sent_object = NULL;
comm->n_threads = n_threads;
comm->barriers = bli_malloc_intl( sizeof( barrier_t* ) * n_threads );
bli_thrcomm_tree_barrier_create( n_threads, BLIS_TREE_BARRIER_ARITY, comm->barriers, 0 );
}
//Tree barrier used for Intel Xeon Phi
@@ -145,14 +148,14 @@ barrier_t* bli_thrcomm_tree_barrier_create( int num_threads, int arity, barrier_
return me;
}
void bli_thrcomm_cleanup( thrcomm_t* communicator )
void bli_thrcomm_cleanup( thrcomm_t* comm )
{
if ( communicator == NULL ) return;
for ( dim_t i = 0; i < communicator->n_threads; i++ )
if ( comm == NULL ) return;
for ( dim_t i = 0; i < comm->n_threads; i++ )
{
bli_thrcomm_tree_barrier_free( communicator->barriers[i] );
bli_thrcomm_tree_barrier_free( comm->barriers[i] );
}
bli_free_intl( communicator->barriers );
bli_free_intl( comm->barriers );
}
void bli_thrcomm_tree_barrier_free( barrier_t* barrier )

View File

@@ -60,11 +60,12 @@ struct thrcomm_s
#else
struct thrcomm_s
{
void* sent_object;
dim_t n_threads;
void* sent_object;
dim_t n_threads;
volatile bool_t barrier_sense;
dim_t barrier_threads_arrived;
//volatile bool_t barrier_sense;
bool_t barrier_sense;
dim_t barrier_threads_arrived;
};
#endif

View File

@@ -43,81 +43,84 @@ thrcomm_t* bli_thrcomm_create( dim_t n_threads )
return comm;
}
void bli_thrcomm_free( thrcomm_t* communicator )
void bli_thrcomm_free( thrcomm_t* comm )
{
if ( communicator == NULL ) return;
bli_thrcomm_cleanup( communicator );
bli_free_intl( communicator );
if ( comm == NULL ) return;
bli_thrcomm_cleanup( comm );
bli_free_intl( comm );
}
#ifdef BLIS_USE_PTHREAD_BARRIER
void bli_thrcomm_init( thrcomm_t* communicator, dim_t n_threads)
void bli_thrcomm_init( thrcomm_t* comm, dim_t n_threads)
{
if ( communicator == NULL ) return;
communicator->sent_object = NULL;
communicator->n_threads = n_threads;
pthread_barrier_init( &communicator->barrier, NULL, n_threads );
if ( comm == NULL ) return;
comm->sent_object = NULL;
comm->n_threads = n_threads;
pthread_barrier_init( &comm->barrier, NULL, n_threads );
}
void bli_thrcomm_cleanup( thrcomm_t* communicator )
void bli_thrcomm_cleanup( thrcomm_t* comm )
{
if ( communicator == NULL ) return;
pthread_barrier_destroy( &communicator->barrier );
if ( comm == NULL ) return;
pthread_barrier_destroy( &comm->barrier );
}
void bli_thrcomm_barrier( thrcomm_t* communicator, dim_t t_id )
void bli_thrcomm_barrier( thrcomm_t* comm, dim_t t_id )
{
pthread_barrier_wait( &communicator->barrier );
pthread_barrier_wait( &comm->barrier );
}
#else
void bli_thrcomm_init( thrcomm_t* communicator, dim_t n_threads)
void bli_thrcomm_init( thrcomm_t* comm, dim_t n_threads)
{
if ( communicator == NULL ) return;
communicator->sent_object = NULL;
communicator->n_threads = n_threads;
communicator->sense = 0;
communicator->threads_arrived = 0;
if ( comm == NULL ) return;
comm->sent_object = NULL;
comm->n_threads = n_threads;
comm->barrier_sense = 0;
comm->barrier_threads_arrived = 0;
#ifdef BLIS_USE_PTHREAD_MUTEX
pthread_mutex_init( &communicator->mutex, NULL );
#endif
//#ifdef BLIS_USE_PTHREAD_MUTEX
// pthread_mutex_init( &comm->mutex, NULL );
//#endif
}
void bli_thrcomm_cleanup( thrcomm_t* communicator )
void bli_thrcomm_cleanup( thrcomm_t* comm )
{
#ifdef BLIS_USE_PTHREAD_MUTEX
if ( communicator == NULL ) return;
pthread_mutex_destroy( &communicator->mutex );
#endif
//#ifdef BLIS_USE_PTHREAD_MUTEX
// if ( comm == NULL ) return;
// pthread_mutex_destroy( &comm->mutex );
//#endif
}
void bli_thrcomm_barrier( thrcomm_t* communicator, dim_t t_id )
void bli_thrcomm_barrier( thrcomm_t* comm, dim_t t_id )
{
if ( communicator == NULL || communicator->n_threads == 1 ) return;
bool_t my_sense = communicator->sense;
#if 0
if ( comm == NULL || comm->n_threads == 1 ) return;
bool_t my_sense = comm->sense;
dim_t my_threads_arrived;
#ifdef BLIS_USE_PTHREAD_MUTEX
pthread_mutex_lock( &communicator->mutex );
my_threads_arrived = ++(communicator->threads_arrived);
pthread_mutex_unlock( &communicator->mutex );
pthread_mutex_lock( &comm->mutex );
my_threads_arrived = ++(comm->threads_arrived);
pthread_mutex_unlock( &comm->mutex );
#else
my_threads_arrived = __sync_add_and_fetch(&(communicator->threads_arrived), 1);
my_threads_arrived = __sync_add_and_fetch(&(comm->threads_arrived), 1);
#endif
if ( my_threads_arrived == communicator->n_threads )
if ( my_threads_arrived == comm->n_threads )
{
communicator->threads_arrived = 0;
communicator->sense = !communicator->sense;
comm->threads_arrived = 0;
comm->sense = !comm->sense;
}
else
{
volatile bool_t* listener = &communicator->sense;
volatile bool_t* listener = &comm->sense;
while( *listener == my_sense ) {}
}
#endif
bli_thrcomm_barrier_atomic( comm, t_id );
}
#endif

View File

@@ -54,12 +54,13 @@ struct thrcomm_s
void* sent_object;
dim_t n_threads;
#ifdef BLIS_USE_PTHREAD_MUTEX
pthread_mutex_t mutex;
#endif
//#ifdef BLIS_USE_PTHREAD_MUTEX
// pthread_mutex_t mutex;
//#endif
volatile bool_t sense;
volatile dim_t threads_arrived;
//volatile bool_t barrier_sense;
bool_t barrier_sense;
dim_t barrier_threads_arrived;
};
#endif

View File

@@ -44,29 +44,29 @@ thrcomm_t* bli_thrcomm_create( dim_t n_threads )
return comm;
}
void bli_thrcomm_free( thrcomm_t* communicator )
void bli_thrcomm_free( thrcomm_t* comm )
{
if ( communicator == NULL ) return;
bli_thrcomm_cleanup( communicator );
bli_free_intl( communicator );
if ( comm == NULL ) return;
bli_thrcomm_cleanup( comm );
bli_free_intl( comm );
}
void bli_thrcomm_init( thrcomm_t* communicator, dim_t n_threads )
void bli_thrcomm_init( thrcomm_t* comm, dim_t n_threads )
{
if ( communicator == NULL ) return;
if ( comm == NULL ) return;
communicator->sent_object = NULL;
communicator->n_threads = n_threads;
communicator->barrier_sense = 0;
communicator->barrier_threads_arrived = 0;
comm->sent_object = NULL;
comm->n_threads = n_threads;
comm->barrier_sense = 0;
comm->barrier_threads_arrived = 0;
}
void bli_thrcomm_cleanup( thrcomm_t* communicator )
void bli_thrcomm_cleanup( thrcomm_t* comm )
{
if ( communicator == NULL ) return;
if ( comm == NULL ) return;
}
void bli_thrcomm_barrier( thrcomm_t* communicator, dim_t t_id )
void bli_thrcomm_barrier( thrcomm_t* comm, dim_t t_id )
{
return;
}