Merge pull request #392 from Nezreka/refactor/lift-automation-to-core

Refactor/lift automation to core
pull/394/head
BoulderBadgeDad 4 weeks ago committed by GitHub
commit caf5ee9e98
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,7 @@
"""Automation API + progress tracking helpers package.
Lifted from web_server.py /api/automations/* routes and progress
emitters. The action handler registration (`_register_automation_handlers`)
stays in web_server.py because each handler closure is tightly coupled
to other application features.
"""

@ -0,0 +1,367 @@
"""Automation REST API helpers.
CRUD + run + progress + history logic for /api/automations/* routes.
Each function takes the deps it needs (database, automation_engine,
profile_id) so the route layer is left as pure HTTP shuffling.
Out of scope:
- /api/automations/blocks static JSON + one call into signals.py;
stays inline in web_server.py for now.
- /api/test/automation touches scan manager + media clients +
config_manager; stays inline.
"""
from __future__ import annotations
import json
import logging
from typing import Any, Optional
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Hydration helpers — convert raw DB rows to API-friendly dicts
# ---------------------------------------------------------------------------
_JSON_FIELDS = ('trigger_config', 'action_config', 'notify_config', 'last_result')
_JSON_DEFAULT_DICT = {'trigger_config', 'action_config', 'notify_config'}
def _hydrate_automation(auto: dict) -> dict:
"""Parse JSON columns and backfill `then_actions` from legacy notify_*."""
for field in _JSON_FIELDS:
try:
auto[field] = json.loads(auto[field]) if isinstance(auto[field], str) else auto[field]
except (json.JSONDecodeError, TypeError):
auto[field] = {} if field in _JSON_DEFAULT_DICT else None
try:
raw = auto.get('then_actions')
auto['then_actions'] = json.loads(raw or '[]') if isinstance(raw, str) else (raw or [])
except (json.JSONDecodeError, TypeError):
auto['then_actions'] = []
if not auto['then_actions'] and auto.get('notify_type'):
auto['then_actions'] = [{
'type': auto['notify_type'],
'config': auto.get('notify_config', {}),
}]
return auto
# ---------------------------------------------------------------------------
# Signal cycle detection
# ---------------------------------------------------------------------------
def _has_signal_concern(trigger_type: str, then_actions: list[dict]) -> bool:
return trigger_type == 'signal_received' or any(
t.get('type') == 'fire_signal' for t in then_actions
)
def _check_create_cycle(
automation_engine,
database,
profile_id: int,
trigger_type: str,
trigger_config_json: str,
then_actions_json: str,
then_actions: list[dict],
) -> Optional[str]:
"""Return cycle path string if creating this automation would loop, else None."""
if not automation_engine or not _has_signal_concern(trigger_type, then_actions):
return None
all_autos = database.get_automations(profile_id)
test_auto = {
'trigger_type': trigger_type,
'trigger_config': trigger_config_json,
'then_actions': then_actions_json,
'enabled': True,
}
all_autos.append(test_auto)
cycle = automation_engine.detect_signal_cycles(all_autos)
if cycle:
return ''.join(cycle)
return None
def _check_update_cycle(
automation_engine,
database,
automation_id: int,
data: dict,
) -> Optional[str]:
"""Return cycle path string if updating this automation would loop, else None."""
if not automation_engine:
return None
trigger_type = data.get('trigger_type', '')
then_actions = data.get('then_actions', [])
if not _has_signal_concern(trigger_type, then_actions):
return None
all_autos = database.get_automations()
test_autos = []
for a in all_autos:
if a['id'] == automation_id:
merged = dict(a)
if 'trigger_type' in data:
merged['trigger_type'] = data['trigger_type']
if 'trigger_config' in data:
merged['trigger_config'] = json.dumps(data['trigger_config'])
if 'then_actions' in data:
merged['then_actions'] = json.dumps(data['then_actions'])
merged['enabled'] = True
test_autos.append(merged)
else:
test_autos.append(a)
cycle = automation_engine.detect_signal_cycles(test_autos)
if cycle:
return ''.join(cycle)
return None
# ---------------------------------------------------------------------------
# CRUD helpers — return (response_dict, http_status)
# ---------------------------------------------------------------------------
def list_automations(database, profile_id: int) -> list[dict]:
"""All automations for the profile, with JSON columns parsed."""
automations = database.get_automations(profile_id)
return [_hydrate_automation(a) for a in automations]
def get_automation(database, automation_id: int) -> Optional[dict]:
"""One automation, hydrated. Returns None if not found."""
auto = database.get_automation(automation_id)
if not auto:
return None
return _hydrate_automation(auto)
def create_automation(
database,
automation_engine,
profile_id: int,
data: dict,
) -> tuple[dict, int]:
"""Create + schedule an automation. Returns (response_body, http_status)."""
name = (data.get('name') or '').strip()
if not name:
return {'error': 'Name is required'}, 400
trigger_type = data.get('trigger_type', 'schedule')
trigger_config = json.dumps(data.get('trigger_config', {}))
action_type = data.get('action_type', 'process_wishlist')
action_config = json.dumps(data.get('action_config', {}))
then_actions = data.get('then_actions', [])
then_actions_json = json.dumps(then_actions)
if then_actions:
notify_type = then_actions[0].get('type')
notify_config = json.dumps(then_actions[0].get('config', {}))
else:
notify_type = data.get('notify_type') or None
notify_config = json.dumps(data.get('notify_config', {})) if notify_type else '{}'
cycle_path = _check_create_cycle(
automation_engine, database, profile_id,
trigger_type, trigger_config, then_actions_json, then_actions,
)
if cycle_path:
return {'error': f'Signal cycle detected: {cycle_path}. This would cause an infinite loop.'}, 400
group_name = data.get('group_name') or None
auto_id = database.create_automation(
name, trigger_type, trigger_config, action_type, action_config,
profile_id, notify_type, notify_config, then_actions_json, group_name,
)
if auto_id is None:
return {'error': 'Failed to create automation'}, 500
if automation_engine:
automation_engine.schedule_automation(auto_id)
return {'success': True, 'id': auto_id}, 200
def update_automation(
database,
automation_engine,
automation_id: int,
data: dict,
) -> tuple[dict, int]:
"""Update + reschedule an automation. Returns (response_body, http_status)."""
update_fields: dict[str, Any] = {}
if 'name' in data:
update_fields['name'] = data['name'].strip()
if 'trigger_type' in data:
update_fields['trigger_type'] = data['trigger_type']
if 'trigger_config' in data:
update_fields['trigger_config'] = json.dumps(data['trigger_config'])
if 'action_type' in data:
update_fields['action_type'] = data['action_type']
if 'action_config' in data:
update_fields['action_config'] = json.dumps(data['action_config'])
if 'then_actions' in data:
then_actions = data['then_actions']
update_fields['then_actions'] = json.dumps(then_actions)
if then_actions:
update_fields['notify_type'] = then_actions[0].get('type')
update_fields['notify_config'] = json.dumps(then_actions[0].get('config', {}))
else:
update_fields['notify_type'] = None
update_fields['notify_config'] = '{}'
elif 'notify_type' in data:
update_fields['notify_type'] = data['notify_type'] or None
if 'notify_config' in data and 'then_actions' not in data:
update_fields['notify_config'] = json.dumps(data['notify_config'])
if 'group_name' in data:
update_fields['group_name'] = data['group_name'] or None
if not update_fields:
return {'error': 'No fields to update'}, 400
cycle_path = _check_update_cycle(automation_engine, database, automation_id, data)
if cycle_path:
return {'error': f'Signal cycle detected: {cycle_path}. This would cause an infinite loop.'}, 400
success = database.update_automation(automation_id, **update_fields)
if not success:
return {'error': 'Automation not found'}, 404
if automation_engine:
auto = database.get_automation(automation_id)
if auto and auto.get('enabled'):
automation_engine.schedule_automation(automation_id)
else:
automation_engine.cancel_automation(automation_id)
return {'success': True}, 200
def batch_update_group(database, automation_ids: list, group_name: Optional[str]) -> tuple[dict, int]:
"""Move/rename a set of automations into a single group (or ungroup)."""
if not automation_ids or not isinstance(automation_ids, list):
return {'error': 'automation_ids must be a non-empty list'}, 400
try:
automation_ids = [int(aid) for aid in automation_ids]
except (ValueError, TypeError):
return {'error': 'automation_ids must contain integers'}, 400
updated = database.batch_update_group(automation_ids, group_name)
return {'success': True, 'updated': updated}, 200
def bulk_toggle(
database,
automation_engine,
automation_ids: list,
enabled: bool,
) -> tuple[dict, int]:
"""Bulk enable/disable a set of automations + reschedule each affected."""
if not automation_ids or not isinstance(automation_ids, list):
return {'error': 'automation_ids must be a non-empty list'}, 400
try:
automation_ids = [int(aid) for aid in automation_ids]
except (ValueError, TypeError):
return {'error': 'automation_ids must contain integers'}, 400
updated = database.bulk_set_enabled(automation_ids, bool(enabled))
if automation_engine and updated > 0:
for aid in automation_ids:
auto = database.get_automation(aid)
if auto:
if auto.get('enabled'):
automation_engine.schedule_automation(auto)
else:
automation_engine.cancel_automation(aid)
return {'success': True, 'updated': updated}, 200
def delete_automation(database, automation_engine, automation_id: int) -> tuple[dict, int]:
"""Delete an automation. System automations are protected."""
auto = database.get_automation(automation_id)
if auto and auto.get('is_system'):
return {'error': 'System automations cannot be deleted'}, 403
if automation_engine:
automation_engine.cancel_automation(automation_id)
success = database.delete_automation(automation_id)
if not success:
return {'error': 'Automation not found'}, 404
return {'success': True}, 200
def duplicate_automation(
database,
automation_engine,
profile_id: int,
automation_id: int,
) -> tuple[dict, int]:
"""Duplicate an automation. System automations are protected."""
auto = database.get_automation(automation_id)
if not auto:
return {'error': 'Automation not found'}, 404
if auto.get('is_system'):
return {'error': 'System automations cannot be duplicated'}, 403
new_id = database.create_automation(
name=f"{auto['name']} (Copy)",
trigger_type=auto['trigger_type'],
trigger_config=auto.get('trigger_config', '{}'),
action_type=auto['action_type'],
action_config=auto.get('action_config', '{}'),
profile_id=profile_id,
notify_type=auto.get('notify_type'),
notify_config=auto.get('notify_config', '{}'),
then_actions=auto.get('then_actions', '[]'),
group_name=auto.get('group_name'),
)
if new_id is None:
return {'error': 'Failed to duplicate automation'}, 500
if automation_engine:
automation_engine.schedule_automation(new_id)
return {'success': True, 'id': new_id}, 200
def toggle_automation(database, automation_engine, automation_id: int) -> tuple[dict, int]:
"""Toggle an automation's enabled state + reschedule/cancel."""
success = database.toggle_automation(automation_id)
if not success:
return {'error': 'Automation not found'}, 404
if automation_engine:
auto = database.get_automation(automation_id)
if auto and auto.get('enabled'):
automation_engine.schedule_automation(automation_id)
else:
automation_engine.cancel_automation(automation_id)
return {'success': True}, 200
def run_automation(automation_engine, automation_id: int, profile_id: int) -> tuple[dict, int]:
"""Manually trigger an automation."""
if not automation_engine:
return {'error': 'Automation engine not available'}, 500
success = automation_engine.run_now(automation_id, profile_id=profile_id)
if not success:
return {'error': 'Automation not found'}, 404
return {'success': True}, 200
def get_history(database, automation_id: int, *, limit: int, offset: int) -> dict:
"""Run-history page for an automation, with log_lines/result_json parsed."""
data = database.get_automation_run_history(automation_id, limit=limit, offset=offset)
for entry in data.get('history', []):
if entry.get('log_lines'):
try:
entry['log_lines'] = json.loads(entry['log_lines'])
except (json.JSONDecodeError, TypeError):
entry['log_lines'] = []
else:
entry['log_lines'] = []
if entry.get('result_json'):
try:
entry['result_json'] = json.loads(entry['result_json'])
except (json.JSONDecodeError, TypeError):
pass
data['automation_id'] = automation_id
return data

@ -0,0 +1,215 @@
"""Static block definitions for the automation builder UI.
Returned verbatim by `/api/automations/blocks` (with `known_signals`
injected by the route from `signals.collect_known_signals`).
Three top-level lists:
- `TRIGGERS` WHEN blocks: schedule, daily/weekly time, app started,
event triggers (track_downloaded, batch_complete, etc.), signal_received,
webhook_received.
- `ACTIONS` DO blocks: process_wishlist, scan_library, etc.
- `NOTIFICATIONS` THEN blocks: discord/pushbullet/telegram/webhook,
plus fire_signal and run_script then-actions.
"""
from __future__ import annotations
TRIGGERS: list[dict] = [
{"type": "schedule", "label": "Schedule", "icon": "clock", "description": "Run on a timer interval", "available": True,
"config_fields": [
{"key": "interval", "type": "number", "label": "Every", "default": 6, "min": 1},
{"key": "unit", "type": "select", "label": "Unit",
"options": [{"value": "minutes", "label": "Minutes"}, {"value": "hours", "label": "Hours"}, {"value": "days", "label": "Days"}],
"default": "hours"}
]},
{"type": "daily_time", "label": "Daily Time", "icon": "clock", "description": "Run every day at a specific time", "available": True,
"config_fields": [
{"key": "time", "type": "time", "label": "At", "default": "03:00"}
]},
{"type": "weekly_time", "label": "Weekly Schedule", "icon": "calendar", "description": "Run on specific days of the week at a set time", "available": True,
"config_fields": [
{"key": "time", "type": "time", "label": "At", "default": "03:00"},
{"key": "days", "type": "multi_select", "label": "Days",
"options": [{"value": "mon", "label": "Mon"}, {"value": "tue", "label": "Tue"}, {"value": "wed", "label": "Wed"},
{"value": "thu", "label": "Thu"}, {"value": "fri", "label": "Fri"}, {"value": "sat", "label": "Sat"}, {"value": "sun", "label": "Sun"}]}
]},
{"type": "app_started", "label": "App Started", "icon": "power", "description": "When SoulSync starts up", "available": True},
{"type": "track_downloaded", "label": "Track Downloaded", "icon": "download", "description": "When a track finishes downloading", "available": True,
"has_conditions": True,
"condition_fields": ["artist", "title", "album", "quality"],
"variables": ["artist", "title", "album", "quality"]},
{"type": "batch_complete", "label": "Batch Complete", "icon": "check-circle", "description": "When an album/playlist download finishes", "available": True,
"has_conditions": True,
"condition_fields": ["playlist_name"],
"variables": ["playlist_name", "total_tracks", "completed_tracks", "failed_tracks"]},
{"type": "watchlist_new_release", "label": "New Release Found", "icon": "bell", "description": "When watchlist detects new music", "available": True,
"has_conditions": True,
"condition_fields": ["artist"],
"variables": ["artist", "new_tracks", "added_to_wishlist"]},
{"type": "playlist_synced", "label": "Playlist Synced", "icon": "refresh", "description": "When a playlist sync completes", "available": True,
"has_conditions": True,
"condition_fields": ["playlist_name"],
"variables": ["playlist_name", "total_tracks", "matched_tracks", "synced_tracks", "failed_tracks"]},
{"type": "playlist_changed", "label": "Playlist Changed", "icon": "edit", "description": "When a mirrored playlist detects track changes from source", "available": True,
"has_conditions": True,
"condition_fields": ["playlist_name"],
"variables": ["playlist_name", "old_count", "new_count", "added", "removed"]},
{"type": "discovery_completed", "label": "Discovery Complete", "icon": "search", "description": "When playlist track discovery finishes", "available": True,
"has_conditions": True,
"condition_fields": ["playlist_name"],
"variables": ["playlist_name", "total_tracks", "discovered_count", "failed_count", "skipped_count"]},
# Phase 3 triggers
{"type": "wishlist_processing_completed", "label": "Wishlist Processed", "icon": "check-circle",
"description": "When auto-wishlist processing finishes", "available": True,
"variables": ["tracks_processed", "tracks_found", "tracks_failed"]},
{"type": "watchlist_scan_completed", "label": "Watchlist Scan Done", "icon": "check-circle",
"description": "When watchlist scan finishes", "available": True,
"variables": ["artists_scanned", "new_tracks_found", "tracks_added"]},
{"type": "database_update_completed", "label": "Database Updated", "icon": "database",
"description": "When library database refresh finishes", "available": True,
"variables": ["total_artists", "total_albums", "total_tracks"]},
{"type": "library_scan_completed", "label": "Library Scan Done", "icon": "hard-drive",
"description": "When media library scan finishes", "available": True,
"variables": ["server_type"]},
{"type": "download_failed", "label": "Download Failed", "icon": "x-circle",
"description": "When a track permanently fails to download", "available": True,
"has_conditions": True, "condition_fields": ["artist", "title", "reason"],
"variables": ["artist", "title", "reason"]},
{"type": "download_quarantined", "label": "File Quarantined", "icon": "alert-triangle",
"description": "When AcoustID verification fails", "available": True,
"has_conditions": True, "condition_fields": ["artist", "title"],
"variables": ["artist", "title", "reason"]},
{"type": "wishlist_item_added", "label": "Wishlist Item Added", "icon": "plus-circle",
"description": "When a track is added to wishlist", "available": True,
"has_conditions": True, "condition_fields": ["artist", "title"],
"variables": ["artist", "title", "reason"]},
{"type": "watchlist_artist_added", "label": "Artist Watched", "icon": "user-plus",
"description": "When an artist is added to watchlist", "available": True,
"has_conditions": True, "condition_fields": ["artist"],
"variables": ["artist", "artist_id"]},
{"type": "watchlist_artist_removed", "label": "Artist Unwatched", "icon": "user-minus",
"description": "When an artist is removed from watchlist", "available": True,
"has_conditions": True, "condition_fields": ["artist"],
"variables": ["artist", "artist_id"]},
{"type": "import_completed", "label": "Import Complete", "icon": "upload",
"description": "When album/track import finishes", "available": True,
"has_conditions": True, "condition_fields": ["artist", "album_name"],
"variables": ["track_count", "album_name", "artist"]},
{"type": "mirrored_playlist_created", "label": "Playlist Mirrored", "icon": "copy",
"description": "When a new playlist is mirrored", "available": True,
"has_conditions": True, "condition_fields": ["playlist_name", "source"],
"variables": ["playlist_name", "source", "track_count"]},
{"type": "quality_scan_completed", "label": "Quality Scan Done", "icon": "bar-chart",
"description": "When quality scan finishes", "available": True,
"variables": ["quality_met", "low_quality", "total_scanned"]},
{"type": "duplicate_scan_completed", "label": "Duplicate Scan Done", "icon": "layers",
"description": "When duplicate cleaner finishes", "available": True,
"variables": ["files_scanned", "duplicates_found", "space_freed"]},
# Signal trigger
{"type": "signal_received", "label": "Signal Received", "icon": "zap",
"description": "When another automation fires a named signal", "available": True,
"config_fields": [
{"key": "signal_name", "type": "signal_input", "label": "Signal Name"}
],
"variables": ["signal_name"]},
# Webhook trigger
{"type": "webhook_received", "label": "Webhook Received", "icon": "globe",
"description": "When an external API request is received (POST /api/v1/request)", "available": True,
"variables": ["query", "request_id", "source"]},
]
ACTIONS: list[dict] = [
{"type": "process_wishlist", "label": "Process Wishlist", "icon": "list", "description": "Retry failed downloads from wishlist", "available": True,
"config_fields": [{"key": "category", "type": "select", "label": "Category", "options": [{"value": "all", "label": "All"}, {"value": "albums", "label": "Albums"}, {"value": "singles", "label": "Singles"}], "default": "all"}]},
{"type": "scan_watchlist", "label": "Scan Watchlist", "icon": "eye", "description": "Check watched artists for new releases", "available": True},
{"type": "scan_library", "label": "Scan Library", "icon": "refresh", "description": "Trigger media server library scan", "available": True},
{"type": "refresh_mirrored", "label": "Refresh Mirrored Playlist", "icon": "copy", "description": "Re-fetch playlist from source and update mirror", "available": True,
"config_fields": [
{"key": "playlist_id", "type": "mirrored_playlist_select", "label": "Playlist"},
{"key": "all", "type": "checkbox", "label": "Refresh all mirrored playlists", "default": False}
]},
{"type": "sync_playlist", "label": "Sync Playlist", "icon": "sync", "description": "Sync mirrored playlist to media server", "available": True,
"config_fields": [
{"key": "playlist_id", "type": "mirrored_playlist_select", "label": "Playlist"}
]},
{"type": "discover_playlist", "label": "Discover Playlist", "icon": "search", "description": "Find official Spotify/iTunes metadata for mirrored playlist tracks", "available": True,
"config_fields": [
{"key": "playlist_id", "type": "mirrored_playlist_select", "label": "Playlist"},
{"key": "all", "type": "checkbox", "label": "Discover all mirrored playlists", "default": False}
]},
{"type": "playlist_pipeline", "label": "Playlist Pipeline", "icon": "rocket",
"description": "Full lifecycle: refresh → discover → sync → download missing. One automation for the entire flow.",
"available": True,
"config_fields": [
{"key": "playlist_id", "type": "mirrored_playlist_select", "label": "Playlist"},
{"key": "all", "type": "checkbox", "label": "Process all mirrored playlists", "default": False},
{"key": "skip_wishlist", "type": "checkbox", "label": "Skip wishlist processing", "default": False},
]},
{"type": "notify_only", "label": "Notify Only", "icon": "bell", "description": "No action — just send notification", "available": True},
# Phase 3 actions
{"type": "start_database_update", "label": "Update Database", "icon": "database",
"description": "Trigger library database refresh", "available": True,
"config_fields": [
{"key": "full_refresh", "type": "checkbox", "label": "Full refresh (slower)", "default": False}
]},
{"type": "run_duplicate_cleaner", "label": "Run Duplicate Cleaner", "icon": "layers",
"description": "Scan for and remove duplicate files", "available": True},
{"type": "clear_quarantine", "label": "Clear Quarantine", "icon": "trash",
"description": "Delete all quarantined files", "available": True},
{"type": "cleanup_wishlist", "label": "Clean Up Wishlist", "icon": "filter",
"description": "Remove duplicate/owned tracks from wishlist", "available": True},
{"type": "update_discovery_pool", "label": "Update Discovery", "icon": "compass",
"description": "Refresh discovery pool with new tracks", "available": True},
{"type": "start_quality_scan", "label": "Run Quality Scan", "icon": "bar-chart",
"description": "Scan for low-quality audio files", "available": True,
"config_fields": [
{"key": "scope", "type": "select", "label": "Scope",
"options": [{"value": "watchlist", "label": "Watchlist Artists"}, {"value": "library", "label": "Full Library"}],
"default": "watchlist"}
]},
{"type": "backup_database", "label": "Backup Database", "icon": "save",
"description": "Create timestamped database backup", "available": True},
{"type": "refresh_beatport_cache", "label": "Refresh Beatport Cache", "icon": "music",
"description": "Scrape Beatport homepage and warm the cache", "available": True},
{"type": "clean_search_history", "label": "Clean Search History", "icon": "trash-2",
"description": "Remove old searches from Soulseek", "available": True},
{"type": "clean_completed_downloads", "label": "Clean Completed Downloads", "icon": "check-square",
"description": "Clear completed downloads and empty directories", "available": True},
{"type": "full_cleanup", "label": "Full Cleanup", "icon": "trash",
"description": "Clear quarantine, download queue, import folder, and search history in one sweep", "available": True},
{"type": "deep_scan_library", "label": "Deep Scan Library", "icon": "search",
"description": "Full library comparison without losing enrichment data", "available": True},
{"type": "run_script", "label": "Run Script", "icon": "terminal",
"description": "Execute a script from the scripts folder", "available": True},
{"type": "search_and_download", "label": "Search & Download", "icon": "download",
"description": "Search for a track and download the best match", "available": True,
"config_fields": [
{"key": "query", "type": "text", "label": "Search Query",
"placeholder": "Artist - Track (leave empty to use trigger's query)"}
]},
]
NOTIFICATIONS: list[dict] = [
{"type": "discord_webhook", "label": "Discord Webhook", "icon": "message", "description": "Send a Discord notification", "available": True,
"variables": ["time", "name", "run_count", "status"]},
{"type": "pushbullet", "label": "Pushbullet", "icon": "push", "description": "Push notification to phone/desktop", "available": True,
"variables": ["time", "name", "run_count", "status"]},
{"type": "telegram", "label": "Telegram", "icon": "message", "description": "Send a Telegram message", "available": True,
"variables": ["time", "name", "run_count", "status"]},
{"type": "webhook", "label": "Webhook (POST)", "icon": "globe", "description": "Send a POST request to any URL", "available": True,
"variables": ["time", "name", "run_count", "status"]},
# Signal fire action
{"type": "fire_signal", "label": "Fire Signal", "icon": "zap",
"description": "Fire a signal that other automations can listen for", "available": True,
"config_fields": [
{"key": "signal_name", "type": "signal_input", "label": "Signal Name"}
]},
# Run script then-action
{"type": "run_script", "label": "Run Script", "icon": "terminal",
"description": "Execute a script after the action completes", "available": True,
"config_fields": [
{"key": "script_name", "type": "script_select", "label": "Script"}
]},
]

@ -0,0 +1,216 @@
"""Automation progress tracking.
Owns the in-memory progress state dict that backs both
`/api/automations/progress` polling and the WebSocket
`automation:progress` push emitter. State is per-automation, capped at
50 log entries each, and finished/error states are reaped 60s after
they finish so the frontend has a window to show the final state.
Functions are written so the route layer / engine callbacks can pass
their own socketio emitter, db handle, and shutdown flag. The progress
state dict (`progress_states`) and its lock (`progress_lock`) are
module-level so all callers share one view same as the original
web_server.py globals.
"""
from __future__ import annotations
import json
import logging
import threading
from datetime import datetime, timezone
from typing import Any, Callable, Optional
logger = logging.getLogger(__name__)
# Shared mutable state — module globals so every caller (routes, engine
# progress callbacks, emit loop) sees the same dict. Mirrors the original
# `automation_progress_states` / `automation_progress_lock` in web_server.
progress_states: dict[int, dict] = {}
progress_lock = threading.Lock()
def init_progress(automation_id: int, automation_name: str, action_type: str) -> None:
"""Initialize progress state when an automation starts running."""
with progress_lock:
progress_states[automation_id] = {
'status': 'running',
'action_type': action_type,
'progress': 0,
'phase': 'Starting...',
'current_item': '',
'processed': 0,
'total': 0,
'log': [{'type': 'info', 'text': f'Starting {automation_name}'}],
'started_at': datetime.now(timezone.utc).isoformat(),
'finished_at': None,
}
def update_progress(
automation_id: Optional[int],
*,
socketio_emit: Optional[Callable[[str, Any], None]] = None,
**kwargs,
) -> None:
"""Update progress state from handler threads. Thread-safe.
`socketio_emit` lets callers wire in the live socketio.emit so that
finished/error transitions push immediately without waiting for the
1s emitter loop. Falls back to no-op if not provided.
"""
if automation_id is None:
return
with progress_lock:
state = progress_states.get(automation_id)
if not state:
return
for k, v in kwargs.items():
if k == 'log_line':
state['log'].append({'type': kwargs.get('log_type', 'info'), 'text': v})
if len(state['log']) > 50:
state['log'] = state['log'][-50:]
elif k != 'log_type':
state[k] = v
if kwargs.get('status') in ('finished', 'error'):
state['finished_at'] = datetime.now(timezone.utc).isoformat()
if socketio_emit is not None:
try:
socketio_emit('automation:progress', {str(automation_id): dict(state)})
except Exception:
pass
def get_running_progress() -> dict[str, dict]:
"""Snapshot of running/finished/error states for the polling endpoint."""
with progress_lock:
result: dict[str, dict] = {}
for aid, state in progress_states.items():
if state['status'] in ('running', 'finished', 'error'):
cp = dict(state)
cp['log'] = list(state['log'])
result[str(aid)] = cp
return result
def record_history(
automation_id: int,
result: dict,
database,
) -> None:
"""Capture progress state into run history before cleanup clears it.
`database` is passed in so the function works without a `get_database()`
global.
"""
try:
with progress_lock:
state = progress_states.get(automation_id)
if state:
started_at = state.get('started_at')
finished_at = state.get('finished_at') or datetime.now(timezone.utc).isoformat()
log_entries = list(state.get('log', []))
else:
started_at = datetime.now(timezone.utc).isoformat()
finished_at = datetime.now(timezone.utc).isoformat()
log_entries = []
duration = None
if started_at and finished_at:
try:
t0 = datetime.fromisoformat(started_at)
t1 = datetime.fromisoformat(finished_at)
duration = (t1 - t0).total_seconds()
except Exception:
pass
r_status = result.get('status', 'completed') if result else 'completed'
if r_status == 'error':
status = 'error'
elif r_status == 'skipped':
status = 'skipped'
elif r_status == 'timeout':
status = 'timeout'
else:
status = 'completed'
summary = None
for entry in reversed(log_entries):
if entry.get('type') in ('success', 'error'):
summary = entry.get('text', '')
break
if not summary and log_entries:
summary = log_entries[-1].get('text', '')
if not summary and result:
summary = result.get('reason') or result.get('error') or result.get('status', '')
result_json = json.dumps({k: v for k, v in result.items() if not k.startswith('_')}) if result else None
log_json = json.dumps(log_entries) if log_entries else None
database.insert_automation_run_history(
automation_id=automation_id,
started_at=started_at,
finished_at=finished_at,
duration_seconds=duration,
status=status,
summary=summary,
result_json=result_json,
log_lines=log_json,
)
except Exception as e:
logger.error(f"Error recording automation history for {automation_id}: {e}")
def emit_progress_loop(
socketio,
*,
is_shutting_down: Callable[[], bool],
poll_interval: float = 1.0,
timeout_seconds: int = 7200,
cleanup_after_seconds: int = 60,
) -> None:
"""Push `automation:progress` events for active automations.
Long-running loop caller wires this into a socketio background task.
- Times out zombie running states after `timeout_seconds` (default 2h).
- Reaps finished/error states `cleanup_after_seconds` after finish so the
frontend has a final-state window before they disappear.
"""
while not is_shutting_down():
socketio.sleep(poll_interval)
try:
with progress_lock:
active: dict[str, dict] = {}
stale: list[int] = []
now = datetime.now()
for aid, state in progress_states.items():
if state['status'] == 'running':
try:
started = datetime.fromisoformat(state.get('started_at', ''))
if (now - started).total_seconds() > timeout_seconds:
state['status'] = 'error'
state['phase'] = 'Timed out'
state['finished_at'] = now.isoformat()
state['log'].append({'type': 'error', 'text': f'Timed out after {timeout_seconds // 3600} hours'})
cp = dict(state)
cp['log'] = list(state['log'])
active[str(aid)] = cp
continue
except (ValueError, TypeError):
pass
cp = dict(state)
cp['log'] = list(state['log'])
active[str(aid)] = cp
elif state['status'] in ('finished', 'error') and state.get('finished_at'):
try:
finished_time = datetime.fromisoformat(state['finished_at'])
if (now - finished_time).total_seconds() > cleanup_after_seconds:
stale.append(aid)
except (ValueError, TypeError):
stale.append(aid)
for aid in stale:
del progress_states[aid]
if active:
socketio.emit('automation:progress', active)
except Exception as e:
logger.debug(f"Error emitting automation progress: {e}")

@ -0,0 +1,43 @@
"""Automation signal helpers — name collection for autocomplete.
Signal cycle detection itself lives in core/automation_engine.py
(`detect_signal_cycles`); this module just enumerates known signal
names from the saved automation set so the builder UI can autocomplete.
"""
from __future__ import annotations
import json
def collect_known_signals(database) -> list[str]:
"""Return sorted, deduped signal names referenced by any saved automation.
Walks every automation and pulls signal names from both the
`signal_received` trigger config and any `fire_signal` then-actions.
Errors at every layer are swallowed the autocomplete is best-effort.
"""
signals: set[str] = set()
try:
for auto in database.get_automations():
if auto.get('trigger_type') == 'signal_received':
try:
tc = json.loads(auto.get('trigger_config') or '{}')
sig = tc.get('signal_name', '').strip()
if sig:
signals.add(sig)
except (json.JSONDecodeError, TypeError):
pass
try:
ta = json.loads(auto.get('then_actions') or '[]')
for item in ta:
if item.get('type') == 'fire_signal':
sig = item.get('config', {}).get('signal_name', '').strip()
if sig:
signals.add(sig)
except (json.JSONDecodeError, TypeError):
pass
except Exception:
pass
return sorted(signals)

@ -281,7 +281,7 @@ def _process_musicbrainz_source(pp: dict, metadata: dict, cfg, runtime, track_ti
"MusicBrainz release details",
mb_service.mb_client.get_release,
pp["release_mbid"],
includes=["release-groups", "labels", "media", "artist-credits", "recordings"],
includes=["release-groups", "labels", "media", "artist-credits", "recordings", "genres"],
) or {}
with mb_release_detail_cache_lock:
_bounded_cache_set(mb_release_detail_cache, pp["release_mbid"], release_detail, _MB_RELEASE_DETAIL_CACHE_MAX_ENTRIES)
@ -345,6 +345,31 @@ def _process_musicbrainz_source(pp: dict, metadata: dict, cfg, runtime, track_ti
except (ValueError, TypeError):
pass
# Genre fallback chain: most MusicBrainz recordings don't carry genres at
# the track level, but releases and artists usually do. If the recording
# came back empty, try the release; if that's empty too, fetch the artist
# with `includes=['genres']` and use that.
_release_detail_for_genres = locals().get("release_detail")
if not pp["mb_genres"] and _release_detail_for_genres:
pp["mb_genres"] = [
g["name"] for g in sorted(
_release_detail_for_genres.get("genres", []), key=lambda x: x.get("count", 0), reverse=True,
)
]
if not pp["mb_genres"] and pp.get("artist_mbid"):
artist_detail = _call_source_lookup(
"MusicBrainz artist details",
mb_service.mb_client.get_artist,
pp["artist_mbid"],
includes=["genres"],
)
if artist_detail:
pp["mb_genres"] = [
g["name"] for g in sorted(
artist_detail.get("genres", []), key=lambda x: x.get("count", 0), reverse=True,
)
]
def _process_deezer_source(pp: dict, metadata: dict, cfg, runtime, track_title: str, artist_name: str) -> None:
if cfg.get("deezer.embed_tags", True) is False:

@ -0,0 +1,478 @@
"""Tests for core/automation/api.py — CRUD + run + history helpers."""
from __future__ import annotations
import json
from core.automation import api
# ---------------------------------------------------------------------------
# Fakes
# ---------------------------------------------------------------------------
class _FakeDB:
def __init__(self):
self._next_id = 1
self.automations: dict[int, dict] = {}
self.history: dict[int, list] = {}
self.batch_group_calls = []
self.bulk_set_calls = []
def get_automations(self, profile_id=None):
if profile_id is None:
return list(self.automations.values())
return [a for a in self.automations.values() if a.get('profile_id') == profile_id]
def get_automation(self, automation_id):
return dict(self.automations[automation_id]) if automation_id in self.automations else None
def create_automation(self, name, trigger_type, trigger_config, action_type, action_config,
profile_id, notify_type, notify_config, then_actions, group_name):
aid = self._next_id
self._next_id += 1
self.automations[aid] = {
'id': aid, 'name': name, 'trigger_type': trigger_type,
'trigger_config': trigger_config, 'action_type': action_type,
'action_config': action_config, 'profile_id': profile_id,
'notify_type': notify_type, 'notify_config': notify_config,
'then_actions': then_actions, 'group_name': group_name,
'enabled': 1, 'is_system': 0,
}
return aid
def update_automation(self, automation_id, **fields):
if automation_id not in self.automations:
return False
self.automations[automation_id].update(fields)
return True
def delete_automation(self, automation_id):
if automation_id not in self.automations:
return False
del self.automations[automation_id]
return True
def toggle_automation(self, automation_id):
if automation_id not in self.automations:
return False
a = self.automations[automation_id]
a['enabled'] = 0 if a['enabled'] else 1
return True
def batch_update_group(self, ids, group_name):
self.batch_group_calls.append((ids, group_name))
return len(ids)
def bulk_set_enabled(self, ids, enabled):
self.bulk_set_calls.append((ids, enabled))
for aid in ids:
if aid in self.automations:
self.automations[aid]['enabled'] = 1 if enabled else 0
return len(ids)
def get_automation_run_history(self, automation_id, limit=50, offset=0):
return {'history': self.history.get(automation_id, [])[offset:offset + limit]}
class _FakeEngine:
def __init__(self, cycles_to_return=None):
self.scheduled = []
self.cancelled = []
self.run_now_calls = []
self._cycles = cycles_to_return or []
def schedule_automation(self, aid):
self.scheduled.append(aid)
def cancel_automation(self, aid):
self.cancelled.append(aid)
def detect_signal_cycles(self, autos):
return list(self._cycles)
def run_now(self, aid, profile_id=None):
self.run_now_calls.append((aid, profile_id))
return True
# ---------------------------------------------------------------------------
# _hydrate_automation
# ---------------------------------------------------------------------------
def test_hydrate_parses_json_columns():
raw = {
'trigger_config': '{"interval": 6}',
'action_config': '{"category": "all"}',
'notify_config': '{"webhook": "x"}',
'last_result': '{"ok": true}',
'then_actions': '[{"type": "discord", "config": {"webhook": "y"}}]',
'notify_type': None,
}
out = api._hydrate_automation(dict(raw))
assert out['trigger_config'] == {'interval': 6}
assert out['action_config'] == {'category': 'all'}
assert out['then_actions'][0]['type'] == 'discord'
def test_hydrate_invalid_json_falls_back_to_default():
raw = {
'trigger_config': 'not json',
'action_config': 'not json',
'notify_config': 'not json',
'last_result': 'not json',
'then_actions': 'not json',
'notify_type': None,
}
out = api._hydrate_automation(dict(raw))
assert out['trigger_config'] == {}
assert out['action_config'] == {}
assert out['notify_config'] == {}
assert out['last_result'] is None
assert out['then_actions'] == []
def test_hydrate_backfills_then_actions_from_legacy_notify_type():
raw = {
'trigger_config': '{}',
'action_config': '{}',
'notify_config': {'webhook_url': 'http://x'},
'last_result': None,
'then_actions': '[]',
'notify_type': 'discord',
}
out = api._hydrate_automation(dict(raw))
assert out['then_actions'] == [{'type': 'discord', 'config': {'webhook_url': 'http://x'}}]
# ---------------------------------------------------------------------------
# list_automations
# ---------------------------------------------------------------------------
def test_list_automations_filters_by_profile():
db = _FakeDB()
db.automations[1] = {'id': 1, 'profile_id': 1, 'trigger_config': '{}', 'action_config': '{}',
'notify_config': '{}', 'last_result': None, 'then_actions': '[]', 'notify_type': None}
db.automations[2] = {'id': 2, 'profile_id': 2, 'trigger_config': '{}', 'action_config': '{}',
'notify_config': '{}', 'last_result': None, 'then_actions': '[]', 'notify_type': None}
out = api.list_automations(db, profile_id=1)
assert len(out) == 1
assert out[0]['id'] == 1
# ---------------------------------------------------------------------------
# get_automation
# ---------------------------------------------------------------------------
def test_get_automation_returns_none_for_missing():
assert api.get_automation(_FakeDB(), 99) is None
def test_get_automation_returns_hydrated():
db = _FakeDB()
db.automations[5] = {'id': 5, 'trigger_config': '{"x":1}', 'action_config': '{}',
'notify_config': '{}', 'last_result': None, 'then_actions': '[]',
'notify_type': None}
out = api.get_automation(db, 5)
assert out['trigger_config'] == {'x': 1}
# ---------------------------------------------------------------------------
# create_automation
# ---------------------------------------------------------------------------
def test_create_requires_name():
body, status = api.create_automation(_FakeDB(), _FakeEngine(), profile_id=1, data={'name': ' '})
assert status == 400
assert 'Name is required' in body['error']
def test_create_happy_path_schedules():
db = _FakeDB()
eng = _FakeEngine()
body, status = api.create_automation(db, eng, profile_id=1, data={
'name': 'My Auto', 'trigger_type': 'schedule',
'trigger_config': {'interval': 6, 'unit': 'hours'},
'action_type': 'process_wishlist',
})
assert status == 200
assert body['success'] is True
assert body['id'] == 1
assert eng.scheduled == [1]
def test_create_blocks_signal_cycle():
eng = _FakeEngine(cycles_to_return=['sig_a', 'sig_b', 'sig_a'])
body, status = api.create_automation(_FakeDB(), eng, profile_id=1, data={
'name': 'Loopy',
'trigger_type': 'signal_received',
'trigger_config': {'signal_name': 'sig_a'},
'then_actions': [{'type': 'fire_signal', 'config': {'signal_name': 'sig_b'}}],
})
assert status == 400
assert 'Signal cycle detected' in body['error']
assert eng.scheduled == []
def test_create_skips_cycle_check_when_no_signals():
eng = _FakeEngine(cycles_to_return=['shouldnt fire'])
body, status = api.create_automation(_FakeDB(), eng, profile_id=1, data={
'name': 'Plain', 'trigger_type': 'schedule',
'action_type': 'process_wishlist',
'then_actions': [{'type': 'discord', 'config': {}}],
})
assert status == 200
def test_create_then_actions_back_compat_first_item_becomes_notify_type():
db = _FakeDB()
api.create_automation(db, _FakeEngine(), profile_id=1, data={
'name': 'X', 'trigger_type': 'schedule', 'action_type': 'process_wishlist',
'then_actions': [{'type': 'discord', 'config': {'webhook': 'http://x'}}],
})
stored = db.automations[1]
assert stored['notify_type'] == 'discord'
assert json.loads(stored['notify_config']) == {'webhook': 'http://x'}
# ---------------------------------------------------------------------------
# update_automation
# ---------------------------------------------------------------------------
def test_update_with_no_fields_returns_400():
body, status = api.update_automation(_FakeDB(), _FakeEngine(), automation_id=1, data={})
assert status == 400
def test_update_missing_id_returns_404():
body, status = api.update_automation(_FakeDB(), _FakeEngine(), automation_id=99, data={'name': 'x'})
assert status == 404
def test_update_blocks_signal_cycle():
db = _FakeDB()
db.automations[1] = {'id': 1, 'name': 'a', 'trigger_type': 'schedule', 'trigger_config': '{}',
'then_actions': '[]', 'enabled': 1, 'is_system': 0}
eng = _FakeEngine(cycles_to_return=['sig_a', 'sig_a'])
body, status = api.update_automation(db, eng, automation_id=1, data={
'trigger_type': 'signal_received',
'trigger_config': {'signal_name': 'sig_a'},
'then_actions': [{'type': 'fire_signal', 'config': {'signal_name': 'sig_a'}}],
})
assert status == 400
assert 'Signal cycle detected' in body['error']
def test_update_reschedules_when_enabled():
db = _FakeDB()
db.automations[1] = {'id': 1, 'name': 'a', 'enabled': 1}
eng = _FakeEngine()
body, status = api.update_automation(db, eng, automation_id=1, data={'name': 'renamed'})
assert status == 200
assert eng.scheduled == [1]
def test_update_cancels_when_disabled():
db = _FakeDB()
db.automations[1] = {'id': 1, 'name': 'a', 'enabled': 0}
eng = _FakeEngine()
body, status = api.update_automation(db, eng, automation_id=1, data={'name': 'r'})
assert status == 200
assert eng.cancelled == [1]
def test_update_then_actions_clears_notify_when_empty():
db = _FakeDB()
db.automations[1] = {'id': 1, 'name': 'a', 'enabled': 0}
api.update_automation(db, _FakeEngine(), automation_id=1, data={'then_actions': []})
assert db.automations[1]['notify_type'] is None
assert db.automations[1]['notify_config'] == '{}'
# ---------------------------------------------------------------------------
# batch_update_group
# ---------------------------------------------------------------------------
def test_batch_group_requires_list():
body, status = api.batch_update_group(_FakeDB(), [], 'group')
assert status == 400
def test_batch_group_rejects_non_int_ids():
body, status = api.batch_update_group(_FakeDB(), ['abc'], 'g')
assert status == 400
def test_batch_group_happy_path():
db = _FakeDB()
body, status = api.batch_update_group(db, [1, 2, 3], 'mygroup')
assert status == 200
assert body['updated'] == 3
assert db.batch_group_calls == [([1, 2, 3], 'mygroup')]
def test_batch_group_can_ungroup_with_none():
db = _FakeDB()
body, status = api.batch_update_group(db, [1], None)
assert status == 200
assert db.batch_group_calls == [([1], None)]
# ---------------------------------------------------------------------------
# bulk_toggle
# ---------------------------------------------------------------------------
def test_bulk_toggle_reschedules_enabled():
db = _FakeDB()
db.automations[1] = {'id': 1, 'enabled': 1}
db.automations[2] = {'id': 2, 'enabled': 1}
eng = _FakeEngine()
body, status = api.bulk_toggle(db, eng, [1, 2], enabled=True)
assert status == 200
assert body['updated'] == 2
def test_bulk_toggle_cancels_disabled():
db = _FakeDB()
db.automations[1] = {'id': 1, 'enabled': 1}
eng = _FakeEngine()
api.bulk_toggle(db, eng, [1], enabled=False)
assert eng.cancelled == [1]
def test_bulk_toggle_requires_non_empty_list():
body, status = api.bulk_toggle(_FakeDB(), _FakeEngine(), [], True)
assert status == 400
# ---------------------------------------------------------------------------
# delete_automation
# ---------------------------------------------------------------------------
def test_delete_protects_system_automations():
db = _FakeDB()
db.automations[1] = {'id': 1, 'is_system': 1}
body, status = api.delete_automation(db, _FakeEngine(), 1)
assert status == 403
def test_delete_cancels_engine_then_db():
db = _FakeDB()
db.automations[1] = {'id': 1, 'is_system': 0}
eng = _FakeEngine()
body, status = api.delete_automation(db, eng, 1)
assert status == 200
assert eng.cancelled == [1]
assert 1 not in db.automations
def test_delete_missing_returns_404():
body, status = api.delete_automation(_FakeDB(), _FakeEngine(), 99)
assert status == 404
# ---------------------------------------------------------------------------
# duplicate_automation
# ---------------------------------------------------------------------------
def test_duplicate_appends_copy_suffix_and_schedules():
db = _FakeDB()
db.automations[1] = {
'id': 1, 'name': 'Orig', 'trigger_type': 'schedule', 'trigger_config': '{}',
'action_type': 'process_wishlist', 'action_config': '{}', 'is_system': 0,
'notify_type': None, 'notify_config': '{}', 'then_actions': '[]', 'group_name': None,
}
db._next_id = 2 # bump so create_automation doesn't overwrite id 1
eng = _FakeEngine()
body, status = api.duplicate_automation(db, eng, profile_id=1, automation_id=1)
assert status == 200
assert db.automations[2]['name'] == 'Orig (Copy)'
assert eng.scheduled == [2]
def test_duplicate_protects_system():
db = _FakeDB()
db.automations[1] = {'id': 1, 'is_system': 1, 'name': 'sys'}
body, status = api.duplicate_automation(db, _FakeEngine(), profile_id=1, automation_id=1)
assert status == 403
def test_duplicate_missing_returns_404():
body, status = api.duplicate_automation(_FakeDB(), _FakeEngine(), profile_id=1, automation_id=99)
assert status == 404
# ---------------------------------------------------------------------------
# toggle_automation
# ---------------------------------------------------------------------------
def test_toggle_reschedules_when_now_enabled():
db = _FakeDB()
db.automations[1] = {'id': 1, 'enabled': 0} # currently off
eng = _FakeEngine()
body, status = api.toggle_automation(db, eng, 1)
assert status == 200
assert eng.scheduled == [1]
def test_toggle_cancels_when_now_disabled():
db = _FakeDB()
db.automations[1] = {'id': 1, 'enabled': 1} # currently on
eng = _FakeEngine()
api.toggle_automation(db, eng, 1)
assert eng.cancelled == [1]
def test_toggle_missing_returns_404():
body, status = api.toggle_automation(_FakeDB(), _FakeEngine(), 99)
assert status == 404
# ---------------------------------------------------------------------------
# run_automation
# ---------------------------------------------------------------------------
def test_run_calls_engine_run_now_with_profile():
eng = _FakeEngine()
body, status = api.run_automation(eng, automation_id=5, profile_id=2)
assert status == 200
assert eng.run_now_calls == [(5, 2)]
def test_run_no_engine_returns_500():
body, status = api.run_automation(None, 1, 1)
assert status == 500
def test_run_missing_automation_returns_404():
class _MissEngine(_FakeEngine):
def run_now(self, aid, profile_id=None):
return False
body, status = api.run_automation(_MissEngine(), 1, 1)
assert status == 404
# ---------------------------------------------------------------------------
# get_history
# ---------------------------------------------------------------------------
def test_history_parses_log_lines_and_result_json():
db = _FakeDB()
db.history[1] = [
{'id': 1, 'log_lines': '[{"text":"ok"}]', 'result_json': '{"k":"v"}'},
{'id': 2, 'log_lines': '', 'result_json': None},
]
out = api.get_history(db, 1, limit=10, offset=0)
assert out['automation_id'] == 1
assert out['history'][0]['log_lines'] == [{'text': 'ok'}]
assert out['history'][0]['result_json'] == {'k': 'v'}
assert out['history'][1]['log_lines'] == []
def test_history_invalid_json_falls_back():
db = _FakeDB()
db.history[1] = [{'id': 1, 'log_lines': 'not json', 'result_json': 'not json'}]
out = api.get_history(db, 1, limit=10, offset=0)
assert out['history'][0]['log_lines'] == []
# result_json stays as the original string when not parseable
assert out['history'][0]['result_json'] == 'not json'

@ -0,0 +1,82 @@
"""Tests for core/automation/blocks.py — static block definitions for the builder UI.
Catches accidental schema regressions in the builder block list (missing
`type`/`label`, malformed config_fields options, etc.).
"""
from __future__ import annotations
from core.automation import blocks
def _shape_check(items, allowed_types):
"""Every item has type+label+description, plus type-specific shape rules."""
seen_types = set()
for item in items:
assert 'type' in item, item
assert 'label' in item, item
assert isinstance(item.get('available'), bool), item
# No duplicate types within a list
assert item['type'] not in seen_types, f"Duplicate type {item['type']!r}"
seen_types.add(item['type'])
if 'config_fields' in item:
for field in item['config_fields']:
assert 'key' in field
assert 'type' in field
assert field['type'] in allowed_types, f"Unknown field type {field['type']!r} in {item['type']}"
if field['type'] == 'select':
assert 'options' in field
for opt in field['options']:
assert 'value' in opt
assert 'label' in opt
_FIELD_TYPES = {
'number', 'select', 'time', 'multi_select', 'checkbox', 'text',
'mirrored_playlist_select', 'signal_input', 'script_select',
}
def test_triggers_shape():
_shape_check(blocks.TRIGGERS, _FIELD_TYPES)
def test_actions_shape():
_shape_check(blocks.ACTIONS, _FIELD_TYPES)
def test_notifications_shape():
_shape_check(blocks.NOTIFICATIONS, _FIELD_TYPES)
def test_signal_received_trigger_present():
types = {t['type'] for t in blocks.TRIGGERS}
assert 'signal_received' in types
def test_fire_signal_notification_present():
types = {n['type'] for n in blocks.NOTIFICATIONS}
assert 'fire_signal' in types
def test_run_script_in_both_actions_and_notifications():
"""run_script can be either an action or a then-action — both lists own it."""
action_types = {a['type'] for a in blocks.ACTIONS}
notif_types = {n['type'] for n in blocks.NOTIFICATIONS}
assert 'run_script' in action_types
assert 'run_script' in notif_types
def test_schedule_trigger_default_unit_is_hours():
schedule = next(t for t in blocks.TRIGGERS if t['type'] == 'schedule')
unit_field = next(f for f in schedule['config_fields'] if f['key'] == 'unit')
assert unit_field['default'] == 'hours'
def test_event_triggers_with_conditions_have_condition_fields():
for t in blocks.TRIGGERS:
if t.get('has_conditions'):
assert 'condition_fields' in t, f"{t['type']} marked has_conditions but no condition_fields"
assert isinstance(t['condition_fields'], list)
assert len(t['condition_fields']) > 0

@ -0,0 +1,290 @@
"""Tests for core/automation/progress.py — progress state lifecycle + emit loop."""
from __future__ import annotations
import time
from datetime import datetime, timedelta, timezone
import pytest
from core.automation import progress
@pytest.fixture(autouse=True)
def reset_state():
"""Each test gets a clean progress state dict."""
progress.progress_states.clear()
yield
progress.progress_states.clear()
# ---------------------------------------------------------------------------
# init_progress
# ---------------------------------------------------------------------------
def test_init_progress_seeds_running_state():
progress.init_progress(7, 'My Automation', 'process_wishlist')
state = progress.progress_states[7]
assert state['status'] == 'running'
assert state['action_type'] == 'process_wishlist'
assert state['progress'] == 0
assert state['phase'] == 'Starting...'
assert state['log'][0] == {'type': 'info', 'text': 'Starting My Automation'}
assert state['started_at'] is not None
assert state['finished_at'] is None
def test_init_progress_overwrites_existing_state():
progress.init_progress(7, 'First', 'a')
progress.init_progress(7, 'Second', 'b')
assert progress.progress_states[7]['action_type'] == 'b'
# ---------------------------------------------------------------------------
# update_progress
# ---------------------------------------------------------------------------
def test_update_progress_writes_simple_fields():
progress.init_progress(1, 'A', 'x')
progress.update_progress(1, progress=42, phase='Working')
assert progress.progress_states[1]['progress'] == 42
assert progress.progress_states[1]['phase'] == 'Working'
def test_update_progress_log_line_appends_with_type():
progress.init_progress(1, 'A', 'x')
progress.update_progress(1, log_line='hello', log_type='success')
log = progress.progress_states[1]['log']
assert log[-1] == {'type': 'success', 'text': 'hello'}
def test_update_progress_log_caps_at_50_entries():
progress.init_progress(1, 'A', 'x')
for i in range(60):
progress.update_progress(1, log_line=f'line {i}')
assert len(progress.progress_states[1]['log']) == 50
assert progress.progress_states[1]['log'][-1]['text'] == 'line 59'
def test_update_progress_log_type_not_stored_as_field():
progress.init_progress(1, 'A', 'x')
progress.update_progress(1, log_line='hi', log_type='warning')
assert 'log_type' not in progress.progress_states[1]
def test_update_progress_finish_sets_finished_at_and_emits():
progress.init_progress(1, 'A', 'x')
emitted = []
def _emit(event, data):
emitted.append((event, data))
progress.update_progress(1, status='finished', socketio_emit=_emit)
assert progress.progress_states[1]['finished_at'] is not None
assert emitted[0][0] == 'automation:progress'
assert '1' in emitted[0][1]
def test_update_progress_error_status_also_emits():
progress.init_progress(1, 'A', 'x')
emitted = []
progress.update_progress(1, status='error', socketio_emit=lambda e, d: emitted.append(e))
assert emitted == ['automation:progress']
def test_update_progress_running_status_does_not_emit():
progress.init_progress(1, 'A', 'x')
emitted = []
progress.update_progress(1, status='running', progress=50, socketio_emit=lambda e, d: emitted.append(e))
assert emitted == []
def test_update_progress_emit_failure_swallowed():
progress.init_progress(1, 'A', 'x')
def _bad(event, data):
raise RuntimeError('socket dead')
# Should NOT raise
progress.update_progress(1, status='finished', socketio_emit=_bad)
assert progress.progress_states[1]['finished_at'] is not None
def test_update_progress_none_id_is_noop():
progress.update_progress(None, progress=99) # no exception
def test_update_progress_unknown_id_is_noop():
progress.update_progress(999, progress=99)
assert 999 not in progress.progress_states
# ---------------------------------------------------------------------------
# get_running_progress
# ---------------------------------------------------------------------------
def test_get_running_progress_returns_running_finished_error():
progress.init_progress(1, 'A', 'x')
progress.init_progress(2, 'B', 'y')
progress.init_progress(3, 'C', 'z')
progress.update_progress(2, status='finished')
progress.update_progress(3, status='error')
progress.progress_states[4] = {'status': 'unknown', 'log': []}
snapshot = progress.get_running_progress()
assert set(snapshot.keys()) == {'1', '2', '3'}
def test_get_running_progress_copies_log_list():
progress.init_progress(1, 'A', 'x')
progress.update_progress(1, log_line='first')
snapshot = progress.get_running_progress()
snapshot['1']['log'].append({'type': 'info', 'text': 'mutated'})
# Original state should not be affected
assert len(progress.progress_states[1]['log']) == 2 # init + 'first'
# ---------------------------------------------------------------------------
# record_history
# ---------------------------------------------------------------------------
class _FakeDB:
def __init__(self):
self.calls = []
def insert_automation_run_history(self, **kw):
self.calls.append(kw)
def test_record_history_uses_progress_log_when_available():
progress.init_progress(1, 'A', 'wishlist')
progress.update_progress(1, log_line='did stuff', log_type='success')
progress.update_progress(1, status='finished', socketio_emit=None)
db = _FakeDB()
progress.record_history(1, {'status': 'completed'}, db)
assert len(db.calls) == 1
call = db.calls[0]
assert call['automation_id'] == 1
assert call['status'] == 'completed'
assert call['summary'] == 'did stuff'
assert call['duration_seconds'] is not None
def test_record_history_status_mapping():
db = _FakeDB()
progress.record_history(1, {'status': 'error'}, db)
assert db.calls[-1]['status'] == 'error'
progress.record_history(2, {'status': 'skipped'}, db)
assert db.calls[-1]['status'] == 'skipped'
progress.record_history(3, {'status': 'timeout'}, db)
assert db.calls[-1]['status'] == 'timeout'
progress.record_history(4, {'status': 'completed'}, db)
assert db.calls[-1]['status'] == 'completed'
def test_record_history_underscore_keys_stripped_from_result_json():
progress.init_progress(1, 'A', 'x')
progress.update_progress(1, status='finished', socketio_emit=None)
db = _FakeDB()
progress.record_history(1, {'status': 'completed', '_internal': 'x', 'visible': 'y'}, db)
import json as _json
parsed = _json.loads(db.calls[0]['result_json'])
assert '_internal' not in parsed
assert parsed.get('visible') == 'y'
def test_record_history_falls_back_to_result_summary_when_no_log():
db = _FakeDB()
progress.record_history(1, {'status': 'error', 'reason': 'bad config'}, db)
assert db.calls[0]['summary'] == 'bad config'
def test_record_history_no_progress_state_uses_now_for_times():
db = _FakeDB()
progress.record_history(99, {'status': 'completed'}, db)
call = db.calls[0]
assert call['started_at'] is not None
assert call['finished_at'] is not None
def test_record_history_db_failure_swallowed():
class _BrokenDB:
def insert_automation_run_history(self, **kw):
raise RuntimeError('db dead')
progress.init_progress(1, 'A', 'x')
# Should NOT raise
progress.record_history(1, {'status': 'completed'}, _BrokenDB())
# ---------------------------------------------------------------------------
# emit_progress_loop
# ---------------------------------------------------------------------------
class _FakeSocket:
def __init__(self):
self.emitted = []
self._sleep_count = 0
self._max_sleeps = 1
def sleep(self, seconds):
self._sleep_count += 1
def emit(self, event, data):
self.emitted.append((event, data))
def _shutdown_after(n_sleeps, sock):
"""Build an is_shutting_down predicate that returns True after n loops."""
def _check():
return sock._sleep_count >= n_sleeps
return _check
def test_emit_loop_emits_running_state():
progress.init_progress(1, 'A', 'x')
progress.update_progress(1, progress=50)
sock = _FakeSocket()
progress.emit_progress_loop(sock, is_shutting_down=_shutdown_after(1, sock), poll_interval=0)
assert len(sock.emitted) == 1
assert sock.emitted[0][0] == 'automation:progress'
assert '1' in sock.emitted[0][1]
def test_emit_loop_attempts_timeout_check_with_naive_datetime():
"""Documents pre-existing bug: started_at is tz-aware, now is naive,
so subtraction raises TypeError caught timeout never fires.
Lift preserves the bug; fix lives in a separate PR.
"""
progress.init_progress(1, 'A', 'x')
state = progress.progress_states[1]
state['started_at'] = (datetime.now(timezone.utc) - timedelta(hours=3)).isoformat()
sock = _FakeSocket()
progress.emit_progress_loop(sock, is_shutting_down=_shutdown_after(1, sock),
poll_interval=0, timeout_seconds=7200)
# Bug: timeout doesn't fire because datetime.now() is naive but
# started_at is tz-aware → subtraction raises → except → fall through
# to the normal "running" branch. State stays 'running'.
assert progress.progress_states[1]['status'] == 'running'
def test_emit_loop_reaps_finished_states_due_to_naive_aware_mismatch():
"""Documents pre-existing bug: finished_at is tz-aware, now is naive,
so the cleanup math raises caught state is reaped on FIRST tick
regardless of `cleanup_after_seconds`. Lift preserves the bug.
"""
progress.init_progress(1, 'A', 'x')
progress.update_progress(1, status='finished', socketio_emit=None)
sock = _FakeSocket()
progress.emit_progress_loop(sock, is_shutting_down=_shutdown_after(1, sock),
poll_interval=0, cleanup_after_seconds=300)
# Bug: should be kept (300s window, just finished), but the TypeError
# in the cleanup math is caught with a fall-through that ALSO reaps.
assert 1 not in progress.progress_states
def test_emit_loop_no_active_states_no_emit():
sock = _FakeSocket()
progress.emit_progress_loop(sock, is_shutting_down=_shutdown_after(1, sock), poll_interval=0)
assert sock.emitted == []

@ -0,0 +1,92 @@
"""Tests for core/automation/signals.py — known signal collection."""
from __future__ import annotations
import json
from core.automation import signals
class _FakeDB:
def __init__(self, automations):
self._autos = automations
def get_automations(self, profile_id=None):
return self._autos
def test_no_automations_returns_empty():
db = _FakeDB([])
assert signals.collect_known_signals(db) == []
def test_signal_received_trigger_collected():
db = _FakeDB([
{'trigger_type': 'signal_received', 'trigger_config': json.dumps({'signal_name': 'job_done'}), 'then_actions': '[]'},
])
assert signals.collect_known_signals(db) == ['job_done']
def test_fire_signal_then_action_collected():
db = _FakeDB([
{'trigger_type': 'schedule', 'trigger_config': '{}',
'then_actions': json.dumps([{'type': 'fire_signal', 'config': {'signal_name': 'cleanup_done'}}])},
])
assert signals.collect_known_signals(db) == ['cleanup_done']
def test_collected_signals_are_sorted_and_deduped():
db = _FakeDB([
{'trigger_type': 'signal_received', 'trigger_config': json.dumps({'signal_name': 'zebra'}), 'then_actions': '[]'},
{'trigger_type': 'signal_received', 'trigger_config': json.dumps({'signal_name': 'apple'}), 'then_actions': '[]'},
{'trigger_type': 'signal_received', 'trigger_config': json.dumps({'signal_name': 'apple'}), 'then_actions': '[]'},
])
assert signals.collect_known_signals(db) == ['apple', 'zebra']
def test_empty_signal_name_skipped():
db = _FakeDB([
{'trigger_type': 'signal_received', 'trigger_config': json.dumps({'signal_name': ''}), 'then_actions': '[]'},
{'trigger_type': 'signal_received', 'trigger_config': json.dumps({'signal_name': ' '}), 'then_actions': '[]'},
])
assert signals.collect_known_signals(db) == []
def test_malformed_trigger_config_swallowed():
db = _FakeDB([
{'trigger_type': 'signal_received', 'trigger_config': 'not json', 'then_actions': '[]'},
])
assert signals.collect_known_signals(db) == []
def test_malformed_then_actions_swallowed():
db = _FakeDB([
{'trigger_type': 'schedule', 'trigger_config': '{}', 'then_actions': 'not json'},
])
assert signals.collect_known_signals(db) == []
def test_db_failure_returns_empty():
class _BrokenDB:
def get_automations(self, profile_id=None):
raise RuntimeError("db dead")
assert signals.collect_known_signals(_BrokenDB()) == []
def test_mixed_trigger_and_action_signals_merged():
db = _FakeDB([
{'trigger_type': 'signal_received', 'trigger_config': json.dumps({'signal_name': 'sig_a'}),
'then_actions': json.dumps([{'type': 'fire_signal', 'config': {'signal_name': 'sig_b'}}])},
])
assert signals.collect_known_signals(db) == ['sig_a', 'sig_b']
def test_non_signal_then_action_ignored():
db = _FakeDB([
{'trigger_type': 'schedule', 'trigger_config': '{}',
'then_actions': json.dumps([
{'type': 'discord', 'config': {'webhook_url': 'http://x'}},
{'type': 'fire_signal', 'config': {'signal_name': 'real_sig'}},
])},
])
assert signals.collect_known_signals(db) == ['real_sig']

@ -359,6 +359,9 @@ def test_musicbrainz_release_lookup_failure_does_not_poison_cache(monkeypatch):
def get_release(self, mbid, includes=None):
return {}
def get_artist(self, mbid, includes=None):
return {}
class _FakeMBService:
def __init__(self):
self.release_calls = 0
@ -593,3 +596,111 @@ def test_download_cover_art_uses_album_context_image_url(tmp_path, monkeypatch):
cover_path = target_dir / "cover.jpg"
assert cover_path.exists()
assert cover_path.read_bytes() == b"cover-bytes"
# ---------------------------------------------------------------------------
# MusicBrainz genre fallback chain (recording → release → artist)
# ---------------------------------------------------------------------------
def _build_mb_genre_test(monkeypatch, *, recording_genres, release_genres, artist_genres):
"""Helper: assemble a fake MB stack with configurable genres at each tier."""
class _FakeMBClient:
def __init__(self):
self.artist_calls = 0
self.release_calls = 0
def get_recording(self, mbid, includes=None):
return {"isrcs": [], "genres": list(recording_genres)}
def get_release(self, mbid, includes=None):
self.release_calls += 1
return {"genres": list(release_genres), "media": []}
def get_artist(self, mbid, includes=None):
self.artist_calls += 1
return {"genres": list(artist_genres)}
class _FakeMBService:
def __init__(self):
self.mb_client = _FakeMBClient()
def match_recording(self, t, a):
return {"mbid": "rec-mbid"}
def match_artist(self, a):
return {"mbid": "artist-mbid"}
def match_release(self, album, artist):
return {"mbid": "release-mbid"}
service = _FakeMBService()
monkeypatch.setattr(ms, "get_config_manager", lambda: _Config({"musicbrainz.embed_tags": True}))
monkeypatch.setattr(ms, "mb_release_cache", {})
monkeypatch.setattr(ms, "mb_release_detail_cache", {})
runtime = types.SimpleNamespace(mb_worker=types.SimpleNamespace(mb_service=service))
pp = {
"id_tags": {}, "track_title": "T", "artist_name": "A", "batch_artist_name": "A",
"metadata": {"album": "Alb"}, "recording_mbid": None, "artist_mbid": None,
"release_mbid": "", "mb_genres": [], "isrc": None,
"deezer_bpm": None, "deezer_isrc": None,
"audiodb_mood": None, "audiodb_style": None, "audiodb_genre": None,
"tidal_isrc": None, "tidal_copyright": None,
"qobuz_isrc": None, "qobuz_copyright": None, "qobuz_label": None,
"lastfm_tags": [], "lastfm_url": None, "genius_url": None, "release_year": None,
}
return pp, service, runtime
def test_mb_genre_recording_used_when_present(monkeypatch):
pp, service, runtime = _build_mb_genre_test(
monkeypatch,
recording_genres=[{"name": "Rock", "count": 5}],
release_genres=[{"name": "Pop", "count": 10}],
artist_genres=[{"name": "Jazz", "count": 20}],
)
ms._process_musicbrainz_source(pp, {"album": "Alb"}, _Config({"musicbrainz.embed_tags": True}),
runtime, "T", "A")
assert pp["mb_genres"] == ["Rock"]
# Release/artist genre lookups not consulted because recording had genres
assert service.mb_client.artist_calls == 0
def test_mb_genre_falls_back_to_release_when_recording_empty(monkeypatch):
pp, service, runtime = _build_mb_genre_test(
monkeypatch,
recording_genres=[],
release_genres=[{"name": "Pop", "count": 10}, {"name": "Indie", "count": 3}],
artist_genres=[{"name": "Jazz", "count": 20}],
)
ms._process_musicbrainz_source(pp, {"album": "Alb"}, _Config({"musicbrainz.embed_tags": True}),
runtime, "T", "A")
# Sorted by count desc: Pop (10) before Indie (3)
assert pp["mb_genres"] == ["Pop", "Indie"]
# Artist not consulted because release had genres
assert service.mb_client.artist_calls == 0
def test_mb_genre_falls_back_to_artist_when_recording_and_release_empty(monkeypatch):
pp, service, runtime = _build_mb_genre_test(
monkeypatch,
recording_genres=[],
release_genres=[],
artist_genres=[{"name": "Jazz", "count": 20}, {"name": "Fusion", "count": 5}],
)
ms._process_musicbrainz_source(pp, {"album": "Alb"}, _Config({"musicbrainz.embed_tags": True}),
runtime, "T", "A")
assert pp["mb_genres"] == ["Jazz", "Fusion"]
assert service.mb_client.artist_calls == 1
def test_mb_genre_all_empty_returns_empty(monkeypatch):
pp, service, runtime = _build_mb_genre_test(
monkeypatch,
recording_genres=[],
release_genres=[],
artist_genres=[],
)
ms._process_musicbrainz_source(pp, {"album": "Alb"}, _Config({"musicbrainz.embed_tags": True}),
runtime, "T", "A")
assert pp["mb_genres"] == []

@ -2312,66 +2312,7 @@ def _register_automation_handlers():
def _record_automation_history(aid, result):
"""Capture progress state into run history before cleanup clears it."""
try:
with automation_progress_lock:
state = automation_progress_states.get(aid)
if state:
started_at = state.get('started_at')
finished_at = state.get('finished_at') or datetime.now(timezone.utc).isoformat()
log_entries = list(state.get('log', []))
else:
started_at = datetime.now(timezone.utc).isoformat()
finished_at = datetime.now(timezone.utc).isoformat()
log_entries = []
# Compute duration
duration = None
if started_at and finished_at:
try:
t0 = datetime.fromisoformat(started_at)
t1 = datetime.fromisoformat(finished_at)
duration = (t1 - t0).total_seconds()
except Exception:
pass
# Determine status
r_status = result.get('status', 'completed') if result else 'completed'
if r_status == 'error':
status = 'error'
elif r_status == 'skipped':
status = 'skipped'
elif r_status == 'timeout':
status = 'timeout'
else:
status = 'completed'
# Extract summary from the last success/error log line
summary = None
for entry in reversed(log_entries):
if entry.get('type') in ('success', 'error'):
summary = entry.get('text', '')
break
if not summary and log_entries:
summary = log_entries[-1].get('text', '')
# Fallback: use reason or error from result when no log entries captured
if not summary and result:
summary = result.get('reason') or result.get('error') or result.get('status', '')
result_json = json.dumps({k: v for k, v in result.items() if not k.startswith('_')}) if result else None
log_json = json.dumps(log_entries) if log_entries else None
get_database().insert_automation_run_history(
automation_id=aid,
started_at=started_at,
finished_at=finished_at,
duration_seconds=duration,
status=status,
summary=summary,
result_json=result_json,
log_lines=log_json
)
except Exception as e:
logger.error(f"Error recording automation history for {aid}: {e}")
_auto_progress.record_history(aid, result, get_database())
automation_engine.register_progress_callbacks(_progress_init, _progress_finish, _update_automation_progress, _record_automation_history)
@ -2410,44 +2351,23 @@ except Exception as e:
# --- Automation Progress Tracking ---
automation_progress_states = {} # automation_id (int) -> state dict
automation_progress_lock = threading.Lock()
# State + helpers live in core/automation/progress.py. Re-exported here
# so the existing call sites (registered as engine progress callbacks
# and used inside _record_automation_history) keep resolving.
from core.automation import progress as _auto_progress
automation_progress_states = _auto_progress.progress_states
automation_progress_lock = _auto_progress.progress_lock
def _init_automation_progress(automation_id, automation_name, action_type):
"""Initialize progress state when an automation starts running."""
with automation_progress_lock:
automation_progress_states[automation_id] = {
'status': 'running',
'action_type': action_type,
'progress': 0, 'phase': 'Starting...', 'current_item': '',
'processed': 0, 'total': 0,
'log': [{'type': 'info', 'text': f'Starting {automation_name}'}],
'started_at': datetime.now(timezone.utc).isoformat(),
'finished_at': None,
}
_auto_progress.init_progress(automation_id, automation_name, action_type)
def _update_automation_progress(automation_id, **kwargs):
"""Update progress state from handler threads. Thread-safe."""
if automation_id is None:
return
with automation_progress_lock:
state = automation_progress_states.get(automation_id)
if not state:
return
for k, v in kwargs.items():
if k == 'log_line':
state['log'].append({'type': kwargs.get('log_type', 'info'), 'text': v})
if len(state['log']) > 50:
state['log'] = state['log'][-50:]
elif k != 'log_type':
state[k] = v
# Immediate emit on finish so frontend gets final state without waiting for loop
if kwargs.get('status') in ('finished', 'error'):
state['finished_at'] = datetime.now(timezone.utc).isoformat()
try:
socketio.emit('automation:progress', {str(automation_id): dict(state)})
except Exception:
pass
_auto_progress.update_progress(automation_id, socketio_emit=socketio.emit, **kwargs)
# --- Global Matched Downloads Context Management ---
# Shared with core.runtime_state so the refactored pipeline and web
@ -6412,32 +6332,18 @@ def get_genre_whitelist_defaults():
return jsonify({'genres': sorted(DEFAULT_GENRES, key=str.lower)})
# Automation route bodies live in core/automation/api.py — these routes are thin handlers.
from core.automation import api as _auto_api
from core.automation import blocks as _auto_blocks
from core.automation import signals as _auto_signals
@app.route('/api/automations', methods=['GET'])
def list_automations():
"""List all automations for the current profile."""
try:
profile_id = session.get('profile_id', 1)
db = get_database()
automations = db.get_automations(profile_id)
# Parse JSON config fields for frontend
for auto in automations:
for field in ('trigger_config', 'action_config', 'notify_config', 'last_result'):
try:
auto[field] = json.loads(auto[field]) if isinstance(auto[field], str) else auto[field]
except (json.JSONDecodeError, TypeError):
if field in ('trigger_config', 'action_config', 'notify_config'):
auto[field] = {}
else:
auto[field] = None
# Parse then_actions
try:
auto['then_actions'] = json.loads(auto.get('then_actions') or '[]') if isinstance(auto.get('then_actions'), str) else (auto.get('then_actions') or [])
except (json.JSONDecodeError, TypeError):
auto['then_actions'] = []
# Backward compat: if then_actions empty but notify_type set, build it
if not auto['then_actions'] and auto.get('notify_type'):
auto['then_actions'] = [{'type': auto['notify_type'], 'config': auto.get('notify_config', {})}]
return jsonify(automations)
return jsonify(_auto_api.list_automations(get_database(), profile_id))
except Exception as e:
logger.error(f"Error listing automations: {e}")
return jsonify({"error": str(e)}), 500
@ -6446,80 +6352,21 @@ def list_automations():
def create_automation():
"""Create a new automation."""
try:
data = request.get_json()
name = data.get('name', '').strip()
if not name:
return jsonify({"error": "Name is required"}), 400
trigger_type = data.get('trigger_type', 'schedule')
trigger_config = json.dumps(data.get('trigger_config', {}))
action_type = data.get('action_type', 'process_wishlist')
action_config = json.dumps(data.get('action_config', {}))
# then_actions array (new multi-then system)
then_actions = data.get('then_actions', [])
then_actions_json = json.dumps(then_actions)
# Backward compat: derive notify_type/notify_config from first then_action
if then_actions:
notify_type = then_actions[0].get('type')
notify_config = json.dumps(then_actions[0].get('config', {}))
else:
notify_type = data.get('notify_type') or None
notify_config = json.dumps(data.get('notify_config', {})) if notify_type else '{}'
profile_id = session.get('profile_id', 1)
# Signal cycle detection
if automation_engine and (trigger_type == 'signal_received' or any(t.get('type') == 'fire_signal' for t in then_actions)):
db = get_database()
all_autos = db.get_automations(profile_id)
test_auto = {
'trigger_type': trigger_type,
'trigger_config': trigger_config,
'then_actions': then_actions_json,
'enabled': True,
}
all_autos.append(test_auto)
cycle = automation_engine.detect_signal_cycles(all_autos)
if cycle:
return jsonify({"error": f"Signal cycle detected: {''.join(cycle)}. This would cause an infinite loop."}), 400
group_name = data.get('group_name') or None
db = get_database()
auto_id = db.create_automation(name, trigger_type, trigger_config, action_type, action_config, profile_id, notify_type, notify_config, then_actions_json, group_name)
if auto_id is None:
return jsonify({"error": "Failed to create automation"}), 500
# Schedule it
if automation_engine:
automation_engine.schedule_automation(auto_id)
return jsonify({"success": True, "id": auto_id})
body, status = _auto_api.create_automation(get_database(), automation_engine, profile_id, request.get_json())
return jsonify(body), status
except Exception as e:
logger.error(f"Error creating automation: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/api/automations/<int:automation_id>', methods=['GET'])
def get_automation(automation_id):
"""Get a single automation."""
try:
db = get_database()
auto = db.get_automation(automation_id)
if not auto:
auto = _auto_api.get_automation(get_database(), automation_id)
if auto is None:
return jsonify({"error": "Automation not found"}), 404
for field in ('trigger_config', 'action_config', 'notify_config', 'last_result'):
try:
auto[field] = json.loads(auto[field]) if isinstance(auto[field], str) else auto[field]
except (json.JSONDecodeError, TypeError):
if field in ('trigger_config', 'action_config', 'notify_config'):
auto[field] = {}
else:
auto[field] = None
# Parse then_actions
try:
auto['then_actions'] = json.loads(auto.get('then_actions') or '[]') if isinstance(auto.get('then_actions'), str) else (auto.get('then_actions') or [])
except (json.JSONDecodeError, TypeError):
auto['then_actions'] = []
if not auto['then_actions'] and auto.get('notify_type'):
auto['then_actions'] = [{'type': auto['notify_type'], 'config': auto.get('notify_config', {})}]
return jsonify(auto)
except Exception as e:
logger.error(f"Error getting automation: {e}")
@ -6529,102 +6376,20 @@ def get_automation(automation_id):
def update_automation_endpoint(automation_id):
"""Update an automation."""
try:
data = request.get_json()
db = get_database()
update_fields = {}
if 'name' in data:
update_fields['name'] = data['name'].strip()
if 'trigger_type' in data:
update_fields['trigger_type'] = data['trigger_type']
if 'trigger_config' in data:
update_fields['trigger_config'] = json.dumps(data['trigger_config'])
if 'action_type' in data:
update_fields['action_type'] = data['action_type']
if 'action_config' in data:
update_fields['action_config'] = json.dumps(data['action_config'])
if 'then_actions' in data:
then_actions = data['then_actions']
update_fields['then_actions'] = json.dumps(then_actions)
# Backward compat: derive notify_type/notify_config from first then_action
if then_actions:
update_fields['notify_type'] = then_actions[0].get('type')
update_fields['notify_config'] = json.dumps(then_actions[0].get('config', {}))
else:
update_fields['notify_type'] = None
update_fields['notify_config'] = '{}'
elif 'notify_type' in data:
update_fields['notify_type'] = data['notify_type'] or None
if 'notify_config' in data and 'then_actions' not in data:
update_fields['notify_config'] = json.dumps(data['notify_config'])
if 'group_name' in data:
update_fields['group_name'] = data['group_name'] or None
if not update_fields:
return jsonify({"error": "No fields to update"}), 400
# Signal cycle detection
trigger_type = data.get('trigger_type', '')
then_actions = data.get('then_actions', [])
if automation_engine and (trigger_type == 'signal_received' or any(t.get('type') == 'fire_signal' for t in then_actions)):
all_autos = db.get_automations()
# Replace the automation being edited with the updated version
test_autos = []
for a in all_autos:
if a['id'] == automation_id:
merged = dict(a)
if 'trigger_type' in data:
merged['trigger_type'] = data['trigger_type']
if 'trigger_config' in data:
merged['trigger_config'] = json.dumps(data['trigger_config'])
if 'then_actions' in data:
merged['then_actions'] = json.dumps(data['then_actions'])
merged['enabled'] = True
test_autos.append(merged)
else:
test_autos.append(a)
cycle = automation_engine.detect_signal_cycles(test_autos)
if cycle:
return jsonify({"error": f"Signal cycle detected: {''.join(cycle)}. This would cause an infinite loop."}), 400
success = db.update_automation(automation_id, **update_fields)
if not success:
return jsonify({"error": "Automation not found"}), 404
# Reschedule
if automation_engine:
auto = db.get_automation(automation_id)
if auto and auto.get('enabled'):
automation_engine.schedule_automation(automation_id)
else:
automation_engine.cancel_automation(automation_id)
return jsonify({"success": True})
body, status = _auto_api.update_automation(get_database(), automation_engine, automation_id, request.get_json())
return jsonify(body), status
except Exception as e:
logger.error(f"Error updating automation: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/api/automations/group', methods=['PUT'])
def batch_update_automation_group():
"""Batch update group_name for multiple automations (rename group, delete group, drag-drop)."""
"""Batch update group_name for multiple automations."""
try:
data = request.get_json()
automation_ids = data.get('automation_ids', [])
group_name = data.get('group_name') # None/null = ungroup
if not automation_ids or not isinstance(automation_ids, list):
return jsonify({"error": "automation_ids must be a non-empty list"}), 400
# Sanitize IDs to integers
try:
automation_ids = [int(aid) for aid in automation_ids]
except (ValueError, TypeError):
return jsonify({"error": "automation_ids must contain integers"}), 400
db = get_database()
updated = db.batch_update_group(automation_ids, group_name)
return jsonify({"success": True, "updated": updated})
body, status = _auto_api.batch_update_group(get_database(), data.get('automation_ids', []), data.get('group_name'))
return jsonify(body), status
except Exception as e:
logger.error(f"Error batch updating automation group: {e}")
return jsonify({"error": str(e)}), 500
@ -6635,31 +6400,9 @@ def bulk_toggle_automations():
"""Bulk enable/disable multiple automations."""
try:
data = request.get_json()
automation_ids = data.get('automation_ids', [])
enabled = data.get('enabled', True)
if not automation_ids or not isinstance(automation_ids, list):
return jsonify({"error": "automation_ids must be a non-empty list"}), 400
try:
automation_ids = [int(aid) for aid in automation_ids]
except (ValueError, TypeError):
return jsonify({"error": "automation_ids must contain integers"}), 400
db = get_database()
updated = db.bulk_set_enabled(automation_ids, bool(enabled))
# Reschedule/cancel affected automations
if automation_engine and updated > 0:
for aid in automation_ids:
auto = db.get_automation(aid)
if auto:
if auto.get('enabled'):
automation_engine.schedule_automation(auto)
else:
automation_engine.cancel_automation(aid)
return jsonify({"success": True, "updated": updated})
body, status = _auto_api.bulk_toggle(get_database(), automation_engine,
data.get('automation_ids', []), data.get('enabled', True))
return jsonify(body), status
except Exception as e:
logger.error(f"Error bulk toggling automations: {e}")
return jsonify({"error": str(e)}), 500
@ -6669,159 +6412,71 @@ def bulk_toggle_automations():
def delete_automation_endpoint(automation_id):
"""Delete an automation. System automations cannot be deleted."""
try:
db = get_database()
auto = db.get_automation(automation_id)
if auto and auto.get('is_system'):
return jsonify({"error": "System automations cannot be deleted"}), 403
if automation_engine:
automation_engine.cancel_automation(automation_id)
success = db.delete_automation(automation_id)
if not success:
return jsonify({"error": "Automation not found"}), 404
return jsonify({"success": True})
body, status = _auto_api.delete_automation(get_database(), automation_engine, automation_id)
return jsonify(body), status
except Exception as e:
logger.error(f"Error deleting automation: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/api/automations/<int:automation_id>/duplicate', methods=['POST'])
def duplicate_automation_endpoint(automation_id):
"""Duplicate an automation. System automations cannot be duplicated."""
try:
db = get_database()
auto = db.get_automation(automation_id)
if not auto:
return jsonify({"error": "Automation not found"}), 404
if auto.get('is_system'):
return jsonify({"error": "System automations cannot be duplicated"}), 403
profile_id = session.get('profile_id', 1)
new_id = db.create_automation(
name=f"{auto['name']} (Copy)",
trigger_type=auto['trigger_type'],
trigger_config=auto.get('trigger_config', '{}'),
action_type=auto['action_type'],
action_config=auto.get('action_config', '{}'),
profile_id=profile_id,
notify_type=auto.get('notify_type'),
notify_config=auto.get('notify_config', '{}'),
then_actions=auto.get('then_actions', '[]'),
group_name=auto.get('group_name'),
)
if new_id is None:
return jsonify({"error": "Failed to duplicate automation"}), 500
if automation_engine:
automation_engine.schedule_automation(new_id)
return jsonify({"success": True, "id": new_id})
body, status = _auto_api.duplicate_automation(get_database(), automation_engine, profile_id, automation_id)
return jsonify(body), status
except Exception as e:
logger.error(f"Error duplicating automation: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/api/automations/<int:automation_id>/toggle', methods=['POST'])
def toggle_automation_endpoint(automation_id):
"""Toggle an automation's enabled state."""
try:
db = get_database()
success = db.toggle_automation(automation_id)
if not success:
return jsonify({"error": "Automation not found"}), 404
# Reschedule or cancel based on new state
if automation_engine:
auto = db.get_automation(automation_id)
if auto and auto.get('enabled'):
automation_engine.schedule_automation(automation_id)
else:
automation_engine.cancel_automation(automation_id)
return jsonify({"success": True})
body, status = _auto_api.toggle_automation(get_database(), automation_engine, automation_id)
return jsonify(body), status
except Exception as e:
logger.error(f"Error toggling automation: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/api/automations/<int:automation_id>/run', methods=['POST'])
def run_automation_endpoint(automation_id):
"""Manually trigger an automation."""
try:
if not automation_engine:
return jsonify({"error": "Automation engine not available"}), 500
success = automation_engine.run_now(automation_id, profile_id=get_current_profile_id())
if not success:
return jsonify({"error": "Automation not found"}), 404
return jsonify({"success": True})
body, status = _auto_api.run_automation(automation_engine, automation_id, get_current_profile_id())
return jsonify(body), status
except Exception as e:
logger.error(f"Error running automation: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/api/automations/progress', methods=['GET'])
def get_automation_progress():
"""Get current progress state for all running/recently finished automations."""
try:
with automation_progress_lock:
result = {}
for aid, state in automation_progress_states.items():
if state['status'] in ('running', 'finished', 'error'):
cp = dict(state)
cp['log'] = list(state['log'])
result[str(aid)] = cp
return jsonify(result)
return jsonify(_auto_progress.get_running_progress())
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/api/automations/<int:automation_id>/history', methods=['GET'])
def get_automation_history(automation_id):
"""Get run history for a specific automation."""
try:
limit = request.args.get('limit', 50, type=int)
offset = request.args.get('offset', 0, type=int)
db = get_database()
data = db.get_automation_run_history(automation_id, limit=limit, offset=offset)
# Parse log_lines JSON strings for the frontend
for entry in data.get('history', []):
if entry.get('log_lines'):
try:
entry['log_lines'] = json.loads(entry['log_lines'])
except (json.JSONDecodeError, TypeError):
entry['log_lines'] = []
else:
entry['log_lines'] = []
if entry.get('result_json'):
try:
entry['result_json'] = json.loads(entry['result_json'])
except (json.JSONDecodeError, TypeError):
pass
data['automation_id'] = automation_id
return jsonify(data)
return jsonify(_auto_api.get_history(get_database(), automation_id, limit=limit, offset=offset))
except Exception as e:
logger.error(f"Error getting automation history: {e}")
return jsonify({"error": str(e)}), 500
def _collect_known_signals():
"""Collect all signal names used across automations (for autocomplete)."""
signals = set()
try:
db = get_database()
for auto in db.get_automations():
# Check signal_received triggers
if auto.get('trigger_type') == 'signal_received':
try:
tc = json.loads(auto.get('trigger_config') or '{}')
sig = tc.get('signal_name', '').strip()
if sig:
signals.add(sig)
except (json.JSONDecodeError, TypeError):
pass
# Check fire_signal in then_actions
try:
ta = json.loads(auto.get('then_actions') or '[]')
for item in ta:
if item.get('type') == 'fire_signal':
sig = item.get('config', {}).get('signal_name', '').strip()
if sig:
signals.add(sig)
except (json.JSONDecodeError, TypeError):
pass
except Exception:
pass
return sorted(signals)
return _auto_signals.collect_known_signals(get_database())
@app.route('/api/scripts', methods=['GET'])
def list_available_scripts():
@ -6851,202 +6506,10 @@ def list_available_scripts():
def get_automation_blocks():
"""Return available block types for the automation builder sidebar."""
return jsonify({
"triggers": [
{"type": "schedule", "label": "Schedule", "icon": "clock", "description": "Run on a timer interval", "available": True,
"config_fields": [
{"key": "interval", "type": "number", "label": "Every", "default": 6, "min": 1},
{"key": "unit", "type": "select", "label": "Unit",
"options": [{"value": "minutes", "label": "Minutes"}, {"value": "hours", "label": "Hours"}, {"value": "days", "label": "Days"}],
"default": "hours"}
]},
{"type": "daily_time", "label": "Daily Time", "icon": "clock", "description": "Run every day at a specific time", "available": True,
"config_fields": [
{"key": "time", "type": "time", "label": "At", "default": "03:00"}
]},
{"type": "weekly_time", "label": "Weekly Schedule", "icon": "calendar", "description": "Run on specific days of the week at a set time", "available": True,
"config_fields": [
{"key": "time", "type": "time", "label": "At", "default": "03:00"},
{"key": "days", "type": "multi_select", "label": "Days",
"options": [{"value": "mon", "label": "Mon"}, {"value": "tue", "label": "Tue"}, {"value": "wed", "label": "Wed"},
{"value": "thu", "label": "Thu"}, {"value": "fri", "label": "Fri"}, {"value": "sat", "label": "Sat"}, {"value": "sun", "label": "Sun"}]}
]},
{"type": "app_started", "label": "App Started", "icon": "power", "description": "When SoulSync starts up", "available": True},
{"type": "track_downloaded", "label": "Track Downloaded", "icon": "download", "description": "When a track finishes downloading", "available": True,
"has_conditions": True,
"condition_fields": ["artist", "title", "album", "quality"],
"variables": ["artist", "title", "album", "quality"]},
{"type": "batch_complete", "label": "Batch Complete", "icon": "check-circle", "description": "When an album/playlist download finishes", "available": True,
"has_conditions": True,
"condition_fields": ["playlist_name"],
"variables": ["playlist_name", "total_tracks", "completed_tracks", "failed_tracks"]},
{"type": "watchlist_new_release", "label": "New Release Found", "icon": "bell", "description": "When watchlist detects new music", "available": True,
"has_conditions": True,
"condition_fields": ["artist"],
"variables": ["artist", "new_tracks", "added_to_wishlist"]},
{"type": "playlist_synced", "label": "Playlist Synced", "icon": "refresh", "description": "When a playlist sync completes", "available": True,
"has_conditions": True,
"condition_fields": ["playlist_name"],
"variables": ["playlist_name", "total_tracks", "matched_tracks", "synced_tracks", "failed_tracks"]},
{"type": "playlist_changed", "label": "Playlist Changed", "icon": "edit", "description": "When a mirrored playlist detects track changes from source", "available": True,
"has_conditions": True,
"condition_fields": ["playlist_name"],
"variables": ["playlist_name", "old_count", "new_count", "added", "removed"]},
{"type": "discovery_completed", "label": "Discovery Complete", "icon": "search", "description": "When playlist track discovery finishes", "available": True,
"has_conditions": True,
"condition_fields": ["playlist_name"],
"variables": ["playlist_name", "total_tracks", "discovered_count", "failed_count", "skipped_count"]},
# Phase 3 triggers
{"type": "wishlist_processing_completed", "label": "Wishlist Processed", "icon": "check-circle",
"description": "When auto-wishlist processing finishes", "available": True,
"variables": ["tracks_processed", "tracks_found", "tracks_failed"]},
{"type": "watchlist_scan_completed", "label": "Watchlist Scan Done", "icon": "check-circle",
"description": "When watchlist scan finishes", "available": True,
"variables": ["artists_scanned", "new_tracks_found", "tracks_added"]},
{"type": "database_update_completed", "label": "Database Updated", "icon": "database",
"description": "When library database refresh finishes", "available": True,
"variables": ["total_artists", "total_albums", "total_tracks"]},
{"type": "library_scan_completed", "label": "Library Scan Done", "icon": "hard-drive",
"description": "When media library scan finishes", "available": True,
"variables": ["server_type"]},
{"type": "download_failed", "label": "Download Failed", "icon": "x-circle",
"description": "When a track permanently fails to download", "available": True,
"has_conditions": True, "condition_fields": ["artist", "title", "reason"],
"variables": ["artist", "title", "reason"]},
{"type": "download_quarantined", "label": "File Quarantined", "icon": "alert-triangle",
"description": "When AcoustID verification fails", "available": True,
"has_conditions": True, "condition_fields": ["artist", "title"],
"variables": ["artist", "title", "reason"]},
{"type": "wishlist_item_added", "label": "Wishlist Item Added", "icon": "plus-circle",
"description": "When a track is added to wishlist", "available": True,
"has_conditions": True, "condition_fields": ["artist", "title"],
"variables": ["artist", "title", "reason"]},
{"type": "watchlist_artist_added", "label": "Artist Watched", "icon": "user-plus",
"description": "When an artist is added to watchlist", "available": True,
"has_conditions": True, "condition_fields": ["artist"],
"variables": ["artist", "artist_id"]},
{"type": "watchlist_artist_removed", "label": "Artist Unwatched", "icon": "user-minus",
"description": "When an artist is removed from watchlist", "available": True,
"has_conditions": True, "condition_fields": ["artist"],
"variables": ["artist", "artist_id"]},
{"type": "import_completed", "label": "Import Complete", "icon": "upload",
"description": "When album/track import finishes", "available": True,
"has_conditions": True, "condition_fields": ["artist", "album_name"],
"variables": ["track_count", "album_name", "artist"]},
{"type": "mirrored_playlist_created", "label": "Playlist Mirrored", "icon": "copy",
"description": "When a new playlist is mirrored", "available": True,
"has_conditions": True, "condition_fields": ["playlist_name", "source"],
"variables": ["playlist_name", "source", "track_count"]},
{"type": "quality_scan_completed", "label": "Quality Scan Done", "icon": "bar-chart",
"description": "When quality scan finishes", "available": True,
"variables": ["quality_met", "low_quality", "total_scanned"]},
{"type": "duplicate_scan_completed", "label": "Duplicate Scan Done", "icon": "layers",
"description": "When duplicate cleaner finishes", "available": True,
"variables": ["files_scanned", "duplicates_found", "space_freed"]},
# Signal trigger
{"type": "signal_received", "label": "Signal Received", "icon": "zap",
"description": "When another automation fires a named signal", "available": True,
"config_fields": [
{"key": "signal_name", "type": "signal_input", "label": "Signal Name"}
],
"variables": ["signal_name"]},
# Webhook trigger
{"type": "webhook_received", "label": "Webhook Received", "icon": "globe",
"description": "When an external API request is received (POST /api/v1/request)", "available": True,
"variables": ["query", "request_id", "source"]},
],
"actions": [
{"type": "process_wishlist", "label": "Process Wishlist", "icon": "list", "description": "Retry failed downloads from wishlist", "available": True,
"config_fields": [{"key": "category", "type": "select", "label": "Category", "options": [{"value": "all", "label": "All"}, {"value": "albums", "label": "Albums"}, {"value": "singles", "label": "Singles"}], "default": "all"}]},
{"type": "scan_watchlist", "label": "Scan Watchlist", "icon": "eye", "description": "Check watched artists for new releases", "available": True},
{"type": "scan_library", "label": "Scan Library", "icon": "refresh", "description": "Trigger media server library scan", "available": True},
{"type": "refresh_mirrored", "label": "Refresh Mirrored Playlist", "icon": "copy", "description": "Re-fetch playlist from source and update mirror", "available": True,
"config_fields": [
{"key": "playlist_id", "type": "mirrored_playlist_select", "label": "Playlist"},
{"key": "all", "type": "checkbox", "label": "Refresh all mirrored playlists", "default": False}
]},
{"type": "sync_playlist", "label": "Sync Playlist", "icon": "sync", "description": "Sync mirrored playlist to media server", "available": True,
"config_fields": [
{"key": "playlist_id", "type": "mirrored_playlist_select", "label": "Playlist"}
]},
{"type": "discover_playlist", "label": "Discover Playlist", "icon": "search", "description": "Find official Spotify/iTunes metadata for mirrored playlist tracks", "available": True,
"config_fields": [
{"key": "playlist_id", "type": "mirrored_playlist_select", "label": "Playlist"},
{"key": "all", "type": "checkbox", "label": "Discover all mirrored playlists", "default": False}
]},
{"type": "playlist_pipeline", "label": "Playlist Pipeline", "icon": "rocket",
"description": "Full lifecycle: refresh → discover → sync → download missing. One automation for the entire flow.",
"available": True,
"config_fields": [
{"key": "playlist_id", "type": "mirrored_playlist_select", "label": "Playlist"},
{"key": "all", "type": "checkbox", "label": "Process all mirrored playlists", "default": False},
{"key": "skip_wishlist", "type": "checkbox", "label": "Skip wishlist processing", "default": False},
]},
{"type": "notify_only", "label": "Notify Only", "icon": "bell", "description": "No action — just send notification", "available": True},
# Phase 3 actions
{"type": "start_database_update", "label": "Update Database", "icon": "database",
"description": "Trigger library database refresh", "available": True,
"config_fields": [
{"key": "full_refresh", "type": "checkbox", "label": "Full refresh (slower)", "default": False}
]},
{"type": "run_duplicate_cleaner", "label": "Run Duplicate Cleaner", "icon": "layers",
"description": "Scan for and remove duplicate files", "available": True},
{"type": "clear_quarantine", "label": "Clear Quarantine", "icon": "trash",
"description": "Delete all quarantined files", "available": True},
{"type": "cleanup_wishlist", "label": "Clean Up Wishlist", "icon": "filter",
"description": "Remove duplicate/owned tracks from wishlist", "available": True},
{"type": "update_discovery_pool", "label": "Update Discovery", "icon": "compass",
"description": "Refresh discovery pool with new tracks", "available": True},
{"type": "start_quality_scan", "label": "Run Quality Scan", "icon": "bar-chart",
"description": "Scan for low-quality audio files", "available": True,
"config_fields": [
{"key": "scope", "type": "select", "label": "Scope",
"options": [{"value": "watchlist", "label": "Watchlist Artists"}, {"value": "library", "label": "Full Library"}],
"default": "watchlist"}
]},
{"type": "backup_database", "label": "Backup Database", "icon": "save",
"description": "Create timestamped database backup", "available": True},
{"type": "refresh_beatport_cache", "label": "Refresh Beatport Cache", "icon": "music",
"description": "Scrape Beatport homepage and warm the cache", "available": True},
{"type": "clean_search_history", "label": "Clean Search History", "icon": "trash-2",
"description": "Remove old searches from Soulseek", "available": True},
{"type": "clean_completed_downloads", "label": "Clean Completed Downloads", "icon": "check-square",
"description": "Clear completed downloads and empty directories", "available": True},
{"type": "full_cleanup", "label": "Full Cleanup", "icon": "trash",
"description": "Clear quarantine, download queue, import folder, and search history in one sweep", "available": True},
{"type": "deep_scan_library", "label": "Deep Scan Library", "icon": "search",
"description": "Full library comparison without losing enrichment data", "available": True},
{"type": "run_script", "label": "Run Script", "icon": "terminal",
"description": "Execute a script from the scripts folder", "available": True},
{"type": "search_and_download", "label": "Search & Download", "icon": "download",
"description": "Search for a track and download the best match", "available": True,
"config_fields": [
{"key": "query", "type": "text", "label": "Search Query",
"placeholder": "Artist - Track (leave empty to use trigger's query)"}
]},
],
"notifications": [
{"type": "discord_webhook", "label": "Discord Webhook", "icon": "message", "description": "Send a Discord notification", "available": True,
"variables": ["time", "name", "run_count", "status"]},
{"type": "pushbullet", "label": "Pushbullet", "icon": "push", "description": "Push notification to phone/desktop", "available": True,
"variables": ["time", "name", "run_count", "status"]},
{"type": "telegram", "label": "Telegram", "icon": "message", "description": "Send a Telegram message", "available": True,
"variables": ["time", "name", "run_count", "status"]},
{"type": "webhook", "label": "Webhook (POST)", "icon": "globe", "description": "Send a POST request to any URL", "available": True,
"variables": ["time", "name", "run_count", "status"]},
# Signal fire action
{"type": "fire_signal", "label": "Fire Signal", "icon": "zap",
"description": "Fire a signal that other automations can listen for", "available": True,
"config_fields": [
{"key": "signal_name", "type": "signal_input", "label": "Signal Name"}
]},
# Run script then-action
{"type": "run_script", "label": "Run Script", "icon": "terminal",
"description": "Execute a script after the action completes", "available": True,
"config_fields": [
{"key": "script_name", "type": "script_select", "label": "Script"}
]},
],
"known_signals": _collect_known_signals(),
'triggers': _auto_blocks.TRIGGERS,
'actions': _auto_blocks.ACTIONS,
'notifications': _auto_blocks.NOTIFICATIONS,
'known_signals': _collect_known_signals(),
})
@app.route('/api/mirrored-playlists/list', methods=['GET'])
@ -47981,47 +47444,10 @@ def _emit_scan_status_loop():
def _emit_automation_progress_loop():
"""Push automation:progress events every 1 second for running automations."""
while not globals().get('IS_SHUTTING_DOWN', False):
socketio.sleep(1)
try:
with automation_progress_lock:
active = {}
stale = []
now = datetime.now()
for aid, state in automation_progress_states.items():
if state['status'] == 'running':
# Timeout zombie running states after 2 hours
try:
started = datetime.fromisoformat(state.get('started_at', ''))
if (now - started).total_seconds() > 7200:
state['status'] = 'error'
state['phase'] = 'Timed out'
state['finished_at'] = now.isoformat()
state['log'].append({'type': 'error', 'text': 'Timed out after 2 hours'})
# Emit error state before cleanup so frontend sees it
cp = dict(state)
cp['log'] = list(state['log'])
active[str(aid)] = cp
continue
except (ValueError, TypeError):
pass
cp = dict(state)
cp['log'] = list(state['log'])
active[str(aid)] = cp
elif state['status'] in ('finished', 'error') and state.get('finished_at'):
# Clean up finished states after 60 seconds (frontend already got final emit)
try:
finished_time = datetime.fromisoformat(state['finished_at'])
if (now - finished_time).total_seconds() > 60:
stale.append(aid)
except (ValueError, TypeError):
stale.append(aid)
for aid in stale:
del automation_progress_states[aid]
if active:
socketio.emit('automation:progress', active)
except Exception as e:
logger.debug(f"Error emitting automation progress: {e}")
_auto_progress.emit_progress_loop(
socketio,
is_shutting_down=lambda: globals().get('IS_SHUTTING_DOWN', False),
)
def _emit_repair_progress_loop():
"""Push repair:progress events every 1 second for running repair jobs."""

@ -3451,6 +3451,7 @@ const WHATS_NEW = {
{ title: 'Service Worker for Cover Art + Installable PWA', desc: 'cover art used to re-fetch from the cdn on every library / discover page visit. now a service worker caches images locally — second visit serves art instantly from disk, no network hit. also added a pwa manifest so soulsync can be installed to home screen / desktop as a standalone app (chrome / edge / safari → install soulsync). cache versioned so future strategy changes invalidate cleanly.' },
{ title: 'Stats Endpoints Lifted to core/stats', desc: 'internal — moved /api/stats/* and /api/listening-stats/* logic out of web_server.py into core/stats/queries.py with full test coverage. no behavior change. step toward breaking up the web_server.py monolith.' },
{ title: 'Search Endpoints Lifted to core/search', desc: 'internal — moved /api/search and /api/enhanced-search/* logic into core/search/ (cache, sources, library_check, stream, basic, orchestrator). 612 fewer lines in web_server.py, 94 new tests. no behavior change.' },
{ title: 'Automation Endpoints Lifted to core/automation', desc: 'internal — moved /api/automations/* CRUD + run + history routes, progress tracking helpers, and signal collection into core/automation/ (api, progress, signals). 383 fewer lines in web_server.py, 72 new tests. action handler registration stays put — those closures are tangled with feature implementations.' },
],
'2.4.0': [
// --- April 26, 2026 — Search & Artists unification + reorganize queue ---

Loading…
Cancel
Save