You've asked about two separate things in your question. One is how to pipe the CSV produced by a COPY ... TO into a COPY ... FROM; the other is how to pipe rows from a SELECT query into an INSERT.
Piping rows from a SELECT query into an INSERT is a bit of a misnomer: you can stream rows out of a SELECT, but you can't stream rows into a single INSERT, so you have to issue the INSERTs in batches. That approach has higher overhead because of the INSERTs, but it has fewer data-loss issues than round-tripping through CSV. The rest of this answer focuses on why piping the CSV from a COPY TO into a COPY FROM is tricky and how you can accomplish it.
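Before that, here's roughly what the batched-INSERT route could look like. This is a minimal sketch assuming plain psycopg2; the connection strings, table names, and column list are placeholders, not anything from your setup:

import psycopg2
from psycopg2.extras import execute_values

src = psycopg2.connect("dbname=source")   # placeholder DSNs
dst = psycopg2.connect("dbname=dest")

# A named (server-side) cursor streams the SELECT instead of buffering all rows.
read_cur = src.cursor(name="stream_foo")
read_cur.itersize = 10000
read_cur.execute("SELECT id FROM foo")

write_cur = dst.cursor()
batch = []
for row in read_cur:
    batch.append(row)
    if len(batch) >= 10000:
        execute_values(write_cur, "INSERT INTO bar (id) VALUES %s", batch)
        batch = []
if batch:
    execute_values(write_cur, "INSERT INTO bar (id) VALUES %s", batch)

dst.commit()
src.close()
dst.close()

execute_values folds each batch into a single multi-row INSERT, which is much faster than one INSERT per row but still slower than COPY.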
psycopg2 lets you run COPY commands via the (synchronous) copy_expert cursor method. It requires a readable file object for COPY ... FROM STDIN and a writable file object for COPY ... TO STDOUT. To accomplish what you described, you need two separate threads running the two commands, a file object whose write() method blocks when the COPY FROM side can't keep up, and a file object whose read() method blocks when the COPY TO side can't keep up. This is a classic producer-consumer problem, which can be tricky to get right.
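To see the call shape before adding threads, here is a non-streaming sketch of the same transfer that stages through a temporary file (assuming input_conn and output_conn are psycopg2 connections and foo/bar are the source and destination tables):

import tempfile

with tempfile.TemporaryFile() as tmp:
    # COPY ... TO STDOUT calls tmp.write(); COPY ... FROM STDIN calls tmp.read().
    input_conn.cursor().copy_expert("COPY foo TO STDOUT;", tmp)
    tmp.seek(0)
    output_conn.cursor().copy_expert("COPY bar FROM STDIN;", tmp)
    output_conn.commit()

The pipe below removes the intermediate file and lets the two COPY commands overlap, which is where the threads and the blocking read()/write() come in.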
Here's a pipe implementation I wrote quickly (Python 3). It is probably full of bugs; let me know if you find a deadlock (edits welcome).
from threading import Lock, Condition, Thread
class Output(object):
    """Readable end of the pipe; handed to COPY ... FROM STDIN."""
    def __init__(self, pipe):
        self.pipe = pipe

    def read(self, count):
        with self.pipe.lock:
            # wait until the pipe is closed or the buffer is non-empty
            while not self.pipe.closed and len(self.pipe.buffer) == 0:
                self.pipe.empty_cond.wait()
            if len(self.pipe.buffer) == 0:
                return b""  # closed and drained: signal EOF
            count = min(count, len(self.pipe.buffer))
            res, self.pipe.buffer = \
                self.pipe.buffer[:count], self.pipe.buffer[count:]
            self.pipe.full_cond.notify()
            return res

    def close(self):
        with self.pipe.lock:
            self.pipe.closed = True
            self.pipe.full_cond.notify()
class Input(object):
    """Writable end of the pipe; handed to COPY ... TO STDOUT."""
    def __init__(self, pipe):
        self.pipe = pipe

    def write(self, s):
        with self.pipe.lock:
            # wait until the pipe is closed or the buffer has room
            while not self.pipe.closed \
                    and len(self.pipe.buffer) > self.pipe.bufsize:
                self.pipe.full_cond.wait()
            if self.pipe.closed:
                raise Exception("pipe closed")
            self.pipe.buffer += s
            self.pipe.empty_cond.notify()

    def close(self):
        with self.pipe.lock:
            self.pipe.closed = True
            self.pipe.empty_cond.notify()
class FilePipe(object):
    """In-memory pipe with a bounded buffer connecting an Input and an Output end."""
    def __init__(self, bufsize=4096):
        self.buffer = b""
        self.bufsize = bufsize
        self.input = Input(self)
        self.output = Output(self)
        self.lock = Lock()
        self.full_cond = Condition(self.lock)
        self.empty_cond = Condition(self.lock)
        self.closed = False
Usage example:
def read_thread(conn, f):
    # The export runs in its own thread, writing into the pipe's input end.
    conn.cursor().copy_expert("COPY foo TO STDOUT;", f)
    f.close()
    conn.close()

# engine is assumed to be a SQLAlchemy Engine (created with create_engine(...)).
engine.execute(
    "CREATE TABLE foo(id int);"
    "CREATE TABLE bar(id int);"
    "INSERT INTO foo (SELECT generate_series(1, 100000) AS id);"
    "COMMIT;")

input_conn = engine.raw_connection()
output_conn = engine.raw_connection()
pipe = FilePipe()

t = Thread(target=read_thread, args=(input_conn, pipe.input))
t.start()

output_cur = output_conn.cursor()
output_cur.copy_expert("COPY bar FROM STDIN;", pipe.output)
output_conn.commit()
output_conn.close()
t.join()

print(list(engine.execute("SELECT count(*) FROM bar;")))  # 100000
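To sanity-check FilePipe on its own, without a database, you can push bytes through it from a second thread (a quick hypothetical exercise, not part of the original example):

pipe = FilePipe(bufsize=16)

def writer():
    for _ in range(1000):
        pipe.input.write(b"x" * 10)   # blocks whenever the 16-byte buffer is full
    pipe.input.close()

w = Thread(target=writer)
w.start()

total = 0
while True:
    chunk = pipe.output.read(64)      # blocks until data arrives or the pipe closes
    if not chunk:
        break
    total += len(chunk)
w.join()
print(total)  # 10000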