Implementing a robust async stream reader

Posted by Jon on Stack Overflow See other posts from Stack Overflow or by Jon
Published on 2010-12-21T23:16:45Z Indexed on 2010/12/21 23:54 UTC
Read the original article Hit count: 406

Filed under:
|
|
|

I recently provided an answer to this question: C# - Realtime console output redirection.

As often happens, explaining stuff (here "stuff" was how I tackled a similar problem) leads you to greater understanding and/or, as is the case here, "oops" moments. I realized that my solution, as implemented, has a bug. The bug has little practical importance, but it has an extremely large importance to me as a developer: I can't rest easy knowing that my code has the potential to blow up.

Squashing the bug is the purpose of this question. I apologize for the long intro, so let's get dirty.

I wanted to build a class that allows me to receive input from a Stream in an event-based manner. The stream, in my scenario, is guaranteed to be a FileStream and there is also an associated StreamReader already present to leverage.

The public interface of the class is this:

public class MyStreamManager {
    public event EventHandler<ConsoleOutputReadEventArgs> StandardOutputRead;

    public void StartSendingEvents();
    public void StopSendingEvents();
}

Obviously this specific scenario has to do with a console's standard output, but that is a detail and does not play an important role. StartSendingEvents and StopSendingEvents do what they advertise; for the purposes of this discussion, we can assume that events are always being sent without loss of generality.

The class uses these two fields internally:

    protected readonly StringBuilder inputAccumulator = new StringBuilder();

    protected readonly byte[] buffer = new byte[256];

The functionality of the class is implemented in the methods below. To get the ball rolling:

    public void StartSendingEvents();
    {
        this.stopAutomation = false;
        this.BeginReadAsync();
    }

To read data out of the Stream without blocking, and also without requiring a carriage return char, BeginRead is called:

    protected void BeginReadAsync()
    {
        if (!this.stopAutomation) {
            this.StandardOutput.BaseStream.BeginRead(
                this.buffer, 0, this.buffer.Length, this.ReadHappened, null);
        }
    }

The challenging part:

BeginRead requires using a buffer. This means that when reading from the stream, it is possible that the bytes available to read ("incoming chunk") are larger than the buffer. Since we are only handing off data from the stream to a consumer, and that consumer may well have inside knowledge about the size and/or format of these chunks, I want to call event subscribers exactly once for each chunk. Otherwise the abstraction breaks down and the subscribers have to buffer the incoming data and reconstruct the chunks themselves using said knowledge. This is much less convenient to the calling code, and detracts from the usefulness of my class.

To this end, if the buffer is full after EndRead, we don't send its contents to subscribers immediately but instead append them to a StringBuilder. The contents of the StringBuilder are only sent back whenever there is no more to read from the stream (thus preserving the chunks).

    private void ReadHappened(IAsyncResult asyncResult)
    {
        var bytesRead = this.StandardOutput.BaseStream.EndRead(asyncResult);
        if (bytesRead == 0) {
            this.OnAutomationStopped();
            return;
        }

        var input = this.StandardOutput.CurrentEncoding.GetString(
            this.buffer, 0, bytesRead);
        this.inputAccumulator.Append(input);

        if (bytesRead < this.buffer.Length) {
            this.OnInputRead(); // only send back if we 're sure we got it all
        }

        this.BeginReadAsync(); // continue "looping" with BeginRead
    }

After any read which is not enough to fill the buffer, all accumulated data is sent to the subscribers:

    private void OnInputRead()
    {
        var handler = this.StandardOutputRead;
        if (handler == null) {
            return;
        }

        handler(this, 
                new ConsoleOutputReadEventArgs(this.inputAccumulator.ToString()));
        this.inputAccumulator.Clear();
    }

(I know that as long as there are no subscribers the data gets accumulated forever. This is a deliberate decision).

The good

This scheme works almost perfectly:

  • Async functionality without spawning any threads
  • Very convenient to the calling code (just subscribe to an event)
  • Maintains the "chunkiness" of the data; this allows the calling code to use inside knowledge of the data without doing any extra work
  • Is almost agnostic to the buffer size (it will work correctly with any size buffer irrespective of the data being read)

The bad

That last almost is a very big one. Consider what happens when there is an incoming chunk with length exactly equal to the size of the buffer. The chunk will be read and buffered, but the event will not be triggered. This will be followed up by a BeginRead that expects to find more data belonging to the current chunk in order to send it back all in one piece, but... there will be no more data in the stream.

In fact, as long as data is put into the stream in chunks with length exactly equal to the buffer size, the data will be buffered and the event will never be triggered.

This scenario may be highly unlikely to occur in practice, especially since we can pick any number for the buffer size, but the problem is there.

Solution?

Unfortunately, after checking the available methods on FileStream and StreamReader, I can't find anything which lets me peek into the stream while also allowing async methods to be used on it.

One "solution" would be to have a thread wait on a ManualResetEvent after the "buffer filled" condition is detected. If the event is not signaled (by the async callback) in a small amount of time, then more data from the stream will not be forthcoming and the data accumulated so far should be sent to subscribers. However, this introduces the need for another thread, requires thread synchronization, and is plain inelegant.

Specifying a timeout for BeginRead would also suffice (call back into my code every now and then so I can check if there's data to be sent back; most of the time there will not be anything to do, so I expect the performance hit to be negligible). But it looks like timeouts are not supported in FileStream.

Since I imagine that async calls with timeouts are an option in bare Win32, another approach might be to PInvoke the hell out of the problem. But this is also undesirable as it will introduce complexity and simply be a pain to code.

Is there an elegant way to get around the problem?

Thanks for being patient enough to read all of this.

© Stack Overflow or respective owner

Related posts about c#

Related posts about asynchronous