Actual source code: tclockfree.c

petsc-3.3-p7 2013-05-11
#include <../src/sys/threadcomm/impls/pthread/tcpthreadimpl.h>

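/* Each worker thread owns a status flag: it starts at THREAD_INITIALIZE,
   advertises THREAD_WAITING_FOR_JOB when idle, is switched to
   THREAD_RECIEVED_JOB when handed a job context, and exits its spin loop
   when THREAD_TERMINATE is posted. */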
#define THREAD_TERMINATE      -1
#define THREAD_WAITING_FOR_JOB 0
#define THREAD_RECIEVED_JOB    1
#define THREAD_INITIALIZE      2

/* lock-free data structure */
typedef struct {
  PetscThreadCommJobCtx *data;
  PetscInt              *my_job_status;
} sjob_lockfree;

static sjob_lockfree job_lockfree = {NULL,NULL};


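/* Hand the current job to the other worker threads. With PTHREADPOOLSPARK_LEADER,
   only the leader thread sparks all the remaining workers; with
   PTHREADPOOLSPARK_CHAIN, each thread sparks only the next thread
   (ngranks[myrank]) in the chain. A worker's job slot is written only after
   it has signaled THREAD_WAITING_FOR_JOB. */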
void SparkThreads_LockFree(PetscInt myrank,PetscThreadComm tcomm,PetscThreadCommJobCtx job)
{
  PetscInt                i,thread_num;
  PetscThreadComm_PThread ptcomm;
  PetscInt                next;

  ptcomm = (PetscThreadComm_PThread)tcomm->data;

  switch(ptcomm->spark) {
  case PTHREADPOOLSPARK_LEADER:
    if(PetscReadOnce(int,tcomm->leader) == myrank) {
      /* Only the leader sparks all the other threads */
      for(i=ptcomm->thread_num_start+1; i < tcomm->nworkThreads; i++) {
        thread_num = ptcomm->granks[i];
        while(PetscReadOnce(int,job_lockfree.my_job_status[thread_num]) != THREAD_WAITING_FOR_JOB)
          ;
        job_lockfree.data[thread_num]          = job;
        job_lockfree.my_job_status[thread_num] = THREAD_RECIEVED_JOB;
      }
    }
    break;
  case PTHREADPOOLSPARK_CHAIN:
    /* Spark the next thread in the chain */
    next = ptcomm->ngranks[myrank];
    if(next != -1) {
      thread_num = next;
      while(PetscReadOnce(int,job_lockfree.my_job_status[thread_num]) != THREAD_WAITING_FOR_JOB)
        ;
      job_lockfree.data[thread_num]          = job;
      job_lockfree.my_job_status[thread_num] = THREAD_RECIEVED_JOB;
    }
    break;
  }
}

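/* Entry point of each spawned worker thread: records its rank, sets its core
   affinity if available, then spins on its status flag, running each job it
   receives until THREAD_TERMINATE is posted. */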
void* PetscPThreadCommFunc_LockFree(void* arg)
{
#if defined(PETSC_PTHREAD_LOCAL)
  PetscPThreadRank = *(PetscInt*)arg;
#else
  PetscInt PetscPThreadRank = *(PetscInt*)arg;
  pthread_setspecific(PetscPThreadRankkey,&PetscPThreadRank);
#endif

#if defined(PETSC_HAVE_SCHED_CPU_SET_T)
  PetscPThreadCommDoCoreAffinity();
#endif
  job_lockfree.my_job_status[PetscPThreadRank] = THREAD_WAITING_FOR_JOB;

  /* Spin loop */
  while(PetscReadOnce(int,job_lockfree.my_job_status[PetscPThreadRank]) != THREAD_TERMINATE) {
    if(job_lockfree.my_job_status[PetscPThreadRank] == THREAD_RECIEVED_JOB) {
      /* Spark the thread pool */
      SparkThreads_LockFree(PetscPThreadRank,job_lockfree.data[PetscPThreadRank]->tcomm,job_lockfree.data[PetscPThreadRank]);
      /* Do own job */
      PetscRunKernel(PetscPThreadRank,job_lockfree.data[PetscPThreadRank]->nargs,job_lockfree.data[PetscPThreadRank]);
      /* Reset own status */
      job_lockfree.my_job_status[PetscPThreadRank] = THREAD_WAITING_FOR_JOB;
    }
  }

  return NULL;
}

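/* Lock-free barrier: spin until every worker's status sums to zero, i.e. all
   threads report THREAD_WAITING_FOR_JOB (0) and no job is still in flight. */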
PetscErrorCode PetscThreadCommBarrier_PThread_LockFree(PetscThreadComm tcomm)
{
  PetscInt                active_threads=0,i;
  PetscBool               wait=PETSC_TRUE;
  PetscThreadComm_PThread ptcomm=(PetscThreadComm_PThread)tcomm->data;

  /* Loop until all threads signal that they have finished their jobs */
  while(wait) {
    for(i=0; i < tcomm->nworkThreads; i++) active_threads += job_lockfree.my_job_status[ptcomm->granks[i]];
    if(active_threads) active_threads = 0;
    else wait = PETSC_FALSE;
  }
  return(0);
}

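/* Allocate the per-thread job slots and status flags and spawn the worker
   threads. When the main thread is itself a worker (ismainworker), slot 0
   belongs to it and is marked THREAD_WAITING_FOR_JOB directly. */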
PetscErrorCode PetscPThreadCommInitialize_LockFree(PetscThreadComm tcomm)
{
  PetscErrorCode          ierr;
  PetscInt                i;
  PetscThreadComm_PThread ptcomm=(PetscThreadComm_PThread)tcomm->data;

  ierr = PetscMalloc(tcomm->nworkThreads*sizeof(PetscThreadCommJobCtx),&job_lockfree.data);CHKERRQ(ierr);
  ierr = PetscMalloc(tcomm->nworkThreads*sizeof(PetscInt),&job_lockfree.my_job_status);CHKERRQ(ierr);

  /* Create threads */
  for(i=ptcomm->thread_num_start; i < tcomm->nworkThreads; i++) {
    job_lockfree.my_job_status[i] = THREAD_INITIALIZE;
    pthread_create(&ptcomm->tid[i],NULL,&PetscPThreadCommFunc_LockFree,&ptcomm->granks[i]);
  }
  if(ptcomm->ismainworker) {
    job_lockfree.my_job_status[0] = THREAD_WAITING_FOR_JOB;
  }

  /* Put a barrier so that all threads get pinned properly */
  ierr = PetscThreadCommBarrier_PThread_LockFree(tcomm);CHKERRQ(ierr);
  return(0);
}

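/* Shut the pool down: post THREAD_TERMINATE to every spawned worker, join it,
   and release the lock-free job structure. */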
PetscErrorCode PetscPThreadCommFinalize_LockFree(PetscThreadComm tcomm)
{
  PetscErrorCode          ierr;
  void                    *jstatus;
  PetscThreadComm_PThread ptcomm=(PetscThreadComm_PThread)tcomm->data;
  PetscInt                i;

  ierr = PetscThreadCommBarrier_PThread_LockFree(tcomm);CHKERRQ(ierr);
  for(i=ptcomm->thread_num_start; i < tcomm->nworkThreads; i++) {
    job_lockfree.my_job_status[i] = THREAD_TERMINATE;
    pthread_join(ptcomm->tid[i],&jstatus);
  }
  ierr = PetscFree(job_lockfree.my_job_status);CHKERRQ(ierr);
  ierr = PetscFree(job_lockfree.data);CHKERRQ(ierr);

  return(0);
}

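/* Post a job to the pool: hand it to the leader thread, which in turn sparks
   the remaining workers (see SparkThreads_LockFree). If the main thread is a
   worker it runs its portion of the kernel directly, and an optional barrier
   synchronizes afterwards. */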
PetscErrorCode PetscThreadCommRunKernel_PThread_LockFree(MPI_Comm comm,PetscThreadCommJobCtx job)
{
  PetscErrorCode          ierr;
  PetscThreadComm         tcomm=0;
  PetscThreadComm_PThread ptcomm;

  ierr = PetscCommGetThreadComm(comm,&tcomm);CHKERRQ(ierr);
  ptcomm = (PetscThreadComm_PThread)tcomm->data;
  if(ptcomm->nthreads) {
    PetscInt thread_num;
    /* Spark the leader thread */
    thread_num = tcomm->leader;
    /* Wait until the leader thread has finished its previous job */
    while(PetscReadOnce(int,job_lockfree.my_job_status[thread_num]) != THREAD_WAITING_FOR_JOB)
      ;
    job_lockfree.data[thread_num]          = job;
    job_lockfree.my_job_status[thread_num] = THREAD_RECIEVED_JOB;
  }
  if(ptcomm->ismainworker) {
    job_lockfree.my_job_status[0] = THREAD_RECIEVED_JOB;
    job_lockfree.data[0]          = job;
    PetscRunKernel(0,job->nargs,job_lockfree.data[0]);
    job_lockfree.my_job_status[0] = THREAD_WAITING_FOR_JOB;
  }
  if(ptcomm->synchronizeafter) {
    ierr = PetscThreadCommBarrier(comm);CHKERRQ(ierr);
  }
  return(0);
}