Monday, January 30, 2012

Multiple Reader Single Writer Locks

A simple bare-bone version of a "multiple-reader-single-writer" program. 

Note, that a single writer here implies, that only a single thread can perform a write at a time. There can be multiple writer threads though. Here, we're not modifying any shared buffers. This code is just added to understand the nuances of the locking mechanism involved when there are multiple read / write threads. This code handles both "writer starvation" and "reader starvation" cases. The output pasted below should be an interesting part for introspection:
#include <stdio.h>
#include <pthread.h>

typedef int bool;

/* Current lock status:
   Valid values are one of: [ -1 0 1 2 3 4 5 .. numReaders ]
   -1 indicates a single writer is currently active.
    0 indicates no writer and readers are currently active.
   >0 indicates the number of readers currently active.

   Primary usage:

   This is the indirect single dimension lock, which tells us whether
   we need to wait, or it's ok to proceed.
*/
signed rwIndex = 0;

/* Number of waiting writers:
   Valid values: >= 0

   =0 indicates that no writers are waiting. This means one of the following:
   - no writers and readers are active.
   - 1 writer is active, and there is no second writer now.
   - there are >0 readers, but no writers currently active.

   >0 indicates that atleast 1 writer is waiting. This means one of the following:
   - Few readers are active, thereby making the writers to wait till they're done.
   - 1 writer is active, thereby making the other writers wait.

   Primary usage:

   This variable is used to find, whether we need to signal a single
   waiting writer or signal all waiting readers. Waiting writers are
   signalled before any waiting readers.
 */
unsigned numWaitingWriters = 0;

/* These 2 TSDs ( Thread specific data ) are just used for convenience
   sake. 

   * simpleThreadKey - This is used to hold thread id's starting from
   value 1. This is simple and useful rather than printing some
   platform specific thread ID like 1231294959.

   * rwKey - This indicates the type of the thread ( reader / writer
   ). 

   Primary Usage:
   
   Both are useful in analyzing the lock flow among the threads. Refer
   program output. */
pthread_key_t rwKey;
pthread_key_t simpleThreadKey;

/* Starting thread id used in simpleThreadKey */
unsigned initialThreadId = 1;

/* We synchronize on a single lock. */
pthread_mutex_t rwLock = PTHREAD_MUTEX_INITIALIZER;

/* We need separate condition variables for readers and writers.  */
pthread_cond_t readerWaitingForSignal = PTHREAD_COND_INITIALIZER;
pthread_cond_t writerWaitingForSignal = PTHREAD_COND_INITIALIZER;




/* Prints the current lock status of the program wrt to the current
   thread */
void printStatus( const char *function, const char *marker ) {
  char type = ( ( unsigned ) pthread_getspecific( rwKey ) ) ? 'W' : 'R';
  unsigned threadId = ( unsigned ) pthread_getspecific( simpleThreadKey );

  printf( "%c%-2d %18s::%-40s rwIndex( %2d ) waitingWriters( %2u )\n", 
      type, threadId, function, marker, rwIndex, numWaitingWriters );
}


/* This function is normally a dummy, if there are no waiting writers
   and the first reader already has obtained a lock. Else, we have to
   wait if there is an active writer or waiting writers. */
void getReadLock() {
  pthread_mutex_lock( &rwLock );
  
  /* 1. If there is a single writer already writing ( rwIndex == -1 )  */
  /* 2. [1] + more writers waiting ( rwIndex < -1 ) */
  /* 3. If there is writer waiting for readers to clear up ( numWaitingWriters > 0 )  */
  while ( rwIndex < 0 || numWaitingWriters ) {
    printStatus( __FUNCTION__, "Waiting for signal" );
    pthread_cond_wait( &readerWaitingForSignal, &rwLock );
    printStatus( __FUNCTION__, "Got signal" );
  }

  rwIndex++;
  printStatus( __FUNCTION__, "Got read access" );

  pthread_mutex_unlock( &rwLock );
}


/* We have to wait in the case of active readers or a single active
   writer. Free to proceed otherwise. */
void getWriteLock() {
  pthread_mutex_lock( &rwLock );
  
  while ( rwIndex != 0 ) {
    numWaitingWriters++;
    printStatus( __FUNCTION__, "Waiting for signal" );
    pthread_cond_wait( &writerWaitingForSignal, &rwLock );
    printStatus( __FUNCTION__, "Got signal" );
    numWaitingWriters--;
  }

  /* Cool ... Got the write lock. All other readers and writers can
     wait. */
  rwIndex = -1;
  printStatus( __FUNCTION__, "Got write access" );

  pthread_mutex_unlock( &rwLock );
}


/* Nothing big here. If you're the last reader, notify any waiting
   writers ( even if there are waiting readers ). The writers will
   notify the waiting readers, when they're done. */
void releaseReadLock() {
  pthread_mutex_lock( &rwLock );

  rwIndex--;
  printStatus( __FUNCTION__, "I'm done reading" );

  if ( rwIndex == 0 ) {
    if ( numWaitingWriters ) {
      printStatus( __FUNCTION__, "Signal waiting writers first" );
      /* Only 1 writer can be active at a time. So, no point wasting a
     broadcast. Just signal here. */
      pthread_cond_signal( &writerWaitingForSignal );
    } else {
      printStatus( __FUNCTION__, "I'm the last reader. No waiting writers" );
    }
  }

  pthread_mutex_unlock( &rwLock );
}


/* Signal waiting writers first. Signal waiting readers next. */
void releaseWriteLock() {
  pthread_mutex_lock( &rwLock );

  rwIndex = 0;
  printStatus( __FUNCTION__, "I'm done writing" );

  /* Notify writers first */
  if ( numWaitingWriters  ) {
    printStatus( __FUNCTION__, "Wake writers now. Readers later" );
    /* Only 1 writer can be active at a time. So, no point wasting a
       broadcast. Just signal here. */
    pthread_cond_signal( &writerWaitingForSignal );
  } else {
    /* Signal all readers, if there are no waiting writers */
    printStatus( __FUNCTION__, "No writers. Signalling all readers" );
    pthread_cond_broadcast( &readerWaitingForSignal );
  }

  pthread_mutex_unlock( &rwLock );
}


/* Common init code for readers and writers */
void threadCommon( bool isWriter ) {
  pthread_setspecific( rwKey, ( void * ) isWriter );
  pthread_setspecific( simpleThreadKey, ( void * ) initialThreadId );

  initialThreadId++;

  /* This sleep is here to make sure that all 'lock fighting' among
     readers and writers start almost at the same time.  */
  sleep( 1 );
}


void* writeToBuffer( void *arg ) {
  threadCommon( 1 );

  getWriteLock();

  /* Write something to buffer */

  releaseWriteLock();
}


void* readFromBuffer( void *arg ) {
  threadCommon( 0 );

  getReadLock();

  /* Read something from buffer */

  releaseReadLock();
}


int main() {
  const unsigned numReaders = 5;
  const unsigned numWriters = 5;

  pthread_t reader[ numReaders ];
  pthread_t writer[ numWriters ];

  pthread_attr_t attr;
  pthread_attr_init( &attr );
  pthread_key_create( &rwKey, NULL );
  pthread_key_create( &simpleThreadKey, NULL );

  int i = 0;

  for ( i = 0 ; i < numReaders; i++ )
    pthread_create( &reader[ i ], &attr, readFromBuffer, NULL );

  for ( i = 0 ; i < numWriters; i++ )
    pthread_create( &writer[ i ], &attr, writeToBuffer, NULL );


  for ( i = 0 ; i < numReaders; i++ )
    pthread_join( reader[ i ], NULL );

  for ( i = 0 ; i < numWriters; i++ )
    pthread_join( writer[ i ], NULL );

  return 0;
}



Output:

R2         getReadLock::Got read access                          rwIndex(  1 ) waitingWriters(  0 )
R3         getReadLock::Got read access                          rwIndex(  2 ) waitingWriters(  0 )
R4         getReadLock::Got read access                          rwIndex(  3 ) waitingWriters(  0 )
R5         getReadLock::Got read access                          rwIndex(  4 ) waitingWriters(  0 )
W6        getWriteLock::Waiting for signal                       rwIndex(  4 ) waitingWriters(  1 )
W7        getWriteLock::Waiting for signal                       rwIndex(  4 ) waitingWriters(  2 )
W8        getWriteLock::Waiting for signal                       rwIndex(  4 ) waitingWriters(  3 )
R1         getReadLock::Waiting for signal                       rwIndex(  4 ) waitingWriters(  3 )
W9        getWriteLock::Waiting for signal                       rwIndex(  4 ) waitingWriters(  4 )
W10       getWriteLock::Waiting for signal                       rwIndex(  4 ) waitingWriters(  5 )
R2     releaseReadLock::I'm done reading                         rwIndex(  3 ) waitingWriters(  5 )
R3     releaseReadLock::I'm done reading                         rwIndex(  2 ) waitingWriters(  5 )
R4     releaseReadLock::I'm done reading                         rwIndex(  1 ) waitingWriters(  5 )
R5     releaseReadLock::I'm done reading                         rwIndex(  0 ) waitingWriters(  5 )
R5     releaseReadLock::Signal waiting writers first             rwIndex(  0 ) waitingWriters(  5 )
W6        getWriteLock::Got signal                               rwIndex(  0 ) waitingWriters(  5 )
W6        getWriteLock::Got write access                         rwIndex( -1 ) waitingWriters(  4 )
W6    releaseWriteLock::I'm done writing                         rwIndex(  0 ) waitingWriters(  4 )
W6    releaseWriteLock::Wake writers now. Readers later          rwIndex(  0 ) waitingWriters(  4 )
W7        getWriteLock::Got signal                               rwIndex(  0 ) waitingWriters(  4 )
W7        getWriteLock::Got write access                         rwIndex( -1 ) waitingWriters(  3 )
W7    releaseWriteLock::I'm done writing                         rwIndex(  0 ) waitingWriters(  3 )
W7    releaseWriteLock::Wake writers now. Readers later          rwIndex(  0 ) waitingWriters(  3 )
W8        getWriteLock::Got signal                               rwIndex(  0 ) waitingWriters(  3 )
W8        getWriteLock::Got write access                         rwIndex( -1 ) waitingWriters(  2 )
W8    releaseWriteLock::I'm done writing                         rwIndex(  0 ) waitingWriters(  2 )
W8    releaseWriteLock::Wake writers now. Readers later          rwIndex(  0 ) waitingWriters(  2 )
W9        getWriteLock::Got signal                               rwIndex(  0 ) waitingWriters(  2 )
W9        getWriteLock::Got write access                         rwIndex( -1 ) waitingWriters(  1 )
W9    releaseWriteLock::I'm done writing                         rwIndex(  0 ) waitingWriters(  1 )
W9    releaseWriteLock::Wake writers now. Readers later          rwIndex(  0 ) waitingWriters(  1 )
W10       getWriteLock::Got signal                               rwIndex(  0 ) waitingWriters(  1 )
W10       getWriteLock::Got write access                         rwIndex( -1 ) waitingWriters(  0 )
W10   releaseWriteLock::I'm done writing                         rwIndex(  0 ) waitingWriters(  0 )
W10   releaseWriteLock::No writers. Signalling all readers       rwIndex(  0 ) waitingWriters(  0 )
R1         getReadLock::Got signal                               rwIndex(  0 ) waitingWriters(  0 )
R1         getReadLock::Got read access                          rwIndex(  1 ) waitingWriters(  0 )
R1     releaseReadLock::I'm done reading                         rwIndex(  0 ) waitingWriters(  0 )
R1     releaseReadLock::I'm the last reader. No waiting writers  rwIndex(  0 ) waitingWriters(  0 )