Python Multiprocessing Lazy Iterating Map

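A Python multiprocessing function that distributes work across a pool of worker processes. Its advantage over the standard library's multiprocessing.Pool is that it does not consume the entire iterable up front: items are pulled from the iterable on demand, only as workers become ready to process them. Each item must be a tuple of positional arguments, because the worker calls function(*item). Results are yielded in completion order, not input order.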

from multiprocessing import Process, Queue, cpu_count

try:  # Python 3 renamed the ``Queue`` module to ``queue``.
    from queue import Full as QueueFull
    from queue import Empty as QueueEmpty
except ImportError:  # Python 2
    from Queue import Full as QueueFull
    from Queue import Empty as QueueEmpty
 
def worker(recvq, sendq, report_errors=False):
    """Run queued tasks until a ``None`` sentinel is received.

    Every task taken from *recvq* is a ``(func, args)`` pair; the worker
    calls ``func(*args)`` and puts the outcome on *sendq*.

    Args:
        recvq: Queue of ``(func, args)`` tasks, terminated by ``None``.
        sendq: Queue that receives one entry per completed task.
        report_errors: When false (the default) the bare return value is
            queued and an exception raised by ``func`` propagates out of
            the worker.  When true, each entry is an ``(ok, payload)``
            pair: ``(True, result)`` on success or ``(False, exception)``
            when the call raised, so the consumer is never left waiting
            for a result that will not arrive.
    """
    for func, args in iter(recvq.get, None):
        if not report_errors:
            sendq.put(func(*args))
            continue
        try:
            outcome = (True, func(*args))
        except Exception as exc:
            outcome = (False, exc)
        sendq.put(outcome)


def _drain_ready(source_queue):
    """Yield every item that can be taken from *source_queue* without blocking."""
    while True:
        try:
            item = source_queue.get(False)
        except QueueEmpty:
            return
        yield item


def _unwrap(outcome):
    """Return the payload of an ``(ok, payload)`` pair, raising it on failure."""
    ok, payload = outcome
    if not ok:
        raise payload
    return payload


def _stop_workers(workers, task_queue, result_queue):
    """Send one ``None`` sentinel per worker and wait for all of them to exit."""
    unsent = len(workers)
    # Stop trying once no worker is left alive to free a slot in the queue.
    while unsent and any(proc.is_alive() for proc in workers):
        try:
            task_queue.put(None, True, 0.1)
        except QueueFull:
            pass
        else:
            unsent -= 1
        for _ in _drain_ready(result_queue):
            pass

    for proc in workers:
        while proc.is_alive():
            # A process that has queued data does not exit until that data
            # has been flushed to the pipe, so keep discarding unread
            # results while waiting; otherwise join() could deadlock.
            for _ in _drain_ready(result_queue):
                pass
            proc.join(0.1)


def pool_imap_unordered(function, iterable, procs=cpu_count()):
    """Lazily apply *function* to the items of *iterable* in worker processes.

    Items are taken from *iterable* only as the bounded task queue has room,
    so the input is never consumed wholesale.  Each item is unpacked as
    positional arguments, i.e. the workers call ``function(*item)``.

    Args:
        function: Callable executed in the worker processes.
        iterable: Iterable of argument tuples.
        procs: Number of worker processes; also the capacity of the task
            queue.  The default is the CPU count, evaluated once when this
            function is defined.

    Yields:
        Results of ``function(*item)`` in completion order, not input order.

    Raises:
        ValueError: If *procs* is smaller than 1.
        Exception: An exception raised by *function* in a worker is
            re-raised here when its outcome is collected.

    The workers are always sent their stop sentinel and joined, including
    when the generator is closed early or an error propagates.  Tasks that
    were already queued at that point are still executed; their results are
    discarded.
    """
    if procs < 1:
        # An unbounded queue with no workers would otherwise hang forever.
        raise ValueError("procs must be at least 1")

    # The task queue is bounded so that at most ``procs`` items wait in it.
    task_queue = Queue(procs)
    result_queue = Queue()

    workers = [
        Process(target=worker, args=(task_queue, result_queue, True))
        for _ in range(procs)
    ]

    sent = 0
    received = 0

    try:
        for proc in workers:
            proc.start()

        for value in iterable:
            queued = False
            while not queued:
                try:
                    task_queue.put((function, value), True, 0.1)
                except QueueFull:
                    pass
                else:
                    queued = True
                    sent += 1
                # Hand over finished results after every attempt so output
                # keeps flowing even when the workers outpace the producer.
                for outcome in _drain_ready(result_queue):
                    received += 1
                    yield _unwrap(outcome)

        # Collect all remaining results.
        while received < sent:
            outcome = result_queue.get()
            received += 1
            yield _unwrap(outcome)
    finally:
        # Terminate worker processes.
        _stop_workers(workers, task_queue, result_queue)
random/python_multiprocessing_lazy_iterating_map.txt · Last modified: 2013/12/03 19:14 by grant