
I have a Python program that connects to a PostgreSQL database. This database holds quite a lot of data (around 1.2 billion rows). Luckily I don't have to analyse all of those rows at the same time.

Those 1.2 billion rows are spread across several tables (around 30). Currently I am accessing a table called table_3, from which I want to read all the rows that have a specific value in the column named "did".

I have counted the rows using a SQL command:

SELECT count(*) FROM table_3 WHERE did='356002062376054';

which returns 157 million rows.

I will perform some "analysis" on all of these rows: extracting 2 specific values, doing some calculations on them, writing the results to a dictionary, and then saving them back to PostgreSQL in a different table.

The problem is that I am creating a lot of lists and dictionaries to manage all this, and I end up running out of memory even though I am using 64-bit Python 3 and have 64 GB of RAM.

Some code:

import psycopg2
from datetime import datetime

CONNECTION = psycopg2.connect('<psycopg2 formatted string>')
CURSOR = CONNECTION.cursor()

DID_LIST = ["357139052424715",
            "353224061929963",
            "356002064810514",
            "356002064810183",
            "358188051768472",
            "358188050598029",
            "356002061925067",
            "358188056470108",
            "356002062376054",
            "357460064130045"]

SENSOR_LIST = [1, 2, 3, 4, 5, 6, 7, 8, 9,
               10, 11, 12, 13, 801, 900, 901,
               902, 903, 904, 905, 906, 907,
               908, 909, 910, 911]

for did in DID_LIST:
    table_name = did
    for sensor_id in SENSOR_LIST:
        rows = get_data(did, sensor_id)
        list_object = create_standard_list(sensor_id, rows)  # Happens here
        formatted_list = format_table_dictionary(list_object)  # Or here
        # write_to_table is omitted as that is not my problem.
        pushed_rows = write_to_table(table_name, formatted_list)

def get_data(did, table_id):
    """Getting data from postgresql."""
    table_name = "table_{0}".format(table_id)
    query = """SELECT * FROM {0} WHERE did='{1}'
               ORDER BY timestamp""".format(table_name, did)

    CURSOR.execute(query)
    CONNECTION.commit()
    
    return CURSOR

def create_standard_list(sensor_id, data):
    """Formats DB data to dictionary"""
    list_object = []

    print("Create standard list")
    for row in data: # data is the psycopg2 CURSOR
        row_timestamp = row[2]
        row_data = row[3]

        temp_object = {"sensor_id": sensor_id, "timestamp": row_timestamp,
                       "data": row_data}

        list_object.append(temp_object)

    return list_object


def format_table_dictionary(list_dict):
    """Formats dictionary to simple data
       table_name = (dates, data_count, first row)"""
    print("Formatting dict to DB")
    temp_today = 0
    dict_list = []
    first_row = {}
    count = 1

    for elem in list_dict:
        # convert to seconds
        date = datetime.fromtimestamp(elem['timestamp'] / 1000)
        today = int(date.strftime('%d'))
        if temp_today != today:
            if not first_row:
                first_row = elem['data']
            first_row_str = str(first_row)
            dict_object = {"sensor_id": elem['sensor_id'],
                           "date": date.strftime('%d/%m-%Y'),
                           "reading_count": count,
                           # size in MB of data
                           "approx_data_size": (count*len(first_row_str)/1000),
                           "time": date.strftime('%H:%M:%S'),
                           "first_row": first_row}

            dict_list.append(dict_object)
            first_row = {}
            temp_today = today
            count = 0
        else:
            count += 1

    return dict_list

My error happens somewhere around the creation of either of the two lists, as marked with comments in my code. It shows up as my computer becoming unresponsive and eventually logging me out. I am running Windows 10, if that is of any importance.

I know the first list I create with the "create_standard_list" method could be dropped and that code could be run inside "format_table_dictionary", thereby avoiding a list with 157 million elements in memory. However, I think some of the other tables I will run into will have similar problems and might be even larger, so I thought of optimizing it all right now, but I am unsure what I could do.

I guess writing to a file wouldn't really help much, as I would have to read that file back and thereby put it all into memory again?

Minimalist example

I have a table

---------------------------------------------------------------
|Row 1 | did | timestamp | data | unused value | unused value |
|Row 2 | did | timestamp | data | unused value | unused value |
....
---------------------------------

table = [{ values from above row1 }, { values from above row2},...]

connection = psycopg2.connect('<connection string>')
cursor = connection.cursor()

# execute() returns None in psycopg2; the cursor itself is iterated afterwards
cursor.execute("""SELECT * FROM table_3 WHERE did='356002062376054'
                  ORDER BY timestamp""")
table = cursor

extracted_list = extract(table)
calculated_list = calculate(extracted_list)
... write to db ...

def extract(table):
    """extract all but unused values"""
    new_list = []
    for row in table:
        did = row[0]
        timestamp = row[1]
        data = row[2]

        a_dict = {'did': did, 'timestamp': timestamp, 'data': data}
        new_list.append(a_dict)

    return new_list


def calculate(a_list):
    """perform calculations on values"""
    dict_list = []
    temp_today = 0
    count = 0
    for row in a_list:
        date = datetime.fromtimestamp(row['timestamp'] / 1000) # from ms to sec
        today = int(date.strftime('%d'))
        if temp_today != today:
            new_dict = {'date': date.strftime('%d/%m-%Y'),
                        'reading_count': count,
                        'time': date.strftime('%H:%M:%S')}
            dict_list.append(new_dict)
            temp_today = today

    return dict_list
        
        
  • Could direct usage of a dict cursor class, like those provided in the psycopg2.extras module, be enough for your requirements? Commented Jan 27, 2017 at 10:19
  • You are pulling a huge amount of data out of the DB, doing some rather simple processing, and pushing the result back into the DB. Have you considered doing all this in the DB, with pure SQL? It would be more efficient, depending on whether your actual row-wise processing is as simple as your example suggests. Commented Jan 27, 2017 at 10:33
  • @SergeBallesta It might be. I will have a look at the documentation. Commented Jan 27, 2017 at 10:35
  • @dnswlt I am not that well versed in SQL beyond reading and writing, tbh. In fact, I didn't know that was possible. Commented Jan 27, 2017 at 10:36
  • @Zeliax, it's very possible :) I'm afk now, but if still relevant I might create some sketch SQL script later. Commented Jan 27, 2017 at 11:04
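
On the cursor question raised in the comments: psycopg2's default (client-side) cursor transfers the whole result set to the client when execute() is called, whichever cursor class is used. Below is a minimal sketch, assuming psycopg2, of a named (server-side) cursor that fetches rows in batches instead. The helper name stream_rows, the cursor name, and the batch size are hypothetical choices, not part of the original code. A named cursor lives inside a transaction, so the commit() call right after execute() in get_data would have to go.

```python
def stream_rows(connection, query, params=None, batch_size=10_000):
    """Yield rows one at a time from a psycopg2 named (server-side) cursor.

    `connection` is assumed to be an open psycopg2 connection.
    """
    # Passing name= makes psycopg2 create the cursor on the server.
    # Iterating over it fetches `itersize` rows per network round trip,
    # so only one batch is held in client memory at a time.
    with connection.cursor(name="stream_rows_cursor") as cursor:
        cursor.itersize = batch_size
        cursor.execute(query, params)
        for row in cursor:
            yield row


# Usage sketch (needs a live database, so it is left commented out):
# import psycopg2
# connection = psycopg2.connect("<psycopg2 formatted string>")
# query = "SELECT * FROM table_3 WHERE did = %s ORDER BY timestamp"
# for row in stream_rows(connection, query, ("356002062376054",)):
#     ...  # process one row at a time
```

Passing the did value as a query parameter (%s) rather than formatting it into the string also avoids quoting problems. The table name cannot be passed that way and still has to be put into the query text.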

2 Answers


create_standard_list() and format_table_dictionary() could be generators (yielding each item instead of returning the full list). That way the whole lists are never held in memory, which should solve your issue. For example:

def create_standard_list(sensor_id, data):
    for row in data:
        row_timestamp = row[2]
        row_data = row[3]

        temp_object = {"sensor_id": sensor_id, "timestamp": row_timestamp,
                       "data": row_data}
        yield temp_object
       #^ yield each item instead of appending to a list

Further information on generators and the yield keyword.
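
To illustrate how both steps can stream, here is a minimal sketch that chains two generators. It is not the original format_table_dictionary logic: the per-day summary is rebuilt with itertools.groupby, which assumes the rows arrive ordered by timestamp (as the ORDER BY in the question's query provides). The names create_standard_rows, day_of and summarize_by_day are hypothetical, the row layout (timestamp at index 2, data at index 3) is taken from the question, and UTC is assumed for the day boundaries.

```python
from datetime import datetime, timezone
from itertools import groupby


def create_standard_rows(sensor_id, rows):
    """Yield one small dict per row instead of building a list."""
    for row in rows:
        yield {"sensor_id": sensor_id, "timestamp": row[2], "data": row[3]}


def to_datetime(item):
    """Convert the millisecond timestamp of an item to a UTC datetime."""
    return datetime.fromtimestamp(item["timestamp"] / 1000, tz=timezone.utc)


def day_of(item):
    """Calendar day (UTC) used as the grouping key."""
    return to_datetime(item).date()


def summarize_by_day(items):
    """Yield one summary dict per day; input must be ordered by timestamp."""
    for day, group in groupby(items, key=day_of):
        first = next(group)                # first reading of that day
        count = 1 + sum(1 for _ in group)  # consume the rest, keeping only a counter
        yield {
            "sensor_id": first["sensor_id"],
            "date": day.strftime("%d/%m-%Y"),
            "reading_count": count,
            "time": to_datetime(first).strftime("%H:%M:%S"),
            "first_row": first["data"],
        }


def ms(*parts):
    """Helper for the sample data: UTC date parts to epoch milliseconds."""
    return int(datetime(*parts, tzinfo=timezone.utc).timestamp() * 1000)


# Stand-in for the database cursor: (id, did, timestamp_ms, data) tuples.
sample_rows = [
    (1, "356002062376054", ms(2017, 1, 31, 17, 0), "a"),
    (2, "356002062376054", ms(2017, 1, 31, 19, 53), "b"),
    (3, "356002062376054", ms(2017, 2, 1, 8, 10), "c"),
]

# Nothing is computed until the summaries are iterated over.
summaries = list(summarize_by_day(create_standard_rows(3, sample_rows)))
```

Only the final per-day summaries are materialized here, one entry per day rather than one per row. In the real program the summaries could equally be written to the database as they are produced.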


11 Comments

So I should rewrite both functions to yield their results and then instantly write that result to my DB? I have read about generators and the yield function, and as far as I can understand I will have to create variables yielded1=create_standard_list(data) and yielded2=format_table_dictionary(yielded1) and then directly write them to the database. But I then assume that I have to create a for-loop using the generator?
Yes, to write the data you can iterate over the generators and write them item by item, something like: for item in yielded1: out_file.write(item)
So if I use yield in both functions, I assume I have to create a nested for-loop to iterate over both results? (I recognize my misunderstanding in the above comment; it should be as you suggested instead.) I read through some of the documentation, but didn't find whether code after the yield statement is also run. Is it?
@Zeliax And yes yield does not terminate the function (like return does). yield is more like print in that sense
@Zeliax Yep, that will yield a tuple of the values
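
The two points from this thread can be checked with a small sketch (the function names numbered and shout are made up for illustration): code placed after a yield runs when the caller asks for the next item, and two chained generators are driven by a single for-loop, not a nested one.

```python
def numbered(items, log):
    """Yield (index, item) tuples and record when execution resumes."""
    for index, item in enumerate(items):
        yield index, item                    # pauses here, handing a tuple to the caller
        log.append(f"resumed after {item}")  # runs once the next value is requested


def shout(pairs):
    """Second stage: consumes the first generator lazily."""
    for index, item in pairs:
        yield f"{index}:{item.upper()}"


log = []
results = []
# One loop pulls items through both generators, one item at a time.
for line in shout(numbered(["a", "b"], log)):
    results.append(line)
```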

What you are trying to do here, IIUC, is to emulate an SQL GROUP BY expression in Python code. This can never be as quick or as memory-efficient as doing it directly in the database. Your example code seems to have some issues, but I understand it as: you want to compute the count of rows per day, for each day that occurs for a given did. Also, you are interested in the minimum (or maximum, or median, it does not matter) time of day for each group of values, i.e. for each day.

Let's set up a small example table (tested on Oracle):

create table t1 (id number primary key, created timestamp, did number, other_data varchar2(200));  

insert into t1 values (1, to_timestamp('2017-01-31 17:00:00', 'YYYY-MM-DD HH24:MI:SS'), 9001, 'some text');
insert into t1 values (2, to_timestamp('2017-01-31 19:53:00', 'YYYY-MM-DD HH24:MI:SS'), 9001, 'some more text');
insert into t1 values (3, to_timestamp('2017-02-01 08:10:00', 'YYYY-MM-DD HH24:MI:SS'), 9001, 'another day');
insert into t1 values (4, to_timestamp('2017-02-01 15:55:00', 'YYYY-MM-DD HH24:MI:SS'), 9001, 'another day, rainy afternoon');
insert into t1 values (5, to_timestamp('2017-02-01 15:59:00', 'YYYY-MM-DD HH24:MI:SS'), 9002, 'different did');
insert into t1 values (6, to_timestamp('2017-02-03 01:01:00', 'YYYY-MM-DD HH24:MI:SS'), 9001, 'night shift');

We have some rows, spread over several days, for did 9001. There's also a value for did 9002, which we'll ignore. Now let's get the rows that you want to write into your second table as a simple SELECT .. GROUP BY:

select 
    count(*) cnt, 
    to_char(created, 'YYYY-MM-DD') day, 
    min(to_char(created, 'HH24:MI:SS')) min_time 
from t1 
where did = 9001
group by to_char(created, 'YYYY-MM-DD')
;

We are grouping all rows by the day of their created column (a timestamp). We select the number of rows per group, the day itself, and - just for fun - the minimum time part of each group. Result:

cnt day         min_time
2   2017-02-01  08:10:00
1   2017-02-03  01:01:00
2   2017-01-31  17:00:00

So now you have your second table as a SELECT. Creating a table from it is trivial:

create table t2 as
select
    ... as above
;
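
For readers who want to try the same idea from Python, here is a minimal sketch using the standard-library sqlite3 module as a stand-in database. SQLite is an assumption made only so the example runs without a server: its date() and time() functions replace Oracle's to_char, and the PostgreSQL syntax will differ again. The table and column names follow the example above.

```python
import sqlite3

connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE t1 (id INTEGER PRIMARY KEY, created TEXT, did INTEGER, other_data TEXT)"
)
connection.executemany(
    "INSERT INTO t1 VALUES (?, ?, ?, ?)",
    [
        (1, "2017-01-31 17:00:00", 9001, "some text"),
        (2, "2017-01-31 19:53:00", 9001, "some more text"),
        (3, "2017-02-01 08:10:00", 9001, "another day"),
        (4, "2017-02-01 15:55:00", 9001, "another day, rainy afternoon"),
        (5, "2017-02-01 15:59:00", 9002, "different did"),
        (6, "2017-02-03 01:01:00", 9001, "night shift"),
    ],
)

# The grouping and counting happen inside the database; Python never sees
# the individual rows, only the per-day result table.
connection.execute(
    """
    CREATE TABLE t2 AS
    SELECT count(*) AS cnt,
           date(created) AS day,
           min(time(created)) AS min_time
    FROM t1
    WHERE did = 9001
    GROUP BY date(created)
    """
)

rows = connection.execute(
    "SELECT cnt, day, min_time FROM t2 ORDER BY day"
).fetchall()
```

With psycopg2 the pattern is the same: a single execute() call sends the statement, and no per-row data crosses into the Python process.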

HTH!

2 Comments

Thanks and yes. You understood correctly. Well almost, as the time I am getting is the time of the first point of data I got within a given date. I'll try this out and hopefully I can get something to work. I'll let you know. And thanks! :D
Nice to hear! And "the time of the first point of data" within each given date is exactly what my 'min' above will select. Good luck!
