collect data using threads

Qiangning Hong

I have a class Collector that spawns several threads to read from a serial
port. Collector.get_data() returns all the data they have read since the last
call. Can anyone tell me whether my implementation is correct?

class Collector(object):
    def __init__(self):
        self.data = []
        spawn_work_bees(callback=self.on_received)

    def on_received(self, a_piece_of_data):
        """This callback is executed in work bee threads!"""
        self.data.append(a_piece_of_data)

    def get_data(self):
        x = self.data
        self.data = []
        return x

I am not very sure about the get_data() method. Will it cause data loss
if a thread is appending data to self.data at the same time?

Is there a more pythonic/standard recipe to collect thread data?

--
Qiangning Hong

 
Peter Hansen

Qiangning said:
A class Collector, it spawns several threads to read from serial port.
Collector.get_data() will get all the data they have read since last
call. Who can tell me whether my implementation correct? [snip sample with a list]
I am not very sure about the get_data() method. Will it cause data lose
if there is a thread is appending data to self.data at the same time?

That will not work, and you will get data loss, as Jeremy points out.

Normally Python lists are safe, but your key problem (in this code) is
that you are rebinding self.data to a new list! If another thread calls
on_received() just after the line "x = self.data" executes, then the new
data will never be seen.

One option that would work safely** is to change get_data() to look like
this:

    def get_data(self):
        count = len(self.data)
        result = self.data[:count]
        # Delete the elements just copied; anything appended later stays put.
        del self.data[:count]
        return result

This does what yours was trying to do, but safely. Note that it doesn't
reassign self.data, but rather uses a single operation (del) to remove
all the "preserved" elements at once. It's possible that after the
first or second line a call to on_received() will add data, but it
simply won't be seen until the next call to get_data(), rather than
being lost.
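
The snapshot-then-delete idea can be checked step by step on a plain list. This is only a sketch: the interleaved append is simulated by hand in a single thread, and the deletion is written as del data[:count] so that the elements removed are exactly the ones that were copied.

```python
# Single-threaded walk-through of the snapshot-then-delete steps.
data = [1, 2, 3]

count = len(data)        # step 1: remember how many items exist right now
result = data[:count]    # step 2: copy exactly those items

data.append(4)           # simulated on_received() arriving "late"

del data[:count]         # step 3: remove only the items that were copied

print(result)            # the batch handed to the caller
print(data)              # what is left for the next get_data() call
```

The late item stays in the list and is picked up by the next call instead of being dropped.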

** I'm showing you this to help you understand why your own approach was
wrong, not to give you code that you should use. The key problem with
even my approach is that it *assumes things about the implementation*.
Specifically, there are no guarantees in Python the Language (as opposed
to CPython, the implementation) about the thread-safety of working with
lists like this. In fact, in Jython (and possibly other Python
implementations) this would definitely have problems. Unless you are
certain your code will run only under CPython, and you're willing to put
comments in the code about potential thread safety issues, you should
probably just follow Jeremy's advice and use Queue. As a side benefit,
Queues are much easier to work with!
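
A minimal sketch of the Queue-based variant, assuming Python 3 (where the module is named queue rather than Queue). The class name QueueCollector and the worker function are illustrative, and the worker threads stand in for the serial-port readers.

```python
import queue
import threading


class QueueCollector:
    def __init__(self):
        self._queue = queue.Queue()

    def on_received(self, a_piece_of_data):
        # Called from worker threads; Queue.put() is thread-safe.
        self._queue.put(a_piece_of_data)

    def get_data(self):
        # Drain everything that is available right now without blocking.
        items = []
        while True:
            try:
                items.append(self._queue.get_nowait())
            except queue.Empty:
                return items


def worker(collector, worker_id, n_items):
    # Hypothetical stand-in for a thread reading from a serial port.
    for i in range(n_items):
        collector.on_received((worker_id, i))


collector = QueueCollector()
threads = [threading.Thread(target=worker, args=(collector, w, 100))
           for w in range(16)]
for t in threads:
    t.start()
for t in threads:
    t.join()

received = collector.get_data()
```

The drain loop is the "while-loop and empty checking" mentioned later in the thread; it costs one get_nowait() call per item but makes no assumptions about the interpreter.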

-Peter
 
Kent Johnson

Peter said:
Qiangning said:
A class Collector, it spawns several threads to read from serial port.
Collector.get_data() will get all the data they have read since last
call. Who can tell me whether my implementation correct?

[snip sample with a list]
I am not very sure about the get_data() method. Will it cause data lose
if there is a thread is appending data to self.data at the same time?


That will not work, and you will get data loss, as Jeremy points out.

Normally Python lists are safe, but your key problem (in this code) is
that you are rebinding self.data to a new list! If another thread calls
on_received() just after the line "x = self.data" executes, then the new
data will never be seen.

Can you explain why not? self.data is still bound to the same list as x. At least if the execution sequence is

    x = self.data
    self.data.append(a_piece_of_data)
    self.data = []

ISTM it should work.

I'm not arguing in favor of the original code, I'm just trying to understand your specific failure mode.
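
That execution sequence can be replayed by hand in one thread. This is only an illustration of the aliasing, not a threading test; the stripped-down Collector below omits spawn_work_bees.

```python
class Collector:
    def __init__(self):
        self.data = []


c = Collector()
c.data.append("early")

x = c.data               # get_data(): x = self.data
c.data.append("late")    # on_received() runs in between
c.data = []              # get_data(): self.data = []

print(x)                 # x is the same list object that received "late"
print(c.data)            # the fresh list is empty
```

Because x and the old self.data name the same list object, the item appended in between is part of the returned list.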

Thanks,
Kent
 
James Tanis

Previously, on Jun 14, Jeremy Jones said:

# Kent Johnson wrote:
#
# > Peter Hansen wrote:
# >
# > > Qiangning Hong wrote:
# > >
# > >
# > > > A class Collector, it spawns several threads to read from serial port.
# > > > Collector.get_data() will get all the data they have read since last
# > > > call. Who can tell me whether my implementation correct?
# > > >
# > > [snip sample with a list]
# > >
# > >
# > > > I am not very sure about the get_data() method. Will it cause data lose
# > > > if there is a thread is appending data to self.data at the same time?
# > > >
# > > That will not work, and you will get data loss, as Jeremy points out.
# > >
# > > Normally Python lists are safe, but your key problem (in this code) is
# > > that you are rebinding self.data to a new list! If another thread calls
# > > on_received() just after the line "x = self.data" executes, then the new
# > > data will never be seen.
# > >
# >
# > Can you explain why not? self.data is still bound to the same list as x. At
# > least if the execution sequence is x = self.data
# > self.data.append(a_piece_of_data)
# > self.data = []
# >
# > ISTM it should work.
# >
# > I'm not arguing in favor of the original code, I'm just trying to understand
# > your specific failure mode.
# >
# > Thanks,
# > Kent
# >
# Here's the original code:
#
# class Collector(object):
#     def __init__(self):
#         self.data = []
#         spawn_work_bees(callback=self.on_received)
#
#     def on_received(self, a_piece_of_data):
#         """This callback is executed in work bee threads!"""
#         self.data.append(a_piece_of_data)
#
#     def get_data(self):
#         x = self.data
#         self.data = []
#         return x
#
# The more I look at this, the more I'm not sure whether data loss will occur.
# For me, that's good enough reason to rewrite this code. I'd rather be clear
# and certain than clever any day.
# So, let's say you have a thread T1 which starts in ``get_data()`` and makes it as
# far as ``x = self.data``. Then another thread T2 comes along in
# ``on_received()`` and gets as far as ``self.data.append(a_piece_of_data)``.
# ``x`` in T1's ``get_data()`` (as you pointed out) is still pointing to the list
# that T2 just appended to and T1 will return that list. But what happens if
# you get multiple guys in ``get_data()`` and multiple guys in
# ``on_received()``? I can't prove it, but it seems like you're going to have
# an uncertain outcome. If you're just dealing with 2 threads, I can't see how
# that would be unsafe. Maybe someone could come up with a use case that would
# disprove that. But if you've got, say, 4 threads, 2 in each method....that's
# gonna get messy.
# And, honestly, I'm trying *really* hard to come up with a scenario that would
# lose data and I can't. Maybe someone like Peter or Aahz or some little 13
# year old in Topeka who's smarter than me can come up with something. But I do
# know this - the more I think about whether this is unsafe or not, the more my
# head hurts. If you have a piece of code that you have to spend that
# much time on trying to figure out if it is threadsafe or not, why would you
# leave it as is? Maybe the rest of you are more confident in your thinking and
# programming skills than I am, but I would quickly slap a Queue in there. If
# for nothing else than to rest from simulating in my head 1, 2, 3, 5, 10
# threads in the ``get_data()`` method while various threads are in the
# ``on_received()`` method. Aaaagghhh.....need....motrin......
#
#
# Jeremy Jones
#

I may be wrong here, but shouldn't you just use a stack? In other
words, use the list as a stack and just pop the data off the top. I
believe there is a pop() method already supplied for you. Since
you wouldn't need self.data = [], this should allow you to safely
remove the data you've already seen without accidentally removing data
that may have been added in the meantime.
 
Peter Hansen

James said:
I may be wrong here, but shouldn't you just use a stack, or in other
words, use the list as a stack and just pop the data off the top. I
believe there is a method pop() already supplied for you.

Just a note on terminology here. I believe the word "stack" generally
refers to a LIFO (last-in first-out) structure, not what the OP needs
which is a FIFO (first-in first-out).

Assuming you would refer to the .append() operation as "putting data on
the bottom", then to pop off the "top" you would use pop(0), not just
pop().

Normally though, I think one would refer to these as the head and tail
(not top and bottom), and probably call the whole thing a queue, rather
than a stack.
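
The difference between the two ends is quick to see with a small list. A short sketch; collections.deque is shown as the usual standard-library choice when items are removed from the head, since list.pop(0) has to shift every remaining element.

```python
from collections import deque

items = ["a", "b", "c"]          # appended in this order

stack = list(items)
last_in = stack.pop()            # LIFO: takes the most recently appended item

fifo_list = list(items)
first_in = fifo_list.pop(0)      # FIFO: takes the oldest item

fifo = deque(items)
head = fifo.popleft()            # FIFO without shifting the remaining items

print(last_in, first_in, head)
```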

-Peter
 
Peter Hansen

Kent said:
Peter said:
That will not work, and you will get data loss, as Jeremy points out.
Can you explain why not? self.data is still bound to the same list as x.
At least if the execution sequence is x = self.data
self.data.append(a_piece_of_data)
self.data = []

Ah, since the entire list is being returned, you appear to be correct.
Interesting... this means the OP's code is actually appending things to
a list, over and over (presumably), then returning a reference to that
list and rebinding the internal variable to a new list. If another
thread calls on_received() and causes new data to be appended to "the
list" between those two statements, then it will show up in the returned
list (rather magically, at least to my way of looking at it) and will
not in fact be lost.

Good catch Kent. :)

-Peter
 
Peter Hansen

Peter said:
Just a note on terminology here. I believe the word "stack" generally
refers to a LIFO (last-in first-out) structure, not what the OP needs
which is a FIFO (first-in first-out).

Or, perhaps he doesn't need either... as Kent points out (I should have
read his post before replying above) this isn't what I think James and I
both thought it was but something a little less usual...

-Peter
 
Qiangning Hong

James said:
# > > > A class Collector, it spawns several threads to read from serial port.
# > > > Collector.get_data() will get all the data they have read since last
# > > > call. Who can tell me whether my implementation correct?
# > > >
# Here's the original code:
#
# class Collector(object):
#     def __init__(self):
#         self.data = []
#         spawn_work_bees(callback=self.on_received)
#
#     def on_received(self, a_piece_of_data):
#         """This callback is executed in work bee threads!"""
#         self.data.append(a_piece_of_data)
#
#     def get_data(self):
#         x = self.data
#         self.data = []
#         return x
#
I may be wrong here, but shouldn't you just use a stack, or in other
words, use the list as a stack and just pop the data off the top. I
believe there is a method pop() already supplied for you. Since
you wouldn't require an self.data = [] this should allow you to safely
remove the data you've already seen without accidentally removing data
that may have been added in the mean time.

I am the original poster.

I actually had considered Queue and pop() before I wrote the above code.
However, because there is a lot of data to get every time I call
get_data(), I wanted a more CPU-friendly way that avoids the while-loop and
the empty checking, and that is how the above code came about. But I am not
sure whether it will cause a serious problem or not, so I am asking here. If
anyone can prove it is correct, I'll use it in my program; otherwise I'll go
back to the Queue solution.

To Jeremy Jones: I am very sorry that this weird code took so much of your
effort. I should have made it clear that only *one* thread (the main thread
in my application) calls the get_data() method, periodically, driven by a
timer. As for on_received(), there may be up to 16 threads calling it
simultaneously.


--
Qiangning Hong

 
Toby Dickenson

Kent said:
Peter said:
That will not work, and you will get data loss, as Jeremy points out.
Can you explain why not? self.data is still bound to the same list as x.
At least if the execution sequence is x = self.data
self.data.append(a_piece_of_data)
self.data = []

Ah, since the entire list is being returned, you appear to be correct.
Interesting... this means the OP's code is actually appending things to
a list, over and over (presumably), then returning a reference to that
list and rebinding the internal variable to a new list. If another
thread calls on_received() and causes new data to be appended to "the
list" between those two statements, then it will show up in the returned
list (rather magically, at least to my way of looking at it) and will
not in fact be lost.

But it might not "show up" until too late.

The consumer thread that called get_data presumably does something with that
list, such as iterating over its contents. It might only "show up" after that
iteration has finished, when the consumer has discarded its reference to the
shared list.
 
Kent Johnson

Qiangning said:
I actually had considered Queue and pop() before I wrote the above code.
However, because there is a lot of data to get every time I call
get_data(), I want a more CPU friendly way to avoid the while-loop and
empty checking, and then the above code comes out. But I am not very
sure whether it will cause serious problem or not, so I ask here. If
anyone can prove it is correct, I'll use it in my program, else I'll go
back to the Queue solution.

OK, here is a real failure mode. Here is the code and the disassembly:

>>> class Collector(object):
...     def __init__(self):
...         self.data = []
...     def on_received(self, a_piece_of_data):
...         """This callback is executed in work bee threads!"""
...         self.data.append(a_piece_of_data)
...     def get_data(self):
...         x = self.data
...         self.data = []
...         return x
...
>>> import dis
>>> dis.dis(Collector.on_received)
  6           0 LOAD_FAST                0 (self)
              3 LOAD_ATTR                1 (data)
              6 LOAD_ATTR                2 (append)
              9 LOAD_FAST                1 (a_piece_of_data)
             12 CALL_FUNCTION            1
             15 POP_TOP
             16 LOAD_CONST               1 (None)
             19 RETURN_VALUE
>>> dis.dis(Collector.get_data)
  8           0 LOAD_FAST                0 (self)
              3 LOAD_ATTR                1 (data)
              6 STORE_FAST               1 (x)

  9           9 BUILD_LIST               0
             12 LOAD_FAST                0 (self)
             15 STORE_ATTR               1 (data)

 10          18 LOAD_FAST                1 (x)
             21 RETURN_VALUE

Imagine the thread calling on_received() gets as far as LOAD_ATTR (data), LOAD_ATTR (append) or LOAD_FAST (a_piece_of_data), so it has a reference to self.data; then it blocks and the get_data() thread runs. The get_data() thread could call get_data() and *finish processing the returned list* before the on_received() thread runs again and actually appends to the list. The appended value will never be processed.
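
The interleaving described above can be replayed deterministically in a single thread. This is a hand-driven simulation, not a real race: holding the bound method collector.data.append stands in for a worker that was suspended after the two LOAD_ATTR instructions.

```python
class Collector:
    def __init__(self):
        self.data = []

    def get_data(self):
        x = self.data
        self.data = []
        return x


collector = Collector()
collector.data.append("first")

# Worker thread: has looked up self.data and .append, then gets suspended.
pending_append = collector.data.append

# Consumer thread: fetches the batch and finishes processing it.
batch = collector.get_data()
processed = list(batch)

# Worker thread resumes and performs the call on the *old* list.
pending_append("late")

# The next poll sees nothing, because "late" went into the discarded list.
next_batch = collector.get_data()

print(processed, batch, next_batch)
```

The item does land in the old list, but only after the consumer has finished with it, so it is never processed.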

If you want to avoid the overhead of a Queue.get() for each data element you could just put your own mutex into on_received() and get_data().
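
A minimal sketch of that mutex variant, assuming threading.Lock as the mutex. The class name LockedCollector and the worker function are illustrative; the worker threads stand in for the serial-port readers.

```python
import threading


class LockedCollector:
    def __init__(self):
        self._lock = threading.Lock()
        self._data = []

    def on_received(self, a_piece_of_data):
        # The attribute lookup and the append happen under the lock, so
        # get_data() cannot swap the list in between them.
        with self._lock:
            self._data.append(a_piece_of_data)

    def get_data(self):
        # One lock acquisition per batch rather than one per item.
        with self._lock:
            x, self._data = self._data, []
        return x


def worker(collector, worker_id, n_items):
    # Hypothetical stand-in for a thread reading from a serial port.
    for i in range(n_items):
        collector.on_received((worker_id, i))


collector = LockedCollector()
threads = [threading.Thread(target=worker, args=(collector, w, 1000))
           for w in range(16)]
for t in threads:
    t.start()

collected = []
while any(t.is_alive() for t in threads):
    collected.extend(collector.get_data())   # poll while workers are running
for t in threads:
    t.join()
collected.extend(collector.get_data())       # pick up whatever is left
```

The consumer still receives a whole list per call, which keeps the cheap batch hand-off the original code was after.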

Kent
 
James Tanis

Previously, on Jun 14, Peter Hansen said:

# James Tanis wrote:
# > I may be wrong here, but shouldn't you just use a stack, or in other
# > words, use the list as a stack and just pop the data off the top. I
# > believe there is a method pop() already supplied for you.
#
# Just a note on terminology here. I believe the word "stack" generally
# refers to a LIFO (last-in first-out) structure, not what the OP needs
# which is a FIFO (first-in first-out).

What can I say? Lack of sleep.

#
# Assuming you would refer to the .append() operation as "putting data on
# the bottom", then to pop off the "top" you would use pop(0), not just
# pop().

Right, except I'm not writing his code for him, and I don't think he
expects me to. I was just referring to the existence of a pop()
function; perhaps I should have said pop([int]) to be clearer. Its use
would of course have to be tailored to his code depending on what he
requires.

#
# Normally though, I think one would refer to these as the head and tail
# (not top and bottom), and probably call the whole thing a queue, rather
# than a stack.

I agree, it's been a while and I mixed the two names up, nothing more.
 
Peter Hansen

Toby said:
But it might not "show up" until too late.

The consumer thread that called get_data presumably does something with that
list, such as iterating over its contents. It might only "show up" after that
iteration has finished, when the consumer has discarded its reference to the
shared list.

I was going to point out that the consuming thread is the one calling
get_data(), and therefore by the time it returns (to iterate over the
contents), self.data has already been rebound to a new list.

That was before Kent correctly analyzed this yet again and shows how the
on_received call can itself be the source of the trouble, via the
separate attribute lookup and append call. (I'm going to hand in my
multi-threading merit badge and report to Aahz for another Queue
"reprogramming" session for missing on this twice.)

-Peter
 
