Operating Systems 2021F: Tutorial 8

In this tutorial you'll be learning about two implementations of the producer-consumer problem, a classic example of a concurrency problem. The class textbook covers concurrency in great detail in Chapters 25-34, and the producer-consumer problem is covered in Chapter 30 (Condition Variables) and Chapter 31 (Semaphores). While you should look at this part of the textbook, note that we will not be covering this material in the same level of detail, as should be clear from this tutorial.

A: Getting Started

  1. Make sure your system has a recent version of bcc-tools installed. The class VM already has it installed; on other systems, you'll have to install it yourself.
  2. Download 3000pc.zip, unpack, and run make to compile 3000pc-fifo and 3000pc-rendezvous.
  3. Note that these programs take 3 arguments each:
    • The number of events to process
    • The number of events to produce before the producer sleeps for 1 second
    • The number of events to consume before the consumer sleeps for 1 second
  4. Run both programs with the same arguments of your choice. Do this a few times. Do they behave the same way? Do you notice any differences?
  5. Repeat the above experiment, this time running each program under strace (with the -f flag to trace children too). Do you notice any difference in the system calls each program makes?
  6. In the next step, you will be playing around with trace from bcc-tools. trace is very similar to bpftrace. Like any tool based on eBPF, it needs to be run with root privileges (sudo should work on the class VM).
  7. Here are some example trace commands to play with. You can learn more about trace from its man page. Note that these assume you unpacked 3000pc.zip in the Documents folder.
Report on the string passed to pick_word() on its return:

   trace 'r:/home/student/Documents/3000pc/3000pc-fifo:pick_word "%s", arg1'

Print the buffer passed to the read function as a string (on read's return) for process 6312 (running 3000pc-fifo, won't work on 3000pc-rendezvous):

   trace -p 6312 'r:pthread:read "%s", arg2'

(Normally read and write are in the main C library and so would be denoted by r:c:read; however, we are also linking against the pthread library, which replaces the read implementation.)

Print calls to sem_wait in process 6312 and the value of the semaphore at function entry and return:

   trace -p 6312 'p:pthread:sem_wait(int *sem) "entry: %d", *sem' 'r:pthread:sem_wait(int *sem) "exit:  %d", *sem'

If you want to automatically trace the producer process, use:

   trace -p `pidof -s 3000pc-fifo`  ...

To trace the consumer, use:

   trace -p `pidof 3000pc-fifo | awk '{print $2}'` ...

B: Producer/Consumer with Pipes

  1. Examine the source code of 3000pc-fifo. Explain the following:
    1. What does the call to pipe(pipefd) on line 192 do? Hint: Look at the man page for pipe(2).
    2. How does the consumer receive words to consume?
    3. How does the producer send words to the consumer?
    4. Why is the call to srandom(time(NULL)) on line 169 necessary?
  2. Replace the call to srandom(time(NULL)) on line 169 with srandom(42). What differences do you notice in 3000pc-fifo's behavior?
  3. What file descriptors are opened by 3000pc-fifo? What are they for and what files do they point to? How did you find this information? Hint: Try checking in /proc.
  4. What do you think would happen if you had multiple producers and multiple consumers in 3000pc-fifo? Would things break or work normally?
  5. (Advanced) Implement multiple producers and consumers in 3000pc-fifo. Compare your results with your answer to question 4.

C: Producer/Consumer with Shared Memory

  1. Examine the source code of 3000pc-rendezvous. Explain the following:
    1. What are a few ways that 3000pc-rendezvous is different from 3000pc-fifo?
    2. What does the call to mmap on line 335 do? How did you figure this out?
    3. How does the producer notify the consumer that the queue is no longer empty?
    4. How does the consumer notify the producer that the queue is no longer full?
  2. What arguments can you provide to make the producer wait for the consumer? Hint: Check the size of the queue.
  3. What arguments can you provide to make the consumer wait for the producer?
  4. Another student tells you that the difference between processes and threads is that processes never share memory, while threads do. Is this statement correct or incorrect? How can the behavior of 3000pc-rendezvous help justify your answer?
  5. Note that 3000pc-rendezvous uses /dev/urandom to pick its words instead of the C library's random functions. How do these two methods differ? You may wish to look at the urandom(4) man page (man 4 urandom).
  6. Change the calls to pthread_mutexattr_setpshared and pthread_condattr_setpshared to take 0 instead of 1. How does the behavior of 3000pc-rendezvous change? Does anything break? Demonstrate this using your answers to question 2 and question 3.
  7. 3000pc-rendezvous can deadlock when run for long enough. 3000pc-rendezvous-timeout should not deadlock, but it may pause sometimes. What is the difference between these two programs? Why would this affect the tendency to deadlock?
  8. Benchmark all three programs using the time command. For example, run 3000pc-fifo as follows:

      time (./3000pc-fifo 100000 0 0 > fifo-output.log)

    Which one is fastest? Why?
  9. (Advanced) Implement multiple producers and consumers in 3000pc-rendezvous. Compare your results to what you did in 3000pc-fifo.

Code

source zip file

3000pc-fifo.c

  1 /* 3000pc-fifo.c  Simple producer-consumer with a fifo
  2  * Original Version Copyright (C) 2017  Anil Somayaji
  3  * Modified Version Copyright (C) 2020  William Findlay
  4  *
  5  * This program is free software: you can redistribute it and/or modify
  6  * it under the terms of the GNU General Public License as published by
  7  * the Free Software Foundation, either version 3 of the License, or
  8  * (at your option) any later version.
  9  *
 10  * This program is distributed in the hope that it will be useful,
 11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
 12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13  * GNU General Public License for more details.
 14  *
 15  * You should have received a copy of the GNU General Public License
 16  * along with this program.  If not, see <https://www.gnu.org/licenses/>. */
 17 
 18 /* You really shouldn't be incorporating parts of this in any other code,
 19    it is meant for teaching, not production */
 20 
 21 #include <stdio.h>
 22 #include <stdlib.h>
 23 #include <unistd.h>
 24 #include <sys/mman.h>
 25 #include <errno.h>
 26 #include <string.h>
 27 #include <time.h>
 28 
 29 #define QUEUESIZE 32
 30 #define WORDSIZE 16
 31 
 32 const int wordlist_size = 27;
 33 const char *wordlist[] = {
 34         "Alpha",
 35         "Bravo",
 36         "Charlie",
 37         "Delta",
 38         "Echo",
 39         "Foxtrot",
 40         "Golf",
 41         "Hotel",
 42         "India",
 43         "Juliet",
 44         "Kilo",
 45         "Lima",
 46         "Mike",
 47         "November",
 48         "Oscar",
 49         "Papa",
 50         "Quebec",
 51         "Romeo",
 52         "Sierra",
 53         "Tango",
 54         "Uniform",
 55         "Victor",
 56         "Whiskey",
 57         "X-ray",
 58         "Yankee",
 59         "Zulu",
 60         "Dash"
 61 };
 62 
 63 void report_error(char *error)
 64 {
 65         fprintf(stderr, "Error: %s\n", error);
 66 }
 67 
 68 void usage_exit(char *progname)
 69 {
 70         fprintf(stderr,
 71                 "Usage: %s <event count> <prod interval int> <con interval int>\n",
 72                 progname);
 73         exit(-1);
 74 }
 75 
 76 void pick_word(char *word)
 77 {
 78         int pick;
 79 
 80         pick = random() % wordlist_size;
 81 
 82         strcpy(word, wordlist[pick]);
 83 }
 84 
 85 void output_word(int c, char *w)
 86 {
 87         printf("Word %d: %s\n", c, w);
 88 }
 89 
 90 int queue_word(char *word, int pipefd_write)
 91 {
 92         if (write(pipefd_write, word, WORDSIZE) == -1)
 93         {
 94                 fprintf(stderr, "Error: Unable to write to pipe: %s\n", strerror(errno));
 95                 return -1;
 96         }
 97 
 98         return 0;
 99 }
100 
101 int get_next_word(char *word, int pipefd_read)
102 {
103         if (read(pipefd_read, word, WORDSIZE) == -1)
104         {
105                 fprintf(stderr, "Error: Unable to read from pipe: %s\n", strerror(errno));
106                 return -1;
107         }
108 
109         return 0;
110 }
111 
112 void producer(int event_count, int pipefd_write, int prod_interval)
113 {
114         char word[WORDSIZE];
115         int i;
116 
117         for (i=0; i < event_count; i++)
118         {
119                 pick_word(word);
120                 queue_word(word, pipefd_write);
121 
122                 /* Don't sleep if interval <= 0 */
123                 if (prod_interval <= 0)
124                         continue;
125                 /* Sleep if we hit our interval */
126                 if (prod_interval > 0 && i % prod_interval == 0)
127                 {
128                         fprintf(stderr, "Producer sleeping for 1 second...\n");
129                         sleep(1);
130                 }
131         }
132 
133         close(pipefd_write);
134         fprintf(stderr, "Producer finished.\n");
135         exit(0);
136 }
137 
138 void consumer(int event_count, int pipefd_read, int con_interval)
139 {
140         char word[WORDSIZE];
141         int i;
142 
143         for (i=0; i < event_count; i++)
144         {
145                 get_next_word(word, pipefd_read);
146                 output_word(i, word);
147 
148                 /* Don't sleep if interval <= 0 */
149                 if (con_interval <= 0)
150                         continue;
151                 /* Sleep if we hit our interval */
152                 if (con_interval > 0 && i % con_interval == 0)
153                 {
154                         fprintf(stderr, "Consumer sleeping for 1 second...\n");
155                         sleep(1);
156                 }
157         }
158 
159         close(pipefd_read);
160         fprintf(stderr, "Consumer finished.\n");
161         exit(0);
162 }
163 
164 int main(int argc, char *argv[])
165 {
166         int pid, count, prod_interval, con_interval;
167         int pipefd[2];
168 
169         srandom(time(NULL));
170 
171         if (argc < 4)
172         {
173                 if (argc < 1)
174                 {
175                         report_error("no command line");
176                         usage_exit(argv[0]);
177                 }
178                 else
179                 {
180                         report_error("Not enough arguments");
181                         usage_exit(argv[0]);
182                 }
183         }
184 
185         count = atoi(argv[1]);
186         prod_interval = atoi(argv[2]);
187         con_interval = atoi(argv[3]);
188 
189         /* Open a fifo
190          * pipefd[0] will be open for reading, and
191          * pipefd[1] will be open for writing */
192         if (pipe(pipefd))
193         {
194                 fprintf(stderr, "Error: Unable to open pipe: %s\n", strerror(errno));
195                 exit(-1);
196         }
197 
198         pid = fork();
199 
200         if (pid == 0)
201         {
202                 /* Producer */
203                 producer(count, pipefd[1], prod_interval);
204         }
205         else
206         {
207                 /* Consumer */
208                 consumer(count, pipefd[0], con_interval);
209         }
210 
211         /* This line should never be reached */
212         return -1;
213 }

3000pc-rendezvous.c

  1 /* 3000pc-rendezvous.c  More complex producer-consumer using mmap shared memory and pthread_cond_wait
  2  * Original Version Copyright (C) 2017  Anil Somayaji
  3  * Modified Version Copyright (C) 2020  William Findlay
  4  *
  5  * This program is free software: you can redistribute it and/or modify
  6  * it under the terms of the GNU General Public License as published by
  7  * the Free Software Foundation, either version 3 of the License, or
  8  * (at your option) any later version.
  9  *
 10  * This program is distributed in the hope that it will be useful,
 11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
 12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13  * GNU General Public License for more details.
 14  *
 15  * You should have received a copy of the GNU General Public License
 16  * along with this program.  If not, see <https://www.gnu.org/licenses/>. */
 17 
 18 /* You really shouldn't be incorporating parts of this in any other code,
 19    it is meant for teaching, not production */
 20 
 21 #include <stdio.h>
 22 #include <stdlib.h>
 23 #include <unistd.h>
 24 #include <sys/types.h>
 25 #include <sys/stat.h>
 26 #include <fcntl.h>
 27 #include <sys/mman.h>
 28 #include <errno.h>
 29 #include <string.h>
 30 #include <sys/types.h>
 31 #include <pthread.h>
 32 #include <semaphore.h>
 33 
 34 #define QUEUESIZE 32
 35 #define WORDSIZE 16
 36 
 37 const int wordlist_size = 27;
 38 const char *wordlist[] = {
 39         "Alpha",
 40         "Bravo",
 41         "Charlie",
 42         "Delta",
 43         "Echo",
 44         "Foxtrot",
 45         "Golf",
 46         "Hotel",
 47         "India",
 48         "Juliet",
 49         "Kilo",
 50         "Lima",
 51         "Mike",
 52         "November",
 53         "Oscar",
 54         "Papa",
 55         "Quebec",
 56         "Romeo",
 57         "Sierra",
 58         "Tango",
 59         "Uniform",
 60         "Victor",
 61         "Whiskey",
 62         "X-ray",
 63         "Yankee",
 64         "Zulu",
 65         "Dash"
 66 };
 67 
 68 typedef struct entry {
 69         char word[WORDSIZE];
 70         sem_t lock;
 71 } entry;
 72 
 73 typedef struct shared {
 74         pthread_mutex_t cond_mutex;
 75         pthread_cond_t  queue_nonempty;
 76         pthread_cond_t  queue_nonfull;
 77         entry queue[QUEUESIZE];
 78         int last_produced;
 79         int last_consumed;
 80         pid_t prod_pid;
 81         pid_t con_pid;
 82         int prod_count;
 83         int con_count;
 84 } shared;
 85 
 86 
 87 void report_error(char *error)
 88 {
 89         fprintf(stderr, "Error: %s\n", error);
 90 }
 91 
 92 void usage_exit(char *progname)
 93 {
 94         fprintf(stderr,
 95                 "Usage: %s <event count> <prod interval int> <con interval int>\n",
 96                 progname);
 97         exit(-1);
 98 }
 99 
100 void pick_word(char *word)
101 {
102         unsigned int pick;
103 
104         /* Open /dev/urandom for reading */
105         int fd = open("/dev/urandom", O_RDONLY);
106         if (fd < 0)
107         {
108                 fprintf(stderr, "Error: Unable to open /dev/urandom for reading: %s\n", strerror(errno));
109                 pick = 0;
110         }
111         else if (read(fd, (void *)&pick, sizeof(pick)) == -1)
112         {
113                 fprintf(stderr, "Error: Unable to read from /dev/urandom: %s\n",strerror(errno));
114                 pick = 0;
115         }
116 
117         pick = pick % wordlist_size;
118 
119         strcpy(word, wordlist[pick]);
120 
121         close(fd);
122 }
123 
124 void wait_for_producer(shared *s)
125 {
126         pthread_mutex_lock(&s->cond_mutex);
127         /* fprintf(stderr, "Waiting for producer...\n"); */
128         pthread_cond_wait(&s->queue_nonempty, &s->cond_mutex);
129         pthread_mutex_unlock(&s->cond_mutex);
130 }
131 
132 void wait_for_consumer(shared *s)
133 {
134         pthread_mutex_lock(&s->cond_mutex);
135         /* fprintf(stderr, "Waiting for consumer...\n"); */
136         pthread_cond_wait(&s->queue_nonfull, &s->cond_mutex);
137         pthread_mutex_unlock(&s->cond_mutex);
138 }
139 
140 void output_word(int c, char *w)
141 {
142         printf("Word %d: %s\n", c, w);
143 }
144 
145 int queue_word(char *word, shared *s)
146 {
147         entry *e;
148         int current, retval;
149 
150         current = (s->last_produced + 1) % QUEUESIZE;
151 
152         e = &s->queue[current];
153 
154         sem_wait(&e->lock);
155 
156         if (e->word[0] != '\0')
157         {
158                 /* consumer hasn't consumed this entry yet */
159                 sem_post(&e->lock);
160                 wait_for_consumer(s);
161                 sem_wait(&e->lock);
162         }
163 
164         if (e->word[0] != '\0')
165         {
166                 fprintf(stderr, "ERROR: No room for producer after waiting!\n");
167                 retval = -1;
168                 goto done;
169         }
170         else
171         {
172                 strncpy(e->word, word, WORDSIZE);
173                 s->last_produced = current;
174                 s->prod_count++;
175                 /* Notify that queue is nonempty */
176                 pthread_cond_signal(&s->queue_nonempty);
177                 retval = 0;
178                 goto done;
179         }
180 
181  done:
182         sem_post(&e->lock);
183         return retval;
184 }
185 
186 int get_next_word(char *word, shared *s)
187 {
188         entry *e;
189         int current, retval;
190 
191         current = (s->last_consumed + 1) % QUEUESIZE;
192 
193         e = &s->queue[current];
194 
195         sem_wait(&e->lock);
196 
197         if (e->word[0] == '\0')
198         {
199                 /* producer hasn't filled in this entry yet */
200                 sem_post(&e->lock);
201                 wait_for_producer(s);
202                 sem_wait(&e->lock);
203         }
204 
205         if (e->word[0] == '\0')
206         {
207                 fprintf(stderr, "ERROR: Nothing for consumer after waiting!\n");
208                 retval = -1;
209                 goto done;
210         }
211         else
212         {
213                 strncpy(word, e->word, WORDSIZE);
214                 e->word[0] = '\0';
215                 s->last_consumed = current;
216                 s->con_count++;
217                 /* Notify that queue is nonfull */
218                 pthread_cond_signal(&s->queue_nonfull);
219                 retval = 0;
220                 goto done;
221         }
222 
223  done:
224         sem_post(&e->lock);
225         return retval;
226 }
227 
228 void producer(shared *s, int event_count, int prod_interval)
229 {
230         char word[WORDSIZE];
231         int i;
232 
233         for (i=0; i < event_count; i++)
234         {
235                 pick_word(word);
236                 queue_word(word, s);
237 
238                 /* Don't sleep if interval <= 0 */
239                 if (prod_interval <= 0)
240                         continue;
241                 /* Sleep if we hit our interval */
242                 if (i % prod_interval == 0)
243                 {
244                         fprintf(stderr, "Producer sleeping for 1 second...\n");
245                         sleep(1);
246                 }
247         }
248 
249         fprintf(stderr, "Producer finished.\n");
250         exit(0);
251 }
252 
253 void consumer(shared *s, int event_count, int con_interval)
254 {
255         char word[WORDSIZE];
256         int i;
257 
258         for (i=0; i < event_count; i++)
259         {
260                 get_next_word(word, s);
261                 output_word(s->con_count, word);
262 
263                 /* Don't sleep if interval <= 0 */
264                 if (con_interval <= 0)
265                         continue;
266                 /* Sleep if we hit our interval */
267                 if (i % con_interval == 0)
268                 {
269                         fprintf(stderr, "Consumer sleeping for 1 second...\n");
270                         sleep(1);
271                 }
272         }
273 
274         fprintf(stderr, "Consumer finished.\n");
275         exit(0);
276 }
277 
278 void init_shared(shared *s)
279 {
280         int i;
281 
282         /* We need to explicitly mark the mutex as shared or risk undefined behavior */
283         pthread_mutexattr_t mattr = {};
284         pthread_mutexattr_setpshared(&mattr, 1);
285         pthread_mutex_init(&s->cond_mutex, &mattr);
286 
287         /* We need to explicitly mark the conditions as shared or risk undefined behavior */
288         pthread_condattr_t cattr = {};
289         pthread_condattr_setpshared(&cattr, 1);
290         pthread_cond_init(&s->queue_nonempty, &cattr);
291         pthread_cond_init(&s->queue_nonfull, &cattr);
292 
293         s->last_consumed = -1;
294         s->last_produced = -1;
295 
296         s->prod_pid = -1;
297         s->con_pid  = -1;
298 
299         s->prod_count = 0;
300         s->con_count  = 0;
301 
302         for (i=0; i<QUEUESIZE; i++)
303         {
304                 s->queue[i].word[0] = '\0';
305                 /* semaphore is shared between processes,
306                    and initial value is 1 (unlocked) */
307                 sem_init(&s->queue[i].lock, 1, 1);
308         }
309 }
310 
311 int main(int argc, char *argv[])
312 {
313         int pid, count, prod_interval, con_interval;
314 
315         shared *s;
316 
317         if (argc < 4)
318         {
319                 if (argc < 1)
320                 {
321                         report_error("no command line");
322                         usage_exit(argv[0]);
323                 }
324                 else
325                 {
326                         report_error("Not enough arguments");
327                         usage_exit(argv[0]);
328                 }
329         }
330 
331         count = atoi(argv[1]);
332         prod_interval = atoi(argv[2]);
333         con_interval = atoi(argv[3]);
334 
335         s = (shared *) mmap(NULL, sizeof(shared),
336                              PROT_READ|PROT_WRITE,
337                              MAP_SHARED|MAP_ANONYMOUS, -1, 0);
338 
339         if (s == MAP_FAILED)
340         {
341                 fprintf(stderr, "Error: Unable to mmap: %s\n", strerror(errno));
342                 exit(-1);
343         }
344 
345         init_shared(s);
346 
347         pid = fork();
348 
349         if (pid == 0)
350         {
351                 /* Producer */
352                 s->prod_pid = getpid();
353                 producer(s, count, prod_interval);
354         } else
355         {
356                 /* Consumer */
357                 s->con_pid = getpid();
358                 consumer(s, count, con_interval);
359         }
360 
361         /* This line should never be reached */
362         return -1;
363 }

3000pc-rendezvous-timeout.c

  1 /* 3000pc-rendezvous-timeout.c  More complex producer-consumer using 
  2    mmap shared memory and pthread_cond_wait
  3  * Original Version Copyright (C) 2017  Anil Somayaji
  4  * Modified Version Copyright (C) 2020  William Findlay & Anil Somayaji
  5  *
  6  * This program is free software: you can redistribute it and/or modify
  7  * it under the terms of the GNU General Public License as published by
  8  * the Free Software Foundation, either version 3 of the License, or
  9  * (at your option) any later version.
 10  *
 11  * This program is distributed in the hope that it will be useful,
 12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
 13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 14  * GNU General Public License for more details.
 15  *
 16  * You should have received a copy of the GNU General Public License
 17  * along with this program.  If not, see <https://www.gnu.org/licenses/>. */
 18 
 19 /* You really shouldn't be incorporating parts of this in any other code,
 20    it is meant for teaching, not production */
 21 
 22 #include <stdio.h>
 23 #include <stdlib.h>
 24 #include <unistd.h>
 25 #include <sys/types.h>
 26 #include <sys/stat.h>
 27 #include <fcntl.h>
 28 #include <sys/mman.h>
 29 #include <errno.h>
 30 #include <string.h>
 31 #include <sys/types.h>
 32 #include <pthread.h>
 33 #include <semaphore.h>
 34 #include <sys/time.h>
 35 
 36 #define QUEUESIZE 32
 37 #define WORDSIZE 16
 38 
 39 const int wordlist_size = 27;
 40 const char *wordlist[] = {
 41         "Alpha",
 42         "Bravo",
 43         "Charlie",
 44         "Delta",
 45         "Echo",
 46         "Foxtrot",
 47         "Golf",
 48         "Hotel",
 49         "India",
 50         "Juliet",
 51         "Kilo",
 52         "Lima",
 53         "Mike",
 54         "November",
 55         "Oscar",
 56         "Papa",
 57         "Quebec",
 58         "Romeo",
 59         "Sierra",
 60         "Tango",
 61         "Uniform",
 62         "Victor",
 63         "Whiskey",
 64         "X-ray",
 65         "Yankee",
 66         "Zulu",
 67         "Dash"
 68 };
 69 
 70 typedef struct entry {
 71         char word[WORDSIZE];
 72         sem_t lock;
 73 } entry;
 74 
 75 typedef struct shared {
 76         pthread_mutex_t nonempty_mutex;
 77         pthread_mutex_t nonfull_mutex;
 78         pthread_cond_t  queue_nonempty;
 79         pthread_cond_t  queue_nonfull;
 80         entry queue[QUEUESIZE];
 81         int last_produced;
 82         int last_consumed;
 83         pid_t prod_pid;
 84         pid_t con_pid;
 85         int prod_count;
 86         int con_count;
 87 } shared;
 88 
 89 
 90 void report_error(char *error)
 91 {
 92         fprintf(stderr, "Error: %s\n", error);
 93 }
 94 
 95 void usage_exit(char *progname)
 96 {
 97         fprintf(stderr,
 98                 "Usage: %s <event count> <prod interval int> <con interval int>\n",
 99                 progname);
100         exit(-1);
101 }
102 
103 void pick_word(char *word)
104 {
105         unsigned int pick;
106 
107         /* Open /dev/urandom for reading */
108         int fd = open("/dev/urandom", O_RDONLY);
109         if (fd < 0)
110         {
111                 fprintf(stderr,
112                         "Error: Unable to open /dev/urandom for reading: %s\n",
113                         strerror(errno));
114                 pick = 0;
115         }
116         else if (read(fd, (void *) &pick, sizeof(pick)) == -1)
117         {
118                 fprintf(stderr,
119                         "Error: Unable to read from /dev/urandom: %s\n",
120                         strerror(errno));
121                 pick = 0;
122         }
123 
124         pick = pick % wordlist_size;
125 
126         strcpy(word, wordlist[pick]);
127 
128         close(fd);
129 }
130 
131 void wait_for_producer(shared *s)
132 {
133         struct timeval now;
134         struct timespec timeout;
135 
136         /* fprintf(stderr, "%d: Waiting for producer...\n", s->con_count); */
137 
138         gettimeofday(&now, NULL);
139         /* 10 milliseconds from now; carry into tv_sec so tv_nsec stays below 1e9 */
140         timeout.tv_sec = now.tv_sec + (now.tv_usec * 1000 + 10000000) / 1000000000;
141         timeout.tv_nsec = (now.tv_usec * 1000 + 10000000) % 1000000000;
142 
143         pthread_mutex_lock(&s->nonempty_mutex);
144         pthread_cond_timedwait(&s->queue_nonempty,
145                                &s->nonempty_mutex,
146                                &timeout);
147         pthread_mutex_unlock(&s->nonempty_mutex);
148 }
149 
150 void wait_for_consumer(shared *s)
151 {
152         struct timeval now;
153         struct timespec timeout;
154 
155         /* fprintf(stderr, "%d: Waiting for consumer...\n", s->prod_count); */
156 
157         gettimeofday(&now, NULL);
158         /* 10 milliseconds from now; carry into tv_sec so tv_nsec stays below 1e9 */
159         timeout.tv_sec = now.tv_sec + (now.tv_usec * 1000 + 10000000) / 1000000000;
160         timeout.tv_nsec = (now.tv_usec * 1000 + 10000000) % 1000000000;
161         
162         pthread_mutex_lock(&s->nonfull_mutex);
163         pthread_cond_timedwait(&s->queue_nonfull,
164                                &s->nonfull_mutex,
165                                &timeout);
166         pthread_mutex_unlock(&s->nonfull_mutex);
167 }
168 
169 void output_word(int c, char *w)
170 {
171         printf("Word %d: %s\n", c, w);
172 }
173 
174 int queue_word(char *word, shared *s)
175 {
176         entry *e;
177         int current, retval;
178         int wait_count = 0;
179         int wait_count_max = 2000; /* give up after 2000 waits (up to ~10 ms each) */
180         
181         current = (s->last_produced + 1) % QUEUESIZE;
182 
183         e = &s->queue[current];
184 
185         sem_wait(&e->lock);
186 
187         while (e->word[0] != '\0')
188         {
189                 /* consumer hasn't consumed this entry yet */
190                 wait_count++;
191                 sem_post(&e->lock);
192                 usleep(1);
193                 wait_for_consumer(s);
194                 sem_wait(&e->lock);
195                 if (wait_count >= wait_count_max) {
196                         break;
197                 }
198         }
199 
200         if (e->word[0] != '\0')
201         {
202                 fprintf(stderr,
203                         "ERROR: No room for producer after waiting %d times!\n",
204                         wait_count_max);
205                 retval = -1;
206                 goto done;
207         }
208         else
209         {
210                 strncpy(e->word, word, WORDSIZE);
211                 s->last_produced = current;
212                 s->prod_count++;
213                 retval = 0;
214                 goto done;
215         }
216 
217  done:
218         sem_post(&e->lock);
219         /* Notify that queue is nonempty */
220         pthread_mutex_lock(&s->nonempty_mutex);
221         pthread_cond_signal(&s->queue_nonempty);
222         pthread_mutex_unlock(&s->nonempty_mutex);
223 
224         return retval;
225 }
226 
227 int get_next_word(char *word, shared *s)
228 {
229         entry *e;
230         int current, retval;
231         int wait_count = 0;
232         int wait_count_max = 2000;
233 
234         current = (s->last_consumed + 1) % QUEUESIZE;
235 
236         e = &s->queue[current];
237 
238         sem_wait(&e->lock);
239         
240         while (e->word[0] == '\0')
241         {
242                 /* producer hasn't filled in this entry yet */
243                 wait_count++;
244                 sem_post(&e->lock);
245                 usleep(1);
246                 wait_for_producer(s);
247                 sem_wait(&e->lock);
248                 if (wait_count >= wait_count_max) {
249                         break;
250                 }
251         }
252 
253         if (e->word[0] == '\0')
254         {
255                 fprintf(stderr,
256                         "ERROR: Nothing for consumer after waiting %d times!\n",
257                         wait_count_max);
258                 retval = -1;
259                 goto done;
260         }
261         else
262         {
263                 strncpy(word, e->word, WORDSIZE);
264                 e->word[0] = '\0';
265                 s->last_consumed = current;
266                 s->con_count++;
267                 retval = 0;
268                 goto done;
269         }
270 
271  done:
272         sem_post(&e->lock);
273         /* Notify that queue is nonfull */
274         pthread_mutex_lock(&s->nonfull_mutex);
275         pthread_cond_signal(&s->queue_nonfull);
276         pthread_mutex_unlock(&s->nonfull_mutex);
277         return retval;
278 }
279 
280 void producer(shared *s, int event_count, int prod_interval)
281 {
282         char word[WORDSIZE];
283         int i;
284 
285         for (i=0; i < event_count; i++)
286         {
287                 pick_word(word);
288                 queue_word(word, s);
289 
290                 /* Don't sleep if interval <= 0 */
291                 if (prod_interval <= 0)
292                         continue;
293                 /* Sleep if we hit our interval */
294                 if (i % prod_interval == 0)
295                 {
296                         fprintf(stderr,
297                                 "%d: Producer sleeping for 1 second...\n",
298                                 s->prod_count);
299                         sleep(1);
300                 }
301         }
302 
303         fprintf(stderr, "Producer finished.\n");
304         exit(0);
305 }
306 
307 void consumer(shared *s, int event_count, int con_interval)
308 {
309         char word[WORDSIZE];
310         int i;
311 
312         for (i=0; i < event_count; i++)
313         {
314                 get_next_word(word, s);
315                 output_word(s->con_count, word);
316 
317                 /* Don't sleep if interval <= 0 */
318                 if (con_interval <= 0)
319                         continue;
320                 /* Sleep if we hit our interval */
321                 if ((i + 1) % con_interval == 0)
322                 {
323                         fprintf(stderr,
324                                 "%d: Consumer sleeping for 1 second...\n",
325                                 s->con_count);
326                         sleep(1);
327                 }
328         }
329 
330         fprintf(stderr, "Consumer finished.\n");
331         exit(0);
332 }
333 
334 void init_shared(shared *s)
335 {
336         int i;
337 
338         pthread_mutexattr_t mattr;
339         pthread_condattr_t cattr;
340         
341         /* The mutexes live in memory shared by two processes, so we
342            must mark them process-shared or risk undefined behavior */
343         pthread_mutexattr_init(&mattr);
344         pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_ERRORCHECK_NP);
345         pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
346         pthread_mutex_init(&s->nonempty_mutex, &mattr);
347         pthread_mutex_init(&s->nonfull_mutex, &mattr);
348 
349         /* The condition variables are also used by both processes, so
350            they must be marked process-shared or risk undefined behavior */
351 
352         pthread_condattr_init(&cattr);
353         pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
354         pthread_cond_init(&s->queue_nonempty, &cattr);
355         pthread_cond_init(&s->queue_nonfull, &cattr);
356 
357         s->last_consumed = -1;
358         s->last_produced = -1;
359 
360         s->prod_pid = -1;
361         s->con_pid  = -1;
362 
363         s->prod_count = 0;
364         s->con_count  = 0;
365 
366         for (i=0; i<QUEUESIZE; i++)
367         {
368                 s->queue[i].word[0] = '\0';
369                 /* semaphore is shared between processes,
370                    and initial value is 1 (unlocked) */
371                 sem_init(&s->queue[i].lock, 1, 1);
372         }
373 }
374 
375 int main(int argc, char *argv[])
376 {
377         int pid, count, prod_interval, con_interval;
378 
379         shared *s;
380 
381         if (argc < 4)
382         {
383                 if (argc < 1)
384                 {
385                         report_error("no command line");
386                         usage_exit(argv[0]);
387                 }
388                 else
389                 {
390                         report_error("Not enough arguments");
391                         usage_exit(argv[0]);
392                 }
393         }
394 
395         count = atoi(argv[1]);
396         prod_interval = atoi(argv[2]);
397         con_interval = atoi(argv[3]);
398 
399         s = (shared *) mmap(NULL, sizeof(shared),
400                              PROT_READ|PROT_WRITE,
401                              MAP_SHARED|MAP_ANONYMOUS, -1, 0);
402 
403         if (s == MAP_FAILED)
404         {
405                 fprintf(stderr, "Error: Unable to mmap: %s\n", strerror(errno));
406                 exit(-1);
407         }
408 
409         init_shared(s);
410 
411         pid = fork();
412 
413         if (pid == 0)
414         {
415                 /* Producer */
416                 s->prod_pid = getpid();
417                 producer(s, count, prod_interval);
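418         } else if (pid > 0)
419         {
420                 /* Consumer */
421                 s->con_pid = getpid();
422                 consumer(s, count, con_interval);
423         }
424         /* Only reached if fork() fails: producer() and consumer() exit */
425         fprintf(stderr, "Error: Unable to fork: %s\n", strerror(errno));
426         return -1;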
427 }
428 </source>
429 
430 ===Makefile===
431 
432 <source lang="makefile" line>
433 CC       = gcc
434 
435 CFLAGS   = -O2 -Wall -fno-inline
436 LDFLAGS  = -pthread
437 
438 SRC      = $(wildcard *.c)
439 EXEC     = $(SRC:.c=)
440 
441 all: $(EXEC)
442 
443 $(EXEC): %: %.c
444 	$(CC) $(CFLAGS) -o $@ $< $(LDFLAGS)
445 
446 .PHONY: all clean mrproper
447 
448 clean:
449 	@rm -rf *.o
450 	@rm -rf $(EXEC)