Operating Systems 2017F: Tutorial 4
This tutorial is not yet finalized.
In this tutorial you will be experimenting with and extending 3000pc.c and 3000random.c (both listed below).
Tasks/Questions
- Compile and run 3000pc.c. Note that you'll need to add a "-pthread" option to your compile command. First run it with a -1 for both delay intervals, then try other values. When does the producer signal the consumer? When does the consumer signal the producer?
- How can you output the random words to a file so that you only see the signal messages on the console?
- Modify the producer so that it never unlocks each entry (but does lock it). What happens?
- Modify the consumer so that it never unlocks each entry (but does lock it). What happens?
- What happens if you remove all entry locking/unlocking? How does the behavior change?
- Remove the SIGUSR1 signal handler from the producer (so it uses the C library default signal handler). How does the program behave?
- Remove the SIGUSR1 signal handler from the consumer (so it uses the C library default signal handler). How does the program behave?
- Change the producer and consumer so they only sleep for 1000 nanoseconds (1 millisecond) rather than 1 second during each delay interval. How does the behavior of the program change?
- Compile and run 3000random.c. Note that it takes two arguments. Where does this program get its random numbers from?
- Change 3000random.c to use /dev/random rather than /dev/urandom. How does its performance change, especially when generating a large number of random numbers?
- Change 3000pc.c to use random numbers from /dev/urandom rather than using the standard library function random(). How does this change affect the relative speed of the producer and consumer processes?
- (Bonus) Modify 3000pc.c so it takes in an additional command line argument that is the filename of a word list.  This file should have one word per line.  The program should then use this read-in word list rather than the hard coded word list.  Pay attention to:
- where you read the file (in the producer, consumer, or before the fork)
- where and how you store the words in memory (the word list should be arbitrarily long, but the maximum length of a word can be fixed)
 
- (Bonus) Modify 3000pc.c so that it has multiple producers and consumers running concurrently, with the number of each specified on the command line.
Code
3000random.c
/* 3000pc.c */
/* producer-consumer example using signals, processes and mmap */
/* v1 Oct. 15, 2017 */
/* Licenced under the GPLv3, copyright Anil Somayaji */
/* You really shouldn't be incorporating parts of this in any other code,
   it is meant for teaching, not production */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/mman.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <semaphore.h>
#include <string.h>
#include <time.h>
#define QUEUESIZE 32
#define WORDSIZE 16
const int wordlist_size = 27;
const char *wordlist[] = {
        "Alpha",
        "Bravo",
        "Charlie",
        "Delta",
        "Echo",
        "Foxtrot",
        "Golf",
        "Hotel",
        "India",
        "Juliet",
        "Kilo",
        "Lima",
        "Mike",
        "November",
        "Oscar",
        "Papa",
        "Quebec",
        "Romeo",
        "Sierra",
        "Tango",
        "Uniform",
        "Victor",
        "Whiskey",
        "X-ray",
        "Yankee",
        "Zulu",
        "Dash"
};
typedef struct entry {
        char word[WORDSIZE];
        sem_t lock;
} entry;
typedef struct shared {
        int prod_waiting;
        int con_waiting;
        entry queue[QUEUESIZE];
        int last_produced;
        int last_consumed;
        pid_t prod_pid;
        pid_t con_pid;
        int prod_count;
        int con_count;
} shared;
void report_error(char *error)
{
        fprintf(stderr, "Error: %s\n", error);
}
void usage_exit(char *progname)
{
        fprintf(stderr,
                "Usage: %s <event count> <prod delay int> <con delay int>\n",
                progname);
        exit(-1);
}
void producer_handler(int the_signal)
{
        if (the_signal == SIGUSR1) {
                fprintf(stderr, "Producer received SIGUSR1.\n");
                return;
        } else {
                fprintf(stderr, "Producer: No handler for for signal %d?!\n",
                        the_signal);
                return;
        }
}
void consumer_handler(int the_signal)
{
        if (the_signal == SIGUSR1) {
                fprintf(stderr, "Consumer received SIGUSR1.\n");
                return;
        } else {
                fprintf(stderr, "Consumer: No handler for for signal %d?!\n",
                        the_signal);
                return;
        }
}
void pick_word(char *word)
{
        int pick;
        pick = random() % wordlist_size;
        strcpy(word, wordlist[pick]);
}
void wait_for_producer(shared *s)
{
        struct timespec delay;
        
        delay.tv_sec = 100;
        delay.tv_nsec = 0;
        s->con_waiting = 1;
        
        while (s->con_waiting) {
                nanosleep(&delay, NULL);
        }
}
void wait_for_consumer(shared *s)
{
        struct timespec delay;
        
        delay.tv_sec = 100;
        delay.tv_nsec = 0;
        s->prod_waiting = 1;
        
        while (s->prod_waiting) {
                nanosleep(&delay, NULL);
        }
}
void wakeup_consumer(shared *s)
{
        if (s->con_waiting) {
                s->con_waiting = 0;
                kill(s->con_pid, SIGUSR1);
        }
}
void wakeup_producer(shared *s)
{
        if (s->prod_waiting) {
                s->prod_waiting = 0;
                kill(s->prod_pid, SIGUSR1);
        }
}
void output_word(int c, char *w)
{
        printf("Word %d: %s\n", c, w);
}
int queue_word(char *word, shared *s)
{
        entry *e;
        int current, retval;
        
        current = (s->last_produced + 1) % QUEUESIZE;
        e = &s->queue[current];
        sem_wait(&e->lock);
        if (e->word[0] != '\0') {
                /* consumer hasn't consumed this entry yet */
                sem_post(&e->lock);
                wait_for_consumer(s);
                sem_wait(&e->lock);
        }
        if (e->word[0] != '\0') {
                fprintf(stderr, "ERROR: No room for producer after waiting!\n");
                retval = -1;
                goto done;
        } else {
                strncpy(e->word, word, WORDSIZE);
                s->last_produced = current;
                s->prod_count++;
                wakeup_consumer(s);
                retval = 0;
                goto done;
        }
 done:
        sem_post(&e->lock);
        return retval;
}
int get_next_word(char *word, shared *s)
{
        entry *e;
        int current, retval;
        current = (s->last_consumed + 1) % QUEUESIZE;
        e = &s->queue[current];
        
        sem_wait(&e->lock);
        if (e->word[0] == '\0') {
                /* producer hasn't filled in this entry yet */
                sem_post(&e->lock);
                wait_for_producer(s);
                sem_wait(&e->lock);
        }
        if (e->word[0] == '\0') {
                fprintf(stderr, "ERROR: Nothing for consumer after waiting!\n");
                retval = -1;
                goto done;
        } else {
                strncpy(word, e->word, WORDSIZE);
                e->word[0] = '\0';
                s->last_consumed = current;
                s->con_count++;
                wakeup_producer(s);
                retval = 0;
                goto done;
        }
        
 done:
        sem_post(&e->lock);
        return retval;
}
void producer(shared *s, int event_count, int producer_delay_interval)
{
        char word[WORDSIZE];
        int i;
        struct sigaction signal_handler_struct;
 
        memset (&signal_handler_struct, 0, sizeof(signal_handler_struct));
        signal_handler_struct.sa_handler = producer_handler;
        if (sigaction(SIGUSR1, &signal_handler_struct, NULL)) {
            fprintf(stderr, "Producer couldn't register SIGUSR1 handler.\n");
        }
        
        for (i=0; i < event_count; i++) {        
                pick_word(word);
                queue_word(word, s);
                if (producer_delay_interval > 0) {
                        if (i % producer_delay_interval == 0) {
                                sleep(1);
                        }
                }
        }
        printf("Producer finished.\n");
        exit(0);
}
void consumer(shared *s, int event_count, int consumer_delay_interval)
{
        char word[WORDSIZE];
        int i;
        struct sigaction signal_handler_struct;
 
        memset (&signal_handler_struct, 0, sizeof(signal_handler_struct));
        signal_handler_struct.sa_handler = consumer_handler;
        if (sigaction(SIGUSR1, &signal_handler_struct, NULL)) {
            fprintf(stderr, "Consumer couldn't register SIGUSR1 handler.\n");
        }
        
        for (i=0; i < event_count; i++) {        
                get_next_word(word, s);
                output_word(s->con_count, word);
                if (consumer_delay_interval > 0) {
                        if (i % consumer_delay_interval == 0) {
                                sleep(1);
                        }
                }
        }
        printf("Consumer finished.\n");
        exit(0);
}
void init_shared(shared *s)
{
        int i;
        
        s->con_waiting = 0;
        s->last_consumed = -1;
        s->prod_waiting = 0;
        s->last_produced = -1;
        
        s->prod_pid = -1;
        s->con_pid = -1;
        s->prod_count = 0;
        s->con_count = 0;
                
        for (i=0; i<QUEUESIZE; i++) {
                s->queue[i].word[0] = '\0';
                /* semaphore is shared between processes,
                   and initial value is 1 (unlocked) */
                sem_init(&s->queue[i].lock, 1, 1); 
        }
}
int main(int argc, char *argv[])
{
        int pid, count, prod_interval, con_interval;
        
        shared *s;
        srandom(42);
        
        if (argc < 4) {
                if (argc < 1) {
                        report_error("no command line");
                        usage_exit(argv[0]);
                } else {
                        report_error("Not enough arguments");
                        usage_exit(argv[0]);
                }
        }
        count = atoi(argv[1]);
        prod_interval = atoi(argv[2]);
        con_interval = atoi(argv[3]);
        s = (shared *) mmap(NULL, sizeof(shared),
                             PROT_READ|PROT_WRITE,
                             MAP_SHARED|MAP_ANONYMOUS, -1, 0);
        
        if (s == MAP_FAILED) {
                report_error(strerror(errno));
        }
        
        init_shared(s);
        
        pid = fork();
        if (pid) {
                /* producer */
                s->prod_pid = getpid();
                producer(s, count, prod_interval);
        } else {
                /* consumer */
                s->con_pid = getpid();
                consumer(s, count, con_interval);
        }
        
        /* This line should never be reached */
        return -1;
}
3000random.c
/* 3000random.c */
/* prints out random numbers obtained from system /dev/urandom */
/* (This is much, much better than using rand() or random())! */
/* v1 Oct. 15, 2017 */
/* Licenced under the GPLv3, copyright Anil Somayaji */
/* You really shouldn't be incorporating parts of this in any other code,
   it is meant for teaching, not production */
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#define BUFSIZE 1024
typedef struct rand_state {
        unsigned long buffer[BUFSIZE];
        int current;
        int fd;
} rand_state;
void fill_rand_buffer(rand_state *r)
{
        ssize_t count;
        count = read(r->fd, (void *) &r->buffer,
                     BUFSIZE * sizeof(unsigned long));
        if (count > sizeof(unsigned long)) {
                r->current = count % sizeof(unsigned long);
        } else {
                fprintf(stderr, "Couldn't fill random buffer.\n");
        }
}
void system_rand_init(rand_state *r)
{
        r->fd = open("/dev/urandom", O_RDONLY);
        fill_rand_buffer(r);
}
long system_rand(long max, rand_state *r)
{        
        unsigned long val;
        if (r->current < 0) {
                fill_rand_buffer(r);                
                if (r->current < 0) {
                        /* fill_rand_buffer should have already
                           reported an error */
                        return 0;
                }
        }
        val = r->buffer[r->current];
        r->current--;
        return val % max;
}
int main(int argc, char *argv[])
{
        int count, i;
        long max, x;
        rand_state r;
        if (argc != 3) {
                fprintf(stderr, "Usage: %s <count> <max>\n", argv[0]);
                exit(-1);
        }
        count = atoi(argv[1]);
        max = atol(argv[2]);
        printf("count = %d, max = %ld\n", count, max);
        
        system_rand_init(&r);
        
        for (i = 0; i < count; i++) {
                x = system_rand(max, &r);
                printf("%ld\n", x);
        }
        
        return 0;
}