Optimize and fix attempt at cloud ingester

This commit is contained in:
Ericson Soares 2024-10-21 22:27:09 -03:00
parent eb098ebec2
commit 31f954f38f
6 changed files with 199 additions and 113 deletions

View File

@ -1,10 +1,8 @@
use crate::Error;
use sd_core_sync::{from_cloud_crdt_ops, CompressedCRDTOperationsPerModelPerDevice, SyncManager};
use sd_core_sync::SyncManager;
use sd_actors::{Actor, Stopper};
use sd_prisma::prisma::{cloud_crdt_operation, SortOrder};
use sd_utils::timestamp_to_datetime;
use std::{
future::IntoFuture,
@ -12,7 +10,6 @@ use std::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::SystemTime,
};
use futures::FutureExt;
@ -22,8 +19,6 @@ use tracing::{debug, error};
use super::{ReceiveAndIngestNotifiers, SyncActors, ONE_MINUTE};
const BATCH_SIZE: i64 = 1000;
/// Responsible for taking sync operations received from the cloud,
/// and applying them to the local database via the sync system's ingest actor.
@ -43,20 +38,14 @@ impl Actor<SyncActors> for Ingester {
Stopped,
}
'outer: loop {
loop {
self.active.store(true, Ordering::Relaxed);
self.active_notify.notify_waiters();
loop {
match self.run_loop_iteration().await {
Ok(IngestStatus::Completed) => break,
Ok(IngestStatus::InProgress) => {}
Err(e) => {
error!(?e, "Error during cloud sync ingester actor iteration");
sleep(ONE_MINUTE).await;
continue 'outer;
}
}
if let Err(e) = self.run_loop_iteration().await {
error!(?e, "Error during cloud sync ingester actor iteration");
sleep(ONE_MINUTE).await;
continue;
}
self.active.store(false, Ordering::Relaxed);
@ -79,11 +68,6 @@ impl Actor<SyncActors> for Ingester {
}
}
enum IngestStatus {
Completed,
InProgress,
}
impl Ingester {
pub const fn new(
sync: SyncManager,
@ -99,48 +83,33 @@ impl Ingester {
}
}
async fn run_loop_iteration(&self) -> Result<IngestStatus, Error> {
let (ops_ids, ops) = self
async fn run_loop_iteration(&self) -> Result<(), Error> {
let operations_to_ingest_count = self
.sync
.db
.cloud_crdt_operation()
.find_many(vec![])
.take(BATCH_SIZE)
.order_by(cloud_crdt_operation::timestamp::order(SortOrder::Asc))
.exec()
.await
.map_err(sd_core_sync::Error::from)?
.into_iter()
.map(from_cloud_crdt_ops)
.collect::<Result<(Vec<_>, Vec<_>), _>>()?;
if ops_ids.is_empty() {
return Ok(IngestStatus::Completed);
}
debug!(
messages_count = ops.len(),
first_message = ?ops
.first()
.map_or_else(|| SystemTime::UNIX_EPOCH.into(), |op| timestamp_to_datetime(op.timestamp)),
last_message = ?ops
.last()
.map_or_else(|| SystemTime::UNIX_EPOCH.into(), |op| timestamp_to_datetime(op.timestamp)),
"Messages to ingest",
);
self.sync
.ingest_ops(CompressedCRDTOperationsPerModelPerDevice::new(ops))
.await?;
self.sync
.db
.cloud_crdt_operation()
.delete_many(vec![cloud_crdt_operation::id::in_vec(ops_ids)])
.count(vec![])
.exec()
.await
.map_err(sd_core_sync::Error::from)?;
Ok(IngestStatus::InProgress)
if operations_to_ingest_count == 0 {
debug!("Nothing to ingest, early finishing ingester loop");
return Ok(());
}
debug!(
operations_to_ingest_count,
"Starting sync messages cloud ingestion loop"
);
self.sync.ingest_ops().await?;
debug!(
operations_to_ingest_count,
"Finished sync messages cloud ingestion loop"
);
Ok(())
}
}

View File

@ -298,7 +298,24 @@ async fn handle_crdt_deletion(
record_id: rmpv::Value,
delete_op: &CompressedCRDTOperation,
) -> Result<(), Error> {
// deletes are the be all and end all, no need to check anything
// deletes are the be all and end all, except if we never created the object to begin with
// in this case we don't need to delete anything
if db
.crdt_operation()
.count(vec![
crdt_operation::model::equals(i32::from(model)),
crdt_operation::record_id::equals(rmp_serde::to_vec(&record_id)?),
])
.exec()
.await?
== 0
{
// This means that on the other device this entry was created and deleted before this
// device could even take notice of it, so we don't need to do anything here.
return Ok(());
}
let op = CRDTOperation {
device_pub_id: device_pub_id.into(),
model_id: model,

View File

@ -35,7 +35,7 @@ use sd_utils::uuid_to_bytes;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;
use tokio::{sync::RwLock, task::JoinError};
pub mod backfill;
mod db_operation;
@ -77,6 +77,8 @@ pub enum Error {
EmptyOperations,
#[error("device not found: {0}")]
DeviceNotFound(DevicePubId),
#[error("processes crdt task panicked")]
ProcessCrdtPanic(JoinError),
}
impl From<Error> for rspc::Error {

View File

@ -1,36 +1,39 @@
use sd_core_prisma_helpers::DevicePubId;
use sd_prisma::{
prisma::{crdt_operation, device, PrismaClient, SortOrder},
prisma::{cloud_crdt_operation, crdt_operation, device, PrismaClient, SortOrder},
prisma_sync,
};
use sd_sync::{
CRDTOperation, CompressedCRDTOperationsPerModel, CompressedCRDTOperationsPerModelPerDevice,
ModelId, OperationFactory,
};
use sd_sync::{CRDTOperation, CompressedCRDTOperation, ModelId, OperationFactory, RecordId};
use sd_utils::timestamp_to_datetime;
use std::{
collections::BTreeMap,
collections::{hash_map::Entry, BTreeMap, HashMap},
fmt,
num::NonZeroU128,
sync::{
atomic::{self, AtomicBool},
Arc,
},
time::SystemTime,
};
use async_stream::stream;
use futures::Stream;
use futures_concurrency::future::TryJoin;
use tokio::sync::{broadcast, Mutex, Notify, RwLock};
use futures_concurrency::future::{Join, TryJoin};
use tokio::{
spawn,
sync::{broadcast, Mutex, Notify, RwLock, Semaphore},
};
use tracing::{debug, warn};
use uhlc::{HLCBuilder, HLC};
use uuid::Uuid;
use super::{
crdt_op_db, db_operation::from_crdt_ops, ingest_utils::process_crdt_operations, Error,
SyncEvent, TimestampPerDevice, NTP64,
crdt_op_db,
db_operation::{from_cloud_crdt_ops, from_crdt_ops},
ingest_utils::process_crdt_operations,
Error, SyncEvent, TimestampPerDevice, NTP64,
};
/// Wrapper that spawns the ingest actor and provides utilities for reading and writing sync operations.
@ -44,7 +47,8 @@ pub struct Manager {
pub clock: Arc<HLC>,
pub active: Arc<AtomicBool>,
pub active_notify: Arc<Notify>,
pub sync_lock: Arc<Mutex<()>>,
pub(crate) sync_lock: Arc<Mutex<()>>,
pub(crate) available_parallelism: usize,
}
impl fmt::Debug for Manager {
@ -131,62 +135,154 @@ impl Manager {
active: Arc::default(),
active_notify: Arc::default(),
sync_lock: Arc::new(Mutex::default()),
available_parallelism: std::thread::available_parallelism()
.map_or(1, std::num::NonZero::get),
},
rx,
))
}
pub async fn ingest_ops(
async fn fetch_cloud_crdt_ops(
&self,
CompressedCRDTOperationsPerModelPerDevice(compressed_ops): CompressedCRDTOperationsPerModelPerDevice,
) -> Result<(), Error> {
// WARN: this order here exists because sync messages MUST be processed in this exact order
// due to relationship dependencies between these tables.
const INGEST_ORDER: &[ModelId] = &[
prisma_sync::device::MODEL_ID,
prisma_sync::storage_statistics::MODEL_ID,
prisma_sync::tag::MODEL_ID,
prisma_sync::location::MODEL_ID,
prisma_sync::object::MODEL_ID,
prisma_sync::exif_data::MODEL_ID,
prisma_sync::file_path::MODEL_ID,
prisma_sync::label::MODEL_ID,
prisma_sync::tag_on_object::MODEL_ID,
prisma_sync::label_on_object::MODEL_ID,
];
model_id: ModelId,
batch_size: i64,
) -> Result<(Vec<cloud_crdt_operation::id::Type>, Vec<CRDTOperation>), Error> {
self.db
.cloud_crdt_operation()
.find_many(vec![cloud_crdt_operation::model::equals(i32::from(
model_id,
))])
.take(batch_size)
.order_by(cloud_crdt_operation::timestamp::order(SortOrder::Asc))
.exec()
.await?
.into_iter()
.map(from_cloud_crdt_ops)
.collect::<Result<(Vec<_>, Vec<_>), _>>()
}
let _lock_guard = self.sync_lock.lock().await;
async fn ingest_by_model(&self, model_id: ModelId) -> Result<(), Error> {
let (ops_ids, ops) = self.fetch_cloud_crdt_ops(model_id, 10_000).await?;
if ops_ids.is_empty() {
return Ok(());
}
let mut ops_fut_by_model = INGEST_ORDER
.iter()
.map(|&model_id| (model_id, vec![]))
.collect::<BTreeMap<_, _>>();
debug!(
messages_count = ops.len(),
first_message = ?ops
.first()
.map_or_else(|| SystemTime::UNIX_EPOCH.into(), |op| timestamp_to_datetime(op.timestamp)),
last_message = ?ops
.last()
.map_or_else(|| SystemTime::UNIX_EPOCH.into(), |op| timestamp_to_datetime(op.timestamp)),
model_id,
"Messages by model to ingest",
);
for (device_pub_id, CompressedCRDTOperationsPerModel(ops_per_model)) in compressed_ops {
for (model_id, ops_per_record) in ops_per_model {
for (record_id, ops) in ops_per_record {
ops_fut_by_model
.get_mut(&model_id)
.ok_or(Error::InvalidModelId(model_id))?
.push(process_crdt_operations(
&self.clock,
&self.timestamp_per_device,
&self.db,
device_pub_id.into(),
model_id,
record_id,
ops,
));
let mut compressed_map =
BTreeMap::<Uuid, HashMap<Vec<u8>, (RecordId, Vec<CompressedCRDTOperation>)>>::new();
for CRDTOperation {
device_pub_id,
timestamp,
model_id: _, // Ignoring model_id as we know it already
record_id,
data,
} in ops
{
let records = compressed_map.entry(device_pub_id).or_default();
// Can't use RecordId as a key because rmpv::Value doesn't implement Hash + Eq.
// So we use its serialized bytes as a key.
let record_id_bytes =
rmp_serde::to_vec_named(&record_id).expect("already serialized to Value");
match records.entry(record_id_bytes) {
Entry::Occupied(mut entry) => {
entry
.get_mut()
.1
.push(CompressedCRDTOperation { timestamp, data });
}
Entry::Vacant(entry) => {
entry.insert((record_id, vec![CompressedCRDTOperation { timestamp, data }]));
}
}
}
for model_id in INGEST_ORDER {
if let Some(futs) = ops_fut_by_model.remove(model_id) {
futs.try_join().await?;
}
let _lock_guard = self.sync_lock.lock().await;
let semaphore = &Arc::new(Semaphore::new(self.available_parallelism));
let handles = compressed_map
.into_iter()
.flat_map(|(device_pub_id, records)| {
records.into_values().map(move |(record_id, ops)| {
// We can process each record in parallel as they are independent
spawn({
let clock = Arc::clone(&self.clock);
let timestamp_per_device = Arc::clone(&self.timestamp_per_device);
let db = Arc::clone(&self.db);
let device_pub_id = device_pub_id.into();
let semaphore = Arc::clone(semaphore);
async move {
let _permit =
semaphore.acquire().await.expect("semaphore never closes");
process_crdt_operations(
&clock,
&timestamp_per_device,
&db,
device_pub_id,
model_id,
record_id,
ops,
)
.await
}
})
})
})
.collect::<Vec<_>>();
for res in handles.join().await {
res.map_err(Error::ProcessCrdtPanic)??;
}
self.db
.cloud_crdt_operation()
.delete_many(vec![cloud_crdt_operation::id::in_vec(ops_ids)])
.exec()
.await?;
Ok(())
}
pub async fn ingest_ops(&self) -> Result<(), Error> {
// WARN: this order here exists because sync messages MUST be processed in this exact order
// due to relationship dependencies between these tables.
self.ingest_by_model(prisma_sync::device::MODEL_ID).await?;
(
self.ingest_by_model(prisma_sync::storage_statistics::MODEL_ID),
self.ingest_by_model(prisma_sync::tag::MODEL_ID),
self.ingest_by_model(prisma_sync::location::MODEL_ID),
self.ingest_by_model(prisma_sync::object::MODEL_ID),
self.ingest_by_model(prisma_sync::label::MODEL_ID),
)
.try_join()
.await?;
(
self.ingest_by_model(prisma_sync::exif_data::MODEL_ID),
self.ingest_by_model(prisma_sync::file_path::MODEL_ID),
self.ingest_by_model(prisma_sync::tag_on_object::MODEL_ID),
self.ingest_by_model(prisma_sync::label_on_object::MODEL_ID),
)
.try_join()
.await?;
if self.tx.send(SyncEvent::Ingested).is_err() {
warn!("failed to send ingested message on `ingest_ops`");
}

View File

@ -466,6 +466,7 @@ pub async fn scan_location(
)
.await?
}
ScanState::Indexed => {
node.job_system
.dispatch(
@ -478,6 +479,7 @@ pub async fn scan_location(
)
.await?
}
ScanState::FilesIdentified => {
node.job_system
.dispatch(

View File

@ -46,7 +46,7 @@ impl CompressedCRDTOperationsPerModelPerDevice {
// Can't use RecordId as a key because rmpv::Value doesn't implement Hash + Eq.
// So we use its serialized bytes as a key.
let record_id_bytes =
rmp_serde::to_vec(&record_id).expect("already serialized to Value");
rmp_serde::to_vec_named(&record_id).expect("already serialized to Value");
match records.entry(record_id_bytes) {
Entry::Occupied(mut entry) => {