Actual source code: lockfree.c

petsc-3.3-p7 2013-05-11
  2: #include <petscsys.h>        /*I  "petscsys.h"   I*/
  3: #include <../src/sys/objects/pthread/pthreadimpl.h>

  5: /* lock-free data structure */
  6: typedef struct {
  7:   PetscErrorCode (*pfunc)(void*);
  8:   void** pdata;
  9:   PetscInt *my_job_status;
 10: } sjob_lockfree;

 12: static sjob_lockfree job_lockfree = {NULL,NULL,0};

 14: /* This struct holds information for PetscThreadsWait_LockFree */
 15: static struct {
 16:   PetscInt nthreads; /* Number of busy threads */
 17:   PetscInt *list;    /* List of busy threads */
 18: } busy_threads;

 20: #if __ENVIRONMENT_MAC_OS_X_VERSION_MIN_REQUIRED__ >= 1050
 21: #define PetscAtomicCompareandSwap(ptr, oldval, newval) (OSAtomicCompareAndSwapPtr(oldval,newval,ptr))
 22: #elif defined(_MSC_VER)
 23: #define PetscAtomicCompareandSwap(ptr, oldval, newval) (InterlockedCompareExchange(ptr,newval,oldval))
 24: #elif (__GNUC__ * 10000 + __GNUC_MINOR__ * 100 + __GNUC_PATCHLEVEL__) > 40100
 25: #define PetscAtomicCompareandSwap(ptr, oldval, newval) (__sync_bool_compare_and_swap(ptr,oldval,newval))
 26: #else
 27: #  error No maping for PetscAtomicCompareandSwap
 28: #endif

 30: #define PetscReadOnce(type,val) (*(volatile type *)&val)
 31: /* 
 32:   ----------------------------
 33:      'LockFree' Thread Functions 
 34:   ----------------------------
 35: */
 36: void* PetscThreadFunc_LockFree(void* arg)
 37: {
 38: #if defined(PETSC_PTHREAD_LOCAL)
 39:   PetscThreadRank = *(PetscInt*)arg;
 40: #else
 41:   PetscInt PetscThreadRank=*(PetscInt*)arg;
 42:   pthread_setspecific(PetscThreadsRankkey,&PetscThreadRank);
 43: #endif

 45: #if defined(PETSC_HAVE_SCHED_CPU_SET_T)
 46:   PetscThreadsDoCoreAffinity();
 47: #endif

 49:   /* Spin loop */
 50:   while(PetscReadOnce(int,job_lockfree.my_job_status[PetscThreadRank]) != -1) {
 51:     if(job_lockfree.my_job_status[PetscThreadRank] == 1) {
 52:       (*job_lockfree.pfunc)(job_lockfree.pdata[PetscThreadRank]);
 53:       job_lockfree.my_job_status[PetscThreadRank] = 0;
 54:     }
 55:   }

 57:   return NULL;
 58: }

 62: PetscErrorCode PetscThreadsSynchronizationInitialize_LockFree(PetscInt N)
 63: {
 64:   PetscInt i;
 66:   PetscInt nworkThreads=N+PetscMainThreadShareWork;

 69:   PetscMalloc(nworkThreads*sizeof(pthread_t),&PetscThreadPoint);
 70:   PetscMalloc(nworkThreads*sizeof(void*),&(job_lockfree.pdata));
 71:   PetscMalloc(nworkThreads*sizeof(PetscInt),&(job_lockfree.my_job_status));
 72:   PetscMalloc(N*sizeof(PetscInt),&(busy_threads.list));

 74:   if(PetscMainThreadShareWork) {
 75:     job_lockfree.pdata[0] =NULL;
 76:     PetscThreadPoint[0] = pthread_self();
 77:   }

 79:   /* Create threads */
 80:   for(i=PetscMainThreadShareWork; i<nworkThreads; i++) {
 81:     job_lockfree.my_job_status[i] = 0;
 82:     job_lockfree.pdata[i] = NULL;
 83:     pthread_create(&PetscThreadPoint[i],NULL,PetscThreadFunc,&PetscThreadRanks[i]);
 84:   }
 85:   return(0);
 86: }

 90: PetscErrorCode PetscThreadsSynchronizationFinalize_LockFree()
 91: {
 92:   PetscInt       i;
 93:   void*          jstatus;
 95:   PetscInt       nworkThreads=PetscMaxThreads+PetscMainThreadShareWork;


 99:   /* join the threads */
100:   for(i=PetscMainThreadShareWork; i < nworkThreads; i++) {
101:     job_lockfree.my_job_status[i] = -1;
102:     pthread_join(PetscThreadPoint[i],&jstatus);
103:   }

105:   PetscFree(PetscThreadPoint);
106:   PetscFree(job_lockfree.my_job_status);
107:   PetscFree(job_lockfree.pdata);
108:   PetscFree(busy_threads.list);

110:   return(0);
111: }

115: void* PetscThreadsWait_LockFree(void* arg)
116: {
117:   PetscInt active_threads=0,i;
118:   PetscBool wait=PETSC_TRUE;

120:   /* Loop till all threads signal that they have done their job */
121:   while(wait) {
122:     for(i=0;i<busy_threads.nthreads;i++) active_threads += job_lockfree.my_job_status[busy_threads.list[i]];
123:     if(active_threads) active_threads = 0;
124:     else wait=PETSC_FALSE;
125:   }
126:   return(0);
127: }

131: PetscErrorCode PetscThreadsRunKernel_LockFree(PetscErrorCode (*pFunc)(void*),void** data,PetscInt n,PetscInt* cpu_affinity)
132: {
133:   PetscInt i,j,k=0;
134:   PetscInt nworkThreads=PetscMaxThreads+PetscMainThreadShareWork;


138:   busy_threads.nthreads = n-PetscMainThreadShareWork;
139:   job_lockfree.pfunc = pFunc;
140:   for(i=PetscMainThreadShareWork; i < n;i++) {
141:     for(j=PetscMainThreadShareWork;j < nworkThreads;j++) {
142:       if(cpu_affinity[i] == PetscThreadsCoreAffinities[j]) {
143:         job_lockfree.pdata[j] = data[i];
144:         busy_threads.list[k++] = j;
145:         /* signal thread j to start the job */
146:         job_lockfree.my_job_status[j] = 1;
147:       }
148:     }
149:   }

151:   if(PetscMainThreadShareWork) (*pFunc)(data[0]);

153:   /* Wait for all busy threads to finish their job */
154:   PetscThreadsWait(NULL);
155: 
156:   return(0);
157: }