Real-world use of concurrent.futures

Andrew McLean

I have a problem that would benefit from a multithreaded implementation,
and I am having trouble understanding how to approach it using
concurrent.futures.

The details don't really matter, but it will probably help to be
explicit. I have a large CSV file that contains a lot of fields, amongst
them one containing email addresses. I want to write a program that
validates the email addresses by checking that the domain names have a
valid MX record. The output will be a copy of the file with any invalid
email addresses removed. Because of latency in the DNS lookup this could
benefit from multithreading.
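To make the task concrete, here is a minimal single-threaded sketch of the filtering step. The names filter_rows and domain_ok and the column name "email" are assumptions for illustration, not part of the original problem statement, and "removed" is taken to mean that the whole row is dropped. The MX lookup itself is left as a caller-supplied function, since the standard library has no MX resolver and a third-party DNS package would be needed for that part.

```python
import csv
import io


def filter_rows(infile, outfile, domain_ok, email_field="email"):
    """Copy CSV rows from infile to outfile, keeping only rows whose
    email domain passes domain_ok. Returns the number of rows kept.

    domain_ok is a hypothetical callable (domain -> bool); in the real
    program it would perform the MX lookup.
    """
    reader = csv.DictReader(infile)
    writer = csv.DictWriter(outfile, fieldnames=reader.fieldnames)
    writer.writeheader()
    kept = 0
    for row in reader:
        # Split on the last "@"; sep is empty if there is no "@" at all.
        _local, sep, domain = row[email_field].rpartition("@")
        if sep and domain and domain_ok(domain):
            writer.writerow(row)
            kept += 1
    return kept


# Usage with in-memory files and a stand-in for the DNS check.
source = io.StringIO("name,email\nann,ann@good.example\nbob,bob@bad.example\n")
target = io.StringIO()
rows_kept = filter_rows(source, target, lambda domain: domain == "good.example")
```

The call to domain_ok is the slow, latency-bound part that the rest of this thread is about running concurrently.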

I have written similar code in the past using explicit threads
communicating via queues. For this example, I could have a thread that
read the file using csv.DictReader, putting dicts containing records
from the input file into a (finite length) queue. Then I would have a
number of worker threads reading the queue, performing the validation
and putting validated results in a second queue. A final thread would
read from the second queue writing the results to the output file.
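For comparison, the explicit design described above might look roughly like the sketch below. The function name run_pipeline, the validate callable, and the default sizes are assumptions for illustration; the real program would read from csv.DictReader and write to a file rather than collect results in a list.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of a stream


def run_pipeline(records, validate, num_workers=4, max_pending=100):
    """Return the records for which validate(record) is true.

    Output order is not preserved. validate is a hypothetical callable
    standing in for the MX check.
    """
    in_q = queue.Queue(maxsize=max_pending)   # bounded: reader blocks when full
    out_q = queue.Queue(maxsize=max_pending)
    results = []

    def reader():
        for record in records:
            in_q.put(record)
        for _ in range(num_workers):
            in_q.put(_DONE)                   # one sentinel per worker

    def worker():
        while True:
            record = in_q.get()
            if record is _DONE:
                out_q.put(_DONE)              # tell the writer this worker is done
                return
            if validate(record):
                out_q.put(record)

    def writer():
        finished = 0
        while finished < num_workers:
            item = out_q.get()
            if item is _DONE:
                finished += 1
            else:
                results.append(item)

    threads = [threading.Thread(target=reader), threading.Thread(target=writer)]
    threads += [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

The bounded input queue is what keeps memory use flat here: the reader cannot get more than max_pending records ahead of the workers. This sketch does not handle an exception raised inside validate; a worker that dies would never send its sentinel, so real code would need to catch and report errors.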

So far so good. However, I thought this would be an opportunity to
explore concurrent.futures and to see whether it offered any benefits
over the more explicit approach discussed above. The problem I am having
is that all the discussions I can find of the use of concurrent.futures
show use with toy problems involving just a few tasks. The URL
downloader in the documentation is typical; it proceeds as follows:

1. Get an instance of concurrent.futures.ThreadPoolExecutor
2. Submit a few tasks to the executor
3. Iterate over the results using concurrent.futures.as_completed
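In code, those three steps look roughly like this. It is a sketch of the general shape, not the documentation's example verbatim; check_all and check are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def check_all(items, check, max_workers=5):
    """Run check(item) for every item and return {item: result}."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Every task is submitted up front, so one Future per item
        # exists in memory before any result is consumed.
        future_to_item = {executor.submit(check, item): item for item in items}
        for future in as_completed(future_to_item):
            item = future_to_item[future]
            results[item] = future.result()  # re-raises any exception from check
    return results
```

The dictionary comprehension is the part that does not scale: it creates a Future for every input item before the first result is read.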

That's fine, but I suspect that isn't a helpful pattern if I have a very
large number of tasks. In my case I could run out of memory if I tried
submitting all of the tasks to the executor before processing any of the
results.

I'm guessing what I want to do is submit tasks in batches of perhaps a
few hundred, iterate over the results until most are complete, then
submit some more tasks and so on. I'm struggling to see how to do this
elegantly without a lot of messy code just there to do "bookkeeping".
This can't be an uncommon scenario. Am I missing something, or is this
just not a job suitable for futures?
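One simple form of the batching idea, as a sketch: pull a fixed-size batch from the input, run it through the executor, and only then pull the next batch. The name map_in_batches and its defaults are assumptions. Note that this variant waits for each batch to finish completely rather than topping up when "most" are complete, so it is simpler than what is described above.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice


def map_in_batches(func, items, batch_size=200, max_workers=10):
    """Yield func(item) for each item, in input order, one batch at a time."""
    iterator = iter(items)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        while True:
            # Pull at most batch_size items; the rest of the input is untouched.
            batch = list(islice(iterator, batch_size))
            if not batch:
                return
            # executor.map submits the whole batch and yields results in order.
            yield from executor.map(func, batch)
```

Memory use is bounded by batch_size, and there is almost no bookkeeping, but one slow lookup at the end of a batch leaves the other workers idle until it finishes.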

Regards,

Andrew
 
Marko Rauhamaa

Andrew McLean said:
That's fine, but I suspect that isn't a helpful pattern if I have a
very large number of tasks. In my case I could run out of memory if I
tried submitting all of the tasks to the executor before processing
any of the results.

This is related to flow control. You'll need an object for each flow
(transaction). When new work comes in from the network, you'll have to
see if you are hitting the maximum number of pending transactions, and
not start another one before previous transactions have been processed.

Whenever a transaction is completed, you pull in more work.
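With concurrent.futures, one way to express that kind of flow control is to cap the number of pending futures and use wait() with FIRST_COMPLETED to find out when there is room for more. This is a sketch of one possible reading of the advice above, not necessarily what was intended; bounded_map, max_pending and the other names are assumptions.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def bounded_map(func, items, max_pending=100, max_workers=10):
    """Yield (item, result) pairs as tasks finish, keeping at most
    max_pending tasks submitted but not yet consumed."""
    pending = {}  # Future -> item
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for item in items:
            if len(pending) >= max_pending:
                # At the limit: block until at least one task completes.
                done, _ = wait(pending, return_when=FIRST_COMPLETED)
                for future in done:
                    yield pending.pop(future), future.result()
            pending[executor.submit(func, item)] = item
        # Input exhausted: drain whatever is still in flight.
        while pending:
            done, _ = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                yield pending.pop(future), future.result()
```

Because this is a generator, the input is only read as fast as the caller consumes results, so a lazily read CSV file never has more than max_pending rows in flight. Results arrive in completion order, not input order.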


Marko
 
