A little threading problem


Alban Hertroys

Hello all,

I need your wisdom again. I'm working on a multi-threaded application
that handles multiple data sources in small batches each time. The idea
is that there are 3 threads that run simultaneously; each reads a fixed
number of records, and then they wait for each other. After that the main
thread does some processing, and the threads are allowed to continue
reading data.

I summarized this part of the application in the attached Python script,
which locks up rather early, for reasons that I don't understand (I
don't have a computer science education). I'm pretty sure the lock-up
is related to what I'm trying to fix in my application. Can
anybody explain what's happening (or maybe even show me a better way of
doing this)?

Regards,

Alban Hertroys,
MAG Productions.
 

Alban Hertroys

Jeremy said:
Alban Hertroys wrote:
Notify is called before thread B (in this case) hits the
condAllowed.wait() piece of code. So, it sits at that wait() for
forever (because it doesn't get notified, because the notification
already happened), waiting to be notified from the main thread, and the
main thread is waiting on thread B (again, in this case) to call
mainCond.notify(). This approach is a deadlock just wanting to happen
(not waiting, because it already did happen). What is it exactly that
you are trying to accomplish? I'm sure there is a better approach.

Hmm, I already learned something I didn't know by reading through my own
version of the output.
I added an extra counter, printed before waiting in the for-loop in
Main. My wrong assumption was that acquire() would block other threads
from acquiring until release() was called. In that case the for-loop
would wait 3 times (once for each thread), which is what I want.
Unfortunately, in my output I see this:

T-A: acquire mainCond
....
T-B: acquire mainCond
....
T-B: released mainCond
....
T-A: released mainCond

Which is exactly what I was trying to prevent...


But even then, as you pointed out, there is still the possibility that
one of the threads sends a notify() while the main loop isn't yet
waiting, no matter how short the timespan is that it's not waiting...
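
For readers following along: the lost notify() described above is usually avoided by pairing the condition with a piece of shared state and re-checking that state around wait(). The following is only a minimal sketch of that idea, not the attached script. The class BatchGate and its method names are made up for illustration, and it assumes Python 3's threading module. Note also that Condition.wait() releases the underlying lock while it sleeps, which may explain why two threads can appear to "acquire" mainCond before either one releases it.

```python
import threading


class BatchGate:
    """Hypothetical helper: counts 'batch ready' signals so none get lost."""

    def __init__(self):
        self._cond = threading.Condition()
        self._ready = 0  # shared state that survives an early notify

    def signal_ready(self):
        # Called by a reader thread when its batch is complete.
        with self._cond:
            self._ready += 1
            self._cond.notify_all()

    def wait_for(self, count):
        # Called by the main thread. The loop re-checks the counter, so a
        # notify that happened before this call is not missed.
        with self._cond:
            while self._ready < count:
                self._cond.wait()  # releases the lock while sleeping
            self._ready -= count


def demo():
    gate = BatchGate()
    threads = [threading.Thread(target=gate.signal_ready) for _ in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # every notification has already happened by now
    gate.wait_for(3)  # still returns, because the counter remembers them
    return True
```

Because the main thread checks the counter rather than relying on hearing the notify itself, it does not matter how short or long the window is in which it is not yet waiting.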


As for what I'm trying to do: I'm trying to merge three huge XML files
into single, separate database records. Every database record contains
related data from each of the XML files.
For practical purposes this is a two-stage process, where I first store
an uncombined "record" from each XML file into the DB (marked as
'partial'), and then periodically merge the related records into one
final record.

I could first store all data as 'partial' and then merge everything, but
I consider it better to do this with relatively small batches at a time
(queries are faster with smaller amounts of data, and the DB stays
smaller too).
The reason I use threads for this is that (to my knowledge) it is not
possible to pause an xml.parsers.xmlproc.xmlproc.Application object once
it starts parsing XML, but I can pause a thread.

This is a timeline of what I'm trying to do:

Main           start                 |combine XML |                |comb.
                                     |next batch  |                |next
Application A  run >..............*  |            |>...........*   | etc.
Application B  run >.........*       |            |>..............*|
Application C  run >................*|            |>..........*    |

Legend:
> = thread is active
* = batch ready, wait()
| = timeline delimiter
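
The timeline above is essentially a barrier: every reader stops when its batch is ready, and nobody continues until the main thread has combined the batches. In current Python (3.2 and later) threading.Barrier expresses this directly; it was not available when this thread was written. What follows is a sketch under simplifying assumptions: fixed-size batches, the same number of batches from every source, and plain strings standing in for parsed XML records. The names reader, run, BATCH_SIZE and ROUNDS are made up for illustration.

```python
import threading

BATCH_SIZE = 3
ROUNDS = 2
NAMES = ("A", "B", "C")


def run():
    # The main thread plus the three readers all meet at the same barrier.
    barrier = threading.Barrier(len(NAMES) + 1)
    batches = {name: [] for name in NAMES}
    combined = []

    def reader(name):
        for round_no in range(ROUNDS):
            start = round_no * BATCH_SIZE
            # Stand-in for "parse BATCH_SIZE records from the XML file".
            batches[name] = ["%s%d" % (name, start + i)
                             for i in range(BATCH_SIZE)]
            barrier.wait()  # '*' in the timeline: batch ready
            barrier.wait()  # stay paused until main has combined

    threads = [threading.Thread(target=reader, args=(n,)) for n in NAMES]
    for t in threads:
        t.start()
    for _ in range(ROUNDS):
        barrier.wait()  # returns once all three batches are ready
        combined.extend(zip(*(batches[n] for n in NAMES)))
        barrier.wait()  # let the readers start their next batch
    for t in threads:
        t.join()
    return combined
```

The second barrier.wait() in each round is what keeps the readers from overwriting a batch while the main thread is still combining it.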
 

Alban Hertroys

Jeremy said:
(not waiting, because it already did happen). What is it exactly that
you are trying to accomplish? I'm sure there is a better approach.

I think I saw at least a bit of the light, reading up on readers and
writers (A colleague showed up with a book called "Operating system
concepts" that has a chapter on process synchronization).
It looks like I should be writing and reading 3 Queues instead of trying
to halt and pause the threads explicitly. That looks a lot easier...

Thanks for pointing out the problem area.
 

Jeremy Jones

Alban said:
I think I saw at least a bit of the light, reading up on readers and
writers (A colleague showed up with a book called "Operating system
concepts" that has a chapter on process synchronization).
It looks like I should be writing and reading 3 Queues instead of
trying to halt and pause the threads explicitly. That looks a lot
easier...

Thanks for pointing out the problem area.

That's actually along the lines of what I was going to recommend after
getting more detail on what you are doing. A couple of things that may
(or may not) help you are:

* the Queue class in the Python standard library has a "maxsize"
  parameter. When you create a queue, you can specify how large you want
  it to grow. You can have your three threads busily parsing XML and
  extracting data from it and putting it into a queue, and when there are a
  total of "maxsize" items in the queue, the next put() call (to put data
  into the queue) will block until the consumer thread has reduced the
  number of items in the queue. I've never used
  xml.parsers.xmlproc.xmlproc.Application, but looking at the data, it
  seems to resemble a SAX parser, so you should have no problem putting
  (potentially blocking) calls to the queue into your handler. The only
  thing this really buys you is that you won't have read the whole XML
  file into memory.
* the get method on a queue object has a "block" flag. You can
  effectively poll your queues something like this:

#untested code
import Queue  #this module is named "queue" in Python 3
import time

#a_done, b_done and c_done are just checks to see if that particular
#document is done
while not (a_done and b_done and c_done):
    got_a, got_b, got_c = False, False, False
    item_a, item_b, item_c = None, None, None
    while (not a_done) and (not got_a):
        try:
            #the 0 says don't block and raise an Empty exception if
            #there's nothing there
            item_a = queue_a.get(0)
            got_a = True
        except Queue.Empty:
            time.sleep(.3)
    while (not b_done) and (not got_b):
        try:
            item_b = queue_b.get(0)
            got_b = True
        except Queue.Empty:
            time.sleep(.3)
    while (not c_done) and (not got_c):
        try:
            item_c = queue_c.get(0)
            got_c = True
        except Queue.Empty:
            time.sleep(.3)
    put_into_database_or_whatever(item_a, item_b, item_c)

This will allow you to deal with one item at a time, and if the XML files
are different sizes, it should still work - you'll just pass None to
put_into_database_or_whatever for that particular file.
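
The "maxsize" behaviour from the first point can be seen in a few lines. This is a rough sketch assuming Python 3, where the module is called queue rather than Queue; bounded_demo and its parameters are made-up names, and integers stand in for parsed records.

```python
import queue
import threading


def bounded_demo(maxsize=2, n_items=5):
    q = queue.Queue(maxsize=maxsize)

    def producer():
        for i in range(n_items):
            # put() blocks whenever the queue already holds maxsize items,
            # so the producer can never run far ahead of the consumer.
            q.put(i)

    t = threading.Thread(target=producer)
    t.start()

    consumed = []
    sizes = []
    for _ in range(n_items):
        sizes.append(q.qsize())   # never larger than maxsize
        consumed.append(q.get())  # blocks until an item is available
    t.join()
    return consumed, sizes
```

However slowly the consumer runs, the producer holds at most maxsize parsed items in memory at any moment.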

HTH.

Jeremy Jones
 

Alban Hertroys

Jeremy said:
* the get method on a queue object has a "block" flag. You can
effectively poll your queues something like this:

#untested code
#a_done, b_done and c_done are just checks to see if that particular
#document is done
while not (a_done and b_done and c_done):
    got_a, got_b, got_c = False, False, False
    item_a, item_b, item_c = None, None, None
    while (not a_done) and (not got_a):
        try:
            item_a = queue_a.get(0) #the 0 says don't block and raise an
                                    #Empty exception if there's nothing there

Actually, it is just fine to let get() block, as long as I put(None) on
the queue when I reach document_end and test for it (removing it from
the "list of queues to read" when get() returns 'None').
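
A sketch of the approach just described, for anyone who finds this thread later. It is not the actual test script: it assumes Python 3 (module queue instead of Queue), and parse, merge and the string "records" are invented stand-ins for the real xmlproc handlers and database code.

```python
import queue
import threading


def parse(name, count, out):
    # Stand-in for an XML handler: emit `count` records, then a None
    # sentinel to mark document_end.
    for i in range(count):
        out.put("%s%d" % (name, i))
    out.put(None)


def merge(sizes):
    # sizes maps a source name to the number of records it will produce.
    queues = {name: queue.Queue(maxsize=10) for name in sizes}
    threads = [threading.Thread(target=parse, args=(name, n, queues[name]))
               for name, n in sizes.items()]
    for t in threads:
        t.start()

    active = dict(queues)  # the "list of queues to read"
    records = []
    while active:
        record = {}
        for name in list(active):
            item = active[name].get()  # blocking get, no polling needed
            if item is None:
                del active[name]       # this document is finished
            else:
                record[name] = item
        if record:
            records.append(record)     # stand-in for the database merge
    for t in threads:
        t.join()
    return records
```

For example, merge({"A": 2, "B": 1, "C": 3}) yields three combined records, the later ones simply lacking the sources that ran out first.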

I rewrote my test script (the one I sent to the NG) to use Queues this
way, and it works well. It's also a lot easier to read/follow. Currently
I'm implementing it in my application.
I'm glad I don't get paid by the number of lines I write; there are
going to be fewer lines at the end of today ;)

Thanks a lot for the pointers.
 
