- Uses a blocking queue as backing store.
- Cleanly exit the blocked producer or consumer threads when
stop()
is called. Calling stop()
is needed if not wrapped in using
as that stops the blocking queue through queue.dispose()
call. There is no need for finalize()
(destructor) as there is no unmanaged resource to clean. It is disposable to avoid forgetting calling stop()
.- Uses the stopping/stopped pattern to stop (see below).
main(): using pc = new producer-consumer(threads: 1000, maxsize: 10): pc.go() sleep(10s) pc.stop() // not required // creates threads for consume/produce tasks producer-consumer: idisposable private readonly q // used for lock nthreads // ctor producer-consumer(threads, maxsize): nthreads = threads q = new blocking-queue(maxsize) // work go(): range(0, nthreads) .select(x => new thread(produce)) .foreach(t => t.start()) range(0, nthreads) .select(x => new thread(consume)) .foreach(t => t.start()) // consume/produce consume(): while !q.isstopping(): //i = q.dequeue() //print(i) // OR i if q.trydequeue(out i): print(i) sleep(random(500)) produce(): i = // randomly generated while !q.isstopping(): //q.enqueue(i) //print(i) // OR if q.tryenqueue(i): print(i) sleep(random(500)) // stop stop(): q.dispose() // dispose - simply stop, no need for finalize/destructor dispose(): stop() // queue blocks when size is maxsize blocking-queue: idisposable private readonly q // used for lock maxsize // ctor blocking-queue(maxsize): maxsize = maxsize q = new queue() // en/de/tryde/tryen-queue enqueue(item): lock(q): while q.count == maxsize: monitor.wait(q) q.enqueue(item) if q.count == 1: monitor.pulseall(q) dequeue(): lock(q): while q.count == 0: monitor.wait(q) item = q.dequeue() if q.count == maxsize-1: monitor.pulseall(q) return item // for graceful exit of blocked consumers trydequeue(out item): lock(q): while q.count == 0: // on stop, unblock blocked consumers if stopping: // ok to read stopping directly due to lock(q) item = null return false monitor.wait(q) item = q.dequeue() if q.count == maxsize-1: monitor.pulseall(q) return true // for graceful exit of blocked producers tryenqueue(item): lock(q): while q.count == maxsize: // on stop, unblock blocked producers if stopping: // ok to read stopping directly due to lock(q) return false monitor.wait(q) q.enqueue(item) if q.count == 1: monitor.pulseall(q) return true // stop // stopping is true when stop() is called // stopped is true when stop() finishes stopping = false isstopping(): lock(q): return stopping stop(): lock(q): if !stopping: stopping = true monitor.pulseall(q) stopped = true stopped = false stopped(): lock(q): return stopped // dispose - simply stop, no need for finalize/destructor dispose(): stop()
There is a delay between when stop() is called and till it finishes. The blocked threads will exit during this delay. stopping is set to true when stop() is called, stopped is set when stop() finishes. Internally, stopping is used to start shutdown. stopped is just for calling code for debugging to indicate that stop has finished.
// stopping/stopped pattern stopping = false stop(): lock(lock): if !stopping: // cannot use stopped to avoid double clean up stopping = true // clean up here stopped = true stopped = false stopped(): lock(lock): return stopped
[Hat tip to MG, JS]
No comments:
Post a Comment