# dog.py, the Opposite of the Usual Linux cat Command

"""
The reverse of the usual 'cat' command in Linux.
 
Written by Grant Jenks
http://www.grantjenks.com/
 
DISCLAIMER
THERE IS NO WARRANTY FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE
LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR
OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND,
EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE
ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU.
SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY
SERVICING, REPAIR OR CORRECTION.
 
Copyright
This work is licensed under the
Creative Commons Attribution-NonCommercial-NoDerivs 3.0 Unported License.
To view a copy of this license, visit
http://creativecommons.org/licenses/by-nc-nd/3.0/
or send a letter to Creative Commons, 171 Second Street, Suite 300,
San Francisco, California, 94105, USA.
 
The idea came from here: http://1.61803398874.com/canine/
 
I decided to play with this after seeing the presentation on Python coroutines
here: http://www.dabeaz.com/coroutines/
"""
 
def coroutine(func):
    """
    Decorator that primes a generator-based coroutine.

    Calling the decorated function creates the generator and advances it to
    its first ``yield``, so callers can use ``.send()`` immediately. The
    value produced by that first ``yield`` is discarded.

    Reference: http://www.dabeaz.com/coroutines/coroutine.py
    """
    # Imported locally so this block does not depend on the file's imports.
    from functools import wraps

    @wraps(func)
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        # The builtin next() works on Python 2.6+ and Python 3, whereas the
        # generator method .next() exists only on Python 2.
        next(cr)
        return cr
    return start
 
from threading import Thread
from Queue import Queue
 
@coroutine
def threaded(target):
    """
    Run the coroutine `target` on its own thread, fed through a queue.

    Reference: http://www.dabeaz.com/coroutines/cothread.py

    Items sent to this coroutine are queued and forwarded to `target` by a
    worker thread. Every send yields the queue's current size (as reported
    by ``qsize()``) so the caller can back off when the worker falls behind.
    Sending None queues nothing and only reports the size.

    The GeneratorExit class is the shutdown marker: it is queued when this
    coroutine is closed, and also when a caller sends the class itself as an
    item. On receiving it the worker closes `target` and exits.
    """
    inbox = Queue()

    def drain():
        # Worker loop: forward queued items until the shutdown marker shows up.
        while True:
            msg = inbox.get()
            if msg is not GeneratorExit:
                target.send(msg)
                continue
            target.close()
            return

    worker = Thread(target=drain)
    worker.start()

    try:
        while True:
            received = (yield inbox.qsize())
            if received is not None:
                inbox.put(received)
    except GeneratorExit:
        inbox.put(GeneratorExit)
 
@coroutine
def write_file(name):
    """
    Coroutine sink that writes every item sent to it to the file `name`.

    The file is opened in 'w' mode and stays open for the lifetime of the
    coroutine; the ``with`` block closes it when the coroutine is closed.
    """
    with open(name, 'w') as sink:
        emit = sink.write
        while True:
            emit((yield))
 
import sys
from time import sleep
 
def dog(file_names, max_queue_len=2 ** 4, buffer_len=2 ** 22):
    """
    Read from standard input and write to multiple files.

    Parameters:
        file_names    -- iterable of output file names; must not be empty.
        max_queue_len -- a new chunk is queued only once every writer's
                         queue is shorter than this.
        buffer_len    -- size passed to each sys.stdin.read() call.

    Raises:
        ValueError if no file names are given.

    Read/write loop pseudocode:
        Fill text from stdin
        Make sure all threads are ready to queue
            If not, exponential backoff
        Queue text on all threads
    """
    # Materialise once so iterators/generators are accepted too.
    file_names = list(file_names)

    if not file_names:
        raise ValueError("no file names given")

    threads = []

    try:
        # Writers are created inside the try block so that, if a later file
        # cannot be opened, the writers already started are still shut down
        # by the finally clause instead of relying on garbage collection.
        for name in file_names:
            threads.append(threaded(write_file(name)))

        while True:
            text = sys.stdin.read(buffer_len)

            # Sending None queues nothing; it only reports each queue's size.
            # The sleep doubles on every retry and is reset for each chunk.
            backoff = 1
            while max(thread.send(None) for thread in threads) >= max_queue_len:
                sleep(backoff)
                backoff *= 2

            for thread in threads:
                thread.send(text)

            # A short read means stdin is exhausted.
            if len(text) != buffer_len:
                break

    finally:
        # close() raises GeneratorExit inside each threaded coroutine, which
        # queues the shutdown marker for its worker. Unlike send(), close()
        # is a no-op on a generator that has already finished, so one dead
        # coroutine cannot prevent the others from being shut down.
        for thread in threads:
            thread.close()
 
if __name__ == '__main__':
    # Usage:
    #     cat somefile | python dog.py file1 file2...
    #
    # Every command-line argument after the script name is passed to dog()
    # as an output file name.
    file_names = sys.argv[1:]
    dog(file_names)
# random/dog.py_the_opposite_of_the_usual_linux_cat_command.txt · Last modified: 2011/12/26 17:12 by grant