Operating Systems 2022F: Tutorial 6: Difference between revisions

From Soma-notes
Line 37: Line 37:
     trace -p `pidof 3000pc-fifo | awk '{print $2}'` ...
     trace -p `pidof 3000pc-fifo | awk '{print $2}'` ...


<tt>trace</tt> allows you to do many things that you can do with gdb; however, it is designed to work on production applications and thus is very efficient and much less intrusive.  In technical terms, gdb uses the ptrace system call, while <tt>trace</tt> uses eBPF (a much newer technology).
<tt>trace</tt> allows you to do many things that you can do with gdb; however, it is designed to work on production applications and thus is very efficient and much less intrusive.  In technical terms, gdb, strace, and ltrace use the ptrace system call, while <tt>trace</tt> uses eBPF.


==B: Producer/Consumer with Pipes==
==B: Producer/Consumer with Pipes==

Revision as of 18:30, 21 October 2022

In this tutorial you'll be learning about two implementations of the producer-consumer problem, a classic example of a concurrency problem. The class textbook covers concurrency in great detail in Chapters 25-34, and the producer-consumer problem is covered in Chapter 30 (Condition Variables) and Chapter 31 (Semaphores). While you should look at this part of the textbook, note that we will not be covering this material in the same level of detail, as should be clear from this tutorial.

A: Getting Started

  1. Make sure you system has the a recent version of bcc-tools installed. The class VM has this already installed; on other systems, you'll have to install it yourself.
  2. Download 3000pc.zip, unpack, and run make to compile 3000pc-fifo and 3000pc-rendezvous.
  3. Note that these programs take 3 arguments each:
    • The number of events to process
    • The number of events to produce before the producer sleeps for 1 second
    • The number of events to consume before the consumer sleeps for 1 second
  4. Run both programs with the same arguments of your choice. Do this a few times. Do they behave the same way? Do you notice any differences?
  5. Repeat the above experiment, this time running each program under strace (with the -f flag to trace children too). Do you notice any difference in the system calls each program makes?
  6. In the next question, you will be playing around with trace from bcc-tools. As it makes use of eBPF, a Linux kernel extension mechanism, it needs to be run with root privileges. (sudo should work on the class VM) We will explore eBPF more in later tutorials.
  7. Here are some example trace commands to play with. You can learn more about trace from its man page. Note these assume you unpacked the 3000pc.zip in the Documents folder.
Report on the string passed to pick_word() on its return:

   trace 'r:/home/student/Documents/3000pc/3000pc-fifo:pick_word "%s", arg1'

Print the buffer passed to the read function as a string (on read's return) for process 6312 (running 3000pc-fifo, won't work on 3000pc-rendezvous):

   trace -p 6312 'r:pthread:read "%s", arg2'

(Normally read and write are in the main C library and so would be denoted by r:c:read; however we are linking against the pthread library as well and it replaces the read implementation.)

Print calls to sem_wait in process 6312 and the value of the semaphore at function entry and return.

   trace -p 6312 'p:pthread:sem_wait(int *sem) "entry: %d", *sem' 'r:pthread:sem_wait(int *sem) "exit:  %d", *sem'
 


If you want to automatically trace the producer process, use 

   trace -p `pidof -s 3000pc-fifo`  ...

To trace the consumer, use

   trace -p `pidof 3000pc-fifo | awk '{print $2}'` ...

trace allows you to do many things that you can do with gdb; however, it is designed to work on production applications and thus is very efficient and much less intrusive. In technical terms, gdb, strace, and ltrace use the ptrace system call, while trace uses eBPF.

B: Producer/Consumer with Pipes

  1. Examine the source code of 3000pc-fifo. Explain the following:
    1. What does the call to pipe(pipefd) on line 192 do? Hint: Look at the manpage for pipe(2)
    2. How does the consumer receive words to consume?
    3. How does the producer send words to the consumer?
    4. Why is the call to srandom(time(NULL)) on line 169 necessary?
  2. Replace the call to srandom(time(NULL)) on line 169 with srandom(42). What differences do you notice in 3000pc-fifo's behavior?
  3. What descriptors are opened by 3000pc-fifo? What are they for and what files do the point to? How did you find this information? Hint: Try checking in /proc.
  4. What do you think would happen if you had multiple producers and multiple consumers in 3000pc-fifo? Would things break or work normally?
  5. (Advanced) Implement multiple producers and consumers in 3000pc-fifo. Compare your results with your answer to question 4.

C: Producer/Consumer with Shared Memory

  1. Examine the source code of 3000pc-rendezvous. Explain the following:
    1. What are a few ways that 3000pc-rendezvous is different from 3000pc-fifo?
    2. What does the call to mmap on line 335 do? How did you figure this out?
    3. How does the producer notify the consumer that the queue is no longer empty?
    4. How does the consumer notify the producer that the queue is no longer full?
  2. What arguments can you provide to make the producer wait for the consumer? Hint: Check the size of the queue.
  3. What arguments can you provide to make the consumer wait for the producer?
  4. Another student tells you that the difference between processes and threads is that processes never share memory, while threads do. Is this statement correct or incorrect? How can the behavior of 3000pc-rendezvous help justify your answer?
  5. Note that 3000pc-rendezvous uses /dev/urandom to pick its words instead of the C library's random functions. How do these two methods differ? You may wish to look at man urandom(4).
  6. Change the calls to pthread_mutexattr_setpshared and pthread_condattr_setpshared to take 0 instead of 1. How does the behavior or 3000pc-rendezvous change? Does anything break? Demonstrate this using your answers to question 2 and question 3.
  7. 3000pc-rendezvous can deadlock when run for long enough. 3000pc-rendezvous-timeout should not deadlock, but it may pause sometimes. What is the difference between these two programs? Why would this effect the tendency to deadlock?
  8. Benchmark all three programs using the time command. For example, run 3000fifo as follows:

      time (./3000pc-fifo 100000 0 0 > fifo-output.log)

    Which one is fastest? Why?
  9. (Advanced) Implement multiple producers and consumers in 3000pc-rendezvous. Compare your results to what you did in 3000pc-fifo.

Code

source zip file

3000pc-fifo.c

/* 3000pc-fifo.c  Simple producer-consumer with a fifo
 * Original Version Copyright (C) 2017  Anil Somayaji
 * Modified Version Copyright (C) 2020  William Findlay
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>. */

/* You really shouldn't be incorporating parts of this in any other code,
   it is meant for teaching, not production */

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/mman.h>
#include <errno.h>
#include <string.h>
#include <time.h>

#define QUEUESIZE 32
#define WORDSIZE 16

const int wordlist_size = 27;
const char *wordlist[] = {
        "Alpha",
        "Bravo",
        "Charlie",
        "Delta",
        "Echo",
        "Foxtrot",
        "Golf",
        "Hotel",
        "India",
        "Juliet",
        "Kilo",
        "Lima",
        "Mike",
        "November",
        "Oscar",
        "Papa",
        "Quebec",
        "Romeo",
        "Sierra",
        "Tango",
        "Uniform",
        "Victor",
        "Whiskey",
        "X-ray",
        "Yankee",
        "Zulu",
        "Dash"
};

void report_error(char *error)
{
        fprintf(stderr, "Error: %s\n", error);
}

void usage_exit(char *progname)
{
        fprintf(stderr,
                "Usage: %s <event count> <prod interval int> <con interval int>\n",
                progname);
        exit(-1);
}

void pick_word(char *word)
{
        int pick;

        pick = random() % wordlist_size;

        strcpy(word, wordlist[pick]);
}

void output_word(int c, char *w)
{
        printf("Word %d: %s\n", c, w);
}

int queue_word(char *word, int pipefd_write)
{
        if (write(pipefd_write, word, WORDSIZE) == -1)
        {
                fprintf(stderr, "Error: Unable to write to pipe: %s\n", strerror(errno));
                return -1;
        }

        return 0;
}

int get_next_word(char *word, int pipefd_read)
{
        if (read(pipefd_read, word, WORDSIZE) == -1)
        {
                fprintf(stderr, "Error: Unable to read from pipe: %s\n", strerror(errno));
                return -1;
        }

        return 0;
}

void producer(int event_count, int pipefd_write, int prod_interval)
{
        char word[WORDSIZE];
        int i;

        for (i=0; i < event_count; i++)
        {
                pick_word(word);
                queue_word(word, pipefd_write);

                /* Don't sleep if interval <= 0 */
                if (prod_interval <= 0)
                        continue;
                /* Sleep if we hit our interval */
                if (prod_interval > 0 && i % prod_interval == 0)
                {
                        fprintf(stderr, "Producer sleeping for 1 second...\n");
                        sleep(1);
                }
        }

        close(pipefd_write);
        fprintf(stderr, "Producer finished.\n");
        exit(0);
}

void consumer(int event_count, int pipefd_read, int con_interval)
{
        char word[WORDSIZE];
        int i;

        for (i=0; i < event_count; i++)
        {
                get_next_word(word, pipefd_read);
                output_word(i, word);

                /* Don't sleep if interval <= 0 */
                if (con_interval <= 0)
                        continue;
                /* Sleep if we hit our interval */
                if (con_interval > 0 && i % con_interval == 0)
                {
                        fprintf(stderr, "Consumer sleeping for 1 second...\n");
                        sleep(1);
                }
        }

        close(pipefd_read);
        fprintf(stderr, "Consumer finished.\n");
        exit(0);
}

int main(int argc, char *argv[])
{
        int pid, count, prod_interval, con_interval;
        int pipefd[2];

        srandom(time(NULL));

        if (argc < 4)
        {
                if (argc < 1)
                {
                        report_error("no command line");
                        usage_exit(argv[0]);
                }
                else
                {
                        report_error("Not enough arguments");
                        usage_exit(argv[0]);
                }
        }

        count = atoi(argv[1]);
        prod_interval = atoi(argv[2]);
        con_interval = atoi(argv[3]);

        /* Open a fifo
         * pipefd[0] will be open for reading, and
         * pipefd[1] will be open for writing */
        if (pipe(pipefd))
        {
                fprintf(stderr, "Error: Unable to open pipe: %s\n", strerror(errno));
                exit(-1);
        }

        pid = fork();

        if (pid == 0)
        {
                /* Producer */
                producer(count, pipefd[1], prod_interval);
        }
        else
        {
                /* Consumer */
                consumer(count, pipefd[0], con_interval);
        }

        /* This line should never be reached */
        return -1;
}

3000pc-rendezvous.c

/* 3000pc-rendezvous.c  More complex producer-consumer using mmap shared memory and pthread_cond_wait
 * Original Version Copyright (C) 2017  Anil Somayaji
 * Modified Version Copyright (C) 2020  William Findlay
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>. */

/* You really shouldn't be incorporating parts of this in any other code,
   it is meant for teaching, not production */

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <pthread.h>
#include <semaphore.h>

#define QUEUESIZE 32
#define WORDSIZE 16

const int wordlist_size = 27;
const char *wordlist[] = {
        "Alpha",
        "Bravo",
        "Charlie",
        "Delta",
        "Echo",
        "Foxtrot",
        "Golf",
        "Hotel",
        "India",
        "Juliet",
        "Kilo",
        "Lima",
        "Mike",
        "November",
        "Oscar",
        "Papa",
        "Quebec",
        "Romeo",
        "Sierra",
        "Tango",
        "Uniform",
        "Victor",
        "Whiskey",
        "X-ray",
        "Yankee",
        "Zulu",
        "Dash"
};

typedef struct entry {
        char word[WORDSIZE];
        sem_t lock;
} entry;

typedef struct shared {
        pthread_mutex_t cond_mutex;
        pthread_cond_t  queue_nonempty;
        pthread_cond_t  queue_nonfull;
        entry queue[QUEUESIZE];
        int last_produced;
        int last_consumed;
        pid_t prod_pid;
        pid_t con_pid;
        int prod_count;
        int con_count;
} shared;


void report_error(char *error)
{
        fprintf(stderr, "Error: %s\n", error);
}

void usage_exit(char *progname)
{
        fprintf(stderr,
                "Usage: %s <event count> <prod interval int> <con interval int>\n",
                progname);
        exit(-1);
}

void pick_word(char *word)
{
        unsigned int pick;

        /* Open /dev/urandom for reading */
        int fd = open("/dev/urandom", O_RDONLY);
        if (fd < 0)
        {
                fprintf(stderr, "Error: Unable to open /dev/urandom for reading: %s\n", strerror(errno));
                pick = 0;
        }
        else if (read(fd, (void *)&pick, sizeof(pick)) == -1)
        {
                fprintf(stderr, "Error: Unable to read from /dev/urandom: %s\n",strerror(errno));
                pick = 0;
        }

        pick = pick % wordlist_size;

        strcpy(word, wordlist[pick]);

        close(fd);
}

void wait_for_producer(shared *s)
{
        pthread_mutex_lock(&s->cond_mutex);
        /* fprintf(stderr, "Waiting for producer...\n"); */
        pthread_cond_wait(&s->queue_nonempty, &s->cond_mutex);
        pthread_mutex_unlock(&s->cond_mutex);
}

void wait_for_consumer(shared *s)
{
        pthread_mutex_lock(&s->cond_mutex);
        /* fprintf(stderr, "Waiting for consumer...\n"); */
        pthread_cond_wait(&s->queue_nonfull, &s->cond_mutex);
        pthread_mutex_unlock(&s->cond_mutex);
}

void output_word(int c, char *w)
{
        printf("Word %d: %s\n", c, w);
}

int queue_word(char *word, shared *s)
{
        entry *e;
        int current, retval;

        current = (s->last_produced + 1) % QUEUESIZE;

        e = &s->queue[current];

        sem_wait(&e->lock);

        if (e->word[0] != '\0')
        {
                /* consumer hasn't consumed this entry yet */
                sem_post(&e->lock);
                wait_for_consumer(s);
                sem_wait(&e->lock);
        }

        if (e->word[0] != '\0')
        {
                fprintf(stderr, "ERROR: No room for producer after waiting!\n");
                retval = -1;
                goto done;
        }
        else
        {
                strncpy(e->word, word, WORDSIZE);
                s->last_produced = current;
                s->prod_count++;
                /* Notify that queue is nonempty */
                pthread_cond_signal(&s->queue_nonempty);
                retval = 0;
                goto done;
        }

 done:
        sem_post(&e->lock);
        return retval;
}

int get_next_word(char *word, shared *s)
{
        entry *e;
        int current, retval;

        current = (s->last_consumed + 1) % QUEUESIZE;

        e = &s->queue[current];

        sem_wait(&e->lock);

        if (e->word[0] == '\0')
        {
                /* producer hasn't filled in this entry yet */
                sem_post(&e->lock);
                wait_for_producer(s);
                sem_wait(&e->lock);
        }

        if (e->word[0] == '\0')
        {
                fprintf(stderr, "ERROR: Nothing for consumer after waiting!\n");
                retval = -1;
                goto done;
        }
        else
        {
                strncpy(word, e->word, WORDSIZE);
                e->word[0] = '\0';
                s->last_consumed = current;
                s->con_count++;
                /* Notify that queue is nonfull */
                pthread_cond_signal(&s->queue_nonfull);
                retval = 0;
                goto done;
        }

 done:
        sem_post(&e->lock);
        return retval;
}

void producer(shared *s, int event_count, int prod_interval)
{
        char word[WORDSIZE];
        int i;

        for (i=0; i < event_count; i++)
        {
                pick_word(word);
                queue_word(word, s);

                /* Don't sleep if interval <= 0 */
                if (prod_interval <= 0)
                        continue;
                /* Sleep if we hit our interval */
                if (i % prod_interval == 0)
                {
                        fprintf(stderr, "Producer sleeping for 1 second...\n");
                        sleep(1);
                }
        }

        fprintf(stderr, "Producer finished.\n");
        exit(0);
}

void consumer(shared *s, int event_count, int con_interval)
{
        char word[WORDSIZE];
        int i;

        for (i=0; i < event_count; i++)
        {
                get_next_word(word, s);
                output_word(s->con_count, word);

                /* Don't sleep if interval <= 0 */
                if (con_interval <= 0)
                        continue;
                /* Sleep if we hit our interval */
                if (i % con_interval == 0)
                {
                        fprintf(stderr, "Consumer sleeping for 1 second...\n");
                        sleep(1);
                }
        }

        fprintf(stderr, "Consumer finished.\n");
        exit(0);
}

void init_shared(shared *s)
{
        int i;

        /* We need to explicitly mark the mutex as shared or risk undefined behavior */
        pthread_mutexattr_t mattr = {};
        pthread_mutexattr_setpshared(&mattr, 1);
        pthread_mutex_init(&s->cond_mutex, &mattr);

        /* We need to explicitly mark the conditions as shared or risk undefined behavior */
        pthread_condattr_t cattr = {};
        pthread_condattr_setpshared(&cattr, 1);
        pthread_cond_init(&s->queue_nonempty, &cattr);
        pthread_cond_init(&s->queue_nonfull, &cattr);

        s->last_consumed = -1;
        s->last_produced = -1;

        s->prod_pid = -1;
        s->con_pid  = -1;

        s->prod_count = 0;
        s->con_count  = 0;

        for (i=0; i<QUEUESIZE; i++)
        {
                s->queue[i].word[0] = '\0';
                /* semaphore is shared between processes,
                   and initial value is 1 (unlocked) */
                sem_init(&s->queue[i].lock, 1, 1);
        }
}

int main(int argc, char *argv[])
{
        int pid, count, prod_interval, con_interval;

        shared *s;

        if (argc < 4)
        {
                if (argc < 1)
                {
                        report_error("no command line");
                        usage_exit(argv[0]);
                }
                else
                {
                        report_error("Not enough arguments");
                        usage_exit(argv[0]);
                }
        }

        count = atoi(argv[1]);
        prod_interval = atoi(argv[2]);
        con_interval = atoi(argv[3]);

        s = (shared *) mmap(NULL, sizeof(shared),
                             PROT_READ|PROT_WRITE,
                             MAP_SHARED|MAP_ANONYMOUS, -1, 0);

        if (s == MAP_FAILED)
        {
                fprintf(stderr, "Error: Unable to mmap: %s\n", strerror(errno));
                exit(-1);
        }

        init_shared(s);

        pid = fork();

        if (pid == 0)
        {
                /* Producer */
                s->prod_pid = getpid();
                producer(s, count, prod_interval);
        } else
        {
                /* Consumer */
                s->con_pid = getpid();
                consumer(s, count, con_interval);
        }

        /* This line should never be reached */
        return -1;
}

3000pc-rendezvous-timeout.c

/* 3000pc-rendezvous-timeout.c  More complex producer-consumer using 
   mmap shared memory and pthread_cond_wait
 * Original Version Copyright (C) 2017  Anil Somayaji
 * Modified Version Copyright (C) 2020  William Findlay & Anil Somayaji
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>. */

/* You really shouldn't be incorporating parts of this in any other code,
   it is meant for teaching, not production */

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <pthread.h>
#include <semaphore.h>
#include <sys/time.h>

#define QUEUESIZE 32
#define WORDSIZE 16

const int wordlist_size = 27;
const char *wordlist[] = {
        "Alpha",
        "Bravo",
        "Charlie",
        "Delta",
        "Echo",
        "Foxtrot",
        "Golf",
        "Hotel",
        "India",
        "Juliet",
        "Kilo",
        "Lima",
        "Mike",
        "November",
        "Oscar",
        "Papa",
        "Quebec",
        "Romeo",
        "Sierra",
        "Tango",
        "Uniform",
        "Victor",
        "Whiskey",
        "X-ray",
        "Yankee",
        "Zulu",
        "Dash"
};

typedef struct entry {
        char word[WORDSIZE];
        sem_t lock;
} entry;

typedef struct shared {
        pthread_mutex_t nonempty_mutex;
        pthread_mutex_t nonfull_mutex;
        pthread_cond_t  queue_nonempty;
        pthread_cond_t  queue_nonfull;
        entry queue[QUEUESIZE];
        int last_produced;
        int last_consumed;
        pid_t prod_pid;
        pid_t con_pid;
        int prod_count;
        int con_count;
} shared;


void report_error(char *error)
{
        fprintf(stderr, "Error: %s\n", error);
}

void usage_exit(char *progname)
{
        fprintf(stderr,
                "Usage: %s <event count> <prod interval int> <con interval int>\n",
                progname);
        exit(-1);
}

void pick_word(char *word)
{
        unsigned int pick;

        /* Open /dev/urandom for reading */
        int fd = open("/dev/urandom", O_RDONLY);
        if (fd < 0)
        {
                fprintf(stderr,
                        "Error: Unable to open /dev/urandom for reading: %s\n",
                        strerror(errno));
                pick = 0;
        }
        else if (read(fd, (void *) &pick, sizeof(pick)) == -1)
        {
                fprintf(stderr,
                        "Error: Unable to read from /dev/urandom: %s\n",
                        strerror(errno));
                pick = 0;
        }

        pick = pick % wordlist_size;

        strcpy(word, wordlist[pick]);

        close(fd);
}

void wait_for_producer(shared *s)
{
        struct timeval now;
        struct timespec timeout;

        /* fprintf(stderr, "%d: Waiting for producer...\n", s->con_count); */

        gettimeofday(&now, NULL);
        /* 10 millisecond from now = 10,000,000 nanoseconds */
        timeout.tv_sec = now.tv_sec;
        timeout.tv_nsec = (now.tv_usec * 1000) + 10000000;

        pthread_mutex_lock(&s->nonempty_mutex);
        pthread_cond_timedwait(&s->queue_nonempty,
                               &s->nonempty_mutex,
                               &timeout);
        pthread_mutex_unlock(&s->nonempty_mutex);
}

void wait_for_consumer(shared *s)
{
        struct timeval now;
        struct timespec timeout;

        /* fprintf(stderr, "%d: Waiting for consumer...\n", s->prod_count); */

        gettimeofday(&now, NULL);
        /* 10 millisecond from now = 10,000,000 nanoseconds */
        timeout.tv_sec = now.tv_sec;
        timeout.tv_nsec = (now.tv_usec * 1000) + 10000000;
        
        pthread_mutex_lock(&s->nonfull_mutex);
        pthread_cond_timedwait(&s->queue_nonfull,
                               &s->nonfull_mutex,
                               &timeout);
        pthread_mutex_unlock(&s->nonfull_mutex);
}

void output_word(int c, char *w)
{
        printf("Word %d: %s\n", c, w);
}

int queue_word(char *word, shared *s)
{
        entry *e;
        int current, retval;
        int wait_count = 0;
        int wait_count_max = 2000; /* 2000 1-millisecond waits */
        
        current = (s->last_produced + 1) % QUEUESIZE;

        e = &s->queue[current];

        sem_wait(&e->lock);

        while (e->word[0] != '\0')
        {
                /* consumer hasn't consumed this entry yet */
                wait_count++;
                sem_post(&e->lock);
                usleep(1);
                wait_for_consumer(s);
                sem_wait(&e->lock);
                if (wait_count >= wait_count_max) {
                        break;
                }
        }

        if (e->word[0] != '\0')
        {
                fprintf(stderr,
                        "ERROR: No room for producer after waiting %d times!\n",
                        wait_count_max);
                retval = -1;
                goto done;
        }
        else
        {
                strncpy(e->word, word, WORDSIZE);
                s->last_produced = current;
                s->prod_count++;
                retval = 0;
                goto done;
        }

 done:
        sem_post(&e->lock);
        /* Notify that queue is nonempty */
        pthread_mutex_lock(&s->nonempty_mutex);
        pthread_cond_signal(&s->queue_nonempty);
        pthread_mutex_unlock(&s->nonempty_mutex);

        return retval;
}

int get_next_word(char *word, shared *s)
{
        entry *e;
        int current, retval;
        int wait_count = 0;
        int wait_count_max = 2000;

        current = (s->last_consumed + 1) % QUEUESIZE;

        e = &s->queue[current];

        sem_wait(&e->lock);
        
        while (e->word[0] == '\0')
        {
                /* producer hasn't filled in this entry yet */
                wait_count++;
                sem_post(&e->lock);
                usleep(1);
                wait_for_producer(s);
                sem_wait(&e->lock);
                if (wait_count >= wait_count_max) {
                        break;
                }
        }

        if (e->word[0] == '\0')
        {
                fprintf(stderr,
                        "ERROR: Nothing for consumer after waiting %d times!\n",
                        wait_count_max);
                retval = -1;
                goto done;
        }
        else
        {
                strncpy(word, e->word, WORDSIZE);
                e->word[0] = '\0';
                s->last_consumed = current;
                s->con_count++;
                retval = 0;
                goto done;
        }

 done:
        sem_post(&e->lock);
        /* Notify that queue is nonfull */
        pthread_mutex_lock(&s->nonfull_mutex);
        pthread_cond_signal(&s->queue_nonfull);
        pthread_mutex_unlock(&s->nonfull_mutex);
        return retval;
}

void producer(shared *s, int event_count, int prod_interval)
{
        char word[WORDSIZE];
        int i;

        for (i=0; i < event_count; i++)
        {
                pick_word(word);
                queue_word(word, s);

                /* Don't sleep if interval <= 0 */
                if (prod_interval <= 0)
                        continue;
                /* Sleep if we hit our interval */
                if (i % prod_interval == 0)
                {
                        fprintf(stderr,
                                "%d: Producer sleeping for 1 second...\n",
                                s->prod_count);
                        sleep(1);
                }
        }

        fprintf(stderr, "Producer finished.\n");
        exit(0);
}

void consumer(shared *s, int event_count, int con_interval)
{
        char word[WORDSIZE];
        int i;

        for (i=0; i < event_count; i++)
        {
                get_next_word(word, s);
                output_word(s->con_count, word);

                /* Don't sleep if interval <= 0 */
                if (con_interval <= 0)
                        continue;
                /* Sleep if we hit our interval */
                if (i % con_interval == 0)
                {
                        fprintf(stderr,
                                "%d: Consumer sleeping for 1 second...\n",
                                s->con_count);
                        sleep(1);
                }
        }

        fprintf(stderr, "Consumer finished.\n");
        exit(0);
}

void init_shared(shared *s)
{
        int i;

        pthread_mutexattr_t mattr;
        pthread_condattr_t cattr;
        
        /* We need to explicitly mark the mutex as shared or 
           risk undefined behavior */
        pthread_mutexattr_init(&mattr);
        pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_ERRORCHECK_NP);
        pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
        pthread_mutex_init(&s->nonempty_mutex, &mattr);
        pthread_mutex_init(&s->nonfull_mutex, &mattr);

        /* We need to explicitly mark the conditions as shared or 
           risk undefined behavior */

        pthread_condattr_init(&cattr);
        pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
        pthread_cond_init(&s->queue_nonempty, &cattr);
        pthread_cond_init(&s->queue_nonfull, &cattr);

        s->last_consumed = -1;
        s->last_produced = -1;

        s->prod_pid = -1;
        s->con_pid  = -1;

        s->prod_count = 0;
        s->con_count  = 0;

        for (i=0; i<QUEUESIZE; i++)
        {
                s->queue[i].word[0] = '\0';
                /* semaphore is shared between processes,
                   and initial value is 1 (unlocked) */
                sem_init(&s->queue[i].lock, 1, 1);
        }
}

int main(int argc, char *argv[])
{
        int pid, count, prod_interval, con_interval;

        shared *s;

        if (argc < 4)
        {
                if (argc < 1)
                {
                        report_error("no command line");
                        usage_exit(argv[0]);
                }
                else
                {
                        report_error("Not enough arguments");
                        usage_exit(argv[0]);
                }
        }

        count = atoi(argv[1]);
        prod_interval = atoi(argv[2]);
        con_interval = atoi(argv[3]);

        s = (shared *) mmap(NULL, sizeof(shared),
                             PROT_READ|PROT_WRITE,
                             MAP_SHARED|MAP_ANONYMOUS, -1, 0);

        if (s == MAP_FAILED)
        {
                fprintf(stderr, "Error: Unable to mmap: %s\n", strerror(errno));
                exit(-1);
        }

        init_shared(s);

        pid = fork();

        if (pid == 0)
        {
                /* Producer */
                s->prod_pid = getpid();
                producer(s, count, prod_interval);
        } else
        {
                /* Consumer */
                s->con_pid = getpid();
                consumer(s, count, con_interval);
        }

        /* This line should never be reached */
        return -1;
}

Makefile

CC       = gcc

CFLAGS   = -O2 -Wall -fno-inline
LDFLAGS   = -pthread

SRC      = $(wildcard *.c)
EXEC     = $(SRC:.c=)

all: $(EXEC)

$(EXEC): $(SRC)
	$(CC) -o $@ $@.c $(CFLAGS) $(LDFLAGS)

.PHONY: clean mrproper

clean:
	@rm -rf *.o
	@rm -rf $(EXEC)