I have been having a lot of trouble reading text a line at a time from a pipe in Python and determining when the pipe has been closed on the writing end. I first hit the problem when communicating with a subprocess, but for testing, I wrote some code that reads text lines from the pipe in a thread while code in the main thread writes to the pipe in small chunks.
The issue is that, when reading, I cannot seem to detect when the writer has closed the pipe. I can see that the final output (which does not end with a newline character) is returned by readline(), so the io object presumably detected an EOF in order to do that, but there seems to be no reasonable/clean way for my Python code to know that this happened.
Does anyone know the appropriate way to deal with this?
In the code below, the read loop never exits, and thread.join() blocks until I break out of the program using Ctrl+C:
from datetime import datetime
from itertools import batched
import os
from select import select
from threading import Thread
from time import sleep

# Superscript digits are multi-byte in UTF-8: ¹, ² and ³ take 2 bytes each,
# while ⁰ and ⁴ through ⁹ take 3 bytes each.
text = '⁰¹²\n³\n⁴\n⁵⁶\n⁷⁸⁹⁰¹²\n³'
text_bytes = bytes(text, 'utf8')

pipe_r, pipe_w = os.pipe()
wb_stream = open(pipe_w, 'wb', buffering=0)
r_stream = open(pipe_r, 'r')

got_chunks = []


def read_loop():
    prev_timestamp = datetime.now()
    while True:
        sleep(1)
        cur_timestamp = datetime.now()
        print('A', (cur_timestamp - prev_timestamp).total_seconds())
        prev_timestamp = cur_timestamp
        rr, wr, exc = select([r_stream], [r_stream], [r_stream], 0)
        if r_stream.closed:
            break
        if exc:
            break
        if not rr:
            continue
        chunk = r_stream.readline()
        print('B', (cur_timestamp - prev_timestamp).total_seconds())
        prev_timestamp = cur_timestamp
        if chunk:
            print('got chunk', repr(chunk))
            got_chunks.append(chunk)


thread = Thread(target=read_loop)
thread.start()

for batch in batched(text_bytes, 4):
    bytes_chunk = bytes(batch)
    sleep(1.6)
    wb_stream.write(bytes_chunk)
wb_stream.close()

thread.join()
print(repr(got_chunks))
Output:
A 1.000314
A 1.000593
B 0.0
got chunk '⁰¹²\n'
A 2.200882
A 1.000623
B 0.0
got chunk '³\n'
A 1.001111
A 1.000423
B 0.0
got chunk '⁴\n'
A 1.00083
B 0.0
got chunk '⁵⁶\n'
A 2.398835
A 1.000746
B 0.0
got chunk '⁷⁸⁹⁰¹²\n'
A 5.400978
A 1.000434
B 0.0
got chunk '³'
A 1.000722
B 0.0
A 1.000428
B 0.0
A 1.000404
B 0.0
^CTraceback (most recent call last):
  File "/home/stevjorg/tmp2/foo.py", line 58, in <module>
    thread.join()
  File "/home/stevjorg/.pyenv/versions/3.12.11/lib/python3.12/threading.py", line 1149, in join
    self._wait_for_tstate_lock()
  File "/home/stevjorg/.pyenv/versions/3.12.11/lib/python3.12/threading.py", line 1169, in _wait_for_tstate_lock
    if lock.acquire(block, timeout):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt
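One possible reading of the output above: after 'got chunk' reports the final '³', every later pass prints B with no 'got chunk' line, which suggests that readline() is returning an empty string each time. For text streams, readline() is documented to return an empty string once the stream is at EOF, whereas a blank line comes back as '\n'. The following is a minimal sketch of a reader that relies on that behaviour. The names read_lines_until_eof and demo are hypothetical, it drops select() and the sleeps on the assumption that a blocking readline() in the reader thread is acceptable, and it uses a shorter test string than the code above.

```python
import os
from threading import Thread


def read_lines_until_eof(stream):
    """Collect lines from a text stream until readline() reports EOF.

    Hypothetical helper: assumes a blocking read in this thread is acceptable.
    """
    lines = []
    while True:
        # Blocks until a full line, a final partial line, or EOF is available.
        line = stream.readline()
        if line == '':
            # Empty string means EOF (all write ends of the pipe are closed).
            # A blank line would have been returned as '\n' instead.
            break
        lines.append(line)
    return lines


def demo():
    """Write UTF-8 text into a pipe in 4-byte pieces and read it back by line."""
    pipe_r, pipe_w = os.pipe()
    got = []
    with open(pipe_r, 'r', encoding='utf8') as r_stream:
        reader = Thread(
            target=lambda: got.extend(read_lines_until_eof(r_stream)))
        reader.start()
        # Leaving this with-block closes the write end, which is what
        # lets the reader see EOF.
        with open(pipe_w, 'wb', buffering=0) as wb_stream:
            data = '⁰¹²\n³\n⁴'.encode('utf8')
            for i in range(0, len(data), 4):
                # 4-byte pieces deliberately split multi-byte characters.
                wb_stream.write(data[i:i + 4])
        reader.join()
    return got


if __name__ == '__main__':
    print(demo())
```

With this loop the reader thread returns as soon as the write end is closed, so join() does not hang. If select() has to stay, the same empty-string check can replace the `if chunk:` test in the original loop; note that select() only sees the file descriptor, not data already buffered inside the text stream, so combining the two needs care.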