Python: nonblocking read from stdout of threaded subprocess
- by sberry2A
I have a script (worker.py) that prints unbuffered output in the form...
1
2
3
.
.
.
n
where n is some constant number of iterations a loop in this script will make. In another script (service_controller.py) I start a number of threads, each of which starts a subprocess using subprocess.Popen(stdout=subprocess.PIPE, ...); Now, in my main thread (service_controller.py) I want to read the output of each thread's worker.py subprocess and use it to calculate an estimate for the time remaining till completion.
I have all of the logic working that reads the stdout from worker.py and determines the last printed number. The problem is that I can not figure out how to do this in a non-blocking way. If I read a constant bufsize then each read will end up waiting for the same data from each of the workers. I have tried numerous ways including using fcntl, select + os.read, etc. What is my best option here? I can post my source if needed, but I figured the explanation describes the problem well enough.
Thanks for any help here.
EDIT
Adding sample code
I have a worker that starts a subprocess.
class WorkerThread(threading.Thread):
def __init__(self):
self.completed = 0
self.process = None
self.lock = threading.RLock()
threading.Thread.__init__(self)
def run(self):
cmd = ["/path/to/script", "arg1", "arg2"]
self.process = subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=1, shell=False)
#flags = fcntl.fcntl(self.process.stdout, fcntl.F_GETFL)
#fcntl.fcntl(self.process.stdout.fileno(), fcntl.F_SETFL, flags | os.O_NONBLOCK)
def get_completed(self):
self.lock.acquire();
fd = select.select([self.process.stdout.fileno()], [], [], 5)[0]
if fd:
self.data += os.read(fd, 1)
try:
self.completed = int(self.data.split("\n")[-2])
except IndexError:
pass
self.lock.release()
return self.completed
I then have a ThreadManager.
class ThreadManager():
def __init__(self):
self.pool = []
self.running = []
self.lock = threading.Lock()
def clean_pool(self, pool):
for worker in [x for x in pool is not x.isAlive()]:
worker.join()
pool.remove(worker)
del worker
return pool
def run(self, concurrent=5):
while len(self.running) + len(self.pool) > 0:
self.clean_pool(self.running)
n = min(max(concurrent - len(self.running), 0), len(self.pool))
if n > 0:
for worker in self.pool[0:n]:
worker.start()
self.running.extend(self.pool[0:n])
del self.pool[0:n]
time.sleep(.01)
for worker in self.running + self.pool:
worker.join()
and some code to run it.
threadManager = ThreadManager()
for i in xrange(0, 5):
threadManager.pool.append(WorkerThread())
threadManager.run()
I have stripped out a log of the other code in hopes to try to pinpoint the issue.