cyb/optica/src/server/reload.rs

// ---
// tags: optica, rust
// crystal-type: source
// crystal-domain: comp
// ---
use crate::config::SiteConfig;
use crate::parser::{ParsedPage, PageId};
use crate::render::RenderedPage;
use anyhow::Result;
use colored::Colorize;
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{mpsc, Arc};
use std::time::{Duration, SystemTime};

/// Build the inline `<script>` snippet that drives live reload in the browser.
///
/// The server's current build version is baked into the script as the initial
/// `knownVersion`. If every fresh page started at 0 instead, a server that has
/// already rebuilt at least once would always look "newer" than the client:
/// the client would reload, start at 0 again, and loop forever in a reload
/// storm.
///
/// The generated script polls `/__reload?v=<knownVersion>` and expects a body
/// of the form `action:version`:
/// - `reload` as the action triggers `window.location.reload()`;
/// - otherwise a positive version replaces `knownVersion` and the next poll is
///   scheduled 1500 ms later;
/// - a failed fetch is retried after 5000 ms.
pub fn reload_script(current_version: u64) -> String {
    // Script text up to (not including) the version literal.
    const HEAD: &str = "<script>\n(function() {\n  let knownVersion = ";
    // Everything that follows the version literal.
    const TAIL: &str = r#";
  function tick() {
    fetch('/__reload?v=' + knownVersion, { cache: 'no-store' })
      .then(function (r) { return r.text(); })
      .then(function (body) {
        const parts = body.trim().split(':');
        const action = parts[0];
        const v = parts.length > 1 ? parseInt(parts[1]) || 0 : 0;
        if (action === 'reload') {
          window.location.reload();
          return;
        }
        if (v > 0) knownVersion = v;
        setTimeout(tick, 1500);
      })
      .catch(function () { setTimeout(tick, 5000); });
  }
  tick();
})();
</script>"#;

    let version = current_version.to_string();
    let mut script = String::with_capacity(HEAD.len() + version.len() + TAIL.len());
    script.push_str(HEAD);
    script.push_str(&version);
    script.push_str(TAIL);
    script
}

/// State carried between rebuilds so that each reload only redoes the work
/// that the changed files actually require.
///
/// Every field starts out empty (`initialized` starts as `false`), which is
/// exactly what the derived `Default` produces.
#[derive(Default)]
struct BuildCache {
    /// source path → (mtime, parsed page). Lets unchanged files skip re-parsing.
    parse_cache: HashMap<PathBuf, (SystemTime, ParsedPage)>,
    /// source path → discovered file record. The fast path needs this to
    /// re-parse a file without running the scanner again.
    file_cache: HashMap<PathBuf, crate::scanner::DiscoveredFile>,
    /// page id → rendered page. Lets unchanged pages skip re-rendering.
    render_cache: HashMap<PageId, RenderedPage>,
    /// page id → hash of `content_md`. Detects body changes.
    content_hashes: HashMap<PageId, u64>,
    /// page id → hash of the tag list. Detects tag changes.
    tag_hashes: HashMap<PageId, u64>,
    /// page id → hash of title + aliases + icon + stake + tags. Detects
    /// frontmatter changes.
    meta_hashes: HashMap<PageId, u64>,
    /// page id → hash of outgoing links. Detects link changes.
    link_hashes: HashMap<PageId, u64>,
    /// page id → sorted backlink ids. Detects backlink changes.
    backlink_snapshots: HashMap<PageId, Vec<PageId>>,
    /// Content page ids from the last build (stubs excluded). Detects
    /// structural changes.
    last_content_page_ids: HashSet<PageId>,
    /// namespace key → set of page ids that were children of that namespace
    /// in the last build. Comparing sets (rather than only looking at
    /// re-parsed children) marks a parent dirty when a child moves IN or OUT.
    last_namespace_children: HashMap<String, HashSet<PageId>>,
    /// Set once the initial warm-up build has completed.
    initialized: bool,
    /// subgraph name → its parsed pages. Only the touched subgraph is
    /// re-ingested per reload; the rest are reused. Sidesteps the 30-60s
    /// "any write in any of 27 repos rebuilds the world" pathology.
    subgraph_pages_by_name: HashMap<String, Vec<crate::parser::ParsedPage>>,
    /// subgraph name → repo path. Attributes a changed file to its owning
    /// subgraph in O(decls) without recomputing globs.
    subgraph_repo_by_name: HashMap<String, PathBuf>,
    /// Graph store from the last build, reused when nothing structural changed
    /// (avoids the expensive PageRank/gravity pass).
    cached_store: Option<crate::graph::PageStore>,
    /// Names of subgraphs declared private in the optional --subgraphs TOML.
    /// Re-applied to every store after `build_graph` so visibility survives
    /// a reload.
    private_subgraph_names: HashSet<String>,
    /// subgraph name → per-subgraph parse cache (path → (mtime, parsed page)).
    /// Only files whose mtime changed are re-parsed, cutting large-subgraph
    /// re-ingest from 30s to <1s.
    subgraph_parse_caches: HashMap<String, crate::scanner::subgraph::SubgraphParseCache>,
}

impl BuildCache {
    /// Create an empty, uninitialized cache.
    fn new() -> Self {
        Self::default()
    }
}

/// Cheap, non-cryptographic fingerprint of a string, used only to notice
/// that content changed between two builds.
fn hash_str(s: &str) -> u64 {
    use std::hash::{Hash, Hasher};

    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    Hash::hash(s, &mut hasher);
    hasher.finish()
}

/// Fingerprint the frontmatter fields whose changes can affect other pages:
/// title, aliases, icon, stake and tags.
///
/// Aliases and tags are sorted before hashing, so reordering them in the
/// source file does not change the result.
fn hash_meta(page: &ParsedPage) -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let meta = &page.meta;

    // Work on sorted copies so the hash is independent of declaration order.
    let mut sorted_aliases = meta.aliases.clone();
    sorted_aliases.sort();
    let mut sorted_tags = meta.tags.clone();
    sorted_tags.sort();

    // Feed order matters for the resulting value: title, aliases, icon, stake, tags.
    let mut hasher = DefaultHasher::new();
    meta.title.hash(&mut hasher);
    sorted_aliases.join(",").hash(&mut hasher);
    meta.icon.hash(&mut hasher);
    meta.stake.hash(&mut hasher);
    sorted_tags.join(",").hash(&mut hasher);
    hasher.finish()
}

/// Fingerprint a page's outgoing link set. A change here means the backlinks
/// of the link targets may have changed too.
///
/// Links are sorted first, so only the set of links matters, not their order
/// of appearance.
fn hash_links(page: &ParsedPage) -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut links = page.outgoing_links.clone();
    links.sort();

    let mut hasher = DefaultHasher::new();
    links.hash(&mut hasher);
    hasher.finish()
}

/// Start a background thread that watches for file changes and rebuilds.
/// Increments `build_version` after each successful rebuild so SSE clients know to reload.
pub fn start_watch_rebuild(
    config: SiteConfig,
    build_version: Arc<AtomicU64>,
    subgraphs_path: Option<PathBuf>,
) {
    std::thread::spawn(move || {
        if let Err(e) = watch_and_rebuild_loop(&config, &build_version, subgraphs_path.as_deref()) {
            eprintln!("  {} File watcher error: {}", "Error".red(), e);
        }
    });
}

fn watch_and_rebuild_loop(
    config: &SiteConfig,
    build_version: &Arc<AtomicU64>,
    subgraphs_path: Option<&Path>,
) -> Result<()> {
    use notify::Watcher;

    // Channel now carries file paths so we know what changed
    let (tx, rx) = mpsc::channel::<Vec<PathBuf>>();

    let mut watcher =
        notify::recommended_watcher(move |res: Result<notify::Event, notify::Error>| {
            if let Ok(event) = res {
                // Accept all event kinds β€” Create, Modify, Remove, plus Any/Other.
                // On macOS, FSEvents may emit EventKind::Other with Flag::Rescan
                // when the kernel drops events; ignoring these loses file creations.
                if !event.paths.is_empty() {
                    let _ = tx.send(event.paths);
                }
            }
        })
        .map_err(|e| anyhow::anyhow!("Failed to create file watcher: {}", e))?;

    // Watch graph directory (primary: "root", fallback: "graph", then "pages")
    let graph_dir = {
        let root = config.build.input_dir.join("root");
        let graph = config.build.input_dir.join("graph");
        if root.exists() { root } else if graph.exists() { graph } else { config.build.input_dir.join("pages") }
    };
    // Watch blog directory (primary: "blog", fallback: "journals")
    let blog_dir = {
        let primary = config.build.input_dir.join("blog");
        if primary.exists() { primary } else { config.build.input_dir.join("journals") }
    };

    if graph_dir.exists() {
        watcher.watch(&graph_dir, notify::RecursiveMode::Recursive)?;
    }
    if blog_dir.exists() {
        watcher.watch(&blog_dir, notify::RecursiveMode::Recursive)?;
    }

    // Watch subgraph repo directories for changes
    {
        let subgraph_decls = crate::scanner::subgraph::load_subgraph_decls(subgraphs_path)?;
        // Canonicalise the workspace root and the build output dir
        // so we can compare paths reliably. Without this, watching
        // the root subgraph (when its repo IS the workspace) creates
        // a feedback loop: every build writes to <root>/build, the
        // watcher fires events for those writes, and rebuild kicks
        // off again. Path-string filters miss directory-level events
        // (Foo macOS sends events for /…/cyber, not /…/cyber/build/).
        let workspace_root = config.build.input_dir.canonicalize()
            .unwrap_or_else(|_| config.build.input_dir.clone());
        let output_dir = config.build.output_dir.canonicalize()
            .unwrap_or_else(|_| config.build.output_dir.clone());
        for decl in &subgraph_decls {
            if !decl.repo_path.exists() { continue; }
            let canon = decl.repo_path.canonicalize()
                .unwrap_or_else(|_| decl.repo_path.clone());
            // Skip the root subgraph β€” it's already covered by the
            // graph_dir watch and its output_dir would feed back.
            if canon == workspace_root {
                eprintln!("  {} Skip subgraph '{}': workspace root (already watched via graph_dir)", "Watch".dimmed(), decl.name);
                continue;
            }
            // Skip any subgraph whose repo path contains or equals
            // the output dir (defensive, in case someone configures
            // an output dir inside a subgraph).
            if canon.starts_with(&output_dir) || output_dir.starts_with(&canon) {
                eprintln!("  {} Skip subgraph '{}': overlaps output dir", "Watch".dimmed(), decl.name);
                continue;
            }
            eprintln!("  {} Watching subgraph '{}': {}", "Watch".dimmed(), decl.name, decl.repo_path.display());
            watcher.watch(&decl.repo_path, notify::RecursiveMode::Recursive)?;
        }
    }

    let mut cache = BuildCache::new();
    if let Some(path) = subgraphs_path {
        cache.private_subgraph_names =
            crate::scanner::subgraph_config::load_private_names(path);
    }

    // Warm up: pre-populate subgraph cache so first incremental rebuild is fast.
    // Without this, the first file change triggers a full re-render of all 12K pages.
    {
        let discovered = crate::scanner::scan(&config.build.input_dir, &config.content)?;
        let mut root_parsed = crate::parser::parse_all(&discovered)?;
        let subgraph_decls = crate::scanner::subgraph::load_subgraph_decls(subgraphs_path)?;
        for decl in &subgraph_decls {
            cache.subgraph_repo_by_name
                .insert(decl.name.clone(), decl.repo_path.clone());
            let sg_cache = cache.subgraph_parse_caches.entry(decl.name.clone()).or_default();
            let ingestion =
                crate::scanner::subgraph::ingest_subgraph_cached(decl, &mut root_parsed, sg_cache)?;
            cache.subgraph_pages_by_name
                .insert(decl.name.clone(), ingestion.pages);
        }
        // Pre-populate parse cache and file cache for root graph files
        for file in discovered.pages.iter().chain(discovered.journals.iter()) {
            let mtime = std::fs::metadata(&file.path)
                .ok()
                .and_then(|m| m.modified().ok())
                .unwrap_or(SystemTime::UNIX_EPOCH);
            cache.file_cache.insert(file.path.clone(), file.clone());
            if let Ok(page) = crate::parser::parse_file(file) {
                cache.parse_cache.insert(file.path.clone(), (mtime, page));
            }
        }
        // Build the full graph to snapshot page IDs (including stubs) and hashes.
        // This matches what incremental_rebuild produces, preventing false structural changes.
        {
            let mut warmup_pages: Vec<ParsedPage> = cache.parse_cache.values()
                .map(|(_, p)| p.clone())
                .collect();
            // Add non-md files
            let non_md = crate::parser::parse_all(&crate::scanner::DiscoveredFiles {
                pages: vec![],
                journals: vec![],
                media: discovered.media.clone(),
                files: discovered.files.clone(),
            })?;
            warmup_pages.extend(non_md);
            // Add subgraph pages and enforce namespace monopoly
            let subgraph_namespaces: Vec<String> =
                subgraph_decls.iter().map(|d| d.name.clone()).collect();
            crate::scanner::subgraph::enforce_namespace_monopoly(
                &mut warmup_pages,
                &subgraph_namespaces,
            );
            for decl in &subgraph_decls {
                warmup_pages.retain(|p| {
                    !(p.id == decl.declaring_page_id && p.subgraph.is_none())
                });
            }
            for pages in cache.subgraph_pages_by_name.values() {
                warmup_pages.extend(pages.iter().cloned());
            }
            // Snapshot content page IDs and hashes BEFORE graph build.
            // This must match what incremental_rebuild hashes from all_parsed (pre-graph).
            // Warmup must produce the EXACT same set + content as incremental_rebuild,
            // otherwise the first reload sees fake structural drift and pays the full
            // 30-60s graph rebuild cost. Mirror the same pipeline order:
            //   synthesize_dir_indexes β†’ apply_ipfs_rewrites β†’ snapshot hashes.
            let subgraph_names: Vec<String> =
                subgraph_decls.iter().map(|d| d.name.clone()).collect();
            crate::parser::synthesize_dir_indexes(&mut warmup_pages, &subgraph_names);
            crate::parser::apply_ipfs_rewrites_for_config(&mut warmup_pages, config)?;
            for page in &warmup_pages {
                cache.last_content_page_ids.insert(page.id.clone());
                cache.content_hashes.insert(page.id.clone(), hash_str(&page.content_md));
                cache.tag_hashes.insert(page.id.clone(), hash_str(&page.meta.tags.join(",")));
                cache.meta_hashes.insert(page.id.clone(), hash_meta(page));
                cache.link_hashes.insert(page.id.clone(), hash_links(page));
            }
            // Build graph to get full store (with stubs, links, PageRank, etc.)
            let mut store = crate::graph::build_graph(warmup_pages)?;
            store.subgraph_private = cache.private_subgraph_names.clone();
            // Snapshot backlinks
            for (page_id, backlinks) in &store.backlinks {
                let mut sorted = backlinks.clone();
                sorted.sort();
                cache.backlink_snapshots.insert(page_id.clone(), sorted);
            }
            // Snapshot namespace tree children-sets
            cache.last_namespace_children = store
                .namespace_tree
                .iter()
                .map(|(k, v)| (k.clone(), v.iter().cloned().collect()))
                .collect();
            // Cache the store so fast path can reuse it
            cache.cached_store = Some(store);
        }

        // Pre-render all pages into render_cache. Without this, the first
        // live-reload edit pays a 50s cost rendering every page from scratch
        // (build_site already rendered them to disk but the in-memory cache
        // was empty). The cost is paid once at startup instead, so every edit
        // afterwards stays sub-second.
        if let Some(store) = cache.cached_store.as_ref() {
            let _ = crate::render::render_cached(
                store,
                config,
                &mut cache.render_cache,
                None,
            )?;
        }

        cache.initialized = true;
    }

    loop {
        if let Ok(paths) = rx.recv() {
            // Debounce: wait 100ms and collect all changed paths
            std::thread::sleep(Duration::from_millis(100));
            let mut changed: HashSet<PathBuf> = paths.into_iter().collect();
            while let Ok(more) = rx.try_recv() {
                changed.extend(more);
            }

            // Filter to content files, excluding .git/target/node_modules/build.
            // Accept any file (not just .md) β€” the scanner handles file type classification.
            changed.retain(|p| {
                let path_str = p.to_string_lossy();
                !path_str.contains("/.git/")
                    && !path_str.contains("/target/")
                    && !path_str.contains("/node_modules/")
                    && !path_str.contains("/build/")
            });

            if changed.is_empty() {
                continue;
            }

            let n = changed.len();
            let start = std::time::Instant::now();

            // Try fast path first (skips filesystem scan for content-only edits)
            if let Some(result) = try_fast_path(config, &mut cache, &changed) {
                match result {
                    Ok((rendered_count, dirty_count)) => {
                        let elapsed = start.elapsed();
                        build_version.fetch_add(1, Ordering::SeqCst);
                        eprintln!(
                            "  {} {} file{} β†’ fast rebuild {}/{} pages in {:.2}s",
                            "Done".green(),
                            n,
                            if n == 1 { "" } else { "s" },
                            dirty_count,
                            rendered_count,
                            elapsed.as_secs_f64()
                        );
                    }
                    Err(e) => {
                        eprintln!("  {} Fast rebuild failed: {}", "Error".red(), e);
                    }
                }
                continue;
            }

            // Full incremental rebuild (with scan)
            eprintln!(
                "  {} {} file{} changed, rebuilding...",
                "Watch".yellow(),
                n,
                if n == 1 { "" } else { "s" }
            );

            match incremental_rebuild(config, &mut cache, &changed, subgraphs_path) {
                Ok((rendered_count, dirty_count)) => {
                    let elapsed = start.elapsed();
                    build_version.fetch_add(1, Ordering::SeqCst);
                    eprintln!(
                        "  {} Rebuilt {}/{} pages in {:.2}s",
                        "Done".green(),
                        dirty_count,
                        rendered_count,
                        elapsed.as_secs_f64()
                    );
                }
                Err(e) => {
                    eprintln!("  {} Rebuild failed: {}", "Error".red(), e);
                }
            }
        }
    }
}

/// Fast path for content-only edits: skip the full filesystem scan entirely.
///
/// Applies only when every changed path is an existing, already-cached root
/// file (not new, not deleted, not inside a subgraph repo) and re-parsing
/// shows that neither frontmatter metadata nor outgoing links changed.
///
/// Returns:
/// - `None` when the fast path does not apply. The caches are left untouched
///   so the caller's full incremental rebuild still sees the old hashes.
/// - `Some(Ok((rendered, dirty)))` on success. Both numbers are the count of
///   dirty pages; `(0, 0)` means the re-parsed files produced no changes.
/// - `Some(Err(_))` when parsing, template setup, rendering or writing fails.
fn try_fast_path(
    config: &SiteConfig,
    cache: &mut BuildCache,
    changed_paths: &HashSet<PathBuf>,
) -> Option<Result<(usize, usize)>> {
    // Preconditions: cache must be initialized with a cached store
    if !cache.initialized || cache.cached_store.is_none() {
        return None;
    }

    // All changed paths must be known files already in our caches. A missing
    // file (deleted) or an unknown one (new, or never parsed) needs a full scan.
    let all_known = changed_paths.iter().all(|path| {
        path.exists()
            && cache.file_cache.contains_key(path)
            && cache.parse_cache.contains_key(path)
    });
    if !all_known {
        return None;
    }

    // No subgraph files
    let touches_subgraph = changed_paths.iter().any(|p| {
        cache
            .subgraph_repo_by_name
            .values()
            .any(|repo| p.starts_with(repo))
    });
    if touches_subgraph {
        return None;
    }

    // Re-parse changed files into a temporary buffer BEFORE touching caches.
    // If we detect structural changes (meta/links), we bail to incremental_rebuild —
    // and the caches must still hold OLD hashes so incremental_rebuild correctly
    // detects the changes. Updating caches before bail was the live-reload bug:
    // incremental_rebuild would see updated hashes and think nothing changed.
    struct PendingUpdate {
        path: PathBuf,
        mtime: SystemTime,
        page: ParsedPage,
        page_id: PageId,
        new_content: u64,
        new_meta: u64,
        new_links: u64,
        new_tags: u64,
    }

    let mut pending: Vec<PendingUpdate> = Vec::with_capacity(changed_paths.len());
    let mut dirty_ids: HashSet<PageId> = HashSet::new();
    let mut meta_changed = false;
    let mut links_changed = false;

    for path in changed_paths {
        // Presence was verified above; `?` falls back to the full rebuild
        // rather than panicking if that invariant is ever broken.
        let file = cache.file_cache.get(path)?;
        let page = match crate::parser::parse_file(file) {
            Ok(p) => p,
            Err(e) => return Some(Err(e)),
        };
        let mtime = std::fs::metadata(path)
            .ok()
            .and_then(|m| m.modified().ok())
            .unwrap_or(SystemTime::UNIX_EPOCH);

        let page_id = page.id.clone();
        let new_content = hash_str(&page.content_md);
        let new_meta = hash_meta(&page);
        let new_links = hash_links(&page);
        let new_tags = hash_str(&page.meta.tags.join(","));

        if cache.content_hashes.get(&page_id).copied() != Some(new_content) {
            dirty_ids.insert(page_id.clone());
        }
        if cache.meta_hashes.get(&page_id).copied() != Some(new_meta) {
            dirty_ids.insert(page_id.clone());
            meta_changed = true;
        }
        if cache.link_hashes.get(&page_id).copied() != Some(new_links) {
            dirty_ids.insert(page_id.clone());
            links_changed = true;
        }

        pending.push(PendingUpdate {
            path: path.clone(),
            mtime,
            page,
            page_id,
            new_content,
            new_meta,
            new_links,
            new_tags,
        });
    }

    // If meta/links changed, this is structural — bail to full rebuild.
    // Caches are UNTOUCHED so incremental_rebuild will correctly detect changes.
    if meta_changed || links_changed {
        return None;
    }

    // Commit: we are staying on the fast path. Patch the cached store straight
    // from the freshly parsed pages (no lookup through parse_cache, whose
    // iteration order is arbitrary), then move each page into the caches.
    {
        let store = cache.cached_store.as_mut()?;
        for update in pending {
            if dirty_ids.contains(&update.page_id) {
                if let Some(cached_page) = store.pages.get_mut(&update.page_id) {
                    cached_page.content_md = update.page.content_md.clone();
                    cached_page.meta = update.page.meta.clone();
                    cached_page.outgoing_links = update.page.outgoing_links.clone();
                }
            }
            cache.content_hashes.insert(update.page_id.clone(), update.new_content);
            cache.meta_hashes.insert(update.page_id.clone(), update.new_meta);
            cache.link_hashes.insert(update.page_id.clone(), update.new_links);
            cache.tag_hashes.insert(update.page_id, update.new_tags);
            cache.parse_cache.insert(update.path, (update.mtime, update.page));
        }
    }

    if dirty_ids.is_empty() {
        return Some(Ok((0, 0)));
    }

    let store = cache.cached_store.as_ref()?;

    // Render only the dirty pages (not all pages — avoids iterating 11K cached entries)
    let dirty_count = dirty_ids.len();
    let env = match crate::render::make_template_env(config) {
        Ok(e) => e,
        Err(e) => return Some(Err(e)),
    };
    let mut rendered_dirty = Vec::with_capacity(dirty_count);
    for dirty_id in &dirty_ids {
        let page = match store.pages.get(dirty_id) {
            Some(p) => p,
            None => continue,
        };
        // Non-public pages are not rendered.
        if !crate::graph::PageStore::is_page_public(page, &config.content) {
            continue;
        }
        let rp = match crate::render::render_single_page(page, dirty_id, store, config, &env) {
            Ok(r) => r,
            Err(e) => return Some(Err(e)),
        };
        cache.render_cache.insert(dirty_id.clone(), rp.clone());
        rendered_dirty.push(rp);
    }

    // Write only dirty pages
    if let Err(e) = crate::output::write_dirty_pages(&rendered_dirty, &dirty_ids, config) {
        return Some(Err(e));
    }

    Some(Ok((dirty_count, dirty_count)))
}

/// Incremental rebuild: selective parse → full graph → selective render → incremental output.
/// Returns (total_rendered, dirty_count).
fn incremental_rebuild(
    config: &SiteConfig,
    cache: &mut BuildCache,
    changed_paths: &HashSet<PathBuf>,
    subgraphs_path: Option<&Path>,
) -> Result<(usize, usize)> {
    // Step 1: Scan (always full β€” it's fast)
    let discovered = crate::scanner::scan(&config.build.input_dir, &config.content)?;

    // Step 2: Selective parse β€” only re-parse files whose mtime changed
    let mut all_parsed: Vec<ParsedPage> = Vec::new();
    let mut changed_page_ids: HashSet<PageId> = HashSet::new();

    for file in discovered.pages.iter().chain(discovered.journals.iter()) {
        let mtime = std::fs::metadata(&file.path)
            .ok()
            .and_then(|m| m.modified().ok())
            .unwrap_or(SystemTime::UNIX_EPOCH);

        // Keep file_cache in sync for fast path
        cache.file_cache.insert(file.path.clone(), file.clone());

        if let Some((cached_mtime, cached_page)) = cache.parse_cache.get(&file.path) {
            if *cached_mtime == mtime && !changed_paths.contains(&file.path) {
                // File unchanged β€” use cached parse
                all_parsed.push(cached_page.clone());
                continue;
            }
        }

        // Parse (new or changed file)
        let page = crate::parser::parse_file(file)?;
        changed_page_ids.insert(page.id.clone());
        cache.parse_cache.insert(file.path.clone(), (mtime, page.clone()));
        all_parsed.push(page);
    }

    // Non-markdown files: always re-parse (they're few and cheap)
    let non_md = crate::parser::parse_all(&crate::scanner::DiscoveredFiles {
        pages: vec![],
        journals: vec![],
        media: discovered.media.clone(),
        files: discovered.files.clone(),
    })?;
    all_parsed.extend(non_md);

    // Remove stale cache entries for deleted files
    let current_paths: HashSet<&PathBuf> = discovered
        .pages
        .iter()
        .chain(discovered.journals.iter())
        .map(|f| &f.path)
        .collect();
    cache.parse_cache.retain(|path, _| current_paths.contains(path));
    cache.file_cache.retain(|path, _| current_paths.contains(path));

    // Step 2b: Load subgraph decls (TOML), enforce monopoly, then re-ingest
    // ONLY the subgraphs whose files actually changed. Each subgraph repo can
    // contain thousands of files (cyb: 4417, pussy-ts: 1209); re-ingesting all
    // 27 on any spurious write was the source of 30-60s "live" reload times.
    let subgraph_decls = crate::scanner::subgraph::load_subgraph_decls(subgraphs_path)?;
    if !subgraph_decls.is_empty() {
        let subgraph_namespaces: Vec<String> =
            subgraph_decls.iter().map(|d| d.name.clone()).collect();
        let _evicted = crate::scanner::subgraph::enforce_namespace_monopoly(
            &mut all_parsed,
            &subgraph_namespaces,
        );

        // Decide per-subgraph: dirty (re-ingest) or clean (reuse cache).
        // A subgraph is dirty when any changed path lies inside its repo OR
        // when it has no cache entry yet (first run / new subgraph).
        let mut dirty_subgraphs: HashSet<String> = HashSet::new();
        for decl in &subgraph_decls {
            let path_dirty = changed_paths.iter().any(|p| p.starts_with(&decl.repo_path));
            let cache_cold = !cache.subgraph_pages_by_name.contains_key(&decl.name);
            if path_dirty || cache_cold {
                dirty_subgraphs.insert(decl.name.clone());
            }
        }

        // Track current decl set so we can prune removed subgraphs from the cache.
        let current_names: HashSet<String> =
            subgraph_decls.iter().map(|d| d.name.clone()).collect();
        cache.subgraph_pages_by_name.retain(|n, _| current_names.contains(n));
        cache.subgraph_repo_by_name.retain(|n, _| current_names.contains(n));
        cache.subgraph_parse_caches.retain(|n, _| current_names.contains(n));

        for decl in &subgraph_decls {
            // Always update the repo path β€” a decl may have been renamed
            cache.subgraph_repo_by_name.insert(decl.name.clone(), decl.repo_path.clone());

            if dirty_subgraphs.contains(&decl.name) {
                let sg_cache = cache.subgraph_parse_caches.entry(decl.name.clone()).or_default();
                let ingestion =
                    crate::scanner::subgraph::ingest_subgraph_cached(decl, &mut all_parsed, sg_cache)?;
                // Mark these pages so the dirty detection downstream picks them up.
                for page in &ingestion.pages {
                    changed_page_ids.insert(page.id.clone());
                }
                cache.subgraph_pages_by_name.insert(decl.name.clone(), ingestion.pages);
            } else {
                // Clean subgraph: mirror ingest_subgraph's declaring-page eviction
                // so the cached README occupies the slot, not the root stub.
                all_parsed.retain(|p| {
                    !(p.id == decl.declaring_page_id && p.subgraph.is_none())
                });
            }
        }

        for pages in cache.subgraph_pages_by_name.values() {
            all_parsed.extend(pages.iter().cloned());
        }
    }

    let subgraph_names: Vec<String> = subgraph_decls.iter().map(|d| d.name.clone()).collect();
    crate::parser::synthesize_dir_indexes(&mut all_parsed, &subgraph_names);

    // Re-apply IPFS map rewrites on every reload. Without this, any page
    // re-parsed during live-reload reverts to the raw `../media/<file>`
    // markdown path and renders a broken image (this only ran in the
    // initial build_site call, so any incremental rebuild silently
    // regressed images on re-rendered pages).
    crate::parser::apply_ipfs_rewrites_for_config(&mut all_parsed, config)?;

    // Step 3: Detect content, meta, and link changes BEFORE building graph (cheap hash comparison).
    let mut dirty_ids: HashSet<PageId> = HashSet::new();
    let mut content_page_ids: HashSet<PageId> = HashSet::new();
    let mut meta_changed = false;
    let mut links_changed = false;

    for page in &all_parsed {
        content_page_ids.insert(page.id.clone());

        // Only mark pages dirty if they were actually re-parsed from a changed file.
        // Subgraph pages and unchanged pages may have non-deterministic hash diffs
        // due to merge order differences β€” these must not pollute dirty_ids.
        let was_reparsed = changed_page_ids.contains(&page.id);

        // Content hash β€” detect body text changes
        let new_hash = hash_str(&page.content_md);
        if let Some(&old_hash) = cache.content_hashes.get(&page.id) {
            if old_hash != new_hash && was_reparsed {
                dirty_ids.insert(page.id.clone());
            }
        } else if was_reparsed {
            dirty_ids.insert(page.id.clone());
        }
        cache.content_hashes.insert(page.id.clone(), new_hash);

        // Meta hash β€” detect frontmatter changes (title, aliases, icon, stake, tags)
        let new_meta_hash = hash_meta(page);
        if let Some(&old_meta_hash) = cache.meta_hashes.get(&page.id) {
            if old_meta_hash != new_meta_hash && was_reparsed {
                dirty_ids.insert(page.id.clone());
                meta_changed = true;
            }
        } else if cache.initialized && was_reparsed {
            meta_changed = true;
        }
        cache.meta_hashes.insert(page.id.clone(), new_meta_hash);

        // Outgoing links hash β€” detect link set changes
        let new_link_hash = hash_links(page);
        if let Some(&old_link_hash) = cache.link_hashes.get(&page.id) {
            if old_link_hash != new_link_hash && was_reparsed {
                dirty_ids.insert(page.id.clone());
                links_changed = true;
            }
        } else if cache.initialized && was_reparsed {
            links_changed = true;
        }
        cache.link_hashes.insert(page.id.clone(), new_link_hash);
    }

    // Check for structural change: pages added or removed, or tags changed, or meta/links changed.
    let pages_added_or_removed = content_page_ids != cache.last_content_page_ids;
    let pages_removed = cache.last_content_page_ids.iter().any(|id| !content_page_ids.contains(id));
    let tags_changed = dirty_ids.iter().any(|dirty_id| {
        all_parsed.iter().find(|p| &p.id == dirty_id).map(|page| {
            let new_tag_hash = hash_str(&page.meta.tags.join(","));
            let changed = cache.tag_hashes.get(dirty_id)
                .map(|&old| old != new_tag_hash)
                .unwrap_or(true);
            cache.tag_hashes.insert(dirty_id.clone(), new_tag_hash);
            changed
        }).unwrap_or(false)
    });

    // Meta or link changes escalate to structural rebuild since they affect other pages
    let structural_change = !cache.initialized
        || pages_added_or_removed
        || tags_changed
        || meta_changed
        || links_changed;


    // Step 4: Build or reuse graph store.
    // Full graph build (PageRank, gravity, etc.) is expensive β€” only do it for structural changes.
    if structural_change || cache.cached_store.is_none() {
        let old_namespace_children = cache.last_namespace_children.clone();
        let mut store = if let Some(old) = cache.cached_store.as_ref() {
            crate::graph::build_graph_fast(
                all_parsed,
                old.pagerank.clone(),
                old.focus.clone(),
                old.gravity.clone(),
            )?
        } else {
            crate::graph::build_graph(all_parsed)?
        };
        store.subgraph_private = cache.private_subgraph_names.clone();
        cache.last_content_page_ids = content_page_ids.clone();

        // Fix 4: Compare backlink snapshots β€” mark pages with changed backlinks dirty
        for (page_id, backlinks) in &store.backlinks {
            let mut sorted = backlinks.clone();
            sorted.sort();
            let old_snapshot = cache.backlink_snapshots.get(page_id);
            if old_snapshot != Some(&sorted) {
                dirty_ids.insert(page_id.clone());
            }
            cache.backlink_snapshots.insert(page_id.clone(), sorted);
        }
        // Also check pages that used to have backlinks but no longer do
        let new_backlink_keys: HashSet<&PageId> = store.backlinks.keys().collect();
        let stale_backlink_pages: Vec<PageId> = cache.backlink_snapshots.keys()
            .filter(|k| !new_backlink_keys.contains(k))
            .cloned()
            .collect();
        for page_id in &stale_backlink_pages {
            if cache.backlink_snapshots.get(page_id).map(|v| !v.is_empty()).unwrap_or(false) {
                dirty_ids.insert(page_id.clone());
            }
            cache.backlink_snapshots.remove(page_id);
        }

        // Mark a namespace parent dirty whenever its children set differs
        // from last build β€” covers add, remove, move-in, move-out. Also
        // propagate to ancestors: render/context.rs builds a parent's
        // folder listing by iterating ALL namespace_tree keys with the
        // page name as prefix, so adding a deep sub-namespace (e.g.
        // `cyber valley/cve/team`) changes the folder listing on
        // `cyber valley/cve` AND `cyber valley`, even when neither page's
        // direct children set changed.
        let new_namespace_children: HashMap<String, HashSet<PageId>> = store
            .namespace_tree
            .iter()
            .map(|(k, v)| (k.clone(), v.iter().cloned().collect()))
            .collect();
        // Namespace keys are raw (e.g. "cyber valley/cve") but page ids are
        // slugified ("cyber-valley/cve"). Dirty marks must use slugified
        // form to match PageId entries in the render cache.
        let mark_with_ancestors = |dirty: &mut HashSet<PageId>, ns: &str| {
            dirty.insert(crate::parser::slugify_page_name(ns));
            let mut cur = ns;
            while let Some((parent, _)) = cur.rsplit_once('/') {
                dirty.insert(crate::parser::slugify_page_name(parent));
                cur = parent;
            }
        };
        for (ns_key, new_set) in &new_namespace_children {
            let old_set = old_namespace_children.get(ns_key);
            let key_changed = old_set.map(|s| s != new_set).unwrap_or(true);
            let key_is_new = old_set.is_none();
            if key_changed {
                dirty_ids.insert(crate::parser::slugify_page_name(ns_key));
            }
            if key_is_new {
                mark_with_ancestors(&mut dirty_ids, ns_key);
            }
        }
        for old_key in old_namespace_children.keys() {
            if !new_namespace_children.contains_key(old_key) {
                mark_with_ancestors(&mut dirty_ids, old_key);
            }
        }
        cache.last_namespace_children = new_namespace_children;

        for (page_id, page) in &store.pages {
            if !cache.content_hashes.contains_key(page_id) {
                cache.content_hashes.insert(page_id.clone(), hash_str(&page.content_md));
            }
            if !cache.meta_hashes.contains_key(page_id) {
                cache.meta_hashes.insert(page_id.clone(), hash_meta(page));
            }
            if !cache.link_hashes.contains_key(page_id) {
                cache.link_hashes.insert(page_id.clone(), hash_links(page));
            }
        }
        cache.cached_store = Some(store);
    } else {
        // Content-only change: update page content in the cached store in-place.
        // Skip expensive PageRank/gravity/trikernel recomputation.
        let store = cache.cached_store.as_mut().unwrap();
        for dirty_id in &dirty_ids {
            if let Some(new_page) = all_parsed.iter().find(|p| &p.id == dirty_id) {
                if let Some(cached_page) = store.pages.get_mut(dirty_id) {
                    cached_page.content_md = new_page.content_md.clone();
                    cached_page.meta = new_page.meta.clone();
                    cached_page.outgoing_links = new_page.outgoing_links.clone();
                }
            }
        }
    }

    // Fix 5: Prune stale cache entries β€” only keep current page IDs
    {
        let store = cache.cached_store.as_ref().unwrap();
        let current_ids: HashSet<&PageId> = store.pages.keys().collect();
        cache.content_hashes.retain(|k, _| current_ids.contains(k));
        cache.tag_hashes.retain(|k, _| current_ids.contains(k));
        cache.meta_hashes.retain(|k, _| current_ids.contains(k));
        cache.link_hashes.retain(|k, _| current_ids.contains(k));
        cache.backlink_snapshots.retain(|k, _| current_ids.contains(k));
        cache.render_cache.retain(|k, _| current_ids.contains(k)
            || k.starts_with("__"));  // Keep synthetic page caches
    }

    if structural_change {
        dirty_ids.insert("__structural__".to_string());
    }

    let dirty_count = dirty_ids.len();
    let store = cache.cached_store.as_ref().unwrap();

    // Step 5: Selective render
    let dirty_ref = if cache.initialized {
        Some(&dirty_ids)
    } else {
        None
    };
    let rendered = crate::render::render_cached(
        store,
        config,
        &mut cache.render_cache,
        dirty_ref,
    )?;
    let total = rendered.len();

    // Step 6: Output β€” write only what changed
    if !cache.initialized {
        // First build: full output
        crate::output::write_output(&rendered, &store, config, &discovered)?;
        cache.initialized = true;
    } else if structural_change {
        if pages_removed {
            // Pages were removed: full incremental write to clean up stale directories
            crate::output::write_incremental(&rendered, &store, config, &discovered)?;
        } else {
            // Pages added or tags/meta changed (no removals): write dirty + synthetic.
            // Previously this wrote ALL 12K pages via write_incremental,
            // which took 30-60s. Now we only write the actually-changed pages.
            // Include all synthetic/tag pages that render_cached re-rendered.
            for rp in &rendered {
                if rp.page_id.starts_with("__") {
                    dirty_ids.insert(rp.page_id.clone());
                }
            }
            crate::output::write_dirty_pages(&rendered, &dirty_ids, config)?;
        }
    } else {
        // Content-only change: write just the dirty pages
        crate::output::write_dirty_pages(&rendered, &dirty_ids, config)?;
    }

    Ok((total, dirty_count))
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::{PageMeta, PageKind, ParsedPage};
    use std::collections::HashMap;
    use std::path::PathBuf;

    /// Builds a minimal public `ParsedPage` with the given id, title and body;
    /// every other field is left empty / `None`.
    fn make_page(id: &str, title: &str, content: &str) -> ParsedPage {
        let meta = PageMeta {
            title: String::from(title),
            properties: HashMap::new(),
            tags: Vec::new(),
            public: Some(true),
            aliases: Vec::new(),
            date: None,
            icon: None,
            menu_order: None,
            stake: None,
        };
        ParsedPage {
            id: String::from(id),
            meta,
            kind: PageKind::Page,
            source_path: PathBuf::new(),
            namespace: None,
            subgraph: None,
            content_md: String::from(content),
            outgoing_links: Vec::new(),
        }
    }

    /// Standard fixture page ("test" / "Title" / "content") customised by `tweak`.
    fn page_with(tweak: impl FnOnce(&mut ParsedPage)) -> ParsedPage {
        let mut page = make_page("test", "Title", "content");
        tweak(&mut page);
        page
    }

    #[test]
    fn test_hash_str_deterministic() {
        let input = "hello world";
        assert_eq!(hash_str(input), hash_str(input));
        // Hash the same input three more times and compare neighbours.
        let repeated = [hash_str(input), hash_str(input), hash_str(input)];
        assert_eq!(repeated[0], repeated[1]);
        assert_eq!(repeated[1], repeated[2]);
    }

    #[test]
    fn test_hash_str_different() {
        let hello = hash_str("hello");
        let world = hash_str("world");
        assert_ne!(hello, world, "different inputs must produce different hashes");
        // A case-only difference must also be visible in the hash.
        let capitalised = hash_str("Hello");
        assert_ne!(hello, capitalised, "case difference must produce different hash");
    }

    #[test]
    fn test_hash_meta_detects_title_change() {
        let first = make_page("test", "Title A", "content");
        let second = make_page("test", "Title B", "content");
        assert_ne!(
            hash_meta(&first),
            hash_meta(&second),
            "different titles must produce different meta hashes"
        );
    }

    #[test]
    fn test_hash_meta_detects_icon_change() {
        let with_first_icon = page_with(|p| p.meta.icon = Some("πŸ”΅".to_string()));
        let with_second_icon = page_with(|p| p.meta.icon = Some("🟒".to_string()));
        assert_ne!(
            hash_meta(&with_first_icon),
            hash_meta(&with_second_icon),
            "different icons must produce different meta hashes"
        );
        // A page without any icon must hash differently from one with an icon.
        let without_icon = page_with(|_| {});
        assert_ne!(
            hash_meta(&with_first_icon),
            hash_meta(&without_icon),
            "icon Some vs None must produce different meta hashes"
        );
    }

    #[test]
    fn test_hash_meta_detects_alias_change() {
        let with_alias = |alias: &str| {
            let owned = alias.to_string();
            page_with(move |p| p.meta.aliases = vec![owned])
        };
        let alias_one = with_alias("alias1");
        let alias_two = with_alias("alias2");
        assert_ne!(
            hash_meta(&alias_one),
            hash_meta(&alias_two),
            "different aliases must produce different meta hashes"
        );
        // Equal alias lists must hash equally.
        let alias_one_again = with_alias("alias1");
        assert_eq!(
            hash_meta(&alias_one),
            hash_meta(&alias_one_again),
            "identical aliases must produce same meta hash"
        );
    }

    #[test]
    fn test_hash_links_detects_link_change() {
        let links_to_a = page_with(|p| p.outgoing_links = vec!["link-a".to_string()]);
        let links_to_b = page_with(|p| p.outgoing_links = vec!["link-b".to_string()]);
        assert_ne!(
            hash_links(&links_to_a),
            hash_links(&links_to_b),
            "different link sets must produce different link hashes"
        );
    }

    #[test]
    fn test_hash_links_order_independent() {
        let with_links = |names: [&str; 3]| {
            let owned: Vec<String> = names.iter().map(|n| n.to_string()).collect();
            page_with(move |p| p.outgoing_links = owned)
        };
        let in_order = with_links(["alpha", "beta", "gamma"]);
        let rotated = with_links(["gamma", "alpha", "beta"]);
        assert_eq!(
            hash_links(&in_order),
            hash_links(&rotated),
            "hash_links must be order-independent (it sorts internally)"
        );
    }
}

Graph