Skip to content

Log analyzer

Parses an access log, groups entries by status code and path, computes per-bucket counts, and writes a structured JSON report — a real-world data-pipeline shape in about 80 lines.

Source: examples/09_log_analyzer.c4

# Real CLI tool: analyze nginx-style access logs.
#
# Reads a log file, parses it, computes per-status / per-path statistics,
# and writes a JSON report. Demonstrates: stdlib (cli, fs, data, strings),
# string interpolation, *rest patterns, where filter, each-as-expression.
#
# Run: c4 run examples/09_log_analyzer.c4 -- examples/_sample_access.log

use cobra4.stdlib.cli as cli
use cobra4.stdlib.fs as fs
use cobra4.stdlib.data as data
use cobra4.stdlib.strings as strs

# Match: 127.0.0.1 - - [10/Apr/2026:13:55:36 +0000] "GET /api/x HTTP/1.1" 200 1234
fn parse_line(line) {
    "Parse one access-log line into a record dict, or None if malformed."
    # Each regex match captures exactly one non-empty group: a quoted request,
    # a bracketed timestamp, or a bare token. Flatten and drop empty groups.
    parts = strs.extract_all(line, r'"([^"]*)"|\[([^\]]*)\]|(\S+)')
    flat = []
    for p in parts {
        for piece in p {
            if piece {
                flat.append(piece)
            }
        }
    }
    # Expected token layout for a well-formed line:
    #   [ip, "-", "-", timestamp, request, status, bytes]  (indices 0-6)
    # The original code read indices 5/6/7, which shifted every field by one
    # (method became the status code, bytes were always 0).
    if len(flat) < 7 {
        return None
    }
    request = flat[4].split(" ")  # e.g. "GET /api/x HTTP/1.1"
    method = request[0] if len(request) > 0 else ""
    path = request[1] if len(request) > 1 else ""
    if not flat[5].isdigit() {
        # Non-numeric status: treat as malformed rather than crashing in int().
        return None
    }
    return {
        "ip": flat[0],
        "method": method,
        "path": path,
        "status": int(flat[5]),
        "bytes": int(flat[6]) if flat[6].isdigit() else 0,
    }
}

fn analyze(path) {
    "Parse the log at `path` and return summary statistics as a dict."
    records = []
    for raw_line in strs.lines(fs.read_text(path)) {
        rec = parse_line(raw_line)
        if rec is not None {
            records.append(rec)
        }
    }

    # Count of entries per HTTP status (keys stringified for JSON output).
    status_counts = {}
    for pair in data.group_by(records, "status").items() {
        status_counts[str(pair[0])] = len(pair[1])
    }

    # Local reducer: total bytes transferred for one group of records.
    fn _total_bytes(group) {
        acc = 0
        for rec in group { acc += rec["bytes"] }
        return acc
    }

    per_path = data.aggregate(
        records,
        "path",
        {"hits": fn(group) = len(group), "bytes": _total_bytes},
    )

    # Server-side failures only (5xx).
    server_errors = each rec in records where rec["status"] >= 500 { rec }

    return {
        "total": len(records),
        "by_status": status_counts,
        "top_paths": data.sort_by(per_path, "hits", reverse=True)[:10],
        "errors": server_errors,
    }
}

fn main(input="./_sample_access.log", out="./_log_report.json") {
    "Analyze `input` and write a JSON report to `out`; returns the report."
    if not fs.exists(input) {
        # No input file yet: write a tiny built-in sample so the demo runs.
        demo_lines = [
            r'127.0.0.1 - - [10/Apr/2026:13:55:36 +0000] "GET /api/users HTTP/1.1" 200 1234',
            r'10.0.0.5 - - [10/Apr/2026:13:55:37 +0000] "POST /api/orders HTTP/1.1" 201 89',
            r'127.0.0.1 - - [10/Apr/2026:13:55:38 +0000] "GET /api/users HTTP/1.1" 200 1200',
            r'10.0.0.5 - - [10/Apr/2026:13:55:39 +0000] "GET /api/missing HTTP/1.1" 404 0',
            r'127.0.0.1 - - [10/Apr/2026:13:55:40 +0000] "GET /api/error HTTP/1.1" 500 41',
        ]
        demo = ""
        for dl in demo_lines {
            demo = demo + dl + "\n"
        }
        fs.write_text(input, demo)
        log("generated sample log", file=input)
    }

    summary = analyze(input)
    save(summary, out)

    log("analysis complete",
        total=summary["total"],
        unique_paths=len(summary["top_paths"]),
        errors=len(summary["errors"]),
        out=out)
    return summary
}

# Register `main` as the CLI entry point.
# NOTE(review): presumably cli.quick derives command-line flags from main's
# keyword parameters (input/out) — confirm against the cobra4.stdlib.cli docs.
cli.quick(main, description="Analyze access logs into a JSON report.")

Run it

c4 run examples/09_log_analyzer.c4