Python: nonblocking read from stdout of threaded subprocess

Posted by sberry2A on Stack Overflow See other posts from Stack Overflow or by sberry2A
Published on 2010-03-18T14:54:37Z Indexed on 2010/03/18 20:01 UTC
Read the original article Hit count: 1007

Filed under:
|
|
|

I have a script (worker.py) that prints unbuffered output in the form...

1
2
3
.
.
.
n

where n is some constant number of iterations a loop in this script will make. In another script (service_controller.py) I start a number of threads, each of which starts a subprocess using subprocess.Popen(stdout=subprocess.PIPE, ...); Now, in my main thread (service_controller.py) I want to read the output of each thread's worker.py subprocess and use it to calculate an estimate for the time remaining till completion.

I have all of the logic working that reads the stdout from worker.py and determines the last printed number. The problem is that I can not figure out how to do this in a non-blocking way. If I read a constant bufsize then each read will end up waiting for the same data from each of the workers. I have tried numerous ways including using fcntl, select + os.read, etc. What is my best option here? I can post my source if needed, but I figured the explanation describes the problem well enough.

Thanks for any help here.

EDIT
Adding sample code

I have a worker that starts a subprocess.

class WorkerThread(threading.Thread):
    def __init__(self):
        self.completed = 0
        self.process = None
        self.lock = threading.RLock()
        threading.Thread.__init__(self)

    def run(self):
        cmd = ["/path/to/script", "arg1", "arg2"]
        self.process = subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=1, shell=False)
        #flags = fcntl.fcntl(self.process.stdout, fcntl.F_GETFL)
        #fcntl.fcntl(self.process.stdout.fileno(), fcntl.F_SETFL, flags | os.O_NONBLOCK)

    def get_completed(self):
        self.lock.acquire();
        fd = select.select([self.process.stdout.fileno()], [], [], 5)[0]
        if fd:
            self.data += os.read(fd, 1)
            try:
                self.completed = int(self.data.split("\n")[-2])
            except IndexError:
                pass
        self.lock.release()
        return self.completed

I then have a ThreadManager.

class ThreadManager():
    def __init__(self):
        self.pool = []
        self.running = []
        self.lock = threading.Lock()

    def clean_pool(self, pool):
        for worker in [x for x in pool is not x.isAlive()]:
            worker.join()
            pool.remove(worker)
            del worker
        return pool

    def run(self, concurrent=5):
        while len(self.running) + len(self.pool) > 0:
            self.clean_pool(self.running)
            n = min(max(concurrent - len(self.running), 0), len(self.pool))
            if n > 0:
                for worker in self.pool[0:n]:
                    worker.start()
                self.running.extend(self.pool[0:n])
                del self.pool[0:n]
            time.sleep(.01)
         for worker in self.running + self.pool:
             worker.join()

and some code to run it.

threadManager = ThreadManager()
for i in xrange(0, 5):
    threadManager.pool.append(WorkerThread())
threadManager.run()

I have stripped out a log of the other code in hopes to try to pinpoint the issue.

© Stack Overflow or respective owner

Related posts about python

Related posts about nonblocking