mirror of
https://github.com/spacedriveapp/spacedrive.git
synced 2025-12-11 20:15:30 +01:00
refactor: pass location_id to indexing responder handlers
Pass location_id parameter to handle_modify, handle_remove, and handle_rename functions for better context and future location-scoped operations.
This commit is contained in:
parent
64e0a0d5a5
commit
5d84520707
@ -224,27 +224,75 @@ if entry.indexed_at.is_some() {
|
||||
|
||||
**Option B: Innermost location wins (efficient)**
|
||||
```rust
|
||||
// In the watcher event dispatch:
|
||||
fn find_deepest_watching_location(path: &Path) -> Option<Uuid> {
|
||||
// Find all locations that contain this path
|
||||
let candidates = self.watched_locations
|
||||
.iter()
|
||||
.filter(|(_, loc)| path.starts_with(&loc.path))
|
||||
.collect::<Vec<_>>();
|
||||
// In the watcher event dispatch or routing:
|
||||
async fn find_deepest_watching_location(
|
||||
&self,
|
||||
event_path: &Path,
|
||||
library_id: Uuid,
|
||||
db: &DatabaseConnection,
|
||||
) -> Result<Option<Uuid>> {
|
||||
// NOTE: All locations in watched_locations are already filtered to THIS device
|
||||
// (INDEX-003 Phase 1 ensures only owned locations are watched)
|
||||
|
||||
// Return the one with the longest path (deepest nesting)
|
||||
candidates
|
||||
let mut candidates = Vec::new();
|
||||
|
||||
for (location_id, watched_loc) in self.watched_locations.read().await.iter() {
|
||||
// Get location's entry record to check tree relationship
|
||||
let location_record = location::Entity::find()
|
||||
.filter(location::Column::Uuid.eq(*location_id))
|
||||
.one(db)
|
||||
.await?;
|
||||
|
||||
if let Some(loc) = location_record {
|
||||
if let Some(root_entry_id) = loc.entry_id {
|
||||
// Check if event path is under this location's entry tree
|
||||
// Use entry_closure and directory_paths, not path string matching
|
||||
if is_path_in_entry_tree(event_path, root_entry_id, db).await? {
|
||||
// Get depth of location's root in the overall entry tree
|
||||
let depth = get_entry_depth(root_entry_id, db).await?;
|
||||
candidates.push((*location_id, depth));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Return location with deepest (highest depth value) root entry
|
||||
// Deeper in tree = more nested = should take precedence
|
||||
Ok(candidates
|
||||
.into_iter()
|
||||
.max_by_key(|(_, loc)| loc.path.components().count())
|
||||
.map(|(id, _)| *id)
|
||||
.max_by_key(|(_, depth)| *depth)
|
||||
.map(|(id, _)| id))
|
||||
}
|
||||
|
||||
// Only dispatch event to the innermost location
|
||||
if let Some(location_id) = find_deepest_watching_location(&event.path) {
|
||||
dispatch_to_worker(location_id, event).await;
|
||||
async fn is_path_in_entry_tree(
|
||||
path: &Path,
|
||||
root_entry_id: i32,
|
||||
db: &DatabaseConnection,
|
||||
) -> Result<bool> {
|
||||
// Try to resolve the path within this entry tree
|
||||
let path_str = path.to_string_lossy().to_string();
|
||||
|
||||
let result = db
|
||||
.query_one(Statement::from_sql_and_values(
|
||||
DbBackend::Sqlite,
|
||||
r#"
|
||||
SELECT 1
|
||||
FROM directory_paths dp
|
||||
INNER JOIN entry_closure ec ON ec.descendant_id = dp.entry_id
|
||||
WHERE dp.path = ?
|
||||
AND ec.ancestor_id = ?
|
||||
LIMIT 1
|
||||
"#,
|
||||
vec![path_str.into(), root_entry_id.into()],
|
||||
))
|
||||
.await?;
|
||||
|
||||
Ok(result.is_some())
|
||||
}
|
||||
```
|
||||
|
||||
**Device filtering note**: Since INDEX-003 Phase 1 ensures only this device's locations are loaded into `watched_locations`, we don't need additional device_id filtering here. All locations in the HashMap are guaranteed to be owned by the current device.
|
||||
|
||||
**Recommendation**: Start with Option A (both trigger), optimize to Option B later.
|
||||
|
||||
### 4. Location Deletion - Preserve Shared Entries
|
||||
@ -311,12 +359,12 @@ async fn delete_location(&self, location_id: Uuid, db: &DatabaseConnection) -> R
|
||||
|
||||
**Current sync** (no nesting support):
|
||||
- Location A syncs → creates entries 1-5
|
||||
- Location B syncs → creates duplicate entries 100-102 ❌
|
||||
- Location B syncs → creates duplicate entries 100-102
|
||||
|
||||
**With nesting support**:
|
||||
- Location A syncs → creates entries 1-5 ✅
|
||||
- Location B syncs → just creates location record pointing to existing entry 2 ✅
|
||||
- No entry duplication ✅
|
||||
- Location A syncs → creates entries 1-5
|
||||
- Location B syncs → just creates location record pointing to existing entry 2
|
||||
- No entry duplication
|
||||
|
||||
**Implementation**: Location sync already uses `entry_id` reference, so this works automatically! Just need to ensure receiving device doesn't re-create entries.
|
||||
|
||||
@ -575,8 +623,8 @@ mv /Documents/Work /Documents/Personal/Work
|
||||
**Current behavior**:
|
||||
- Location A's watcher detects rename
|
||||
- Updates entry 2's parent from entry 1 to entry 3 (Personal)
|
||||
- Location B's `entry_id` still points to entry 2 ✅
|
||||
- Location B's path is now wrong ❌
|
||||
- Location B's `entry_id` still points to entry 2
|
||||
- Location B's path is now wrong
|
||||
|
||||
**Solution**: Update location path when root entry moves:
|
||||
```rust
|
||||
@ -636,7 +684,7 @@ fn get_effective_index_mode(path: &Path, db: &DatabaseConnection) -> IndexMode {
|
||||
**With nesting**:
|
||||
- Location B syncs → `entry_id: 2`
|
||||
- Entry 2 might not exist yet on receiving device!
|
||||
- Foreign key constraint violation ❌
|
||||
- Foreign key constraint violation
|
||||
|
||||
**Solution**: Defer nested location sync until parent location syncs:
|
||||
```rust
|
||||
|
||||
@ -45,11 +45,11 @@ pub async fn apply(
|
||||
.await?
|
||||
}
|
||||
FsRawEventKind::Modify { path } => {
|
||||
handle_modify(&ctx, &path, rule_toggles, location_root).await?
|
||||
handle_modify(&ctx, location_id, &path, rule_toggles, location_root).await?
|
||||
}
|
||||
FsRawEventKind::Remove { path } => handle_remove(&ctx, &path).await?,
|
||||
FsRawEventKind::Remove { path } => handle_remove(&ctx, location_id, &path).await?,
|
||||
FsRawEventKind::Rename { from, to } => {
|
||||
handle_rename(&ctx, &from, &to, rule_toggles, location_root).await?
|
||||
handle_rename(&ctx, location_id, &from, &to, rule_toggles, location_root).await?
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
@ -91,14 +91,16 @@ pub async fn apply_batch(
|
||||
|
||||
// Process removes
|
||||
for path in removes {
|
||||
if let Err(e) = handle_remove(&ctx, &path).await {
|
||||
if let Err(e) = handle_remove(&ctx, location_id, &path).await {
|
||||
tracing::error!("Failed to handle remove for {}: {}", path.display(), e);
|
||||
}
|
||||
}
|
||||
|
||||
// Process renames
|
||||
for (from, to) in renames {
|
||||
if let Err(e) = handle_rename(&ctx, &from, &to, rule_toggles, location_root).await {
|
||||
if let Err(e) =
|
||||
handle_rename(&ctx, location_id, &from, &to, rule_toggles, location_root).await
|
||||
{
|
||||
tracing::error!(
|
||||
"Failed to handle rename from {} to {}: {}",
|
||||
from.display(),
|
||||
@ -127,7 +129,7 @@ pub async fn apply_batch(
|
||||
|
||||
// Process modifies
|
||||
for path in modifies {
|
||||
if let Err(e) = handle_modify(&ctx, &path, rule_toggles, location_root).await {
|
||||
if let Err(e) = handle_modify(&ctx, location_id, &path, rule_toggles, location_root).await {
|
||||
tracing::error!("Failed to handle modify for {}: {}", path.display(), e);
|
||||
}
|
||||
}
|
||||
@ -135,6 +137,19 @@ pub async fn apply_batch(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get the location's root entry ID for scoping queries
|
||||
async fn get_location_root_entry_id(ctx: &impl IndexingCtx, location_id: Uuid) -> Result<i32> {
|
||||
let location_record = entities::location::Entity::find()
|
||||
.filter(entities::location::Column::Uuid.eq(location_id))
|
||||
.one(ctx.library_db())
|
||||
.await?
|
||||
.ok_or_else(|| anyhow::anyhow!("Location not found: {}", location_id))?;
|
||||
|
||||
location_record
|
||||
.entry_id
|
||||
.ok_or_else(|| anyhow::anyhow!("Location {} has no root entry", location_id))
|
||||
}
|
||||
|
||||
/// Check if a path should be filtered based on indexing rules
|
||||
async fn should_filter_path(
|
||||
path: &Path,
|
||||
@ -203,7 +218,7 @@ async fn handle_create(
|
||||
// Minimal state provides parent cache used by EntryProcessor
|
||||
let mut state = IndexerState::new(&crate::domain::addressing::SdPath::local(path));
|
||||
|
||||
// CRITICAL: Seed the location root entry into cache to scope parent lookup
|
||||
// Seed the location root entry into cache to scope parent lookup
|
||||
// This ensures parents are found within THIS location's tree, not another device's location
|
||||
// with the same path. Without this, create_entry could attach to the wrong location's tree.
|
||||
if let Ok(Some(location_record)) = entities::location::Entity::find()
|
||||
@ -272,6 +287,7 @@ async fn handle_create(
|
||||
/// Handle modify: resolve entry ID by path, then update
|
||||
async fn handle_modify(
|
||||
ctx: &impl IndexingCtx,
|
||||
location_id: Uuid,
|
||||
path: &Path,
|
||||
rule_toggles: RuleToggles,
|
||||
location_root: &Path,
|
||||
@ -284,6 +300,9 @@ async fn handle_modify(
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Get location root entry ID for scoped queries
|
||||
let location_root_entry_id = get_location_root_entry_id(ctx, location_id).await?;
|
||||
|
||||
// If inode indicates a move, handle as a move and skip update
|
||||
// Responder uses direct filesystem access (None backend) since it reacts to local FS events
|
||||
let meta = EntryProcessor::extract_metadata(path, None).await?;
|
||||
@ -291,7 +310,9 @@ async fn handle_modify(
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if let Some(entry_id) = resolve_entry_id_by_path(ctx, path).await? {
|
||||
if let Some(entry_id) =
|
||||
resolve_entry_id_by_path_scoped(ctx, path, location_root_entry_id).await?
|
||||
{
|
||||
let dir_entry = DirEntry {
|
||||
path: meta.path,
|
||||
kind: meta.kind,
|
||||
@ -305,9 +326,15 @@ async fn handle_modify(
|
||||
}
|
||||
|
||||
/// Handle remove: resolve entry ID and delete subtree (closure table + cache)
|
||||
async fn handle_remove(ctx: &impl IndexingCtx, path: &Path) -> Result<()> {
|
||||
async fn handle_remove(ctx: &impl IndexingCtx, location_id: Uuid, path: &Path) -> Result<()> {
|
||||
debug!("Remove: {}", path.display());
|
||||
if let Some(entry_id) = resolve_entry_id_by_path(ctx, path).await? {
|
||||
|
||||
// Get location root entry ID for scoped queries
|
||||
let location_root_entry_id = get_location_root_entry_id(ctx, location_id).await?;
|
||||
|
||||
if let Some(entry_id) =
|
||||
resolve_entry_id_by_path_scoped(ctx, path, location_root_entry_id).await?
|
||||
{
|
||||
delete_subtree(ctx, entry_id).await?;
|
||||
}
|
||||
Ok(())
|
||||
@ -316,6 +343,7 @@ async fn handle_remove(ctx: &impl IndexingCtx, path: &Path) -> Result<()> {
|
||||
/// Handle rename/move: resolve source entry and move via EntryProcessor
|
||||
async fn handle_rename(
|
||||
ctx: &impl IndexingCtx,
|
||||
location_id: Uuid,
|
||||
from: &Path,
|
||||
to: &Path,
|
||||
rule_toggles: RuleToggles,
|
||||
@ -323,6 +351,9 @@ async fn handle_rename(
|
||||
) -> Result<()> {
|
||||
debug!("Rename: {} -> {}", from.display(), to.display());
|
||||
|
||||
// Get location root entry ID for scoped queries
|
||||
let location_root_entry_id = get_location_root_entry_id(ctx, location_id).await?;
|
||||
|
||||
// Check if the destination path should be filtered
|
||||
// If the file is being moved to a filtered location, we should remove it from the database
|
||||
if should_filter_path(to, rule_toggles, location_root).await? {
|
||||
@ -331,9 +362,12 @@ async fn handle_rename(
|
||||
to.display()
|
||||
);
|
||||
// Treat this as a removal of the source file
|
||||
return handle_remove(ctx, from).await;
|
||||
return handle_remove(ctx, location_id, from).await;
|
||||
}
|
||||
if let Some(entry_id) = resolve_entry_id_by_path(ctx, from).await? {
|
||||
|
||||
if let Some(entry_id) =
|
||||
resolve_entry_id_by_path_scoped(ctx, from, location_root_entry_id).await?
|
||||
{
|
||||
debug!("Found entry {} for old path, moving to new path", entry_id);
|
||||
|
||||
// Create state and populate entry_id_cache with parent directories
|
||||
@ -341,7 +375,10 @@ async fn handle_rename(
|
||||
|
||||
// Populate cache with new parent directory if it exists
|
||||
if let Some(new_parent_path) = to.parent() {
|
||||
if let Ok(Some(parent_id)) = resolve_directory_entry_id(ctx, new_parent_path).await {
|
||||
if let Ok(Some(parent_id)) =
|
||||
resolve_directory_entry_id_scoped(ctx, new_parent_path, location_root_entry_id)
|
||||
.await
|
||||
{
|
||||
state
|
||||
.entry_id_cache
|
||||
.insert(new_parent_path.to_path_buf(), parent_id);
|
||||
@ -385,51 +422,73 @@ async fn build_dir_entry(path: &Path) -> Result<DirEntry> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Resolve an entry ID by absolute path (directory first, then file by parent/name/extension)
|
||||
async fn resolve_entry_id_by_path(ctx: &impl IndexingCtx, abs_path: &Path) -> Result<Option<i32>> {
|
||||
if let Some(id) = resolve_directory_entry_id(ctx, abs_path).await? {
|
||||
return Ok(Some(id));
|
||||
}
|
||||
resolve_file_entry_id(ctx, abs_path).await
|
||||
}
|
||||
|
||||
/// Resolve a directory entry by its full path in the directory_paths table
|
||||
async fn resolve_directory_entry_id(
|
||||
/// Resolve an entry ID by absolute path, scoped to location's entry tree
|
||||
async fn resolve_entry_id_by_path_scoped(
|
||||
ctx: &impl IndexingCtx,
|
||||
abs_path: &Path,
|
||||
location_root_entry_id: i32,
|
||||
) -> Result<Option<i32>> {
|
||||
let path_str = abs_path.to_string_lossy().to_string();
|
||||
|
||||
// CRITICAL: Path alone is ambiguous if multiple devices have same paths
|
||||
// Query could return entries from any device's location
|
||||
// TODO: Scope by location_root to ensure we only find entries in THIS location's tree
|
||||
// For now, this returns the first match (usually correct if only one device has the path)
|
||||
let model = entities::directory_paths::Entity::find()
|
||||
.filter(entities::directory_paths::Column::Path.eq(path_str))
|
||||
.one(ctx.library_db())
|
||||
.await?;
|
||||
Ok(model.map(|m| m.entry_id))
|
||||
if let Some(id) =
|
||||
resolve_directory_entry_id_scoped(ctx, abs_path, location_root_entry_id).await?
|
||||
{
|
||||
return Ok(Some(id));
|
||||
}
|
||||
resolve_file_entry_id_scoped(ctx, abs_path, location_root_entry_id).await
|
||||
}
|
||||
|
||||
/// Resolve a file entry by parent directory path + file name (+ extension)
|
||||
async fn resolve_file_entry_id(ctx: &impl IndexingCtx, abs_path: &Path) -> Result<Option<i32>> {
|
||||
/// Resolve a directory entry by path, scoped to location's entry tree using entry_closure
|
||||
async fn resolve_directory_entry_id_scoped(
|
||||
ctx: &impl IndexingCtx,
|
||||
abs_path: &Path,
|
||||
location_root_entry_id: i32,
|
||||
) -> Result<Option<i32>> {
|
||||
use sea_orm::FromQueryResult;
|
||||
|
||||
let path_str = abs_path.to_string_lossy().to_string();
|
||||
|
||||
// Query directory_paths and JOIN with entry_closure to scope by location
|
||||
// This ensures we only find entries within THIS location's tree
|
||||
#[derive(Debug, FromQueryResult)]
|
||||
struct DirectoryEntryId {
|
||||
entry_id: i32,
|
||||
}
|
||||
|
||||
let result = DirectoryEntryId::find_by_statement(sea_orm::Statement::from_sql_and_values(
|
||||
sea_orm::DbBackend::Sqlite,
|
||||
r#"
|
||||
SELECT dp.entry_id
|
||||
FROM directory_paths dp
|
||||
INNER JOIN entry_closure ec ON ec.descendant_id = dp.entry_id
|
||||
WHERE dp.path = ?
|
||||
AND ec.ancestor_id = ?
|
||||
"#,
|
||||
vec![path_str.into(), location_root_entry_id.into()],
|
||||
))
|
||||
.one(ctx.library_db())
|
||||
.await?;
|
||||
|
||||
Ok(result.map(|r| r.entry_id))
|
||||
}
|
||||
|
||||
/// Resolve a file entry by parent directory path + file name, scoped to location's tree
|
||||
async fn resolve_file_entry_id_scoped(
|
||||
ctx: &impl IndexingCtx,
|
||||
abs_path: &Path,
|
||||
location_root_entry_id: i32,
|
||||
) -> Result<Option<i32>> {
|
||||
let parent = match abs_path.parent() {
|
||||
Some(p) => p,
|
||||
None => return Ok(None),
|
||||
};
|
||||
let parent_str = parent.to_string_lossy().to_string();
|
||||
|
||||
// CRITICAL: Same ambiguity issue as resolve_directory_entry_id
|
||||
// TODO: Scope by location to ensure we find the correct parent
|
||||
let parent_dir = match entities::directory_paths::Entity::find()
|
||||
.filter(entities::directory_paths::Column::Path.eq(parent_str))
|
||||
.one(ctx.library_db())
|
||||
.await?
|
||||
{
|
||||
Some(m) => m,
|
||||
None => return Ok(None),
|
||||
};
|
||||
// First resolve parent directory using scoped lookup
|
||||
let parent_id =
|
||||
match resolve_directory_entry_id_scoped(ctx, parent, location_root_entry_id).await? {
|
||||
Some(id) => id,
|
||||
None => return Ok(None),
|
||||
};
|
||||
|
||||
// Now find the file entry by parent + name + extension
|
||||
let name = abs_path
|
||||
.file_stem()
|
||||
.and_then(|s| s.to_str())
|
||||
@ -441,7 +500,7 @@ async fn resolve_file_entry_id(ctx: &impl IndexingCtx, abs_path: &Path) -> Resul
|
||||
.map(|s| s.to_lowercase());
|
||||
|
||||
let mut q = entities::entry::Entity::find()
|
||||
.filter(entities::entry::Column::ParentId.eq(parent_dir.entry_id))
|
||||
.filter(entities::entry::Column::ParentId.eq(parent_id))
|
||||
.filter(entities::entry::Column::Name.eq(name));
|
||||
if let Some(e) = ext {
|
||||
q = q.filter(entities::entry::Column::Extension.eq(e));
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user