0

I'm working on a Tauri application that uses tauri-specta for type safety and I can't figure out how to properly serialize dates. This is the file where most of the serialization and deserialization takes place:

use arrow_array::{Date64Array, Int64Array, RecordBatch, RecordBatchIterator};
use arrow_schema::{ArrowError, DataType, Field, Schema};
use chrono::Utc;
use futures::TryStreamExt;
use lancedb::query::{ExecutableQuery, QueryBase};
use serde::{Deserialize, Serialize};
use serde_arrow::{from_record_batch, marrow::array::TimestampArray};
use std::{ops::Index, sync::Arc};

use crate::core::{
    database::tables::table_paths::DatabaseTables,
    types::{
        errors::errors::{FlusterError, FlusterResult},
        traits::db_entity::DbEntity,
        FlusterDb,
    },
};

use super::{get_snippet_params::GetSnippetsParams, snippet_model::SnippetModel};

/// Field-less handle type whose inherent methods and `DbEntity<SnippetModel>`
/// implementation read and write `SnippetModel` rows in the
/// `DatabaseTables::Snippets` table.
#[derive(Serialize, Deserialize, Debug)]
pub struct SnippetEntity {}

impl SnippetEntity {
    /// Renders `value` as a single-quoted SQL string literal for a filter expression.
    ///
    /// Embedded single quotes are doubled so the value cannot terminate the
    /// literal early and change the meaning of the surrounding filter.
    fn sql_string_literal(value: &str) -> String {
        format!("'{}'", value.replace('\'', "''"))
    }

    /// Deserializes every record batch into `SnippetModel`s, preserving batch order.
    ///
    /// # Errors
    /// Prints the underlying error prefixed with `log_label` and returns
    /// `FlusterError::FailToSerialize` as soon as one batch fails to decode.
    fn decode_batches(
        batches: &[RecordBatch],
        log_label: &str,
    ) -> FlusterResult<Vec<SnippetModel>> {
        let mut items: Vec<SnippetModel> = Vec::new();
        for batch in batches {
            let decoded: Vec<SnippetModel> = from_record_batch(batch).map_err(|e| {
                println!("{}: {:?}", log_label, e);
                FlusterError::FailToSerialize
            })?;
            items.extend(decoded);
        }
        Ok(items)
    }

    /// Deletes every snippet whose `id` column equals `id`.
    ///
    /// # Errors
    /// * `FlusterError::FailToFind` if the snippets table cannot be opened.
    /// * `FlusterError::FailToDelete` if the delete itself fails.
    pub async fn delete_by_id(id: String, conn: FlusterDb<'_>) -> FlusterResult<()> {
        let tbl = conn
            .open_table(DatabaseTables::Snippets.to_string())
            .execute()
            .await
            .map_err(|_| FlusterError::FailToFind)?;
        // The id is escaped rather than pasted in verbatim so that a quote
        // inside it cannot widen the delete to other rows.
        let filter = format!("id = {}", Self::sql_string_literal(&id));
        tbl.delete(&filter)
            .await
            .map_err(|_| FlusterError::FailToDelete)?;
        Ok(())
    }

    /// Fetches the single snippet whose `id` column equals `id`.
    ///
    /// # Errors
    /// * `FlusterError::FailToFind` if the table cannot be opened, the query
    ///   fails, or no row matches.
    /// * `FlusterError::DuplicateId` if more than one row matches.
    /// * `FlusterError::FailToSerialize` if a returned batch cannot be decoded.
    pub async fn get_by_id(id: String, conn: FlusterDb<'_>) -> FlusterResult<SnippetModel> {
        let tbl = conn
            .open_table(DatabaseTables::Snippets.to_string())
            .execute()
            .await
            .map_err(|_| FlusterError::FailToFind)?;
        let batches = tbl
            .query()
            .only_if(format!("id = {}", Self::sql_string_literal(&id)))
            .execute()
            .await
            .map_err(|_| FlusterError::FailToFind)?
            .try_collect::<Vec<_>>()
            .await
            .map_err(|_| FlusterError::FailToFind)?;

        // Count matching rows, not batches: the number of batches a result
        // is split into says nothing about how many rows matched.
        let mut matches = Self::decode_batches(&batches, "Failed to serialize")?.into_iter();
        match (matches.next(), matches.next()) {
            (Some(found), None) => Ok(found),
            (None, _) => Err(FlusterError::FailToFind),
            (Some(_), Some(_)) => Err(FlusterError::DuplicateId),
        }
    }

    /// Upserts `items` into the snippets table, matching existing rows on `id`.
    ///
    /// Rows whose `id` already exists are fully updated; all others are
    /// inserted. An empty `items` list is a no-op.
    ///
    /// # Errors
    /// * `FlusterError::FailToOpenTable` if the snippets table cannot be opened.
    /// * `FlusterError::FailToCreateEntity` if the merge-insert fails.
    pub async fn save_many(
        &self,
        items: Vec<SnippetModel>,
        db: FlusterDb<'_>,
    ) -> FlusterResult<()> {
        if items.is_empty() {
            return Ok(());
        }
        let schema = SnippetEntity::arrow_schema();
        let tbl = db
            .open_table(DatabaseTables::Snippets.to_string())
            .execute()
            .await
            .map_err(|_| FlusterError::FailToOpenTable)?;
        // One single-row batch per item, all sharing the same schema.
        let batches: Vec<Result<RecordBatch, ArrowError>> = items
            .iter()
            .map(|item| Ok(SnippetEntity::to_record_batch(item, Arc::clone(&schema))))
            .collect();
        let stream = Box::new(RecordBatchIterator::new(batches.into_iter(), schema));
        // The builder's configuration methods borrow it mutably while
        // `execute` consumes it, so it is configured in place first.
        let mut upsert = tbl.merge_insert(&["id"]);
        upsert
            .when_matched_update_all(None)
            .when_not_matched_insert_all();
        upsert
            .execute(stream)
            .await
            .map_err(|_| FlusterError::FailToCreateEntity)?;
        Ok(())
    }

    /// Returns every snippet whose `lang` column is one of `langs`.
    ///
    /// An empty `langs` list matches nothing and yields an empty vector
    /// without touching the database.
    ///
    /// # Errors
    /// * `FlusterError::FailToConnect` if the snippets table cannot be opened.
    /// * `FlusterError::FailToFind` if the query fails.
    /// * `FlusterError::FailToSerialize` if a returned batch cannot be decoded.
    async fn get_many_with_langs(
        db: FlusterDb<'_>,
        langs: Vec<String>,
    ) -> FlusterResult<Vec<SnippetModel>> {
        // `lang in ()` is not a valid filter, so answer the empty case directly.
        if langs.is_empty() {
            return Ok(Vec::new());
        }
        let tbl = db
            .open_table(DatabaseTables::Snippets.to_string())
            .execute()
            .await
            .map_err(|_| FlusterError::FailToConnect)?;
        let lang_list = langs
            .iter()
            .map(|lang| Self::sql_string_literal(lang))
            .collect::<Vec<String>>()
            .join(", ");
        let batches = tbl
            .query()
            .only_if(format!("lang in ({})", lang_list))
            .execute()
            .await
            .map_err(|_| FlusterError::FailToFind)?
            .try_collect::<Vec<_>>()
            .await
            .map_err(|_| FlusterError::FailToFind)?;
        Self::decode_batches(&batches, "Serialization Error")
    }

    /// Returns every snippet in the table, unfiltered.
    ///
    /// # Errors
    /// * `FlusterError::FailToConnect` if the snippets table cannot be opened.
    /// * `FlusterError::FailToFind` if the query fails.
    /// * `FlusterError::FailToSerialize` if a returned batch cannot be decoded.
    async fn get_many_no_langs(db: FlusterDb<'_>) -> FlusterResult<Vec<SnippetModel>> {
        let tbl = db
            .open_table(DatabaseTables::Snippets.to_string())
            .execute()
            .await
            .map_err(|_| FlusterError::FailToConnect)?;
        // Query failures are reported the same way as in `get_many_with_langs`.
        let batches = tbl
            .query()
            .execute()
            .await
            .map_err(|_| FlusterError::FailToFind)?
            .try_collect::<Vec<_>>()
            .await
            .map_err(|_| FlusterError::FailToFind)?;
        Self::decode_batches(&batches, "Serialization Error")
    }

    /// Lists snippets, restricted to `opts.langs` when it is set.
    ///
    /// # Errors
    /// Propagates the error of whichever lookup is selected.
    pub async fn get_many(
        db: FlusterDb<'_>,
        opts: GetSnippetsParams,
    ) -> FlusterResult<Vec<SnippetModel>> {
        match opts.langs {
            Some(langs) => SnippetEntity::get_many_with_langs(db, langs).await,
            None => SnippetEntity::get_many_no_langs(db).await,
        }
    }
}

impl DbEntity<SnippetModel> for SnippetEntity {
    fn arrow_schema() -> std::sync::Arc<arrow_schema::Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("label", DataType::Utf8, false),
            Field::new("body", DataType::Utf8, false),
            Field::new("desc", DataType::Utf8, true),
            Field::new("lang", DataType::Utf8, false),
            Field::new(
                "ctime",
                DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "utime",
                DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
                false,
            ),
        ]))
    }

    fn to_record_batch(item: &SnippetModel, schema: Arc<Schema>) -> RecordBatch {
        let now = Utc::now();
        let ctime = arrow_array::TimestampMillisecondArray::from(vec![item
            .ctime
            .unwrap_or(now)
            .timestamp_millis()]);
        let utime = arrow_array::TimestampMillisecondArray::from(vec![item
            .utime
            .unwrap_or(now)
            .timestamp_millis()]);
        let body = arrow_array::StringArray::from(vec![item.body.clone()]);
        let id = arrow_array::StringArray::from(vec![item
            .id
            .clone()
            .or(Some(uuid::Uuid::new_v4().to_string()))]);
        let desc = arrow_array::StringArray::from(vec![item.desc.clone()]);
        let label = arrow_array::StringArray::from(vec![item.label.clone()]);
        let lang = arrow_array::StringArray::from(vec![item.lang.clone()]);
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(id),
                Arc::new(label),
                Arc::new(body),
                Arc::new(desc),
                Arc::new(lang),
                Arc::new(ctime),
                Arc::new(utime),
            ],
        )
        .unwrap()
    }
}

I keep reaching the FlusterError::FailToSerialize block when reading items from the database with the following error:

Error: serde::de::Error: premature end of input (data_type: "Date64", field: "$.ctime")

I should note that I previously tried this field as a Date64, before realizing that a Date64 field apparently only accepts i64 values that represent a specific day, not a complete datetime.

Any help is greatly appreciated. I can insert items into the database without error, but for the life of me I can't figure out how to read them from the database.

I know I read somewhere about a tauri-specta/specta issue with serializing bigints, but I've looked for like 45 minutes and I can't find where I saw that and how it was supposed to be resolved.

Any help is greatly appreciated. Even if I have to serialize all dates as plain integers, I'm OK with that, but I'm at a loss for how to serialize dates using tauri-specta and lancedb's Apache Arrow integration. I'm quite sure the issue is on the Arrow end, though...

1
  • If you want a datetime, use the timestamp type. Date64 is not very useful: it represents a whole day, stored as milliseconds since the epoch in a 64-bit integer. Date32 is enough for most use cases. Commented Jul 16 at 7:47

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.