Threaded Design Question

Discussion in 'Python' started by half.italian@gmail.com, Aug 9, 2007.

  1. Guest

    Hi all! I'm implementing one of my first multithreaded apps, and have
    gotten to a point where I think I'm going off track from a standard
    idiom. Wondering if anyone can point me in the right direction.

    The script will run as a daemon and watch a given directory for new
    files. Once it determines that a file has finished moving into the
    watch folder, it will kick off a process on one of the files. Several
    of these could be running at any given time up to a max number of
    threads.

    Here's how I have it designed so far. The main thread starts a
    Watch(threading.Thread) class that loops and searches a directory for
    files. It has been passed a Queue.Queue() object (watch_queue), and
    as it finds new files in the watch folder, it adds the file name to
    the queue.

    The main thread then grabs an item off the watch_queue, and kicks off
    processing on that file using another class, Worker(threading.Thread).

    My problem is communicating between the threads about which files are
    currently being processed, or are already present in the watch_queue, so
    that the Watch thread does not continuously add unneeded files to the
    watch_queue. For example: Watch() finds a file to be
    processed and adds it to the queue. The main thread sees the file on
    the queue and pops it off and begins processing. Now the file has
    been removed from the watch_queue, and Watch() thread has no way of
    knowing that the other Worker() thread is processing it, and shouldn't
    pick it up again. So it will see the file as new and add it to the
    queue again. P.S. The file is deleted from the watch folder after it
    has finished processing, so that's how I'll know which files to
    process in the long term.

    I made definite progress by creating two queues...watch_queue and
    processing_queue, and then used lists within the classes to store the
    state of which files are processing/watched.

    I think I could pull it off, but it has gotten very confusing very
    quickly, trying to keep each thread's list and the queue in sync with
    one another. The easiest solution I can see is if my threads could
    read an item from the queue without removing it, and only remove it
    when I tell them to. The Watch() thread could then just follow what
    items are on the watch_queue to know which files to add, and the
    Worker() thread could explicitly remove the item from the watch_queue
    once it has finished processing it.

    Now that I'm writing this out, I see a solution: overriding or
    wrapping Queue.Queue.get() to give me the behavior I mention above.
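    That peek-without-removing idea might be sketched by subclassing the
    queue class (a hypothetical PeekableQueue, not actual working code from
    this thread; in Python 3 the module is queue rather than Queue):

    ```python
    import queue

    class PeekableQueue(queue.Queue):
        """Hypothetical sketch: a Queue whose contents can be inspected
        without consuming them."""

        def snapshot(self):
            # Queue keeps its items in the deque self.queue; hold the
            # queue's own mutex so we copy a consistent view.
            with self.mutex:
                return list(self.queue)

    q = PeekableQueue()
    q.put("a.txt")
    q.put("b.txt")
    print(q.snapshot())   # ['a.txt', 'b.txt'] -- nothing consumed
    print(q.get())        # 'a.txt' -- normal consuming get
    ```

    The watch thread could then scan snapshot() to avoid re-adding files,
    while workers still consume with the ordinary get().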

    I've noticed .join() and .task_done(), but I'm not sure of how to use
    them properly. Any suggestions would be greatly appreciated.

    ~Sean
     
    , Aug 9, 2007
    #1

  2. Justin T. Guest

    On Aug 9, 11:25 am, wrote:
    >
    > Here's how I have it designed so far. The main thread starts a
    > Watch(threading.Thread) class that loops and searches a directory for
    > files. It has been passed a Queue.Queue() object (watch_queue), and
    > as it finds new files in the watch folder, it adds the file name to
    > the queue.
    >
    > The main thread then grabs an item off the watch_queue, and kicks off
    > processing on that file using another class, Worker(threading.Thread).
    >

    Sounds good.

    >
    > I made definite progress by creating two queues...watch_queue and
    > processing_queue, and then used lists within the classes to store the
    > state of which files are processing/watched.
    >

    This sounds ugly; synchronization is one of those evils of
    multithreaded programming that should be avoided if possible. I see a
    couple of dirt-simple solutions:

    1. Have the watch thread move the file into a "Processing" folder that
    it doesn't scan.
    2. Have the watch thread copy the file into a Python tempfile object
    and push that onto the queue, then delete the real file. This can be
    done efficiently (well, more efficiently than new.write(old.read()))
    with shutil.copyfileobj(old, new).

    Both those take very few lines of code, don't require synchronization,
    and don't require extending standard classes.
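    Option 1 is little more than a move before the put. A minimal sketch,
    with hypothetical claim() and directory names (shutil.move degenerates
    to an atomic rename when both paths are on the same filesystem):

    ```python
    import os
    import shutil

    def claim(name, watch_dir, processing_dir):
        """Move a file out of the watch folder before queueing it.

        Once moved, the watch thread's next scan simply won't see the
        file, so no shared state between threads is needed.
        """
        src = os.path.join(watch_dir, name)
        dst = os.path.join(processing_dir, name)
        shutil.move(src, dst)
        return dst   # this path is what goes onto the queue
    ```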
     
    Justin T., Aug 9, 2007
    #2

  3. wrote:
    > Hi all! I'm implementing one of my first multithreaded apps, and have
    > gotten to a point where I think I'm going off track from a standard
    > idiom. [rest of original post snipped]


    1) Use a (global) hash table and a mutex on it. Before a worker thread
    starts processing a file, have the worker acquire the mutex, add the
    filename to a global dictionary as a key, and release the mutex. Then,
    when the watcher thread encounters a file, it only has to check
    whether the filename is already in the table.

    If different files may have the same name, you can also use the size or
    some other signature. Also, basic operations on primitive types in
    Python are atomic, so you may not strictly need the mutex, but in
    threaded programs it's always a good habit to use mutexes explicitly.
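    A minimal sketch of that table-plus-mutex idea (hypothetical
    try_claim/release names; a set suffices, since only the dictionary's
    keys matter):

    ```python
    import threading

    in_flight = set()                 # filenames queued or being processed
    in_flight_lock = threading.Lock()

    def try_claim(name):
        """Watcher calls this; True means the file is new to us."""
        with in_flight_lock:
            if name in in_flight:
                return False
            in_flight.add(name)
            return True

    def release(name):
        """Worker calls this once the file is processed and deleted."""
        with in_flight_lock:
            in_flight.discard(name)
    ```

    The watcher only enqueues names for which try_claim() returns True, so
    a file can never be queued twice while a worker holds it.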

    2) If the watcher thread doesn't spend much time recognizing files to
    process, simply have it read many (or all) of the files in the
    directory, put them all on the queue, and wait until every item on the
    queue has been processed, using .join(). Workers indicate that
    processing of the last dequeued item is done by calling .task_done().
    (As soon as all items are flagged with task_done(), join() will
    unblock.) After processing of all the files on the queue is done, the
    watcher can move or remove the processed files and read the directory
    again. Of course, you need to keep track of the files recognized on
    that pass.
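    That batch pattern might look like the following sketch (hypothetical
    worker; real processing replaced by a list append for illustration;
    Python 3 queue module):

    ```python
    import queue
    import threading

    def worker(q, processed):
        while True:
            name = q.get()
            processed.append(name)   # stand-in for the real processing
            q.task_done()            # mark this dequeued item finished

    q = queue.Queue()
    processed = []
    for _ in range(3):
        threading.Thread(target=worker, args=(q, processed),
                         daemon=True).start()

    # one "scan" of the directory: enqueue everything found
    for name in ["a.txt", "b.txt", "c.txt"]:
        q.put(name)

    q.join()   # blocks until task_done() has been called for every item
    # ...now the watcher can move/remove the processed files and scan again
    ```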
     
    Jun-geun Park, Aug 10, 2007
    #3
  4. MRAB Guest

    On Aug 9, 7:25 pm, wrote:
    > Hi all! I'm implementing one of my first multithreaded apps, and have
    > gotten to a point where I think I'm going off track from a standard
    > idiom. Wondering if anyone can point me in the right direction.
    >
    > The script will run as a daemon and watch a given directory for new
    > files. Once it determines that a file has finished moving into the
    > watch folder, it will kick off a process on one of the files. Several
    > of these could be running at any given time up to a max number of
    > threads.
    >
    > Here's how I have it designed so far. The main thread starts a
    > Watch(threading.Thread) class that loops and searches a directory for
    > files. It has been passed a Queue.Queue() object (watch_queue), and
    > as it finds new files in the watch folder, it adds the file name to
    > the queue.
    >
    > The main thread then grabs an item off the watch_queue, and kicks off
    > processing on that file using another class Worker(threading.thread).
    >
    > My problem is with communicating between the threads as to which files
    > are currently processing, or are already present in the watch_queue so
    > that the Watch thread does not continuously add unneeded files to the
    > watch_queue to be processed. For example...Watch() finds a file to be
    > processed and adds it to the queue. The main thread sees the file on
    > the queue and pops it off and begins processing. Now the file has
    > been removed from the watch_queue, and Watch() thread has no way of
    > knowing that the other Worker() thread is processing it, and shouldn't
    > pick it up again. So it will see the file as new and add it to the
    > queue again. PS.. The file is deleted from the watch folder after it
    > has finished processing, so that's how i'll know which files to
    > process in the long term.
    >

    I would suggest something like the following in the watch thread:

    seen_files = {}

    while True:
        # look for new files
        for name in os.listdir(folder):
            if name not in seen_files:
                process_queue.add(name)
            seen_files[name] = True

        # forget any missing files and mark the others as not seen,
        # ready for next time
        seen_files = dict((name, False) for name, seen in seen_files.items()
                          if seen)

        time.sleep(1)
     
    MRAB, Aug 10, 2007
    #4
  5. Guest

    On Aug 9, 12:09 pm, "Justin T." <> wrote:
    > On Aug 9, 11:25 am, wrote:
    > [snip]
    >
    > 1. Have the watch thread move the file into a "Processing" folder that
    > it doesn't scan
    > 2. Have the watch thread copy the file into a python tempfile object
    > and push that onto the queue, then delete the real file.
    >
    > [snip]


    Thanks for the ideas, Justin.

    I started subclassing/extending the Queue.Queue object with two
    additional methods: one to return the first item in the queue without
    removing it, and one to return all items without removing them. I
    think this will take me where I want to go. If it doesn't work, I
    might just use your processing-folder approach. That sounds the
    easiest, although I'm still interested in any idioms or other proven
    approaches for this sort of thing.

    ~Sean
     
    , Aug 10, 2007
    #5
  6. Justin T. Guest


    > approach. That sounds the easiest, although I'm still interested in
    > any idioms or other proven approaches for this sort of thing.
    >
    > ~Sean


    Idioms certainly have their place, but in the end you want clear,
    correct code. In the case of multi-threaded programming,
    synchronization adds complexity, both in code and concepts, so
    figuring out a clean design that uses message passing tends to be
    clearer and more robust. Most idioms are just a pattern to which
    somebody found a simple, robust solution, so if you try to think of a
    simple, robust solution, you're probably doing it right. Especially in
    trivial cases like the one above.

    Justin
     
    Justin T., Aug 10, 2007
    #6
  7. Justin T. Guest

    On Aug 9, 5:39 pm, MRAB <> wrote:
    > On Aug 9, 7:25 pm, wrote:
    >
    > > [original post snipped]

    >
    > I would suggest something like the following in the watch thread:
    >
    > seen_files = {}
    >
    > while True:
    >     # look for new files
    >     for name in os.listdir(folder):
    >         if name not in seen_files:
    >             process_queue.add(name)
    >         seen_files[name] = True
    >
    >     # forget any missing files and mark the others as not seen,
    >     # ready for next time
    >     seen_files = dict((name, False) for name, seen in seen_files.items()
    >                       if seen)
    >
    >     time.sleep(1)


    Hmm, this wouldn't work. It's not thread-safe, and the last line
    before you sleep doesn't make any sense.
     
    Justin T., Aug 10, 2007
    #7
  8. Mark T Guest

    <> wrote in message
    news:...
    > Hi all! I'm implementing one of my first multithreaded apps, and have
    > gotten to a point where I think I'm going off track from a standard
    > idiom. [rest of original post snipped]
    >


    Just rename the file. We've used that technique in a similar application at
    my work for years, where a service looks for files of a particular extension
    to appear in a directory. When the service sees a file, it renames it to a
    different extension and spins off a thread to process the contents.
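    The rename doubles as a cheap claim operation, since os.rename is
    atomic on POSIX: if two scanners race for the same file, only one
    rename succeeds. A sketch with a hypothetical helper and a
    hypothetical ".working" extension:

    ```python
    import os

    def claim_by_rename(path):
        """Rename foo.dat to foo.dat.working before processing it.

        Returns the new path on success, or None if another thread or
        process already claimed (renamed) the file.
        """
        working = path + ".working"
        try:
            os.rename(path, working)
        except OSError:
            return None   # somebody else got there first
        return working
    ```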

    -Mark T.
     
    Mark T, Aug 10, 2007
    #8
  9. Graeme Glass Guest

    Using IPC is just adding needless complexity to your program. Instead
    of constantly scanning the directory for files and then adding them to
    a Queue, and then having to worry whether that specific file may have
    already been popped off the queue and is currently being run by one of
    the workers, just subscribe to a specific directory event, a la
    CREATE.

    If you are using a *nix based system, you can use pyinotify
    (http://pyinotify.sourceforge.net/) with the EventsCodes.IN_CREATE
    event. If you are using a Windows system, take a look at:
    http://tgolden.sc.sabren.com/python/win32_how_do_i/watch_directory_for_changes.html
    Tim has a couple of different ideas; surely one of them will be what
    you need.

    Then all you have to do is add newly created files to the queue, farm
    them off to workers, and the workers can delete them when they're
    done. No need to worry whether someone is already working on a file,
    as it will only be added to the queue once, when it is created. HTH.

    Regards,
    Graeme
     
    Graeme Glass, Aug 10, 2007
    #9
  10. Guest

    On Aug 9, 9:45 pm, "Mark T" <> wrote:
    > <> wrote in message
    >
    > > [original post snipped]
    >
    > Just rename the file. We've used that technique in a similar application at
    > my work for years, where a service looks for files of a particular extension
    > to appear in a directory. When the service sees a file, it renames it to a
    > different extension and spins off a thread to process the contents.
    >
    > -Mark T.


    I ended up taking this route for the most part. The worker thread
    first moves the file to be processed into a temp directory, and the
    watch thread never knows about it again. I still had to implement my
    StateQueue(Queue.Queue) so I could add a function to return all the
    items on the queue without popping them off.

    Thanks all for your great ideas. My current verdict on
    multithreading... PITA!

    ~Sean
     
    , Aug 11, 2007
    #10
  11. greg Guest

    wrote:
    > I ended up taking this route for the most part. The worker thread
    > first moves the file to be processed into a temp directory,


    No, the watcher thread should do this itself *before*
    putting it into the work queue. Then there's no chance
    of it picking up the same file twice before a worker
    has gotten around to working on it, and there is no
    need for a custom queue class.
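    One pass of such a watcher might look like the following sketch
    (hypothetical watcher_pass and directory names). Because the move
    happens before the put, the next scan cannot re-find the file, and the
    plain Queue suffices:

    ```python
    import os
    import queue
    import shutil

    def watcher_pass(watch_dir, staging_dir, work_q):
        """One scan: move each new file out of watch_dir, then queue it.

        Moving first means no peeking, no custom queue class, and no
        shared bookkeeping between the watcher and the workers.
        """
        for name in os.listdir(watch_dir):
            staged = os.path.join(staging_dir, name)
            shutil.move(os.path.join(watch_dir, name), staged)
            work_q.put(staged)   # workers get() paths in staging_dir
    ```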

    --
    Greg
     
    greg, Aug 11, 2007
    #11
