Skip to main content

Weather Scripts

In typical fashion, I've got a timer and trigger script for weather alerts. Since this script is used primarily in Mesa, Arizona, its tailored for specific areas. Adjust to suit your needs as you see fit.

Timer Script - mm_wx_alerts.py - Every 15 Minutes

This script is the default running one, set to run every 15 minutes via the CRON scheduler in MM. It looks for and returns any NWS active alerts for specific things, with some exclusions, and is based on alert zones from the NWS. You can adjust the zones by swapping out the zone codes, and what phrases to ignore in the alerts produced by NWS.

#!/usr/bin/env python3
# mm_meta:
#   name: WX Alerts (Scheduled)
#   emoji: 🚨
#   language: Python
"""
Polls NWS active alerts for configured zones and broadcasts any NEW alerts
to the mesh. Uses fingerprint-based deduplication so each unique alert is
only sent once, even across multiple poll cycles. Silently skips low-value
alert types (air quality, outlooks, etc.) to reduce RF noise.

Scheduled use: Run every 10-15 minutes via MM Timer trigger.
Trigger use:   See mm_wx_alerts_trigger.py for on-demand version.

Env vars:
  USER_AGENT   HTTP User-Agent string (default: SKYNET2 identifier)
  ALERT_ZONES  Comma-separated NWS UGC zone codes (default: Mesa/East Valley)
"""
import os, json, re, hashlib, sys, time, socket
import urllib.request
from pathlib import Path
from datetime import datetime

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
USER_AGENT   = os.getenv("USER_AGENT", "SKYNET2-MeshMonitor-WX/1.0 (contact: [email protected])")
MAX_LEN      = 200
ACCEPT       = "application/geo+json"

DEFAULT_ZONES = "AZC013,AZZ537,AZZ538,AZZ540,AZZ541,AZZ542,AZZ543,AZZ544,AZZ545,AZZ546,AZZ547,AZZ548,AZZ549,AZZ550,AZZ551,AZZ554,AZZ555"
ZONES = os.getenv("ALERT_ZONES", DEFAULT_ZONES)

# Event types to silently skip β€” not actionable alerts for mesh users
IGNORE_PHRASES = [
    "air quality",               # Air Quality Alert
    "hydrologic outlook",        # Long-range river forecast, not an alert
    "hazardous weather outlook",  # Generic planning text, not an alert
    "child abduction",           # AMBER alerts via NWS API are very spammy
]

STATE_DIR  = Path("/data/scripts/state")
STATE_FILE = STATE_DIR / "wx_active_fingerprints.json"

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def clamp(s: str) -> str:
    """Collapse whitespace and hard-truncate to MAX_LEN characters."""
    s = re.sub(r"\s+", " ", (s or "").strip())
    return s if len(s) <= MAX_LEN else s[:MAX_LEN - 3] + "..."

def get_json(url: str, timeout: int = 15, retries: int = 2) -> dict:
    """Fetch JSON from url with retry/backoff. Raises on final failure."""
    for attempt in range(retries + 1):
        try:
            req = urllib.request.Request(
                url, headers={"User-Agent": USER_AGENT, "Accept": ACCEPT}
            )
            with urllib.request.urlopen(req, timeout=timeout) as r:
                return json.loads(r.read().decode("utf-8"))
        except Exception:
            if attempt < retries:
                time.sleep(2)
                continue
            raise
    return {}

def load_state() -> dict:
    """Load the active fingerprint dict from disk. Returns {} on any error."""
    try:
        if STATE_FILE.exists():
            return json.loads(STATE_FILE.read_text("utf-8"))
    except Exception:
        pass
    return {}

def save_state(state: dict):
    STATE_DIR.mkdir(parents=True, exist_ok=True)
    STATE_FILE.write_text(json.dumps(state), encoding="utf-8")

def severity_tag(event: str) -> str:
    """Return an emoji tag reflecting the severity/type of an NWS event string."""
    e = (event or "").lower()
    # Level 1 β€” Arizona critical (immediate action required)
    if "dust storm"  in e: return "πŸŒ«οΈβ›”"   # Pull Aside, Stay Alive
    if "flash flood" in e: return "🌊🚨"   # Turn Around, Don't Drown
    if "tornado"     in e: return "πŸŒͺ️🚨"   # Take Cover Now
    # Level 2 β€” AZ hazards (preparation needed)
    if "red flag" in e or "fire" in e: return "🚩"
    if "heat"     in e: return "🌑️"
    if "severe thunderstorm" in e: return "β›ˆοΈ"
    if "freeze" in e or "frost" in e or "winter" in e: return "❄️"
    # Level 3 β€” Generic NWS severity tiers
    if "warning"  in e: return "πŸ”΄"
    if "watch"    in e: return "🟠"
    if "advisory" in e: return "🟑"
    return "ℹ️"

def format_time(iso_ts: str) -> str:
    """Convert an ISO 8601 timestamp to a short local HH:MM string."""
    if not iso_ts:
        return "??"
    try:
        dt = datetime.fromisoformat(iso_ts.replace("Z", "+00:00")).astimezone()
        return dt.strftime("%H:%M")
    except Exception:
        return "??"

# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    try:
        active = load_state()

        # Prune fingerprints for alerts that have already expired
        now_iso = datetime.now().astimezone().isoformat()
        active = {k: v for k, v in active.items() if v > now_iso}

        url  = "https://api.weather.gov/alerts/active?zone=" + urllib.request.quote(ZONES, safe=",")
        data = get_json(url)
        features = data.get("features") or []

        new_msgs = []

        for a in features:
            p     = a.get("properties") or {}
            event = p.get("event") or "Alert"

            # Skip low-value / spammy alert types
            if any(phrase in event.lower() for phrase in IGNORE_PHRASES):
                continue

            # Build a stable fingerprint: event name + sorted zone list
            zones_str = ",".join(sorted(p.get("geocode", {}).get("UGC", [])))
            sig_hash  = hashlib.md5(f"{event}|{zones_str}".encode()).hexdigest()
            expires   = p.get("expires") or ""

            if sig_hash in active:
                # Already announced β€” update expiry if the alert was extended
                if expires > active[sig_hash]:
                    active[sig_hash] = expires
                continue

            # New alert β€” format and queue
            tag = severity_tag(event)
            msg = f"{tag} {event} until {format_time(expires)}"
            new_msgs.append(clamp(msg))

            if expires:
                active[sig_hash] = expires

        save_state(active)

        if not new_msgs:
            print(json.dumps({"response": ""}))
            return

        # Cap at 3 messages per cycle to avoid flooding the mesh
        print(json.dumps({"responses": new_msgs[:3]}, ensure_ascii=False))

    except Exception as e:
        print(f"[wx_alerts] poll failed: {e}", file=sys.stderr)
        print(json.dumps({"response": ""}))

if __name__ == "__main__":
    main()

Trigger - mm_wx_alerts_trigger.py - On Demand

This is the on-demand script, which replies to key words found in OTA messages on the mesh. It will return the active alerts based on the same info. I have it respond in the weather channel instead of DMs so others can see the info.

#!/usr/bin/env python3
# mm_meta:
#   name: WX Alerts (On-Demand)
#   emoji: 🚨
#   language: Python
"""
Returns current NWS active alerts for configured zones on demand.
Unlike the scheduled version, this always responds β€” even if quiet β€”
so the user gets explicit confirmation either way.

Trigger use: Assign to a keyword trigger (e.g. "!alerts") in MM.

Env vars:
  USER_AGENT   HTTP User-Agent string (default: SKYNET2 identifier)
  ALERT_ZONES  Comma-separated NWS UGC zone codes (default: Mesa/East Valley)
"""
import os, json, re, sys, time, socket
import urllib.request
from datetime import datetime

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
USER_AGENT   = os.getenv("USER_AGENT", "SKYNET2-MeshMonitor-WX/1.0 (contact: [email protected])")
MAX_LEN      = 200
ACCEPT       = "application/geo+json"

DEFAULT_ZONES = "AZC013,AZZ537,AZZ538,AZZ540,AZZ541,AZZ542,AZZ543,AZZ544,AZZ545,AZZ546,AZZ547,AZZ548,AZZ549,AZZ550,AZZ551,AZZ554,AZZ555"
ZONES = os.getenv("ALERT_ZONES", DEFAULT_ZONES)

IGNORE_PHRASES = [
    "air quality",
    "hydrologic outlook",
    "hazardous weather outlook",
    "child abduction",
]

# ---------------------------------------------------------------------------
# Helpers  (kept identical to mm_wx_alerts.py)
# ---------------------------------------------------------------------------
def clamp(s: str) -> str:
    """Collapse whitespace and hard-truncate to MAX_LEN characters."""
    s = re.sub(r"\s+", " ", (s or "").strip())
    return s if len(s) <= MAX_LEN else s[:MAX_LEN - 3] + "..."

def get_json(url: str, timeout: int = 15, retries: int = 2) -> dict:
    """Fetch JSON from url with retry/backoff. Raises on final failure."""
    for attempt in range(retries + 1):
        try:
            req = urllib.request.Request(
                url, headers={"User-Agent": USER_AGENT, "Accept": ACCEPT}
            )
            with urllib.request.urlopen(req, timeout=timeout) as r:
                return json.loads(r.read().decode("utf-8"))
        except Exception:
            if attempt < retries:
                time.sleep(2)
                continue
            raise
    return {}

def severity_tag(event: str) -> str:
    """Return an emoji tag reflecting the severity/type of an NWS event string."""
    e = (event or "").lower()
    if "dust storm"  in e: return "πŸŒ«οΈβ›”"
    if "flash flood" in e: return "🌊🚨"
    if "tornado"     in e: return "πŸŒͺ️🚨"
    if "red flag" in e or "fire" in e: return "🚩"
    if "heat"     in e: return "🌑️"
    if "severe thunderstorm" in e: return "β›ˆοΈ"
    if "freeze" in e or "frost" in e or "winter" in e: return "❄️"
    if "warning"  in e: return "πŸ”΄"
    if "watch"    in e: return "🟠"
    if "advisory" in e: return "🟑"
    return "ℹ️"

def format_time(iso_ts: str) -> str:
    """Convert an ISO 8601 timestamp to a short local HH:MM string."""
    if not iso_ts:
        return "??"
    try:
        dt = datetime.fromisoformat(iso_ts.replace("Z", "+00:00")).astimezone()
        return dt.strftime("%H:%M")
    except Exception:
        return "??"

# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    try:
        url  = "https://api.weather.gov/alerts/active?zone=" + urllib.request.quote(ZONES, safe=",")
        data = get_json(url)
        features = data.get("features") or []

        msgs = []
        for a in features:
            p     = a.get("properties") or {}
            event = p.get("event") or "Alert"

            if any(phrase in event.lower() for phrase in IGNORE_PHRASES):
                continue

            tag     = severity_tag(event)
            expires = p.get("expires") or ""
            msg     = f"{tag} {event} until {format_time(expires)}"
            msgs.append(clamp(msg))

        if not msgs:
            # Always give the user an explicit "all clear" on demand
            print(json.dumps({"response": "βœ… No active weather alerts for Mesa."}))
            return

        print(json.dumps({"responses": msgs[:3]}, ensure_ascii=False))

    except Exception as e:
        print(f"[wx_alerts_trigger] failed: {e}", file=sys.stderr)
        print(json.dumps({"response": "⚠️ Error fetching alerts."}))

if __name__ == "__main__":
    main()

Timer Script - mm_outlook.py - 15JUN through 30SEP

This is a script that is turned on manually between 15JUN and 30SEP each year, which is Arizona's monsoon season. This script produces the Hazardous Weather Outlook (HWO) to send OTA.Β 

#!/usr/bin/env python3
# mm_meta:
#   name: WX Hazardous Outlook (Scheduled)
#   emoji: ⚠️
#   language: Python
"""
Fetches the NWS Hazardous Weather Outlook (HWO) for the configured zone
and broadcasts a condensed summary to the mesh once daily. The HWO is the
"what to watch for this week" product β€” it's worth daily broadcasting during
monsoon season (roughly Jun 15 – Sep 30) when convective threats develop fast.

During other seasons, use mm_wx_outlook_trigger.py for on-demand access
and disable this timer in MM to avoid RF noise from low-threat outlooks.

Scheduled use: Enable MM Timer trigger daily during monsoon season.
On-demand use: See mm_wx_outlook_trigger.py.

Env vars:
  USER_AGENT    HTTP User-Agent string (default: SKYNET2 identifier)
  OUTLOOK_ZONE  NWS CWA (office) zone code (default: AZ β€” Phoenix/Tucson)
"""
import os, json, re, sys, time
import urllib.request
from pathlib import Path
from datetime import datetime

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
USER_AGENT   = os.getenv("USER_AGENT",    "SKYNET2-MeshMonitor-WX/1.0 (contact: [email protected])")
OUTLOOK_ZONE = os.getenv("OUTLOOK_ZONE", "AZ")   # State abbreviation for NWS HWO product

MAX_LEN = 200
ACCEPT  = "application/geo+json"

STATE_DIR  = Path("/data/scripts/state")
STATE_FILE = STATE_DIR / "wx_outlook_last.json"

# The NWS HWO product code is "HWO" β€” filter to our state's issuing office
# Phoenix NWS (PSR) and Tucson NWS (TWC) both issue HWOs for AZ zones.
HWO_URL = "https://api.weather.gov/products/types/HWO/locations/PSR"

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def clamp(s: str) -> str:
    """Strip blank lines, collapse inline whitespace, truncate to MAX_LEN."""
    lines = [re.sub(r"[ \t]+", " ", (ln or "").strip()) for ln in (s or "").splitlines()]
    s = "\n".join(ln for ln in lines if ln).strip()
    return s if len(s) <= MAX_LEN else s[:MAX_LEN - 3] + "..."

def get_json(url: str, timeout: int = 20, retries: int = 2) -> dict:
    """Fetch JSON from url with retry/backoff. Raises on final failure."""
    for attempt in range(retries + 1):
        try:
            req = urllib.request.Request(
                url, headers={"User-Agent": USER_AGENT, "Accept": ACCEPT}
            )
            with urllib.request.urlopen(req, timeout=timeout) as r:
                return json.loads(r.read().decode("utf-8"))
        except Exception:
            if attempt < retries:
                time.sleep(2)
                continue
            raise
    return {}

def load_last_id() -> str | None:
    try:
        if STATE_FILE.exists():
            return json.loads(STATE_FILE.read_text("utf-8")).get("id")
    except Exception:
        pass
    return None

def save_last_id(product_id: str):
    STATE_DIR.mkdir(parents=True, exist_ok=True)
    STATE_FILE.write_text(
        json.dumps({"id": product_id, "saved": datetime.now().isoformat()}),
        encoding="utf-8",
    )

def extract_day1_summary(text: str) -> str:
    """
    Pull the Day 1 (today's) threat summary from the HWO product text.
    The HWO has a fixed structure: .DAY ONE... followed by a paragraph.
    Falls back to the first non-header paragraph if structure is unusual.
    """
    if not text:
        return ""

    # Normalize line endings
    text = re.sub(r"\r\n?", "\n", text)

    # Try to extract .DAY ONE. section
    m = re.search(r"\.DAY ONE[^.]*\.\s*\n+(.*?)(?=\n\.[A-Z]|\Z)", text, re.S | re.I)
    if m:
        raw = m.group(1).strip()
        # Collapse internal whitespace/newlines to a single space
        return re.sub(r"\s+", " ", raw)

    # Fallback: grab first meaningful paragraph (skip header lines starting with &&, $$ etc.)
    for para in text.split("\n\n"):
        clean = re.sub(r"\s+", " ", para.strip())
        if len(clean) > 40 and not clean.startswith(("&&", "$$", "HAZARDOUS", "National")):
            return clean

    return ""

# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    try:
        listing = get_json(HWO_URL)
        products = listing.get("@graph") or []

        if not products:
            print(json.dumps({"response": ""}))
            return

        # Most recent HWO is first in the list
        latest    = products[0]
        prod_id   = latest.get("id") or latest.get("@id") or ""
        prod_url  = latest.get("@id") or ""

        # Deduplicate β€” only broadcast when a new HWO has been issued
        if prod_id and prod_id == load_last_id():
            print(json.dumps({"response": ""}))
            return

        # Fetch the full product text
        if not prod_url:
            print(json.dumps({"response": ""}))
            return

        product  = get_json(prod_url)
        raw_text = product.get("productText") or ""
        summary  = extract_day1_summary(raw_text)

        if not summary or len(summary) < 20:
            # NWS issued an HWO with no significant Day 1 content β€” skip broadcast
            if prod_id:
                save_last_id(prod_id)
            print(json.dumps({"response": ""}))
            return

        msg = clamp(f"⚠️ HWO Day 1\n{summary}")

        if prod_id:
            save_last_id(prod_id)

        print(json.dumps({"response": msg}, ensure_ascii=False))

    except Exception as e:
        print(f"[wx_outlook] failed: {e}", file=sys.stderr)
        print(json.dumps({"response": ""}))

if __name__ == "__main__":
    main()

Β