I have been having a lot of trouble reading text a line at a time from a pipe in Python and determining when the pipe has been closed on the writing end. I first hit the problem when communicating with a subprocess, but for testing, I wrote some code that reads text lines from the pipe in a thread while code in the main thread writes to the pipe in small chunks.

The issue is that, when reading, I cannot seem to detect when the writer has closed the pipe. I see that the final output (that does not end with a newline character) is received by readline, and the io object presumably detected an EOF to know to do that, but there seems to be no reasonable/clean way for my Python code to know that happened.

Does anyone know the appropriate way to deal with this?

In the code below, the read loop never exits, and thread.join() blocks until I break out of the program using Ctrl+C:

from datetime import datetime
from itertools import batched
import os
from select import select
from threading import Thread
from time import sleep


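# Each superscript digit encodes to 2 or 3 bytes in UTF-8 (¹, ² and ³ take 2,
# the others take 3), so 4-byte chunks can split a character across writes.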
text = '⁰¹²\n³\n⁴\n⁵⁶\n⁷⁸⁹⁰¹²\n³'
text_bytes = bytes(text, 'utf8')

pipe_r, pipe_w = os.pipe()

wb_stream = open(pipe_w, 'wb', buffering=0)
r_stream = open(pipe_r, 'r')


got_chunks = []


def read_loop():
    prev_timestamp = datetime.now()
    while True:
        sleep(1)
        cur_timestamp = datetime.now()
        print('A', (cur_timestamp - prev_timestamp).total_seconds())
        prev_timestamp = cur_timestamp

        rr, wr, exc = select([r_stream], [r_stream], [r_stream], 0)
        if r_stream.closed:
            break
        if exc:
            break
        if not rr:
            continue

        chunk = r_stream.readline()
        print('B', (cur_timestamp - prev_timestamp).total_seconds())
        prev_timestamp = cur_timestamp

        if chunk:
            print('got chunk', repr(chunk))
            got_chunks.append(chunk)


thread = Thread(target=read_loop)

thread.start()

for batch in batched(text_bytes, 4):
    bytes_chunk = bytes(batch)
    sleep(1.6)
    wb_stream.write(bytes_chunk)

wb_stream.close()

thread.join()

print(repr(got_chunks))

Output:

A 1.000314
A 1.000593
B 0.0
got chunk '⁰¹²\n'
A 2.200882
A 1.000623
B 0.0
got chunk '³\n'
A 1.001111
A 1.000423
B 0.0
got chunk '⁴\n'
A 1.00083
B 0.0
got chunk '⁵⁶\n'
A 2.398835
A 1.000746
B 0.0
got chunk '⁷⁸⁹⁰¹²\n'
A 5.400978
A 1.000434
B 0.0
got chunk '³'
A 1.000722
B 0.0
A 1.000428
B 0.0
A 1.000404
B 0.0
^CTraceback (most recent call last):
  File "/home/stevjorg/tmp2/foo.py", line 58, in <module>
    thread.join()
  File "/home/stevjorg/.pyenv/versions/3.12.11/lib/python3.12/threading.py", line 1149, in join
    self._wait_for_tstate_lock()
  File "/home/stevjorg/.pyenv/versions/3.12.11/lib/python3.12/threading.py", line 1169, in _wait_for_tstate_lock
    if lock.acquire(block, timeout):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt

1 Answer

This should work: as soon as readline() returns an empty string, the writing end of the pipe has been closed and there is nothing left to read. An empty string evaluates to False, so the loop exits, and the if clause is no longer needed:

def read_loop():
    while chunk := r_stream.readline():
        print('got chunk', repr(chunk))
        got_chunks.append(chunk)
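
For anyone who wants to try this in isolation, here is a minimal self-contained sketch of the same idea. The function name read_lines_until_eof and its chunk_size parameter are made up for illustration; the pipe setup mirrors the question, except that the writer runs in the thread and the reader in the main thread.

```python
import os
from threading import Thread


def read_lines_until_eof(text: str, chunk_size: int = 4) -> list[str]:
    """Write `text` to a pipe in small byte chunks and read it back by line."""
    pipe_r, pipe_w = os.pipe()
    data = text.encode('utf8')

    def write_chunks():
        # Leaving the `with` block closes the write end, which is what
        # lets the reader eventually see EOF.
        with open(pipe_w, 'wb', buffering=0) as wb_stream:
            for i in range(0, len(data), chunk_size):
                wb_stream.write(data[i:i + chunk_size])

    writer = Thread(target=write_chunks)
    writer.start()

    lines = []
    with open(pipe_r, 'r', encoding='utf8') as r_stream:
        # readline() blocks until it has a full line or hits EOF.
        # An empty string is returned only at EOF, which ends the loop.
        while chunk := r_stream.readline():
            lines.append(chunk)

    writer.join()
    return lines


print(read_lines_until_eof('⁰¹²\n³\n⁴\n⁵⁶\n⁷⁸⁹⁰¹²\n³'))
```

The final '³' comes back without a trailing newline, and the next readline() call returns '' and stops the loop. This is also the likely reason the select() version keeps spinning: at EOF the pipe is reported as readable, readline() returns '', and nothing in that loop treats the empty string as a stop condition.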
2 Comments

Thanks! As often happens, I was very much over-complicating things. I guess that also means I could say for line in r_stream. It would be nice, of course, if the Python docs for readline in docs.python.org/3/library/io.html#io.IOBase said something about this. I guess one should be able to infer that from the docs for read in docs.python.org/3/library/io.html#io.RawIOBase .
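
A small sketch of the for line in r_stream variant mentioned in this comment, assuming the payload is small enough to fit in the pipe buffer so it can be written before reading starts. The helper name iterate_pipe_lines is hypothetical.

```python
import os


def iterate_pipe_lines(payload: bytes) -> tuple[list[str], str]:
    """Return the lines read by iterating the stream, plus one extra readline()."""
    pipe_r, pipe_w = os.pipe()

    # Write everything up front and close the write end. This only works
    # without a second thread because the payload is tiny.
    with open(pipe_w, 'wb', buffering=0) as wb_stream:
        wb_stream.write(payload)

    with open(pipe_r, 'r', encoding='utf8') as r_stream:
        # Iterating a text stream yields one line at a time and stops at EOF.
        got_lines = [line for line in r_stream]
        # Once EOF has been reached, readline() returns an empty string.
        after_eof = r_stream.readline()

    return got_lines, after_eof


lines, after_eof = iterate_pipe_lines(b'first\nsecond\nlast')
print(lines, repr(after_eof))
```

Iteration and the readline() loop should behave the same here; the loop form just makes the empty-string EOF check explicit.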