Actual source code: tclockfree.c
petsc-3.3-p7 2013-05-11
#include <../src/sys/threadcomm/impls/pthread/tcpthreadimpl.h>

#define THREAD_TERMINATE        -1
#define THREAD_WAITING_FOR_JOB   0
#define THREAD_RECIEVED_JOB      1
#define THREAD_INITIALIZE        2
/* Lock-free data structure: one job pointer and one status flag per worker thread */
typedef struct {
  PetscThreadCommJobCtx *data;
  PetscInt              *my_job_status;
} sjob_lockfree;

static sjob_lockfree job_lockfree = {NULL,NULL};
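The rest of the file treats this pair of arrays as per-thread mailboxes: data[i] holds the job posted to thread i, and my_job_status[i] is its state flag. A producer always writes the job slot first and only then flips the flag to THREAD_RECIEVED_JOB; the worker spins on its own flag and resets it to THREAD_WAITING_FOR_JOB when the job is done. Below is a minimal, self-contained sketch of that handoff for a single worker; the names (mailbox, post_job, worker) are illustrative rather than PETSc API, and C11 atomics stand in for the volatile reads (PetscReadOnce) that the real code relies on.

/* --- Illustrative sketch (not part of tclockfree.c): single-worker mailbox handoff.
       Hypothetical names; C11 atomics replace the volatile PetscReadOnce reads. --- */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

#define WAITING    0
#define HAS_JOB    1
#define TERMINATE -1

typedef struct {
  atomic_int status;        /* plays the role of my_job_status[i] */
  void (*job)(void);        /* plays the role of data[i]          */
} mailbox;

static mailbox box = {WAITING, NULL};

static void *worker(void *arg)
{
  (void)arg;
  for (;;) {
    int s = atomic_load(&box.status);
    if (s == TERMINATE) break;
    if (s == HAS_JOB) {
      box.job();                              /* run the posted job */
      atomic_store(&box.status, WAITING);     /* signal completion  */
    }
  }
  return NULL;
}

static void post_job(void (*job)(void))
{
  while (atomic_load(&box.status) != WAITING) ; /* wait until the worker is idle */
  box.job = job;                                /* publish the job first ...     */
  atomic_store(&box.status, HAS_JOB);           /* ... then flip the flag        */
}

static void hello(void) { printf("job ran on the worker thread\n"); }

int main(void)
{
  pthread_t tid;
  pthread_create(&tid, NULL, worker, NULL);
  post_job(hello);
  while (atomic_load(&box.status) != WAITING) ; /* barrier: wait for the job to finish */
  atomic_store(&box.status, TERMINATE);
  pthread_join(tid, NULL);
  return 0;
}
/* --- end of sketch --- */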
void SparkThreads_LockFree(PetscInt myrank,PetscThreadComm tcomm,PetscThreadCommJobCtx job)
{
  PetscInt                i,thread_num,next;
  PetscThreadComm_PThread ptcomm;

  ptcomm = (PetscThreadComm_PThread)tcomm->data;

  switch(ptcomm->spark) {
  case PTHREADPOOLSPARK_LEADER:
    if(PetscReadOnce(int,tcomm->leader) == myrank) {
      /* Only the leader sparks all the other threads */
      for(i=ptcomm->thread_num_start+1; i < tcomm->nworkThreads; i++) {
        thread_num = ptcomm->granks[i];
        /* Wait until the target thread is idle, then publish the job and flip its flag */
        while(PetscReadOnce(int,job_lockfree.my_job_status[thread_num]) != THREAD_WAITING_FOR_JOB) ;
        job_lockfree.data[thread_num]          = job;
        job_lockfree.my_job_status[thread_num] = THREAD_RECIEVED_JOB;
      }
    }
    break;
  case PTHREADPOOLSPARK_CHAIN:
    /* Spark only the next thread in the chain; that thread will spark its own successor */
    next = ptcomm->ngranks[myrank];
    if(next != -1) {
      thread_num = next;
      while(PetscReadOnce(int,job_lockfree.my_job_status[thread_num]) != THREAD_WAITING_FOR_JOB) ;
      job_lockfree.data[thread_num]          = job;
      job_lockfree.my_job_status[thread_num] = THREAD_RECIEVED_JOB;
    }
    break;
  }
}
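SparkThreads_LockFree supports two wake-up strategies: with PTHREADPOOLSPARK_LEADER only the leader posts the job to every other worker, while with PTHREADPOOLSPARK_CHAIN each thread posts it only to its successor in ngranks (a value of -1 ends the chain), so the wake-up propagates thread by thread as each newly sparked worker re-enters this routine. A toy trace of a chain spark, with made-up ngranks contents, looks like this:

/* --- Illustrative sketch (not part of tclockfree.c): order in which a CHAIN spark
       propagates.  The ngranks contents are made up for the example. --- */
#include <stdio.h>

int main(void)
{
  int ngranks[4] = {1, 2, 3, -1}; /* thread i sparks ngranks[i]; -1 ends the chain */
  int t = 0;                      /* the leader starts the chain                   */

  while (ngranks[t] != -1) {
    printf("thread %d sparks thread %d\n", t, ngranks[t]);
    t = ngranks[t];
  }
  return 0;
}
/* --- end of sketch --- */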
void* PetscPThreadCommFunc_LockFree(void* arg)
{
#if defined(PETSC_PTHREAD_LOCAL)
  PetscPThreadRank = *(PetscInt*)arg;
#else
  PetscInt PetscPThreadRank = *(PetscInt*)arg;
  pthread_setspecific(PetscPThreadRankkey,&PetscPThreadRank);
#endif

#if defined(PETSC_HAVE_SCHED_CPU_SET_T)
  PetscPThreadCommDoCoreAffinity();
#endif
  /* Affinity is set; announce that this thread is ready for work */
  job_lockfree.my_job_status[PetscPThreadRank] = THREAD_WAITING_FOR_JOB;

  /* Spin until told to terminate */
  while(PetscReadOnce(int,job_lockfree.my_job_status[PetscPThreadRank]) != THREAD_TERMINATE) {
    if(job_lockfree.my_job_status[PetscPThreadRank] == THREAD_RECIEVED_JOB) {
      /* Spark the rest of the thread pool first so that dispatch overlaps with execution */
      SparkThreads_LockFree(PetscPThreadRank,job_lockfree.data[PetscPThreadRank]->tcomm,job_lockfree.data[PetscPThreadRank]);
      /* Do own job */
      PetscRunKernel(PetscPThreadRank,job_lockfree.data[PetscPThreadRank]->nargs,job_lockfree.data[PetscPThreadRank]);
      /* Reset own status to signal completion */
      job_lockfree.my_job_status[PetscPThreadRank] = THREAD_WAITING_FOR_JOB;
    }
  }

  return NULL;
}
PetscErrorCode PetscThreadCommBarrier_PThread_LockFree(PetscThreadComm tcomm)
{
  PetscInt                active_threads=0,i;
  PetscBool               wait=PETSC_TRUE;
  PetscThreadComm_PThread ptcomm=(PetscThreadComm_PThread)tcomm->data;

  /* Loop till all threads signal that they have done their job.  Since
     THREAD_WAITING_FOR_JOB is 0 and the busy states are positive, the sum of the
     status flags is zero exactly when every thread is idle. */
  while(wait) {
    for(i=0; i<tcomm->nworkThreads; i++) active_threads += job_lockfree.my_job_status[ptcomm->granks[i]];
    if(active_threads) active_threads = 0;
    else wait = PETSC_FALSE;
  }
  return(0);
}
PetscErrorCode PetscPThreadCommInitialize_LockFree(PetscThreadComm tcomm)
{
  PetscInt                i;
  PetscThreadComm_PThread ptcomm=(PetscThreadComm_PThread)tcomm->data;

  PetscMalloc(tcomm->nworkThreads*sizeof(PetscThreadCommJobCtx),&job_lockfree.data);
  PetscMalloc(tcomm->nworkThreads*sizeof(PetscInt),&job_lockfree.my_job_status);

  /* Create the worker threads; each starts in the INITIALIZE state and marks itself
     THREAD_WAITING_FOR_JOB once its affinity has been set */
  for(i=ptcomm->thread_num_start; i < tcomm->nworkThreads; i++) {
    job_lockfree.my_job_status[i] = THREAD_INITIALIZE;
    pthread_create(&ptcomm->tid[i],NULL,&PetscPThreadCommFunc_LockFree,&ptcomm->granks[i]);
  }
  /* If the main thread is also a worker, its slot (rank 0) starts out idle */
  if(ptcomm->ismainworker) {
    job_lockfree.my_job_status[0] = THREAD_WAITING_FOR_JOB;
  }

  /* Barrier so that all threads get pinned properly before any kernel is run */
  PetscThreadCommBarrier_PThread_LockFree(tcomm);
  return(0);
}
PetscErrorCode PetscPThreadCommFinalize_LockFree(PetscThreadComm tcomm)
{
  PetscErrorCode          ierr;
  void                    *jstatus;
  PetscThreadComm_PThread ptcomm=(PetscThreadComm_PThread)tcomm->data;
  PetscInt                i;

  ierr = PetscThreadCommBarrier_PThread_LockFree(tcomm);CHKERRQ(ierr);
  /* Ask each worker to terminate and wait for it to exit */
  for(i=ptcomm->thread_num_start; i < tcomm->nworkThreads; i++) {
    job_lockfree.my_job_status[i] = THREAD_TERMINATE;
    pthread_join(ptcomm->tid[i],&jstatus);
  }
  ierr = PetscFree(job_lockfree.my_job_status);CHKERRQ(ierr);
  ierr = PetscFree(job_lockfree.data);CHKERRQ(ierr);

  return(0);
}
PetscErrorCode PetscThreadCommRunKernel_PThread_LockFree(MPI_Comm comm,PetscThreadCommJobCtx job)
{
  PetscErrorCode          ierr;
  PetscThreadComm         tcomm=0;
  PetscThreadComm_PThread ptcomm;

  ierr = PetscCommGetThreadComm(comm,&tcomm);CHKERRQ(ierr);
  ptcomm = (PetscThreadComm_PThread)tcomm->data;
  if(ptcomm->nthreads) {
    PetscInt thread_num;
    /* Spark the leader thread; it sparks the rest of the pool (see SparkThreads_LockFree) */
    thread_num = tcomm->leader;
    /* Wait till the leader thread has finished its previous job */
    while(PetscReadOnce(int,job_lockfree.my_job_status[thread_num]) != THREAD_WAITING_FOR_JOB) ;
    job_lockfree.data[thread_num]          = job;
    job_lockfree.my_job_status[thread_num] = THREAD_RECIEVED_JOB;
  }
  if(ptcomm->ismainworker) {
    /* The main thread runs its own share of the kernel in slot 0 */
    job_lockfree.my_job_status[0] = THREAD_RECIEVED_JOB;
    job_lockfree.data[0]          = job;
    PetscRunKernel(0,job->nargs,job_lockfree.data[0]);
    job_lockfree.my_job_status[0] = THREAD_WAITING_FOR_JOB;
  }
  if(ptcomm->synchronizeafter) {
    ierr = PetscThreadCommBarrier(comm);CHKERRQ(ierr);
  }
  return(0);
}
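Putting the pieces together, a kernel launch posts the job to the leader (and, if the main thread is itself a worker, runs rank 0's share directly), the sparked threads wake their peers through SparkThreads_LockFree, every thread runs PetscRunKernel and drops back to THREAD_WAITING_FOR_JOB, and the barrier spins until the sum of the status flags reaches zero. The sketch below extends the single-mailbox example above to a small pool and adds that sum-based barrier; names are again illustrative, and for brevity the calling thread posts to every worker itself instead of delegating the re-spark to the leader.

/* --- Illustrative sketch (not part of tclockfree.c): a small pool driven through
       per-thread mailboxes plus the sum-based barrier.  Hypothetical names; C11
       atomics replace the volatile PetscReadOnce reads. --- */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

#define NWORKERS   4
#define WAITING    0
#define HAS_JOB    1
#define TERMINATE -1

typedef void (*job_fn)(int rank);

static atomic_int status[NWORKERS];   /* analogous to my_job_status[] */
static job_fn     job[NWORKERS];      /* analogous to data[]          */

static void *worker(void *arg)
{
  int rank = *(int *)arg;
  for (;;) {
    int s = atomic_load(&status[rank]);
    if (s == TERMINATE) break;
    if (s == HAS_JOB) {
      job[rank](rank);                       /* run the posted kernel */
      atomic_store(&status[rank], WAITING);  /* signal completion     */
    }
  }
  return NULL;
}

/* Post a job: write the job slot first, then flip the status flag */
static void post(int rank, job_fn fn)
{
  while (atomic_load(&status[rank]) != WAITING) ;
  job[rank] = fn;
  atomic_store(&status[rank], HAS_JOB);
}

/* Barrier: since WAITING is 0, the sum of the flags is zero only when all are idle */
static void barrier(void)
{
  for (;;) {
    int active = 0;
    for (int i = 0; i < NWORKERS; i++) active += atomic_load(&status[i]);
    if (!active) return;
  }
}

static void kernel(int rank) { printf("kernel ran on thread %d\n", rank); }

int main(void)
{
  pthread_t tid[NWORKERS];
  int       rank[NWORKERS];

  for (int i = 0; i < NWORKERS; i++) {
    rank[i] = i;
    pthread_create(&tid[i], NULL, worker, &rank[i]);
  }
  for (int i = 0; i < NWORKERS; i++) post(i, kernel);  /* launch one kernel           */
  barrier();                                           /* wait for all flags to drop  */
  for (int i = 0; i < NWORKERS; i++) {                 /* terminate and join the pool */
    atomic_store(&status[i], TERMINATE);
    pthread_join(tid[i], NULL);
  }
  return 0;
}
/* --- end of sketch --- */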