All files / main session-watcher.ts

92.59% Statements 75/81
91.66% Branches 22/24
100% Functions 6/6
92.59% Lines 75/81

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 1241x                                       1x 1x 1x   1x 3x 3x   1x 3x       3x 3x 3x 3x     3x 3x   3x 3x 3x     3x 3x   1x 3x 3x 3x 3x 3x   1x 4x 4x 4x 4x             4x         4x 4x   4x   4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 9x 5x 9x 4x 4x 4x 5x 4x 4x 4x 4x       4x 4x   1x   5x 5x 5x 5x 1x 1x 1x 1x 1x 1x 4x 5x 1x  
import chokidar from "chokidar";
import type { FSWatcher } from "chokidar";
import fs from "node:fs";
import { EventEmitter } from "node:events";
import type { AgentEvent } from "../shared/types";
import { normalizeJsonlEvents } from "./event-normalizer";
import { logger } from "./logger";
 
/**
 * Watches a root directory for Claude Code JSONL session logs and emits
 * normalized `AgentEvent` objects. Tracks a per-file byte offset so each
 * append is read exactly once and truncations reset to the top of the file.
 *
 * Usage:
 *   const watcher = new SessionWatcher("~/.claude/projects");
 *   watcher.on("event", (e: AgentEvent) => ...);
 *   await watcher.start();
 *   // later
 *   await watcher.stop();
 */
export class SessionWatcher extends EventEmitter {
  private watcher: FSWatcher | null = null;
  private offsets = new Map<string, number>();
 
  constructor(private readonly rootDir: string) {
    super();
  }
 
  async start(): Promise<void> {
    logger.info("SessionWatcher starting", { rootDir: this.rootDir });
    // chokidar v4 dropped built-in glob support, so we watch the root dir and
    // filter to *.jsonl via the `ignored` predicate. Non-file paths (dirs) must
    // pass through so subdirectories get traversed.
    this.watcher = chokidar.watch(this.rootDir, {
      persistent: true,
      ignoreInitial: false,
      ignored: (p, stats) => !!stats?.isFile() && !p.endsWith(".jsonl"),
      // Wait for writes to settle before firing so we do not read a partial
      // JSONL line mid-append. 50ms is tuned to match typical Claude Code flush cadence.
      awaitWriteFinish: { stabilityThreshold: 50, pollInterval: 20 }
    });
 
    this.watcher.on("add", (file) => this.readFromOffset(file));
    this.watcher.on("change", (file) => this.readFromOffset(file));
    this.watcher.on("unlink", (file) => {
      logger.debug("SessionWatcher file removed", { file });
      this.offsets.delete(file);
    });
  }
 
  async stop(): Promise<void> {
    logger.info("SessionWatcher stopping");
    await this.watcher?.close();
    this.watcher = null;
    this.offsets.clear();
  }
 
  private readFromOffset(file: string): void {
    let size: number;
    try {
      size = fs.statSync(file).size;
    } catch {
      // File was unlinked between the chokidar event and our stat call. The
      // unlink handler will clean up this.offsets; nothing to read.
      logger.warn("SessionWatcher stat failed (file removed mid-read)", { file });
      return;
    }
 
    const prevOffset = this.offsets.get(file) ?? 0;
    // Truncation or rewrite detection: chokidar only fires `change` when the
    // file actually changed, so if the file is not strictly larger than our
    // cursor we must be looking at a rewrite (same size with different bytes,
    // or a shrink). Reset to the top and re-read.
    let offset = prevOffset;
    if (size <= prevOffset && prevOffset > 0) offset = 0;
 
    if (offset >= size) return;
 
    const stream = fs.createReadStream(file, {
      start: offset,
      end: size - 1,
      encoding: "utf8"
    });
    let buffer = "";
    stream.on("data", (chunk) => {
      buffer += chunk;
    });
    stream.on("end", () => {
      const lines = buffer.split("\n");
      let emitted = 0;
      for (const line of lines) {
        if (!line.trim()) continue;
        const events = this.parseLine(line, file);
        for (const event of events) {
          this.emit("event", event);
          emitted += 1;
        }
      }
      if (emitted > 0) logger.debug("SessionWatcher emitted events", { file, count: emitted });
      this.offsets.set(file, size);
    });
    stream.on("error", (err) => {
      // Swallow read errors: the file may have been rotated out from under us.
      // The next `change` event will reset the offset via the truncation guard above.
      logger.warn("SessionWatcher read stream error", { file, message: err.message });
    });
  }
 
  private parseLine(line: string, file: string): AgentEvent[] {
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    let raw: any;
    try {
      raw = JSON.parse(line);
    } catch {
      logger.warn("SessionWatcher malformed JSONL line", {
        file,
        excerpt: line.slice(0, 120)
      });
      return [];
    }
    return normalizeJsonlEvents(raw, line);
  }
}