
In an ETL process I want to regularly query database "A" (e.g., for all rows with timestamps newer than the program's last run) and move that data into database "B" for further processing. Both are PostgreSQL databases. I'd like to do this data transfer in a Python script, using SQLAlchemy to connect to both databases. What's the least messy, least fragile way to do so?

I know that Postgres's COPY TO and COPY FROM commands allow table rows and query results to be transferred from one database server to another via an intermediate file (see here). From a Unix command line you can even pipe the output of database A as input to database B without the potentially large intermediate file (see excellent instructions here). What I'd like to know is how to do that last trick in a Python script using two SQLAlchemy connections, rather than using subprocess to run a shell command.

import sqlalchemy
dbA = sqlalchemy.create_engine(connection_string_A)
dbB = sqlalchemy.create_engine(connection_string_B)

# how do I do this part?
dbA.execute('SELECT (column) FROM widgets...') # somehow pipe output into...
dbB.execute('INSERT INTO widgets (column) ...') # without holding lots of data in memory or on disk

For the record I'm not using any ORM features of SQLAlchemy at this point, just naked SQL queries.

2 Comments

  • Are there many records to be migrated? Commented May 30, 2017 at 17:53
  • Eventually there will be several such tasks in my ETL routine; some may have many records and others may only have a few. So I'm looking for a robust solution that will work even at large scale. Commented May 31, 2017 at 3:01

2 Answers


You've asked about two separate things in your question. One is how to pipe the CSV output of a COPY TO into a COPY FROM; the other is how to pipe rows from a SELECT query into an INSERT.

Piping rows from a SELECT query into an INSERT is a bit of a misnomer, because while you can stream rows out of a SELECT query, you can't stream rows into a single INSERT, so you'll have to perform multiple INSERTs in batches. That approach has high overhead because of the individual INSERTs, but it avoids the data-fidelity issues of round-tripping through CSV. I'll focus below on why piping the CSV output of a COPY TO into a COPY FROM is tricky and how you can accomplish it.
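First, though, here is a rough sketch of what that batched SELECT-into-INSERT approach could look like with two SQLAlchemy engines (1.x-style API, as used elsewhere in the question); widgets, some_column, and ts are placeholder names standing in for your actual schema:

import sqlalchemy as sa

def copy_in_batches(engine_a, engine_b, since, batch_size=10000):
    # placeholder statements: adjust the table, columns, and filter to your schema
    select_stmt = sa.text("SELECT some_column FROM widgets WHERE ts > :since")
    insert_stmt = sa.text("INSERT INTO widgets (some_column) VALUES (:some_column)")

    with engine_a.connect() as src, engine_b.begin() as dst:
        # stream_results makes psycopg2 use a server-side cursor, so the
        # full result set from database A is never held in memory at once
        result = src.execution_options(stream_results=True).execute(
            select_stmt, since=since)
        while True:
            rows = result.fetchmany(batch_size)
            if not rows:
                break
            # executemany-style batched INSERT into database B
            dst.execute(insert_stmt, [{"some_column": r[0]} for r in rows])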

psycopg2 lets you run COPY commands via the (synchronous) copy_expert method on a cursor. It requires you to pass in a readable file object for COPY ... FROM STDIN and a writable file object for COPY ... TO STDOUT. To accomplish what you described, you need two separate threads, one to run each of the two commands, a file object with a write() method that blocks if the COPY FROM command can't keep up, and a file object with a read() method that blocks if the COPY TO command can't keep up. This is a classic producer-consumer problem, which can be tricky to get right.

Here's one that I wrote quickly (Python 3). It is probably full of bugs. Let me know if you find a deadlock (edits welcome).

from threading import Lock, Condition, Thread


class Output(object):
    def __init__(self, pipe):
        self.pipe = pipe

    def read(self, count):
        with self.pipe.lock:
            # wait until the pipe is closed or the buffer is non-empty
            while not self.pipe.closed and len(self.pipe.buffer) == 0:
                self.pipe.empty_cond.wait()

            # the writer closed the pipe and nothing is left: signal EOF
            if len(self.pipe.buffer) == 0:
                return b""

            # hand back at most `count` bytes from the front of the buffer
            count = min(count, len(self.pipe.buffer))
            res, self.pipe.buffer = \
                self.pipe.buffer[:count], self.pipe.buffer[count:]
            self.pipe.full_cond.notify()
        return res

    def close(self):
        with self.pipe.lock:
            self.pipe.closed = True
            self.pipe.full_cond.notify()


class Input(object):
    def __init__(self, pipe):
        self.pipe = pipe

    def write(self, s):
        with self.pipe.lock:
            # wait until pipe is closed or buffer is not full
            while not self.pipe.closed \
                    and len(self.pipe.buffer) > self.pipe.bufsize:
                self.pipe.full_cond.wait()

            if self.pipe.closed:
                raise Exception("pipe closed")

            self.pipe.buffer += s
            self.pipe.empty_cond.notify()

    def close(self):
        with self.pipe.lock:
            self.pipe.closed = True
            self.pipe.empty_cond.notify()


class FilePipe(object):
    def __init__(self, bufsize=4096):
        self.buffer = b""
        self.bufsize = bufsize
        self.input = Input(self)
        self.output = Output(self)
        self.lock = Lock()
        self.full_cond = Condition(self.lock)
        self.empty_cond = Condition(self.lock)
        self.closed = False

Usage example:

def read_thread(conn, f):
    conn.cursor().copy_expert("COPY foo TO STDOUT;", f)
    f.close()
    conn.close()

engine.execute(
    "CREATE TABLE foo(id int);"
    "CREATE TABLE bar(id int);"
    "INSERT INTO foo (SELECT generate_series(1, 100000) AS id);"
    "COMMIT;")
input_conn = engine.raw_connection()
output_conn = engine.raw_connection()
pipe = FilePipe()

t = Thread(target=read_thread, args=(input_conn, pipe.input))
t.start()
output_cur = output_conn.cursor()
output_cur.copy_expert("COPY bar FROM STDIN;", pipe.output)
output_conn.commit()
output_conn.close()
t.join()

print(list(engine.execute("SELECT count(*) FROM bar;")))  # 100000
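To adapt this to the question's two databases, point the two raw connections at separate engines and have the producing side COPY the relevant query instead of a whole table. A rough sketch reusing the FilePipe class above (the WHERE clause and literal timestamp are placeholders for "rows since the last run"):

import sqlalchemy
from threading import Thread

dbA = sqlalchemy.create_engine(connection_string_A)
dbB = sqlalchemy.create_engine(connection_string_B)

def produce(conn, f):
    # COPY (query) TO STDOUT exports the result of an arbitrary SELECT
    conn.cursor().copy_expert(
        "COPY (SELECT * FROM widgets WHERE ts > '2017-05-30') TO STDOUT;", f)
    f.close()
    conn.close()

pipe = FilePipe()
src_conn = dbA.raw_connection()  # raw psycopg2 connection to database A
dst_conn = dbB.raw_connection()  # raw psycopg2 connection to database B

t = Thread(target=produce, args=(src_conn, pipe.input))
t.start()
dst_cur = dst_conn.cursor()
dst_cur.copy_expert("COPY widgets FROM STDIN;", pipe.output)
dst_conn.commit()
dst_conn.close()
t.join()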

1 Comment

It'd be more accurate to say that I'm asking one question, but with two suggestions or clues as to what the answer might look like. You've offered a fascinating solution, but I suppose if it's this complicated, better to just use a Unix shell command with a pipe.

If the data is not very large (i.e., it can be held in the main memory of a single host), you can try my open-source ETL toolkit based on pandas/Python 3/SQLAlchemy, bailaohe/parade; I've provided a tutorial. You can make use of pandas to transform your data and return the resulting DataFrame directly. With a little configuration, the pandas DataFrame can be dumped to a different target connection.
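For a fits-in-memory transfer, the core idea can also be sketched with plain pandas against the question's two engines, independent of Parade (the WHERE clause and literal timestamp are placeholders):

import pandas as pd
import sqlalchemy

dbA = sqlalchemy.create_engine(connection_string_A)
dbB = sqlalchemy.create_engine(connection_string_B)

# pull the new rows from database A into a DataFrame (held entirely in memory)
df = pd.read_sql("SELECT * FROM widgets WHERE ts > '2017-05-30'", dbA)

# ... optional pandas transformations on df go here ...

# append the rows to the widgets table in database B
df.to_sql("widgets", dbB, if_exists="append", index=False)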

For your issue, you can use Parade to generate a simple SQL-type task as follows:

# -*- coding:utf-8 -*-
from parade.core.task import SqlETLTask
from parade.type import stdtypes


class CopyPostgres(SqlETLTask):

    @property
    def target_conn(self):
        """
        the target connection to write the result
        :return:
        """
        return 'target_postgres'

    @property
    def source_conn(self):
        """
        the source connection to read the source data from
        :return:
        """
        return 'source_postgres'

    @property
    def etl_sql(self):
        """
        the single sql statement to process etl
        :return:
        """
        return """SELECT (column) FROM widgets"""

You can even compose a DAG-workflow with multiple tasks and schedule the workflow directly with Parade. Hope this will be helpful.

