Reading into a buffer and writing from it at the same time

A. Farber

Hello,

I'm programming an embedded device which
receives data over USB and saves it to
a file. Currently this isn't done in an efficient way:

byte[] readBuffer = new byte[_readBufferSize];
.....
while (true) {
    int bytesRead = in.read(readBuffer, 0, readBuffer.length);
    if (bytesRead == -1) {
        break;
    }
    out.write(readBuffer, 0, bytesRead);
}

- because at any point the device
either reads from USB or writes to a file.

I would like to introduce a second thread
which would take over writing to the file.

So I've created this class for the buffer:

final class CyclicBuffer
{
    private byte[] _buf;
    private int _head;  // next free position
    private int _tail;  // next occupied position

    public CyclicBuffer(int len)
    {
        _buf = new byte[len];
        _head = 0;
        // the buffer is empty
        _tail = _head;
    }

    public int read(InputStream in) throws IOException
    {
        int nbytes;

        /*
         * _buf: [_][_][_][x][x][x][x][_][_][_][_]
         *                 ^           ^          ^
         *                 |           |          |
         *               _tail       _head   _buf.length
         */
        if (_tail <= _head)
            nbytes = in.read(_buf, _head, _buf.length - _head);
        /*
         * _buf: [x][x][_][_][_][_][_][_][x][x][x]
         *              ^                 ^       ^
         *              |                 |       |
         *            _head             _tail _buf.length
         */
        else
            nbytes = in.read(_buf, _head, _tail - _head);

        if (nbytes <= 0)
            return -1;

        // advance to the next free position in buffer
        _head = (_head + nbytes) % _buf.length;
        return nbytes;
    }

    public int write(OutputStream out) throws IOException
    {
        int nbytes;

        if (_tail == _head)
            throw new IllegalStateException();
        /*
         * _buf: [_][_][_][x][x][x][x][_][_][_][_]
         *                 ^           ^          ^
         *                 |           |          |
         *               _tail       _head   _buf.length
         */
        else if (_tail < _head)
            nbytes = _head - _tail;
        /*
         * _buf: [x][x][_][_][_][_][_][_][x][x][x]
         *              ^                 ^       ^
         *              |                 |       |
         *            _head             _tail _buf.length
         */
        else
            nbytes = _buf.length - _tail;

        // OutputStream.write() returns void, so the count is computed first
        out.write(_buf, _tail, nbytes);

        // advance to the next occupied position in buffer
        _tail = (_tail + nbytes) % _buf.length;
        return nbytes;
    }
}

My problem is that I can't wrap my head around -
where should I insert wait() and notify() calls
and on what objects, so that 2 threads can work
with this buffer simultaneously.

Also I wonder what length should I select for
this cyclic buffer, provided that reading and
writing take approximately same amount of
time (I've measured it in profiler) and that
USB data arrives in _readBufferSize chunks.

Thank you for any hints
Alex
 
A. Farber

Hello,

> Won't this: http://java.sun.com/javase/6/docs/api/java/util/concurrent/locks/Read...
> suit you better? It is a lightweight, lock-free implementation.

unfortunately I don't have ReadWriteLock at the device.

I've come up with a solution that I've listed below,
but it is missing something minor - reports an
IllegalMonitorStateException even though
I have put synchronized(this) around the _writeLock.notify();

> You should consider making _head and _tail volatile, you might end up
> reading a stale value from cache without a memory barrier.
> (ReadWriteLock states memory barrier)

Could you explain this?

Regards
Alex

PS: Here is my current try:

CyclicBuffer readBuffer = new CyclicBuffer(2 * _conn._readBufferSize);
.......
while (true) {
    if (readBuffer.read(in) <= 0)
        break;
    readBuffer.write(out);
}


final class CyclicBuffer
{
    private final static int UNKNOWN = 0;
    private final static int EMPTY = 1;
    private final static int FULL = 2;

    /*
     * _head points to the next free position in the buffer
     * _tail points to the last occupied position in the buffer
     * If _tail == _head then the buffer is either full or empty
     * (depends on _state)
     */

    private byte[] _buf;
    private int _head;
    private int _tail;
    private int _state;
    /* package */ Object _readLock;
    /* package */ Object _writeLock;

    public CyclicBuffer(int len)
    {
        _buf = new byte[len];
        _head = 0;
        _tail = 0;
        _state = EMPTY;
        _readLock = new Object();
        _writeLock = new Object();
    }

    public int read(InputStream in) throws IOException
    {
        int nbytes;

        if (_state == FULL)
            throw new IllegalStateException();

        if (_tail <= _head) {
            /*
             * _buf: [_][_][_][x][x][x][x][_][_][_][_]
             *                 ^           ^          ^
             *                 |           |          |
             *               _tail       _head   _buf.length
             */

            nbytes = in.read(_buf, _head, _buf.length - _head);
        } else {
            /*
             * _buf: [x][x][_][_][_][_][_][_][x][x][x]
             *              ^                 ^       ^
             *              |                 |       |
             *            _head             _tail _buf.length
             */

            nbytes = in.read(_buf, _head, _tail - _head);
        }

        if (nbytes <= 0)
            return -1;

        // advance to the next free position in buffer
        _head = (_head + nbytes) % _buf.length;

        synchronized(this) {
            if (_tail == _head) {
                _state = FULL;
                try {
                    // the buffer is full, so stop reading for now
                    _readLock.wait();
                } catch (InterruptedException e) {
                }
            } else
                _state = UNKNOWN;

            // something has been read into buffer, so allow writing again
            _writeLock.notify();
        }

        return nbytes;
    }

    public void write(OutputStream out) throws IOException
    {
        int nbytes;

        if (_state == EMPTY)
            throw new IllegalStateException();

        if (_tail < _head) {
            /*
             * _buf: [_][_][_][x][x][x][x][_][_][_][_]
             *                 ^           ^          ^
             *                 |           |          |
             *               _tail       _head   _buf.length
             */

            nbytes = _head - _tail;
            out.write(_buf, _tail, nbytes);
        } else {
            /*
             * _buf: [x][x][_][_][_][_][_][_][x][x][x]
             *              ^                 ^       ^
             *              |                 |       |
             *            _head             _tail _buf.length
             */

            nbytes = _buf.length - _tail;
            out.write(_buf, _tail, nbytes);

            if (_head != 0) {
                out.write(_buf, 0, _head);
                nbytes += _head;
            }
        }

        // advance to the next occupied position in buffer
        _tail = (_tail + nbytes) % _buf.length;

        synchronized(this) {
            if (_tail == _head) {
                _state = EMPTY;
                try {
                    // the buffer is empty, so stop writing for now
                    _writeLock.wait();
                } catch (InterruptedException e) {
                }
            } else
                _state = UNKNOWN;

            // something has been written out, so allow reading again
            _readLock.notify();
        }
    }
}


(and I probably can get rid of the _state later)
 
Rzeźnik

> Hello,
>
> unfortunately I don't have ReadWriteLock at the device.
>
> I've come up with a solution that I've listed below,
> but it is missing something minor - reports an
> IllegalMonitorStateException even though
> I have put synchronized(this) around the _writeLock.notify();

You do not own the monitor on which you notify. notify() must be called
on an object whose monitor you own: you have acquired the monitor of
this, but you call notify() on _writeLock.
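
A minimal sketch of that rule, for illustration only. The class name MonitorDemo and its method names are hypothetical and do not appear in the posted code; only the _writeLock idea is borrowed from it. The first method repeats the failing pattern (lock this, notify another object), the second locks the same object it notifies.

```java
// Hypothetical demo class; names are illustrative, not from the thread.
final class MonitorDemo {
    private final Object writeLock = new Object();

    // Same shape as the failing code: the thread owns the monitor of 'this',
    // but calls notify() on a different object.
    boolean notifyWithoutOwning() {
        synchronized (this) {
            try {
                writeLock.notify();
                return true;
            } catch (IllegalMonitorStateException e) {
                return false; // thrown because writeLock's monitor is not held
            }
        }
    }

    // The thread owns the monitor of the object it notifies.
    boolean notifyWhileOwning() {
        synchronized (writeLock) {
            writeLock.notify();
            return true;
        }
    }

    public static void main(String[] args) {
        MonitorDemo demo = new MonitorDemo();
        System.out.println("without owning: " + demo.notifyWithoutOwning());
        System.out.println("while owning:   " + demo.notifyWhileOwning());
    }
}
```

The same applies to wait(): _readLock.wait() inside synchronized(this) fails for the same reason. Using a single monitor for both directions is usually the simpler choice.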

> Could you explain this?

Not really :) I am not sure whether I can take this apart without
wreaking confusion. Try:
http://jeremymanson.blogspot.com/2008/11/what-volatile-means-in-java.html
Look up 'java memory model'.
(A synchronized statement implies a memory barrier as well, so you
probably do not need volatile anyway.)
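
A small sketch of that last point, assuming every access to the shared index goes through synchronized methods on the same object. SharedIndex and updateIsSeen are hypothetical names used only for this illustration. Because reader and writer synchronize on the same monitor, the reader is guaranteed to see the update eventually, without volatile.

```java
// Hypothetical demo class; names are illustrative, not from the thread.
final class SharedIndex {
    private int head; // only accessed while holding the monitor of 'this'

    synchronized int get() { return head; }

    synchronized void set(int value) { head = value; }

    // Returns true if a second thread observed the update within the timeout.
    static boolean updateIsSeen(long timeoutMillis) {
        final SharedIndex index = new SharedIndex();
        Thread reader = new Thread(new Runnable() {
            public void run() {
                // Each get() acquires the monitor, so a value written by
                // set() under the same monitor becomes visible here.
                while (index.get() == 0) {
                    Thread.yield();
                }
            }
        });
        reader.setDaemon(true); // do not keep the JVM alive if the demo fails
        reader.start();
        index.set(42);
        try {
            reader.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !reader.isAlive();
    }
}
```

The guarantee only holds if no code path reads or writes the field outside a synchronized block, which is not the case in the posted CyclicBuffer.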

By the way, have you seen Patricia's response? Seems to be elegant and
easy to implement.
 
Rzeźnik

> I have not yet seen your first message in this thread, although I got
> the reply to it.
>
> Unfortunately, the BlockingQueue solution may not be available - a
> device that lacks ReadWriteLock may be missing the rest of
> java.util.concurrent.

Well, you may be right. Didn't think of it. Probably he could simulate
it using Vector.
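
One way such a Vector-based replacement might look, as a rough sketch rather than a tested drop-in for BlockingQueue. VectorQueue, put, take and demo are hypothetical names. It uses only java.util.Vector plus wait()/notifyAll(); on a pre-Java-5 VM the generic type parameter and Integer.valueOf would have to be replaced by their older equivalents.

```java
import java.util.Vector;

// Hypothetical bounded queue; names are illustrative, not from the thread.
final class VectorQueue {
    private final Vector<Object> items = new Vector<Object>();
    private final int capacity;

    VectorQueue(int capacity) {
        this.capacity = capacity;
    }

    // Blocks while the queue is full.
    synchronized void put(Object item) throws InterruptedException {
        while (items.size() == capacity) {
            wait();
        }
        items.addElement(item);
        notifyAll(); // wake a consumer waiting in take()
    }

    // Blocks while the queue is empty.
    synchronized Object take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        Object item = items.elementAt(0);
        items.removeElementAt(0);
        notifyAll(); // wake a producer waiting in put()
        return item;
    }

    // Demo: one producer thread, the calling thread consumes five items.
    static String demo() {
        final VectorQueue queue = new VectorQueue(2);
        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < 5; i++) {
                        queue.put(Integer.valueOf(i));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.setDaemon(true);
        producer.start();
        StringBuffer seen = new StringBuffer();
        try {
            for (int i = 0; i < 5; i++) {
                seen.append(queue.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.toString();
    }
}
```

Note that wait() sits in a while loop, not an if: the condition is rechecked after every wake-up, and both wait() and notifyAll() are called on the object whose monitor is held.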
 
Philipp

> unfortunately I don't have ReadWriteLock at the device.

Not sure I interpret this correctly, but if you are running an old
(eg. 1.3) JVM you can use the backport of the util.concurrent library
(or even pick out just the few classes you need)
Find it at http://backport-jsr166.sourceforge.net/

This lets you use the possibilities proposed by Patricia and others

HTH Phil
PS: Getting concurrency right, especially in the old memory model is
not an easy task. If you can, stick to existing libs instead of
building your own.
 
Roedy Green

> My problem is that I can't wrap my head around -
> where should I insert wait() and notify() calls
> and on what objects, so that 2 threads can work
> with this buffer simultaneously.

For this sort of problem, the trick is to find something in
java.util.concurrent.* to do the thread co-ordinating.

The definitive text is:
Java Concurrency in Practice
by the dream team:
Brian Goetz, Tim Peierls, Joshua Bloch, Joseph Bowbeer, David Holmes,
Doug Lea

http://www.amazon.com/gp/product/03...mp=1789&creative=9325&creativeASIN=0321349601


I give an overview at
http://mindprod.com/jgloss/threadsafe.html

Follow links, but best, get the book. It is a meaty work. This is
fairly tricky stuff.


--
Roedy Green Canadian Mind Products
http://mindprod.com

I advocate that super programmers who can juggle vastly more complex balls than average guys can, should be banned, by management, from dragging the average crowd into system complexity zones where the whole team will start to drown.
~ Jan V.
 
A. Farber

Thank you, but there is unfortunately
no java.util.concurrent.* in my embedded device.
And I can't backport parts of new JVM to finish my task.

I'll try to finish my custom solution. I've switched to using
several buffers of a fixed size as suggested by Patricia,
but I still have some bug there (final buffer is not being
written out and also sometimes it deadlocks).

It's only 50 lines, but tricky to fix it up...

Regards
Alex

while (true) {
    if (readBuffer.read(in) <= 0)
        break;
}
......

final class CyclicBuffer
{
    private static int NBUFFERS = 4;

    private byte[][] _buffers;
    private int[] _bytesRead;
    private int _readIndex;
    private int _writeIndex;

    public CyclicBuffer(int size)
    {
        _buffers = new byte[NBUFFERS][size];
        _bytesRead = new int[NBUFFERS];
        _readIndex = 0;
        _writeIndex = 0;
    }

    public int read(InputStream in) throws IOException
    {
        _bytesRead[_readIndex] = in.read(_buffers[_readIndex]);

        if (_bytesRead[_readIndex] <= 0) {
            return -1;
        }

        synchronized(this) {
            _readIndex = (_readIndex + 1) % NBUFFERS;
            // something has been read, so allow writing again
            notify();

            if (_readIndex == _writeIndex) {
                try {
                    // all buffers are full, so stop reading for now
                    wait();
                } catch (InterruptedException e) {
                }
            }
        }

        return 1;
    }

    public int write(OutputStream out) throws IOException
    {
        if (_bytesRead[_writeIndex] <= 0) {
            return -1;
        }

        out.write(_buffers[_writeIndex], 0, _bytesRead[_writeIndex]);

        synchronized(this) {
            _writeIndex = (_writeIndex + 1) % NBUFFERS;
            // something has been written out, so allow reading again
            notify();

            if (_readIndex == _writeIndex) {
                try {
                    // all buffers are empty, so stop writing for now
                    wait();
                } catch (InterruptedException e) {
                }
            }
        }

        return 1;
    }
}
 
Marcin Rzeźnicki

> Thank you, but there is unfortunately
> no java.util.concurrent.* in my embedded device.
> And I can't backport parts of new JVM to finish my task.
>
> I'll try to finish my custom solution. I've switched to using
> several buffers of a fixed size as suggested by Patricia,
> but I still have some bug there (final buffer is not being
> written out and also sometimes it deadlocks).
>
> It's only 50 lines, but tricky to fix it up...

My thoughts:
1. Neither the reads nor the writes are inside a critical section, so
in.read and out.write may end up working on the same buffer concurrently.
2. It is not guaranteed that the contents of the _bytesRead array are
consistent between threads. See JSR-133: "Synchronization ensures that memory
writes by a thread before or during a synchronized block are made
visible in a predictable manner to other threads which synchronize on
the same monitor". So this check
if (_bytesRead[_writeIndex] <= 0) {
return -1;
}
is not guaranteed to see the effects of:
_bytesRead[_readIndex] = in.read(_buffers[_readIndex]);
3. Your logic has errors. Consider:
Thread A calls write (writeIndex = 1)
Thread A calls write again (writeIndex = 2)
Thread B calls read (readIndex = 1)
Thread B calls read (readIndex = 2) <= wait on monitor
Thread A calls read (readIndex = 3 ERROR!) B awaken
B enters critical section (readIndex = 4 ERROR!)
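
The three points above could be addressed along these lines. This is a sketch under assumptions, not the original poster's code: BufferRing, fillFrom, drainTo and copy are hypothetical names, and it assumes exactly one reading thread and one writing thread. All shared indices and counts are touched only under one monitor, a filled counter separates "all full" from "all empty", and an eof flag lets the writer flush the final buffer and stop instead of waiting forever.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical sketch; names are illustrative, not from the thread.
final class BufferRing {
    private final byte[][] buffers;
    private final int[] lengths;
    private int fillIndex;  // next slot to fill; used only under the lock
    private int drainIndex; // next slot to drain; used only under the lock
    private int filled;     // slots holding data not yet written out
    private boolean eof;    // set once the producer has finished

    BufferRing(int nbuffers, int size) {
        buffers = new byte[nbuffers][size];
        lengths = new int[nbuffers];
    }

    // Producer: run by the thread that reads from the input stream.
    void fillFrom(InputStream in) throws IOException, InterruptedException {
        try {
            while (true) {
                int slot;
                synchronized (this) {
                    while (filled == buffers.length) {
                        wait(); // every slot is full
                    }
                    slot = fillIndex;
                }
                // Slow read outside the lock; the consumer cannot touch
                // this slot until 'filled' is incremented below.
                int n = in.read(buffers[slot]);
                if (n <= 0) {
                    return;
                }
                synchronized (this) {
                    lengths[slot] = n;
                    fillIndex = (fillIndex + 1) % buffers.length;
                    filled++;
                    notifyAll();
                }
            }
        } finally {
            synchronized (this) {
                eof = true; // lets the consumer drain the rest and stop
                notifyAll();
            }
        }
    }

    // Consumer: run by the thread that writes to the output stream.
    void drainTo(OutputStream out) throws IOException, InterruptedException {
        while (true) {
            int slot;
            int n;
            synchronized (this) {
                while (filled == 0 && !eof) {
                    wait(); // nothing to write yet
                }
                if (filled == 0) {
                    return; // producer is done and every slot is drained
                }
                slot = drainIndex;
                n = lengths[slot];
            }
            out.write(buffers[slot], 0, n); // slow write, outside the lock
            synchronized (this) {
                drainIndex = (drainIndex + 1) % buffers.length;
                filled--;
                notifyAll();
            }
        }
    }

    // Demo helper: copies 'data' through the ring using two threads.
    static byte[] copy(final byte[] data, int nbuffers, int size) {
        final BufferRing ring = new BufferRing(nbuffers, size);
        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    ring.fillFrom(new ByteArrayInputStream(data));
                } catch (Exception e) {
                    // fillFrom() has already set eof in its finally block
                }
            }
        });
        producer.start();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            ring.drainTo(out);
            producer.join();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return out.toByteArray();
    }
}
```

One limitation of this sketch: if the writer fails with an IOException while all slots are full, the reader stays blocked in wait(). A real implementation would need a matching "consumer failed" flag.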
 
Mike Amling

A. Farber said:
final class CyclicBuffer
{
    private static int NBUFFERS = 4;

    private byte[][] _buffers;
    private int[] _bytesRead;
    private int _readIndex;
    ...
    public int read(InputStream in) throws IOException
    {
        _bytesRead[_readIndex] = in.read(_buffers[_readIndex]);

        if (_bytesRead[_readIndex] <= 0) {
            return -1;
        }

        synchronized(this) {
            _readIndex = (_readIndex + 1) % NBUFFERS;

I haven't read all your code, but this part does not seem right. If
this read(InputStream) is only called from one Thread, then you don't
need synchronized(). If it's called from more than one Thread, then you
need to refrain from using _readIndex outside the synchronized(). For
example, what prevents some other Thread from bumping _readIndex after
your use of it in the first _bytesRead[_readIndex] and before your use
of it in _buffers[_readIndex] or the second _bytesRead[_readIndex]?

--Mike Amling
 
