// This program provides a layout for batch processing and coordination using C's pthread constructs.
// The asymmetry of this high level program architecture stems from all worker threads starting at the same time,
// but requiring different amounts of time to complete.

#include <stdio.h>    // printf
#include <stdlib.h>   // general utilities
#include <unistd.h>   // sleep
#include <pthread.h>  // pthread_* threads, mutexes and condition variables
#include <math.h>     // sqrt (link with -lm)

// Minimal boolean type used by the shared coordination flags below.
typedef enum { FALSE = 0, TRUE = 1 } Bool;
typedef Bool *BoolPtr;

// To introduce a high level of unpredictability, use many more worker threads than the number of cores inside of the
// CPU that you intend to use.
#define NUMBER_OF_WORKER_THREADS 30
#define NUMBER_OF_WORKING_ROUNDS 4
// Placeholder used as the initial value of "TheCompletedBatch" before any worker has reported a completion.
#define BOGUS 99

//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Two sets of inter-thread communication variables are defined below. Each "condition variable" requires an associated "mutex".
// It is the responsibility of the programmer to ensure that the global variables connected to a "mutex" are only modified
// under a lock condition.

// Set one.
pthread_mutex_t StartWorkMutex;
pthread_cond_t StartWorkCondition;
// The "StartWorkCondition" sends a boolean flag to request the worker threads to "WorkOn" or terminate, and a value
// indicating the current "Round". Both variables belong to "StartWorkMutex".
Bool WorkOn = FALSE;
unsigned int Round = 0;

// Set two.
pthread_mutex_t CompleteMutex;
pthread_cond_t CompleteCondition;
// The worker threads need to let the main thread know which one has just sent the "CompleteCondition" signal, so it can
// coordinate the correct data set.
// The main thread needs to let the worker threads know when it is waiting for a signal and this is communicated using
// "MainThreadWaiting". Both variables belong to "CompleteMutex".
Bool MainThreadWaiting = FALSE;
unsigned int TheCompletedBatch = BOGUS;

//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// This function represents an independent stream of computation that works on a batch of data.
// For now the work is simply busy-work using the "sqrt()" function on "double" floating point numbers. void *WorkerThread(void *ThreadArgument){ unsigned int X; double BusyWork; double BusyWorkTwo; long unsigned int ThisThreadNumber = (long unsigned int)ThreadArgument; // Enter a waiting state for the "StartWorkCondition". pthread_mutex_lock(&StartWorkMutex); pthread_cond_wait(&StartWorkCondition, &StartWorkMutex); while ( TRUE ) { // It is necessary to unlock the "StartWorkMutex" before thread termination, so that the other worker threads can also terminate. if ( WorkOn ) { printf("Thread-%lu: Begin Work On Batch %d\n", ThisThreadNumber, Round); pthread_mutex_unlock(&StartWorkMutex); } else { pthread_mutex_unlock(&StartWorkMutex); pthread_exit(NULL); } // This is the location where batch processing work will be carried out. Right now it is busy-work. for ( X = 0; X < 45000000; X++ ) { BusyWork = (double)X + 50.75; BusyWork = sqrt(BusyWork); BusyWorkTwo = sqrt(BusyWork + (double)X); } // Get a lock on "CompleteMutex" and make sure that the main thread is waiting, then set "TheCompletedBatch" to "ThisThreadNumber". Set "MainThreadWaiting" to "FALSE". // If the main thread is not waiting, continue trying to get a lock on "CompleteMutex" unitl "MainThreadWaiting" is "TRUE". while ( TRUE ) { pthread_mutex_lock(&CompleteMutex); if ( MainThreadWaiting == TRUE ) { // While this thread still has a lock on the "CompleteMutex", set "MainThreadWaiting" to "FALSE", so that the next thread to maintain a lock will be the main thread. MainThreadWaiting = FALSE; break; } pthread_mutex_unlock(&CompleteMutex); } TheCompletedBatch = ThisThreadNumber; // Lock the "StartWorkMutex" before we send out the "CompleteCondition" signal. // This way, we can enter a waiting state for the next round before the main thread broadcasts the "StartWorkCondition". 
pthread_mutex_lock(&StartWorkMutex); printf("Thread-%lu: Completed Batch %d\n", ThisThreadNumber, Round); pthread_cond_signal(&CompleteCondition); pthread_mutex_unlock(&CompleteMutex); // Wait for the Main thread to send us the next "StartWorkCondition" broadcast. // Be sure to unlock the corresponding mutex immediately so that the other worker threads can exit their waiting state as well. pthread_cond_wait(&StartWorkCondition, &StartWorkMutex); } } int main(){ long unsigned int Identity; unsigned int X; unsigned int Y; // Initialize all of the thread related objects. pthread_t Threads[NUMBER_OF_WORKER_THREADS]; pthread_attr_t attr; pthread_mutex_init(&CompleteMutex, NULL); pthread_cond_init (&CompleteCondition, NULL); pthread_mutex_init(&StartWorkMutex, NULL); pthread_cond_init (&StartWorkCondition, NULL); /* For portability, explicitly create threads in a joinable state */ pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); // Create all of the worker threads here. for ( Identity = 0; Identity < NUMBER_OF_WORKER_THREADS; Identity++ ) pthread_create(&Threads[Identity], &attr, WorkerThread, (void *)Identity); // Allow time for the worker threads to start up and wait for the signal that is going to be sent out soon. printf("Wait for 1 second for the %d worker threads to enter a waiting state.\n\n", NUMBER_OF_WORKER_THREADS); sleep(1); // Broadcast the start work signal to all of the waiting worker threads, and recieve the complete signals one at a time. for ( X = 0; X < NUMBER_OF_WORKING_ROUNDS; X++ ) { // Lock the "StartWorkMutex" to set the global work coordination variables. pthread_mutex_lock(&StartWorkMutex); // Not a whole lot to coordinate in this framework demo. WorkOn = TRUE; Round += 1; printf("Main: Broadcast Signal To Start Batch |%d|\n", Round); // Lock the "CompleteMutex" so we can start waiting for completion before any of the worker threads finish their batch. 
pthread_mutex_lock(&CompleteMutex); pthread_cond_broadcast(&StartWorkCondition); pthread_mutex_unlock(&StartWorkMutex); for ( Y = 0; Y < NUMBER_OF_WORKER_THREADS; Y++ ) { // Before entering a waiting state, set "MainThreadWaiting" to "TRUE" while we still have a lock on the "CompleteMutex". // Worker threads will be waiting for this condition to be met before sending "CompleteCondition" signals. MainThreadWaiting = TRUE; pthread_cond_wait(&CompleteCondition, &CompleteMutex); printf("Main: Complete Signal Recieved From Thread-%d\n", TheCompletedBatch); // This is where partial work on the batch data coordination will happen. All of the worker threads will have to finish before we can start the next batch. } pthread_mutex_unlock(&CompleteMutex); } pthread_mutex_lock(&StartWorkMutex); // Set the GAME OVER condition. WorkOn = FALSE; printf("Main: Broadcast The Termination Signal\n"); pthread_cond_broadcast(&StartWorkCondition); pthread_mutex_unlock(&StartWorkMutex); // Wait for all threads to complete, and then join with them. for ( X = 0; X < NUMBER_OF_WORKER_THREADS; X++ ) { pthread_join(Threads[X], NULL); printf("Main: Thread[%d] Has Been Joined And Terminated.\n", X); } // Clean up and exit. pthread_attr_destroy(&attr); pthread_mutex_destroy(&CompleteMutex); pthread_cond_destroy(&CompleteCondition); pthread_mutex_destroy(&StartWorkMutex); pthread_cond_destroy(&StartWorkCondition); pthread_exit (NULL); }