2009-10-22

An Infinite Stream Of Bytes

No, I'm not about to wax poetic about the deep ontological issues raised in The Matrix, or speak meaningfully about how transient the modern world of communication is and how the artifacts of our lifetime have become ephemeral such that our posterity will not be able to remember us, even if they wanted to.

Instead I'm going to post a code snippet that solves an annoying little scenario that comes up every now and again when writing parsers.

Basically, it goes like this:

You're writing a parser, and you need to check every byte in a stream of bytes coming from a file, the network, etc. You might need to read forward or backward a little, to match a multi-byte pattern or a value within n bytes of another value. You figure that instead of "peeking and seeking" against the stream (what, it's read-only!?!?), your parser can just store the state itself, and still only look at a single byte at a time. That's great and all, and you do a quick implementation using stream.ReadByte, which seems to work...

Except it's slow. You know from experience that block reads are way faster, so you want to read a block of data (say 1k or 4k) from your stream, parse that, fetch another block, parse that, etc... But what if your pattern straddles two blocks? What if the first byte of a two-byte sequence is the last byte of one block, and the second byte is the first byte of the next block? Now your parser needs to stop what it's doing, exit the loop, go grab some more data, then restart its iteration over that. You could build all that behaviour into your parser (for every parser that you write), but it's non-trivial to deal with. In fact it's a real pain in the butt to refactor a parser to work that way.

Also, you think to yourself, "Man... it would be SOOOOooooo much nicer if I could just write a foreach loop and get every byte in the stream in one big long iteration... Why doesn't System.IO.Stream implement IEnumerable?!?" It totally makes sense that it should...

Anyhow, story's over. Here's the code to solve it:


public static IEnumerable<byte> GetBytesFromStream(Stream stream)
{
    const int blockSize = 1024;

    byte[] buffer = new byte[blockSize];
    int bytesRead;

    // Keep reading full blocks until the stream runs dry,
    // then hand the bytes out one at a time via yield.
    while ((bytesRead = stream.Read(buffer, 0, buffer.Length)) > 0)
    {
        for (int i = 0; i < bytesRead; i++)
        {
            yield return buffer[i];
        }
    }
}


And in case it's not obvious, I'll explain what this little guy does. It does a block read from the stream (adjust blockSize to suit, or make it a parameter), iterates over the block, and uses the yield keyword to return bytes via the IEnumerable<T> interface. The while loop checks the return value of stream.Read(); zero means, basically, the stream is done (EOF). If there was a partial read (i.e. fewer bytes than your blockSize buffer holds), bytesRead will be the count that DID successfully read, so the inner for loop uses bytesRead to ensure we only return valid data. (If we had used buffer.Length or blockSize on a partial read, everything after the new data would be stale bytes left over from the previous read. NOT COOL!)
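To see why this matters for the block-straddling problem from earlier, here's a quick sanity check. It's a sketch (the GetBytesFromStream method is repeated as a local function so it runs standalone, and the 0xCA 0xFE pattern and offsets are made up for the test): a two-byte pattern is planted exactly across the 1024-byte block boundary, and a trivial one-byte-at-a-time state machine still finds it, because the iterator hides the block edges entirely.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

static IEnumerable<byte> GetBytesFromStream(Stream stream)
{
    const int blockSize = 1024;
    byte[] buffer = new byte[blockSize];
    int bytesRead;
    while ((bytesRead = stream.Read(buffer, 0, buffer.Length)) > 0)
        for (int i = 0; i < bytesRead; i++)
            yield return buffer[i];
}

// Plant a two-byte pattern straddling the block boundary.
byte[] data = new byte[2048];
data[1023] = 0xCA; // last byte of the first 1024-byte block
data[1024] = 0xFE; // first byte of the second block

long offset = 0, matchAt = -1;
bool sawFirst = false;
using (var stream = new MemoryStream(data))
{
    foreach (byte b in GetBytesFromStream(stream))
    {
        // One byte of parser state is all we need; no buffer juggling.
        if (sawFirst && b == 0xFE) { matchAt = offset - 1; break; }
        sawFirst = (b == 0xCA);
        offset++;
    }
}
Console.WriteLine(matchAt); // prints 1023
```

The parser never knows (or cares) that a block refill happened mid-pattern, which is the whole point.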

You could stick this method in your utility class if you'd like, or make a wrapper class that wraps Stream and implements IEnumerable<byte>... whatever you want. Maybe you want to be all modern and cool and make it an extension method for Stream.

Here's an example wrapper class:


using System.Collections;
using System.Collections.Generic;
using System.IO;

public class EnumerableStream : Stream, IEnumerable<byte>
{
    private readonly Stream _baseStream;

    public EnumerableStream(Stream stream)
    {
        _baseStream = stream;
    }

    public IEnumerator<byte> GetEnumerator()
    {
        var bytes = GetBytesFromStream(_baseStream);
        return bytes.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    private static IEnumerable<byte> GetBytesFromStream(Stream stream)
    {
        const int blockSize = 1024;

        byte[] buffer = new byte[blockSize];
        int bytesRead;

        while ((bytesRead = stream.Read(buffer, 0, buffer.Length)) > 0)
        {
            for (int i = 0; i < bytesRead; i++)
            {
                yield return buffer[i];
            }
        }
    }

    // Everything below just delegates to the wrapped stream.

    public override bool CanRead
    {
        get { return _baseStream.CanRead; }
    }

    public override bool CanSeek
    {
        get { return _baseStream.CanSeek; }
    }

    public override bool CanWrite
    {
        get { return _baseStream.CanWrite; }
    }

    public override void Flush()
    {
        _baseStream.Flush();
    }

    public override long Length
    {
        get { return _baseStream.Length; }
    }

    public override long Position
    {
        get { return _baseStream.Position; }
        set { _baseStream.Position = value; }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        return _baseStream.Read(buffer, offset, count);
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        return _baseStream.Seek(offset, origin);
    }

    public override void SetLength(long value)
    {
        _baseStream.SetLength(value);
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        _baseStream.Write(buffer, offset, count);
    }
}
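Using the wrapper looks like this. A small sketch, assuming the EnumerableStream class above is compiled in (the sample bytes are just for illustration); note that since the wrapper still IS a Stream, you can keep passing it to APIs that expect one.

```csharp
using System;
using System.IO;

// Wrap any Stream and it becomes foreach-able.
var source = new MemoryStream(new byte[] { 1, 2, 3, 4, 5 });
var wrapped = new EnumerableStream(source);

int sum = 0;
foreach (byte b in wrapped)
{
    sum += b;
}
Console.WriteLine(sum); // prints 15
```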


And an example of the extension method way...


using System.Collections.Generic;
using System.IO;

public static class StreamExtensions
{
    public static IEnumerable<byte> GetBytes(this Stream stream)
    {
        const int blockSize = 1024;

        byte[] buffer = new byte[blockSize];
        int bytesRead;

        while ((bytesRead = stream.Read(buffer, 0, buffer.Length)) > 0)
        {
            for (int i = 0; i < bytesRead; i++)
            {
                yield return buffer[i];
            }
        }
    }
}
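The extension method reads most naturally at the call site, and since it returns IEnumerable&lt;byte&gt;, all of LINQ comes along for free. A sketch, assuming the StreamExtensions class above is in scope (the sample data and the &gt;= 20 filter are just for illustration):

```csharp
using System;
using System.IO;
using System.Linq;

int count;
using (var stream = new MemoryStream(new byte[] { 10, 20, 30, 40 }))
{
    // Looks like a method on Stream itself, and composes with LINQ.
    count = stream.GetBytes().Count(b => b >= 20);
}
Console.WriteLine(count); // prints 3
```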


Enjoy!

1 comment:

contextfree said...

I have a bunch of abstractions over Stream for use by my parsers that let you do stuff like ReadUntil(Regex regex) or multi-character peeks (saving to a buffer). I'm not sure if all of them were good ideas in retrospect.