
I am trying to write to and read from a stream without loading everything into memory at once. Here's what I imagined would work:

import io

stream = io.BytesIO()

def process_stream(stream):
  while True:
    chunk = stream.read(5).decode('utf-8')
    if not chunk:
      return
    yield chunk

# this would be a separate thread, but here we just do it in serial:
for i in range(3):
  stream.write(b'asdf')

for chunk in process_stream(stream):
  print('I read', chunk)

But this doesn't print anything. I can get it working, but only with either of the following two changes, each of which requires holding all the bytes in memory at once:

  • initializing stream = io.BytesIO(b'asdf' * 3) instead of incrementally writing
  • using stream.getvalue() instead of incrementally reading

I'm quite baffled that incremental writing can only be read by batch reading, and that incremental reading only works with batch writing. How can I get a constant-memory solution working (assuming process_stream outpaces the writing)?

  • dask.org Commented May 20, 2021 at 23:38

1 Answer


When you write to the stream in the for loop, the seek (the stream's current position) ends up after the last byte written:

asdfasdfasdf|
            ^ (Seek)            

So when you try to read, there is nothing after that position, and read() returns empty bytes. The fix is to move the seek back to the beginning of the stream with stream.seek(0) so you can read it:

|asdfasdfasdf
^ (Seek after calling stream.seek(0))            
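The position in the diagrams can be inspected with tell(). A short sketch using only the standard io module and the same three writes; the values in the comments follow from read() and write() sharing one position:

```python
import io

stream = io.BytesIO()
for _ in range(3):
    stream.write(b'asdf')
print(stream.tell())    # 12: the position sits after the last written byte
print(stream.read(5))   # b'': nothing lies after that position

stream.seek(0)          # move the shared position back to the first byte
print(stream.tell())    # 0
print(stream.read(5))   # b'asdfa'
print(stream.tell())    # 5: reading advances the same position
```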

Code:

import io

stream = io.BytesIO()


def process_stream(stream, chunk_size=5):
    while True:
        chunk = stream.read(chunk_size).decode('utf-8')
        if not chunk:
            return
        yield chunk


# this would be a separate thread, but here we just do it in serial:
for i in range(3):
    stream.write(b'asdf')

stream.seek(0) # Reset the seek so it is at the beginning
for chunk in process_stream(stream):
    print('I read', chunk)

Output:

I read asdfa
I read sdfas
I read df
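The comment in the loop notes that the writes would run in a separate thread. One way to sketch that threaded setup with bounded memory, assuming a BytesIO is not required, is to have the writer hand pieces to a bounded queue.Queue and let the reader re-chunk them. CHUNK_SIZE, MAX_CHUNKS, SENTINEL, producer and process_chunks are hypothetical names; at most MAX_CHUNKS queued pieces plus one partial chunk are held at a time:

```python
import queue
import threading

CHUNK_SIZE = 5   # hypothetical: same chunk size as the answer's process_stream
MAX_CHUNKS = 4   # hypothetical bound on pieces waiting in memory
SENTINEL = None  # hypothetical end-of-data marker

chunks = queue.Queue(maxsize=MAX_CHUNKS)

def producer():
    # Stands in for the thread that writes b'asdf' three times.
    for _ in range(3):
        chunks.put(b'asdf')  # blocks while the queue is full
    chunks.put(SENTINEL)

def process_chunks():
    # Only the unread remainder is kept in `pending`.
    # Decoding per chunk assumes no multi-byte UTF-8 character is split (true for ASCII).
    pending = b''
    while True:
        data = chunks.get()
        if data is SENTINEL:
            break
        pending += data
        while len(pending) >= CHUNK_SIZE:
            yield pending[:CHUNK_SIZE].decode('utf-8')
            pending = pending[CHUNK_SIZE:]
    if pending:
        yield pending.decode('utf-8')

writer = threading.Thread(target=producer)
writer.start()
results = []
for chunk in process_chunks():
    print('I read', chunk)
    results.append(chunk)
writer.join()
```

It prints the same three chunks as the serial version above, but the writer blocks instead of growing the buffer when the reader falls behind.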

More information: How the write(), read() and getvalue() methods of Python io.BytesIO work?
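The two workarounds from the question can be checked directly. Passing bytes to the constructor leaves the position at 0, and getvalue() returns the whole buffer without looking at (or moving) the position. A small sketch with the standard io module:

```python
import io

# Workaround 1: initial bytes given to the constructor; the position starts at 0,
# so read() begins at the first byte without any seek().
prefilled = io.BytesIO(b'asdf' * 3)
print(prefilled.tell())      # 0
print(prefilled.read(5))     # b'asdfa'

# Workaround 2: getvalue() returns the entire buffer whatever the position is,
# and it does not move the position.
written = io.BytesIO()
for _ in range(3):
    written.write(b'asdf')
print(written.getvalue())    # b'asdfasdfasdf'
print(written.tell())        # 12: still at the end after getvalue()
```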


1 Comment

This helps, but I'm still looking for a constant-memory solution. It seems that whenever .write is called, the cursor is moved to the end of the stream again. For instance, if I write 'a', seek 0, read, write 'b', read, I get ''. And if I seek 0 again after writing 'b', I get 'ab'. I'm looking for a solution where the 2nd read just gives 'b', the remaining unread bytes, and 'a' is freed from memory. Is BytesIO just not the right tool?
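On whether BytesIO is the right tool: a BytesIO does not drop bytes once they have been read, so reading never frees memory. An OS pipe behaves like the stream this comment describes: each read consumes bytes, and writes block once the pipe's buffer is full. A minimal sketch, assuming the writer runs in a thread; write_pieces is a hypothetical stand-in for the real producer, and process_stream is the answer's generator unchanged:

```python
import os
import threading

def write_pieces(fd):
    # Hypothetical producer: writes three pieces, then closes its end,
    # which the reader sees as end-of-file.
    with os.fdopen(fd, 'wb') as w:
        for _ in range(3):
            w.write(b'asdf')
            w.flush()  # hand the bytes to the pipe now instead of on close

def process_stream(stream, chunk_size=5):
    # The answer's generator; read() blocks until chunk_size bytes
    # arrive or the writer closes the pipe.
    while True:
        chunk = stream.read(chunk_size).decode('utf-8')
        if not chunk:
            return
        yield chunk

read_fd, write_fd = os.pipe()
t = threading.Thread(target=write_pieces, args=(write_fd,))
t.start()
results = []
with os.fdopen(read_fd, 'rb') as r:
    for chunk in process_stream(r):
        print('I read', chunk)
        results.append(chunk)
t.join()
```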
