Refactor a buffered class...

lh84777

Hello,

I'm looking for the behaviour below, and I wrote a piece of code which works,
but it looks odd to me. Can someone help me refactor it?

I would like to walk across a list of items in series of N (N=3 below).
I have an explicit end-of-sequence mark (here it is '.'); each sequence
may be any length and is composed of words.

For: s = "this . is a . test to . check if it . works . well . it looks . like ."
the output should be (grouping by 3):

=> this .
=> this . is a .
=> this . is a . test to .
=> is a . test to . check if it .
=> test to . check if it . works .
=> check if it . works . well .
=> works . well . it looks .
=> well . it looks . like .

my piece of code:

import sys

class MyBuffer:
    def __init__(self):
        self.acc = []
        # positions in acc just after each sentry (chunk boundaries)
        self.sentries = [0, ]
    def append(self, item):
        self.acc.append(item)
    def addSentry(self):
        self.sentries.append(len(self.acc))
        print >> sys.stderr, "\t", self.sentries
    def checkSentry(self, size, keepFirst):
        n = len(self.sentries) - 1
        if keepFirst and n < size:
            return self.acc
        if n % size == 0:
            result = self.acc
            # forget the oldest chunk and shift the boundaries
            first = self.sentries[1]
            self.acc = self.acc[first:]
            self.sentries = [x - first for x in self.sentries]
            self.sentries.pop(0)
            return result

s = "this . is a . test to . check if it . works . well . it looks . like ."
l = s.split()
print l

mb = MyBuffer()
n = 0
for x in l:
    mb.append(x)
    if x == '.':
        # end of something
        print "+", n
        n += 1
        mb.addSentry()
        current = mb.checkSentry(3, True)  # GROUPING BY 3
        if current:
            print "=>", current
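Note that checkSentry returns the raw token list, so this driver actually prints
e.g. "=> ['this', '.']" rather than the joined form shown in the spec; joining
it for display is a one-liner:

        print "=>", " ".join(current)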
 
Michael Spencer

lh84777 wrote:
> I would like to walk across a list of items in series of N (N=3 below).
> I have an explicit end-of-sequence mark (here it is '.'); each sequence
> may be any length and is composed of words.
> [...]
If you just need to 'walk across a list of items', then your buffer class and
helper function seem unnecessarily complex. A generator would do the trick,
something like:
def chunker(s, chunk_size=3, sentry='.'):
    buffer = []
    sentry_count = 0
    for item in s:
        buffer.append(item)
        if item == sentry:
            sentry_count += 1
            if sentry_count > chunk_size:
                # forget the oldest chunk, up to and including its sentry
                del buffer[:buffer.index(sentry)+1]
            yield buffer

for p in chunker(s.split()):
    print " ".join(p)

which prints:

this .
this . is a .
this . is a . test to .
is a . test to . check if it .
test to . check if it . works .
check if it . works . well .
works . well . it looks .
well . it looks . like .
HTH
Michael
 
lh84777

Michael Spencer wrote:
If you just need to 'walk across a list of items', then your buffer class and
helper function seem unnecessarily complex. A generator would do the trick,
something like:

actually, for the example I used only one sentry condition, but the real ones
are more numerous and complex. also, I need to work on a huge amount of data
(each "word" is really a line with many features read from a file).

but generators are interesting stuff that I'm going to look at more closely.

thanks.
 
lh84777

Here is another version,

import sys

class ChunkeredBuffer:
    def __init__(self):
        self.buffer = []
        self.sentries = []
    def append(self, item):
        self.buffer.append(item)
    def chunk(self, chunkSize, keepFirst=False):
        self.sentries.append(len(self.buffer))
        forget = self.sentries[:-chunkSize]
        if not keepFirst and len(self.sentries) < chunkSize:
            return
        if forget != []:
            # slide the window past the oldest chunk(s)
            last = forget[-1]
            self.buffer = self.buffer[last:]
            self.sentries = [x - last for x in self.sentries[1:]]
        print >> sys.stderr, self.sentries, len(self.sentries), forget
        return self.buffer
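For reference, a minimal driver for this version might look like this (a
sketch, reusing s from the first post and joining the window for display):

mb = ChunkeredBuffer()
for x in s.split():
    mb.append(x)
    if x == '.':
        current = mb.chunk(3, keepFirst=True)  # grouping by 3
        if current:
            print "=>", " ".join(current)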

but I was wondering how I could also emit the last items, if needed:

it looks . like .
like .

to the previous:

this .
this . is a .
this . is a . test to .
is a . test to . check if it .
test to . check if it . works .
check if it . works . well .
works . well . it looks like .

to have:

this .
this . is a .
this . is a . test to .
is a . test to . check if it .
test to . check if it . works .
check if it . works . well .
works . well . it looks like .
it looks . like .
like .
 
lh84777

oops
to have:

this .
this . is a .
this . is a . test to .
is a . test to . check if it .
test to . check if it . works .
check if it . works . well .
works . well . it looks like .
well . it looks like .
it looks like .
 
Michael Spencer

lh84777 said:
actually, for the example I used only one sentry condition, but the real ones
are more numerous and complex. also, I need to work on a huge amount of data
(each "word" is really a line with many features read from a file).
An open (text) file is a line-based iterator that can be fed directly to
'chunker'. As for different sentry conditions, I imagine they can be coded in
either model. How much is a 'huge amount' of data?
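As an illustration, a minimal sketch of feeding a file to the generator (the
filename and record layout are made up here; lines are stripped so that a bare
"." line matches the sentry exactly):

f = open("features.txt")
lines = (line.rstrip("\n") for line in f)
for window in chunker(lines, chunk_size=3, sentry="."):
    print window  # the current window of records
f.close()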
oops

well . it looks like .
it looks like .
Here's a small update to the generator that allows optional handling of the head
and the tail:

def chunker(s, chunk_size=3, sentry=".", keep_first=False, keep_last=False):
    buffer = []
    sentry_count = 0

    for item in s:
        buffer.append(item)
        if item == sentry:
            sentry_count += 1
            if sentry_count < chunk_size:
                if keep_first:
                    yield buffer
            else:
                yield buffer
                del buffer[:buffer.index(sentry)+1]

    if keep_last:
        while buffer:
            yield buffer
            del buffer[:buffer.index(sentry)+1]

for p in chunker(s.split(), keep_first=True, keep_last=True):
    print " ".join(p)

which prints:

this .
this . is a .
this . is a . test to .
is a . test to . check if it .
test to . check if it . works .
check if it . works . well .
works . well . it looks like .
well . it looks like .
it looks like .
 
George Sakkis

Michael said:
Here's a small update to the generator that allows optional handling of the head
and the tail:

def chunker(s, chunk_size=3, sentry=".", keep_first=False, keep_last=False):
    buffer = []
    sentry_count = 0

    for item in s:
        buffer.append(item)
        if item == sentry:
            sentry_count += 1
            if sentry_count < chunk_size:
                if keep_first:
                    yield buffer
            else:
                yield buffer
                del buffer[:buffer.index(sentry)+1]

    if keep_last:
        while buffer:
            yield buffer
            del buffer[:buffer.index(sentry)+1]


And here's a (probably) more efficient version, using a deque as a
buffer:

from itertools import islice
from collections import deque

def chunker(seq, sentry='.', chunk_size=3, keep_first=False, keep_last=False):
    def format_chunks(chunks):
        return [' '.join(chunk) for chunk in chunks]
    iterchunks = itersplit(seq, sentry)
    buf = deque()
    for chunk in islice(iterchunks, chunk_size-1):
        buf.append(chunk)
        if keep_first:
            yield format_chunks(buf)
    for chunk in iterchunks:
        buf.append(chunk)
        yield format_chunks(buf)
        buf.popleft()
    if keep_last:
        while buf:
            yield format_chunks(buf)
            buf.popleft()

def itersplit(seq, sentry='.'):
    # split the iterable `seq` on each `sentry`
    buf = []
    for x in seq:
        if x != sentry:
            buf.append(x)
        else:
            yield buf
            buf = []
    if buf:
        yield buf
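As a quick sanity check of itersplit on its own (just an illustration; the
expected result is shown in the comment):

print list(itersplit("this . is a . test to .".split()))
# -> [['this'], ['is', 'a'], ['test', 'to']]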


s = "this . is a . test to . check if it . works . well . it looks . like ."
for p in chunker(s.split(), keep_last=True, keep_first=True):
    print p


George
 
Michael Spencer

George said:
Michael said:
Here's a small update to the generator that allows optional handling of the head
and the tail:

def chunker(s, chunk_size=3, sentry=".", keep_first = False, keep_last = False):
buffer=[]
....

And here's a (probably) more efficient version, using a deque as a
buffer:

Perhaps the deque-based solution is more efficient under some conditions, but
it's significantly slower for all the cases I tested:

Here are some typical results:

Using George's deque buffer: 'get_chunks(...) 35 iterations, 14.41msec per call'

Using the list buffer:

Test functions follow:

def make_seq(groups=1000, words_per_group=3, word_length=76, sentry="."):
    """Make a sequence of test input for chunker, e.g.:
    ['WW', 'WW', 'WW', 'WW', 'WW', '%', 'WW', 'WW', 'WW', 'WW', 'WW', '%',
     'WW', 'WW', 'WW', 'WW', 'WW', '%', 'WW', 'WW', 'WW', 'WW', 'WW', '%',
     'WW', 'WW', 'WW', 'WW', 'WW', '%']
    """
    word = "W"*word_length
    group = [word]*words_per_group + [sentry]
    return group*groups

def time_chunkers(chunk_func, groups=1000, words_per_group=10, chunk_size=3):
    """Test harness for chunker functions"""
    seq = make_seq(groups)
    def get_chunks(chunk_func, seq):
        return list(chunk_func(seq))
    return timefunc(get_chunks, chunk_func, seq)

def _get_timer():
    import sys
    import time
    if sys.platform == "win32":
        return time.clock
    else:
        return time.time

def timefunc(func, *args, **kwds):
    timer = _get_timer()
    count, totaltime = 0, 0
    while totaltime < 0.5:
        t1 = timer()
        res = func(*args, **kwds)
        t2 = timer()
        totaltime += (t2-t1)
        count += 1
    if count > 1000:
        unit = "usec"
        timeper = totaltime * 1000000 / count
    else:
        unit = "msec"
        timeper = totaltime * 1000 / count
    return "%s(...) %s iterations, %.2f%s per call" % \
           (func.__name__, count, timeper, unit)
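A sketch of how the harness above would be invoked (it assumes one of the
chunker generators from earlier in the thread is in scope):

print time_chunkers(chunker)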
 
Paul Rubin

for: s = "this . is a . test to . check if it . works . well . it looks . like ."
the output should be (if grouping by 3) like:

=> this .
=> this . is a .

I don't understand, you mean you have all the items in advance?
Can't you do something like this? I got bleary trying to figure out
the question, so I'm sorry if I didn't grok it correctly.

def f(n, items):
    t = len(items)
    for i in xrange(-(n-1), t-n):
        print items[max(i,0):max(i+n,0)]

s = 'this . is a . test to . check if it . works . well . it looks . like .'
f(3, s.split('.'))
['this ']
['this ', ' is a ']
['this ', ' is a ', ' test to ']
[' is a ', ' test to ', ' check if it ']
[' test to ', ' check if it ', ' works ']
[' check if it ', ' works ', ' well ']
[' works ', ' well ', ' it looks ']
[' well ', ' it looks ', ' like ']
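To match the OP's formatting, each window could be re-joined on the sentry (a
sketch building on Paul's f; spacing differs slightly from the spec because
split('.') keeps the surrounding spaces):

def f2(n, items):
    t = len(items)
    for i in xrange(-(n-1), t-n):
        print '.'.join(items[max(i,0):max(i+n,0)]) + '.'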
 
George Sakkis

Michael said:
George said:
Michael said:
Here's a small update to the generator that allows optional handling of the head
and the tail:

def chunker(s, chunk_size=3, sentry=".", keep_first = False, keep_last = False):
buffer=[]
...

And here's a (probably) more efficient version, using a deque as a
buffer:

Perhaps the deque-based solution is more efficient under some conditions, but
it's significantly slower for all the cases I tested:

First off, if you're going to profile something, better use the
standard timeit module rather than a quick, dirty and most likely buggy
handmade function. From "Dive into python": "The most important thing
you need to know about optimizing Python code is that you shouldn't
write your own timing function.". As it turns out, none of chunk_size,
words_per_group and word_length are taken into account in your tests;
they all have their default values. By the way, word_length is
irrelevant since the input is already a sequence, not a big string to
be split.

Second, the output of the two functions is different, so you're not
comparing apples to apples. chunkerMS yields the raw token buffer:

['this', '.']
['this', '.', 'is', 'a', '.']
['this', '.', 'is', 'a', '.', 'test', 'to', '.']
['is', 'a', '.', 'test', 'to', '.', 'check', 'if', 'it', '.']
['test', 'to', '.', 'check', 'if', 'it', '.', 'works', '.']
['check', 'if', 'it', '.', 'works', '.', 'well', '.']
['works', '.', 'well', '.', 'it', 'looks', '.']
['well', '.', 'it', 'looks', '.', 'like', '.']
['it', 'looks', '.', 'like', '.']
['like', '.']

while chunkerGS yields a list with each group joined into one string:

['this']
['this', 'is a']
['this', 'is a', 'test to']
['is a', 'test to', 'check if it']
['test to', 'check if it', 'works']
['check if it', 'works', 'well']
['works', 'well', 'it looks']
['well', 'it looks', 'like']
['it looks', 'like']
['like']

Third, and most important for the measured difference, is that the
performance hit in my function came from joining the words of each
group (['check', 'if', 'it'] -> 'check if it') every time it is
yielded. If the groups are left unjoined as in Michael's version, the
results are quite different:

* chunk_size=3
chunkerGS: 0.98 seconds
chunkerMS: 1.04 seconds
* chunk_size=30
chunkerGS: 0.98 seconds
chunkerMS: 1.17 seconds
* chunk_size=300
chunkerGS: 0.98 seconds
chunkerMS: 2.44 seconds

As expected, the deque solution is not affected by chunk_size: popleft() is
O(1), whereas chunkerMS's del buffer[:k] must shift the whole remaining list
each time a group is dropped. Here is the full test:

# chunkers.py
from itertools import islice
from collections import deque

def chunkerMS(s, chunk_size=3, sentry=".", keep_first=False, keep_last=False):
    buffer = []
    sentry_count = 0
    for item in s:
        buffer.append(item)
        if item == sentry:
            sentry_count += 1
            if sentry_count < chunk_size:
                if keep_first:
                    yield buffer
            else:
                yield buffer
                del buffer[:buffer.index(sentry)+1]
    if keep_last:
        while buffer:
            yield buffer
            del buffer[:buffer.index(sentry)+1]

def chunkerGS(seq, sentry='.', chunk_size=3, keep_first=False, keep_last=False):
    buf = deque()
    iterchunks = itersplit(seq, sentry)
    for chunk in islice(iterchunks, chunk_size-1):
        buf.append(chunk)
        if keep_first:
            yield buf
    for chunk in iterchunks:
        buf.append(chunk)
        yield buf
        buf.popleft()
    if keep_last:
        while buf:
            yield buf
            buf.popleft()

def itersplit(seq, sentry='.'):
    # split the iterable `seq` on each `sentry`
    buf = []
    for x in seq:
        if x != sentry:
            buf.append(x)
        else:
            yield buf
            buf = []
    if buf:
        yield buf

def make_seq(groups=1000, words_per_group=3, sentry='.'):
    group = ['w']*words_per_group
    group.append(sentry)
    return group*groups


if __name__ == '__main__':
    import timeit
    for chunk_size in 3, 30, 300:
        print "* chunk_size=%d" % chunk_size
        for func in chunkerGS, chunkerMS:
            name = func.__name__
            t = timeit.Timer(
                "for p in %s(s, chunk_size=%d, keep_last=True,"
                "keep_first=True): pass" %
                (name, chunk_size),
                "from chunkers import make_seq,chunkerGS,chunkerMS;"
                "s=make_seq(groups=5000, words_per_group=500)")
            print "%s: %.2f seconds" % (name, min(t.repeat(3,1)))
 
Michael Spencer

George said:
Michael said:
George said:
Michael Spencer wrote:

def chunker(s, chunk_size=3, sentry=".", keep_first = False, keep_last = False):
buffer=[] ...
And here's a (probably) more efficient version, using a deque as a
buffer:
Perhaps the deque-based solution is more efficient under some conditions, but
it's significantly slower for all the cases I tested:

As it turns out, none of chunk_size,
words_per_group and word_length are taken into account in your tests;
they all have their default values.

Hello George

Yep, you're right, the test was broken. chunkerGS beats chunkerMS handily in
some cases, in particular for large chunk_size.

Second, the output of the two functions is different, so you're not
comparing apples to apples:

And, to be fair, neither meets the OP's spec for joined output.
> Third, and most important for the measured difference, is that the
> performance hit in my function came from joining the words of each
> group (['check', 'if', 'it'] -> 'check if it') every time it is
> yielded. If the groups are left unjoined as in Michael's version, the
> results are quite different:


Second and Third are basically the same point, i.e., join dominates the
comparison. But your function *needs* an extra join to get the OP's specified
output.

I think the two versions below each give the 'correct' output with respect to
the OP's single test case. I measure chunkerMS2 to be faster than chunkerGS2
across all chunk sizes, but this is all about the joins.

I conclude that chunkerGS's deque beats chunkerMS's list for large chunk_size
(roughly >100). But for joined output, chunkerMS2 beats chunkerGS2 because it
does less joining.

> if you're going to profile something, better use the
> standard timeit module
....
OT: I will when timeit grows a capability for testing live objects rather than
'small code snippets'. Requiring source code input and passing arguments by
string substitution makes it too painful for interactive work. The need to
specify the number of repeats is an additional annoyance.
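The usual workaround for timing a live object is the string-import idiom,
sketched here with a hypothetical function f defined at module level:

import timeit
t = timeit.Timer("f()", "from __main__ import f")
print min(t.repeat(3, 1000))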


Cheers

Michael


#Revised functions with joined output, per OP spec

def chunkerMS2(s, chunk_size=3, sentry=".", keep_first=False, keep_last=False):
    buffer = []
    sentry_count = 0

    for item in s:
        buffer.append(item)
        if item == sentry:
            sentry_count += 1
            if sentry_count < chunk_size:
                if keep_first:
                    yield " ".join(buffer)
            else:
                yield " ".join(buffer)
                del buffer[:buffer.index(sentry)+1]

    if keep_last:
        while buffer:
            yield " ".join(buffer)
            del buffer[:buffer.index(sentry)+1]

def chunkerGS2(seq, sentry='.', chunk_size=3, keep_first=False, keep_last=False):
    # uses itersplit, islice and deque from the earlier posts
    def format_chunks(chunks):
        return " . ".join(' '.join(chunk) for chunk in chunks) + " ."
    iterchunks = itersplit(seq, sentry)
    buf = deque()
    for chunk in islice(iterchunks, chunk_size-1):
        buf.append(chunk)
        if keep_first:
            yield format_chunks(buf)
    for chunk in iterchunks:
        buf.append(chunk)
        yield format_chunks(buf)
        buf.popleft()
    if keep_last:
        while buf:
            yield format_chunks(buf)
            buf.popleft()
 
George Sakkis

Michael said:
I think the two versions below each give the 'correct' output with respect to
the OP's single test case. I measure chunkerMS2 to be faster than chunkerGS2
across all chunk sizes, but this is all about the joins.

I conclude that chunkerGS's deque beats chunkerMS's list for large chunk_size
(roughly >100). But for joined output, chunkerMS2 beats chunkerGS2 because it
does less joining.

Although I speculate that the OP is really concerned about the chunking
algorithm rather than an exact output format, chunkerGS2 can do better
even when the chunks must be joined. Joining each chunk can (and
should) be done only once, not every time the chunk is yielded.
chunkerGS3 outperforms chunkerMS2 even more than the original versions:

* chunk_size=3
chunkerGS3: 1.17 seconds
chunkerMS2: 1.56 seconds
* chunk_size=30
chunkerGS3: 1.26 seconds
chunkerMS2: 6.35 seconds
* chunk_size=300
chunkerGS3: 2.20 seconds
chunkerMS2: 54.51 seconds


def chunkerGS3(seq, sentry='.', chunk_size=3, keep_first=False, keep_last=False):
    iterchunks = itersplit(seq, sentry)
    buf = deque()
    join = ' '.join
    def append(chunk):
        # join each chunk exactly once, as it enters the buffer
        chunk.append(sentry)
        buf.append(join(chunk))
    for chunk in islice(iterchunks, chunk_size-1):
        append(chunk)
        if keep_first:
            yield join(buf)
    for chunk in iterchunks:
        append(chunk)
        yield join(buf)
        buf.popleft()
    if keep_last:
        while buf:
            yield join(buf)
            buf.popleft()
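As a quick check against the OP's sample input (a sketch; it assumes itersplit
and the imports from the earlier posts are in scope):

s = "this . is a . test to . check if it . works . well . it looks . like ."
for p in chunkerGS3(s.split(), keep_first=True, keep_last=True):
    print p

This prints the joined windows from "this ." through "like .", matching the
OP's spec.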

...
OT: I will when timeit grows a capability for testing live objects rather than
'small code snippets'. Requiring source code input and passing arguments by
string substitution makes it too painful for interactive work. The need to
specify the number of repeats is an additional annoyance.

timeit is indeed somewhat cumbersome, but having a robust bug-free
timing function is worth the inconvenience IMO.

Best,
George
 
lh84777

thanks a lot to all, it helped me to learn a lot!

(I finally used the generator trick, it is great...)

best regards.
 
