Friday, September 27, 2013

top n integers from a stream of integers

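Keep a min-heap of size n (in an array-backed min-heap, the root is the minimum and sits at index 0). If the next integer from the stream is greater than the root, replace the root with it and min-siftdown. The root is always the smallest of the n largest integers seen so far, so an integer that is not greater than the root cannot belong to the top n.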

For positive integers only, initialize the heap with -1. For positive and negative integers, initialize the heap with null and treat null as less than any non-null integer.
int?[] findTopNIntegersFromStream(n, stream):
    heap = int?[n]
    heap[0..n] = null // use -1 instead when all integers are positive
    foreach x in stream:
        if heap[0] == null || x > heap[0]:
            heap[0] = x // replace root in O(1) time
            min-siftdown(heap, 0) // siftdown in O(log n) time
    return heap
    
void min-siftdown(int?[] heap, i):
    left = 2*i + 1
    right = 2*i + 2
    min = i
    // null is less than any non-null, so a null at min is already minimal
    if left < heap.length && heap[min] != null &&
        (heap[left] == null || heap[left] < heap[min]):
        min = left
    if right < heap.length && heap[min] != null &&
        (heap[right] == null || heap[right] < heap[min]):
        min = right
    if min != i:
        swap(heap, i, min)
        min-siftdown(heap, min)

void print(int?[] heap):
    for i = 0 to heap.length - 1:
        if heap[i] != null: // compare with -1 instead if initialized with -1
            print(heap[i])
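
As a rough Python sketch of the null-sentinel version above: None stands in for null, and the helper name less and the snake_case function names are illustrative assumptions, not part of the original pseudocode. The siftdown is written as a loop rather than recursively, which is a stylistic choice.

```python
from typing import Iterable, List, Optional


def less(a: Optional[int], b: Optional[int]) -> bool:
    """Return True if a sorts before b; None sorts below every integer."""
    if a is None:
        return b is not None
    if b is None:
        return False
    return a < b


def min_siftdown(heap: List[Optional[int]], i: int) -> None:
    """Push heap[i] down until neither child is smaller than it."""
    n = len(heap)
    while True:
        left, right, smallest = 2 * i + 1, 2 * i + 2, i
        if left < n and less(heap[left], heap[smallest]):
            smallest = left
        if right < n and less(heap[right], heap[smallest]):
            smallest = right
        if smallest == i:
            return
        heap[i], heap[smallest] = heap[smallest], heap[i]
        i = smallest


def find_top_n(n: int, stream: Iterable[int]) -> List[Optional[int]]:
    """Return a min-heap holding the n largest integers; unused slots stay None."""
    if n <= 0:
        return []
    heap: List[Optional[int]] = [None] * n
    for x in stream:
        if less(heap[0], x):       # root is None or smaller than x
            heap[0] = x            # replace the root in O(1)
            min_siftdown(heap, 0)  # restore heap order in O(log n)
    return heap
```

Slots that were never filled remain None, so a caller would filter them out the same way print() does above.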

Another way is to fill the array with the first n integers, min-heapify it once it is full, and min-siftdown only for the integers that arrive after that. This needs no sentinel values.
x[] findTopNIntegersFromStream(n, stream):
    heap = new [n]
    i = 0
    foreach x in stream:
        if i >= heap.length:
            if x > heap[0]:
                heap[0] = x
                min-siftdown(heap, 0)
        else:
            heap[i] = x
            i++
            if i == heap.length:
                min-heapify(heap)
    if i < heap.length:
        heapnew = new [i]
        copy(heap, heapnew, 0, i)
        heap = heapnew
        // min-heapify(heap) // only if the caller needs the partial result in heap order
    return heap

min-siftdown(heap, i):
    left = 2*i + 1
    right = 2*i + 2
    min = i
    if left < heap.length && heap[left] < heap[min]:
        min = left
    if right < heap.length && heap[right] < heap[min]:
        min = right
    if min != i:
        swap(heap, i, min)
        min-siftdown(heap, min)

min-heapify(heap):
    end = heap.length/2 -1 // last parent
    while end >= 0:
        min-siftdown(heap, end)
        end--
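
For comparison, the delayed-heapify approach can be sketched in Python with the standard library's heapq module, where heapq.heapify plays the role of min-heapify and heapq.heapreplace does the "replace root, then siftdown" step. The function name find_top_n_delayed is an assumption made for this sketch.

```python
import heapq
from typing import Iterable, List


def find_top_n_delayed(n: int, stream: Iterable[int]) -> List[int]:
    """Collect the first n integers unordered, heapify once, then displace."""
    if n <= 0:
        return []
    heap: List[int] = []
    for x in stream:
        if len(heap) < n:
            heap.append(x)            # no ordering work while filling
            if len(heap) == n:
                heapq.heapify(heap)   # one O(n) heapify when full
        elif x > heap[0]:
            heapq.heapreplace(heap, x)  # replace root and sift down
    return heap  # not heap-ordered if the stream had fewer than n integers
```

Because a Python list grows as needed, the trailing copy into a shorter array is unnecessary here.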

The code is easier to follow when the heap functions are pulled into their own class (a min-heap implementation). The heap class handles the case where it is not yet full. Both siftup() (in insert) and siftdown() (in displace) are used.
x[] top(stream, n):
    if stream == null || n < 0:
        throw
    if n == 0:
        return []
    heap = new min-heap(capacity: n)
    for x in stream:
        if !heap.isFull(): // heap.count() < n
            heap.insert(x)
            continue
        if x > heap.peek():
            heap.displace(x)
    for x in heap.foreach():
        yield return x

// min-heap interface
min-heap:
    // add x at the end, siftup
    insert(x):
    // capture top to return, replace top with x, siftdown
    x displace(x):
    // return top without removing it
    x peek():
    x[] foreach():
    bool isFull():
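
With delayed heapify, the heap also exposes append() and heapify():
// additional min-heap methods for delayed heapify
    // add x at the end without restoring heap order
    append(x):
        // ...
    // heapify using siftdown(), O(n)
    heapify():
    bool isFull():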

// used as,
    for x in items:
        heap.append(x)
    heap.heapify()

x[] top(stream, n):
    if stream == null || n < 0:
        throw
    if n == 0:
        return []
    heap = new min-heap(capacity: n)
    for x in stream:
        if !heap.isFull():
            heap.append(x)
            if heap.isFull():
                heap.heapify()
            continue
        if x > heap.peek():
            heap.displace(x)
    if !heap.isFull():
        heap.heapify()
    for x in heap.foreach():
        yield return x
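
A compact Python sketch of such a heap class and of top() might look like the following. The class name MinHeap, the method names, and the delayed flag on top() are assumptions that mirror the pseudocode interface; this is one possible arrangement, not a definitive implementation.

```python
from typing import Iterable, Iterator, List, Optional


class MinHeap:
    """Fixed-capacity, array-backed min-heap (names mirror the pseudocode)."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.items: List[int] = []

    def is_full(self) -> bool:
        return len(self.items) >= self.capacity

    def peek(self) -> int:
        return self.items[0]

    def insert(self, x: int) -> None:
        """Add x at the end and sift it up, O(log n)."""
        self.items.append(x)
        i = len(self.items) - 1
        while i > 0:
            parent = (i - 1) // 2
            if self.items[parent] <= self.items[i]:
                break
            self.items[parent], self.items[i] = self.items[i], self.items[parent]
            i = parent

    def append(self, x: int) -> None:
        """Add x without restoring heap order; call heapify() afterwards."""
        self.items.append(x)

    def heapify(self) -> None:
        """Sift down every parent, starting from the last one, O(n)."""
        for i in range(len(self.items) // 2 - 1, -1, -1):
            self._siftdown(i)

    def displace(self, x: int) -> int:
        """Replace the top with x, sift down, and return the old top."""
        old = self.items[0]
        self.items[0] = x
        self._siftdown(0)
        return old

    def _siftdown(self, i: int) -> None:
        n = len(self.items)
        while True:
            left, right, smallest = 2 * i + 1, 2 * i + 2, i
            if left < n and self.items[left] < self.items[smallest]:
                smallest = left
            if right < n and self.items[right] < self.items[smallest]:
                smallest = right
            if smallest == i:
                return
            self.items[i], self.items[smallest] = self.items[smallest], self.items[i]
            i = smallest

    def __iter__(self) -> Iterator[int]:
        return iter(self.items)


def top(stream: Optional[Iterable[int]], n: int, delayed: bool = False) -> Iterator[int]:
    """Yield the n largest integers in heap order (not sorted)."""
    if stream is None or n < 0:
        raise ValueError("stream must not be None and n must be >= 0")
    if n == 0:
        return
    heap = MinHeap(n)
    for x in stream:
        if not heap.is_full():
            if delayed:
                heap.append(x)
                if heap.is_full():
                    heap.heapify()
            else:
                heap.insert(x)
        elif x > heap.peek():
            heap.displace(x)
    if delayed and not heap.is_full():
        heap.heapify()
    yield from heap
```

Since top() is a generator here, the argument checks run when iteration starts rather than at the call itself; wrapping the call in list() forces both.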

Time: O(m log n) for a stream of m integers when finding the top n. Space: O(n) for the heap.

[Hat tip to M]
