Parse DSDPlus File Names and Import into DB
I've got a Windows 11 VM running in my Proxmox server which is where I run my copy of DSDPlus Fast Lane to monitor a local radio system. I've got it set up to record per-call audio files, and move them to an LXC, where I've set up a web interface to view/play the audio it's recorded. Using a PSQL database, I'm housing all the metadata from the files - but the trick was DSD doesnt embed the metadata in the file, it uses the file's name to provide it. As such, I needed something that could pick apart the file name for my database.
It's janky, but this is what I came up with to parse the file name (DSD uses the folder structure for the date, BTW).
#!/usr/bin/env python3
print("Running script from:", __file__)
import os
import re
import psycopg2
from datetime import datetime
AUDIO_DIR = "/var/www/html/data/audio"
DB_CONFIG = {
"host": "localhost",
"port": 5432,
"database": "dbname",
"user": "dbusername",
"password": "dbuserpassword",
}
def parse_filename(filename):
base = filename.replace(".mp3", "")
# Match the whole filename using a pattern to reliably extract all fields
match = re.match(
r"(?P<time>\d{6})_(?P<freq>\d{3})_(?P<system_type>\w+?)_(?P<system_id>\d+)-\d+_DCC(?P<dcc>\d+)_Slot(?P<slot>\d+)_"
r"(?P<call_type>\w+?)_(?P<tg_id>\d+)\[(?P<tg_alias>[^\]]+)\]_(?P<radio_id>\d+)\[(?P<radio_alias>[^\]]+)\]",
base
)
if not match:
raise ValueError(f"Filename does not match expected format: {filename}")
data = match.groupdict()
return {
"time_str": data["time"],
"freq": data["freq"],
"system_id": int(data["system_id"]),
"dcc": data["dcc"],
"slot": data["slot"],
"call_type": data["call_type"],
"talkgroup": int(data["tg_id"]),
"talkgroup_alias": data["tg_alias"].replace("_", " "),
"radio_id": int(data["radio_id"]),
"radio_alias": data["radio_alias"].replace("_", " ")
}
def main():
conn = psycopg2.connect(**DB_CONFIG)
cur = conn.cursor()
for folder in os.listdir(AUDIO_DIR):
folder_path = os.path.join(AUDIO_DIR, folder)
if not os.path.isdir(folder_path) or not folder.isdigit():
continue
try:
date_obj = datetime.strptime(folder, "%Y%m%d").date()
except ValueError:
continue
for file in os.listdir(folder_path):
if not file.endswith(".mp3"):
continue
try:
data = parse_filename(file)
time_obj = datetime.strptime(data["time_str"], "%H%M%S").time()
file_path = os.path.join(folder_path, file)
cur.execute("""
SELECT COUNT(*) FROM audio_calls
WHERE file_name = %s
""", (file,))
if cur.fetchone()[0] > 0:
continue
cur.execute("""
INSERT INTO audio_calls (
file_path, file_name, date, time, system_id, dcc,
slot, call_type, talkgroup, talkgroup_alias, radio_id, radio_alias, frequency
) VALUES (
%s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s
)
""", (
file_path,
file,
date_obj,
time_obj,
data["system_id"],
data["dcc"],
data["slot"],
data["call_type"],
data["talkgroup"],
data["talkgroup_alias"],
data["radio_id"],
data["radio_alias"],
data["freq"]
))
print(f"Inserted: {file}")
except Exception as e:
print(f"Error parsing filename '{file}': {e}")
conn.commit()
cur.close()
conn.close()
if __name__ == "__main__":
main()
Now when there is a new day's folder in the directory, this script will parse the file names, dump them into the database appropriately. I've set this script to run at regular intervals using cron.