I'm trying to write an iterator that takes a query and returns its rows in batches, with each batch being a list of dicts. Here's a complete example of what I'm working with:

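from __future__ import print_function  # print(..., file=...) on Python 2

import collections
import sys


class QueryStream(collections.Iterator):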
    def __init__(self, conn_details, query, max_rows=None, batch_size=2000):
        # Initialize vars.
        self.engine = dst.get_connection(conn_details)
        self.query = query
        self.max_rows = max_rows
        self.batch_size = batch_size
        self.fetched_rows = 0

        # Create a database cursor from query.
        self.conn = self.engine.raw_connection()
        self.cursor = self.conn.cursor()
        self.cursor.execute(self.query)

    def next(self):
        try:
            if self.max_rows:
                remaining = self.max_rows - self.fetched_rows
                if remaining <= 0:
                    # Maximum rows has been fetched, so stop iterating.
                    raise StopIteration
                # Never fetch more than the rows still allowed by max_rows.
                # (Previously batch_size was left unassigned when more than
                # one full batch remained.)
                batch_size = min(self.batch_size, remaining)
            else:
                # Get default batch size.
                batch_size = self.batch_size

            batch = [dict(row.items()) for row in self.cursor.fetchmany(batch_size)]
            if batch:
                self.fetched_rows += len(batch)
                print('Fetched {} rows so far.'.format(self.fetched_rows), file=sys.stderr)
                return batch
            else:
                # 0 rows were returned, so we're probably at the end.
                raise StopIteration
        except StopIteration:
            self.cursor.close()
            self.conn.close()
            self.engine.dispose()
            raise StopIteration

The problem is that this line:

batch = [dict(row.items()) for row in self.cursor.fetchmany(batch_size)]

throws this error:

AttributeError: 'tuple' object has no attribute 'items'

I was under the impression that a RowProxy object would be returned for every result in the result set, which supports dict-like operations (as is mentioned in this post), but it appears the results are just plain tuples. The SQLAlchemy docs are not 100% clear on what the expected type of the results from a cursor is, only offering this example of usage.

Question: Am I doing something wrong with my cursor usage? I need the results to be a list of dicts with column names as keys, but I don't know if that's possible without having RowProxy instead of tuples.

1 Answer

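Well, it seems the problem has to do with using a raw connection. engine.raw_connection() hands back the underlying DBAPI connection (psycopg2 in my case), so its cursor returns plain tuples rather than SQLAlchemy RowProxy objects. This: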

        # Create a database cursor from query.
        self.conn = self.engine.raw_connection()
        self.cursor = self.conn.cursor()
        self.cursor.execute(self.query)

        ...

        batch = [dict(row.items()) for row in self.cursor.fetchmany(batch_size)]

was replaced with

        # Create a database cursor from query.
        self.engine = dst.get_connection(workspace_uuid, project_id)
        self.stream = self.engine.execution_options(stream_results=True).execute(self.query)

        ...

        batch = [dict(row.items()) for row in self.stream.fetchmany(batch_size)]

And all is well. I'm fortunately using a driver that supports stream_results (psycopg2):

stream_results – Available on: Connection, statement. Indicate to the dialect that results should be “streamed” and not pre-buffered, if possible. This is a limitation of many DBAPIs. The flag is currently understood only by the psycopg2, mysqldb and pymysql dialects.
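
For anyone who has to stay on the raw connection, dicts are still possible: a DBAPI cursor exposes the column names through cursor.description (PEP 249), so each tuple can be zipped with them. Below is a minimal sketch of that idea, not what I ended up using. The function name iter_dict_batches and the users table are made up for illustration, and sqlite3 stands in for psycopg2 only so the example runs without a database server.

```python
import sqlite3


def iter_dict_batches(cursor, batch_size=2000, max_rows=None):
    """Yield lists of dicts from an already-executed DBAPI cursor.

    Hypothetical helper: works on any PEP 249 cursor, not on a
    SQLAlchemy result object.
    """
    # PEP 249: cursor.description has one sequence per column,
    # and the first item of each sequence is the column name.
    columns = [col[0] for col in cursor.description]
    fetched = 0
    while max_rows is None or fetched < max_rows:
        # Cap the final batch so no more than max_rows rows are returned.
        if max_rows is None:
            size = batch_size
        else:
            size = min(batch_size, max_rows - fetched)
        rows = cursor.fetchmany(size)
        if not rows:
            break  # result set exhausted
        fetched += len(rows)
        yield [dict(zip(columns, row)) for row in rows]


# Demo against an in-memory SQLite table (illustrative schema).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [(i, "user{}".format(i)) for i in range(5)],
)
cursor = conn.cursor()
cursor.execute("SELECT id, name FROM users ORDER BY id")
batches = list(iter_dict_batches(cursor, batch_size=2, max_rows=3))
conn.close()
```

With batch_size=2 and max_rows=3 this produces two batches, the second holding only the one remaining row. Note that this does nothing about server-side streaming; with psycopg2 the raw cursor would still buffer the whole result on the client unless a named cursor is used.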
