Real-world use of concurrent.futures

Discussion in 'Python' started by Andrew McLean, May 8, 2014.

  1. I have a problem that would benefit from a multithreaded implementation
    and am having trouble understanding how to approach it using
    concurrent.futures.

    The details don't really matter, but it will probably help to be
    explicit. I have a large CSV file that contains a lot of fields, amongst
    them one containing email addresses. I want to write a program that
    validates the email addresses by checking that the domain names have a
    valid MX record. The output will be a copy of the file with any invalid
    email addresses removed. Because of latency in the DNS lookup this could
    benefit from multithreading.

    I have written similar code in the past using explicit threads
    communicating via queues. For this example, I could have a thread that
    read the file using csv.DictReader, putting dicts containing records
    from the input file into a (finite length) queue. Then I would have a
    number of worker threads reading the queue, performing the validation
    and putting validated results in a second queue. A final thread would
    read from the second queue writing the results to the output file.
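
    For concreteness, the queue-based design described above might look
    roughly like the sketch below. The has_valid_mx function is a
    hypothetical stand-in (it does no DNS lookup at all), the "email" column
    name is an assumption, and output order is not preserved.

```python
import csv
import io
import queue
import threading

SENTINEL = None  # marks the end of the stream on a queue


def has_valid_mx(domain):
    # Hypothetical stand-in for a real DNS MX lookup; no network access here.
    return not domain.endswith(".invalid")


def reader(infile, work_q, n_workers):
    for row in csv.DictReader(infile):
        work_q.put(row)  # blocks while the finite-length queue is full
    for _ in range(n_workers):
        work_q.put(SENTINEL)  # one end marker per worker


def worker(work_q, result_q):
    while True:
        row = work_q.get()
        if row is SENTINEL:
            result_q.put(SENTINEL)  # tell the writer this worker is done
            return
        domain = row["email"].rpartition("@")[2]
        if has_valid_mx(domain):
            result_q.put(row)  # rows with invalid addresses are dropped


def writer(outfile, fieldnames, result_q, n_workers):
    out = csv.DictWriter(outfile, fieldnames=fieldnames)
    out.writeheader()
    finished = 0
    while finished < n_workers:
        row = result_q.get()
        if row is SENTINEL:
            finished += 1
        else:
            out.writerow(row)


def run_pipeline(infile, outfile, fieldnames, n_workers=4, maxsize=100):
    work_q = queue.Queue(maxsize)
    result_q = queue.Queue(maxsize)
    threads = [threading.Thread(target=reader, args=(infile, work_q, n_workers))]
    threads += [
        threading.Thread(target=worker, args=(work_q, result_q))
        for _ in range(n_workers)
    ]
    threads.append(
        threading.Thread(target=writer, args=(outfile, fieldnames, result_q, n_workers))
    )
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```

    The bounded queues are what keep memory use flat: the reader simply
    blocks whenever the workers fall behind.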

    So far so good. However, I thought this would be an opportunity to
    explore concurrent.futures and to see whether it offered any benefits
    over the more explicit approach discussed above. The problem I am having
    is that all the discussions I can find of the use of concurrent.futures
    show use with toy problems involving just a few tasks. The URL
    downloader in the documentation is typical; it proceeds as follows:

    1. Get an instance of concurrent.futures.ThreadPoolExecutor
    2. Submit a few tasks to the executor
    3. Iterate over the results using concurrent.futures.as_completed
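
    Applied to the domain check, those three steps might be sketched as
    follows. The has_valid_mx function is again a hypothetical stand-in that
    performs no real lookup; this is an illustration of the pattern, not the
    documentation's own example.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def has_valid_mx(domain):
    # Hypothetical stand-in for a real DNS MX lookup; no network access here.
    return not domain.endswith(".invalid")


def check_all(domains, max_workers=4):
    results = {}
    # Step 1: get an executor.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Step 2: submit every task up front. One Future per domain is
        # created and held in this dict before any result is consumed.
        future_to_domain = {executor.submit(has_valid_mx, d): d for d in domains}
        # Step 3: iterate over the futures as they complete.
        for future in as_completed(future_to_domain):
            results[future_to_domain[future]] = future.result()
    return results
```

    Note that the dict of futures grows with the number of inputs, which is
    the memory concern with a very large file.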

    That's fine, but I suspect that isn't a helpful pattern if I have a very
    large number of tasks. In my case I could run out of memory if I tried
    submitting all of the tasks to the executor before processing any of the
    results.

    I'm guessing that what I want to do is submit tasks in batches of perhaps a
    few hundred, iterate over the results until most are complete, then
    submit some more tasks and so on. I'm struggling to see how to do this
    elegantly without a lot of messy code just there to do "bookkeeping".
    This can't be an uncommon scenario. Am I missing something, or is this
    just not a job suitable for futures?
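
    One possible way to keep that bookkeeping small is a sliding window
    rather than strict batches: hold at most a fixed number of futures, and
    each time some finish, submit that many more. The sketch below assumes
    this approach; bounded_map and max_pending are names made up for
    illustration, not part of concurrent.futures.

```python
import itertools
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def bounded_map(func, items, max_workers=4, max_pending=200):
    """Yield (item, result) pairs in completion order.

    At most max_pending futures exist at any time, so the input iterable
    is consumed lazily instead of being submitted all at once.
    """
    items = iter(items)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        pending = {}
        # Prime the window with the first max_pending tasks.
        for item in itertools.islice(items, max_pending):
            pending[executor.submit(func, item)] = item
        while pending:
            # Block until at least one outstanding task has finished.
            done, _ = wait(list(pending), return_when=FIRST_COMPLETED)
            for future in done:
                item = pending.pop(future)
                yield item, future.result()
            # Top the window back up with as many new tasks as just finished.
            for item in itertools.islice(items, len(done)):
                pending[executor.submit(func, item)] = item
```

    With this, the caller can loop over bounded_map(check, rows) and write
    each surviving row as it arrives; results come back in completion order,
    not input order.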

    Regards,

    Andrew
     
    Andrew McLean, May 8, 2014
    #1

  2. This is related to flow control. You'll need an object for each flow
    (transaction). When new work comes in from the network, you'll have to
    see if you are hitting the maximum number of pending transactions, and
    not start another one before previous transactions have been processed.

    Whenever a transaction is completed, you pull in more work.
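
    A minimal sketch of that kind of flow control on top of
    ThreadPoolExecutor, assuming a semaphore is used to count pending
    transactions. BoundedExecutor is a hypothetical wrapper written for
    illustration, not a standard-library class.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class BoundedExecutor:
    """Hypothetical wrapper that caps the number of pending tasks."""

    def __init__(self, max_workers, max_pending):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._slots = threading.BoundedSemaphore(max_pending)

    def submit(self, fn, *args, **kwargs):
        # Blocks the caller while max_pending tasks are already in flight.
        self._slots.acquire()
        try:
            future = self._executor.submit(fn, *args, **kwargs)
        except BaseException:
            self._slots.release()
            raise
        # Completing a task frees a slot, which lets more work be pulled in.
        future.add_done_callback(lambda _f: self._slots.release())
        return future

    def shutdown(self, wait=True):
        self._executor.shutdown(wait=wait)
```

    The producer loop then just calls submit() for each record; it is
    throttled automatically whenever the limit is reached.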


    Marko
     
    Marko Rauhamaa, May 8, 2014
    #2