File Parsing Bot Version 3, Revamped Py Script

Here is the third and current version (as of this writing) of the Python script I use to parse audio files generated by SDRTrunk and push them to my Discord server.

What I ended up going with was a separate machine for running SDRTrunk: a Lenovo thin client running Lubuntu. The SDR sticks and SDRTrunk live on that, with a shared folder on the network for SDRTrunk to drop audio files into. From there, the scripts below live on my Proxmox host in a scripts-only container, where the heavy lifting happens. This is different from the previous versions, where the script and SDRTrunk both lived on a Windows Server 2025 VM on my Proxmox host. While it worked, it was sluggish despite having all the resources our government could muster. Splitting them like this seems to have helped immensely.

What's changed?

  • Code base updated, condensed and cleaned up a bit
  • New and improved .env file with hard-mapped threads to eliminate duplicates
  • Whisper Prompt vocabulary added to aid in transcription
  • Local timestamps
  • Talkgroup mapping (icon, color, name)
  • Fallback routing for unknown systems
  • Thread de-dupe + retry (DB -> name -> create)
  • Watchdog + safety poller
  • Backlog concurrency
  • Rotating file logging

Move from Windows to Linux

Set up the container with the following:

sudo apt update
sudo apt install python3 python3-venv python3-pip ffmpeg -y

Make your directories:

mkdir /opt/sdrtrunk-discord
mkdir /opt/sdrtrunk-discord/database
mkdir /opt/sdrtrunk-discord/logs
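
The .env file below expects the SDRTrunk recordings share to be mounted at /mnt/sdrtrunk-audio. One way to mount it, as a sketch assuming an NFS export (the hostname and export path are placeholders; if the Lubuntu box shares it over SMB instead, use cifs-utils and -t cifs):

sudo apt install nfs-common -y
sudo mkdir -p /mnt/sdrtrunk-audio
sudo mount -t nfs lubuntu-host:/srv/sdrtrunk-audio /mnt/sdrtrunk-audio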

Set up the venv and activate it:

cd /opt/sdrtrunk-discord
python3 -m venv venv
source venv/bin/activate

Install the reqs:

pip install discord.py python-dotenv faster-whisper mutagen watchdog aiohttp pynacl aiosqlite
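
If you want the install to be reproducible on a rebuild, you can snapshot whatever versions pip resolved:

pip freeze > requirements.txt
pip install -r requirements.txt   # later, in a fresh venv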

Set up a systemd service

sudo nano /etc/systemd/system/sdr-uploader.service

Paste in the following unit:

[Unit]
Description=SDRTrunk to Discord Uploader
After=network.target

[Service]
User=root
WorkingDirectory=/opt/sdrtrunk-discord
ExecStart=/opt/sdrtrunk-discord/venv/bin/python3 /opt/sdrtrunk-discord/uploader.py
Restart=always

[Install]
WantedBy=multi-user.target

Enable and start the service:

sudo systemctl daemon-reload
sudo systemctl enable sdr-uploader.service
sudo systemctl start sdr-uploader.service

Monitor logs:

sudo journalctl -u sdr-uploader.service -f
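
The script also keeps its own rotating log (5 MB per file, 7 backups, via the RotatingFileHandler in uploader.py), so you can tail that file directly as well:

tail -f /opt/sdrtrunk-discord/logs/uploader.log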

.env File

Place this file in the root directory: /opt/sdrtrunk-discord

# ==================================================================#
# SDR-Trunk-Discord Project v20251005.01.02                         #
# Spartan X311 https://skynet2.net                                  #
# All Your Comms Are Now Belong To Me                               #
# SDRTrunk DMR Uploader © 2025 by Spartan X311 is licensed          #
# under CC BY-SA 4.0. To view a copy of this license, visit         #
# https://creativecommons.org/licenses/by-sa/4.0/                   #
#                                                                   #
# .env                                                              #
# ==================================================================#

# --- Discord Bot ---
DISCORD_BOT_TOKEN=your-token-here

# --- Paths ---
# Previous Directory: AUDIO_ROOT=/mnt/recordings
AUDIO_ROOT=/mnt/sdrtrunk-audio
SCRIPTS_DIR=/opt/sdrtrunk-discord
DB_DIR=/opt/sdrtrunk-discord/database
SQLITE_PATH=/opt/sdrtrunk-discord/database/uploader.sqlite3
LOGS_DIR=/opt/sdrtrunk-discord/logs

# --- System → Channel map (IDs) ---
# Keys MUST match the "system" values in TALKGROUPS_JSON (case-insensitive in code)
SYSTEM_CHANNEL_MAP_PATH=${SCRIPTS_DIR}/system_map.json

# --- Known Talkgroups (JSON) ---
# id, system, alpha (name), color (hex), icon (emoji)
TALKGROUPS_JSON_PATH=${SCRIPTS_DIR}/talkgroups.json

# --- Hard-Mapped Threads (optional) ---
# Format: "SYSTEM:TGID": "THREAD_ID"
HARD_MAPPED_THREADS_PATH=${SCRIPTS_DIR}/hard_map.json

# --- Timezone ---
TIMEZONE=America/Phoenix

# --- Logging ---
LOG_LEVEL=DEBUG
LOG_TIMING=true

# --- Faster-Whisper ---
WHISPER_DEVICE=cpu
WHISPER_COMPUTE_TYPE=int8
WHISPER_MODEL_NAME=medium.en
WHISPER_LANGUAGE=en
WHISPER_BEAM_SIZE=3
WHISPER_VAD_FILTER=false
HF_HUB_DISABLE_PROGRESS_BARS=1

# --- Faster-Whisper Vocabulary ---
# It's best to include variations of spelling, abbreviations need spaces (ED vs E D)
WHISPER_PROMPT="Evolv, EVS, dispatch, ambo, screening, inmate, prisoner, endo, tele, plant ops, admin rep, house sup, copy, copies, copy that, in the clear, over and out, E D, Emergency Department, P D, L and D, P B X, N O C, NOC, knock, heilo, he low, helicopter, K 9, kay nine, K9"

# --- Behavior / Performance ---
SCAN_GLOB=**/*.mp3
STARTUP_BACKLOG_LIMIT=0
MIN_FILE_AGE_SEC=5
QUEUE_PATIENCE_SEC=45
BACKLOG_CONCURRENCY=2
POLL_INTERVAL_SEC=15
SEQUENTIAL_BACKLOG=true

# --- Fallback styling ---
FALLBACK_EMBED_COLOR=#FFFFFF

# --- File Cleanup ---
ENABLE_CLEANUP=false
# How old a file must be (in minutes) before it's eligible for deletion.
DELETE_FILES_OLDER_THAN_MINUTES=120
# How often (in minutes) the cleanup task should run.
CLEANUP_INTERVAL_MINUTES=30

# --- Database Cleanup ---
ENABLE_DB_CLEANUP=true
# How long to keep records of processed files (in days).
DB_RECORD_RETENTION_DAYS=30
# How often (in hours) the database cleanup task should run.
DB_CLEANUP_INTERVAL_HOURS=24
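
The ${SCRIPTS_DIR} references above rely on python-dotenv's variable interpolation. A quick sanity-check sketch, run from the venv, to confirm the paths expand the way you expect:

python3 - <<'EOF'
from dotenv import dotenv_values
cfg = dotenv_values("/opt/sdrtrunk-discord/.env")
print(cfg["SYSTEM_CHANNEL_MAP_PATH"])  # expect /opt/sdrtrunk-discord/system_map.json
print(cfg["TALKGROUPS_JSON_PATH"])     # expect /opt/sdrtrunk-discord/talkgroups.json
EOF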

JSON Files

These are the helper files that map systems and talkgroups to Discord channels and threads. Note that the values in system_map.json are numeric channel IDs (the script casts them with int()), while the values in hard_map.json are thread IDs as strings.

talkgroups.json
[
  {
    "id": 107,
    "system": "SYSTEM 1",
    "alpha": "EVS",
    "color": "#2E7D32",
    "icon": "🫧"
  },
  {
    "id": 108,
    "system": "SYSTEM 1",
    "alpha": "Food Services",
    "color": "#AEEA00",
    "icon": "🍽️"
  },
  {
    "id": 109,
    "system": "SYSTEM 1",
    "alpha": "Four Peaks Incident Command",
    "color": "#72246c",
    "icon": "⚑"
  },
  {
    "id": 110,
    "system": "SYSTEM 1",
    "alpha": "Facilities",
    "color": "#4E342E",
    "icon": "πŸ› οΈ"
  },
  {
    "id": 111,
    "system": "SYSTEM 1",
    "alpha": "Security",
    "color": "#1565C0",
    "icon": "πŸ›‘οΈ"
  },
  {
    "id": 205,
    "system": "SYSTEM 1",
    "alpha": "All Incident Command",
    "color": "#FF8F00",
    "icon": "⚠️"
  },
  {
    "id": 206,
    "system": "SYSTEM 1",
    "alpha": "Sec Network Ops",
    "color": "#1DE9B6",
    "icon": "πŸ–₯️"
  },
  {
    "id": 207,
    "system": "SYSTEM 1",
    "alpha": "Security Investigations",
    "color": "#80D8FF",
    "icon": "πŸ”"
  },
  {
    "id": 1201,
    "system": "SYSTEM 2",
    "alpha": "Operations",
    "color": "#69a7d2",
    "icon": "βš”οΈ"
  },
  {
    "id": 1202,
    "system": "SYSTEM 2",
    "alpha": "Tactical",
    "color": "#607D8B",
    "icon": "πŸ—‘οΈ"
  }
]

system_map.json
{
  "SYSTEM 1": your-id-here,
  "SYSTEM 2": your-id-here,
  "FALLBACK": your-id-here
}

hard_map.json
{
  "SYSTEM 2:1201": "your-id-here",
  "SYSTEM 2:1202": "your-id-here",
  "SYSTEM 1:107": "your-id-here",
  "SYSTEM 1:108": "your-id-here",
  "SYSTEM 1:109": "your-id-here",
  "SYSTEM 1:110": "your-id-here",
  "SYSTEM 1:111": "your-id-here",
  "SYSTEM 1:205": "your-id-here",
  "SYSTEM 1:206": "your-id-here",
  "SYSTEM 1:207": "your-id-here"
}
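
Once you've filled in the real IDs, it's worth checking that all three files parse and line up. A quick sketch that mirrors the uploader's case-insensitive system matching:

python3 - <<'EOF'
import json
from pathlib import Path

root = Path("/opt/sdrtrunk-discord")
systems = {k.upper() for k in json.loads((root / "system_map.json").read_text())}
tgs = json.loads((root / "talkgroups.json").read_text())
hard = json.loads((root / "hard_map.json").read_text())

# every talkgroup's system should resolve to a channel (FALLBACK catches the rest)
missing = {tg["system"].upper() for tg in tgs} - systems
print("systems without a channel:", ", ".join(sorted(missing)) or "none")
print(f"{len(tgs)} talkgroups and {len(hard)} hard-mapped threads parsed OK")
EOF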

uploader.py File

Place this file in the root directory: /opt/sdrtrunk-discord

#!/usr/bin/env python3
# ==================================================================#
# SDR-Trunk-Discord Project v20251005.01.02                         #
# Spartan X311 https://skynet2.net                                  #
# All Your Comms Are Now Belong To Me                               #
# SDRTrunk DMR Uploader © 2025 by Spartan X311 is licensed          #
# under CC BY-SA 4.0. To view a copy of this license, visit         #
# https://creativecommons.org/licenses/by-sa/4.0/                   #
#                                                                   #
# uploader.py                                                       #
# Clean rewrite for stability + correct filename parsing + retries  #
# ==================================================================#

import asyncio
import json
import logging
import os
import re
import sys
import time
from datetime import datetime, timedelta
from fnmatch import fnmatch
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import Any, AsyncIterator, Dict, Optional, Tuple

import aiohttp
import aiosqlite
import discord
from discord import Embed, File as DiscordFile, HTTPException
from dotenv import load_dotenv
from faster_whisper import WhisperModel
from mutagen.easyid3 import EasyID3
from mutagen.mp3 import MP3
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError

# ---------------------------
# .env loading
# ---------------------------
SCRIPT_ROOT = Path(__file__).parent
DOTENV_PATH = SCRIPT_ROOT / ".env"
if DOTENV_PATH.exists():
    load_dotenv(DOTENV_PATH)
else:
    load_dotenv()

# ---------------------------
# Config from env
# ---------------------------
BOT_TOKEN = os.getenv("DISCORD_BOT_TOKEN", "").strip()
AUDIO_ROOT = Path(os.getenv("AUDIO_ROOT", ".")).expanduser()
DB_PATH = Path(os.getenv("SQLITE_PATH", "./uploader.sqlite3")).expanduser()
DB_PATH.parent.mkdir(parents=True, exist_ok=True)

LOGS_DIR = Path(os.getenv("LOGS_DIR", "./logs")).expanduser()
LOGS_DIR.mkdir(parents=True, exist_ok=True)
LOG_FILE = LOGS_DIR / "uploader.log"
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
LOG_TIMING = os.getenv("LOG_TIMING", "false").lower() == "true"

SCAN_GLOB = os.getenv("SCAN_GLOB", "**/*.mp3")
STARTUP_BACKLOG_LIMIT = int(os.getenv("STARTUP_BACKLOG_LIMIT", "0"))
MIN_FILE_AGE_SEC = int(os.getenv("MIN_FILE_AGE_SEC", "5"))
QUEUE_PATIENCE_SEC = int(os.getenv("QUEUE_PATIENCE_SEC", "10"))
BACKLOG_CONCURRENCY = max(1, int(os.getenv("BACKLOG_CONCURRENCY", "2")))
POLL_INTERVAL_SEC = max(3, int(os.getenv("POLL_INTERVAL_SEC", "15")))

ENABLE_CLEANUP = os.getenv("ENABLE_CLEANUP", "false").lower() == "true"
DELETE_FILES_OLDER_THAN_MINUTES = int(os.getenv("DELETE_FILES_OLDER_THAN_MINUTES", "120"))
CLEANUP_INTERVAL_MINUTES = int(os.getenv("CLEANUP_INTERVAL_MINUTES", "30"))

ENABLE_DB_CLEANUP = os.getenv("ENABLE_DB_CLEANUP", "false").lower() == "true"
DB_RECORD_RETENTION_DAYS = int(os.getenv("DB_RECORD_RETENTION_DAYS", "30"))
DB_CLEANUP_INTERVAL_HOURS = int(os.getenv("DB_CLEANUP_INTERVAL_HOURS", "24"))

PROCESSING_TS: Dict[str, datetime] = {}
PROCESSING_FILES = set()
FILE_QUEUE: asyncio.PriorityQueue = asyncio.PriorityQueue()
RESULT_QUEUE: asyncio.PriorityQueue = asyncio.PriorityQueue()

# Tracks timestamps of files currently stabilizing/transcribing so the poster can wait
def min_processing_ts() -> Optional[datetime]:
    if not PROCESSING_TS:
        return None
    return min(PROCESSING_TS.values())

# Basic date/time sanity (used for release ordering; tolerant)
FILENAME_RE = re.compile(
    r"""
    ^
    (?P<date>\d{8})_(?P<time>\d{6})_?          # 20251021_071355 or 20251021_071355_
    (?P<system>[^_]+)                          # system (no underscore)
    (?:_(?P<site>\d+))?                        # optional numeric site id
    _(?P<site_name>.*?)                        # site/facility label (greedy-min, allows dashes etc.)
    __TO_(?P<tg>\d+)                           # required talkgroup
    (?:_FROM_(?P<src>\d+))?                    # optional source/unit id
    (?:_V(?P<ver>\d+))?                        # optional version suffix like _V2
    (?:_TONES(?:_[0-9.]+)+)?                   # optional trailing tone freqs: _TONES_1562.50_1312.50...
    \.mp3$
    """,
    re.VERBOSE,
)

# Detailed filename parser (covers both "stable" and "nightly" SDRTrunk variants)
# Examples:
#  20251020_213340System1_11_T-Four_Peaks__TO_111_FROM_15004.mp3
#  20251020_193608System2_SR_Salt_River__TO_1202.mp3
FNAME_RE = re.compile(
    r"""
    ^(?P<date>\d{8})_(?P<time>\d{6})_?            # 20251020_213340
    (?P<system>[^_]+)                             # System 1 / System 2
    (?:_(?P<site>[^_]+))?                         # optional site id or code (e.g. 11 or SR)
    _(?P<site_name>.+?)                           # Site_One / Site_Two (greedy until __TO_)
    __TO_(?P<tg>\d+)                              # talkgroup id
    (?:_FROM_(?P<src>\d+))?                       # optional source/unit id
    (?:_TONES_[^.]+)?                             # optional tones suffix
    \.mp3$                                        # extension
    """,
    re.IGNORECASE | re.VERBOSE,
)

TITLE_RE = re.compile(
    r'^\s*(?P<tg>\d+)\s*(?:"|\'|‘|’)?(?P<tg_name>[^"\'‘’]+?)(?:"|\'|‘|’)?\s*$'
)

# System map (upper-cased keys)
def _load_json_from_path(path_str: str, default_val: Any) -> Any:
    if not path_str:
        return default_val
    p = Path(path_str).expanduser()
    if not p.exists():
        logging.getLogger("uploader").warning(f"JSON config file not found: {p}")
        return default_val
    try:
        with open(p, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception as e:
        logging.getLogger("uploader").exception(f"Failed to load/parse JSON config {p}: {e}")
        return default_val

SYSTEM_CHANNEL_MAP_PATH = os.getenv("SYSTEM_CHANNEL_MAP_PATH", "")
try:
    _RAW_MAP = _load_json_from_path(SYSTEM_CHANNEL_MAP_PATH, {})
    SYSTEM_CHANNEL_MAP: Dict[str, int] = {
        str(k).strip().upper(): int(v) for k, v in _RAW_MAP.items()
    }
except Exception:
    SYSTEM_CHANNEL_MAP = {}

TALKGROUPS_JSON_PATH = os.getenv("TALKGROUPS_JSON_PATH", "")
try:
    _TG_LIST = _load_json_from_path(TALKGROUPS_JSON_PATH, [])
except Exception:
    _TG_LIST = []

def _norm_sys(s: Optional[str]) -> str:
    return (s or "").strip().upper()

TG_MAP: Dict[Tuple[str, str], Dict[str, str]] = {}
for item in _TG_LIST:
    try:
        sys_key = _norm_sys(item.get("system"))
        tg_key = str(item.get("id"))
        TG_MAP[(sys_key, tg_key)] = {
            "alpha": str(item.get("alpha") or "").strip(),
            "color": str(item.get("color") or "").strip(),
            "icon": str(item.get("icon") or "").strip(),
        }
    except Exception:
        pass

HARD_MAPPED_THREADS_PATH = os.getenv("HARD_MAPPED_THREADS_PATH", "")
try:
    HARD_MAP_RAW = _load_json_from_path(HARD_MAPPED_THREADS_PATH, {})
    HARD_MAP = {str(k).strip().upper(): str(v).strip() for k, v in HARD_MAP_RAW.items()}
except Exception:
    HARD_MAP = {}

FALLBACK_KEY = "FALLBACK"
FALLBACK_EMBED_COLOR = os.getenv("FALLBACK_EMBED_COLOR", "#FFFFFF")

# Faster-Whisper
WHISPER_MODEL_NAME = os.getenv("WHISPER_MODEL_NAME", "small.en")
WHISPER_DEVICE = os.getenv("WHISPER_DEVICE", "cpu")
WHISPER_COMPUTE_TYPE = os.getenv("WHISPER_COMPUTE_TYPE", "int8")
WHISPER_BEAM_SIZE = int(os.getenv("WHISPER_BEAM_SIZE", "1"))
WHISPER_VAD_FILTER = os.getenv("WHISPER_VAD_FILTER", "false").lower() == "true"
WHISPER_LANG = os.getenv("WHISPER_LANGUAGE", "en")
WHISPER_PROMPT = os.getenv("WHISPER_PROMPT", None)

# Timezone
LOCAL_TZ_NAME = os.getenv("TIMEZONE", "America/Phoenix")
def _load_local_tz(name: str):
    try:
        return ZoneInfo(name)
    except ZoneInfoNotFoundError:
        for alt in (name, "US/Arizona", "MST7MDT"):
            try:
                return ZoneInfo(alt)
            except Exception:
                pass
        return datetime.now().astimezone().tzinfo
LOCAL_TZ = _load_local_tz(LOCAL_TZ_NAME)

# ---------------------------
# Logging setup
# ---------------------------
root = logging.getLogger()
root.setLevel(getattr(logging, LOG_LEVEL, logging.INFO))
fmt = logging.Formatter("%(asctime)s %(levelname)-8s %(name)s | %(message)s")

ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(fmt)
root.addHandler(ch)

fh = RotatingFileHandler(LOG_FILE, maxBytes=5 * 1024 * 1024, backupCount=7, encoding="utf-8")
fh.setFormatter(fmt)
root.addHandler(fh)

logging.getLogger("discord").setLevel(logging.INFO)
logging.getLogger("watchdog").setLevel(logging.INFO)
logging.getLogger("aiohttp").setLevel(logging.WARNING)

log = logging.getLogger("uploader")

# ---------------------------
# SQLite (init + migration)
# ---------------------------
SCHEMA_SQL = """
PRAGMA journal_mode=WAL;

CREATE TABLE IF NOT EXISTS files (
  id INTEGER PRIMARY KEY,
  path TEXT UNIQUE,
  size INTEGER,
  mtime REAL,
  status TEXT,                 -- NULL, 'processed', or 'failed'
  system TEXT,
  site TEXT,
  site_name TEXT,
  talkgroup TEXT,
  talkgroup_name TEXT,
  source TEXT,
  recorded_local TEXT,
  duration REAL,
  transcription TEXT,
  discord_message_id TEXT,
  discord_thread_id TEXT,
  processed_at_local TEXT,
  processing_error TEXT
);

CREATE INDEX IF NOT EXISTS idx_files_system_tg ON files(system, talkgroup);

CREATE TABLE IF NOT EXISTS threads (
  id INTEGER PRIMARY KEY,
  system TEXT,
  talkgroup TEXT,
  channel_id TEXT,
  thread_id TEXT,
  title TEXT,
  created_at_local TEXT,
  UNIQUE(system, talkgroup)
);
"""

async def db_connect() -> aiosqlite.Connection:
    conn = await aiosqlite.connect(DB_PATH)
    conn.row_factory = aiosqlite.Row
    await conn.execute("PRAGMA journal_mode=WAL")
    return conn

async def _has_column(conn: aiosqlite.Connection, table: str, column: str) -> bool:
    async with conn.execute(f"PRAGMA table_info({table})") as cursor:
        info = await cursor.fetchall()
        return any(col["name"] == column for col in info)

async def db_init_and_migrate():
    conn = await db_connect()
    try:
        await conn.executescript(SCHEMA_SQL)
        # Backfill columns if this is an older DB
        if not await _has_column(conn, "files", "status"):
            await conn.execute("ALTER TABLE files ADD COLUMN status TEXT")
        if not await _has_column(conn, "files", "processing_error"):
            await conn.execute("ALTER TABLE files ADD COLUMN processing_error TEXT")
        await conn.commit()

        # IMPORTANT: fix legacy rows that were defaulted to 'processed' without a message id
        await conn.execute(
            "UPDATE files SET status=NULL WHERE (status IS NULL OR status='processed') AND (discord_message_id IS NULL OR discord_message_id='')"
        )
        await conn.commit()
    finally:
        await conn.close()

# ---------------------------
# Parsing helpers
# ---------------------------
def parse_local_dt_from_filename(date_str: str, time_str: str) -> Optional[datetime]:
    try:
        s = f"{date_str}{time_str}"
        dt = datetime.strptime(s, "%Y%m%d%H%M%S")
        return dt.replace(tzinfo=LOCAL_TZ)
    except Exception as e:
        log.warning(f"parse_local_dt_from_filename failed for date_str={date_str} time_str={time_str}: {e}")
        return None

def recorded_datetime_for_path(p: Path) -> datetime:
    m = FILENAME_RE.match(p.name)
    if m:
        dt = parse_local_dt_from_filename(m.group("date"), m.group("time"))
        if dt is not None:
            return dt
    try:
        return datetime.fromtimestamp(p.stat().st_mtime, LOCAL_TZ)
    except Exception as e:
        log.error(f"recorded_datetime_for_path: stat() failed for {p}: {e}", exc_info=True)
        return datetime.now(LOCAL_TZ)

def parse_filename(p: Path) -> Dict[str, Optional[str]]:
    m = FNAME_RE.match(p.name)
    if not m:
        return {}
    d = m.groupdict()
    return {
        "system": d.get("system"),
        "site": d.get("site"),
        "site_name": (d.get("site_name") or "").replace("_", " ").strip(),
        "talkgroup": d.get("tg"),
        "source": d.get("src"),
        "recorded_local": parse_local_dt_from_filename(d["date"], d["time"]),
    }

def parse_id3(p: Path) -> Dict[str, Optional[str]]:
    out: Dict[str, Optional[str]] = {}
    try:
        audio = MP3(p)
        tags = EasyID3(p)
    except Exception as e:
        log.debug(f"ID3 read failed on {p.name}: {e}")
        return out

    if hasattr(audio.info, "length"):
        out["duration"] = float(audio.info.length)

    title = (tags.get("title", [None])[0] or "").strip()
    if title:
        tm = TITLE_RE.match(title)
        if tm:
            out["talkgroup"] = tm.group("tg")
            out["talkgroup_name"] = tm.group("tg_name").strip()
        else:
            out["talkgroup_name"] = title

    album = (tags.get("album", [None])[0] or "").strip()
    if album:
        out["site_name"] = album

    artist = (tags.get("artist", [None])[0] or "").strip()
    if artist:
        parts = artist.split(" ", 1)
        out["source"] = parts[0]

    # comment field key:value; accept 'Date:' too
    comment = None
    try:
        comment = audio.tags.get("COMM::'eng'").text[0]
    except Exception:
        comment = (tags.get("comment", [None])[0] or "").strip()
    if comment:
        for pair in comment.split(";"):
            if ":" in pair:
                k, v = pair.split(":", 1)
                k = k.strip().lower()
                v = v.strip()
                if k == "date":
                    out["recorded_local"] = v.split(".")[0] if "." in v else v
                elif k == "system":
                    out["system"] = v
                elif k == "site":
                    out["site_name"] = v
                elif k in ("name", "sitename"):
                    out["site_name"] = v
                elif k == "frequency":
                    out["frequency"] = v
                elif k == "decoder":
                    out["decoder"] = v

    return out

def merge_meta(primary: Dict, fallback: Dict) -> Dict:
    merged: Dict[str, Optional[str]] = {}
    for k in set(primary.keys()) | set(fallback.keys()):
        merged[k] = primary.get(k) or fallback.get(k)
    return merged

def _hex_to_int_color(hex_str: Optional[str]) -> Optional[int]:
    if not hex_str:
        return None
    s = str(hex_str).strip().lstrip("#")
    if len(s) == 3:
        s = "".join(ch * 2 for ch in s)
    if len(s) == 8:
        s = s[:6]
    try:
        return int(s, 16) & 0xFFFFFF
    except Exception:
        return None

# ---------------------------
# Whisper (lazy init)
# ---------------------------
_whisper_model: Optional[WhisperModel] = None
_whisper_lock = asyncio.Lock()

async def get_whisper() -> WhisperModel:
    global _whisper_model
    async with _whisper_lock:
        if _whisper_model is None:
            log.info(f"Loading faster-whisper '{WHISPER_MODEL_NAME}' on {WHISPER_DEVICE} ({WHISPER_COMPUTE_TYPE})...")
            _whisper_model = WhisperModel(
                WHISPER_MODEL_NAME, device=WHISPER_DEVICE, compute_type=WHISPER_COMPUTE_TYPE
            )
        return _whisper_model

async def transcribe_async(path: Path) -> str:
    try:
        model = await get_whisper()
        loop = asyncio.get_running_loop()

        def do_transcribe():
            segments, _info = model.transcribe(
                str(path),
                language=WHISPER_LANG,
                vad_filter=WHISPER_VAD_FILTER,
                beam_size=WHISPER_BEAM_SIZE,
                initial_prompt=WHISPER_PROMPT,
            )
            return "".join(seg.text for seg in segments).strip() or "(no speech detected)"

        transcription = await loop.run_in_executor(None, do_transcribe)
        return transcription
    except Exception as e:
        log.exception(f"Transcription failed: {e}")
        return "(transcription error)"

# ---------------------------
# Discord client + helpers
# ---------------------------
intents = discord.Intents.default()
client = discord.Client(intents=intents)

async def _safe_fetch_channel(cid: int) -> Optional[discord.abc.GuildChannel]:
    try:
        ch = client.get_channel(cid)
        if ch is None:
            ch = await client.fetch_channel(cid)
        return ch
    except Exception:
        return None

async def _iter_possible_threads(channel: discord.TextChannel) -> AsyncIterator[discord.Thread]:
    try:
        for t in channel.threads:
            yield t
    except Exception:
        pass
    try:
        async for t in channel.archived_threads(limit=100, private=False):
            yield t
    except Exception:
        pass

async def _find_thread_by_name(channel: discord.TextChannel, name: str) -> Optional[int]:
    target = name.lower().strip()
    async for t in _iter_possible_threads(channel):
        try:
            if t.name.lower().strip() == target:
                return int(t.id)
        except Exception:
            pass
    return None

async def _create_thread_with_retry(channel: discord.TextChannel, name: str, retries: int = 3) -> int:
    delay = 1.0
    for attempt in range(1, retries + 1):
        try:
            new_thread = await channel.create_thread(name=name, type=discord.ChannelType.public_thread)
            return int(new_thread.id)
        except (aiohttp.ClientConnectionError, aiohttp.ServerDisconnectedError, HTTPException) as e:
            if attempt < retries:
                log.warning(f"create_thread failed ({e}); retry {attempt}/{retries} in {delay:.1f}s")
                await asyncio.sleep(delay)
                delay *= 2
                tid = await _find_thread_by_name(channel, name)
                if tid:
                    return tid
                continue
            raise

THREAD_LOCKS: dict[tuple[str, str], asyncio.Lock] = {}

def _get_thread_lock(sys_upper: str, tg_str: str) -> asyncio.Lock:
    key = (sys_upper, tg_str)
    lock = THREAD_LOCKS.get(key)
    if lock is None:
        lock = asyncio.Lock()
        THREAD_LOCKS[key] = lock
    return lock

def _desired_title(system: str, tgid: str, fallback: bool, mapped: Dict[str, str]) -> str:
    if fallback:
        return f"πŸ›œ {tgid} Fallback Unknown"
    icon = (mapped.get("icon") or "").strip()
    alpha = (mapped.get("alpha") or "").strip()
    icon_sp = (icon + " ") if icon else ""
    tail = f"{tgid} {alpha}".strip() if alpha else tgid
    return f"{icon_sp}{tail}".strip()

async def get_or_create_thread(system: str, talkgroup: str, talkgroup_name: Optional[str]) -> Optional[int]:
    sys_upper = _norm_sys(system)
    tg_str = str(talkgroup)

    lock = _get_thread_lock(sys_upper, tg_str)
    async with lock:
        # 1) Hard override?
        hard_key = f"{sys_upper}:{tg_str}"
        if hard_key in HARD_MAP:
            tid = int(HARD_MAP[hard_key])
            th = await _safe_fetch_channel(tid)
            if th:
                log.debug(f"Using hard-mapped thread for {hard_key} β†’ {tid}")
                return tid
            else:
                log.warning(f"Hard-mapped thread {tid} for {hard_key} not accessible; falling back.")

        # 2) Choose channel (known system or fallback)
        is_known_system = sys_upper in SYSTEM_CHANNEL_MAP
        if not is_known_system:
            if FALLBACK_KEY not in SYSTEM_CHANNEL_MAP:
                log.error(f"No system channel mapping for '{system}' and no FALLBACK provided.")
                return None
            channel_id = SYSTEM_CHANNEL_MAP[FALLBACK_KEY]
            mapped = {}
            desired_title = _desired_title(system, tg_str, True, mapped)
        else:
            channel_id = SYSTEM_CHANNEL_MAP[sys_upper]
            mapped = TG_MAP.get((sys_upper, tg_str), {"alpha": talkgroup_name or ""})
            desired_title = _desired_title(system, tg_str, False, mapped)

        # 3) DB cached thread?
        conn = await db_connect()
        db_tid = None
        try:
            async with conn.execute(
                "SELECT thread_id FROM threads WHERE system=? AND talkgroup=?",
                (system, talkgroup),
            ) as cursor:
                row = await cursor.fetchone()
            if row and row["thread_id"]:
                db_tid = int(row["thread_id"])
        finally:
            await conn.close()

        channel = await _safe_fetch_channel(channel_id)
        if channel is None or not isinstance(channel, discord.TextChannel):
            log.error(f"Could not fetch text channel {channel_id} for system '{system}'")
            return None

        if db_tid:
            thread = await _safe_fetch_channel(db_tid)
            if thread:
                try:
                    if hasattr(thread, "name") and thread.name != desired_title:
                        await thread.edit(name=desired_title)
                except Exception:
                    pass
                return db_tid
            else:
                log.info(f"Thread {db_tid} missing; will search by name.")

        # 4) Search by name (de-dupe)
        existing_tid = await _find_thread_by_name(channel, desired_title)
        if existing_tid:
            conn = await db_connect()
            try:
                await conn.execute(
                    "INSERT OR REPLACE INTO threads(system, talkgroup, channel_id, thread_id, title, created_at_local) "
                    "VALUES(?,?,?,?,?,?)",
                    (system, talkgroup, str(channel_id), str(existing_tid), desired_title,
                     datetime.now(LOCAL_TZ).strftime("%Y-%m-%d %H:%M:%S")),
                )
                await conn.commit()
            finally:
                await conn.close()
            return existing_tid

        # 5) Create new
        tid = await _create_thread_with_retry(channel, desired_title)
        conn = await db_connect()
        try:
            await conn.execute(
                "INSERT OR REPLACE INTO threads(system, talkgroup, channel_id, thread_id, title, created_at_local) "
                "VALUES(?,?,?,?,?,?)",
                (system, talkgroup, str(channel_id), str(tid), desired_title,
                 datetime.now(LOCAL_TZ).strftime("%Y-%m-%d %H:%M:%S")),
            )
            await conn.commit()
        finally:
            await conn.close()

        log.info(f"Created thread {desired_title} ({tid}) in channel {channel_id}")
        return tid

# ---------------------------
# Posting
# ---------------------------
def seconds_to_hms(sec: Optional[float]) -> str:
    if not sec:
        return "00:00"
    sec = int(round(sec))
    m, s = divmod(sec, 60)
    h, m = divmod(m, 60)
    return f"{h:02d}:{m:02d}:{s:02d}" if h else f"{m:02d}:{s:02d}"

async def post_clip(meta: Dict, audio_path: Path, transcription: str) -> Optional[int]:
    system = meta.get("system")
    talkgroup = meta.get("talkgroup")
    talkgroup_name = meta.get("talkgroup_name")

    thread_id = await get_or_create_thread(system, talkgroup, talkgroup_name)
    if not thread_id:
        return None
    thread = await _safe_fetch_channel(thread_id)
    if not thread:
        return None

    sys_upper = _norm_sys(system)
    tg_key = (sys_upper, str(talkgroup))
    mapped = TG_MAP.get(tg_key, {})
    icon = mapped.get("icon", "")
    alpha = mapped.get("alpha", talkgroup_name or "")
    icon_sp = (icon + " ") if icon else ""
    title = f"{icon_sp}{talkgroup} {alpha}".strip()

    color = _hex_to_int_color(mapped.get("color")) if mapped.get("color") else None
    if color is None and sys_upper not in SYSTEM_CHANNEL_MAP and FALLBACK_EMBED_COLOR:
        color = _hex_to_int_color(FALLBACK_EMBED_COLOR)
    color = color or 0x3BA55D

    embed = Embed(title=title, description="Recorded Transmission", color=color)
    ts_local = meta.get("recorded_local") or "Unknown"
    embed.add_field(name="Transcription", value=(transcription[:1024] or "β€”"), inline=False)
    embed.add_field(name="System", value=system or "Unknown", inline=True)
    embed.add_field(name="Site", value=f"{meta.get('site') or ''} {meta.get('site_name') or ''}".strip() or "?", inline=True)
    embed.add_field(name="Talkgroup", value=meta.get("talkgroup") or "?", inline=True)
    embed.add_field(name="Source", value=meta.get("source") or "?", inline=True)
    embed.add_field(name="Timestamp", value=f"{ts_local}", inline=True)

    md = []
    if meta.get("duration"):
        md.append(f"Length: {seconds_to_hms(meta['duration'])}")
    if meta.get("decoder"):
        md.append(f"Decoder: {meta['decoder']}")
    if meta.get("frequency"):
        md.append(f"Freq: {meta['frequency']}")
    if md:
        embed.add_field(name="Metadata", value="\n".join(md)[:1024], inline=False)

    file = DiscordFile(str(audio_path), filename=audio_path.name)
    log.info(f"DISCORD: posting {audio_path.name} to thread {thread_id} …")
    msg = await thread.send(file=file, embed=embed)
    log.info(f"DISCORD: posted {audio_path.name} successfully (msg_id={msg.id}).")
    return int(msg.id)

# ---------------------------
# Processing
# ---------------------------
async def is_file_handled(path: Path) -> bool:
    """Treat as handled ONLY if status='processed' AND we have a discord_message_id."""
    conn = await db_connect()
    try:
        async with conn.execute(
            "SELECT status, discord_message_id FROM files WHERE path=?",
            (str(path),),
        ) as cursor:
            row = await cursor.fetchone()
        if not row:
            return False
        return (row["status"] == "processed") and (row["discord_message_id"] not in (None, ""))
    finally:
        await conn.close()

async def record_processed(path: Path, meta: Dict, transcription: str, message_id: Optional[int], thread_id: Optional[int]):
    stat = path.stat()
    conn = await db_connect()
    try:
        await conn.execute(
            "INSERT OR REPLACE INTO files(path,size,mtime,status,system,site,site_name,talkgroup,talkgroup_name,source,recorded_local,duration,transcription,discord_message_id,discord_thread_id,processed_at_local) "
            "VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
            (
                str(path),
                stat.st_size,
                stat.st_mtime,
                "processed",
                meta.get("system"),
                meta.get("site"),
                meta.get("site_name"),
                meta.get("talkgroup"),
                meta.get("talkgroup_name"),
                meta.get("source"),
                (meta.get("recorded_local").strftime("%Y-%m-%d %H:%M:%S") if isinstance(meta.get("recorded_local"), datetime) else meta.get("recorded_local")),
                meta.get("duration"),
                transcription,
                str(message_id) if message_id else None,
                str(thread_id) if thread_id else None,
                datetime.now(LOCAL_TZ).strftime("%Y-%m-%d %H:%M:%S"),
            ),
        )
        await conn.commit()
    finally:
        await conn.close()

async def record_failed(path: Path, error_message: str):
    stat = None
    try:
        stat = path.stat()
    except FileNotFoundError:
        pass
    conn = await db_connect()
    try:
        await conn.execute(
            "INSERT OR REPLACE INTO files(path,size,mtime,status,processing_error,processed_at_local) "
            "VALUES(?,?,?,?,?,?)",
            (
                str(path),
                stat.st_size if stat else 0,
                stat.st_mtime if stat else 0,
                "failed",
                error_message[:1024],
                datetime.now(LOCAL_TZ).strftime("%Y-%m-%d %H:%M:%S"),
            ),
        )
        await conn.commit()
    finally:
        await conn.close()

async def process_one_prepare(path: Path):
    """
    Do all heavy work except posting. Enqueue the result for the ordered poster.
    """
    try:
        if not path.exists():
            return

        # Skip if DB says already processed with a message id
        if await is_file_handled(path):
            log.debug(f"process_one_prepare: {path.name} already handled.")
            return

        # Extract metadata
        meta_id3 = parse_id3(path)
        meta_fn  = parse_filename(path)
        meta     = merge_meta(meta_id3, meta_fn)

        if not meta.get("system") or not meta.get("talkgroup"):
            msg = "Missing key metadata (system or talkgroup)"
            log.warning(f"{msg}: {path.name}")
            await record_failed(path, msg)
            return

        # Talkgroup mapping polish
        sys_upper = _norm_sys(meta.get("system"))
        tg_key = (sys_upper, str(meta.get("talkgroup")))
        mapped = TG_MAP.get(tg_key)
        if mapped:
            if mapped.get("alpha"): meta["talkgroup_name"] = mapped["alpha"]
            if mapped.get("icon"):  meta["icon"] = mapped["icon"]
            if mapped.get("color"):
                col = _hex_to_int_color(mapped["color"])
                if col is not None:
                    meta["embed_color"] = col

        # Transcribe
        transcription = await transcribe_async(path)

        # Timestamp for ordering
        ts = recorded_datetime_for_path(path)

        # Hand off to ordered poster
        await RESULT_QUEUE.put((ts, path.stat().st_mtime, path.name, path, meta, transcription))
        log.info(f"PREPARED: {path.name} β†’ queued for ordered posting (ts={ts}).")

    except Exception as e:
        log.exception(f"Failed preparing {path.name}: {e}")
        await record_failed(path, str(e))
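
# NOTE: process_one() below is a self-contained prepare-and-post path. The tasks
# started in on_ready() use process_one_prepare() + poster_worker() instead, so
# nothing in the current flow calls this function.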

async def process_one(path: Path):
    t0 = time.perf_counter()
    try:
        if not path.exists():
            return

        if await is_file_handled(path):
            log.debug(f"process_one: {path.name} already handled (DB says processed).")
            return

        # Extract metadata
        meta_id3 = parse_id3(path)
        meta_fn = parse_filename(path)
        meta = merge_meta(meta_id3, meta_fn)
        if not meta.get("system") or not meta.get("talkgroup"):
            log.warning(f"Missing key metadata, skipping: {path.name}")
            await record_failed(path, "Missing key metadata (system or talkgroup)")
            return

        # Enrich from mapping
        sys_upper = _norm_sys(meta.get("system"))
        tg_key = (sys_upper, str(meta.get("talkgroup")))
        mapped = TG_MAP.get(tg_key)
        if mapped:
            if mapped.get("alpha"): meta["talkgroup_name"] = mapped["alpha"]
            if mapped.get("icon"): meta["icon"] = mapped["icon"]
            if mapped.get("color"):
                col = _hex_to_int_color(mapped["color"])
                if col is not None: meta["embed_color"] = col

        # Transcription (blocking offloaded)
        asr_start = time.perf_counter()
        transcription = await transcribe_async(path)
        asr_dur = time.perf_counter() - asr_start

        # Post to Discord
        post_start = time.perf_counter()
        msg_id = await post_clip(meta, path, transcription)
        post_dur = time.perf_counter() - post_start

        # Find thread id for DB
        thread_id = None
        conn = await db_connect()
        try:
            async with conn.execute(
                "SELECT thread_id FROM threads WHERE system=? AND talkgroup=?",
                (meta.get("system"), meta.get("talkgroup")),
            ) as cursor:
                row = await cursor.fetchone()
            if row and row["thread_id"]:
                thread_id = int(row["thread_id"])
        finally:
            await conn.close()

        # Record success
        db_start = time.perf_counter()
        await record_processed(path, meta, transcription, msg_id, thread_id)
        db_dur = time.perf_counter() - db_start

        total = time.perf_counter() - t0
        if LOG_TIMING:
            log.info(
                f"Timing {path.name} | asr={asr_dur:.2f}s upload={post_dur:.2f}s db={db_dur:.2f}s total={total:.2f}s"
            )
        else:
            log.info(f"Processed β†’ {path.name} (msg_id={msg_id})")

    except Exception as e:
        log.exception(f"Failed processing {path.name}: {e}")
        await record_failed(path, str(e))

async def add_to_queue(path: Path):
    """
    Safely adds a file to the processing queue after checking stability.
    """
    p_str = str(path)
    if p_str in PROCESSING_FILES:
        log.debug(f"ADD_TO_QUEUE: {path.name} already in PROCESSING_FILES.")
        return

    if await is_file_handled(path):
        log.debug(f"ADD_TO_QUEUE: {path.name} already handled in DB.")
        return

    # ---> NEW: mark as in-flight with its chronological timestamp
    ts = recorded_datetime_for_path(path)
    PROCESSING_FILES.add(p_str)
    PROCESSING_TS[p_str] = ts
    log.info(f"ADD_TO_QUEUE: Adding {path.name} to PROCESSING_FILES and starting stability checks...")

    try:
        # Stability: wait until size stops changing across two checks
        try:
            last = -1
            while True:
                cur = path.stat().st_size
                if cur == last and cur > 0:
                    break
                last = cur
                await asyncio.sleep(MIN_FILE_AGE_SEC)
            log.info(f"ADD_TO_QUEUE: Stability check PASSED for {path.name}.")
        except FileNotFoundError:
            log.warning(f"File {path.name} disappeared while waiting for it to stabilize.")
            PROCESSING_FILES.discard(p_str)
            PROCESSING_TS.pop(p_str, None)   # <-- NEW: clear in-flight
            return

        # Use the same ts we computed above (no need to recompute)
        await FILE_QUEUE.put((ts, path.name, path))
        log.info(f"ADD_TO_QUEUE: Queued {path.name} (ts={ts}).")

    except Exception as e:
        log.exception(f"Failed to queue {path.name}: {e}")
        PROCESSING_FILES.discard(p_str)
        PROCESSING_TS.pop(p_str, None)

async def file_processing_worker():
    log.info(f"File processing worker started (concurrency={BACKLOG_CONCURRENCY}).")
    while True:
        ts, name, path = await FILE_QUEUE.get()
        p_str = str(path)
        try:
            log.info(f"WORKER: Got {name} (ts: {ts}) from queue.")
            # If you have a patience/holding buffer here, keep it as-is
            await process_one_prepare(path)
        except Exception as e:
            log.exception(f"WORKER error on {name}: {e}")
        finally:
            FILE_QUEUE.task_done()
            # ---> NEW: clear in-flight once preparation is done (success or fail)
            PROCESSING_FILES.discard(p_str)
            PROCESSING_TS.pop(p_str, None)

class MP3Handler(FileSystemEventHandler):
    def on_created(self, event):
        try:
            if event.is_directory:
                return
            p = Path(event.src_path)
            name = p.name.lower()
            is_mp3 = name.endswith(".mp3")
            glob_ok = True
            try:
                glob_ok = fnmatch(name, Path(SCAN_GLOB).name.lower()) or fnmatch(str(p).lower(), SCAN_GLOB.lower())
            except Exception:
                pass
            logging.getLogger("uploader").debug(f"watchdog: created -> {p} | is_mp3={is_mp3} glob_ok={glob_ok}")
            if is_mp3 and glob_ok:
                asyncio.run_coroutine_threadsafe(add_to_queue(p), client.loop)
        except Exception as e:
            logging.getLogger("uploader").exception(f"watchdog handler error for {event.src_path}: {e}")

async def poster_worker():
    """
    Single-threaded poster: always posts the earliest timestamp next.
    Guarantees chronological posting across all clips.
    """
    log.info("Poster worker started (ordered posting enabled).")
    while True:
        try:
            # Pull the next prepared item (already transcribed & ready to post)
            ts, _mtime, name, path, meta, transcription = await RESULT_QUEUE.get()
            try:
                # If something earlier is still being prepared, wait up to QUEUE_PATIENCE_SEC
                waited = 0.0
                while True:
                    mints = min_processing_ts()  # earliest ts of any item still stabilizing/transcribing
                    if (mints is None) or (mints >= ts):
                        break
                    await asyncio.sleep(0.5)
                    waited += 0.5
                    if waited >= QUEUE_PATIENCE_SEC:
                        log.warning(
                            f"Poster waited {QUEUE_PATIENCE_SEC}s for earlier items; proceeding with {name}"
                        )
                        break

                # Post to Discord (now safe to maintain order)
                msg_id = await post_clip(meta, path, transcription)

                # Resolve thread_id for DB bookkeeping
                thread_id = None
                conn = await db_connect()
                try:
                    async with conn.execute(
                        "SELECT thread_id FROM threads WHERE system=? AND talkgroup=?",
                        (meta.get('system'), meta.get('talkgroup')),
                    ) as cur:
                        row = await cur.fetchone()
                    if row and row["thread_id"]:
                        try:
                            thread_id = int(row["thread_id"])
                        except Exception:
                            thread_id = None
                finally:
                    await conn.close()

                await record_processed(path, meta, transcription, msg_id, thread_id)
                log.info(f"POSTED (ordered) β†’ {name} (msg_id={msg_id})")

            except Exception as e:
                log.exception(f"Poster error for {name}: {e}")
                await record_failed(path, f"poster error: {e}")
            finally:
                RESULT_QUEUE.task_done()

        except Exception as outer:
            log.error(f"Poster loop error: {outer}", exc_info=True)
            await asyncio.sleep(1.0)

def start_watcher():
    log.info(f"Starting watchdog observer on: {AUDIO_ROOT}")
    obs = Observer()
    handler = MP3Handler()
    obs.schedule(handler, str(AUDIO_ROOT), recursive=True)
    obs.start()
    log.info("Watchdog observer started.")
    return obs

async def poll_for_new_files():
    await asyncio.sleep(POLL_INTERVAL_SEC)
    while True:
        try:
            log.info("Safety poller starting a full scan...")
            for p in AUDIO_ROOT.rglob(SCAN_GLOB):
                if p.suffix.lower() == ".mp3":
                    await add_to_queue(p)
            log.info("Safety poller scan complete.")
        except Exception as e:
            log.exception(f"poller error: {e}")
        await asyncio.sleep(POLL_INTERVAL_SEC)

# ---------------------------
# Client events
# ---------------------------
@client.event
async def on_ready():
    log.info(f"Logged in as {client.user} (id={client.user.id})")
    log.info("Initializing and migrating database...")
    await db_init_and_migrate()
    log.info("Database migration complete.")

    for i in range(BACKLOG_CONCURRENCY):
        asyncio.create_task(file_processing_worker())

    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, start_watcher)

    asyncio.create_task(poll_for_new_files())

    asyncio.create_task(poster_worker())

    if ENABLE_CLEANUP:
        asyncio.create_task(cleanup_old_files())
    if ENABLE_DB_CLEANUP:
        asyncio.create_task(cleanup_old_db_records())

    log.info("All services started. Monitoring for new files...")

# ---------------------------
# Cleanup tasks (optional; stubs to keep parity with your env flags)
# ---------------------------
async def cleanup_old_files():
    while True:
        try:
            cutoff = time.time() - (DELETE_FILES_OLDER_THAN_MINUTES * 60)
            deleted = 0
            for p in AUDIO_ROOT.rglob(SCAN_GLOB):
                try:
                    if p.stat().st_mtime < cutoff:
                        p.unlink(missing_ok=True)
                        deleted += 1
                except Exception:
                    pass
            if deleted:
                log.info(f"cleanup_old_files: deleted {deleted} old files")
        except Exception as e:
            log.warning(f"cleanup_old_files error: {e}")
        await asyncio.sleep(CLEANUP_INTERVAL_MINUTES * 60)

async def cleanup_old_db_records():
    while True:
        try:
            cutoff = datetime.now(LOCAL_TZ) - timedelta(days=DB_RECORD_RETENTION_DAYS)
            conn = await db_connect()
            try:
                await conn.execute(
                    "DELETE FROM files WHERE processed_at_local < ?",
                    (cutoff.strftime("%Y-%m-%d %H:%M:%S"),),
                )
                await conn.commit()
            finally:
                await conn.close()
        except Exception as e:
            log.warning(f"cleanup_old_db_records error: {e}")
        await asyncio.sleep(DB_CLEANUP_INTERVAL_HOURS * 3600)

# ---------------------------
# Sanity + main
# ---------------------------
def sanity_check():
    ok = True
    if not BOT_TOKEN:
        log.error("DISCORD_BOT_TOKEN missing in .env")
        ok = False
    if not AUDIO_ROOT.exists():
        log.error(f"AUDIO_ROOT not found: {AUDIO_ROOT}")
        ok = False
    if not SYSTEM_CHANNEL_MAP:
        log.error(
            "SYSTEM_CHANNEL_MAP is empty. Check that .env is loading correctly "
            f"and that {os.getenv('SYSTEM_CHANNEL_MAP_PATH')} exists and is not empty."
        )
        ok = False
    return ok

def main():
    if not sanity_check():
        sys.exit(2)
    client.run(BOT_TOKEN, log_handler=None)

if __name__ == "__main__":
    main()
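
A quick way to smoke-test the filename parser without waiting for a live recording (a sketch; importing uploader runs its module-level setup, so run this from /opt/sdrtrunk-discord with the venv active and the .env in place):

python3 - <<'EOF'
from pathlib import Path
import uploader

# sample name taken from the parser comments in uploader.py
sample = Path("20251020_213340System1_11_T-Four_Peaks__TO_111_FROM_15004.mp3")
print(uploader.parse_filename(sample))
# expect system, site, site_name, talkgroup, source, and a recorded_local datetime
EOF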