i need use concurrent priority queue, , considering adapting simplepriorityqueue<tpriority, tvalue>
sample given in how to: add bounding , blocking functionality collection tutorial on msdn. however, surprised @ severity of bugs said sample seems have. verify whether these issues present?
1) race hazard exists between tryadd
, toarray
, can cause argumentexception
thrown latter. tryadd
method first adds item internal queue, increments m_count
counter. on other hand, toarray
first initializes new array of size m_count
, copies internal queues onto array. if tryadd
called whilst toarray
being executed, toarray
might end attempting copy more items had allocated space in array, causing copyto
call throw argumentexception
.
private concurrentqueue<keyvaluepair<int, tvalue>>[] _queues; private int m_count; // ... // iproducerconsumercollection members public bool tryadd(keyvaluepair<int, tvalue> item) { _queues[item.key].enqueue(item); interlocked.increment(ref m_count); return true; } public int count { { return m_count; } } public keyvaluepair<int, tvalue>[] toarray() { keyvaluepair<int, tvalue>[] result; lock (_queues) { result = new keyvaluepair<int, tvalue>[this.count]; // *** context switch here; new item gets added *** int index = 0; foreach (var q in _queues) { if (q.count > 0) { q.copyto(result, index); // *** argumentexception *** index += q.count; } } return result; } }
2) race hazard exists in getenumerator
method: there no synchronization between updates internal queues.
public ienumerator<keyvaluepair<int, tvalue>> getenumerator() { (int = 0; < prioritycount; i++) { foreach (var item in _queues[i]) yield return item; } }
consider following code snippet, takes item queue , re-adds incremented priority:
if (queue.trytake(out item) && item.key < maxpriority - 1) queue.tryadd(new keyvaluepair<int, string>(item.key + 1, item.value))
if above snippet run concurrently enumeration, 1 expect item appear @ once, either original or incremented priority – or, possibly, not appear @ all. 1 not expect item appear twice, @ both priorities. however, since getenumerator
iterates on internal queues sequentially, not protect against such ordering inconsistencies across queues.
3) public count
property can return stale values, since reads shared m_count
field without memory fences. if consumer accesses property in loop doesn't generate memory fences of own, such below, stuck in infinite loop, despite items being added queue other threads.
while (queue.count == 0) { }
the need memory fences when reading shared variables has been discussed in several other posts:
- how correctly read interlocked.increment'ed int field?
- reading int that's updated interlocked on other threads
- do concurrent interlocked , reads require memory barrier or locking?
4) there no memory barrier between initialization of _queues
array , completion of simplepriorityqueue
constructor. race hazards arise when external consumers on thread call tryadd
, access _queues
before initialization has completed (or appears completed on memory cache). discussed further in other question on constructors , memory barriers.
5) trytake
, toarray
protected through use of lock
keyword. apart being inadequate (due bugs discussed above), defeats entire purpose of designing concurrent collection. given shortcomings, think best approach downgrade internal concurrentqueue
structures plain queue
, add locks everywhere, , start treating non-concurrent thread-safe structure.
i think depends on how going use class. if limit tryadd
, trytake
(which 2 main things blockingcollection<t>
relies on) shouldn't have issues , have fast lock minimal priority queue.
if start using count
, copyto
, or of other methods potentially going run in issues pointed out.
Comments
Post a Comment