path: root/liblo/pthreads.2/tests/multiPCcaBetter.c
diff options
Diffstat (limited to 'liblo/pthreads.2/tests/multiPCcaBetter.c')
1 files changed, 282 insertions, 0 deletions
diff --git a/liblo/pthreads.2/tests/multiPCcaBetter.c b/liblo/pthreads.2/tests/multiPCcaBetter.c
new file mode 100644
index 0000000..fe130e3
--- /dev/null
+++ b/liblo/pthreads.2/tests/multiPCcaBetter.c
@@ -0,0 +1,282 @@
+/* Session 7. multiPCca.c Lab 7-1 */
+/* Maintain a producer thread and several consumer threads */
+/* The producer periodically creates checksummed data buffers, */
+/* or "message block" which the consumers display as soon */
+/* as possible. The conusmers read the NEXT complete */
+/* set of data, and each consumer validates the data before */
+/* before display. */
+/* Consumers are created and cancelled on demand based */
+/* user input. */
+/* Usage: multiPCca maxconsumer */
+#if defined (_MSC_VER)
+#include <windows.h>
+#define sleep(i) Sleep(i*1000)
+#include <pthread.h>
+//#include "errors.h"
+//#include "utility.h"
+#include <stdio.h>
+#include <stdlib.h>
+#define DATA_SIZE 256
+typedef struct msg_block_tag { /* Message block */
+ pthread_mutex_t mguard; /* Guard the message block */
+ pthread_cond_t mready; /* Message ready */
+ pthread_cond_t mok; /* Ok for the producer to produce */
+ volatile int f_ready;
+ volatile int f_stop;
+ /* ready state flag; producer stopped flag */
+ volatile int sequence; /* Message block sequence number */
+ time_t timestamp;
+ int checksum; /* Message contents checksum */
+ int data[DATA_SIZE]; /* Message Contents */
+} msg_block_t;
+/* The invariant and condition variable predicates are: */
+/* Invariant - */
+/* f_ready && data is valid */
+/* && checksum and timestamp are valid */
+/* Condition variable predicate */
+/* mready if and only if f_ready and a new message */
+/* has just been generated */
+/* Single message block, ready to fill with a new message */
+struct msg_block_tag mblock = { PTHREAD_MUTEX_INITIALIZER,
+void * produce (void *), * consume (void *);
+void message_fill (msg_block_t *);
+void message_display (int, msg_block_t *);
+static volatile int ShutDown = 0;
+int main (int argc, char * argv[])
+ int tstatus, nthread, ithread;
+ int * f_consumer; /* Array of flags to indicate corresponding thread exists */
+ void *join_result;
+ pthread_t *consume_t;
+ char command [10];
+ if (argc != 2) {
+ printf ("Usage: multiPCca maxconsumer\n");
+ return 1;
+ }
+ nthread = atoi(argv[1]);
+ consume_t = calloc (nthread, sizeof(pthread_t));
+ f_consumer = (int *)calloc (nthread, sizeof(int));
+ while (!ShutDown) {
+ printf ("Enter command: nc (new consumer), cc (cancel), ");
+ printf ("pr (produce msg), sh (shutdown):");
+ fflush (stdout);
+ scanf ("%s", command);
+ printf ("Command received: %s.\n", command); fflush (stdout);
+ if (strcmp (command, "nc") == 0) { /* New consumer thread */
+ /* Look for empty thread slot */
+ for (ithread = 0; ithread < nthread; ithread++) {
+ if (!f_consumer[ithread]) break;
+ }
+ if (ithread >= nthread) {
+ printf ("Maximum # consumers (%d) already exist\n", nthread);
+ fflush (stdout);
+ continue;
+ }
+ tstatus = pthread_create (&consume_t[ithread], NULL,
+ consume, (void *)ithread);
+// if (tstatus != 0)
+// err_abort (tstatus, "Cannot create consumer thread");
+ f_consumer[ithread] = 1;
+ printf ("Consumer number %d created successfully.\n", ithread);
+ fflush (stdout);
+ } else if (strcmp (command, "cc") == 0) { /* cancel consumer thread */
+ printf ("Enter consumer number: 0 to %d:", nthread-1);
+ fflush (stdout);
+ scanf ("%d", &ithread);
+ if (ithread < 0 || ithread >= nthread || !f_consumer[ithread]) {
+ printf ("Thread %d does not exist.\n", ithread); fflush (stdout);
+ fflush (stdout);
+ } else {
+ printf ("About to cancel thread # %d.\n", ithread); fflush (stdout);
+ tstatus = pthread_cancel (consume_t[ithread]);
+ printf ("Cancel status: %d. About to join thread # %d\n", tstatus, ithread); fflush (stdout);
+ tstatus = pthread_join (consume_t[ithread], &join_result);
+ printf ("Join status: %d after joining thread # %d. Result: %d\n", tstatus, ithread, (int) join_result); fflush (stdout);
+ f_consumer[ithread] = 0;
+ }
+ continue;
+ } else if (strcmp (command, "pr") == 0) { /* Produce a message */
+ printf ("About to produce a new message.\n"); fflush (stdout);
+ produce(NULL);
+ /* Note the race to prompt before/after message display */
+ } else if (strcmp (command, "sh") == 0) { /* shutdown system */
+ printf ("Shutdown command received\n");
+ for (ithread = 0; ithread < nthread; ithread++)
+ printf ("Thread #: %d. Flag: %d.\n", ithread, f_consumer[ithread]);
+ fflush (stdout);
+ ShutDown = 1; /* Cancel and join all running threads */
+ for (ithread = 0; ithread < nthread; ithread++) {
+ printf ("Thread #: %d. Flag: %d.\n", ithread, f_consumer[ithread]);
+ fflush (stdout);
+ if (f_consumer[ithread]) {
+ printf ("About to cancel consumer thread #: %d.\n", ithread);
+ fflush (stdout);
+ tstatus = pthread_cancel (consume_t[ithread]);
+ if (tstatus != 0)
+ printf ("Cannot cancel consumer thread %d", ithread);
+ printf ("About to join consumer thread #: %d.\n", ithread);
+ fflush (stdout);
+ tstatus = pthread_join (consume_t[ithread], &join_result);
+ if (tstatus != 0 || join_result != PTHREAD_CANCELED)
+ printf ("Error joining thread #: %d. tstatus: %d.\n",
+ ithread, tstatus);
+ printf ("Joined consumer thread #: %d.\n", ithread);
+ fflush (stdout);
+ }
+ }
+ fflush (stdout);
+ printf ("All consumer threads cancelled and joined.\n");
+ fflush (stdout);
+ } else { /* Illegal command */
+ printf ("Illegal command. %s. Try again\n", command);
+ fflush (stdout);
+ }
+ }
+ free (consume_t);
+ printf ("Producer and consumer threads have terminated\n");
+ fflush (stdout);
+ return 0;
+void * produce (void *arg)
+/* Producer function. Create new message when called, and notify consumers */
+/* The arg is there as this was derived from a thread function. */
+ int tstatus = 1;
+ /* Get the buffer, fill it,*/
+ /* and inform all consumers with a broadcast */
+ printf ("Entering producer.\n"); fflush (stdout);
+ while (tstatus != 0) {
+ tstatus = pthread_mutex_trylock (&mblock.mguard);
+ printf ("Trylock status: %d\n", tstatus); fflush (stdout);
+ if (tstatus != 0) sleep (1);
+ }
+ message_fill (&mblock);
+ mblock.sequence++;
+ mblock.f_ready = 1;
+ pthread_cond_broadcast (&mblock.mready);
+ printf ("Producer produced one message.\n"); fflush (stdout);
+ pthread_mutex_unlock (&mblock.mguard);
+ return NULL;
+/* Mutex cleanup handler used by the consumers */
+void free_mutex (void * arg)
+ int tstatus;
+ printf ("Entering free_mutex cleanup handler.\n");
+ tstatus = pthread_mutex_unlock ((pthread_mutex_t *)arg);
+ printf ("Unlocked mutex. Status: %d\n", tstatus); fflush (stdout);
+/* Cleanup handler for the consumer thread */
+void cancel_consumer (void * arg)
+ int ithread;
+ ithread = (int) arg;
+ printf ("Thread number %d cancellation handler. Curently, does nothing.\n", ithread);
+ fflush (stdout);
+void *consume (void *arg)
+ int ithread, old_state, old_type;
+ struct timespec timeout;
+ timeout.tv_nsec = 0;
+ ithread = (int)arg;
+ pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, &old_state);
+ pthread_cleanup_push (cancel_consumer, (void *)ithread);
+ pthread_setcanceltype (PTHREAD_CANCEL_DEFERRED, &old_type);
+ pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, &old_state);
+ /* Consume the NEXT message */
+ do {
+ pthread_testcancel();
+ pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, &old_state);
+ pthread_mutex_lock (&mblock.mguard);
+ pthread_cleanup_push (free_mutex, &mblock.mguard);
+ pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, &old_state);
+ if (!mblock.f_stop) {
+ do { /* Wait for the NEXT message */
+ pthread_testcancel();
+ printf ("Thread #: %d about to wait for mready.\n", ithread); fflush (stdout);
+ pthread_cond_wait (&mblock.mready, &mblock.mguard);
+ printf ("Thread #: %d after wait for mready.\n", ithread); fflush (stdout);
+ } while (!mblock.f_ready && !ShutDown);
+ message_display (ithread, &mblock);
+ pthread_testcancel();
+ }
+ /* Free the mutex through the free_mutex cleanup handler (a macro) */
+ pthread_cleanup_pop (1);
+ } while (!ShutDown);
+ pthread_cleanup_pop(1); /* NOTE: Try removing this!! Compiler error!
+ Why?? Because it's a macro */
+ printf ("Consumer number %d is shutting down\n", ithread);
+ fflush (stdout);
+ return NULL;
+void message_fill (msg_block_t *mblock)
+ /* Fill the message buffer, and include checksum and timestamp */
+ /* This function is called from the producer thread while it */
+ /* owns the message block mutex */
+ int i;
+ mblock->checksum = 0;
+ for (i = 0; i < DATA_SIZE; i++) {
+ mblock->data[i] = rand();
+ mblock->checksum ^= mblock->data[i];
+ }
+ mblock->timestamp = time(NULL);
+ return;
+void message_display (int ithread, msg_block_t *mblock)
+ /* Display message buffer and timestamp, validate checksum */
+ /* This function is called from the consumer thread while it */
+ /* owns the message block mutex */
+ int i, tcheck = 0;
+ for (i = 0; i < DATA_SIZE; i++)
+ tcheck ^= mblock->data[i];
+ printf ("\nConsumer thread #: %d\n", ithread);
+ printf ("Message number %d generated at: %s",
+ mblock->sequence, ctime (&(mblock->timestamp)));
+ printf ("First and last entries: %x %x\n",
+ mblock->data[0], mblock->data[DATA_SIZE-1]);
+ fflush (stdout);
+ if (tcheck == mblock->checksum)
+ printf ("GOOD ->Checksum was validated.\n");
+ else
+ printf ("BAD ->Checksum failed. message was corrupted\n");
+ fflush (stdout);
+ return;