draining pipes simultaneously


Dmitry Teslenko

Hello!
Here's my implementation of a function that executes a command and
drains stdout/stderr, invoking callback functions for every line of
command output:

import os
import subprocess
import threading
import time
import Queue  # "queue" in Python 3

def __execute2_drain_pipe(queue, pipe):
        # Reader thread: push every line of the pipe into the queue.
        for line in pipe:
                queue.put(line)
        return

def execute2(command, out_filter=None, err_filter=None):
        p = subprocess.Popen(command, shell=True, stdin=subprocess.PIPE,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                env=os.environ)

        qo = Queue.Queue()
        qe = Queue.Queue()

        # One reader thread per pipe, so stdout and stderr are drained
        # simultaneously and the child cannot block on a full pipe buffer.
        to = threading.Thread(target=__execute2_drain_pipe,
                args=(qo, p.stdout))
        to.start()
        time.sleep(0)
        te = threading.Thread(target=__execute2_drain_pipe,
                args=(qe, p.stderr))
        te.start()

        while to.isAlive() or te.isAlive():
                try:
                        line = qo.get()
                        if out_filter:
                                out_filter(line)
                        qo.task_done()
                except Queue.Empty:
                        pass

                try:
                        line = qe.get()
                        if err_filter:
                                err_filter(line)
                        qe.task_done()
                except Queue.Empty:
                        pass

        to.join()
        te.join()
        return p.wait()

The problem is that my implementation is buggy and the function hangs
when stdout or stderr is empty. Can I have your feedback?
 

bockman


The Queue.get method is blocking by default. The documentation is not
100% clear about that (maybe it should show the full Python signature
of the method, which would make the default value self-evident), but
if you run help(Queue.Queue) in a Python shell you will see it.

Hence, try using a timeout or a non-blocking get (but in the case of a
non-blocking get you should add a delay in the loop, or you will poll
the queues at maximum speed and maybe prevent the other threads from
accessing them).
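
In code, the two options would look something like this (a minimal,
self-contained sketch; the 0.1s timeout and 0.05s sleep are arbitrary
values, not from the thread):

import time
import Queue  # "queue" in Python 3

q = Queue.Queue()

# Option 1: blocking get with a timeout. Raises Queue.Empty after
# 0.1s if nothing arrives, so the loop can re-check its exit
# condition instead of blocking forever.
try:
        line = q.get(timeout=0.1)
except Queue.Empty:
        pass

# Option 2: non-blocking get. Returns immediately; pair it with a
# short sleep so the loop does not spin at full speed.
try:
        line = q.get(False)  # same as q.get(block=False)
except Queue.Empty:
        time.sleep(0.05)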

Ciao
 

Dmitry Teslenko


Thanks for the advice! I finally came up with the following loop:

while to.isAlive() or te.isAlive():
        try:
                while True:
                        line = qo.get(False)
                        if out_filter:
                                out_filter(line)
        except Queue.Empty:
                pass

        try:
                while True:
                        line = qe.get(False)
                        if err_filter:
                                err_filter(line)
        except Queue.Empty:
                pass

Inserting a delay at the beginning of the loop gives the feeling that
the command takes a long time to start, and a delay at the end of the
loop may cause data loss when both threads become inactive during the
delay.
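
For illustration, a sketch of one way to address both concerns: sleep
briefly inside the loop, then drain both queues once more after the
threads exit, so lines queued during the final sleep are not lost. It
reuses the names qo, qe, to, te, out_filter and err_filter from
execute2 above; the 0.05s interval is an arbitrary choice.

def drain(q, line_filter):
        # Pull everything currently queued, without blocking.
        try:
                while True:
                        line = q.get(False)
                        if line_filter:
                                line_filter(line)
        except Queue.Empty:
                pass

while to.isAlive() or te.isAlive():
        drain(qo, out_filter)
        drain(qe, err_filter)
        time.sleep(0.05)  # yield the CPU between polls

to.join()
te.join()
# Final drain: the threads may have queued lines after the last poll
# but before exiting.
drain(qo, out_filter)
drain(qe, err_filter)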
 

bockman


time.sleep() pauses only the thread that executes it, not the others.
And Queue objects can hold a large amount of data (if you have the
RAM), so unless your subprocess is outputting data very fast, you
should not have data loss.
Anyway, if it works for you ... :)
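
A quick standalone toy (not from the thread) that shows this: a
producer thread keeps filling a queue while the main thread sleeps,
and nothing is lost:

import threading
import time
import Queue  # "queue" in Python 3

q = Queue.Queue()

def producer():
        # Keeps running while the main thread sleeps below.
        for i in range(1000):
                q.put(i)

t = threading.Thread(target=producer)
t.start()
time.sleep(1)      # pauses only the main thread
t.join()
print q.qsize()    # prints 1000: every item was buffered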

Ciao
 

Dmitry Teslenko


After some testing I'll agree :) Without time.sleep() in the main
thread Python eats up all available processor time.
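
As a footnote, the explicit sleep becomes unnecessary with the other
option mentioned earlier, a blocking get with a timeout: the main
thread then waits inside get() only while there is nothing to read. A
sketch reusing the names from execute2, with an arbitrary 0.1s timeout
(the final drain from the earlier sketch still applies):

while to.isAlive() or te.isAlive():
        for q, filt in ((qo, out_filter), (qe, err_filter)):
                try:
                        # Returns immediately when a line is available,
                        # otherwise blocks for at most 0.1s.
                        line = q.get(timeout=0.1)
                        if filt:
                                filt(line)
                except Queue.Empty:
                        pass

to.join()
te.join()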
 
