Saturday, May 09, 2015

producer consumer

Note:
- Uses a blocking queue as backing store.
- Cleanly exit the blocked producer or consumer threads when stop() is called. Calling stop() is needed if not wrapped in using as that stops the blocking queue through queue.dispose() call. There is no need for finalize() (destructor) as there is no unmanaged resource to clean. It is disposable to avoid forgetting calling stop().
- Uses the stopping/stopped pattern to stop (see below).
main():
    using pc = new producer-consumer(threads: 1000, maxsize: 10):
        pc.go()
        sleep(10s)
        pc.stop() // not required

// creates threads for consume/produce tasks
producer-consumer: idisposable
    private readonly q // used for lock
    nthreads
    
    // ctor
    producer-consumer(threads, maxsize):
        nthreads = threads
        q = new blocking-queue(maxsize)
    
    // work
    go():
        range(0, nthreads)
            .select(x => new thread(produce))
            .foreach(t => t.start())
        range(0, nthreads)
            .select(x => new thread(consume))
            .foreach(t => t.start())
    
    // consume/produce
    consume():
        while !q.isstopping():
            //i = q.dequeue()
            //print(i)
            // OR 
            i
            if q.trydequeue(out i):
                print(i)
                sleep(random(500))
    produce():
        i = // randomly generated
        while !q.isstopping():
            //q.enqueue(i)
            //print(i)
            // OR
            if q.tryenqueue(i):
                print(i)
                sleep(random(500))
    
    // stop
    stop():
        q.dispose()
    // dispose - simply stop, no need for finalize/destructor
    dispose():
        stop()

// queue blocks when size is maxsize
blocking-queue: idisposable
    private readonly q // used for lock
    maxsize
    
    // ctor
    blocking-queue(maxsize):
        maxsize = maxsize
        q = new queue()
    
    // en/de/tryde/tryen-queue
    enqueue(item):
        lock(q):
            while q.count == maxsize:
                monitor.wait(q)
            q.enqueue(item)
            if q.count == 1:
                monitor.pulseall(q)
    dequeue():
        lock(q):
            while q.count == 0:
                monitor.wait(q)
            item = q.dequeue()
            if q.count == maxsize-1:
                monitor.pulseall(q)
            return item
    // for graceful exit of blocked consumers
    trydequeue(out item):
        lock(q):
            while q.count == 0:
                // on stop, unblock blocked consumers
                if stopping: // ok to read stopping directly due to lock(q)
                    item = null
                    return false
                monitor.wait(q)
            item = q.dequeue()
            if q.count == maxsize-1:
                monitor.pulseall(q)
            return true
    // for graceful exit of blocked producers
    tryenqueue(item):
        lock(q):
            while q.count == maxsize:
                // on stop, unblock blocked producers
                if stopping: // ok to read stopping directly due to lock(q)
                    return false
                monitor.wait(q)
            q.enqueue(item)
            if q.count == 1:
                monitor.pulseall(q)
            return true
    
    // stop
    // stopping is true when stop() is called
    // stopped is true when stop() finishes
    stopping = false
    isstopping():
        lock(q):
            return stopping
    stop():
        lock(q):
            if !stopping:
                stopping = true
                monitor.pulseall(q)
                stopped = true
    stopped = false
    stopped():
        lock(q):
            return stopped
    
    // dispose - simply stop, no need for finalize/destructor
    dispose():
        stop()

There is a delay between when stop() is called and till it finishes. The blocked threads will exit during this delay. stopping is set to true when stop() is called, stopped is set when stop() finishes. Internally, stopping is used to start shutdown. stopped is just for calling code for debugging to indicate that stop has finished.
// stopping/stopped pattern
stopping = false
stop():
    lock(lock):
        if !stopping: // cannot use stopped to avoid double clean up
            stopping = true
            // clean up here
            stopped = true
stopped = false
stopped():
    lock(lock):
        return stopped

[Hat tip to MG, JS]

No comments:

Post a Comment