Some tweaks on sync messages encryption and decryption

This commit is contained in:
Ericson Soares 2024-10-09 22:04:49 -03:00
parent 620000c5f2
commit 375c111305
6 changed files with 88 additions and 7 deletions

View File

@ -67,7 +67,7 @@ impl CloudServices {
#[cfg(not(debug_assertions))] #[cfg(not(debug_assertions))]
{ {
builder = builder.https_only(true); http_client_builder = http_client_builder.https_only(true);
} }
let cloud_p2p_relay_url = cloud_p2p_relay_url let cloud_p2p_relay_url = cloud_p2p_relay_url

View File

@ -47,7 +47,7 @@ use tokio::{
time::sleep, time::sleep,
}; };
use tokio_util::io::StreamReader; use tokio_util::io::StreamReader;
use tracing::{error, instrument}; use tracing::{debug, error, instrument};
use uuid::Uuid; use uuid::Uuid;
use super::{SyncActors, ONE_MINUTE}; use super::{SyncActors, ONE_MINUTE};
@ -232,6 +232,10 @@ impl Receiver {
} }
} }
#[instrument(
skip_all,
fields(%sync_group_pub_id, %original_device_pub_id, operations_count, ?key_hash, %end_time),
)]
async fn handle_single_message( async fn handle_single_message(
sync_group_pub_id: groups::PubId, sync_group_pub_id: groups::PubId,
MessagesCollection { MessagesCollection {
@ -266,20 +270,25 @@ async fn handle_single_message(
.map_err(Error::ErrorResponseDownloadSyncMessages)?; .map_err(Error::ErrorResponseDownloadSyncMessages)?;
let crdt_ops = if let Some(size) = response.content_length() { let crdt_ops = if let Some(size) = response.content_length() {
debug!(size, "Received encrypted sync messages collection");
extract_messages_known_size(response, size, secret_key, original_device_pub_id).await extract_messages_known_size(response, size, secret_key, original_device_pub_id).await
} else { } else {
debug!("Received encrypted sync messages collection of unknown size");
extract_messages_unknown_size(response, secret_key, original_device_pub_id).await extract_messages_unknown_size(response, secret_key, original_device_pub_id).await
}?; }?;
assert_eq!( assert_eq!(
crdt_ops.len(), crdt_ops.len(),
operations_count as usize, operations_count as usize,
"Sync messages count mismatch" "Sync messages count mismatch"
); );
write_cloud_ops_to_db(crdt_ops, &sync.db).await?; write_cloud_ops_to_db(crdt_ops, &sync.db).await?;
Ok((original_device_pub_id, end_time)) Ok((original_device_pub_id, end_time))
} }
#[instrument(skip(response, secret_key), err)] #[instrument(skip(response, size, secret_key), err)]
async fn extract_messages_known_size( async fn extract_messages_known_size(
response: Response, response: Response,
size: u64, size: u64,

View File

@ -181,6 +181,15 @@ impl Sender {
let messages_bytes = postcard::to_stdvec(&compressed_ops) let messages_bytes = postcard::to_stdvec(&compressed_ops)
.map_err(Error::SerializationFailureToPushSyncMessages)?; .map_err(Error::SerializationFailureToPushSyncMessages)?;
let plain_text_size = messages_bytes.len();
let expected_blob_size = if plain_text_size <= EncryptedBlock::PLAIN_TEXT_SIZE {
OneShotEncryption::cipher_text_size(&secret_key, plain_text_size)
} else {
StreamEncryption::cipher_text_size(&secret_key, plain_text_size)
} as u64;
debug!(?expected_blob_size, ?key_hash, "Preparing sync message");
let (mut push_updates, mut push_responses) = self let (mut push_updates, mut push_responses) = self
.cloud_client .cloud_client
.sync() .sync()
@ -197,7 +206,7 @@ impl Sender {
operations_count, operations_count,
start_time, start_time,
end_time, end_time,
expected_blob_size: messages_bytes.len() as u64, expected_blob_size,
}) })
.await?; .await?;
@ -546,7 +555,7 @@ async fn upload_to_single_url(
messages_bytes: Vec<u8>, messages_bytes: Vec<u8>,
rng: &mut CryptoRng, rng: &mut CryptoRng,
) -> Result<(), Error> { ) -> Result<(), Error> {
let (cipher_text_size, body) = if messages_bytes.len() > EncryptedBlock::PLAIN_TEXT_SIZE { let (cipher_text_size, body) = if messages_bytes.len() <= EncryptedBlock::PLAIN_TEXT_SIZE {
let EncryptedBlock { nonce, cipher_text } = let EncryptedBlock { nonce, cipher_text } =
OneShotEncryption::encrypt(&secret_key, messages_bytes.as_slice(), rng) OneShotEncryption::encrypt(&secret_key, messages_bytes.as_slice(), rng)
.map_err(Error::Encrypt)?; .map_err(Error::Encrypt)?;

View File

@ -320,6 +320,8 @@ impl Runner {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use reqwest::header; use reqwest::header;
use reqwest_middleware::ClientBuilder;
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use serde_json::json; use serde_json::json;
use crate::AUTH_SERVER_URL; use crate::AUTH_SERVER_URL;
@ -403,7 +405,7 @@ mod tests {
let client = reqwest::Client::new(); let client = reqwest::Client::new();
let response = client let response = client
.post("http://localhost:9420/api/auth/session/refresh") .post(format!("{AUTH_SERVER_URL}/api/auth/session/refresh"))
.header("rid", "session") .header("rid", "session")
.header(header::AUTHORIZATION, format!("Bearer {refresh_token}")) .header(header::AUTHORIZATION, format!("Bearer {refresh_token}"))
.send() .send()
@ -432,4 +434,35 @@ mod tests {
refresh_token.as_str() refresh_token.as_str()
); );
} }
#[ignore = "Needs an actual SuperTokens auth server running"]
#[tokio::test]
async fn test_refresher_runner() {
let http_client_builder = reqwest::Client::builder().timeout(Duration::from_secs(3));
let http_client = ClientBuilder::new(http_client_builder.build().unwrap())
.with(RetryTransientMiddleware::new_with_policy(
ExponentialBackoff::builder().build_with_max_retries(3),
))
.build();
let (refresh_tx, _refresh_rx) = flume::bounded(1);
let mut runner = Runner {
initialized: false,
http_client,
refresh_url: Url::parse(&format!("{AUTH_SERVER_URL}/api/auth/session/refresh"))
.unwrap(),
current_token: None,
current_refresh_token: None,
token_decoding_buffer: Vec::new(),
refresh_tx,
};
let (access_token, refresh_token) = get_tokens().await;
runner.init(access_token, refresh_token).await.unwrap();
runner.refresh().await.unwrap();
}
} }

View File

@ -1,5 +1,5 @@
use crate::{ use crate::{
primitives::{EncryptedBlock, StreamNonce}, primitives::{EncryptedBlock, OneShotNonce, StreamNonce},
Error, Error,
}; };
@ -14,6 +14,10 @@ use super::secret_key::SecretKey;
pub trait OneShotEncryption { pub trait OneShotEncryption {
fn encrypt(&self, plaintext: &[u8], rng: &mut impl CryptoRng) -> Result<EncryptedBlock, Error>; fn encrypt(&self, plaintext: &[u8], rng: &mut impl CryptoRng) -> Result<EncryptedBlock, Error>;
fn cipher_text_size(&self, plain_text_size: usize) -> usize {
size_of::<OneShotNonce>() + plain_text_size + size_of::<Tag>()
}
} }
pub trait StreamEncryption { pub trait StreamEncryption {

View File

@ -178,6 +178,32 @@ mod tests {
assert_eq!(message, decrypted_message.as_slice()); assert_eq!(message, decrypted_message.as_slice());
} }
#[test]
fn one_shot_ref_test() {
use super::super::{decrypt::OneShotDecryption, encrypt::OneShotEncryption};
let mut rng = CryptoRng::new().unwrap();
let message = b"Eu queria um apartamento no Guarujah; \
Mas o melhor que eu consegui foi um barraco em Itaquah.";
let key = SecretKey::generate(&mut rng);
let EncryptedBlock { nonce, cipher_text } = key.encrypt(message, &mut rng).unwrap();
let mut bytes = Vec::with_capacity(nonce.len() + cipher_text.len());
bytes.extend_from_slice(&nonce);
bytes.extend(cipher_text);
assert_eq!(
bytes.len(),
OneShotEncryption::cipher_text_size(&key, message.len())
);
let decrypted_message = key.decrypt(bytes.as_slice().into()).unwrap();
assert_eq!(message, decrypted_message.as_slice());
}
async fn stream_test(rng: &mut CryptoRng, message: &[u8]) { async fn stream_test(rng: &mut CryptoRng, message: &[u8]) {
use super::super::{decrypt::StreamDecryption, encrypt::StreamEncryption}; use super::super::{decrypt::StreamDecryption, encrypt::StreamEncryption};