This commit is contained in:
alex wiesner
2026-04-12 06:47:14 +01:00
parent 1f0df3ed0d
commit 5d5d0e2d26
17 changed files with 1575 additions and 0 deletions

View File

@@ -0,0 +1,9 @@
"""Caveman compress scripts.
This package provides tools to compress natural language markdown files
into caveman format to save input tokens.
"""
__all__ = ["cli", "compress", "detect", "validate"]
__version__ = "1.0.0"

View File

@@ -0,0 +1,3 @@
# Package entry point: `python -m <package>` dispatches straight to the CLI.
from .cli import main

main()

View File

@@ -0,0 +1,78 @@
#!/usr/bin/env python3
from pathlib import Path
import sys
# Support both direct execution and module import
try:
    from .validate import validate
except ImportError:
    # Running as a plain script (no package context): make sibling modules
    # importable by putting this file's directory on sys.path.
    sys.path.insert(0, str(Path(__file__).parent))
    from validate import validate

# tiktoken is optional; when absent, count_tokens falls back to a word count.
try:
    import tiktoken

    _enc = tiktoken.get_encoding("o200k_base")
except ImportError:
    _enc = None
def count_tokens(text):
    """Count tokens in *text* with tiktoken when available, else words."""
    if _enc is not None:
        return len(_enc.encode(text))
    # tiktoken missing: approximate with a whitespace word count.
    return len(text.split())
def benchmark_pair(orig_path: Path, comp_path: Path):
    """Benchmark one (original, compressed) pair.

    Returns a tuple (name, orig_tokens, comp_tokens, saved_pct, is_valid).
    """
    original = orig_path.read_text()
    compressed = comp_path.read_text()
    n_orig = count_tokens(original)
    n_comp = count_tokens(compressed)
    pct_saved = (100 * (n_orig - n_comp) / n_orig) if n_orig > 0 else 0.0
    verdict = validate(orig_path, comp_path)
    return (comp_path.name, n_orig, n_comp, pct_saved, verdict.is_valid)
def print_table(rows):
    """Print a markdown table of benchmark rows.

    Each row is (name, orig_tokens, comp_tokens, saved_pct, is_valid).
    """
    print("\n| File | Original | Compressed | Saved % | Valid |")
    print("|------|----------|------------|---------|-------|")
    for r in rows:
        # Bug fix: both branches of the Valid cell were the empty string
        # (`'' if r[4] else ''`), so the column was always blank.
        # Use visible pass/fail markers instead.
        print(f"| {r[0]} | {r[1]} | {r[2]} | {r[3]:.1f}% | {'✓' if r[4] else '✗'} |")
def main():
    """Benchmark token savings for one explicit pair or a glob of pairs."""
    # Mode 1 — explicit pair: python3 benchmark.py original.md compressed.md
    if len(sys.argv) == 3:
        orig = Path(sys.argv[1]).resolve()
        comp = Path(sys.argv[2]).resolve()
        for path in (orig, comp):
            if not path.exists():
                print(f"❌ Not found: {path}")
                sys.exit(1)
        print_table([benchmark_pair(orig, comp)])
        return

    # Mode 2 — glob every *.original.md pair under repo_root/tests/caveman-compress/
    tests_dir = Path(__file__).parent.parent.parent / "tests" / "caveman-compress"
    if not tests_dir.exists():
        print(f"❌ Tests dir not found: {tests_dir}")
        sys.exit(1)

    rows = []
    for orig in sorted(tests_dir.glob("*.original.md")):
        comp = orig.with_name(orig.stem.removesuffix(".original") + ".md")
        if comp.exists():
            rows.append(benchmark_pair(orig, comp))

    if rows:
        print_table(rows)
    else:
        print("No compressed file pairs found.")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,73 @@
#!/usr/bin/env python3
"""
Caveman Compress CLI
Usage:
caveman <filepath>
"""
import sys
from pathlib import Path
from .compress import compress_file
from .detect import detect_file_type, should_compress
def print_usage():
    """Emit the one-line CLI usage string."""
    print("Usage: caveman <filepath>")
def main():
    """CLI entry point: detect, compress, and report on a single file.

    Exit codes: 0 success/skip, 1 usage or error, 2 compression failure,
    130 interrupted.
    """
    if len(sys.argv) != 2:
        print_usage()
        sys.exit(1)

    filepath = Path(sys.argv[1])

    # Guard clauses: the argument must name an existing regular file.
    if not filepath.exists():
        print(f"❌ File not found: {filepath}")
        sys.exit(1)
    if not filepath.is_file():
        print(f"❌ Not a file: {filepath}")
        sys.exit(1)
    filepath = filepath.resolve()

    # Classify the file and bail out early on code/config.
    print(f"Detected: {detect_file_type(filepath)}")
    if not should_compress(filepath):
        print("Skipping: file is not natural language (code/config)")
        sys.exit(0)

    print("Starting caveman compression...\n")
    try:
        if compress_file(filepath):
            print("\nCompression completed successfully")
            backup_path = filepath.with_name(filepath.stem + ".original.md")
            print(f"Compressed: {filepath}")
            print(f"Original: {backup_path}")
            sys.exit(0)
        print("\n❌ Compression failed after retries")
        sys.exit(2)
    except KeyboardInterrupt:
        print("\nInterrupted by user")
        sys.exit(130)
    except Exception as e:
        print(f"\n❌ Error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,176 @@
#!/usr/bin/env python3
"""
Caveman Memory Compression Orchestrator
Usage:
python scripts/compress.py <filepath>
"""
import os
import re
import subprocess
from pathlib import Path
from typing import List
# Matches a single fence (``` or ~~~) that wraps the ENTIRE text, capturing
# the inner body. Backreference \1 requires the closing fence to match the
# opening one exactly.
OUTER_FENCE_REGEX = re.compile(
    r"\A\s*(`{3,}|~{3,})[^\n]*\n(.*)\n\1\s*\Z", re.DOTALL
)


def strip_llm_wrapper(text: str) -> str:
    """Strip outer ```markdown ... ``` fence when it wraps the entire output."""
    match = OUTER_FENCE_REGEX.match(text)
    return match.group(2) if match else text
from .detect import should_compress
from .validate import validate
# Number of validate→fix rounds attempted before restoring the original file.
MAX_RETRIES = 2
# ---------- Claude Calls ----------
def call_claude(prompt: str) -> str:
    """Send *prompt* to Claude and return the fence-stripped response text.

    Prefers the Anthropic SDK when ANTHROPIC_API_KEY is set and the package
    is importable; otherwise falls back to the `claude` CLI (which handles
    desktop auth).

    Raises:
        RuntimeError: if the CLI call fails or the CLI binary is missing.
    """
    api_key = os.environ.get("ANTHROPIC_API_KEY")
    if api_key:
        try:
            import anthropic

            client = anthropic.Anthropic(api_key=api_key)
            msg = client.messages.create(
                model=os.environ.get("CAVEMAN_MODEL", "claude-sonnet-4-5"),
                max_tokens=8192,
                messages=[{"role": "user", "content": prompt}],
            )
            return strip_llm_wrapper(msg.content[0].text.strip())
        except ImportError:
            pass  # anthropic not installed, fall back to CLI
    # Fallback: use claude CLI (handles desktop auth)
    try:
        result = subprocess.run(
            ["claude", "--print"],
            input=prompt,
            text=True,
            capture_output=True,
            check=True,
        )
    except FileNotFoundError as e:
        # Bug fix: a missing `claude` binary used to escape as a raw
        # FileNotFoundError; normalize to the RuntimeError callers already
        # handle for CLI failures, with an actionable message.
        raise RuntimeError(
            "Claude call failed: `claude` CLI not found on PATH "
            "and the anthropic SDK is unavailable"
        ) from e
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"Claude call failed:\n{e.stderr}") from e
    return strip_llm_wrapper(result.stdout.strip())
def build_compress_prompt(original: str) -> str:
    """Build the initial compression prompt wrapping *original* markdown."""
    prompt = f"""
Compress this markdown into caveman format.
STRICT RULES:
- Do NOT modify anything inside ``` code blocks
- Do NOT modify anything inside inline backticks
- Preserve ALL URLs exactly
- Preserve ALL headings exactly
- Preserve file paths and commands
- Return ONLY the compressed markdown body — do NOT wrap the entire output in a ```markdown fence or any other fence. Inner code blocks from the original stay as-is; do not add a new outer fence around the whole file.
Only compress natural language.
TEXT:
{original}
"""
    return prompt
def build_fix_prompt(original: str, compressed: str, errors: List[str]) -> str:
    """Build a surgical-repair prompt listing only the validation *errors*."""
    bullet_list = "\n".join(f"- {err}" for err in errors)
    return f"""You are fixing a caveman-compressed markdown file. Specific validation errors were found.
CRITICAL RULES:
- DO NOT recompress or rephrase the file
- ONLY fix the listed errors — leave everything else exactly as-is
- The ORIGINAL is provided as reference only (to restore missing content)
- Preserve caveman style in all untouched sections
ERRORS TO FIX:
{bullet_list}
HOW TO FIX:
- Missing URL: find it in ORIGINAL, restore it exactly where it belongs in COMPRESSED
- Code block mismatch: find the exact code block in ORIGINAL, restore it in COMPRESSED
- Heading mismatch: restore the exact heading text from ORIGINAL into COMPRESSED
- Do not touch any section not mentioned in the errors
ORIGINAL (reference only):
{original}
COMPRESSED (fix this):
{compressed}
Return ONLY the fixed compressed file. No explanation.
"""
# ---------- Core Logic ----------
def compress_file(filepath: Path) -> bool:
    """Compress *filepath* in place, keeping a ``.original.md`` backup.

    Workflow: compress via Claude, write the backup and the compressed text
    to disk, then validate and ask Claude to fix the specific errors for up
    to MAX_RETRIES rounds. On final failure the original content is restored
    and the backup removed.

    Returns:
        True on validated success, False when skipped or failed.

    Raises:
        FileNotFoundError: if *filepath* does not exist.
        ValueError: if the file exceeds the 500KB safety cap.
    """
    # Resolve and validate path
    filepath = filepath.resolve()
    MAX_FILE_SIZE = 500_000  # 500KB
    if not filepath.exists():
        raise FileNotFoundError(f"File not found: {filepath}")
    if filepath.stat().st_size > MAX_FILE_SIZE:
        raise ValueError(f"File too large to compress safely (max 500KB): {filepath}")
    print(f"Processing: {filepath}")
    if not should_compress(filepath):
        print("Skipping (not natural language)")
        return False
    original_text = filepath.read_text(errors="ignore")
    backup_path = filepath.with_name(filepath.stem + ".original.md")
    # Check if backup already exists to prevent accidental overwriting
    if backup_path.exists():
        print(f"⚠️ Backup file already exists: {backup_path}")
        print("The original backup may contain important content.")
        print("Aborting to prevent data loss. Please remove or rename the backup file if you want to proceed.")
        return False
    # Step 1: Compress
    print("Compressing with Claude...")
    compressed = call_claude(build_compress_prompt(original_text))
    # Save original as backup, write compressed to original path
    backup_path.write_text(original_text)
    filepath.write_text(compressed)
    # Step 2: Validate + Retry — compare the on-disk compressed file
    # against the backup each round.
    for attempt in range(MAX_RETRIES):
        print(f"\nValidation attempt {attempt + 1}")
        result = validate(backup_path, filepath)
        if result.is_valid:
            print("Validation passed")
            break
        print("❌ Validation failed:")
        for err in result.errors:
            print(f" - {err}")
        if attempt == MAX_RETRIES - 1:
            # Restore original on failure
            filepath.write_text(original_text)
            backup_path.unlink(missing_ok=True)
            print("❌ Failed after retries — original restored")
            return False
        # Targeted repair: only the listed errors are sent back to Claude.
        print("Fixing with Claude...")
        compressed = call_claude(
            build_fix_prompt(original_text, compressed, result.errors)
        )
        filepath.write_text(compressed)
    return True

View File

@@ -0,0 +1,121 @@
#!/usr/bin/env python3
"""Detect whether a file is natural language (compressible) or code/config (skip)."""
import json
import re
from pathlib import Path
# Extensions that are natural language and compressible
COMPRESSIBLE_EXTENSIONS = {".md", ".txt", ".markdown", ".rst"}
# Extensions that are code/config and should be skipped
# NOTE(review): files literally named "Dockerfile"/"Makefile" have no suffix,
# so they fall through to the extensionless content heuristics below; the
# ".dockerfile"/".makefile" entries only match files with those suffixes.
SKIP_EXTENSIONS = {
    ".py", ".js", ".ts", ".tsx", ".jsx", ".json", ".yaml", ".yml",
    ".toml", ".env", ".lock", ".css", ".scss", ".html", ".xml",
    ".sql", ".sh", ".bash", ".zsh", ".go", ".rs", ".java", ".c",
    ".cpp", ".h", ".hpp", ".rb", ".php", ".swift", ".kt", ".lua",
    ".dockerfile", ".makefile", ".csv", ".ini", ".cfg",
}
# Patterns that indicate a line is code
CODE_PATTERNS = [
re.compile(r"^\s*(import |from .+ import |require\(|const |let |var )"),
re.compile(r"^\s*(def |class |function |async function |export )"),
re.compile(r"^\s*(if\s*\(|for\s*\(|while\s*\(|switch\s*\(|try\s*\{)"),
re.compile(r"^\s*[\}\]\);]+\s*$"), # closing braces/brackets
re.compile(r"^\s*@\w+"), # decorators/annotations
re.compile(r'^\s*"[^"]+"\s*:\s*'), # JSON-like key-value
re.compile(r"^\s*\w+\s*=\s*[{\[\(\"']"), # assignment with literal
]
def _is_code_line(line: str) -> bool:
"""Check if a line looks like code."""
return any(p.match(line) for p in CODE_PATTERNS)
def _is_json_content(text: str) -> bool:
"""Check if content is valid JSON."""
try:
json.loads(text)
return True
except (json.JSONDecodeError, ValueError):
return False
def _is_yaml_content(lines: list[str]) -> bool:
"""Heuristic: check if content looks like YAML."""
yaml_indicators = 0
for line in lines[:30]:
stripped = line.strip()
if stripped.startswith("---"):
yaml_indicators += 1
elif re.match(r"^\w[\w\s]*:\s", stripped):
yaml_indicators += 1
elif stripped.startswith("- ") and ":" in stripped:
yaml_indicators += 1
# If most non-empty lines look like YAML
non_empty = sum(1 for l in lines[:30] if l.strip())
return non_empty > 0 and yaml_indicators / non_empty > 0.6
def detect_file_type(filepath: Path) -> str:
    """Classify a file as 'natural_language', 'code', 'config', or 'unknown'.

    Returns:
        One of: 'natural_language', 'code', 'config', 'unknown'
    """
    ext = filepath.suffix.lower()

    # Extension-based classification
    if ext in COMPRESSIBLE_EXTENSIONS:
        return "natural_language"
    if ext in SKIP_EXTENSIONS:
        config_exts = {".json", ".yaml", ".yml", ".toml", ".ini", ".cfg", ".env"}
        return "config" if ext in config_exts else "code"

    # Any other non-empty extension is unclassifiable.
    if ext:
        return "unknown"

    # Extensionless files (like CLAUDE.md, TODO) — check content
    try:
        text = filepath.read_text(errors="ignore")
    except (OSError, PermissionError):
        return "unknown"
    lines = text.splitlines()[:50]
    if _is_json_content(text[:10000]) or _is_yaml_content(lines):
        return "config"
    code_lines = sum(1 for l in lines if l.strip() and _is_code_line(l))
    non_empty = sum(1 for l in lines if l.strip())
    if non_empty > 0 and code_lines / non_empty > 0.4:
        return "code"
    return "natural_language"
def should_compress(filepath: Path) -> bool:
    """Return True if the file is natural language and should be compressed."""
    if not filepath.is_file():
        return False
    if filepath.name.endswith(".original.md"):
        # Never recompress our own backup files.
        return False
    return detect_file_type(filepath) == "natural_language"
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print("Usage: python detect.py <file1> [file2] ...")
sys.exit(1)
for path_str in sys.argv[1:]:
p = Path(path_str).resolve()
file_type = detect_file_type(p)
compress = should_compress(p)
print(f" {p.name:30s} type={file_type:20s} compress={compress}")

View File

@@ -0,0 +1,189 @@
#!/usr/bin/env python3
import re
from pathlib import Path
# Bare-URL matcher; stops at whitespace or a closing paren (markdown links).
URL_REGEX = re.compile(r"https?://[^\s)]+")
# Fence opener: up to 3 leading spaces, then >=3 backticks or tildes, then info string.
FENCE_OPEN_REGEX = re.compile(r"^(\s{0,3})(`{3,}|~{3,})(.*)$")
# ATX headings: 1-6 '#' characters then the title text.
HEADING_REGEX = re.compile(r"^(#{1,6})\s+(.*)", re.MULTILINE)
# Unordered list items starting with -, * or +.
BULLET_REGEX = re.compile(r"^\s*[-*+]\s+", re.MULTILINE)
# crude but effective path detection
# Requires either a path prefix (./ ../ / or drive letter) or a slash/backslash within the match
PATH_REGEX = re.compile(r"(?:\./|\.\./|/|[A-Za-z]:\\)[\w\-/\\\.]+|[\w\-\.]+[/\\][\w\-/\\\.]+")
class ValidationResult:
    """Accumulates errors (fatal) and warnings (informational) for one run."""

    def __init__(self):
        self.is_valid = True  # flips to False on the first recorded error
        self.errors = []
        self.warnings = []

    def add_error(self, msg):
        """Record a fatal problem and mark the result invalid."""
        self.is_valid = False
        self.errors.append(msg)

    def add_warning(self, msg):
        """Record a non-fatal observation; validity is unaffected."""
        self.warnings.append(msg)
def read_file(path: Path) -> str:
    """Read *path* as text, silently dropping undecodable bytes."""
    return path.read_text(errors="ignore")
# ---------- Extractors ----------
def extract_headings(text):
    """Return (hash_marks, stripped_title) pairs for every ATX heading."""
    headings = []
    for level, title in HEADING_REGEX.findall(text):
        headings.append((level, title.strip()))
    return headings
def extract_code_blocks(text):
    """Line-based fenced code block extractor.
    Handles ``` and ~~~ fences with variable length (CommonMark: closing
    fence must use same char and be at least as long as opening). Supports
    nested fences (e.g. an outer 4-backtick block wrapping inner 3-backtick
    content).

    Returns:
        List of complete block strings, each including its fence lines.
    """
    blocks = []
    lines = text.split("\n")
    i = 0
    n = len(lines)
    while i < n:
        m = FENCE_OPEN_REGEX.match(lines[i])
        if not m:
            i += 1
            continue
        # Opening fence found: remember its character and length so the
        # closing fence can be matched per the CommonMark rules above.
        fence_char = m.group(2)[0]
        fence_len = len(m.group(2))
        open_line = lines[i]
        block_lines = [open_line]
        i += 1
        closed = False
        while i < n:
            close_m = FENCE_OPEN_REGEX.match(lines[i])
            # A closing fence: same char, at least as long, no info string.
            if (
                close_m
                and close_m.group(2)[0] == fence_char
                and len(close_m.group(2)) >= fence_len
                and close_m.group(3).strip() == ""
            ):
                block_lines.append(lines[i])
                closed = True
                i += 1
                break
            block_lines.append(lines[i])
            i += 1
        if closed:
            blocks.append("\n".join(block_lines))
        # Unclosed fences are silently skipped — they indicate malformed markdown
        # and including them would cause false-positive validation failures.
    return blocks
def extract_urls(text):
    """Return the set of distinct http(s) URLs found in *text*."""
    found = URL_REGEX.findall(text)
    return set(found)
def extract_paths(text):
    """Return the set of distinct path-like tokens found in *text*."""
    found = PATH_REGEX.findall(text)
    return set(found)
def count_bullets(text):
    """Count markdown bullet lines (those starting with -, * or +)."""
    matches = BULLET_REGEX.findall(text)
    return len(matches)
# ---------- Validators ----------
def validate_headings(orig, comp, result):
    """Error on heading-count drift; warn when heading text/order changed."""
    before = extract_headings(orig)
    after = extract_headings(comp)
    if len(before) != len(after):
        result.add_error(f"Heading count mismatch: {len(before)} vs {len(after)}")
    if before != after:
        result.add_warning("Heading text/order changed")
def validate_code_blocks(orig, comp, result):
    """Error unless fenced code blocks survive verbatim and in order."""
    if extract_code_blocks(orig) != extract_code_blocks(comp):
        result.add_error("Code blocks not preserved exactly")
def validate_urls(orig, comp, result):
    """Error when the set of URLs differs between the two texts."""
    before = extract_urls(orig)
    after = extract_urls(comp)
    if before != after:
        result.add_error(f"URL mismatch: lost={before - after}, added={after - before}")
def validate_paths(orig, comp, result):
    """Warn (not error — path detection is crude) on path-set drift."""
    before = extract_paths(orig)
    after = extract_paths(comp)
    if before != after:
        result.add_warning(f"Path mismatch: lost={before - after}, added={after - before}")
def validate_bullets(orig, comp, result):
    """Warn when the bullet count drifts by more than 15% of the original."""
    baseline = count_bullets(orig)
    if baseline == 0:
        # No bullets to compare against; ratio would divide by zero.
        return
    current = count_bullets(comp)
    if abs(baseline - current) / baseline > 0.15:
        result.add_warning(f"Bullet count changed too much: {baseline} -> {current}")
# ---------- Main ----------
def validate(original_path: Path, compressed_path: Path) -> ValidationResult:
    """Run every structural check on an (original, compressed) file pair."""
    result = ValidationResult()
    orig = read_file(original_path)
    comp = read_file(compressed_path)
    checks = (
        validate_headings,
        validate_code_blocks,
        validate_urls,
        validate_paths,
        validate_bullets,
    )
    for check in checks:
        check(orig, comp, result)
    return result
# ---------- CLI ----------
if __name__ == "__main__":
import sys
if len(sys.argv) != 3:
print("Usage: python validate.py <original> <compressed>")
sys.exit(1)
orig = Path(sys.argv[1]).resolve()
comp = Path(sys.argv[2]).resolve()
res = validate(orig, comp)
print(f"\nValid: {res.is_valid}")
if res.errors:
print("\nErrors:")
for e in res.errors:
print(f" - {e}")
if res.warnings:
print("\nWarnings:")
for w in res.warnings:
print(f" - {w}")