Some tweaks on sync messages send and compression

This commit is contained in:
Ericson Soares 2024-10-20 22:26:06 -03:00
parent f75b3a091d
commit f414e549f8
2 changed files with 61 additions and 19 deletions

View File

@ -34,14 +34,14 @@ use tokio::{
sync::{broadcast, Notify},
time::sleep,
};
use tracing::error;
use tracing::{debug, error};
use uuid::Uuid;
use super::{SyncActors, ONE_MINUTE};
const TEN_SECONDS: Duration = Duration::from_secs(10);
const MESSAGES_COLLECTION_SIZE: u32 = 100_000;
const MESSAGES_COLLECTION_SIZE: u32 = 10_000;
enum RaceNotifiedOrStopped {
Notified,
@ -147,6 +147,8 @@ impl Sender {
}
async fn run_loop_iteration(&mut self) -> Result<LoopStatus, Error> {
debug!("Starting cloud sender actor loop iteration");
let current_device_pub_id = devices::PubId(Uuid::from(&self.sync.device_pub_id));
let (key_hash, secret_key) = self
@ -166,6 +168,11 @@ impl Sender {
let mut status = LoopStatus::Idle;
let mut new_latest_timestamp = current_latest_timestamp;
debug!(
chunk_size = MESSAGES_COLLECTION_SIZE,
"Trying to fetch chunk of sync messages from the database"
);
while let Some(ops_res) = crdt_ops_stream.next().await {
let ops = ops_res?;
@ -173,9 +180,13 @@ impl Sender {
break;
};
debug!("Got first and last sync messages");
#[allow(clippy::cast_possible_truncation)]
let operations_count = ops.len() as u32;
debug!(operations_count, "Got chunk of sync messages");
new_latest_timestamp = last.timestamp;
let start_time = timestamp_to_datetime(first.timestamp);
@ -188,6 +199,16 @@ impl Sender {
let messages_bytes = rmp_serde::to_vec_named(&compressed_ops)
.map_err(Error::SerializationFailureToPushSyncMessages)?;
let encrypted_messages =
encrypt_messages(&secret_key, &mut self.rng, messages_bytes).await?;
let encrypted_messages_size = encrypted_messages.len();
debug!(
operations_count,
encrypted_messages_size, "Sending sync messages to cloud",
);
self.cloud_client
.sync()
.messages()
@ -202,20 +223,22 @@ impl Sender {
key_hash: key_hash.clone(),
operations_count,
time_range: (start_time, end_time),
encrypted_messages: encrypt_messages(
&secret_key,
&mut self.rng,
messages_bytes,
)
.await?,
encrypted_messages,
})
.await??;
debug!(
operations_count,
encrypted_messages_size, "Sent sync messages to cloud",
);
status = LoopStatus::SentMessages;
}
self.maybe_latest_timestamp = Some(new_latest_timestamp);
debug!("Finished cloud sender actor loop iteration");
Ok(status)
}

View File

@ -1,6 +1,6 @@
use crate::{CRDTOperation, CRDTOperationData, DevicePubId, ModelId, RecordId};
use std::collections::BTreeMap;
use std::collections::{hash_map::Entry, BTreeMap, HashMap};
use serde::{Deserialize, Serialize};
use uhlc::NTP64;
@ -17,11 +17,16 @@ pub struct CompressedCRDTOperationsPerModelPerDevice(
);
impl CompressedCRDTOperationsPerModelPerDevice {
/// Creates a new [`CompressedCRDTOperationsPerModelPerDevice`] from a vector of [`CRDTOperation`]s.
///
/// # Panics
///
/// Will panic if for some reason `rmp_serde::to_vec` fails to serialize a `rmpv::Value` to bytes.
#[must_use]
pub fn new(ops: Vec<CRDTOperation>) -> Self {
let mut compressed_map = BTreeMap::<
DevicePubId,
BTreeMap<ModelId, Vec<(RecordId, Vec<CompressedCRDTOperation>)>>,
BTreeMap<ModelId, HashMap<Vec<u8>, (RecordId, Vec<CompressedCRDTOperation>)>>,
>::new();
for CRDTOperation {
@ -38,14 +43,21 @@ impl CompressedCRDTOperationsPerModelPerDevice {
.entry(model_id)
.or_default();
// Can't use RecordId as a key because rmpv::Value doesn't implement Hash + Eq
if let Some((_, ops)) = records
.iter_mut()
.find(|(current_record_id, _)| *current_record_id == record_id)
{
ops.push(CompressedCRDTOperation { timestamp, data });
} else {
records.push((record_id, vec![CompressedCRDTOperation { timestamp, data }]));
// Can't use RecordId as a key because rmpv::Value doesn't implement Hash + Eq.
// So we use it's serialized bytes as a key.
let record_id_bytes =
rmp_serde::to_vec(&record_id).expect("already serialized to Value");
match records.entry(record_id_bytes) {
Entry::Occupied(mut entry) => {
entry
.get_mut()
.1
.push(CompressedCRDTOperation { timestamp, data });
}
Entry::Vacant(entry) => {
entry.insert((record_id, vec![CompressedCRDTOperation { timestamp, data }]));
}
}
}
@ -55,7 +67,14 @@ impl CompressedCRDTOperationsPerModelPerDevice {
.map(|(device_pub_id, model_map)| {
(
device_pub_id,
CompressedCRDTOperationsPerModel(model_map.into_iter().collect()),
CompressedCRDTOperationsPerModel(
model_map
.into_iter()
.map(|(model_id, ops_per_record_map)| {
(model_id, ops_per_record_map.into_values().collect())
})
.collect(),
),
)
})
.collect(),