# Codex Usage Tracking With TelemHQ

Source HTML: https://telemhq.com/docs/codex

Use TelemHQ to track local Codex usage over time. The watcher reads completed Codex response events from local Codex SQLite logs and sends usage metadata to a TelemHQ tracker, including token usage, model, project name, git repo, and git branch when Codex has that context.

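The watcher sends usage metadata only. It never sends prompts, generated code, command arguments, file contents, or model output. Raw local project paths are sent only if you opt in; by default the watcher sends just the project directory name and a short hash of the path.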

## Create A Tracker

In TelemHQ, create a tracker named `Codex usage`.

Leave the schedule blank. Codex usage is ad hoc, so TelemHQ should not mark missed pings as failures.

Copy the tracker ping URL.

## Save Your Ping URL

```bash
# Directory that holds the watcher's private env file.
mkdir -p ~/.telemhq
# Replace YOUR_TRACKING_TOKEN with the token from the tracker ping URL you copied.
export TELEMHQ_CODEX_PING_URL="https://telemhq.com/ping/YOUR_TRACKING_TOKEN"
# Persist the URL as a KEY="value" line so it can be read outside this shell session.
printf 'TELEMHQ_CODEX_PING_URL="%s"\n' "$TELEMHQ_CODEX_PING_URL" > ~/.telemhq/codex-usage.env
# The ping URL contains your tracking token, so make the file readable only by you.
chmod 600 ~/.telemhq/codex-usage.env
```

The private env file lets the watcher run from macOS launchd or another background service that does not inherit your terminal environment.

Optional: if you want to include the raw local working directory in each ping, add `CODEX_USAGE_INCLUDE_PROJECT_PATH="1"` to `~/.telemhq/codex-usage.env`. Most users should leave this off and use the default `project_path_hash` instead.

## Watcher Script

Save this as `~/.telemhq/codex-usage-watch.py`:

```python
#!/usr/bin/env python3
import argparse
import datetime as dt
import hashlib
import json
import os
import re
import sqlite3
import sys
import time
import urllib.parse
import urllib.request
from pathlib import Path

# Codex log database that is scanned for completed responses
# (override with --db or CODEX_USAGE_DB).
DEFAULT_DB = Path.home() / ".codex" / "logs_2.sqlite"
# Codex state database whose `threads` table supplies cwd and git metadata
# (override with --codex-state-db or CODEX_STATE_DB).
DEFAULT_STATE_DB = Path.home() / ".codex" / "state_5.sqlite"
# KEY=VALUE env file loaded at startup (override with --env-file or
# CODEX_USAGE_ENV_FILE).
DEFAULT_ENV = Path.home() / ".telemhq" / "codex-usage.env"
# Watcher progress file holding the last processed log id and recently sent
# response ids (override with --state or CODEX_USAGE_STATE_FILE).
DEFAULT_STATE = Path.home() / ".telemhq" / "codex-usage-watch.state.json"

def load_env_file(path):
    """Load ``KEY=VALUE`` pairs from an env file into ``os.environ``.

    Blank lines, ``#`` comment lines, and lines without ``=`` are skipped.
    A leading ``export `` is accepted so the same file can also be sourced
    from a shell. Surrounding double and single quotes are stripped from the
    value. Variables already present in the process environment are never
    overridden, so the real environment takes precedence over the file.

    Args:
        path: ``pathlib.Path`` of the env file. A missing file is ignored.
    """
    if not path.exists():
        return
    # Decode explicitly: background services may run with a non-UTF-8 locale.
    for line in path.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        # Without this, "export KEY=value" would be stored under "export KEY".
        if line.startswith("export "):
            line = line[len("export "):].lstrip()
        key, value = line.split("=", 1)
        key = key.strip()
        value = value.strip().strip('"').strip("'")
        if key and key not in os.environ:
            os.environ[key] = value

def read_state(path):
    """Read the watcher's progress file.

    Two on-disk formats are understood: the current JSON object with
    ``last_id`` and ``response_ids`` keys, and the legacy format that holds
    only the last processed log id as a bare integer.

    Args:
        path: ``pathlib.Path`` of the state file.

    Returns:
        A dict ``{"last_id": int, "response_ids": list}``. A missing or empty
        file yields ``last_id`` 0 and an empty list.

    Raises:
        ValueError: If the file is neither a JSON object nor an integer.
    """
    if not path.exists():
        return {"last_id": 0, "response_ids": []}
    raw = path.read_text().strip()
    if not raw:
        return {"last_id": 0, "response_ids": []}
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        data = None
    if isinstance(data, dict):
        # `or` also covers keys explicitly stored as JSON null.
        return {
            "last_id": int(data.get("last_id") or 0),
            "response_ids": list(data.get("response_ids") or []),
        }
    # Legacy format: a bare integer is itself valid JSON, so it must be handled
    # here rather than in the JSONDecodeError branch. Anything else raises.
    return {"last_id": int(raw), "response_ids": []}

def write_state(path, state):
    """Persist the watcher's progress, keeping only the newest 500 response ids.

    The state is written to a sibling temporary file and then renamed over
    the target, so an interrupted write never leaves a truncated state file
    behind.

    Args:
        path: ``pathlib.Path`` of the state file; parent directories are
            created when missing.
        state: Dict with ``last_id`` and ``response_ids``. Its
            ``response_ids`` entry is trimmed in place to the last 500 items.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    state["response_ids"] = state.get("response_ids", [])[-500:]
    tmp_path = path.with_name(path.name + ".tmp")
    tmp_path.write_text(json.dumps(state, indent=2))
    # Rename within the same directory replaces the old file in one step.
    tmp_path.replace(path)

def extract_event(body):
    """Return the JSON event embedded in a log body, or ``None``.

    The text after the first occurrence of each known marker is inspected in
    turn; the first one that begins with ``{`` is parsed as JSON.

    Raises:
        json.JSONDecodeError: If the candidate text is not valid JSON.
    """
    for marker in ("Received message ", "websocket event: "):
        _, found, tail = body.partition(marker)
        if not found:
            continue
        candidate = tail.strip()
        if candidate.startswith("{"):
            return json.loads(candidate)
    return None

def model_from_body(body, response):
    """Return the model name for a response.

    The ``model`` field of ``response`` wins when it is set; otherwise the
    last ``model=...`` token found in the log body is used, and ``"unknown"``
    when the body has none.
    """
    reported = response.get("model")
    if reported:
        return reported
    last_seen = "unknown"
    for found in re.finditer(r"\bmodel=([^ }\]]+)", body):
        last_seen = found.group(1)
    return last_seen

def thread_from_body(body, row_thread_id):
    """Return the thread id for a log row.

    The row's own thread id is preferred; when it is empty, the first
    ``thread_id=...`` or ``thread.id=...`` token in the body is used.
    Returns ``None`` when neither source provides an id.
    """
    if not row_thread_id:
        found = re.search(r"(?:thread_id|thread\.id)=([0-9a-zA-Z-]+)", body)
        if found is None:
            return None
        return found.group(1)
    return row_thread_id

def sqlite_readonly_uri(path):
    """Build a ``file:`` URI that opens the SQLite database read-only.

    The path is percent-encoded (keeping ``/`` and ``:``) so characters such
    as ``?`` or spaces cannot be mistaken for URI syntax.
    """
    escaped = urllib.parse.quote(str(path), safe="/:")
    return "file:" + escaped + "?mode=ro"

def git_repo_slug(origin_url):
    """Reduce a git remote URL to a short ``owner/repo`` style slug.

    Handles URL remotes (``https://host/owner/repo.git``), SCP-style remotes
    (``git@host:owner/repo.git``), and plain paths. The scheme, host, port,
    and any embedded ``user:password@`` credentials are always dropped, so
    they never end up in the returned slug.

    Args:
        origin_url: Remote URL string, or a falsy value when unknown.

    Returns:
        The last two path components joined by ``/`` (one component when the
        path has only one), or ``None`` when nothing usable remains.
    """
    if not origin_url:
        return None
    # A trailing slash would otherwise defeat both the ".git" check and the regex.
    value = origin_url.strip().rstrip("/")
    if value.endswith(".git"):
        value = value[:-4]
    match = re.search(r"(?:github\.com[:/])([^/]+/[^/]+)$", value)
    if match:
        return match.group(1)
    if "://" in value:
        # scheme://[user[:password]@]host[:port]/path -> keep only the path.
        _, _, path = value.split("://", 1)[1].partition("/")
    else:
        # SCP-style [user@]host:path. The lookahead leaves "C:/dir" style
        # paths alone so they are treated as plain paths.
        scp = re.match(r"^(?:[^@/]+@)?[^:/]+:(?!/)(.*)$", value)
        path = scp.group(1) if scp else value
    parts = [part for part in path.split("/") if part]
    if not parts:
        return None
    return "/".join(parts[-2:])

def project_metadata_from_thread(state_db_path, thread_id):
    """Look up project and git metadata for a Codex thread.

    Reads the ``threads`` row for ``thread_id`` from the Codex state database
    (opened read-only) and turns it into payload fields.

    Args:
        state_db_path: ``pathlib.Path`` of the Codex state database.
        thread_id: Thread id to look up; falsy values skip the lookup.

    Returns:
        A dict with any of ``project``, ``project_path_hash``, ``project_cwd``,
        ``git_repo``, ``git_branch``, and ``git_sha``. ``project_cwd`` (the raw
        working directory) is included only when the environment variable
        ``CODEX_USAGE_INCLUDE_PROJECT_PATH`` is ``"1"``. An empty dict is
        returned when the database is missing, the query fails, or the thread
        is not found.
    """
    if not thread_id or not state_db_path.exists():
        return {}
    connection = None
    try:
        connection = sqlite3.connect(sqlite_readonly_uri(state_db_path), uri=True)
        row = connection.execute(
            """
            SELECT cwd, git_branch, git_origin_url, git_sha
            FROM threads
            WHERE id = ?
            LIMIT 1
            """,
            (thread_id,),
        ).fetchone()
    except sqlite3.Error:
        # Metadata is optional; a failed lookup must not block the usage ping.
        return {}
    finally:
        # sqlite3's `with connection:` only manages the transaction, so close
        # explicitly; this runs once per event in a long-lived process.
        if connection is not None:
            connection.close()
    if not row:
        return {}

    cwd, git_branch, git_origin_url, git_sha = row
    metadata = {}
    if cwd:
        metadata["project"] = Path(cwd).name or "unknown"
        # A short hash distinguishes same-named projects without exposing the path.
        metadata["project_path_hash"] = hashlib.sha256(cwd.encode("utf-8")).hexdigest()[:12]
        if os.getenv("CODEX_USAGE_INCLUDE_PROJECT_PATH") == "1":
            metadata["project_cwd"] = cwd
    repo = git_repo_slug(git_origin_url)
    if repo:
        metadata["git_repo"] = repo
    if git_branch:
        metadata["git_branch"] = git_branch
    if git_sha:
        metadata["git_sha"] = git_sha[:12]
    return metadata

def build_payload(row_id, ts, row_thread_id, body, state_db_path):
    """Turn one log row into a ping payload, or ``None`` when it has no usage.

    Only ``response.completed`` events that carry a non-empty ``usage`` object
    produce a payload. Project and git fields from the Codex state database
    are merged in, and every field whose value is ``None`` is dropped.

    Args:
        row_id: Id of the log row, sent as ``log_id``.
        ts: Row timestamp passed to ``datetime.fromtimestamp`` (UTC).
        row_thread_id: Thread id stored on the row, if any.
        body: Log body text that may embed the JSON event.
        state_db_path: ``pathlib.Path`` of the Codex state database.

    Raises:
        json.JSONDecodeError: If the embedded event is not valid JSON.
    """
    event = extract_event(body)
    if not event:
        return None
    if event.get("type") != "response.completed":
        return None

    response = event.get("response") or {}
    usage = response.get("usage") or {}
    if not usage:
        return None

    cached_tokens = (usage.get("input_tokens_details") or {}).get("cached_tokens")
    reasoning_tokens = (usage.get("output_tokens_details") or {}).get("reasoning_tokens")
    thread_id = thread_from_body(body, row_thread_id)

    fields = {}
    fields["tool"] = "codex"
    fields["provider"] = "openai"
    fields["status"] = response.get("status") or "completed"
    fields["model"] = model_from_body(body, response)
    fields["session_type"] = "local"
    fields["thread_id"] = thread_id
    fields["response_id"] = response.get("id")
    fields["log_id"] = row_id
    fields["timestamp"] = dt.datetime.fromtimestamp(ts, dt.timezone.utc).isoformat()
    fields["input_tokens"] = usage.get("input_tokens")
    fields["output_tokens"] = usage.get("output_tokens")
    fields["total_tokens"] = usage.get("total_tokens")
    fields["cached_input_tokens"] = cached_tokens
    fields["reasoning_tokens"] = reasoning_tokens
    fields.update(project_metadata_from_thread(state_db_path, thread_id))

    # Absent values are omitted rather than sent as null.
    return {name: item for name, item in fields.items() if item is not None}

def send_ping(ping_url, payload):
    """POST ``payload`` as JSON to the tracker ping URL.

    Uses a 15 second timeout.

    Raises:
        RuntimeError: If the response status is 300 or above.
    """
    encoded = json.dumps(payload).encode("utf-8")
    request = urllib.request.Request(ping_url, data=encoded, method="POST")
    request.add_header("Content-Type", "application/json")
    with urllib.request.urlopen(request, timeout=15) as reply:
        status = reply.status
        if status < 300:
            return
        raise RuntimeError("TelemHQ returned HTTP {}".format(status))

def scan_once(db_path, codex_state_db_path, state_path, ping_url):
    """Send one ping for each completed Codex response not reported yet.

    Reads log rows whose id is above the saved ``last_id`` and whose body
    mentions ``response.completed`` and ``"usage":``, builds a payload for
    each, and posts it with ``send_ping``. Progress is saved after every row,
    so a restart resumes where this scan stopped. A row whose ping fails is
    not recorded and is picked up again by the next scan.

    Args:
        db_path: ``pathlib.Path`` of the Codex log database (opened read-only).
        codex_state_db_path: ``pathlib.Path`` of the Codex state database used
            for project metadata.
        state_path: ``pathlib.Path`` of the watcher's progress file.
        ping_url: Tracker ping URL.

    Returns:
        The number of pings sent.
    """
    state = read_state(state_path)
    # The set is only for fast lookups; the list in `state` stays in send order
    # so that trimming on save drops the oldest ids, not arbitrary ones.
    sent_response_ids = set(state.get("response_ids", []))
    connection = sqlite3.connect(sqlite_readonly_uri(db_path), uri=True)
    try:
        rows = connection.execute(
            """
            SELECT id, ts, thread_id, feedback_log_body
            FROM logs
            WHERE id > ?
              AND feedback_log_body LIKE '%response.completed%'
              AND feedback_log_body LIKE '%"usage":%'
            ORDER BY id ASC
            """,
            (state["last_id"],),
        ).fetchall()
    finally:
        # `with connection:` would only end the transaction, not close the handle.
        connection.close()
    sent = 0
    for row_id, ts, row_thread_id, body in rows:
        state["last_id"] = row_id
        try:
            payload = build_payload(row_id, ts, row_thread_id, body or "", codex_state_db_path)
        except json.JSONDecodeError:
            # Unparseable event: skip it, but still advance past the row.
            payload = None
        response_id = payload.get("response_id") if payload else None
        if not payload or (response_id and response_id in sent_response_ids):
            write_state(state_path, state)
            continue
        send_ping(ping_url, payload)
        if response_id:
            sent_response_ids.add(response_id)
            state.setdefault("response_ids", []).append(response_id)
        write_state(state_path, state)
        sent += 1
        # Flush so the line appears promptly when stdout is redirected to a log file.
        print("Sent Codex usage ping: model={} project={} total_tokens={}".format(
            payload.get("model"),
            payload.get("project", "unknown"),
            payload.get("total_tokens")
        ), flush=True)
    return sent

def main():
    """Command-line entry point: scan once, or keep polling with ``--watch``.

    The env file is loaded before the remaining options are defined, so
    settings such as ``CODEX_USAGE_POLL_SECONDS`` or ``CODEX_USAGE_DB`` take
    effect when they are set in the env file and not only in the shell.
    Precedence is: command-line flag, process environment, env file, default.

    Without ``--watch`` the function returns after one scan and lets any error
    propagate. With ``--watch`` it polls forever (at least 10 seconds apart)
    and reports I/O, SQLite, and ping failures on stderr, then retries at the
    next interval instead of exiting.
    """
    # Stage 1: find the env file and load it before other defaults are read.
    env_parser = argparse.ArgumentParser(add_help=False)
    env_parser.add_argument("--env-file", default=os.getenv("CODEX_USAGE_ENV_FILE", str(DEFAULT_ENV)))
    env_args, _ = env_parser.parse_known_args()
    load_env_file(Path(env_args.env_file).expanduser())

    # Stage 2: full parser; its os.getenv defaults now also see env-file values.
    parser = argparse.ArgumentParser(
        description="Send Codex usage metrics to TelemHQ.",
        parents=[env_parser],
    )
    parser.add_argument("--watch", action="store_true", help="Keep polling for new Codex responses.")
    parser.add_argument("--interval", type=int, default=int(os.getenv("CODEX_USAGE_POLL_SECONDS", "60")))
    parser.add_argument("--db", default=os.getenv("CODEX_USAGE_DB", str(DEFAULT_DB)))
    parser.add_argument("--codex-state-db", default=os.getenv("CODEX_STATE_DB", str(DEFAULT_STATE_DB)))
    parser.add_argument("--state", default=os.getenv("CODEX_USAGE_STATE_FILE", str(DEFAULT_STATE)))
    args = parser.parse_args()

    ping_url = os.getenv("TELEMHQ_CODEX_PING_URL") or os.getenv("TELEMHQ_PING_URL")
    if not ping_url:
        sys.exit("Set TELEMHQ_CODEX_PING_URL in your shell or ~/.telemhq/codex-usage.env.")
    db_path = Path(args.db).expanduser()
    codex_state_db_path = Path(args.codex_state_db).expanduser()
    state_path = Path(args.state).expanduser()
    if not db_path.exists():
        sys.exit("Codex log database not found: {}".format(db_path))
    while True:
        try:
            sent = scan_once(db_path, codex_state_db_path, state_path, ping_url)
        except (OSError, sqlite3.Error, RuntimeError) as error:
            # OSError covers urllib's URLError/HTTPError and socket timeouts.
            if not args.watch:
                raise
            print(
                "Codex usage scan failed, will retry: {}".format(error),
                file=sys.stderr,
                flush=True,
            )
        else:
            if not args.watch:
                if sent == 0:
                    print("No new completed Codex usage events found.")
                return
        # Enforce a floor of 10 seconds between scans.
        time.sleep(max(args.interval, 10))

# Run the watcher only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
```

Make it executable:

```bash
chmod +x ~/.telemhq/codex-usage-watch.py
```

## Send A Test Ping

```bash
~/.telemhq/codex-usage-watch.py
```

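Run without `--watch`, the script scans the Codex log once, sends a ping for each completed response it has not reported yet, and exits, so the first run can send many pings. In TelemHQ, open the tracker and check Ping History. You should see fields such as `model`, `input_tokens`, `output_tokens`, `total_tokens`, `cached_input_tokens`, and `reasoning_tokens`.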

When Codex has project context available, you should also see fields such as `project`, `project_path_hash`, `git_repo`, `git_branch`, and `git_sha`.

## Keep It Running

```bash
~/.telemhq/codex-usage-watch.py --watch
```

## Start Automatically On macOS

Create a LaunchAgent so the watcher starts when you log in and restarts if it exits:

```bash
# Per-user LaunchAgents directory; it may not exist yet on a fresh account.
mkdir -p ~/Library/LaunchAgents

# The heredoc delimiter is deliberately unquoted so the shell expands $HOME
# now and the plist is written with absolute paths.
# NOTE(review): the job runs the script through its `#!/usr/bin/env python3`
# line, so python3 must be resolvable in launchd's environment - confirm.
cat > ~/Library/LaunchAgents/com.telemhq.codex-usage-watch.plist <<PLIST
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN"
  "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
  <key>Label</key>
  <string>com.telemhq.codex-usage-watch</string>
  <key>ProgramArguments</key>
  <array>
    <string>$HOME/.telemhq/codex-usage-watch.py</string>
    <string>--watch</string>
    <string>--interval</string>
    <string>30</string>
  </array>
  <key>RunAtLoad</key>
  <true/>
  <key>KeepAlive</key>
  <true/>
  <key>StandardOutPath</key>
  <string>$HOME/.telemhq/codex-usage-watch.log</string>
  <key>StandardErrorPath</key>
  <string>$HOME/.telemhq/codex-usage-watch.err.log</string>
  <key>WorkingDirectory</key>
  <string>$HOME</string>
</dict>
</plist>
PLIST

# Unload any previously loaded copy; the error when none is loaded is ignored.
launchctl bootout gui/$(id -u) ~/Library/LaunchAgents/com.telemhq.codex-usage-watch.plist 2>/dev/null || true
# Load the new definition into your GUI session.
launchctl bootstrap gui/$(id -u) ~/Library/LaunchAgents/com.telemhq.codex-usage-watch.plist
# Restart the job (-k stops a running instance first) so the current script is in use.
launchctl kickstart -k gui/$(id -u)/com.telemhq.codex-usage-watch
```

Verify it:

```bash
# Show the job's state as launchd sees it.
launchctl print gui/$(id -u)/com.telemhq.codex-usage-watch
# -f matches against the full argument list and -l prints it. -a is avoided:
# on macOS it adds pgrep itself to the match list, which reports a PID even
# when the watcher is not running.
pgrep -fl codex-usage-watch.py
# Follow both log files; press Ctrl-C to stop.
tail -f ~/.telemhq/codex-usage-watch.log ~/.telemhq/codex-usage-watch.err.log
```

## Recommended Charts

Pin these fields in TelemHQ Analytics:

- `total_tokens`
- `input_tokens`
- `output_tokens`
- `cached_input_tokens`
- `reasoning_tokens`
- `project` and `git_repo` in the payload summary
- `git_branch` in the payload summary

TelemHQ also shows a project usage chart when recent Codex payloads include more than one project. For Codex payloads, this chart uses `total_tokens` by default.

## Related Guides

- Sending pings: https://telemhq.com/docs/integration
- OpenAI pipeline tracking: https://telemhq.com/docs/openai
- Claude pipeline tracking: https://telemhq.com/docs/claude
