Weather Scripts
In typical fashion, I've got a timer and trigger script for weather alerts. Since this script is used primarily in Mesa, Arizona, its tailored for specific areas. Adjust to suit your needs as you see fit.
Timer Script - mm_wx_alerts.py - Every 15 Minutes
This script is the default running one, set to run every 15 minutes via the CRON scheduler in MM. It looks for and returns any NWS active alerts for specific things, with some exclusions, and is based on alert zones from the NWS. You can adjust the zones by swapping out the zone codes, and what phrases to ignore in the alerts produced by NWS.
#!/usr/bin/env python3
# mm_meta:
# name: WX Alerts (Scheduled)
# emoji: π¨
# language: Python
"""
Polls NWS active alerts for configured zones and broadcasts any NEW alerts
to the mesh. Uses fingerprint-based deduplication so each unique alert is
only sent once, even across multiple poll cycles. Silently skips low-value
alert types (air quality, outlooks, etc.) to reduce RF noise.
Scheduled use: Run every 10-15 minutes via MM Timer trigger.
Trigger use: See mm_wx_alerts_trigger.py for on-demand version.
Env vars:
USER_AGENT HTTP User-Agent string (default: SKYNET2 identifier)
ALERT_ZONES Comma-separated NWS UGC zone codes (default: Mesa/East Valley)
"""
import os, json, re, hashlib, sys, time, socket
import urllib.request
from pathlib import Path
from datetime import datetime
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
USER_AGENT = os.getenv("USER_AGENT", "SKYNET2-MeshMonitor-WX/1.0 (contact: [email protected])")
MAX_LEN = 200
ACCEPT = "application/geo+json"
DEFAULT_ZONES = "AZC013,AZZ537,AZZ538,AZZ540,AZZ541,AZZ542,AZZ543,AZZ544,AZZ545,AZZ546,AZZ547,AZZ548,AZZ549,AZZ550,AZZ551,AZZ554,AZZ555"
ZONES = os.getenv("ALERT_ZONES", DEFAULT_ZONES)
# Event types to silently skip β not actionable alerts for mesh users
IGNORE_PHRASES = [
"air quality", # Air Quality Alert
"hydrologic outlook", # Long-range river forecast, not an alert
"hazardous weather outlook", # Generic planning text, not an alert
"child abduction", # AMBER alerts via NWS API are very spammy
]
STATE_DIR = Path("/data/scripts/state")
STATE_FILE = STATE_DIR / "wx_active_fingerprints.json"
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def clamp(s: str) -> str:
"""Collapse whitespace and hard-truncate to MAX_LEN characters."""
s = re.sub(r"\s+", " ", (s or "").strip())
return s if len(s) <= MAX_LEN else s[:MAX_LEN - 3] + "..."
def get_json(url: str, timeout: int = 15, retries: int = 2) -> dict:
"""Fetch JSON from url with retry/backoff. Raises on final failure."""
for attempt in range(retries + 1):
try:
req = urllib.request.Request(
url, headers={"User-Agent": USER_AGENT, "Accept": ACCEPT}
)
with urllib.request.urlopen(req, timeout=timeout) as r:
return json.loads(r.read().decode("utf-8"))
except Exception:
if attempt < retries:
time.sleep(2)
continue
raise
return {}
def load_state() -> dict:
"""Load the active fingerprint dict from disk. Returns {} on any error."""
try:
if STATE_FILE.exists():
return json.loads(STATE_FILE.read_text("utf-8"))
except Exception:
pass
return {}
def save_state(state: dict):
STATE_DIR.mkdir(parents=True, exist_ok=True)
STATE_FILE.write_text(json.dumps(state), encoding="utf-8")
def severity_tag(event: str) -> str:
"""Return an emoji tag reflecting the severity/type of an NWS event string."""
e = (event or "").lower()
# Level 1 β Arizona critical (immediate action required)
if "dust storm" in e: return "π«οΈβ" # Pull Aside, Stay Alive
if "flash flood" in e: return "ππ¨" # Turn Around, Don't Drown
if "tornado" in e: return "πͺοΈπ¨" # Take Cover Now
# Level 2 β AZ hazards (preparation needed)
if "red flag" in e or "fire" in e: return "π©"
if "heat" in e: return "π‘οΈ"
if "severe thunderstorm" in e: return "βοΈ"
if "freeze" in e or "frost" in e or "winter" in e: return "βοΈ"
# Level 3 β Generic NWS severity tiers
if "warning" in e: return "π΄"
if "watch" in e: return "π "
if "advisory" in e: return "π‘"
return "βΉοΈ"
def format_time(iso_ts: str) -> str:
"""Convert an ISO 8601 timestamp to a short local HH:MM string."""
if not iso_ts:
return "??"
try:
dt = datetime.fromisoformat(iso_ts.replace("Z", "+00:00")).astimezone()
return dt.strftime("%H:%M")
except Exception:
return "??"
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
try:
active = load_state()
# Prune fingerprints for alerts that have already expired
now_iso = datetime.now().astimezone().isoformat()
active = {k: v for k, v in active.items() if v > now_iso}
url = "https://api.weather.gov/alerts/active?zone=" + urllib.request.quote(ZONES, safe=",")
data = get_json(url)
features = data.get("features") or []
new_msgs = []
for a in features:
p = a.get("properties") or {}
event = p.get("event") or "Alert"
# Skip low-value / spammy alert types
if any(phrase in event.lower() for phrase in IGNORE_PHRASES):
continue
# Build a stable fingerprint: event name + sorted zone list
zones_str = ",".join(sorted(p.get("geocode", {}).get("UGC", [])))
sig_hash = hashlib.md5(f"{event}|{zones_str}".encode()).hexdigest()
expires = p.get("expires") or ""
if sig_hash in active:
# Already announced β update expiry if the alert was extended
if expires > active[sig_hash]:
active[sig_hash] = expires
continue
# New alert β format and queue
tag = severity_tag(event)
msg = f"{tag} {event} until {format_time(expires)}"
new_msgs.append(clamp(msg))
if expires:
active[sig_hash] = expires
save_state(active)
if not new_msgs:
print(json.dumps({"response": ""}))
return
# Cap at 3 messages per cycle to avoid flooding the mesh
print(json.dumps({"responses": new_msgs[:3]}, ensure_ascii=False))
except Exception as e:
print(f"[wx_alerts] poll failed: {e}", file=sys.stderr)
print(json.dumps({"response": ""}))
if __name__ == "__main__":
main()
Trigger - mm_wx_alerts_trigger.py - On Demand
This is the on-demand script, which replies to key words found in OTA messages on the mesh. It will return the active alerts based on the same info. I have it respond in the weather channel instead of DMs so others can see the info.
#!/usr/bin/env python3
# mm_meta:
# name: WX Alerts (On-Demand)
# emoji: π¨
# language: Python
"""
Returns current NWS active alerts for configured zones on demand.
Unlike the scheduled version, this always responds β even if quiet β
so the user gets explicit confirmation either way.
Trigger use: Assign to a keyword trigger (e.g. "!alerts") in MM.
Env vars:
USER_AGENT HTTP User-Agent string (default: SKYNET2 identifier)
ALERT_ZONES Comma-separated NWS UGC zone codes (default: Mesa/East Valley)
"""
import os, json, re, sys, time, socket
import urllib.request
from datetime import datetime
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
USER_AGENT = os.getenv("USER_AGENT", "SKYNET2-MeshMonitor-WX/1.0 (contact: [email protected])")
MAX_LEN = 200
ACCEPT = "application/geo+json"
DEFAULT_ZONES = "AZC013,AZZ537,AZZ538,AZZ540,AZZ541,AZZ542,AZZ543,AZZ544,AZZ545,AZZ546,AZZ547,AZZ548,AZZ549,AZZ550,AZZ551,AZZ554,AZZ555"
ZONES = os.getenv("ALERT_ZONES", DEFAULT_ZONES)
IGNORE_PHRASES = [
"air quality",
"hydrologic outlook",
"hazardous weather outlook",
"child abduction",
]
# ---------------------------------------------------------------------------
# Helpers (kept identical to mm_wx_alerts.py)
# ---------------------------------------------------------------------------
def clamp(s: str) -> str:
"""Collapse whitespace and hard-truncate to MAX_LEN characters."""
s = re.sub(r"\s+", " ", (s or "").strip())
return s if len(s) <= MAX_LEN else s[:MAX_LEN - 3] + "..."
def get_json(url: str, timeout: int = 15, retries: int = 2) -> dict:
"""Fetch JSON from url with retry/backoff. Raises on final failure."""
for attempt in range(retries + 1):
try:
req = urllib.request.Request(
url, headers={"User-Agent": USER_AGENT, "Accept": ACCEPT}
)
with urllib.request.urlopen(req, timeout=timeout) as r:
return json.loads(r.read().decode("utf-8"))
except Exception:
if attempt < retries:
time.sleep(2)
continue
raise
return {}
def severity_tag(event: str) -> str:
"""Return an emoji tag reflecting the severity/type of an NWS event string."""
e = (event or "").lower()
if "dust storm" in e: return "π«οΈβ"
if "flash flood" in e: return "ππ¨"
if "tornado" in e: return "πͺοΈπ¨"
if "red flag" in e or "fire" in e: return "π©"
if "heat" in e: return "π‘οΈ"
if "severe thunderstorm" in e: return "βοΈ"
if "freeze" in e or "frost" in e or "winter" in e: return "βοΈ"
if "warning" in e: return "π΄"
if "watch" in e: return "π "
if "advisory" in e: return "π‘"
return "βΉοΈ"
def format_time(iso_ts: str) -> str:
"""Convert an ISO 8601 timestamp to a short local HH:MM string."""
if not iso_ts:
return "??"
try:
dt = datetime.fromisoformat(iso_ts.replace("Z", "+00:00")).astimezone()
return dt.strftime("%H:%M")
except Exception:
return "??"
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
try:
url = "https://api.weather.gov/alerts/active?zone=" + urllib.request.quote(ZONES, safe=",")
data = get_json(url)
features = data.get("features") or []
msgs = []
for a in features:
p = a.get("properties") or {}
event = p.get("event") or "Alert"
if any(phrase in event.lower() for phrase in IGNORE_PHRASES):
continue
tag = severity_tag(event)
expires = p.get("expires") or ""
msg = f"{tag} {event} until {format_time(expires)}"
msgs.append(clamp(msg))
if not msgs:
# Always give the user an explicit "all clear" on demand
print(json.dumps({"response": "β
No active weather alerts for Mesa."}))
return
print(json.dumps({"responses": msgs[:3]}, ensure_ascii=False))
except Exception as e:
print(f"[wx_alerts_trigger] failed: {e}", file=sys.stderr)
print(json.dumps({"response": "β οΈ Error fetching alerts."}))
if __name__ == "__main__":
main()
Timer Script - mm_outlook.py - 15JUN through 30SEP
This is a script that is turned on manually between 15JUN and 30SEP each year, which is Arizona's monsoon season. This script produces the Hazardous Weather Outlook (HWO) to send OTA.Β
#!/usr/bin/env python3
# mm_meta:
# name: WX Hazardous Outlook (Scheduled)
# emoji: β οΈ
# language: Python
"""
Fetches the NWS Hazardous Weather Outlook (HWO) for the configured zone
and broadcasts a condensed summary to the mesh once daily. The HWO is the
"what to watch for this week" product β it's worth daily broadcasting during
monsoon season (roughly Jun 15 β Sep 30) when convective threats develop fast.
During other seasons, use mm_wx_outlook_trigger.py for on-demand access
and disable this timer in MM to avoid RF noise from low-threat outlooks.
Scheduled use: Enable MM Timer trigger daily during monsoon season.
On-demand use: See mm_wx_outlook_trigger.py.
Env vars:
USER_AGENT HTTP User-Agent string (default: SKYNET2 identifier)
OUTLOOK_ZONE NWS CWA (office) zone code (default: AZ β Phoenix/Tucson)
"""
import os, json, re, sys, time
import urllib.request
from pathlib import Path
from datetime import datetime
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
USER_AGENT = os.getenv("USER_AGENT", "SKYNET2-MeshMonitor-WX/1.0 (contact: [email protected])")
OUTLOOK_ZONE = os.getenv("OUTLOOK_ZONE", "AZ") # State abbreviation for NWS HWO product
MAX_LEN = 200
ACCEPT = "application/geo+json"
STATE_DIR = Path("/data/scripts/state")
STATE_FILE = STATE_DIR / "wx_outlook_last.json"
# The NWS HWO product code is "HWO" β filter to our state's issuing office
# Phoenix NWS (PSR) and Tucson NWS (TWC) both issue HWOs for AZ zones.
HWO_URL = "https://api.weather.gov/products/types/HWO/locations/PSR"
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def clamp(s: str) -> str:
"""Strip blank lines, collapse inline whitespace, truncate to MAX_LEN."""
lines = [re.sub(r"[ \t]+", " ", (ln or "").strip()) for ln in (s or "").splitlines()]
s = "\n".join(ln for ln in lines if ln).strip()
return s if len(s) <= MAX_LEN else s[:MAX_LEN - 3] + "..."
def get_json(url: str, timeout: int = 20, retries: int = 2) -> dict:
"""Fetch JSON from url with retry/backoff. Raises on final failure."""
for attempt in range(retries + 1):
try:
req = urllib.request.Request(
url, headers={"User-Agent": USER_AGENT, "Accept": ACCEPT}
)
with urllib.request.urlopen(req, timeout=timeout) as r:
return json.loads(r.read().decode("utf-8"))
except Exception:
if attempt < retries:
time.sleep(2)
continue
raise
return {}
def load_last_id() -> str | None:
try:
if STATE_FILE.exists():
return json.loads(STATE_FILE.read_text("utf-8")).get("id")
except Exception:
pass
return None
def save_last_id(product_id: str):
STATE_DIR.mkdir(parents=True, exist_ok=True)
STATE_FILE.write_text(
json.dumps({"id": product_id, "saved": datetime.now().isoformat()}),
encoding="utf-8",
)
def extract_day1_summary(text: str) -> str:
"""
Pull the Day 1 (today's) threat summary from the HWO product text.
The HWO has a fixed structure: .DAY ONE... followed by a paragraph.
Falls back to the first non-header paragraph if structure is unusual.
"""
if not text:
return ""
# Normalize line endings
text = re.sub(r"\r\n?", "\n", text)
# Try to extract .DAY ONE. section
m = re.search(r"\.DAY ONE[^.]*\.\s*\n+(.*?)(?=\n\.[A-Z]|\Z)", text, re.S | re.I)
if m:
raw = m.group(1).strip()
# Collapse internal whitespace/newlines to a single space
return re.sub(r"\s+", " ", raw)
# Fallback: grab first meaningful paragraph (skip header lines starting with &&, $$ etc.)
for para in text.split("\n\n"):
clean = re.sub(r"\s+", " ", para.strip())
if len(clean) > 40 and not clean.startswith(("&&", "$$", "HAZARDOUS", "National")):
return clean
return ""
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
try:
listing = get_json(HWO_URL)
products = listing.get("@graph") or []
if not products:
print(json.dumps({"response": ""}))
return
# Most recent HWO is first in the list
latest = products[0]
prod_id = latest.get("id") or latest.get("@id") or ""
prod_url = latest.get("@id") or ""
# Deduplicate β only broadcast when a new HWO has been issued
if prod_id and prod_id == load_last_id():
print(json.dumps({"response": ""}))
return
# Fetch the full product text
if not prod_url:
print(json.dumps({"response": ""}))
return
product = get_json(prod_url)
raw_text = product.get("productText") or ""
summary = extract_day1_summary(raw_text)
if not summary or len(summary) < 20:
# NWS issued an HWO with no significant Day 1 content β skip broadcast
if prod_id:
save_last_id(prod_id)
print(json.dumps({"response": ""}))
return
msg = clamp(f"β οΈ HWO Day 1\n{summary}")
if prod_id:
save_last_id(prod_id)
print(json.dumps({"response": msg}, ensure_ascii=False))
except Exception as e:
print(f"[wx_outlook] failed: {e}", file=sys.stderr)
print(json.dumps({"response": ""}))
if __name__ == "__main__":
main()
Β