draining pipes simultaneously

Discussion in 'Python' started by Dmitry Teslenko, Mar 5, 2008.

  1. Hello!
    Here's my implementation of a function that executes some command and
    drains stdout/stderr invoking other functions for every line of
    command output:

    def __execute2_drain_pipe(queue, pipe):
        for line in pipe:
            queue.put(line)
        return

    def execute2(command, out_filter = None, err_filter = None):
        p = subprocess.Popen(command , shell=True, stdin = subprocess.PIPE, \
            stdout = subprocess.PIPE, stderr = subprocess.PIPE, \
            env = os.environ)

        qo = Queue.Queue()
        qe = Queue.Queue()

        to = threading.Thread(target = __execute2_drain_pipe, \
            args = (qo, p.stdout))
        to.start()
        time.sleep(0)
        te = threading.Thread(target = __execute2_drain_pipe, \
            args = (qe, p.stderr))
        te.start()

        while to.isAlive() or te.isAlive():
            try:
                line = qo.get()
                if out_filter:
                    out_filter(line)
                qo.task_done()
            except Queue.Empty:
                pass

            try:
                line = qe.get()
                if err_filter:
                    err_filter(line)
                qe.task_done()
            except Queue.Empty:
                pass

        to.join()
        te.join()
        return p.wait()

    The problem is that my implementation is buggy: the function hangs when
    stdout or stderr is empty. Can I have your feedback?
    Dmitry Teslenko, Mar 5, 2008
    #1

  2. Dmitry Teslenko

    Guest

    On 5 Mar, 10:33, "Dmitry Teslenko" <> wrote:
    > Hello!
    > Here's my implementation of a function that executes some command and
    > drains stdout/stderr invoking other functions for every line of
    > command output:
    >
    > def __execute2_drain_pipe(queue, pipe):
    >         for line in pipe:
    >                 queue.put(line)
    >         return
    >
    > def execute2(command, out_filter = None, err_filter = None):
    >         p = subprocess.Popen(command , shell=True, stdin = subprocess.PIPE, \
    >                 stdout = subprocess.PIPE, stderr = subprocess.PIPE, \
    >                 env = os.environ)
    >
    >         qo = Queue.Queue()
    >         qe = Queue.Queue()
    >
    >         to = threading.Thread(target = __execute2_drain_pipe, \
    >                 args = (qo, p.stdout))
    >         to.start()
    >         time.sleep(0)
    >         te = threading.Thread(target = __execute2_drain_pipe, \
    >                 args = (qe, p.stderr))
    >         te.start()
    >
    >         while to.isAlive() or te.isAlive():
    >                 try:
    >                         line = qo.get()
    >                         if out_filter:
    >                                 out_filter(line)
    >                         qo.task_done()
    >                 except Queue.Empty:
    >                         pass
    >
    >                 try:
    >                         line = qe.get()
    >                         if err_filter:
    >                                 err_filter(line)
    >                         qe.task_done()
    >                 except Queue.Empty:
    >                         pass
    >
    >         to.join()
    >         te.join()
    >         return p.wait()
    >
    > The problem is that my implementation is buggy: the function hangs when
    > stdout or stderr is empty. Can I have your feedback?


    The Queue.get method blocks by default. The documentation is not
    100% clear about that (maybe it should show the full Python signature
    of the method, which would make the default value self-evident), but
    if you run help(Queue.Queue) in a Python shell you will see it.

    Hence, try using a timeout or a non-blocking get (but with a
    non-blocking get you should add a delay in the loop, or you will poll
    the queues at maximum speed and maybe prevent the other threads from
    accessing them).
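
To make the difference concrete, here is a minimal sketch of the two alternatives. It is written for Python 3, where the module is named queue rather than Queue. The helper names get_or_none and get_nowait_or_none are made up for this illustration.

```python
import queue  # this module is called Queue in Python 2


def get_or_none(q, timeout=0.1):
    """Return the next item, or None if nothing arrives within `timeout` seconds."""
    try:
        # Blocks for at most `timeout` seconds, then raises queue.Empty.
        return q.get(timeout=timeout)
    except queue.Empty:
        return None


def get_nowait_or_none(q):
    """Return the next item if one is already queued, otherwise None."""
    try:
        # Never blocks: raises queue.Empty immediately when the queue is empty.
        return q.get(block=False)
    except queue.Empty:
        return None
```

A plain q.get() with no arguments is the blocking form: it waits until some thread puts an item, which is why the original loop never returns when one of the pipes stays silent.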

    Ciao
    -----
    FB
    , Mar 5, 2008
    #2

  3. On Wed, Mar 5, 2008 at 1:34 PM, <> wrote:
    > The Queue.get method by default is blocking. The documentation is not
    > 100% clear about that (maybe it should report
    > the full python definition of the function parameters, which makes
    > self-evident the default value) but if you do
    > help(Queue.Queue) in a python shell you will see it.


    > Hence, try using a timeout or a non-blocking get (but with a
    > non-blocking get you should add a delay in the loop, or you will poll
    > the queues at maximum speed and maybe prevent the other threads from
    > accessing them).


    Thanks for the advice! I finally came up with the following loop:

    while to.isAlive() or te.isAlive():
        try:
            while True:
                line = qo.get(False)
                if out_filter:
                    out_filter(line)
        except Queue.Empty:
            pass

        try:
            while True:
                line = qe.get(False)
                if err_filter:
                    err_filter(line)
        except Queue.Empty:
            pass

    Inserting a delay at the beginning of the loop makes the command feel
    slow to start, and a delay at the end of the loop may cause data loss
    when both threads become inactive during the delay.
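
One way to sidestep both the delay and the data-loss concern is to avoid polling altogether. The sketch below is a different technique from the loop above, not a drop-in fix: each reader thread pushes a sentinel when its pipe reaches end-of-file, and the main thread blocks on one shared queue until it has seen both sentinels. It assumes Python 3 (module queue) and text-mode pipes, and it leaves stdin untouched. The names execute_sketch, _drain_pipe and _EOF are hypothetical.

```python
import queue
import subprocess
import threading

_EOF = object()  # hypothetical sentinel: "this pipe has reached end-of-file"


def _drain_pipe(q, tag, pipe):
    # Forward every line, then always signal end-of-stream,
    # even when the pipe produced no output at all.
    for line in pipe:
        q.put((tag, line))
    pipe.close()
    q.put((tag, _EOF))


def execute_sketch(command, out_filter=None, err_filter=None):
    p = subprocess.Popen(command, shell=True,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         universal_newlines=True)
    q = queue.Queue()
    filters = {"out": out_filter, "err": err_filter}
    threads = [
        threading.Thread(target=_drain_pipe, args=(q, "out", p.stdout)),
        threading.Thread(target=_drain_pipe, args=(q, "err", p.stderr)),
    ]
    for t in threads:
        t.start()

    open_pipes = len(threads)
    while open_pipes:
        # A blocking get is safe here: each reader is guaranteed
        # to deliver its sentinel eventually.
        tag, line = q.get()
        if line is _EOF:
            open_pipes -= 1
        elif filters[tag]:
            filters[tag](line)

    for t in threads:
        t.join()
    return p.wait()
```

Because the sentinel is queued after the last line of each pipe, every line is handled before the loop ends, and an empty stdout or stderr simply yields a sentinel straight away.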
    Dmitry Teslenko, Mar 5, 2008
    #3
  4. Dmitry Teslenko

    Guest


    >
    > Inserting a delay at the beginning of the loop makes the command feel
    > slow to start, and a delay at the end of the loop may cause data loss
    > when both threads become inactive during the delay.


    time.sleep() pauses only the thread that executes it, not the
    others. And queue objects can hold a large amount of data (if you
    have the RAM), so unless your subprocess is outputting data very
    fast, you should not have data loss.
    Anyway, if it works for you ... :)
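
A small sketch of that point, assuming Python 3: the main thread does nothing but sleep, yet the worker thread still fills the queue. The name fill_while_main_sleeps is hypothetical and used only for this example.

```python
import queue
import threading
import time


def fill_while_main_sleeps(n):
    """Return how many items a worker queued while the main thread only slept."""
    q = queue.Queue()          # unbounded by default, so put() never blocks
    done = threading.Event()

    def producer():
        for i in range(n):
            q.put(i)
        done.set()

    t = threading.Thread(target=producer)
    t.start()
    # This loop can only end if the producer keeps running
    # while the main thread is inside time.sleep().
    while not done.is_set():
        time.sleep(0.01)
    t.join()
    return q.qsize()
```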

    Ciao
    -----
    FB
    , Mar 5, 2008
    #4
  5. On Wed, Mar 5, 2008 at 3:39 PM, <> wrote:
    > time.sleep() pauses only the thread that executes it, not the
    > others. And queue objects can hold a large amount of data (if you
    > have the RAM), so unless your subprocess is outputting data very
    > fast, you should not have data loss.
    > Anyway, if it works for you ... :)


    After some testing I agree :) Without time.sleep() in the main thread,
    Python eats up all available processor time.
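
A minimal sketch of the polling variant with the delay in place, assuming Python 3 (queue module, Thread.is_alive). It also drains the queues once more after the reader threads have finished, so lines queued during the last sleep are not dropped. The names drain and poll_until_done are hypothetical.

```python
import queue
import threading
import time


def drain(q, handler):
    """Pass every line currently in q to handler, without blocking."""
    while True:
        try:
            line = q.get(block=False)
        except queue.Empty:
            return
        if handler:
            handler(line)


def poll_until_done(threads, pairs, delay=0.05):
    """Poll (queue, handler) pairs until all reader threads have exited."""
    while any(t.is_alive() for t in threads):
        for q, handler in pairs:
            drain(q, handler)
        time.sleep(delay)      # yield the CPU instead of spinning
    for t in threads:
        t.join()
    # The readers are finished, so nothing new can arrive:
    # one last pass picks up whatever was queued during the final sleep.
    for q, handler in pairs:
        drain(q, handler)
```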
    Dmitry Teslenko, Mar 5, 2008
    #5