From 375c11130578256d2ccd1f8febf23d48c4bb039d Mon Sep 17 00:00:00 2001 From: Ericson Soares Date: Wed, 9 Oct 2024 22:04:49 -0300 Subject: [PATCH] Some tweaks on sync messages encryption and decryption --- core/crates/cloud-services/src/client.rs | 2 +- .../crates/cloud-services/src/sync/receive.rs | 13 +++++-- core/crates/cloud-services/src/sync/send.rs | 13 +++++-- .../cloud-services/src/token_refresher.rs | 35 ++++++++++++++++++- crates/crypto/src/cloud/encrypt.rs | 6 +++- crates/crypto/src/cloud/secret_key.rs | 26 ++++++++++++++ 6 files changed, 88 insertions(+), 7 deletions(-) diff --git a/core/crates/cloud-services/src/client.rs b/core/crates/cloud-services/src/client.rs index cfb66784c..1c860eba5 100644 --- a/core/crates/cloud-services/src/client.rs +++ b/core/crates/cloud-services/src/client.rs @@ -67,7 +67,7 @@ impl CloudServices { #[cfg(not(debug_assertions))] { - builder = builder.https_only(true); + http_client_builder = http_client_builder.https_only(true); } let cloud_p2p_relay_url = cloud_p2p_relay_url diff --git a/core/crates/cloud-services/src/sync/receive.rs b/core/crates/cloud-services/src/sync/receive.rs index 283507ad4..e7387de4b 100644 --- a/core/crates/cloud-services/src/sync/receive.rs +++ b/core/crates/cloud-services/src/sync/receive.rs @@ -47,7 +47,7 @@ use tokio::{ time::sleep, }; use tokio_util::io::StreamReader; -use tracing::{error, instrument}; +use tracing::{debug, error, instrument}; use uuid::Uuid; use super::{SyncActors, ONE_MINUTE}; @@ -232,6 +232,10 @@ impl Receiver { } } +#[instrument( + skip_all, + fields(%sync_group_pub_id, %original_device_pub_id, operations_count, ?key_hash, %end_time), +)] async fn handle_single_message( sync_group_pub_id: groups::PubId, MessagesCollection { @@ -266,20 +270,25 @@ async fn handle_single_message( .map_err(Error::ErrorResponseDownloadSyncMessages)?; let crdt_ops = if let Some(size) = response.content_length() { + debug!(size, "Received encrypted sync messages collection"); extract_messages_known_size(response, size, secret_key, original_device_pub_id).await } else { + debug!("Received encrypted sync messages collection of unknown size"); extract_messages_unknown_size(response, secret_key, original_device_pub_id).await }?; + assert_eq!( crdt_ops.len(), operations_count as usize, "Sync messages count mismatch" ); + write_cloud_ops_to_db(crdt_ops, &sync.db).await?; + Ok((original_device_pub_id, end_time)) } -#[instrument(skip(response, secret_key), err)] +#[instrument(skip(response, size, secret_key), err)] async fn extract_messages_known_size( response: Response, size: u64, diff --git a/core/crates/cloud-services/src/sync/send.rs b/core/crates/cloud-services/src/sync/send.rs index 680a926b7..4ccaa66db 100644 --- a/core/crates/cloud-services/src/sync/send.rs +++ b/core/crates/cloud-services/src/sync/send.rs @@ -181,6 +181,15 @@ impl Sender { let messages_bytes = postcard::to_stdvec(&compressed_ops) .map_err(Error::SerializationFailureToPushSyncMessages)?; + let plain_text_size = messages_bytes.len(); + let expected_blob_size = if plain_text_size <= EncryptedBlock::PLAIN_TEXT_SIZE { + OneShotEncryption::cipher_text_size(&secret_key, plain_text_size) + } else { + StreamEncryption::cipher_text_size(&secret_key, plain_text_size) + } as u64; + + debug!(?expected_blob_size, ?key_hash, "Preparing sync message"); + let (mut push_updates, mut push_responses) = self .cloud_client .sync() @@ -197,7 +206,7 @@ impl Sender { operations_count, start_time, end_time, - expected_blob_size: messages_bytes.len() as u64, + expected_blob_size, }) .await?; @@ -546,7 +555,7 @@ async fn upload_to_single_url( messages_bytes: Vec, rng: &mut CryptoRng, ) -> Result<(), Error> { - let (cipher_text_size, body) = if messages_bytes.len() > EncryptedBlock::PLAIN_TEXT_SIZE { + let (cipher_text_size, body) = if messages_bytes.len() <= EncryptedBlock::PLAIN_TEXT_SIZE { let EncryptedBlock { nonce, cipher_text } = OneShotEncryption::encrypt(&secret_key, messages_bytes.as_slice(), rng) .map_err(Error::Encrypt)?; diff --git a/core/crates/cloud-services/src/token_refresher.rs b/core/crates/cloud-services/src/token_refresher.rs index 3416f2a87..ae11e15db 100644 --- a/core/crates/cloud-services/src/token_refresher.rs +++ b/core/crates/cloud-services/src/token_refresher.rs @@ -320,6 +320,8 @@ impl Runner { #[cfg(test)] mod tests { use reqwest::header; + use reqwest_middleware::ClientBuilder; + use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; use serde_json::json; use crate::AUTH_SERVER_URL; @@ -403,7 +405,7 @@ mod tests { let client = reqwest::Client::new(); let response = client - .post("http://localhost:9420/api/auth/session/refresh") + .post(format!("{AUTH_SERVER_URL}/api/auth/session/refresh")) .header("rid", "session") .header(header::AUTHORIZATION, format!("Bearer {refresh_token}")) .send() @@ -432,4 +434,35 @@ mod tests { refresh_token.as_str() ); } + + #[ignore = "Needs an actual SuperTokens auth server running"] + #[tokio::test] + async fn test_refresher_runner() { + let http_client_builder = reqwest::Client::builder().timeout(Duration::from_secs(3)); + + let http_client = ClientBuilder::new(http_client_builder.build().unwrap()) + .with(RetryTransientMiddleware::new_with_policy( + ExponentialBackoff::builder().build_with_max_retries(3), + )) + .build(); + + let (refresh_tx, _refresh_rx) = flume::bounded(1); + + let mut runner = Runner { + initialized: false, + http_client, + refresh_url: Url::parse(&format!("{AUTH_SERVER_URL}/api/auth/session/refresh")) + .unwrap(), + current_token: None, + current_refresh_token: None, + token_decoding_buffer: Vec::new(), + refresh_tx, + }; + + let (access_token, refresh_token) = get_tokens().await; + + runner.init(access_token, refresh_token).await.unwrap(); + + runner.refresh().await.unwrap(); + } } diff --git a/crates/crypto/src/cloud/encrypt.rs b/crates/crypto/src/cloud/encrypt.rs index a11dcb46e..096c8c928 100644 --- a/crates/crypto/src/cloud/encrypt.rs +++ b/crates/crypto/src/cloud/encrypt.rs @@ -1,5 +1,5 @@ use crate::{ - primitives::{EncryptedBlock, StreamNonce}, + primitives::{EncryptedBlock, OneShotNonce, StreamNonce}, Error, }; @@ -14,6 +14,10 @@ use super::secret_key::SecretKey; pub trait OneShotEncryption { fn encrypt(&self, plaintext: &[u8], rng: &mut impl CryptoRng) -> Result; + + fn cipher_text_size(&self, plain_text_size: usize) -> usize { + size_of::() + plain_text_size + size_of::() + } } pub trait StreamEncryption { diff --git a/crates/crypto/src/cloud/secret_key.rs b/crates/crypto/src/cloud/secret_key.rs index d816acb28..c1df94f9f 100644 --- a/crates/crypto/src/cloud/secret_key.rs +++ b/crates/crypto/src/cloud/secret_key.rs @@ -178,6 +178,32 @@ mod tests { assert_eq!(message, decrypted_message.as_slice()); } + #[test] + fn one_shot_ref_test() { + use super::super::{decrypt::OneShotDecryption, encrypt::OneShotEncryption}; + let mut rng = CryptoRng::new().unwrap(); + + let message = b"Eu queria um apartamento no Guarujah; \ + Mas o melhor que eu consegui foi um barraco em Itaquah."; + + let key = SecretKey::generate(&mut rng); + + let EncryptedBlock { nonce, cipher_text } = key.encrypt(message, &mut rng).unwrap(); + + let mut bytes = Vec::with_capacity(nonce.len() + cipher_text.len()); + bytes.extend_from_slice(&nonce); + bytes.extend(cipher_text); + + assert_eq!( + bytes.len(), + OneShotEncryption::cipher_text_size(&key, message.len()) + ); + + let decrypted_message = key.decrypt(bytes.as_slice().into()).unwrap(); + + assert_eq!(message, decrypted_message.as_slice()); + } + async fn stream_test(rng: &mut CryptoRng, message: &[u8]) { use super::super::{decrypt::StreamDecryption, encrypt::StreamEncryption};