
I'm attempting to use threads to offload the insertion of three different data types into a Postgres database, but I'm getting stuck on the borrow checker.

I've attempted a few different approaches and ended up in a place where I no longer knew what I was doing. So I've tried to simplify everything: I removed all the attributes of my dbm struct and set it up so that each method takes the PgPool as an argument, with a clone of the initial PgPool created for each method call. All the methods take a &self argument. I've also tried making copies of dbm for each of the operations, which did not work (in addition to feeling extraordinarily clumsy).

I expected this to work, either as shown in the code sample here or in the variant where I used a reference to dbm or a cloned version. Instead I get the following message from the borrow checker, and while I understand it, I'm not sure how best to accommodate it.

76 |           let aton_handle = task::spawn(dbm.insert_aton_data(
   |  _______________________________________^
77 | |             connection_pool.clone(),
78 | |             split_messages.aton_data,
79 | |             &log_id,
80 | |         ));
   | |         ^
   | |         |
   | |_________borrowed value does not live long enough
   |           argument requires that `dbm` is borrowed for `'static`
...

Code snippet that shows the offending code:

let connection_pool = PgPool::connect(&connection_string)
    .await
    .expect("Failed to connect to Postgres");
let dbm = DbMethods {};

// Make API calls etc.

if let Some(messages) = last_hour.ais_response.ais_latest_responses {
    // TODO: Handle errors.
    let split_messages = process_ais_items(messages).unwrap();

    // TODO: Create thread for each message type and do DB inserts.
    let aton_handle = task::spawn(dbm.insert_aton_data(
        connection_pool.clone(),
        split_messages.aton_data,
        &log_id,
    ));
    // ... other handles.

    let _ = tokio::try_join!(aton_handle, static_handle, position_handle);
}

Method:

pub async fn insert_aton_data(
    &self,
    db_pool: PgPool,
    aton_data: Vec<AISAtonData>,
    log_id: &Uuid,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // let pool = PgPool::connect(&self.connection_string).await?;
    let mut tx = db_pool.begin().await?;

    for data in aton_data {
        sqlx::query!(
            "INSERT INTO ais.ais_aton_data (
                type_field, message_type, mmsi, msgtime, dimension_a, dimension_b, dimension_c, dimension_d,
                type_of_aids_to_navigation, latitude, longitude, name, type_of_electronic_fixing_device, log_id
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
            data.type_field, data.message_type, data.mmsi, convert_to_datetime_option(data.msgtime), data.dimension_a, data.dimension_b,
            data.dimension_c, data.dimension_d, data.type_of_aids_to_navigation, data.latitude,
            data.longitude, data.name, data.type_of_electronic_fixing_device, log_id
        )
        // Execute on the transaction (not the pool) so commit/rollback covers these inserts.
        .execute(&mut *tx)
        .await?;
    }
    tx.commit().await?;

    Ok(())
}

Current solution

I decided to move away from the pattern of having the database operations as methods on a struct and made them free functions instead, so that I could keep moving forward. The threading now works. I hope someone can explain how I can achieve the same thing with methods.

Here is the code I ended up with for now, in case anyone else wants to do something like this. I'm not saying it is a good idea; it might well be worth looking into using sqlx with Postgres UNNEST for bulk inserts.

async fn insert_ais_items(connection_pool: PgPool, log_id: Uuid, last_hour: LastHourAISMessage) -> Result<(), Box<dyn Error>> {
    if let Some(messages) = last_hour.ais_response.ais_latest_responses {
        // TODO: Handle errors.
        let split_messages = process_ais_items(messages).unwrap();

        let aton_handle = task::spawn(insert_aton_data(
            connection_pool.clone(),
            split_messages.aton_data,
            log_id.clone(),
        ));
        let static_handle = task::spawn(insert_static_data(
            connection_pool.clone(),
            split_messages.static_data,
            log_id.clone(),
        ));
        let position_handle = task::spawn(insert_position_data(
            connection_pool.clone(),
            split_messages.position_data,
            log_id.clone(),
        ));

        let res = tokio::try_join!(aton_handle, static_handle, position_handle);
        match res {
            Ok(..) => {
                debug!("Threads completed");
            }
            Err(error) => warn!("There was an error in one of the threads: {:?}", error)
        }
    }

    Ok(())
}
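A caveat on the join code above: each JoinHandle returned by task::spawn resolves to Result<T, JoinError>, and here T is itself the insert function's Result. tokio::try_join! only short-circuits on the outer JoinError (a panicked or cancelled task), so an insert that returns Err still reaches the Ok(..) arm and gets logged as "Threads completed". The sketch below checks both layers. To stay runnable without tokio it uses std::thread as an analog (JoinHandle::join likewise returns an outer Err only if the thread panicked), and insert_ok / insert_fails are hypothetical stand-ins for the real insert functions.

```rust
use std::thread;

/// Same shape as the question's insert functions: Ok(()) or an error value.
type InsertResult = Result<(), String>;

// Hypothetical stand-ins for insert_aton_data and friends; one of them fails.
fn insert_ok() -> InsertResult {
    Ok(())
}

fn insert_fails() -> InsertResult {
    Err("duplicate key".to_string())
}

/// Joins every task and reports both layers of failure:
/// an outer Err means the task itself panicked (tokio's analog is JoinError),
/// an inner Err means the task finished but its insert returned an error.
fn join_and_collect(handles: Vec<(&'static str, thread::JoinHandle<InsertResult>)>) -> Vec<String> {
    let mut problems = Vec::new();
    for (name, handle) in handles {
        match handle.join() {
            Ok(Ok(())) => {}
            Ok(Err(e)) => problems.push(format!("{name}: insert failed: {e}")),
            Err(_) => problems.push(format!("{name}: task panicked")),
        }
    }
    problems
}

fn run_inserts() -> Vec<String> {
    let handles = vec![
        ("aton", thread::spawn(insert_ok)),
        ("static", thread::spawn(insert_fails)),
        ("position", thread::spawn(insert_ok)),
    ];
    join_and_collect(handles)
}
```

With tokio the same idea applies to the tuple inside the Ok arm of try_join!: match each of the three inner results instead of treating Ok(..) as success.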
  • What is dbm.insert_aton_data? The error clearly specifies that dbm is being borrowed by the future returned by insert_aton_data. Commented Sep 28, 2023 at 12:09
  • @ColonelThirtyTwo Added the entire method to the question instead of just the signature. Have I just ended up complicating things by wanting to use methods instead of functions in this case? Commented Sep 28, 2023 at 13:07
  • Why do you want them as methods to begin with? You're not using self anywhere. There are several ways to make this work, but which one is the right one for your problem depends on why you want an object with methods instead of just a module with functions, for example. Commented Sep 28, 2023 at 19:03
  • @cafce25 To be honest I started with a misguided preconception on how I should approach the problem, but also because I really do like the method syntax as it is a pattern that feels familiar. Commented Sep 28, 2023 at 20:53
  • Threading won't help much, if at all, and may even make it worse. There is still only one network and one database server. You would be better off investigating batching. Commented Sep 29, 2023 at 4:09
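For the batching suggested in the last comment and the UNNEST idea mentioned in the question, a sketch of the Rust side: transpose the rows into one vector per column so that a single INSERT ... SELECT ... FROM UNNEST(...) statement can insert a whole batch. AtonRow is a hypothetical, trimmed-down version of AISAtonData covering only four of its columns, and the Postgres types in the SQL (int4[], text[], float8[], uuid) are assumptions about the table schema.

```rust
/// Hypothetical, trimmed-down version of AISAtonData (only four columns).
struct AtonRow {
    mmsi: i32,
    name: Option<String>,
    latitude: f64,
    longitude: f64,
}

/// One statement per batch: each of $1..$4 is a Postgres array, UNNEST zips them
/// into rows, and the shared log_id is a single scalar parameter ($5).
/// The array and uuid types are assumptions about the table schema.
const INSERT_ATON_BATCH: &str = "INSERT INTO ais.ais_aton_data (mmsi, name, latitude, longitude, log_id) \
    SELECT t.mmsi, t.name, t.latitude, t.longitude, $5::uuid \
    FROM UNNEST($1::int4[], $2::text[], $3::float8[], $4::float8[]) \
    AS t(mmsi, name, latitude, longitude)";

/// Column vectors (struct of arrays), each meant to be bound as one array parameter.
#[derive(Debug, Default, PartialEq)]
struct AtonColumns {
    mmsi: Vec<i32>,
    name: Vec<Option<String>>,
    latitude: Vec<f64>,
    longitude: Vec<f64>,
}

/// Transposes rows into columns. Every vector gets exactly one entry per row,
/// so the arrays stay aligned when UNNEST turns them back into rows
/// (multi-argument UNNEST would pad a shorter array with NULLs instead).
fn to_columns(rows: Vec<AtonRow>) -> AtonColumns {
    let mut cols = AtonColumns::default();
    for row in rows {
        cols.mmsi.push(row.mmsi);
        cols.name.push(row.name);
        cols.latitude.push(row.latitude);
        cols.longitude.push(row.longitude);
    }
    cols
}
```

With sqlx, each column vector would then be bound in order as one array parameter, followed by the log_id, and the statement executed once per batch instead of once per row. Check the array types against the real column types before relying on this.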

1 Answer


If the method syntax is all you're after, you can get around the compiler error by letting the compiler do constant promotion: just add a borrow where you declare dbm:

let dbm = &Dbm {};

or implement Copy for Dbm and change the receiver from &self to self.
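A std-only sketch of both workarounds. require_spawnable is a hypothetical helper that imposes the same Send + 'static bounds that tokio::task::spawn places on its future and output, LogId is a hypothetical Copy stand-in for Uuid, and block_on is a toy executor included only so the example runs without tokio. With tokio, the futures built in demo would be passed to task::spawn instead.

```rust
use std::future::Future;
use std::pin::pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Hypothetical helper: accepts only futures that satisfy the bounds
/// tokio::task::spawn requires, then hands the future back unchanged.
fn require_spawnable<F>(fut: F) -> F
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    fut
}

/// Hypothetical stand-in for Uuid: small and Copy, so it is cheap to pass by value.
#[derive(Clone, Copy, Debug, PartialEq)]
struct LogId(u128);

/// Unit struct with no fields and no Drop, so &Dbm can be promoted to &'static Dbm.
struct Dbm;

impl Dbm {
    async fn insert_by_ref(&self, rows: Vec<u32>, log_id: LogId) -> (usize, LogId) {
        // Placeholder for the real inserts: report how many rows were passed in.
        (rows.len(), log_id)
    }
}

/// Copy type with a by-value receiver: the future owns its own copy of self.
#[derive(Clone, Copy)]
struct CopyDbm;

impl CopyDbm {
    async fn insert_by_value(self, rows: Vec<u32>, log_id: LogId) -> (usize, LogId) {
        (rows.len(), log_id)
    }
}

/// Toy executor (not tokio): polls the future with a no-op waker until it is ready.
/// Adequate here only because these futures never wait on anything.
fn block_on<F: Future>(fut: F) -> F::Output {
    unsafe fn noop_clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    unsafe fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);

    let waker = unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn demo() -> ((usize, LogId), (usize, LogId)) {
    let log_id = LogId(42);

    // Workaround 1: constant promotion turns the borrow into &'static Dbm.
    let dbm: &'static Dbm = &Dbm;
    let by_ref = require_spawnable(dbm.insert_by_ref(vec![1, 2, 3], log_id));

    // Workaround 2: self is taken by value (copied), so nothing local is borrowed.
    let copy_dbm = CopyDbm;
    let by_value = require_spawnable(copy_dbm.insert_by_value(vec![4, 5], log_id));

    (block_on(by_ref), block_on(by_value))
}
```

Either way the future no longer borrows a local: in the first case the borrow is 'static, in the second the future owns its copy of the struct. The id is passed by value for the same reason; a future holding &log_id to a local would still fail the 'static bound.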

But I'd really advise against using a method just for its own sake. You can instead group your functions into a module and call them with the module's name, a small change from . to ::, like this:

mod dbm {
    pub async fn insert_aton_data(
        db_pool: PgPool,
        aton_data: Vec<AISAtonData>,
        log_id: Uuid,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { 
        //…
    }
}

// and later call it like
let handle = task::spawn(dbm::insert_aton_data(
    connection_pool.clone(),
    split_messages.aton_data,
    log_id.clone(),
));

Note: I also changed &Uuid to Uuid, since a borrowed log_id is likely to cause the same problem.

Another note: If you declare Dbm as struct Dbm; you don't have to add the {} when creating an instance of it.
