I'm working on a Tauri application that uses tauri-specta for type safety and I can't figure out how to properly serialize dates. This is the file where most of the serialization and deserialization takes place:
use arrow_array::{Date64Array, Int64Array, RecordBatch, RecordBatchIterator};
use arrow_schema::{ArrowError, DataType, Field, Schema};
use chrono::Utc;
use futures::TryStreamExt;
use lancedb::query::{ExecutableQuery, QueryBase};
use serde::{Deserialize, Serialize};
use serde_arrow::{from_record_batch, marrow::array::TimestampArray};
use std::{ops::Index, sync::Arc};
use crate::core::{
database::tables::table_paths::DatabaseTables,
types::{
errors::errors::{FlusterError, FlusterResult},
traits::db_entity::DbEntity,
FlusterDb,
},
};
use super::{get_snippet_params::GetSnippetsParams, snippet_model::SnippetModel};
/// Stateless entity handle for reading and writing `SnippetModel` rows in the
/// LanceDB snippets table. Carries no data of its own; all persistence logic
/// lives in the inherent methods and the `DbEntity` impl below.
#[derive(Serialize, Deserialize, Debug)]
pub struct SnippetEntity {}
impl SnippetEntity {
    /// Renders `value` as a SQL string literal, escaping embedded single
    /// quotes. LanceDB filter expressions are SQL (DataFusion dialect), where
    /// string literals use single quotes; double quotes denote column
    /// identifiers, so a double-quoted value would silently match nothing.
    fn sql_literal(value: &str) -> String {
        format!("'{}'", value.replace('\'', "''"))
    }

    /// Deserializes every record batch into `SnippetModel`s and concatenates
    /// the results. Any serde_arrow failure is logged and mapped to
    /// `FlusterError::FailToSerialize`.
    fn batches_to_models(batches: &[RecordBatch]) -> FlusterResult<Vec<SnippetModel>> {
        let mut items: Vec<SnippetModel> = Vec::new();
        for batch in batches {
            let data: Vec<SnippetModel> = from_record_batch(batch).map_err(|e| {
                println!("Serialization Error: {:?}", e);
                FlusterError::FailToSerialize
            })?;
            items.extend(data);
        }
        Ok(items)
    }

    /// Deletes the snippet whose `id` column equals `id`.
    ///
    /// # Errors
    /// `FailToFind` if the table cannot be opened; `FailToDelete` if the
    /// delete statement fails.
    pub async fn delete_by_id(id: String, conn: FlusterDb<'_>) -> FlusterResult<()> {
        let tbl = conn
            .open_table(DatabaseTables::Snippets.to_string())
            .execute()
            .await
            .map_err(|_| FlusterError::FailToFind)?;
        tbl.delete(&format!("id = {}", Self::sql_literal(&id)))
            .await
            .map_err(|_| FlusterError::FailToDelete)?;
        Ok(())
    }

    /// Fetches exactly one snippet by `id`.
    ///
    /// # Errors
    /// `FailToFind` when the table can't be opened, the query fails, or no
    /// row matches; `DuplicateId` when more than one row matches;
    /// `FailToSerialize` when the batch can't be decoded into a model.
    pub async fn get_by_id(id: String, conn: FlusterDb<'_>) -> FlusterResult<SnippetModel> {
        let tbl = conn
            .open_table(DatabaseTables::Snippets.to_string())
            .execute()
            .await
            .map_err(|_| FlusterError::FailToFind)?;
        let batches = tbl
            .query()
            .only_if(format!("id = {}", Self::sql_literal(&id)))
            .execute()
            .await
            .map_err(|_| FlusterError::FailToFind)?
            .try_collect::<Vec<_>>()
            .await
            .map_err(|_| FlusterError::FailToFind)?;
        if batches.is_empty() {
            return Err(FlusterError::FailToFind);
        }
        if batches.len() > 1 {
            return Err(FlusterError::DuplicateId);
        }
        let items: Vec<SnippetModel> = from_record_batch(&batches[0]).map_err(|e| {
            println!("Failed to serialize: {:?}", e);
            FlusterError::FailToSerialize
        })?;
        // Exactly one decoded row is the only success case; take it by value
        // instead of cloning out of the Vec.
        let mut iter = items.into_iter();
        match (iter.next(), iter.next()) {
            (Some(item), None) => Ok(item),
            (None, _) => Err(FlusterError::FailToFind),
            (Some(_), Some(_)) => Err(FlusterError::DuplicateId),
        }
    }

    /// Upserts `items` into the snippets table, keyed on the `id` column:
    /// matching rows are updated, missing rows are inserted.
    ///
    /// # Errors
    /// `FailToOpenTable` if the table can't be opened; `FailToCreateEntity`
    /// if the merge-insert fails.
    pub async fn save_many(
        &self,
        items: Vec<SnippetModel>,
        db: FlusterDb<'_>,
    ) -> FlusterResult<()> {
        let schema = SnippetEntity::arrow_schema();
        let tbl = db
            .open_table(DatabaseTables::Snippets.to_string())
            .execute()
            .await
            .map_err(|_| FlusterError::FailToOpenTable)?;
        // One single-row batch per item; `to_record_batch` is infallible, so
        // each entry is wrapped in `Ok` only to satisfy the iterator's type.
        let batches: Vec<Result<RecordBatch, ArrowError>> = items
            .iter()
            .map(|x| Ok(SnippetEntity::to_record_batch(x, schema.clone())))
            .collect();
        let stream = Box::new(RecordBatchIterator::new(
            batches.into_iter(),
            schema.clone(),
        ));
        let primary_key: &[&str] = &["id"];
        tbl.merge_insert(primary_key)
            .when_matched_update_all(None)
            .when_not_matched_insert_all()
            // The builder methods return `&mut Self`; clone to obtain an
            // owned builder that `execute` can consume.
            .clone()
            .execute(stream)
            .await
            .map_err(|_| FlusterError::FailToCreateEntity)?;
        Ok(())
    }

    /// Fetches every snippet whose `lang` is in `langs`.
    async fn get_many_with_langs(
        db: FlusterDb<'_>,
        langs: Vec<String>,
    ) -> FlusterResult<Vec<SnippetModel>> {
        let tbl = db
            .open_table(DatabaseTables::Snippets.to_string())
            .execute()
            .await
            .map_err(|_| FlusterError::FailToConnect)?;
        let query_string = format!(
            "lang in ({})",
            langs
                .iter()
                .map(|x| Self::sql_literal(x))
                .collect::<Vec<String>>()
                .join(", ")
        );
        let batches = tbl
            .query()
            .only_if(query_string)
            .execute()
            .await
            .map_err(|_| FlusterError::FailToFind)?
            .try_collect::<Vec<_>>()
            .await
            .map_err(|_| FlusterError::FailToFind)?;
        Self::batches_to_models(&batches)
    }

    /// Fetches every snippet in the table, with no language filter.
    async fn get_many_no_langs(db: FlusterDb<'_>) -> FlusterResult<Vec<SnippetModel>> {
        let tbl = db
            .open_table(DatabaseTables::Snippets.to_string())
            .execute()
            .await
            .map_err(|_| FlusterError::FailToConnect)?;
        let batches = tbl
            .query()
            .execute()
            .await
            .map_err(|_| FlusterError::FailToConnect)?
            .try_collect::<Vec<_>>()
            .await
            // This is a read path, so a collect failure is a lookup failure,
            // not an entity-creation failure.
            .map_err(|_| FlusterError::FailToFind)?;
        Self::batches_to_models(&batches)
    }

    /// Fetches snippets, optionally filtered by the languages in `opts`.
    pub async fn get_many(
        db: FlusterDb<'_>,
        opts: GetSnippetsParams,
    ) -> FlusterResult<Vec<SnippetModel>> {
        match opts.langs {
            Some(langs) => SnippetEntity::get_many_with_langs(db, langs).await,
            None => SnippetEntity::get_many_no_langs(db).await,
        }
    }
}
impl DbEntity<SnippetModel> for SnippetEntity {
fn arrow_schema() -> std::sync::Arc<arrow_schema::Schema> {
Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("label", DataType::Utf8, false),
Field::new("body", DataType::Utf8, false),
Field::new("desc", DataType::Utf8, true),
Field::new("lang", DataType::Utf8, false),
Field::new(
"ctime",
DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
false,
),
Field::new(
"utime",
DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
false,
),
]))
}
fn to_record_batch(item: &SnippetModel, schema: Arc<Schema>) -> RecordBatch {
let now = Utc::now();
let ctime = arrow_array::TimestampMillisecondArray::from(vec![item
.ctime
.unwrap_or(now)
.timestamp_millis()]);
let utime = arrow_array::TimestampMillisecondArray::from(vec![item
.utime
.unwrap_or(now)
.timestamp_millis()]);
let body = arrow_array::StringArray::from(vec![item.body.clone()]);
let id = arrow_array::StringArray::from(vec![item
.id
.clone()
.or(Some(uuid::Uuid::new_v4().to_string()))]);
let desc = arrow_array::StringArray::from(vec![item.desc.clone()]);
let label = arrow_array::StringArray::from(vec![item.label.clone()]);
let lang = arrow_array::StringArray::from(vec![item.lang.clone()]);
RecordBatch::try_new(
schema,
vec![
Arc::new(id),
Arc::new(label),
Arc::new(body),
Arc::new(desc),
Arc::new(lang),
Arc::new(ctime),
Arc::new(utime),
],
)
.unwrap()
}
}
I keep reaching the FlusterError::FailToSerialize block when reading items from the database with the following error:
Error: serde::de::Error: premature end of input (data_type: "Date64", field: "$.ctime")
I should note that I previously tried this field as a Date64 before realizing that Date64 apparently only accepts i64 values representing a specific day, not a complete datetime.
Any help is greatly appreciated. I can insert items into the database without error, but for the life of me I can't figure out how to read them from the database.
I know I read somewhere about a tauri-specta/specta issue with serializing bigints, but I've looked for like 45 minutes and I can't find where I saw that and how it was supposed to be resolved.
Any help is greatly appreciated. Even if I have to serialize all dates as plain integers I'm ok with that, but I'm at a loss for how to serialize dates using tauri-specta and LanceDB's Apache Arrow types. I'm quite sure the issue is on the Arrow end, though...