- Uses a blocking queue as backing store.
- Cleanly exit the blocked producer or consumer threads when
stop() is called. Calling stop() is needed if not wrapped in using as that stops the blocking queue through queue.dispose() call. There is no need for finalize() (destructor) as there is no unmanaged resource to clean. It is disposable to avoid forgetting calling stop().- Uses the stopping/stopped pattern to stop (see below).
main():
using pc = new producer-consumer(threads: 1000, maxsize: 10):
pc.go()
sleep(10s)
pc.stop() // not required
// creates threads for consume/produce tasks
producer-consumer: idisposable
private readonly q // used for lock
nthreads
// ctor
producer-consumer(threads, maxsize):
nthreads = threads
q = new blocking-queue(maxsize)
// work
go():
range(0, nthreads)
.select(x => new thread(produce))
.foreach(t => t.start())
range(0, nthreads)
.select(x => new thread(consume))
.foreach(t => t.start())
// consume/produce
consume():
while !q.isstopping():
//i = q.dequeue()
//print(i)
// OR
i
if q.trydequeue(out i):
print(i)
sleep(random(500))
produce():
i = // randomly generated
while !q.isstopping():
//q.enqueue(i)
//print(i)
// OR
if q.tryenqueue(i):
print(i)
sleep(random(500))
// stop
stop():
q.dispose()
// dispose - simply stop, no need for finalize/destructor
dispose():
stop()
// queue blocks when size is maxsize
blocking-queue: idisposable
private readonly q // used for lock
maxsize
// ctor
blocking-queue(maxsize):
maxsize = maxsize
q = new queue()
// en/de/tryde/tryen-queue
enqueue(item):
lock(q):
while q.count == maxsize:
monitor.wait(q)
q.enqueue(item)
if q.count == 1:
monitor.pulseall(q)
dequeue():
lock(q):
while q.count == 0:
monitor.wait(q)
item = q.dequeue()
if q.count == maxsize-1:
monitor.pulseall(q)
return item
// for graceful exit of blocked consumers
trydequeue(out item):
lock(q):
while q.count == 0:
// on stop, unblock blocked consumers
if stopping: // ok to read stopping directly due to lock(q)
item = null
return false
monitor.wait(q)
item = q.dequeue()
if q.count == maxsize-1:
monitor.pulseall(q)
return true
// for graceful exit of blocked producers
tryenqueue(item):
lock(q):
while q.count == maxsize:
// on stop, unblock blocked producers
if stopping: // ok to read stopping directly due to lock(q)
return false
monitor.wait(q)
q.enqueue(item)
if q.count == 1:
monitor.pulseall(q)
return true
// stop
// stopping is true when stop() is called
// stopped is true when stop() finishes
stopping = false
isstopping():
lock(q):
return stopping
stop():
lock(q):
if !stopping:
stopping = true
monitor.pulseall(q)
stopped = true
stopped = false
stopped():
lock(q):
return stopped
// dispose - simply stop, no need for finalize/destructor
dispose():
stop()
There is a delay between when stop() is called and till it finishes. The blocked threads will exit during this delay. stopping is set to true when stop() is called, stopped is set when stop() finishes. Internally, stopping is used to start shutdown. stopped is just for calling code for debugging to indicate that stop has finished.
// stopping/stopped pattern
stopping = false
stop():
lock(lock):
if !stopping: // cannot use stopped to avoid double clean up
stopping = true
// clean up here
stopped = true
stopped = false
stopped():
lock(lock):
return stopped
[Hat tip to MG, JS]
No comments:
Post a Comment