Threaded Design Question

half.italian

Hi all! I'm implementing one of my first multithreaded apps, and have
gotten to a point where I think I'm going off track from a standard
idiom. Wondering if anyone can point me in the right direction.

The script will run as a daemon and watch a given directory for new
files. Once it determines that a file has finished moving into the
watch folder, it will kick off a process on that file. Several of
these could be running at any given time, up to a maximum number of
threads.

Here's how I have it designed so far. The main thread starts a
Watch(threading.Thread) class that loops and searches a directory for
files. It has been passed a Queue.Queue() object (watch_queue), and
as it finds new files in the watch folder, it adds the file name to
the queue.

The main thread then grabs an item off the watch_queue and kicks off
processing on that file using another class, Worker(threading.Thread).

My problem is with communicating between the threads which files
are currently processing, or are already present in the watch_queue, so
that the Watch thread does not keep adding unneeded files to the
watch_queue. For example, Watch() finds a file to be processed and adds
it to the queue. The main thread sees the file on the queue, pops it
off, and begins processing. Now the file has been removed from the
watch_queue, and the Watch() thread has no way of knowing that a
Worker() thread is processing it and shouldn't pick it up again. So it
will see the file as new and add it to the queue again. P.S. The file
is deleted from the watch folder after it has finished processing, so
that's how I'll know which files to process in the long term.

I made definite progress by creating two queues, watch_queue and
processing_queue, and then using lists within the classes to store the
state of which files are processing/watched.

I think I could pull it off, but it got very confusing quickly,
trying to keep each thread's list and the queue always in sync with
one another. The easiest solution I can see is if my threads could
read an item from the queue without removing it, and only remove it
when I tell them to. Then the Watch() thread could just follow what
items are on the watch_queue to know what files to add, and the
Worker() thread could explicitly remove the item from the watch_queue
once it has finished processing it.

Now that I'm writing this out, I see a solution: overriding or
wrapping Queue.Queue().get() to give me the behavior I describe above.

I've noticed .join() and .task_done(), but I'm not sure how to use
them properly. Any suggestions would be greatly appreciated.
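A minimal sketch of how task_done() and join() fit together, using Python 3's queue module (called Queue in the Python 2 of this thread). MAX_WORKERS, process() and run_batch() are hypothetical names: a fixed pool of worker threads caps concurrency, every get() is matched by exactly one task_done(), and the queue's join() returns once every item that was put() has been marked done.

```python
import queue
import threading

MAX_WORKERS = 3  # assumed cap on concurrent processing threads


def process(name):
    # Hypothetical stand-in for the real per-file work.
    return name.upper()


def worker(work_queue, results, lock):
    while True:
        name = work_queue.get()       # blocks until an item is available
        try:
            if name is None:          # sentinel: shut this worker down
                return
            result = process(name)
            with lock:
                results.append(result)
        finally:
            work_queue.task_done()    # exactly one task_done() per successful get()


def run_batch(names):
    work_queue = queue.Queue()
    results, lock = [], threading.Lock()
    threads = [
        threading.Thread(target=worker, args=(work_queue, results, lock), daemon=True)
        for _ in range(MAX_WORKERS)
    ]
    for t in threads:
        t.start()
    for name in names:
        work_queue.put(name)
    work_queue.join()                 # returns once every queued item has been task_done()
    for _ in threads:
        work_queue.put(None)          # one sentinel per worker
    for t in threads:
        t.join()                      # Thread.join(): wait for each worker to exit
    return sorted(results)
```

Note that Queue.join() waits for the queued work, not for the threads; Thread.join() is a separate call, used here only after the None sentinels have told each worker to exit.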

~Sean
 
Justin T.

> Here's how I have it designed so far. The main thread starts a
> Watch(threading.Thread) class that loops and searches a directory for
> files. It has been passed a Queue.Queue() object (watch_queue), and
> as it finds new files in the watch folder, it adds the file name to
> the queue.
>
> The main thread then grabs an item off the watch_queue, and kicks off
> processing on that file using another class Worker(threading.Thread).

Sounds good.

> I made definite progress by creating two queues...watch_queue and
> processing_queue, and then used lists within the classes to store the
> state of which files are processing/watched.

This sounds ugly. Synchronization is one of those evils of
multithreaded programming that should be avoided if possible. I see a
couple of dirt simple solutions:

1. Have the watch thread move the file into a "Processing" folder that
it doesn't scan.
2. Have the watch thread copy the file into a Python tempfile object
and push that onto the queue, then delete the real file. This can be
done efficiently (well, more efficiently than new.write(old.read()))
with shutil.copyfileobj(old, new).

Both those take very few lines of code, don't require synchronization,
and don't require extending standard classes.
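Suggestion 2 might look like this with Python 3 names (claim_by_copy is a hypothetical helper). The worker receives an open, already-rewound temporary file rather than a filename; if the original name matters, a (name, tmp) tuple could be queued instead.

```python
import os
import queue
import shutil
import tempfile


def claim_by_copy(path, work_queue):
    """Copy path into an anonymous temp file, queue the temp file, delete the original."""
    tmp = tempfile.TemporaryFile()       # removed automatically when closed
    with open(path, "rb") as src:
        shutil.copyfileobj(src, tmp)     # copies in chunks rather than one big read()
    tmp.seek(0)                          # rewind so the worker reads from the start
    os.remove(path)                      # the watcher can no longer see this file
    work_queue.put(tmp)                  # the worker owns tmp and should close() it
```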
 
Jun-geun Park

1) Use a (global) hash table and a mutex on it. Before a worker thread
starts processing a file, have the worker acquire the mutex, add the
filename to a global dictionary as a key, and release the mutex. Then,
when the watcher thread finds a file in the directory, it only has to
check whether the filename is in the table.

If different files may have the same name, you can also use the size or
some other signature. Also, I think basic operations on Python's
built-in types are atomic, so you don't strictly need a mutex, but in
threaded programs it's always a good habit to use mutexes explicitly.
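A sketch of the shared-table idea, using a set instead of a dict and a threading.Lock as the mutex (InFlight, claim() and release() are hypothetical names). One deliberate variation: here the watcher registers the name when it queues the file, rather than the worker when it starts, so a file still waiting in the queue is covered as well; the worker calls release() after deleting the file.

```python
import threading


class InFlight:
    """Names that have been queued or are being processed, guarded by a lock."""

    def __init__(self):
        self._names = set()
        self._lock = threading.Lock()

    def claim(self, name):
        """Register name; return True only for the first caller (check-and-add is atomic)."""
        with self._lock:
            if name in self._names:
                return False
            self._names.add(name)
            return True

    def release(self, name):
        """Forget name once the worker has finished with (and deleted) the file."""
        with self._lock:
            self._names.discard(name)
```

In the watcher loop this becomes `if in_flight.claim(name): work_queue.put(name)`.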

2) If the watcher thread doesn't spend much time "recognizing" files
to process, simply have the watcher thread read many (or all) files in
the directory, put them all in the queue, and wait until all items in
the queue are processed using .join(). Workers indicate that
processing of the last dequeued item is done by calling .task_done()
(as soon as every item has been flagged with task_done(), join() will
unblock). After all files on the queue have been processed, the
watcher can move or remove the processed files and read the directory
again. Of course, you need to keep track of the files "recognized"
during that turn.
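A sketch of the batch approach under assumed names (scan_once and worker are hypothetical, and reading the file stands in for the real processing). The watcher removes exactly the files it recognized in this turn, so anything that arrives mid-batch is left for the next scan.

```python
import os


def worker(work_queue, processed):
    while True:
        path = work_queue.get()
        try:
            with open(path, "rb") as f:  # hypothetical stand-in for the real processing
                processed.append((os.path.basename(path), len(f.read())))
        finally:
            work_queue.task_done()


def scan_once(folder, work_queue):
    """Queue every file present now, wait for the whole batch, then remove exactly those files."""
    names = sorted(os.listdir(folder))   # the files "recognized" this turn
    for name in names:
        work_queue.put(os.path.join(folder, name))
    work_queue.join()                    # blocks until each of them has been task_done()
    for name in names:
        os.remove(os.path.join(folder, name))
    return names
```

A daemon would start a fixed number of worker threads once, then call scan_once(folder, work_queue) in a loop with a short time.sleep() in between.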
 
MRAB

I would suggest something like the following in the watch thread:

import os
import time

# folder and process_queue (a Queue) are set up elsewhere
seen_files = {}

while True:
    # look for new files
    for name in os.listdir(folder):
        if name not in seen_files:
            process_queue.put(name)
        seen_files[name] = True

    # forget any missing files and mark the others as not seen, ready for next time
    seen_files = dict((name, False) for name, seen in seen_files.items() if seen)

    time.sleep(1)
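To see what the rebuild line does, one pass of the loop can be factored into a function and fed a few fake directory listings (scan_step is a hypothetical name). A name that stays in the listing, for example while a worker is still processing it, is queued only once; a name that disappears is forgotten.

```python
def scan_step(listing, seen_files, process_queue):
    """One pass of the loop above: queue unseen names, then keep only names still present."""
    for name in listing:
        if name not in seen_files:
            process_queue.put(name)
        seen_files[name] = True
    # Names absent from this listing are still False here and get dropped;
    # the survivors are reset to False for the next pass.
    return dict((name, False) for name, seen in seen_files.items() if seen)
```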
 
half.italian

Thanks for the ideas, Justin.

I started subclassing/extending the Queue.Queue object with two
additional methods: one to return the first item in the queue without
removing it, and one to return all items in the queue without
removing them. I think this will take me where I want to go. If it
doesn't work, I might just use your processing folder approach. That
sounds the easiest, although I'm still interested in any idioms or
other proven approaches for this sort of thing.
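A sketch of what such a subclass could look like in Python 3 (StateQueue, peek() and snapshot() are hypothetical names). It relies on two CPython implementation details of queue.Queue, the `mutex` lock and the underlying deque stored in `self.queue`, which are not part of the documented API. Keep in mind that an item already taken with get() no longer appears in snapshot(), so this alone does not tell the watcher which files a worker is currently processing.

```python
import queue


class StateQueue(queue.Queue):
    """Queue that can also report its contents without removing anything."""

    def peek(self):
        """Return the item get() would return next, or raise queue.Empty."""
        with self.mutex:              # the same lock Queue's own methods use
            if not self.queue:
                raise queue.Empty
            return self.queue[0]

    def snapshot(self):
        """Return a list copy of every item currently waiting, in FIFO order."""
        with self.mutex:
            return list(self.queue)
```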

~Sean
 
Justin T.

> approach. That sounds the easiest, although I'm still interested in
> any idioms or other proven approaches for this sort of thing.
>
> ~Sean

Idioms certainly have their place, but in the end you want clear,
correct code. In the case of multi-threaded programming,
synchronization adds complexity, both in code and concepts, so
figuring out a clean design that uses message passing tends to be
clearer and more robust. Most idioms are just a pattern to which
somebody found a simple, robust solution, so if you try to think of a
simple, robust solution, you're probably doing it right. Especially in
trivial cases like the one above.

Justin
 
Justin T.

> I would suggest something like the following in the watch thread:
>
> seen_files = {}
>
> while True:
>     # look for new files
>     for name in os.listdir(folder):
>         if name not in seen_files:
>             process_queue.put(name)
>         seen_files[name] = True
>
>     # forget any missing files and mark the others as not seen, ready for next time
>     seen_files = dict((name, False) for name, seen in seen_files.items() if seen)
>
>     time.sleep(1)

Hmm, this wouldn't work. It's not thread safe, and the last line before
you sleep doesn't make any sense.
 
Mark T

Just rename the file. We've used that technique for years in a similar
application at my work, where a service looks for files with a particular
extension to appear in a directory. When the service sees a file, it
renames it to a different extension and spins off a thread to process the
contents.
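A sketch of the rename technique, assuming the ready and claimed extensions are .in and .work (both hypothetical, as are poll_once() and process()). The rename happens before the thread starts, so a later scan that only looks for READY_EXT cannot pick the file up again.

```python
import os
import threading

READY_EXT = ".in"      # assumed extension the service watches for
CLAIMED_EXT = ".work"  # assumed extension meaning "already picked up"


def process(path):
    # Hypothetical per-file work; deletes the file when done, as in the original post.
    os.remove(path)


def poll_once(folder):
    """Claim every ready file by renaming it, then hand it to its own thread."""
    threads = []
    for name in os.listdir(folder):
        if not name.endswith(READY_EXT):
            continue                  # claimed or unrelated files are ignored
        src = os.path.join(folder, name)
        dst = src[: -len(READY_EXT)] + CLAIMED_EXT
        os.rename(src, dst)           # claim first, so the next scan skips it
        t = threading.Thread(target=process, args=(dst,))
        t.start()
        threads.append(t)
    return threads
```

This starts one thread per file, as described; to respect a maximum thread count, the claimed paths could be fed to a fixed worker pool instead.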

-Mark T.
 
Graeme Glass

Using IPC is just adding needless complexity to your program. Instead
of constantly scanning the directory for files and then adding them to
a Queue, and then having to worry whether that specific file may have
already been popped off the queue and is currently being processed by
one of the workers, just poll the directory for a specific event, a la
CREATE.

If you are using a *nix-based system, you can use pyinotify
(http://pyinotify.sourceforge.net/) with the EventsCodes.IN_CREATE event.
If you are using a Windows system, take a look at:
http://tgolden.sc.sabren.com/python/win32_how_do_i/watch_directory_for_changes.html
Tim has a couple of different ideas there; I'm sure one of them will be
what you need.

Then all you have to do is add newly created files to the queue and
fork them off to workers, and the workers can delete them when they're
done. No need to worry whether someone is working with a file already,
as it will only be added to the queue once, when it is created. HTH.

Regards,
Graeme
 
half.italian

> Just rename the file. We've used that technique in a similar application at
> my work for years where a service looks for files of a particular extension
> to appear in a directory. When the service sees a file, it renames it to a
> different extension and spins off a thread to process the contents.
>
> -Mark T.

I ended up taking this route for the most part. The worker thread
first moves the file to be processed into a temp directory, and the
watch thread never knows about it again. I still had to implement my
StateQueue(Queue.Queue) so I could implement a function to return all
the items on the queue without popping them off.

Thanks all for your great ideas. My current response to multi-
threading... PITA!

~Sean
 
greg

> I ended up taking this route for the most part. The worker thread
> first moves the file to be processed into a temp directory,

No, the watcher thread should do this itself *before*
putting it into the work queue. Then there's no chance
of it picking up the same file twice before a worker
has gotten around to working on it, and there is no
need for a custom queue class.
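A sketch of the ordering described above, under assumed names (watch_once is hypothetical): the watcher moves each file into a processing folder that it never scans, and only then puts the new path on an ordinary Queue. It assumes processing_dir lives outside watch_dir, that every entry in watch_dir is a regular file, and that the "finished moving in" check from the original post is done elsewhere.

```python
import os
import queue
import shutil


def watch_once(watch_dir, processing_dir, work_queue):
    """Move each new file out of the watch folder *before* queuing it."""
    claimed = []
    for name in sorted(os.listdir(watch_dir)):
        dest = os.path.join(processing_dir, name)
        shutil.move(os.path.join(watch_dir, name), dest)  # the watcher never sees it again
        work_queue.put(dest)          # only the moved path travels on the queue
        claimed.append(name)
    return claimed
```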
 
