Search code examples
javamultithreadingconcurrencyconcurrent-programming

How to solve the producer-consumer using semaphores?


I need to code a problem similar to producer-consumer, that must use semaphores. I tried a couple of solutions and none of them worked. First I tried a solution on Wikipedia and it didn't worked. My current code is something like that:

Method run of the consumer:

    public void run() {
    int i=0;
    DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
    String s = new String();
    while (1!=2){
        Date datainicio = new Date();
        String inicio=dateFormat.format(datainicio);
        try {
            Thread.sleep(1000);///10000
        } catch (InterruptedException e) {
            System.out.println("Excecao InterruptedException lancada.");
        }
        //this.encheBuffer.down();
        this.mutex.down();
        // RC
        i=0;
        while (i<buffer.length) {
            if (buffer[i] == null) {
                i++;
            } else {
                break;
            }
        }
        if (i<buffer.length) {
            QuantidadeBuffer.quantidade--;
            Date datafim = new Date();
            String fim=dateFormat.format(datafim);
            int identificador;
            identificador=buffer[i].getIdentificador()[0];
            s="Consumidor Thread: "+Thread.currentThread()+" Pedido: "+identificador+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i;
            //System.out.println("Consumidor Thread: "+Thread.currentThread()+" Pedido: "+identificador+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i);
            buffer[i]= null;
        }
        // RC
        this.mutex.up();
        //this.esvaziaBuffer.up();
        System.out.println(s);
  //            lock.up();
    }
}

Method run of the producer:

    public void run() {
    DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
    int i=0;
    while (1!=2){
        Date datainicio = new Date();
        String inicio=dateFormat.format(datainicio);
        // Produz Item
        try {
            Thread.sleep(500);//50000
        } catch (InterruptedException e) {
            System.out.println("Excecao InterruptedException lancada.");
        }
        //this.esvaziaBuffer.down();
        this.mutex.down();
        // RC
        i=0;
        while (i<buffer.length) {
            if (buffer[i]!=null) {
                i++;
            } else {
                break;
            }
        }
        if (i<buffer.length) {
            int identificador[]=new int[Pedido.getTamanho_identificador()];
            identificador[0]=i;
            buffer[i]=new Pedido();
            Produtor.buffer[i].setIdentificador(identificador);
            Produtor.buffer[i].setTexto("pacote de dados");
            QuantidadeBuffer.quantidade++;
            Date datafim = new Date();
            String fim=dateFormat.format(datafim);
            System.out.println("Produtor Thread: "+Thread.currentThread()+" Pedido: "+identificador[0]+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i);
            i++;
        }
        // RC
        this.mutex.up();
        //this.encheBuffer.up();
    }
    //this.encheBuffer.up();
}

In the above code it happened of a consumer thread to read a position and then, another thread read the same position without a producer fill that position, something like this:

Consumidor Thread: Thread[Thread-17,5,main] Pedido: 1 Inicio: 2011/11/27 17:23:33 Fim: 2011/11/27 17:23:34 posicao 1
Consumidor Thread: Thread[Thread-19,5,main] Pedido: 1 Inicio: 2011/11/27 17:23:33 Fim: 2011/11/27 17:23:34 posicao 1

Solution

  • It seems that you are using a mutex not a semaphore?

    In using a mutex you have only binary synchronisation - locking and unlocking one resource. Sempahores have a value that you can signal or acquire.

    You are trying to lock/unlock the entire buffer but that is the wrong way to go because, as you are seeing, either the producer or consumer locks, and when the reader has locked it the producer can't fill the buffer (because it has to lock first).

    You should instead create a Sempahore, then when the producer writes one packet or block of data it can signal the semaphore. The consumers can then be trying to acquire the semaphore so they will be waiting until the producer has signalled a packet has been written. Upon signalling a written packet, one of the consumers will be woken and it will know it can read one packet. It can read a packet, then go back to trying to acquire on the semaphore. If in that time the producer has written another packet it has signalled again and either of the consumers will then go on to read another packet. Etc...

    For example:

    (Producer) - Write one packet - Semaphore.release(1)

    (Consumer xN) - Semaphore.acquire(1) - Read one packet

    If you have multiple consumers then the consumers (not the producer) should lock the buffer when reading the packet (but not when acquiring the semaphore) to prevent race conditions. In the example below the producer also locks the list since everything is on the same JVM.

    import java.util.LinkedList;
    import java.util.concurrent.Semaphore;
    
    public class Semaphores {
    
        static Object LOCK = new Object();
    
        static LinkedList list = new LinkedList();
        static Semaphore sem = new Semaphore(0);
        static Semaphore mutex = new Semaphore(1);
    
        static class Consumer extends Thread {
            String name;
            public Consumer(String name) {
                this.name = name;
            }
            public void run() {
                try {
    
                    while (true) {
                        sem.acquire(1);
                        mutex.acquire();
                        System.out.println("Consumer \""+name+"\" read: "+list.removeFirst());
                        mutex.release();
                    }
                } catch (Exception x) {
                    x.printStackTrace();
                }
            }
        }
    
        static class Producer extends Thread {
            public void run() {
                try {
    
                    int N = 0;
    
                    while (true) {
                        mutex.acquire();
                        list.add(new Integer(N++));
                        mutex.release();
                        sem.release(1);
                        Thread.sleep(500);
                    }
                } catch (Exception x) {
                    x.printStackTrace();
                }
            }
        }
    
        public static void main(String [] args) {
            new Producer().start();
            new Consumer("Alice").start();
            new Consumer("Bob").start();
        }
    }