Problem with subprocess in threaded environment

Ningyu Shi

I'm trying to write a multi-task downloader to download files from a
website using multi-threading. I have one thread that analyzes the
webpage, gets the addresses of the files to be downloaded, and puts
them in a Queue. Then the main thread starts some threads that get
addresses from the queue and download them. To cap the number of files
downloaded concurrently, I use a semaphore, allowing at most 5
downloads at the same time.

I tried to use urllib.urlretrieve in the download() thread, but from
time to time it seems that one download thread may freeze the whole
program. Then I gave up and used subprocess to call wget to do the
job. My download thread is like this:

def download(url):
    subprocess.call(["wget", "-q", url])
    with print_lock:
        print url, 'finished.'
    semaphore.release()

But later I found that after a given wget job finished downloading,
the download() thread never reached the print url statement. So I end
up with the files downloaded, but the download() thread never ends,
doesn't release the semaphore, and blocks the whole program. My guess
is that by the time the wget process ends, that specific download
thread is not active and misses the return of the call.

Any comments or suggestions about this problem? Thanks.
 
dripton

> I'm trying to write a multi-task downloader to download files from a
> website using multi-threading. I have one thread that analyzes the
> webpage, gets the addresses of the files to be downloaded, and puts
> them in a Queue. Then the main thread starts some threads that get
> addresses from the queue and download them. To cap the number of files
> downloaded concurrently, I use a semaphore, allowing at most 5
> downloads at the same time.

You don't need a semaphore for that. Semaphores are low-level and
very hard to get right. (Read http://www.greenteapress.com/semaphores/
if you don't believe me.) You don't need any other type of explicit
lock either. Just use Queues.

Queues are much easier. Make simple threads, each of which typically
reads from one queue and writes to one queue. If you want N-way
concurrency at one stage, start N instances of that thread. If you
need to guarantee non-concurrency at some stage, make that thread a
singleton. Avoid sharing mutable data across threads except through
Queues.

> I tried to use urllib.urlretrieve in the download() thread, but from
> time to time it seems that one download thread may freeze the whole
> program. Then I gave up and used subprocess to call wget to do the
> job. My download thread is like this:
>
> def download(url):
>     subprocess.call(["wget", "-q", url])
>     with print_lock:
>         print url, 'finished.'
>     semaphore.release()

If the prints are just for debugging, print_lock is overkill. And in
practice that print statement is probably atomic in CPython because of
the GIL.

But let's assume that it really is critical not to mix the output
streams, and that the print isn't necessarily atomic. So instead of
the lock, use a print_queue and a single PrintThread that just pulls
strings off the queue and prints them.

> But later I found that after a given wget job finished downloading,
> the download() thread never reached the print url statement. So I end
> up with the files downloaded, but the download() thread never ends,
> doesn't release the semaphore, and blocks the whole program. My guess
> is that by the time the wget process ends, that specific download
> thread is not active and misses the return of the call.

The fact that changing libraries didn't fix your problem is a strong
indicator that it's your code, not the libraries. You're probably
introducing deadlocks with your semaphores. Too much locking is as
bad as too little.

Here's the skeleton of how I'd write this program:

main thread: create queues, create and start other threads, find URLs,
put URLs on input_queue

import threading
import urllib

class DownloadThread(threading.Thread):
    def run(self):
        while True:
            url = input_queue.get()        # block until work arrives
            urllib.urlretrieve(url)
            print_queue.put("done with %s" % url)

class PrintThread(threading.Thread):
    def run(self):
        while True:
            st = print_queue.get()
            print st
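
To make that wiring concrete, here's a minimal sketch of the main
thread described above. It assumes input_queue and print_queue are
module-level Queue.Queue objects shared with the classes above, that
urls is the list produced by the page-analysis step, and it picks
5-way download concurrency to match the original goal:

import Queue

input_queue = Queue.Queue()
print_queue = Queue.Queue()

def main(urls):
    # a single PrintThread guarantees non-concurrent printing;
    # five DownloadThread instances give 5-way download concurrency
    PrintThread().start()
    for i in range(5):
        DownloadThread().start()
    # feed the work queue; idle downloaders pull URLs off as they arrive
    for url in urls:
        input_queue.put(url)

As written, the threads loop forever, so the program as sketched never
exits on its own.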

Then you just have to worry about clean shutdown. A simple way to
handle this is to initialize a counter to the number of URLs in main, and
decrement it in PrintThread. Then have main busy-wait (with a sleep
to avoid consuming too much CPU) for it to hit zero, then join all the
other threads and exit. (This seems to violate the no-shared-mutable-
data rule, but only one thread at a time can mutate it, so it's safe.)
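
Continuing the sketch above, that counter scheme might look like the
following. Here remaining is a hypothetical module-level integer, and
instead of the join step this sketch marks the threads as daemons so
the program can exit even while workers are still blocked in get();
putting sentinel values on the queues would be another option:

import time

remaining = 0   # set once in main; afterwards only PrintThread mutates it

class PrintThread(threading.Thread):
    # the skeleton's PrintThread, extended to count completions
    def run(self):
        global remaining
        while True:
            st = print_queue.get()
            print st
            remaining -= 1          # the single mutation point

def main(urls):
    global remaining
    remaining = len(urls)
    for thread in [PrintThread()] + [DownloadThread() for i in range(5)]:
        thread.setDaemon(True)      # daemon threads die when main exits
        thread.start()
    for url in urls:
        input_queue.put(url)
    # busy-wait, sleeping so we don't eat CPU, until every URL is reported
    while remaining > 0:
        time.sleep(0.5)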

Once you've done this manually, consider using the TaskQueue recipe on
the Python Cookbook site, to simplify it a bit.
 
