From 31f954f38fb76c6f837e755afffdd9df9eb4f21f Mon Sep 17 00:00:00 2001
From: Ericson Soares
Date: Mon, 21 Oct 2024 22:27:09 -0300
Subject: [PATCH] Optimize and fix attempt at cloud ingester

---
 core/crates/cloud-services/src/sync/ingest.rs |  85 +++-----
 core/crates/sync/src/ingest_utils.rs          |  19 +-
 core/crates/sync/src/lib.rs                   |   4 +-
 core/crates/sync/src/manager.rs               | 200 +++++++++++++-----
 core/src/location/mod.rs                      |   2 +
 crates/sync/src/compressed.rs                 |   2 +-
 6 files changed, 199 insertions(+), 113 deletions(-)

diff --git a/core/crates/cloud-services/src/sync/ingest.rs b/core/crates/cloud-services/src/sync/ingest.rs
index 065ceb964..9592b64ab 100644
--- a/core/crates/cloud-services/src/sync/ingest.rs
+++ b/core/crates/cloud-services/src/sync/ingest.rs
@@ -1,10 +1,8 @@
 use crate::Error;
 
-use sd_core_sync::{from_cloud_crdt_ops, CompressedCRDTOperationsPerModelPerDevice, SyncManager};
+use sd_core_sync::SyncManager;
 
 use sd_actors::{Actor, Stopper};
-use sd_prisma::prisma::{cloud_crdt_operation, SortOrder};
-use sd_utils::timestamp_to_datetime;
 
 use std::{
 	future::IntoFuture,
@@ -12,7 +10,6 @@ use std::{
 		atomic::{AtomicBool, Ordering},
 		Arc,
 	},
-	time::SystemTime,
 };
 
 use futures::FutureExt;
@@ -22,8 +19,6 @@ use tracing::{debug, error};
 
 use super::{ReceiveAndIngestNotifiers, SyncActors, ONE_MINUTE};
 
-const BATCH_SIZE: i64 = 1000;
-
 /// Responsible for taking sync operations received from the cloud,
 /// and applying them to the local database via the sync system's ingest actor.
@@ -43,20 +38,14 @@ impl Actor<SyncActors> for Ingester {
 			Stopped,
 		}
 
-		'outer: loop {
+		loop {
 			self.active.store(true, Ordering::Relaxed);
 			self.active_notify.notify_waiters();
 
-			loop {
-				match self.run_loop_iteration().await {
-					Ok(IngestStatus::Completed) => break,
-					Ok(IngestStatus::InProgress) => {}
-					Err(e) => {
-						error!(?e, "Error during cloud sync ingester actor iteration");
-						sleep(ONE_MINUTE).await;
-						continue 'outer;
-					}
-				}
+			if let Err(e) = self.run_loop_iteration().await {
+				error!(?e, "Error during cloud sync ingester actor iteration");
+				sleep(ONE_MINUTE).await;
+				continue;
 			}
 
 			self.active.store(false, Ordering::Relaxed);
@@ -79,11 +68,6 @@ impl Actor<SyncActors> for Ingester {
 	}
 }
 
-enum IngestStatus {
-	Completed,
-	InProgress,
-}
-
 impl Ingester {
 	pub const fn new(
 		sync: SyncManager,
@@ -99,48 +83,33 @@ impl Ingester {
 		}
 	}
 
-	async fn run_loop_iteration(&self) -> Result<IngestStatus, Error> {
-		let (ops_ids, ops) = self
+	async fn run_loop_iteration(&self) -> Result<(), Error> {
+		let operations_to_ingest_count = self
 			.sync
 			.db
 			.cloud_crdt_operation()
-			.find_many(vec![])
-			.take(BATCH_SIZE)
-			.order_by(cloud_crdt_operation::timestamp::order(SortOrder::Asc))
-			.exec()
-			.await
-			.map_err(sd_core_sync::Error::from)?
-			.into_iter()
-			.map(from_cloud_crdt_ops)
-			.collect::<Result<(Vec<_>, Vec<_>), _>>()?;
-
-		if ops_ids.is_empty() {
-			return Ok(IngestStatus::Completed);
-		}
-
-		debug!(
-			messages_count = ops.len(),
-			first_message = ?ops
-				.first()
-				.map_or_else(|| SystemTime::UNIX_EPOCH.into(), |op| timestamp_to_datetime(op.timestamp)),
-			last_message = ?ops
-				.last()
-				.map_or_else(|| SystemTime::UNIX_EPOCH.into(), |op| timestamp_to_datetime(op.timestamp)),
-			"Messages to ingest",
-		);
-
-		self.sync
-			.ingest_ops(CompressedCRDTOperationsPerModelPerDevice::new(ops))
-			.await?;
-
-		self.sync
-			.db
-			.cloud_crdt_operation()
-			.delete_many(vec![cloud_crdt_operation::id::in_vec(ops_ids)])
+			.count(vec![])
 			.exec()
 			.await
 			.map_err(sd_core_sync::Error::from)?;
 
-		Ok(IngestStatus::InProgress)
+		if operations_to_ingest_count == 0 {
+			debug!("Nothing to ingest, finishing ingester loop early");
+			return Ok(());
+		}
+
+		debug!(
+			operations_to_ingest_count,
+			"Starting cloud sync message ingestion loop"
+		);
+
+		self.sync.ingest_ops().await?;
+
+		debug!(
+			operations_to_ingest_count,
+			"Finished cloud sync message ingestion loop"
+		);
+
+		Ok(())
 	}
 }
diff --git a/core/crates/sync/src/ingest_utils.rs b/core/crates/sync/src/ingest_utils.rs
index 6c77a96b7..e63f317ed 100644
--- a/core/crates/sync/src/ingest_utils.rs
+++ b/core/crates/sync/src/ingest_utils.rs
@@ -298,7 +298,24 @@ async fn handle_crdt_deletion(
 	record_id: rmpv::Value,
 	delete_op: &CompressedCRDTOperation,
 ) -> Result<(), Error> {
-	// deletes are the be all and end all, no need to check anything
+	// deletes are the be-all and end-all, except if we never created the object to begin with;
+	// in that case we don't need to delete anything
+
+	if db
+		.crdt_operation()
+		.count(vec![
+			crdt_operation::model::equals(i32::from(model)),
+			crdt_operation::record_id::equals(rmp_serde::to_vec(&record_id)?),
+		])
+		.exec()
+		.await?
+		== 0
+	{
+		// This means the entry was created and deleted on the other device before this one
+		// could even take notice of it, so we don't need to do anything here.
+		return Ok(());
+	}
+
 	let op = CRDTOperation {
 		device_pub_id: device_pub_id.into(),
 		model_id: model,
diff --git a/core/crates/sync/src/lib.rs b/core/crates/sync/src/lib.rs
index 56822509c..5b8d90efe 100644
--- a/core/crates/sync/src/lib.rs
+++ b/core/crates/sync/src/lib.rs
@@ -35,7 +35,7 @@ use sd_utils::uuid_to_bytes;
 
 use std::{collections::HashMap, sync::Arc};
 
-use tokio::sync::RwLock;
+use tokio::{sync::RwLock, task::JoinError};
 
 pub mod backfill;
 mod db_operation;
@@ -77,6 +77,8 @@ pub enum Error {
 	EmptyOperations,
 	#[error("device not found: {0}")]
 	DeviceNotFound(DevicePubId),
+	#[error("process crdt task panicked")]
+	ProcessCrdtPanic(JoinError),
 }
 
 impl From<Error> for rspc::Error {
diff --git a/core/crates/sync/src/manager.rs b/core/crates/sync/src/manager.rs
index 382261b9d..a01b6716b 100644
--- a/core/crates/sync/src/manager.rs
+++ b/core/crates/sync/src/manager.rs
@@ -1,36 +1,39 @@
 use sd_core_prisma_helpers::DevicePubId;
 
 use sd_prisma::{
-	prisma::{crdt_operation, device, PrismaClient, SortOrder},
+	prisma::{cloud_crdt_operation, crdt_operation, device, PrismaClient, SortOrder},
 	prisma_sync,
 };
-use sd_sync::{
-	CRDTOperation, CompressedCRDTOperationsPerModel, CompressedCRDTOperationsPerModelPerDevice,
-	ModelId, OperationFactory,
-};
+use sd_sync::{CRDTOperation, CompressedCRDTOperation, ModelId, OperationFactory, RecordId};
 use sd_utils::timestamp_to_datetime;
 
 use std::{
-	collections::BTreeMap,
+	collections::{hash_map::Entry, BTreeMap, HashMap},
 	fmt,
 	num::NonZeroU128,
 	sync::{
 		atomic::{self, AtomicBool},
 		Arc,
 	},
+	time::SystemTime,
 };
 
 use async_stream::stream;
 use futures::Stream;
-use futures_concurrency::future::TryJoin;
-use tokio::sync::{broadcast, Mutex, Notify, RwLock};
+use futures_concurrency::future::{Join, TryJoin};
+use tokio::{
+	spawn,
+	sync::{broadcast, Mutex, Notify, RwLock, Semaphore},
+};
 use tracing::{debug, warn};
 use uhlc::{HLCBuilder, HLC};
 use uuid::Uuid;
 
 use super::{
-	crdt_op_db, db_operation::from_crdt_ops, ingest_utils::process_crdt_operations, Error,
-	SyncEvent, TimestampPerDevice, NTP64,
+	crdt_op_db,
+	db_operation::{from_cloud_crdt_ops, from_crdt_ops},
+	ingest_utils::process_crdt_operations,
+	Error, SyncEvent, TimestampPerDevice, NTP64,
 };
 
 /// Wrapper that spawns the ingest actor and provides utilities for reading and writing sync operations.
@@ -44,7 +47,8 @@ pub struct Manager {
 	pub clock: Arc<HLC>,
 	pub active: Arc<AtomicBool>,
 	pub active_notify: Arc<Notify>,
-	pub sync_lock: Arc<Mutex<()>>,
+	pub(crate) sync_lock: Arc<Mutex<()>>,
+	pub(crate) available_parallelism: usize,
 }
 
 impl fmt::Debug for Manager {
@@ -131,62 +135,154 @@ impl Manager {
 				active: Arc::default(),
 				active_notify: Arc::default(),
 				sync_lock: Arc::new(Mutex::default()),
+				available_parallelism: std::thread::available_parallelism()
+					.map_or(1, std::num::NonZero::get),
 			},
 			rx,
 		))
 	}
 
-	pub async fn ingest_ops(
+	async fn fetch_cloud_crdt_ops(
 		&self,
-		CompressedCRDTOperationsPerModelPerDevice(compressed_ops): CompressedCRDTOperationsPerModelPerDevice,
-	) -> Result<(), Error> {
-		// WARN: this order here exists because sync messages MUST be processed in this exact order
-		// due to relationship dependencies between these tables.
-		const INGEST_ORDER: &[ModelId] = &[
-			prisma_sync::device::MODEL_ID,
-			prisma_sync::storage_statistics::MODEL_ID,
-			prisma_sync::tag::MODEL_ID,
-			prisma_sync::location::MODEL_ID,
-			prisma_sync::object::MODEL_ID,
-			prisma_sync::exif_data::MODEL_ID,
-			prisma_sync::file_path::MODEL_ID,
-			prisma_sync::label::MODEL_ID,
-			prisma_sync::tag_on_object::MODEL_ID,
-			prisma_sync::label_on_object::MODEL_ID,
-		];
+		model_id: ModelId,
+		batch_size: i64,
+	) -> Result<(Vec<cloud_crdt_operation::id::Type>, Vec<CRDTOperation>), Error> {
+		self.db
+			.cloud_crdt_operation()
+			.find_many(vec![cloud_crdt_operation::model::equals(i32::from(
+				model_id,
+			))])
+			.take(batch_size)
+			.order_by(cloud_crdt_operation::timestamp::order(SortOrder::Asc))
+			.exec()
+			.await?
+			.into_iter()
+			.map(from_cloud_crdt_ops)
+			.collect::<Result<(Vec<_>, Vec<_>), _>>()
+	}
 
-		let _lock_guard = self.sync_lock.lock().await;
+	async fn ingest_by_model(&self, model_id: ModelId) -> Result<(), Error> {
+		let (ops_ids, ops) = self.fetch_cloud_crdt_ops(model_id, 10_000).await?;
+		if ops_ids.is_empty() {
+			return Ok(());
+		}
 
-		let mut ops_fut_by_model = INGEST_ORDER
-			.iter()
-			.map(|&model_id| (model_id, vec![]))
-			.collect::<BTreeMap<_, _>>();
+		debug!(
+			messages_count = ops.len(),
+			first_message = ?ops
+				.first()
+				.map_or_else(|| SystemTime::UNIX_EPOCH.into(), |op| timestamp_to_datetime(op.timestamp)),
+			last_message = ?ops
+				.last()
+				.map_or_else(|| SystemTime::UNIX_EPOCH.into(), |op| timestamp_to_datetime(op.timestamp)),
+			model_id,
+			"Messages by model to ingest",
+		);
 
-		for (device_pub_id, CompressedCRDTOperationsPerModel(ops_per_model)) in compressed_ops {
-			for (model_id, ops_per_record) in ops_per_model {
-				for (record_id, ops) in ops_per_record {
-					ops_fut_by_model
-						.get_mut(&model_id)
-						.ok_or(Error::InvalidModelId(model_id))?
-						.push(process_crdt_operations(
-							&self.clock,
-							&self.timestamp_per_device,
-							&self.db,
-							device_pub_id.into(),
-							model_id,
-							record_id,
-							ops,
-						));
+		let mut compressed_map =
+			BTreeMap::<Uuid, HashMap<Vec<u8>, (RecordId, Vec<CompressedCRDTOperation>)>>::new();
+
+		for CRDTOperation {
+			device_pub_id,
+			timestamp,
+			model_id: _, // Ignoring model_id as we know it already
+			record_id,
+			data,
+		} in ops
+		{
+			let records = compressed_map.entry(device_pub_id).or_default();
+
+			// Can't use RecordId as a key because rmpv::Value doesn't implement Hash + Eq.
+			// So we use its serialized bytes as a key.
+			let record_id_bytes =
+				rmp_serde::to_vec_named(&record_id).expect("already serialized to Value");
+
+			match records.entry(record_id_bytes) {
+				Entry::Occupied(mut entry) => {
+					entry
+						.get_mut()
+						.1
+						.push(CompressedCRDTOperation { timestamp, data });
+				}
+				Entry::Vacant(entry) => {
+					entry.insert((record_id, vec![CompressedCRDTOperation { timestamp, data }]));
 				}
 			}
 		}
 
-		for model_id in INGEST_ORDER {
-			if let Some(futs) = ops_fut_by_model.remove(model_id) {
-				futs.try_join().await?;
-			}
+		let _lock_guard = self.sync_lock.lock().await;
+
+		let semaphore = &Arc::new(Semaphore::new(self.available_parallelism));
+
+		let handles = compressed_map
+			.into_iter()
+			.flat_map(|(device_pub_id, records)| {
+				records.into_values().map(move |(record_id, ops)| {
+					// We can process each record in parallel as they are independent
+					spawn({
+						let clock = Arc::clone(&self.clock);
+						let timestamp_per_device = Arc::clone(&self.timestamp_per_device);
+						let db = Arc::clone(&self.db);
+						let device_pub_id = device_pub_id.into();
+						let semaphore = Arc::clone(semaphore);
+
+						async move {
+							let _permit =
+								semaphore.acquire().await.expect("semaphore never closes");
+
+							process_crdt_operations(
+								&clock,
+								&timestamp_per_device,
+								&db,
+								device_pub_id,
+								model_id,
+								record_id,
+								ops,
+							)
+							.await
+						}
+					})
+				})
+			})
+			.collect::<Vec<_>>();
+
+		for res in handles.join().await {
+			res.map_err(Error::ProcessCrdtPanic)??;
 		}
 
+		self.db
+			.cloud_crdt_operation()
+			.delete_many(vec![cloud_crdt_operation::id::in_vec(ops_ids)])
+			.exec()
+			.await?;
+
+		Ok(())
+	}
+
+	pub async fn ingest_ops(&self) -> Result<(), Error> {
+		// WARN: this order here exists because sync messages MUST be processed in this exact order
+		// due to relationship dependencies between these tables.
+		self.ingest_by_model(prisma_sync::device::MODEL_ID).await?;
+
+		(
+			self.ingest_by_model(prisma_sync::storage_statistics::MODEL_ID),
+			self.ingest_by_model(prisma_sync::tag::MODEL_ID),
+			self.ingest_by_model(prisma_sync::location::MODEL_ID),
+			self.ingest_by_model(prisma_sync::object::MODEL_ID),
+			self.ingest_by_model(prisma_sync::label::MODEL_ID),
+		)
+			.try_join()
+			.await?;
+
+		(
+			self.ingest_by_model(prisma_sync::exif_data::MODEL_ID),
+			self.ingest_by_model(prisma_sync::file_path::MODEL_ID),
+			self.ingest_by_model(prisma_sync::tag_on_object::MODEL_ID),
+			self.ingest_by_model(prisma_sync::label_on_object::MODEL_ID),
+		)
+			.try_join()
+			.await?;
 
 		if self.tx.send(SyncEvent::Ingested).is_err() {
 			warn!("failed to send ingested message on `ingest_ops`");
 		}
diff --git a/core/src/location/mod.rs b/core/src/location/mod.rs
index a4a998995..d3baf8532 100644
--- a/core/src/location/mod.rs
+++ b/core/src/location/mod.rs
@@ -466,6 +466,7 @@ pub async fn scan_location(
 			)
 			.await?
 		}
+
 		ScanState::Indexed => {
 			node.job_system
 				.dispatch(
@@ -478,6 +479,7 @@
 			)
 			.await?
 		}
+
 		ScanState::FilesIdentified => {
 			node.job_system
 				.dispatch(
diff --git a/crates/sync/src/compressed.rs b/crates/sync/src/compressed.rs
index 47c38e7fe..a2e3a147d 100644
--- a/crates/sync/src/compressed.rs
+++ b/crates/sync/src/compressed.rs
@@ -46,7 +46,7 @@ impl CompressedCRDTOperationsPerModelPerDevice {
 				// Can't use RecordId as a key because rmpv::Value doesn't implement Hash + Eq.
 				// So we use it's serialized bytes as a key.
 				let record_id_bytes =
-					rmp_serde::to_vec(&record_id).expect("already serialized to Value");
+					rmp_serde::to_vec_named(&record_id).expect("already serialized to Value");
 
 				match records.entry(record_id_bytes) {
 					Entry::Occupied(mut entry) => {
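
Reviewer note, record grouping (illustrative sketch, not part of the diff):
`ingest_by_model` buckets fetched operations per device and then per record
before processing. Since `RecordId` is an `rmpv::Value`, which implements
neither `Hash` nor `Eq`, the inner `HashMap` is keyed by the record id's
MessagePack bytes instead. A minimal standalone version of that keying trick,
assuming the `rmp-serde` crate and `rmpv` with its serde feature enabled:

    use std::collections::HashMap;

    fn main() {
        // A record id as it might arrive in a sync message.
        let record_id = rmpv::Value::String("some-record-pub-id".into());

        // rmpv::Value cannot key a HashMap directly, so key by its
        // serialized bytes, exactly as the patch does.
        let mut ops_per_record: HashMap<Vec<u8>, Vec<&str>> = HashMap::new();

        let key = rmp_serde::to_vec_named(&record_id).expect("already serialized to Value");
        ops_per_record.entry(key).or_default().push("some op");
    }

The compressed.rs hunk switches `to_vec` to `to_vec_named` so that both call
sites derive these keys with the same encoder.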
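Reviewer note, bounded parallelism (illustrative sketch, not part of the
diff): records within one model are independent, so `ingest_by_model` spawns
one task per record and throttles them with a semaphore sized to
`available_parallelism`; a panicked task surfaces as a `JoinError` wrapped in
`Error::ProcessCrdtPanic`. The same pattern in isolation, with a hypothetical
`process_record` standing in for `process_crdt_operations`:

    use std::sync::Arc;

    use tokio::{spawn, sync::Semaphore};

    // Hypothetical stand-in for `process_crdt_operations`.
    async fn process_record(record: u32) {
        println!("processed record {record}");
    }

    #[tokio::main]
    async fn main() {
        // Size the permit pool to the core count, falling back to 1,
        // mirroring how the patch initializes `available_parallelism`.
        let parallelism = std::thread::available_parallelism().map_or(1, usize::from);
        let semaphore = Arc::new(Semaphore::new(parallelism));

        let handles = (0..32u32)
            .map(|record| {
                let semaphore = Arc::clone(&semaphore);
                spawn(async move {
                    // At most `parallelism` tasks get past this await at once.
                    let _permit = semaphore.acquire().await.expect("semaphore never closes");
                    process_record(record).await;
                })
            })
            .collect::<Vec<_>>();

        for handle in handles {
            // A panic inside a task shows up here as a JoinError.
            handle.await.expect("task panicked");
        }
    }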
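Reviewer note, staged ordering (illustrative sketch, not part of the diff):
`ingest_ops` encodes the foreign-key dependencies between models as stages:
`device` rows first, then one `try_join` group of models that reference only
devices, then a second group that references rows created by the first. A
compact sketch of that staging with `futures_concurrency`, where the
hypothetical `ingest` stands in for `ingest_by_model` and the model list is
abbreviated:

    use futures_concurrency::future::TryJoin;

    // Hypothetical stand-in for `ingest_by_model`.
    async fn ingest(model: &str) -> Result<(), String> {
        println!("ingested {model}");
        Ok(())
    }

    #[tokio::main]
    async fn main() -> Result<(), String> {
        // Stage 1: every other model references a device, so it runs alone.
        ingest("device").await?;

        // Stage 2: these depend only on device rows, so they run concurrently.
        (ingest("tag"), ingest("location"), ingest("object"))
            .try_join()
            .await?;

        // Stage 3: these reference rows created by stage 2.
        (ingest("file_path"), ingest("tag_on_object"))
            .try_join()
            .await?;

        Ok(())
    }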