diff --git a/core/automation/__init__.py b/core/automation/__init__.py new file mode 100644 index 00000000..7841a13e --- /dev/null +++ b/core/automation/__init__.py @@ -0,0 +1,7 @@ +"""Automation API + progress tracking helpers package. + +Lifted from web_server.py /api/automations/* routes and progress +emitters. The action handler registration (`_register_automation_handlers`) +stays in web_server.py because each handler closure is tightly coupled +to other application features. +""" diff --git a/core/automation/api.py b/core/automation/api.py new file mode 100644 index 00000000..a6f78afe --- /dev/null +++ b/core/automation/api.py @@ -0,0 +1,367 @@ +"""Automation REST API helpers. + +CRUD + run + progress + history logic for /api/automations/* routes. +Each function takes the deps it needs (database, automation_engine, +profile_id) so the route layer is left as pure HTTP shuffling. + +Out of scope: +- /api/automations/blocks — static JSON + one call into signals.py; + stays inline in web_server.py for now. +- /api/test/automation — touches scan manager + media clients + + config_manager; stays inline. +""" + +from __future__ import annotations + +import json +import logging +from typing import Any, Optional + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Hydration helpers — convert raw DB rows to API-friendly dicts +# --------------------------------------------------------------------------- + +_JSON_FIELDS = ('trigger_config', 'action_config', 'notify_config', 'last_result') +_JSON_DEFAULT_DICT = {'trigger_config', 'action_config', 'notify_config'} + + +def _hydrate_automation(auto: dict) -> dict: + """Parse JSON columns and backfill `then_actions` from legacy notify_*.""" + for field in _JSON_FIELDS: + try: + auto[field] = json.loads(auto[field]) if isinstance(auto[field], str) else auto[field] + except (json.JSONDecodeError, TypeError): + auto[field] = {} if field in _JSON_DEFAULT_DICT else None + + try: + raw = auto.get('then_actions') + auto['then_actions'] = json.loads(raw or '[]') if isinstance(raw, str) else (raw or []) + except (json.JSONDecodeError, TypeError): + auto['then_actions'] = [] + + if not auto['then_actions'] and auto.get('notify_type'): + auto['then_actions'] = [{ + 'type': auto['notify_type'], + 'config': auto.get('notify_config', {}), + }] + return auto + + +# --------------------------------------------------------------------------- +# Signal cycle detection +# --------------------------------------------------------------------------- + +def _has_signal_concern(trigger_type: str, then_actions: list[dict]) -> bool: + return trigger_type == 'signal_received' or any( + t.get('type') == 'fire_signal' for t in then_actions + ) + + +def _check_create_cycle( + automation_engine, + database, + profile_id: int, + trigger_type: str, + trigger_config_json: str, + then_actions_json: str, + then_actions: list[dict], +) -> Optional[str]: + """Return cycle path string if creating this automation would loop, else None.""" + if not automation_engine or not _has_signal_concern(trigger_type, then_actions): + return None + all_autos = database.get_automations(profile_id) + test_auto = { + 'trigger_type': trigger_type, + 'trigger_config': trigger_config_json, + 'then_actions': then_actions_json, + 'enabled': True, + } + all_autos.append(test_auto) + cycle = automation_engine.detect_signal_cycles(all_autos) + if cycle: + return ' → '.join(cycle) + return None + + +def _check_update_cycle( + automation_engine, + database, + automation_id: int, + data: dict, +) -> Optional[str]: + """Return cycle path string if updating this automation would loop, else None.""" + if not automation_engine: + return None + trigger_type = data.get('trigger_type', '') + then_actions = data.get('then_actions', []) + if not _has_signal_concern(trigger_type, then_actions): + return None + + all_autos = database.get_automations() + test_autos = [] + for a in all_autos: + if a['id'] == automation_id: + merged = dict(a) + if 'trigger_type' in data: + merged['trigger_type'] = data['trigger_type'] + if 'trigger_config' in data: + merged['trigger_config'] = json.dumps(data['trigger_config']) + if 'then_actions' in data: + merged['then_actions'] = json.dumps(data['then_actions']) + merged['enabled'] = True + test_autos.append(merged) + else: + test_autos.append(a) + cycle = automation_engine.detect_signal_cycles(test_autos) + if cycle: + return ' → '.join(cycle) + return None + + +# --------------------------------------------------------------------------- +# CRUD helpers — return (response_dict, http_status) +# --------------------------------------------------------------------------- + +def list_automations(database, profile_id: int) -> list[dict]: + """All automations for the profile, with JSON columns parsed.""" + automations = database.get_automations(profile_id) + return [_hydrate_automation(a) for a in automations] + + +def get_automation(database, automation_id: int) -> Optional[dict]: + """One automation, hydrated. Returns None if not found.""" + auto = database.get_automation(automation_id) + if not auto: + return None + return _hydrate_automation(auto) + + +def create_automation( + database, + automation_engine, + profile_id: int, + data: dict, +) -> tuple[dict, int]: + """Create + schedule an automation. Returns (response_body, http_status).""" + name = (data.get('name') or '').strip() + if not name: + return {'error': 'Name is required'}, 400 + + trigger_type = data.get('trigger_type', 'schedule') + trigger_config = json.dumps(data.get('trigger_config', {})) + action_type = data.get('action_type', 'process_wishlist') + action_config = json.dumps(data.get('action_config', {})) + then_actions = data.get('then_actions', []) + then_actions_json = json.dumps(then_actions) + + if then_actions: + notify_type = then_actions[0].get('type') + notify_config = json.dumps(then_actions[0].get('config', {})) + else: + notify_type = data.get('notify_type') or None + notify_config = json.dumps(data.get('notify_config', {})) if notify_type else '{}' + + cycle_path = _check_create_cycle( + automation_engine, database, profile_id, + trigger_type, trigger_config, then_actions_json, then_actions, + ) + if cycle_path: + return {'error': f'Signal cycle detected: {cycle_path}. This would cause an infinite loop.'}, 400 + + group_name = data.get('group_name') or None + auto_id = database.create_automation( + name, trigger_type, trigger_config, action_type, action_config, + profile_id, notify_type, notify_config, then_actions_json, group_name, + ) + if auto_id is None: + return {'error': 'Failed to create automation'}, 500 + + if automation_engine: + automation_engine.schedule_automation(auto_id) + return {'success': True, 'id': auto_id}, 200 + + +def update_automation( + database, + automation_engine, + automation_id: int, + data: dict, +) -> tuple[dict, int]: + """Update + reschedule an automation. Returns (response_body, http_status).""" + update_fields: dict[str, Any] = {} + if 'name' in data: + update_fields['name'] = data['name'].strip() + if 'trigger_type' in data: + update_fields['trigger_type'] = data['trigger_type'] + if 'trigger_config' in data: + update_fields['trigger_config'] = json.dumps(data['trigger_config']) + if 'action_type' in data: + update_fields['action_type'] = data['action_type'] + if 'action_config' in data: + update_fields['action_config'] = json.dumps(data['action_config']) + if 'then_actions' in data: + then_actions = data['then_actions'] + update_fields['then_actions'] = json.dumps(then_actions) + if then_actions: + update_fields['notify_type'] = then_actions[0].get('type') + update_fields['notify_config'] = json.dumps(then_actions[0].get('config', {})) + else: + update_fields['notify_type'] = None + update_fields['notify_config'] = '{}' + elif 'notify_type' in data: + update_fields['notify_type'] = data['notify_type'] or None + if 'notify_config' in data and 'then_actions' not in data: + update_fields['notify_config'] = json.dumps(data['notify_config']) + if 'group_name' in data: + update_fields['group_name'] = data['group_name'] or None + + if not update_fields: + return {'error': 'No fields to update'}, 400 + + cycle_path = _check_update_cycle(automation_engine, database, automation_id, data) + if cycle_path: + return {'error': f'Signal cycle detected: {cycle_path}. This would cause an infinite loop.'}, 400 + + success = database.update_automation(automation_id, **update_fields) + if not success: + return {'error': 'Automation not found'}, 404 + + if automation_engine: + auto = database.get_automation(automation_id) + if auto and auto.get('enabled'): + automation_engine.schedule_automation(automation_id) + else: + automation_engine.cancel_automation(automation_id) + return {'success': True}, 200 + + +def batch_update_group(database, automation_ids: list, group_name: Optional[str]) -> tuple[dict, int]: + """Move/rename a set of automations into a single group (or ungroup).""" + if not automation_ids or not isinstance(automation_ids, list): + return {'error': 'automation_ids must be a non-empty list'}, 400 + try: + automation_ids = [int(aid) for aid in automation_ids] + except (ValueError, TypeError): + return {'error': 'automation_ids must contain integers'}, 400 + + updated = database.batch_update_group(automation_ids, group_name) + return {'success': True, 'updated': updated}, 200 + + +def bulk_toggle( + database, + automation_engine, + automation_ids: list, + enabled: bool, +) -> tuple[dict, int]: + """Bulk enable/disable a set of automations + reschedule each affected.""" + if not automation_ids or not isinstance(automation_ids, list): + return {'error': 'automation_ids must be a non-empty list'}, 400 + try: + automation_ids = [int(aid) for aid in automation_ids] + except (ValueError, TypeError): + return {'error': 'automation_ids must contain integers'}, 400 + + updated = database.bulk_set_enabled(automation_ids, bool(enabled)) + + if automation_engine and updated > 0: + for aid in automation_ids: + auto = database.get_automation(aid) + if auto: + if auto.get('enabled'): + automation_engine.schedule_automation(auto) + else: + automation_engine.cancel_automation(aid) + return {'success': True, 'updated': updated}, 200 + + +def delete_automation(database, automation_engine, automation_id: int) -> tuple[dict, int]: + """Delete an automation. System automations are protected.""" + auto = database.get_automation(automation_id) + if auto and auto.get('is_system'): + return {'error': 'System automations cannot be deleted'}, 403 + if automation_engine: + automation_engine.cancel_automation(automation_id) + success = database.delete_automation(automation_id) + if not success: + return {'error': 'Automation not found'}, 404 + return {'success': True}, 200 + + +def duplicate_automation( + database, + automation_engine, + profile_id: int, + automation_id: int, +) -> tuple[dict, int]: + """Duplicate an automation. System automations are protected.""" + auto = database.get_automation(automation_id) + if not auto: + return {'error': 'Automation not found'}, 404 + if auto.get('is_system'): + return {'error': 'System automations cannot be duplicated'}, 403 + new_id = database.create_automation( + name=f"{auto['name']} (Copy)", + trigger_type=auto['trigger_type'], + trigger_config=auto.get('trigger_config', '{}'), + action_type=auto['action_type'], + action_config=auto.get('action_config', '{}'), + profile_id=profile_id, + notify_type=auto.get('notify_type'), + notify_config=auto.get('notify_config', '{}'), + then_actions=auto.get('then_actions', '[]'), + group_name=auto.get('group_name'), + ) + if new_id is None: + return {'error': 'Failed to duplicate automation'}, 500 + if automation_engine: + automation_engine.schedule_automation(new_id) + return {'success': True, 'id': new_id}, 200 + + +def toggle_automation(database, automation_engine, automation_id: int) -> tuple[dict, int]: + """Toggle an automation's enabled state + reschedule/cancel.""" + success = database.toggle_automation(automation_id) + if not success: + return {'error': 'Automation not found'}, 404 + + if automation_engine: + auto = database.get_automation(automation_id) + if auto and auto.get('enabled'): + automation_engine.schedule_automation(automation_id) + else: + automation_engine.cancel_automation(automation_id) + return {'success': True}, 200 + + +def run_automation(automation_engine, automation_id: int, profile_id: int) -> tuple[dict, int]: + """Manually trigger an automation.""" + if not automation_engine: + return {'error': 'Automation engine not available'}, 500 + success = automation_engine.run_now(automation_id, profile_id=profile_id) + if not success: + return {'error': 'Automation not found'}, 404 + return {'success': True}, 200 + + +def get_history(database, automation_id: int, *, limit: int, offset: int) -> dict: + """Run-history page for an automation, with log_lines/result_json parsed.""" + data = database.get_automation_run_history(automation_id, limit=limit, offset=offset) + for entry in data.get('history', []): + if entry.get('log_lines'): + try: + entry['log_lines'] = json.loads(entry['log_lines']) + except (json.JSONDecodeError, TypeError): + entry['log_lines'] = [] + else: + entry['log_lines'] = [] + if entry.get('result_json'): + try: + entry['result_json'] = json.loads(entry['result_json']) + except (json.JSONDecodeError, TypeError): + pass + data['automation_id'] = automation_id + return data diff --git a/core/automation/blocks.py b/core/automation/blocks.py new file mode 100644 index 00000000..c84d1b37 --- /dev/null +++ b/core/automation/blocks.py @@ -0,0 +1,215 @@ +"""Static block definitions for the automation builder UI. + +Returned verbatim by `/api/automations/blocks` (with `known_signals` +injected by the route from `signals.collect_known_signals`). + +Three top-level lists: +- `TRIGGERS` — WHEN blocks: schedule, daily/weekly time, app started, + event triggers (track_downloaded, batch_complete, etc.), signal_received, + webhook_received. +- `ACTIONS` — DO blocks: process_wishlist, scan_library, etc. +- `NOTIFICATIONS` — THEN blocks: discord/pushbullet/telegram/webhook, + plus fire_signal and run_script then-actions. +""" + +from __future__ import annotations + +TRIGGERS: list[dict] = [ + {"type": "schedule", "label": "Schedule", "icon": "clock", "description": "Run on a timer interval", "available": True, + "config_fields": [ + {"key": "interval", "type": "number", "label": "Every", "default": 6, "min": 1}, + {"key": "unit", "type": "select", "label": "Unit", + "options": [{"value": "minutes", "label": "Minutes"}, {"value": "hours", "label": "Hours"}, {"value": "days", "label": "Days"}], + "default": "hours"} + ]}, + {"type": "daily_time", "label": "Daily Time", "icon": "clock", "description": "Run every day at a specific time", "available": True, + "config_fields": [ + {"key": "time", "type": "time", "label": "At", "default": "03:00"} + ]}, + {"type": "weekly_time", "label": "Weekly Schedule", "icon": "calendar", "description": "Run on specific days of the week at a set time", "available": True, + "config_fields": [ + {"key": "time", "type": "time", "label": "At", "default": "03:00"}, + {"key": "days", "type": "multi_select", "label": "Days", + "options": [{"value": "mon", "label": "Mon"}, {"value": "tue", "label": "Tue"}, {"value": "wed", "label": "Wed"}, + {"value": "thu", "label": "Thu"}, {"value": "fri", "label": "Fri"}, {"value": "sat", "label": "Sat"}, {"value": "sun", "label": "Sun"}]} + ]}, + {"type": "app_started", "label": "App Started", "icon": "power", "description": "When SoulSync starts up", "available": True}, + {"type": "track_downloaded", "label": "Track Downloaded", "icon": "download", "description": "When a track finishes downloading", "available": True, + "has_conditions": True, + "condition_fields": ["artist", "title", "album", "quality"], + "variables": ["artist", "title", "album", "quality"]}, + {"type": "batch_complete", "label": "Batch Complete", "icon": "check-circle", "description": "When an album/playlist download finishes", "available": True, + "has_conditions": True, + "condition_fields": ["playlist_name"], + "variables": ["playlist_name", "total_tracks", "completed_tracks", "failed_tracks"]}, + {"type": "watchlist_new_release", "label": "New Release Found", "icon": "bell", "description": "When watchlist detects new music", "available": True, + "has_conditions": True, + "condition_fields": ["artist"], + "variables": ["artist", "new_tracks", "added_to_wishlist"]}, + {"type": "playlist_synced", "label": "Playlist Synced", "icon": "refresh", "description": "When a playlist sync completes", "available": True, + "has_conditions": True, + "condition_fields": ["playlist_name"], + "variables": ["playlist_name", "total_tracks", "matched_tracks", "synced_tracks", "failed_tracks"]}, + {"type": "playlist_changed", "label": "Playlist Changed", "icon": "edit", "description": "When a mirrored playlist detects track changes from source", "available": True, + "has_conditions": True, + "condition_fields": ["playlist_name"], + "variables": ["playlist_name", "old_count", "new_count", "added", "removed"]}, + {"type": "discovery_completed", "label": "Discovery Complete", "icon": "search", "description": "When playlist track discovery finishes", "available": True, + "has_conditions": True, + "condition_fields": ["playlist_name"], + "variables": ["playlist_name", "total_tracks", "discovered_count", "failed_count", "skipped_count"]}, + # Phase 3 triggers + {"type": "wishlist_processing_completed", "label": "Wishlist Processed", "icon": "check-circle", + "description": "When auto-wishlist processing finishes", "available": True, + "variables": ["tracks_processed", "tracks_found", "tracks_failed"]}, + {"type": "watchlist_scan_completed", "label": "Watchlist Scan Done", "icon": "check-circle", + "description": "When watchlist scan finishes", "available": True, + "variables": ["artists_scanned", "new_tracks_found", "tracks_added"]}, + {"type": "database_update_completed", "label": "Database Updated", "icon": "database", + "description": "When library database refresh finishes", "available": True, + "variables": ["total_artists", "total_albums", "total_tracks"]}, + {"type": "library_scan_completed", "label": "Library Scan Done", "icon": "hard-drive", + "description": "When media library scan finishes", "available": True, + "variables": ["server_type"]}, + {"type": "download_failed", "label": "Download Failed", "icon": "x-circle", + "description": "When a track permanently fails to download", "available": True, + "has_conditions": True, "condition_fields": ["artist", "title", "reason"], + "variables": ["artist", "title", "reason"]}, + {"type": "download_quarantined", "label": "File Quarantined", "icon": "alert-triangle", + "description": "When AcoustID verification fails", "available": True, + "has_conditions": True, "condition_fields": ["artist", "title"], + "variables": ["artist", "title", "reason"]}, + {"type": "wishlist_item_added", "label": "Wishlist Item Added", "icon": "plus-circle", + "description": "When a track is added to wishlist", "available": True, + "has_conditions": True, "condition_fields": ["artist", "title"], + "variables": ["artist", "title", "reason"]}, + {"type": "watchlist_artist_added", "label": "Artist Watched", "icon": "user-plus", + "description": "When an artist is added to watchlist", "available": True, + "has_conditions": True, "condition_fields": ["artist"], + "variables": ["artist", "artist_id"]}, + {"type": "watchlist_artist_removed", "label": "Artist Unwatched", "icon": "user-minus", + "description": "When an artist is removed from watchlist", "available": True, + "has_conditions": True, "condition_fields": ["artist"], + "variables": ["artist", "artist_id"]}, + {"type": "import_completed", "label": "Import Complete", "icon": "upload", + "description": "When album/track import finishes", "available": True, + "has_conditions": True, "condition_fields": ["artist", "album_name"], + "variables": ["track_count", "album_name", "artist"]}, + {"type": "mirrored_playlist_created", "label": "Playlist Mirrored", "icon": "copy", + "description": "When a new playlist is mirrored", "available": True, + "has_conditions": True, "condition_fields": ["playlist_name", "source"], + "variables": ["playlist_name", "source", "track_count"]}, + {"type": "quality_scan_completed", "label": "Quality Scan Done", "icon": "bar-chart", + "description": "When quality scan finishes", "available": True, + "variables": ["quality_met", "low_quality", "total_scanned"]}, + {"type": "duplicate_scan_completed", "label": "Duplicate Scan Done", "icon": "layers", + "description": "When duplicate cleaner finishes", "available": True, + "variables": ["files_scanned", "duplicates_found", "space_freed"]}, + # Signal trigger + {"type": "signal_received", "label": "Signal Received", "icon": "zap", + "description": "When another automation fires a named signal", "available": True, + "config_fields": [ + {"key": "signal_name", "type": "signal_input", "label": "Signal Name"} + ], + "variables": ["signal_name"]}, + # Webhook trigger + {"type": "webhook_received", "label": "Webhook Received", "icon": "globe", + "description": "When an external API request is received (POST /api/v1/request)", "available": True, + "variables": ["query", "request_id", "source"]}, +] + + +ACTIONS: list[dict] = [ + {"type": "process_wishlist", "label": "Process Wishlist", "icon": "list", "description": "Retry failed downloads from wishlist", "available": True, + "config_fields": [{"key": "category", "type": "select", "label": "Category", "options": [{"value": "all", "label": "All"}, {"value": "albums", "label": "Albums"}, {"value": "singles", "label": "Singles"}], "default": "all"}]}, + {"type": "scan_watchlist", "label": "Scan Watchlist", "icon": "eye", "description": "Check watched artists for new releases", "available": True}, + {"type": "scan_library", "label": "Scan Library", "icon": "refresh", "description": "Trigger media server library scan", "available": True}, + {"type": "refresh_mirrored", "label": "Refresh Mirrored Playlist", "icon": "copy", "description": "Re-fetch playlist from source and update mirror", "available": True, + "config_fields": [ + {"key": "playlist_id", "type": "mirrored_playlist_select", "label": "Playlist"}, + {"key": "all", "type": "checkbox", "label": "Refresh all mirrored playlists", "default": False} + ]}, + {"type": "sync_playlist", "label": "Sync Playlist", "icon": "sync", "description": "Sync mirrored playlist to media server", "available": True, + "config_fields": [ + {"key": "playlist_id", "type": "mirrored_playlist_select", "label": "Playlist"} + ]}, + {"type": "discover_playlist", "label": "Discover Playlist", "icon": "search", "description": "Find official Spotify/iTunes metadata for mirrored playlist tracks", "available": True, + "config_fields": [ + {"key": "playlist_id", "type": "mirrored_playlist_select", "label": "Playlist"}, + {"key": "all", "type": "checkbox", "label": "Discover all mirrored playlists", "default": False} + ]}, + {"type": "playlist_pipeline", "label": "Playlist Pipeline", "icon": "rocket", + "description": "Full lifecycle: refresh → discover → sync → download missing. One automation for the entire flow.", + "available": True, + "config_fields": [ + {"key": "playlist_id", "type": "mirrored_playlist_select", "label": "Playlist"}, + {"key": "all", "type": "checkbox", "label": "Process all mirrored playlists", "default": False}, + {"key": "skip_wishlist", "type": "checkbox", "label": "Skip wishlist processing", "default": False}, + ]}, + {"type": "notify_only", "label": "Notify Only", "icon": "bell", "description": "No action — just send notification", "available": True}, + # Phase 3 actions + {"type": "start_database_update", "label": "Update Database", "icon": "database", + "description": "Trigger library database refresh", "available": True, + "config_fields": [ + {"key": "full_refresh", "type": "checkbox", "label": "Full refresh (slower)", "default": False} + ]}, + {"type": "run_duplicate_cleaner", "label": "Run Duplicate Cleaner", "icon": "layers", + "description": "Scan for and remove duplicate files", "available": True}, + {"type": "clear_quarantine", "label": "Clear Quarantine", "icon": "trash", + "description": "Delete all quarantined files", "available": True}, + {"type": "cleanup_wishlist", "label": "Clean Up Wishlist", "icon": "filter", + "description": "Remove duplicate/owned tracks from wishlist", "available": True}, + {"type": "update_discovery_pool", "label": "Update Discovery", "icon": "compass", + "description": "Refresh discovery pool with new tracks", "available": True}, + {"type": "start_quality_scan", "label": "Run Quality Scan", "icon": "bar-chart", + "description": "Scan for low-quality audio files", "available": True, + "config_fields": [ + {"key": "scope", "type": "select", "label": "Scope", + "options": [{"value": "watchlist", "label": "Watchlist Artists"}, {"value": "library", "label": "Full Library"}], + "default": "watchlist"} + ]}, + {"type": "backup_database", "label": "Backup Database", "icon": "save", + "description": "Create timestamped database backup", "available": True}, + {"type": "refresh_beatport_cache", "label": "Refresh Beatport Cache", "icon": "music", + "description": "Scrape Beatport homepage and warm the cache", "available": True}, + {"type": "clean_search_history", "label": "Clean Search History", "icon": "trash-2", + "description": "Remove old searches from Soulseek", "available": True}, + {"type": "clean_completed_downloads", "label": "Clean Completed Downloads", "icon": "check-square", + "description": "Clear completed downloads and empty directories", "available": True}, + {"type": "full_cleanup", "label": "Full Cleanup", "icon": "trash", + "description": "Clear quarantine, download queue, import folder, and search history in one sweep", "available": True}, + {"type": "deep_scan_library", "label": "Deep Scan Library", "icon": "search", + "description": "Full library comparison without losing enrichment data", "available": True}, + {"type": "run_script", "label": "Run Script", "icon": "terminal", + "description": "Execute a script from the scripts folder", "available": True}, + {"type": "search_and_download", "label": "Search & Download", "icon": "download", + "description": "Search for a track and download the best match", "available": True, + "config_fields": [ + {"key": "query", "type": "text", "label": "Search Query", + "placeholder": "Artist - Track (leave empty to use trigger's query)"} + ]}, +] + + +NOTIFICATIONS: list[dict] = [ + {"type": "discord_webhook", "label": "Discord Webhook", "icon": "message", "description": "Send a Discord notification", "available": True, + "variables": ["time", "name", "run_count", "status"]}, + {"type": "pushbullet", "label": "Pushbullet", "icon": "push", "description": "Push notification to phone/desktop", "available": True, + "variables": ["time", "name", "run_count", "status"]}, + {"type": "telegram", "label": "Telegram", "icon": "message", "description": "Send a Telegram message", "available": True, + "variables": ["time", "name", "run_count", "status"]}, + {"type": "webhook", "label": "Webhook (POST)", "icon": "globe", "description": "Send a POST request to any URL", "available": True, + "variables": ["time", "name", "run_count", "status"]}, + # Signal fire action + {"type": "fire_signal", "label": "Fire Signal", "icon": "zap", + "description": "Fire a signal that other automations can listen for", "available": True, + "config_fields": [ + {"key": "signal_name", "type": "signal_input", "label": "Signal Name"} + ]}, + # Run script then-action + {"type": "run_script", "label": "Run Script", "icon": "terminal", + "description": "Execute a script after the action completes", "available": True, + "config_fields": [ + {"key": "script_name", "type": "script_select", "label": "Script"} + ]}, +] diff --git a/core/automation/progress.py b/core/automation/progress.py new file mode 100644 index 00000000..8f3e8f50 --- /dev/null +++ b/core/automation/progress.py @@ -0,0 +1,216 @@ +"""Automation progress tracking. + +Owns the in-memory progress state dict that backs both +`/api/automations/progress` polling and the WebSocket +`automation:progress` push emitter. State is per-automation, capped at +50 log entries each, and finished/error states are reaped 60s after +they finish so the frontend has a window to show the final state. + +Functions are written so the route layer / engine callbacks can pass +their own socketio emitter, db handle, and shutdown flag. The progress +state dict (`progress_states`) and its lock (`progress_lock`) are +module-level so all callers share one view — same as the original +web_server.py globals. +""" + +from __future__ import annotations + +import json +import logging +import threading +from datetime import datetime, timezone +from typing import Any, Callable, Optional + +logger = logging.getLogger(__name__) + +# Shared mutable state — module globals so every caller (routes, engine +# progress callbacks, emit loop) sees the same dict. Mirrors the original +# `automation_progress_states` / `automation_progress_lock` in web_server. +progress_states: dict[int, dict] = {} +progress_lock = threading.Lock() + + +def init_progress(automation_id: int, automation_name: str, action_type: str) -> None: + """Initialize progress state when an automation starts running.""" + with progress_lock: + progress_states[automation_id] = { + 'status': 'running', + 'action_type': action_type, + 'progress': 0, + 'phase': 'Starting...', + 'current_item': '', + 'processed': 0, + 'total': 0, + 'log': [{'type': 'info', 'text': f'Starting {automation_name}'}], + 'started_at': datetime.now(timezone.utc).isoformat(), + 'finished_at': None, + } + + +def update_progress( + automation_id: Optional[int], + *, + socketio_emit: Optional[Callable[[str, Any], None]] = None, + **kwargs, +) -> None: + """Update progress state from handler threads. Thread-safe. + + `socketio_emit` lets callers wire in the live socketio.emit so that + finished/error transitions push immediately without waiting for the + 1s emitter loop. Falls back to no-op if not provided. + """ + if automation_id is None: + return + with progress_lock: + state = progress_states.get(automation_id) + if not state: + return + for k, v in kwargs.items(): + if k == 'log_line': + state['log'].append({'type': kwargs.get('log_type', 'info'), 'text': v}) + if len(state['log']) > 50: + state['log'] = state['log'][-50:] + elif k != 'log_type': + state[k] = v + if kwargs.get('status') in ('finished', 'error'): + state['finished_at'] = datetime.now(timezone.utc).isoformat() + if socketio_emit is not None: + try: + socketio_emit('automation:progress', {str(automation_id): dict(state)}) + except Exception: + pass + + +def get_running_progress() -> dict[str, dict]: + """Snapshot of running/finished/error states for the polling endpoint.""" + with progress_lock: + result: dict[str, dict] = {} + for aid, state in progress_states.items(): + if state['status'] in ('running', 'finished', 'error'): + cp = dict(state) + cp['log'] = list(state['log']) + result[str(aid)] = cp + return result + + +def record_history( + automation_id: int, + result: dict, + database, +) -> None: + """Capture progress state into run history before cleanup clears it. + + `database` is passed in so the function works without a `get_database()` + global. + """ + try: + with progress_lock: + state = progress_states.get(automation_id) + if state: + started_at = state.get('started_at') + finished_at = state.get('finished_at') or datetime.now(timezone.utc).isoformat() + log_entries = list(state.get('log', [])) + else: + started_at = datetime.now(timezone.utc).isoformat() + finished_at = datetime.now(timezone.utc).isoformat() + log_entries = [] + + duration = None + if started_at and finished_at: + try: + t0 = datetime.fromisoformat(started_at) + t1 = datetime.fromisoformat(finished_at) + duration = (t1 - t0).total_seconds() + except Exception: + pass + + r_status = result.get('status', 'completed') if result else 'completed' + if r_status == 'error': + status = 'error' + elif r_status == 'skipped': + status = 'skipped' + elif r_status == 'timeout': + status = 'timeout' + else: + status = 'completed' + + summary = None + for entry in reversed(log_entries): + if entry.get('type') in ('success', 'error'): + summary = entry.get('text', '') + break + if not summary and log_entries: + summary = log_entries[-1].get('text', '') + if not summary and result: + summary = result.get('reason') or result.get('error') or result.get('status', '') + + result_json = json.dumps({k: v for k, v in result.items() if not k.startswith('_')}) if result else None + log_json = json.dumps(log_entries) if log_entries else None + + database.insert_automation_run_history( + automation_id=automation_id, + started_at=started_at, + finished_at=finished_at, + duration_seconds=duration, + status=status, + summary=summary, + result_json=result_json, + log_lines=log_json, + ) + except Exception as e: + logger.error(f"Error recording automation history for {automation_id}: {e}") + + +def emit_progress_loop( + socketio, + *, + is_shutting_down: Callable[[], bool], + poll_interval: float = 1.0, + timeout_seconds: int = 7200, + cleanup_after_seconds: int = 60, +) -> None: + """Push `automation:progress` events for active automations. + + Long-running loop — caller wires this into a socketio background task. + - Times out zombie running states after `timeout_seconds` (default 2h). + - Reaps finished/error states `cleanup_after_seconds` after finish so the + frontend has a final-state window before they disappear. + """ + while not is_shutting_down(): + socketio.sleep(poll_interval) + try: + with progress_lock: + active: dict[str, dict] = {} + stale: list[int] = [] + now = datetime.now() + for aid, state in progress_states.items(): + if state['status'] == 'running': + try: + started = datetime.fromisoformat(state.get('started_at', '')) + if (now - started).total_seconds() > timeout_seconds: + state['status'] = 'error' + state['phase'] = 'Timed out' + state['finished_at'] = now.isoformat() + state['log'].append({'type': 'error', 'text': f'Timed out after {timeout_seconds // 3600} hours'}) + cp = dict(state) + cp['log'] = list(state['log']) + active[str(aid)] = cp + continue + except (ValueError, TypeError): + pass + cp = dict(state) + cp['log'] = list(state['log']) + active[str(aid)] = cp + elif state['status'] in ('finished', 'error') and state.get('finished_at'): + try: + finished_time = datetime.fromisoformat(state['finished_at']) + if (now - finished_time).total_seconds() > cleanup_after_seconds: + stale.append(aid) + except (ValueError, TypeError): + stale.append(aid) + for aid in stale: + del progress_states[aid] + if active: + socketio.emit('automation:progress', active) + except Exception as e: + logger.debug(f"Error emitting automation progress: {e}") diff --git a/core/automation/signals.py b/core/automation/signals.py new file mode 100644 index 00000000..a9ce9214 --- /dev/null +++ b/core/automation/signals.py @@ -0,0 +1,43 @@ +"""Automation signal helpers — name collection for autocomplete. + +Signal cycle detection itself lives in core/automation_engine.py +(`detect_signal_cycles`); this module just enumerates known signal +names from the saved automation set so the builder UI can autocomplete. +""" + +from __future__ import annotations + +import json + + +def collect_known_signals(database) -> list[str]: + """Return sorted, deduped signal names referenced by any saved automation. + + Walks every automation and pulls signal names from both the + `signal_received` trigger config and any `fire_signal` then-actions. + Errors at every layer are swallowed — the autocomplete is best-effort. + """ + signals: set[str] = set() + try: + for auto in database.get_automations(): + if auto.get('trigger_type') == 'signal_received': + try: + tc = json.loads(auto.get('trigger_config') or '{}') + sig = tc.get('signal_name', '').strip() + if sig: + signals.add(sig) + except (json.JSONDecodeError, TypeError): + pass + + try: + ta = json.loads(auto.get('then_actions') or '[]') + for item in ta: + if item.get('type') == 'fire_signal': + sig = item.get('config', {}).get('signal_name', '').strip() + if sig: + signals.add(sig) + except (json.JSONDecodeError, TypeError): + pass + except Exception: + pass + return sorted(signals) diff --git a/core/metadata/source.py b/core/metadata/source.py index 31450f77..c0ec788c 100644 --- a/core/metadata/source.py +++ b/core/metadata/source.py @@ -281,7 +281,7 @@ def _process_musicbrainz_source(pp: dict, metadata: dict, cfg, runtime, track_ti "MusicBrainz release details", mb_service.mb_client.get_release, pp["release_mbid"], - includes=["release-groups", "labels", "media", "artist-credits", "recordings"], + includes=["release-groups", "labels", "media", "artist-credits", "recordings", "genres"], ) or {} with mb_release_detail_cache_lock: _bounded_cache_set(mb_release_detail_cache, pp["release_mbid"], release_detail, _MB_RELEASE_DETAIL_CACHE_MAX_ENTRIES) @@ -345,6 +345,31 @@ def _process_musicbrainz_source(pp: dict, metadata: dict, cfg, runtime, track_ti except (ValueError, TypeError): pass + # Genre fallback chain: most MusicBrainz recordings don't carry genres at + # the track level, but releases and artists usually do. If the recording + # came back empty, try the release; if that's empty too, fetch the artist + # with `includes=['genres']` and use that. + _release_detail_for_genres = locals().get("release_detail") + if not pp["mb_genres"] and _release_detail_for_genres: + pp["mb_genres"] = [ + g["name"] for g in sorted( + _release_detail_for_genres.get("genres", []), key=lambda x: x.get("count", 0), reverse=True, + ) + ] + if not pp["mb_genres"] and pp.get("artist_mbid"): + artist_detail = _call_source_lookup( + "MusicBrainz artist details", + mb_service.mb_client.get_artist, + pp["artist_mbid"], + includes=["genres"], + ) + if artist_detail: + pp["mb_genres"] = [ + g["name"] for g in sorted( + artist_detail.get("genres", []), key=lambda x: x.get("count", 0), reverse=True, + ) + ] + def _process_deezer_source(pp: dict, metadata: dict, cfg, runtime, track_title: str, artist_name: str) -> None: if cfg.get("deezer.embed_tags", True) is False: diff --git a/tests/automation/__init__.py b/tests/automation/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/automation/test_automation_api.py b/tests/automation/test_automation_api.py new file mode 100644 index 00000000..fea7a032 --- /dev/null +++ b/tests/automation/test_automation_api.py @@ -0,0 +1,478 @@ +"""Tests for core/automation/api.py — CRUD + run + history helpers.""" + +from __future__ import annotations + +import json + +from core.automation import api + + +# --------------------------------------------------------------------------- +# Fakes +# --------------------------------------------------------------------------- + +class _FakeDB: + def __init__(self): + self._next_id = 1 + self.automations: dict[int, dict] = {} + self.history: dict[int, list] = {} + self.batch_group_calls = [] + self.bulk_set_calls = [] + + def get_automations(self, profile_id=None): + if profile_id is None: + return list(self.automations.values()) + return [a for a in self.automations.values() if a.get('profile_id') == profile_id] + + def get_automation(self, automation_id): + return dict(self.automations[automation_id]) if automation_id in self.automations else None + + def create_automation(self, name, trigger_type, trigger_config, action_type, action_config, + profile_id, notify_type, notify_config, then_actions, group_name): + aid = self._next_id + self._next_id += 1 + self.automations[aid] = { + 'id': aid, 'name': name, 'trigger_type': trigger_type, + 'trigger_config': trigger_config, 'action_type': action_type, + 'action_config': action_config, 'profile_id': profile_id, + 'notify_type': notify_type, 'notify_config': notify_config, + 'then_actions': then_actions, 'group_name': group_name, + 'enabled': 1, 'is_system': 0, + } + return aid + + def update_automation(self, automation_id, **fields): + if automation_id not in self.automations: + return False + self.automations[automation_id].update(fields) + return True + + def delete_automation(self, automation_id): + if automation_id not in self.automations: + return False + del self.automations[automation_id] + return True + + def toggle_automation(self, automation_id): + if automation_id not in self.automations: + return False + a = self.automations[automation_id] + a['enabled'] = 0 if a['enabled'] else 1 + return True + + def batch_update_group(self, ids, group_name): + self.batch_group_calls.append((ids, group_name)) + return len(ids) + + def bulk_set_enabled(self, ids, enabled): + self.bulk_set_calls.append((ids, enabled)) + for aid in ids: + if aid in self.automations: + self.automations[aid]['enabled'] = 1 if enabled else 0 + return len(ids) + + def get_automation_run_history(self, automation_id, limit=50, offset=0): + return {'history': self.history.get(automation_id, [])[offset:offset + limit]} + + +class _FakeEngine: + def __init__(self, cycles_to_return=None): + self.scheduled = [] + self.cancelled = [] + self.run_now_calls = [] + self._cycles = cycles_to_return or [] + + def schedule_automation(self, aid): + self.scheduled.append(aid) + + def cancel_automation(self, aid): + self.cancelled.append(aid) + + def detect_signal_cycles(self, autos): + return list(self._cycles) + + def run_now(self, aid, profile_id=None): + self.run_now_calls.append((aid, profile_id)) + return True + + +# --------------------------------------------------------------------------- +# _hydrate_automation +# --------------------------------------------------------------------------- + +def test_hydrate_parses_json_columns(): + raw = { + 'trigger_config': '{"interval": 6}', + 'action_config': '{"category": "all"}', + 'notify_config': '{"webhook": "x"}', + 'last_result': '{"ok": true}', + 'then_actions': '[{"type": "discord", "config": {"webhook": "y"}}]', + 'notify_type': None, + } + out = api._hydrate_automation(dict(raw)) + assert out['trigger_config'] == {'interval': 6} + assert out['action_config'] == {'category': 'all'} + assert out['then_actions'][0]['type'] == 'discord' + + +def test_hydrate_invalid_json_falls_back_to_default(): + raw = { + 'trigger_config': 'not json', + 'action_config': 'not json', + 'notify_config': 'not json', + 'last_result': 'not json', + 'then_actions': 'not json', + 'notify_type': None, + } + out = api._hydrate_automation(dict(raw)) + assert out['trigger_config'] == {} + assert out['action_config'] == {} + assert out['notify_config'] == {} + assert out['last_result'] is None + assert out['then_actions'] == [] + + +def test_hydrate_backfills_then_actions_from_legacy_notify_type(): + raw = { + 'trigger_config': '{}', + 'action_config': '{}', + 'notify_config': {'webhook_url': 'http://x'}, + 'last_result': None, + 'then_actions': '[]', + 'notify_type': 'discord', + } + out = api._hydrate_automation(dict(raw)) + assert out['then_actions'] == [{'type': 'discord', 'config': {'webhook_url': 'http://x'}}] + + +# --------------------------------------------------------------------------- +# list_automations +# --------------------------------------------------------------------------- + +def test_list_automations_filters_by_profile(): + db = _FakeDB() + db.automations[1] = {'id': 1, 'profile_id': 1, 'trigger_config': '{}', 'action_config': '{}', + 'notify_config': '{}', 'last_result': None, 'then_actions': '[]', 'notify_type': None} + db.automations[2] = {'id': 2, 'profile_id': 2, 'trigger_config': '{}', 'action_config': '{}', + 'notify_config': '{}', 'last_result': None, 'then_actions': '[]', 'notify_type': None} + out = api.list_automations(db, profile_id=1) + assert len(out) == 1 + assert out[0]['id'] == 1 + + +# --------------------------------------------------------------------------- +# get_automation +# --------------------------------------------------------------------------- + +def test_get_automation_returns_none_for_missing(): + assert api.get_automation(_FakeDB(), 99) is None + + +def test_get_automation_returns_hydrated(): + db = _FakeDB() + db.automations[5] = {'id': 5, 'trigger_config': '{"x":1}', 'action_config': '{}', + 'notify_config': '{}', 'last_result': None, 'then_actions': '[]', + 'notify_type': None} + out = api.get_automation(db, 5) + assert out['trigger_config'] == {'x': 1} + + +# --------------------------------------------------------------------------- +# create_automation +# --------------------------------------------------------------------------- + +def test_create_requires_name(): + body, status = api.create_automation(_FakeDB(), _FakeEngine(), profile_id=1, data={'name': ' '}) + assert status == 400 + assert 'Name is required' in body['error'] + + +def test_create_happy_path_schedules(): + db = _FakeDB() + eng = _FakeEngine() + body, status = api.create_automation(db, eng, profile_id=1, data={ + 'name': 'My Auto', 'trigger_type': 'schedule', + 'trigger_config': {'interval': 6, 'unit': 'hours'}, + 'action_type': 'process_wishlist', + }) + assert status == 200 + assert body['success'] is True + assert body['id'] == 1 + assert eng.scheduled == [1] + + +def test_create_blocks_signal_cycle(): + eng = _FakeEngine(cycles_to_return=['sig_a', 'sig_b', 'sig_a']) + body, status = api.create_automation(_FakeDB(), eng, profile_id=1, data={ + 'name': 'Loopy', + 'trigger_type': 'signal_received', + 'trigger_config': {'signal_name': 'sig_a'}, + 'then_actions': [{'type': 'fire_signal', 'config': {'signal_name': 'sig_b'}}], + }) + assert status == 400 + assert 'Signal cycle detected' in body['error'] + assert eng.scheduled == [] + + +def test_create_skips_cycle_check_when_no_signals(): + eng = _FakeEngine(cycles_to_return=['shouldnt fire']) + body, status = api.create_automation(_FakeDB(), eng, profile_id=1, data={ + 'name': 'Plain', 'trigger_type': 'schedule', + 'action_type': 'process_wishlist', + 'then_actions': [{'type': 'discord', 'config': {}}], + }) + assert status == 200 + + +def test_create_then_actions_back_compat_first_item_becomes_notify_type(): + db = _FakeDB() + api.create_automation(db, _FakeEngine(), profile_id=1, data={ + 'name': 'X', 'trigger_type': 'schedule', 'action_type': 'process_wishlist', + 'then_actions': [{'type': 'discord', 'config': {'webhook': 'http://x'}}], + }) + stored = db.automations[1] + assert stored['notify_type'] == 'discord' + assert json.loads(stored['notify_config']) == {'webhook': 'http://x'} + + +# --------------------------------------------------------------------------- +# update_automation +# --------------------------------------------------------------------------- + +def test_update_with_no_fields_returns_400(): + body, status = api.update_automation(_FakeDB(), _FakeEngine(), automation_id=1, data={}) + assert status == 400 + + +def test_update_missing_id_returns_404(): + body, status = api.update_automation(_FakeDB(), _FakeEngine(), automation_id=99, data={'name': 'x'}) + assert status == 404 + + +def test_update_blocks_signal_cycle(): + db = _FakeDB() + db.automations[1] = {'id': 1, 'name': 'a', 'trigger_type': 'schedule', 'trigger_config': '{}', + 'then_actions': '[]', 'enabled': 1, 'is_system': 0} + eng = _FakeEngine(cycles_to_return=['sig_a', 'sig_a']) + body, status = api.update_automation(db, eng, automation_id=1, data={ + 'trigger_type': 'signal_received', + 'trigger_config': {'signal_name': 'sig_a'}, + 'then_actions': [{'type': 'fire_signal', 'config': {'signal_name': 'sig_a'}}], + }) + assert status == 400 + assert 'Signal cycle detected' in body['error'] + + +def test_update_reschedules_when_enabled(): + db = _FakeDB() + db.automations[1] = {'id': 1, 'name': 'a', 'enabled': 1} + eng = _FakeEngine() + body, status = api.update_automation(db, eng, automation_id=1, data={'name': 'renamed'}) + assert status == 200 + assert eng.scheduled == [1] + + +def test_update_cancels_when_disabled(): + db = _FakeDB() + db.automations[1] = {'id': 1, 'name': 'a', 'enabled': 0} + eng = _FakeEngine() + body, status = api.update_automation(db, eng, automation_id=1, data={'name': 'r'}) + assert status == 200 + assert eng.cancelled == [1] + + +def test_update_then_actions_clears_notify_when_empty(): + db = _FakeDB() + db.automations[1] = {'id': 1, 'name': 'a', 'enabled': 0} + api.update_automation(db, _FakeEngine(), automation_id=1, data={'then_actions': []}) + assert db.automations[1]['notify_type'] is None + assert db.automations[1]['notify_config'] == '{}' + + +# --------------------------------------------------------------------------- +# batch_update_group +# --------------------------------------------------------------------------- + +def test_batch_group_requires_list(): + body, status = api.batch_update_group(_FakeDB(), [], 'group') + assert status == 400 + + +def test_batch_group_rejects_non_int_ids(): + body, status = api.batch_update_group(_FakeDB(), ['abc'], 'g') + assert status == 400 + + +def test_batch_group_happy_path(): + db = _FakeDB() + body, status = api.batch_update_group(db, [1, 2, 3], 'mygroup') + assert status == 200 + assert body['updated'] == 3 + assert db.batch_group_calls == [([1, 2, 3], 'mygroup')] + + +def test_batch_group_can_ungroup_with_none(): + db = _FakeDB() + body, status = api.batch_update_group(db, [1], None) + assert status == 200 + assert db.batch_group_calls == [([1], None)] + + +# --------------------------------------------------------------------------- +# bulk_toggle +# --------------------------------------------------------------------------- + +def test_bulk_toggle_reschedules_enabled(): + db = _FakeDB() + db.automations[1] = {'id': 1, 'enabled': 1} + db.automations[2] = {'id': 2, 'enabled': 1} + eng = _FakeEngine() + body, status = api.bulk_toggle(db, eng, [1, 2], enabled=True) + assert status == 200 + assert body['updated'] == 2 + + +def test_bulk_toggle_cancels_disabled(): + db = _FakeDB() + db.automations[1] = {'id': 1, 'enabled': 1} + eng = _FakeEngine() + api.bulk_toggle(db, eng, [1], enabled=False) + assert eng.cancelled == [1] + + +def test_bulk_toggle_requires_non_empty_list(): + body, status = api.bulk_toggle(_FakeDB(), _FakeEngine(), [], True) + assert status == 400 + + +# --------------------------------------------------------------------------- +# delete_automation +# --------------------------------------------------------------------------- + +def test_delete_protects_system_automations(): + db = _FakeDB() + db.automations[1] = {'id': 1, 'is_system': 1} + body, status = api.delete_automation(db, _FakeEngine(), 1) + assert status == 403 + + +def test_delete_cancels_engine_then_db(): + db = _FakeDB() + db.automations[1] = {'id': 1, 'is_system': 0} + eng = _FakeEngine() + body, status = api.delete_automation(db, eng, 1) + assert status == 200 + assert eng.cancelled == [1] + assert 1 not in db.automations + + +def test_delete_missing_returns_404(): + body, status = api.delete_automation(_FakeDB(), _FakeEngine(), 99) + assert status == 404 + + +# --------------------------------------------------------------------------- +# duplicate_automation +# --------------------------------------------------------------------------- + +def test_duplicate_appends_copy_suffix_and_schedules(): + db = _FakeDB() + db.automations[1] = { + 'id': 1, 'name': 'Orig', 'trigger_type': 'schedule', 'trigger_config': '{}', + 'action_type': 'process_wishlist', 'action_config': '{}', 'is_system': 0, + 'notify_type': None, 'notify_config': '{}', 'then_actions': '[]', 'group_name': None, + } + db._next_id = 2 # bump so create_automation doesn't overwrite id 1 + eng = _FakeEngine() + body, status = api.duplicate_automation(db, eng, profile_id=1, automation_id=1) + assert status == 200 + assert db.automations[2]['name'] == 'Orig (Copy)' + assert eng.scheduled == [2] + + +def test_duplicate_protects_system(): + db = _FakeDB() + db.automations[1] = {'id': 1, 'is_system': 1, 'name': 'sys'} + body, status = api.duplicate_automation(db, _FakeEngine(), profile_id=1, automation_id=1) + assert status == 403 + + +def test_duplicate_missing_returns_404(): + body, status = api.duplicate_automation(_FakeDB(), _FakeEngine(), profile_id=1, automation_id=99) + assert status == 404 + + +# --------------------------------------------------------------------------- +# toggle_automation +# --------------------------------------------------------------------------- + +def test_toggle_reschedules_when_now_enabled(): + db = _FakeDB() + db.automations[1] = {'id': 1, 'enabled': 0} # currently off + eng = _FakeEngine() + body, status = api.toggle_automation(db, eng, 1) + assert status == 200 + assert eng.scheduled == [1] + + +def test_toggle_cancels_when_now_disabled(): + db = _FakeDB() + db.automations[1] = {'id': 1, 'enabled': 1} # currently on + eng = _FakeEngine() + api.toggle_automation(db, eng, 1) + assert eng.cancelled == [1] + + +def test_toggle_missing_returns_404(): + body, status = api.toggle_automation(_FakeDB(), _FakeEngine(), 99) + assert status == 404 + + +# --------------------------------------------------------------------------- +# run_automation +# --------------------------------------------------------------------------- + +def test_run_calls_engine_run_now_with_profile(): + eng = _FakeEngine() + body, status = api.run_automation(eng, automation_id=5, profile_id=2) + assert status == 200 + assert eng.run_now_calls == [(5, 2)] + + +def test_run_no_engine_returns_500(): + body, status = api.run_automation(None, 1, 1) + assert status == 500 + + +def test_run_missing_automation_returns_404(): + class _MissEngine(_FakeEngine): + def run_now(self, aid, profile_id=None): + return False + body, status = api.run_automation(_MissEngine(), 1, 1) + assert status == 404 + + +# --------------------------------------------------------------------------- +# get_history +# --------------------------------------------------------------------------- + +def test_history_parses_log_lines_and_result_json(): + db = _FakeDB() + db.history[1] = [ + {'id': 1, 'log_lines': '[{"text":"ok"}]', 'result_json': '{"k":"v"}'}, + {'id': 2, 'log_lines': '', 'result_json': None}, + ] + out = api.get_history(db, 1, limit=10, offset=0) + assert out['automation_id'] == 1 + assert out['history'][0]['log_lines'] == [{'text': 'ok'}] + assert out['history'][0]['result_json'] == {'k': 'v'} + assert out['history'][1]['log_lines'] == [] + + +def test_history_invalid_json_falls_back(): + db = _FakeDB() + db.history[1] = [{'id': 1, 'log_lines': 'not json', 'result_json': 'not json'}] + out = api.get_history(db, 1, limit=10, offset=0) + assert out['history'][0]['log_lines'] == [] + # result_json stays as the original string when not parseable + assert out['history'][0]['result_json'] == 'not json' diff --git a/tests/automation/test_automation_blocks.py b/tests/automation/test_automation_blocks.py new file mode 100644 index 00000000..64d95cfa --- /dev/null +++ b/tests/automation/test_automation_blocks.py @@ -0,0 +1,82 @@ +"""Tests for core/automation/blocks.py — static block definitions for the builder UI. + +Catches accidental schema regressions in the builder block list (missing +`type`/`label`, malformed config_fields options, etc.). +""" + +from __future__ import annotations + +from core.automation import blocks + + +def _shape_check(items, allowed_types): + """Every item has type+label+description, plus type-specific shape rules.""" + seen_types = set() + for item in items: + assert 'type' in item, item + assert 'label' in item, item + assert isinstance(item.get('available'), bool), item + # No duplicate types within a list + assert item['type'] not in seen_types, f"Duplicate type {item['type']!r}" + seen_types.add(item['type']) + + if 'config_fields' in item: + for field in item['config_fields']: + assert 'key' in field + assert 'type' in field + assert field['type'] in allowed_types, f"Unknown field type {field['type']!r} in {item['type']}" + if field['type'] == 'select': + assert 'options' in field + for opt in field['options']: + assert 'value' in opt + assert 'label' in opt + + +_FIELD_TYPES = { + 'number', 'select', 'time', 'multi_select', 'checkbox', 'text', + 'mirrored_playlist_select', 'signal_input', 'script_select', +} + + +def test_triggers_shape(): + _shape_check(blocks.TRIGGERS, _FIELD_TYPES) + + +def test_actions_shape(): + _shape_check(blocks.ACTIONS, _FIELD_TYPES) + + +def test_notifications_shape(): + _shape_check(blocks.NOTIFICATIONS, _FIELD_TYPES) + + +def test_signal_received_trigger_present(): + types = {t['type'] for t in blocks.TRIGGERS} + assert 'signal_received' in types + + +def test_fire_signal_notification_present(): + types = {n['type'] for n in blocks.NOTIFICATIONS} + assert 'fire_signal' in types + + +def test_run_script_in_both_actions_and_notifications(): + """run_script can be either an action or a then-action — both lists own it.""" + action_types = {a['type'] for a in blocks.ACTIONS} + notif_types = {n['type'] for n in blocks.NOTIFICATIONS} + assert 'run_script' in action_types + assert 'run_script' in notif_types + + +def test_schedule_trigger_default_unit_is_hours(): + schedule = next(t for t in blocks.TRIGGERS if t['type'] == 'schedule') + unit_field = next(f for f in schedule['config_fields'] if f['key'] == 'unit') + assert unit_field['default'] == 'hours' + + +def test_event_triggers_with_conditions_have_condition_fields(): + for t in blocks.TRIGGERS: + if t.get('has_conditions'): + assert 'condition_fields' in t, f"{t['type']} marked has_conditions but no condition_fields" + assert isinstance(t['condition_fields'], list) + assert len(t['condition_fields']) > 0 diff --git a/tests/automation/test_automation_progress.py b/tests/automation/test_automation_progress.py new file mode 100644 index 00000000..aa60ef06 --- /dev/null +++ b/tests/automation/test_automation_progress.py @@ -0,0 +1,290 @@ +"""Tests for core/automation/progress.py — progress state lifecycle + emit loop.""" + +from __future__ import annotations + +import time +from datetime import datetime, timedelta, timezone + +import pytest + +from core.automation import progress + + +@pytest.fixture(autouse=True) +def reset_state(): + """Each test gets a clean progress state dict.""" + progress.progress_states.clear() + yield + progress.progress_states.clear() + + +# --------------------------------------------------------------------------- +# init_progress +# --------------------------------------------------------------------------- + +def test_init_progress_seeds_running_state(): + progress.init_progress(7, 'My Automation', 'process_wishlist') + state = progress.progress_states[7] + assert state['status'] == 'running' + assert state['action_type'] == 'process_wishlist' + assert state['progress'] == 0 + assert state['phase'] == 'Starting...' + assert state['log'][0] == {'type': 'info', 'text': 'Starting My Automation'} + assert state['started_at'] is not None + assert state['finished_at'] is None + + +def test_init_progress_overwrites_existing_state(): + progress.init_progress(7, 'First', 'a') + progress.init_progress(7, 'Second', 'b') + assert progress.progress_states[7]['action_type'] == 'b' + + +# --------------------------------------------------------------------------- +# update_progress +# --------------------------------------------------------------------------- + +def test_update_progress_writes_simple_fields(): + progress.init_progress(1, 'A', 'x') + progress.update_progress(1, progress=42, phase='Working') + assert progress.progress_states[1]['progress'] == 42 + assert progress.progress_states[1]['phase'] == 'Working' + + +def test_update_progress_log_line_appends_with_type(): + progress.init_progress(1, 'A', 'x') + progress.update_progress(1, log_line='hello', log_type='success') + log = progress.progress_states[1]['log'] + assert log[-1] == {'type': 'success', 'text': 'hello'} + + +def test_update_progress_log_caps_at_50_entries(): + progress.init_progress(1, 'A', 'x') + for i in range(60): + progress.update_progress(1, log_line=f'line {i}') + assert len(progress.progress_states[1]['log']) == 50 + assert progress.progress_states[1]['log'][-1]['text'] == 'line 59' + + +def test_update_progress_log_type_not_stored_as_field(): + progress.init_progress(1, 'A', 'x') + progress.update_progress(1, log_line='hi', log_type='warning') + assert 'log_type' not in progress.progress_states[1] + + +def test_update_progress_finish_sets_finished_at_and_emits(): + progress.init_progress(1, 'A', 'x') + emitted = [] + def _emit(event, data): + emitted.append((event, data)) + progress.update_progress(1, status='finished', socketio_emit=_emit) + assert progress.progress_states[1]['finished_at'] is not None + assert emitted[0][0] == 'automation:progress' + assert '1' in emitted[0][1] + + +def test_update_progress_error_status_also_emits(): + progress.init_progress(1, 'A', 'x') + emitted = [] + progress.update_progress(1, status='error', socketio_emit=lambda e, d: emitted.append(e)) + assert emitted == ['automation:progress'] + + +def test_update_progress_running_status_does_not_emit(): + progress.init_progress(1, 'A', 'x') + emitted = [] + progress.update_progress(1, status='running', progress=50, socketio_emit=lambda e, d: emitted.append(e)) + assert emitted == [] + + +def test_update_progress_emit_failure_swallowed(): + progress.init_progress(1, 'A', 'x') + def _bad(event, data): + raise RuntimeError('socket dead') + # Should NOT raise + progress.update_progress(1, status='finished', socketio_emit=_bad) + assert progress.progress_states[1]['finished_at'] is not None + + +def test_update_progress_none_id_is_noop(): + progress.update_progress(None, progress=99) # no exception + + +def test_update_progress_unknown_id_is_noop(): + progress.update_progress(999, progress=99) + assert 999 not in progress.progress_states + + +# --------------------------------------------------------------------------- +# get_running_progress +# --------------------------------------------------------------------------- + +def test_get_running_progress_returns_running_finished_error(): + progress.init_progress(1, 'A', 'x') + progress.init_progress(2, 'B', 'y') + progress.init_progress(3, 'C', 'z') + progress.update_progress(2, status='finished') + progress.update_progress(3, status='error') + progress.progress_states[4] = {'status': 'unknown', 'log': []} + + snapshot = progress.get_running_progress() + assert set(snapshot.keys()) == {'1', '2', '3'} + + +def test_get_running_progress_copies_log_list(): + progress.init_progress(1, 'A', 'x') + progress.update_progress(1, log_line='first') + snapshot = progress.get_running_progress() + snapshot['1']['log'].append({'type': 'info', 'text': 'mutated'}) + # Original state should not be affected + assert len(progress.progress_states[1]['log']) == 2 # init + 'first' + + +# --------------------------------------------------------------------------- +# record_history +# --------------------------------------------------------------------------- + +class _FakeDB: + def __init__(self): + self.calls = [] + + def insert_automation_run_history(self, **kw): + self.calls.append(kw) + + +def test_record_history_uses_progress_log_when_available(): + progress.init_progress(1, 'A', 'wishlist') + progress.update_progress(1, log_line='did stuff', log_type='success') + progress.update_progress(1, status='finished', socketio_emit=None) + + db = _FakeDB() + progress.record_history(1, {'status': 'completed'}, db) + + assert len(db.calls) == 1 + call = db.calls[0] + assert call['automation_id'] == 1 + assert call['status'] == 'completed' + assert call['summary'] == 'did stuff' + assert call['duration_seconds'] is not None + + +def test_record_history_status_mapping(): + db = _FakeDB() + progress.record_history(1, {'status': 'error'}, db) + assert db.calls[-1]['status'] == 'error' + + progress.record_history(2, {'status': 'skipped'}, db) + assert db.calls[-1]['status'] == 'skipped' + + progress.record_history(3, {'status': 'timeout'}, db) + assert db.calls[-1]['status'] == 'timeout' + + progress.record_history(4, {'status': 'completed'}, db) + assert db.calls[-1]['status'] == 'completed' + + +def test_record_history_underscore_keys_stripped_from_result_json(): + progress.init_progress(1, 'A', 'x') + progress.update_progress(1, status='finished', socketio_emit=None) + db = _FakeDB() + progress.record_history(1, {'status': 'completed', '_internal': 'x', 'visible': 'y'}, db) + + import json as _json + parsed = _json.loads(db.calls[0]['result_json']) + assert '_internal' not in parsed + assert parsed.get('visible') == 'y' + + +def test_record_history_falls_back_to_result_summary_when_no_log(): + db = _FakeDB() + progress.record_history(1, {'status': 'error', 'reason': 'bad config'}, db) + assert db.calls[0]['summary'] == 'bad config' + + +def test_record_history_no_progress_state_uses_now_for_times(): + db = _FakeDB() + progress.record_history(99, {'status': 'completed'}, db) + call = db.calls[0] + assert call['started_at'] is not None + assert call['finished_at'] is not None + + +def test_record_history_db_failure_swallowed(): + class _BrokenDB: + def insert_automation_run_history(self, **kw): + raise RuntimeError('db dead') + progress.init_progress(1, 'A', 'x') + # Should NOT raise + progress.record_history(1, {'status': 'completed'}, _BrokenDB()) + + +# --------------------------------------------------------------------------- +# emit_progress_loop +# --------------------------------------------------------------------------- + +class _FakeSocket: + def __init__(self): + self.emitted = [] + self._sleep_count = 0 + self._max_sleeps = 1 + + def sleep(self, seconds): + self._sleep_count += 1 + + def emit(self, event, data): + self.emitted.append((event, data)) + + +def _shutdown_after(n_sleeps, sock): + """Build an is_shutting_down predicate that returns True after n loops.""" + def _check(): + return sock._sleep_count >= n_sleeps + return _check + + +def test_emit_loop_emits_running_state(): + progress.init_progress(1, 'A', 'x') + progress.update_progress(1, progress=50) + sock = _FakeSocket() + progress.emit_progress_loop(sock, is_shutting_down=_shutdown_after(1, sock), poll_interval=0) + assert len(sock.emitted) == 1 + assert sock.emitted[0][0] == 'automation:progress' + assert '1' in sock.emitted[0][1] + + +def test_emit_loop_attempts_timeout_check_with_naive_datetime(): + """Documents pre-existing bug: started_at is tz-aware, now is naive, + so subtraction raises TypeError → caught → timeout never fires. + Lift preserves the bug; fix lives in a separate PR. + """ + progress.init_progress(1, 'A', 'x') + state = progress.progress_states[1] + state['started_at'] = (datetime.now(timezone.utc) - timedelta(hours=3)).isoformat() + sock = _FakeSocket() + progress.emit_progress_loop(sock, is_shutting_down=_shutdown_after(1, sock), + poll_interval=0, timeout_seconds=7200) + # Bug: timeout doesn't fire because datetime.now() is naive but + # started_at is tz-aware → subtraction raises → except → fall through + # to the normal "running" branch. State stays 'running'. + assert progress.progress_states[1]['status'] == 'running' + + +def test_emit_loop_reaps_finished_states_due_to_naive_aware_mismatch(): + """Documents pre-existing bug: finished_at is tz-aware, now is naive, + so the cleanup math raises → caught → state is reaped on FIRST tick + regardless of `cleanup_after_seconds`. Lift preserves the bug. + """ + progress.init_progress(1, 'A', 'x') + progress.update_progress(1, status='finished', socketio_emit=None) + sock = _FakeSocket() + progress.emit_progress_loop(sock, is_shutting_down=_shutdown_after(1, sock), + poll_interval=0, cleanup_after_seconds=300) + # Bug: should be kept (300s window, just finished), but the TypeError + # in the cleanup math is caught with a fall-through that ALSO reaps. + assert 1 not in progress.progress_states + + +def test_emit_loop_no_active_states_no_emit(): + sock = _FakeSocket() + progress.emit_progress_loop(sock, is_shutting_down=_shutdown_after(1, sock), poll_interval=0) + assert sock.emitted == [] diff --git a/tests/automation/test_automation_signals.py b/tests/automation/test_automation_signals.py new file mode 100644 index 00000000..2792d78d --- /dev/null +++ b/tests/automation/test_automation_signals.py @@ -0,0 +1,92 @@ +"""Tests for core/automation/signals.py — known signal collection.""" + +from __future__ import annotations + +import json + +from core.automation import signals + + +class _FakeDB: + def __init__(self, automations): + self._autos = automations + + def get_automations(self, profile_id=None): + return self._autos + + +def test_no_automations_returns_empty(): + db = _FakeDB([]) + assert signals.collect_known_signals(db) == [] + + +def test_signal_received_trigger_collected(): + db = _FakeDB([ + {'trigger_type': 'signal_received', 'trigger_config': json.dumps({'signal_name': 'job_done'}), 'then_actions': '[]'}, + ]) + assert signals.collect_known_signals(db) == ['job_done'] + + +def test_fire_signal_then_action_collected(): + db = _FakeDB([ + {'trigger_type': 'schedule', 'trigger_config': '{}', + 'then_actions': json.dumps([{'type': 'fire_signal', 'config': {'signal_name': 'cleanup_done'}}])}, + ]) + assert signals.collect_known_signals(db) == ['cleanup_done'] + + +def test_collected_signals_are_sorted_and_deduped(): + db = _FakeDB([ + {'trigger_type': 'signal_received', 'trigger_config': json.dumps({'signal_name': 'zebra'}), 'then_actions': '[]'}, + {'trigger_type': 'signal_received', 'trigger_config': json.dumps({'signal_name': 'apple'}), 'then_actions': '[]'}, + {'trigger_type': 'signal_received', 'trigger_config': json.dumps({'signal_name': 'apple'}), 'then_actions': '[]'}, + ]) + assert signals.collect_known_signals(db) == ['apple', 'zebra'] + + +def test_empty_signal_name_skipped(): + db = _FakeDB([ + {'trigger_type': 'signal_received', 'trigger_config': json.dumps({'signal_name': ''}), 'then_actions': '[]'}, + {'trigger_type': 'signal_received', 'trigger_config': json.dumps({'signal_name': ' '}), 'then_actions': '[]'}, + ]) + assert signals.collect_known_signals(db) == [] + + +def test_malformed_trigger_config_swallowed(): + db = _FakeDB([ + {'trigger_type': 'signal_received', 'trigger_config': 'not json', 'then_actions': '[]'}, + ]) + assert signals.collect_known_signals(db) == [] + + +def test_malformed_then_actions_swallowed(): + db = _FakeDB([ + {'trigger_type': 'schedule', 'trigger_config': '{}', 'then_actions': 'not json'}, + ]) + assert signals.collect_known_signals(db) == [] + + +def test_db_failure_returns_empty(): + class _BrokenDB: + def get_automations(self, profile_id=None): + raise RuntimeError("db dead") + assert signals.collect_known_signals(_BrokenDB()) == [] + + +def test_mixed_trigger_and_action_signals_merged(): + db = _FakeDB([ + {'trigger_type': 'signal_received', 'trigger_config': json.dumps({'signal_name': 'sig_a'}), + 'then_actions': json.dumps([{'type': 'fire_signal', 'config': {'signal_name': 'sig_b'}}])}, + ]) + assert signals.collect_known_signals(db) == ['sig_a', 'sig_b'] + + +def test_non_signal_then_action_ignored(): + db = _FakeDB([ + {'trigger_type': 'schedule', 'trigger_config': '{}', + 'then_actions': json.dumps([ + {'type': 'discord', 'config': {'webhook_url': 'http://x'}}, + {'type': 'fire_signal', 'config': {'signal_name': 'real_sig'}}, + ])}, + ]) + assert signals.collect_known_signals(db) == ['real_sig'] diff --git a/tests/metadata/test_metadata_enrichment.py b/tests/metadata/test_metadata_enrichment.py index 2c044edd..83644fdf 100644 --- a/tests/metadata/test_metadata_enrichment.py +++ b/tests/metadata/test_metadata_enrichment.py @@ -359,6 +359,9 @@ def test_musicbrainz_release_lookup_failure_does_not_poison_cache(monkeypatch): def get_release(self, mbid, includes=None): return {} + def get_artist(self, mbid, includes=None): + return {} + class _FakeMBService: def __init__(self): self.release_calls = 0 @@ -593,3 +596,111 @@ def test_download_cover_art_uses_album_context_image_url(tmp_path, monkeypatch): cover_path = target_dir / "cover.jpg" assert cover_path.exists() assert cover_path.read_bytes() == b"cover-bytes" + + +# --------------------------------------------------------------------------- +# MusicBrainz genre fallback chain (recording → release → artist) +# --------------------------------------------------------------------------- + +def _build_mb_genre_test(monkeypatch, *, recording_genres, release_genres, artist_genres): + """Helper: assemble a fake MB stack with configurable genres at each tier.""" + class _FakeMBClient: + def __init__(self): + self.artist_calls = 0 + self.release_calls = 0 + + def get_recording(self, mbid, includes=None): + return {"isrcs": [], "genres": list(recording_genres)} + + def get_release(self, mbid, includes=None): + self.release_calls += 1 + return {"genres": list(release_genres), "media": []} + + def get_artist(self, mbid, includes=None): + self.artist_calls += 1 + return {"genres": list(artist_genres)} + + class _FakeMBService: + def __init__(self): + self.mb_client = _FakeMBClient() + + def match_recording(self, t, a): + return {"mbid": "rec-mbid"} + + def match_artist(self, a): + return {"mbid": "artist-mbid"} + + def match_release(self, album, artist): + return {"mbid": "release-mbid"} + + service = _FakeMBService() + monkeypatch.setattr(ms, "get_config_manager", lambda: _Config({"musicbrainz.embed_tags": True})) + monkeypatch.setattr(ms, "mb_release_cache", {}) + monkeypatch.setattr(ms, "mb_release_detail_cache", {}) + + runtime = types.SimpleNamespace(mb_worker=types.SimpleNamespace(mb_service=service)) + pp = { + "id_tags": {}, "track_title": "T", "artist_name": "A", "batch_artist_name": "A", + "metadata": {"album": "Alb"}, "recording_mbid": None, "artist_mbid": None, + "release_mbid": "", "mb_genres": [], "isrc": None, + "deezer_bpm": None, "deezer_isrc": None, + "audiodb_mood": None, "audiodb_style": None, "audiodb_genre": None, + "tidal_isrc": None, "tidal_copyright": None, + "qobuz_isrc": None, "qobuz_copyright": None, "qobuz_label": None, + "lastfm_tags": [], "lastfm_url": None, "genius_url": None, "release_year": None, + } + return pp, service, runtime + + +def test_mb_genre_recording_used_when_present(monkeypatch): + pp, service, runtime = _build_mb_genre_test( + monkeypatch, + recording_genres=[{"name": "Rock", "count": 5}], + release_genres=[{"name": "Pop", "count": 10}], + artist_genres=[{"name": "Jazz", "count": 20}], + ) + ms._process_musicbrainz_source(pp, {"album": "Alb"}, _Config({"musicbrainz.embed_tags": True}), + runtime, "T", "A") + assert pp["mb_genres"] == ["Rock"] + # Release/artist genre lookups not consulted because recording had genres + assert service.mb_client.artist_calls == 0 + + +def test_mb_genre_falls_back_to_release_when_recording_empty(monkeypatch): + pp, service, runtime = _build_mb_genre_test( + monkeypatch, + recording_genres=[], + release_genres=[{"name": "Pop", "count": 10}, {"name": "Indie", "count": 3}], + artist_genres=[{"name": "Jazz", "count": 20}], + ) + ms._process_musicbrainz_source(pp, {"album": "Alb"}, _Config({"musicbrainz.embed_tags": True}), + runtime, "T", "A") + # Sorted by count desc: Pop (10) before Indie (3) + assert pp["mb_genres"] == ["Pop", "Indie"] + # Artist not consulted because release had genres + assert service.mb_client.artist_calls == 0 + + +def test_mb_genre_falls_back_to_artist_when_recording_and_release_empty(monkeypatch): + pp, service, runtime = _build_mb_genre_test( + monkeypatch, + recording_genres=[], + release_genres=[], + artist_genres=[{"name": "Jazz", "count": 20}, {"name": "Fusion", "count": 5}], + ) + ms._process_musicbrainz_source(pp, {"album": "Alb"}, _Config({"musicbrainz.embed_tags": True}), + runtime, "T", "A") + assert pp["mb_genres"] == ["Jazz", "Fusion"] + assert service.mb_client.artist_calls == 1 + + +def test_mb_genre_all_empty_returns_empty(monkeypatch): + pp, service, runtime = _build_mb_genre_test( + monkeypatch, + recording_genres=[], + release_genres=[], + artist_genres=[], + ) + ms._process_musicbrainz_source(pp, {"album": "Alb"}, _Config({"musicbrainz.embed_tags": True}), + runtime, "T", "A") + assert pp["mb_genres"] == [] diff --git a/web_server.py b/web_server.py index a3fea269..24fe6b10 100644 --- a/web_server.py +++ b/web_server.py @@ -2312,66 +2312,7 @@ def _register_automation_handlers(): def _record_automation_history(aid, result): """Capture progress state into run history before cleanup clears it.""" - try: - with automation_progress_lock: - state = automation_progress_states.get(aid) - if state: - started_at = state.get('started_at') - finished_at = state.get('finished_at') or datetime.now(timezone.utc).isoformat() - log_entries = list(state.get('log', [])) - else: - started_at = datetime.now(timezone.utc).isoformat() - finished_at = datetime.now(timezone.utc).isoformat() - log_entries = [] - - # Compute duration - duration = None - if started_at and finished_at: - try: - t0 = datetime.fromisoformat(started_at) - t1 = datetime.fromisoformat(finished_at) - duration = (t1 - t0).total_seconds() - except Exception: - pass - - # Determine status - r_status = result.get('status', 'completed') if result else 'completed' - if r_status == 'error': - status = 'error' - elif r_status == 'skipped': - status = 'skipped' - elif r_status == 'timeout': - status = 'timeout' - else: - status = 'completed' - - # Extract summary from the last success/error log line - summary = None - for entry in reversed(log_entries): - if entry.get('type') in ('success', 'error'): - summary = entry.get('text', '') - break - if not summary and log_entries: - summary = log_entries[-1].get('text', '') - # Fallback: use reason or error from result when no log entries captured - if not summary and result: - summary = result.get('reason') or result.get('error') or result.get('status', '') - - result_json = json.dumps({k: v for k, v in result.items() if not k.startswith('_')}) if result else None - log_json = json.dumps(log_entries) if log_entries else None - - get_database().insert_automation_run_history( - automation_id=aid, - started_at=started_at, - finished_at=finished_at, - duration_seconds=duration, - status=status, - summary=summary, - result_json=result_json, - log_lines=log_json - ) - except Exception as e: - logger.error(f"Error recording automation history for {aid}: {e}") + _auto_progress.record_history(aid, result, get_database()) automation_engine.register_progress_callbacks(_progress_init, _progress_finish, _update_automation_progress, _record_automation_history) @@ -2410,44 +2351,23 @@ except Exception as e: # --- Automation Progress Tracking --- -automation_progress_states = {} # automation_id (int) -> state dict -automation_progress_lock = threading.Lock() +# State + helpers live in core/automation/progress.py. Re-exported here +# so the existing call sites (registered as engine progress callbacks +# and used inside _record_automation_history) keep resolving. +from core.automation import progress as _auto_progress + +automation_progress_states = _auto_progress.progress_states +automation_progress_lock = _auto_progress.progress_lock + def _init_automation_progress(automation_id, automation_name, action_type): """Initialize progress state when an automation starts running.""" - with automation_progress_lock: - automation_progress_states[automation_id] = { - 'status': 'running', - 'action_type': action_type, - 'progress': 0, 'phase': 'Starting...', 'current_item': '', - 'processed': 0, 'total': 0, - 'log': [{'type': 'info', 'text': f'Starting {automation_name}'}], - 'started_at': datetime.now(timezone.utc).isoformat(), - 'finished_at': None, - } + _auto_progress.init_progress(automation_id, automation_name, action_type) + def _update_automation_progress(automation_id, **kwargs): """Update progress state from handler threads. Thread-safe.""" - if automation_id is None: - return - with automation_progress_lock: - state = automation_progress_states.get(automation_id) - if not state: - return - for k, v in kwargs.items(): - if k == 'log_line': - state['log'].append({'type': kwargs.get('log_type', 'info'), 'text': v}) - if len(state['log']) > 50: - state['log'] = state['log'][-50:] - elif k != 'log_type': - state[k] = v - # Immediate emit on finish so frontend gets final state without waiting for loop - if kwargs.get('status') in ('finished', 'error'): - state['finished_at'] = datetime.now(timezone.utc).isoformat() - try: - socketio.emit('automation:progress', {str(automation_id): dict(state)}) - except Exception: - pass + _auto_progress.update_progress(automation_id, socketio_emit=socketio.emit, **kwargs) # --- Global Matched Downloads Context Management --- # Shared with core.runtime_state so the refactored pipeline and web @@ -6412,32 +6332,18 @@ def get_genre_whitelist_defaults(): return jsonify({'genres': sorted(DEFAULT_GENRES, key=str.lower)}) +# Automation route bodies live in core/automation/api.py — these routes are thin handlers. +from core.automation import api as _auto_api +from core.automation import blocks as _auto_blocks +from core.automation import signals as _auto_signals + + @app.route('/api/automations', methods=['GET']) def list_automations(): """List all automations for the current profile.""" try: profile_id = session.get('profile_id', 1) - db = get_database() - automations = db.get_automations(profile_id) - # Parse JSON config fields for frontend - for auto in automations: - for field in ('trigger_config', 'action_config', 'notify_config', 'last_result'): - try: - auto[field] = json.loads(auto[field]) if isinstance(auto[field], str) else auto[field] - except (json.JSONDecodeError, TypeError): - if field in ('trigger_config', 'action_config', 'notify_config'): - auto[field] = {} - else: - auto[field] = None - # Parse then_actions - try: - auto['then_actions'] = json.loads(auto.get('then_actions') or '[]') if isinstance(auto.get('then_actions'), str) else (auto.get('then_actions') or []) - except (json.JSONDecodeError, TypeError): - auto['then_actions'] = [] - # Backward compat: if then_actions empty but notify_type set, build it - if not auto['then_actions'] and auto.get('notify_type'): - auto['then_actions'] = [{'type': auto['notify_type'], 'config': auto.get('notify_config', {})}] - return jsonify(automations) + return jsonify(_auto_api.list_automations(get_database(), profile_id)) except Exception as e: logger.error(f"Error listing automations: {e}") return jsonify({"error": str(e)}), 500 @@ -6446,80 +6352,21 @@ def list_automations(): def create_automation(): """Create a new automation.""" try: - data = request.get_json() - name = data.get('name', '').strip() - if not name: - return jsonify({"error": "Name is required"}), 400 - - trigger_type = data.get('trigger_type', 'schedule') - trigger_config = json.dumps(data.get('trigger_config', {})) - action_type = data.get('action_type', 'process_wishlist') - action_config = json.dumps(data.get('action_config', {})) - # then_actions array (new multi-then system) - then_actions = data.get('then_actions', []) - then_actions_json = json.dumps(then_actions) - # Backward compat: derive notify_type/notify_config from first then_action - if then_actions: - notify_type = then_actions[0].get('type') - notify_config = json.dumps(then_actions[0].get('config', {})) - else: - notify_type = data.get('notify_type') or None - notify_config = json.dumps(data.get('notify_config', {})) if notify_type else '{}' profile_id = session.get('profile_id', 1) - - # Signal cycle detection - if automation_engine and (trigger_type == 'signal_received' or any(t.get('type') == 'fire_signal' for t in then_actions)): - db = get_database() - all_autos = db.get_automations(profile_id) - test_auto = { - 'trigger_type': trigger_type, - 'trigger_config': trigger_config, - 'then_actions': then_actions_json, - 'enabled': True, - } - all_autos.append(test_auto) - cycle = automation_engine.detect_signal_cycles(all_autos) - if cycle: - return jsonify({"error": f"Signal cycle detected: {' → '.join(cycle)}. This would cause an infinite loop."}), 400 - - group_name = data.get('group_name') or None - db = get_database() - auto_id = db.create_automation(name, trigger_type, trigger_config, action_type, action_config, profile_id, notify_type, notify_config, then_actions_json, group_name) - if auto_id is None: - return jsonify({"error": "Failed to create automation"}), 500 - - # Schedule it - if automation_engine: - automation_engine.schedule_automation(auto_id) - - return jsonify({"success": True, "id": auto_id}) + body, status = _auto_api.create_automation(get_database(), automation_engine, profile_id, request.get_json()) + return jsonify(body), status except Exception as e: logger.error(f"Error creating automation: {e}") return jsonify({"error": str(e)}), 500 + @app.route('/api/automations/', methods=['GET']) def get_automation(automation_id): """Get a single automation.""" try: - db = get_database() - auto = db.get_automation(automation_id) - if not auto: + auto = _auto_api.get_automation(get_database(), automation_id) + if auto is None: return jsonify({"error": "Automation not found"}), 404 - for field in ('trigger_config', 'action_config', 'notify_config', 'last_result'): - try: - auto[field] = json.loads(auto[field]) if isinstance(auto[field], str) else auto[field] - except (json.JSONDecodeError, TypeError): - if field in ('trigger_config', 'action_config', 'notify_config'): - auto[field] = {} - else: - auto[field] = None - # Parse then_actions - try: - auto['then_actions'] = json.loads(auto.get('then_actions') or '[]') if isinstance(auto.get('then_actions'), str) else (auto.get('then_actions') or []) - except (json.JSONDecodeError, TypeError): - auto['then_actions'] = [] - if not auto['then_actions'] and auto.get('notify_type'): - auto['then_actions'] = [{'type': auto['notify_type'], 'config': auto.get('notify_config', {})}] return jsonify(auto) except Exception as e: logger.error(f"Error getting automation: {e}") @@ -6529,102 +6376,20 @@ def get_automation(automation_id): def update_automation_endpoint(automation_id): """Update an automation.""" try: - data = request.get_json() - db = get_database() - - update_fields = {} - if 'name' in data: - update_fields['name'] = data['name'].strip() - if 'trigger_type' in data: - update_fields['trigger_type'] = data['trigger_type'] - if 'trigger_config' in data: - update_fields['trigger_config'] = json.dumps(data['trigger_config']) - if 'action_type' in data: - update_fields['action_type'] = data['action_type'] - if 'action_config' in data: - update_fields['action_config'] = json.dumps(data['action_config']) - if 'then_actions' in data: - then_actions = data['then_actions'] - update_fields['then_actions'] = json.dumps(then_actions) - # Backward compat: derive notify_type/notify_config from first then_action - if then_actions: - update_fields['notify_type'] = then_actions[0].get('type') - update_fields['notify_config'] = json.dumps(then_actions[0].get('config', {})) - else: - update_fields['notify_type'] = None - update_fields['notify_config'] = '{}' - elif 'notify_type' in data: - update_fields['notify_type'] = data['notify_type'] or None - if 'notify_config' in data and 'then_actions' not in data: - update_fields['notify_config'] = json.dumps(data['notify_config']) - if 'group_name' in data: - update_fields['group_name'] = data['group_name'] or None - - if not update_fields: - return jsonify({"error": "No fields to update"}), 400 - - # Signal cycle detection - trigger_type = data.get('trigger_type', '') - then_actions = data.get('then_actions', []) - if automation_engine and (trigger_type == 'signal_received' or any(t.get('type') == 'fire_signal' for t in then_actions)): - all_autos = db.get_automations() - # Replace the automation being edited with the updated version - test_autos = [] - for a in all_autos: - if a['id'] == automation_id: - merged = dict(a) - if 'trigger_type' in data: - merged['trigger_type'] = data['trigger_type'] - if 'trigger_config' in data: - merged['trigger_config'] = json.dumps(data['trigger_config']) - if 'then_actions' in data: - merged['then_actions'] = json.dumps(data['then_actions']) - merged['enabled'] = True - test_autos.append(merged) - else: - test_autos.append(a) - cycle = automation_engine.detect_signal_cycles(test_autos) - if cycle: - return jsonify({"error": f"Signal cycle detected: {' → '.join(cycle)}. This would cause an infinite loop."}), 400 - - success = db.update_automation(automation_id, **update_fields) - if not success: - return jsonify({"error": "Automation not found"}), 404 - - # Reschedule - if automation_engine: - auto = db.get_automation(automation_id) - if auto and auto.get('enabled'): - automation_engine.schedule_automation(automation_id) - else: - automation_engine.cancel_automation(automation_id) - - return jsonify({"success": True}) + body, status = _auto_api.update_automation(get_database(), automation_engine, automation_id, request.get_json()) + return jsonify(body), status except Exception as e: logger.error(f"Error updating automation: {e}") return jsonify({"error": str(e)}), 500 + @app.route('/api/automations/group', methods=['PUT']) def batch_update_automation_group(): - """Batch update group_name for multiple automations (rename group, delete group, drag-drop).""" + """Batch update group_name for multiple automations.""" try: data = request.get_json() - automation_ids = data.get('automation_ids', []) - group_name = data.get('group_name') # None/null = ungroup - - if not automation_ids or not isinstance(automation_ids, list): - return jsonify({"error": "automation_ids must be a non-empty list"}), 400 - - # Sanitize IDs to integers - try: - automation_ids = [int(aid) for aid in automation_ids] - except (ValueError, TypeError): - return jsonify({"error": "automation_ids must contain integers"}), 400 - - db = get_database() - updated = db.batch_update_group(automation_ids, group_name) - - return jsonify({"success": True, "updated": updated}) + body, status = _auto_api.batch_update_group(get_database(), data.get('automation_ids', []), data.get('group_name')) + return jsonify(body), status except Exception as e: logger.error(f"Error batch updating automation group: {e}") return jsonify({"error": str(e)}), 500 @@ -6635,31 +6400,9 @@ def bulk_toggle_automations(): """Bulk enable/disable multiple automations.""" try: data = request.get_json() - automation_ids = data.get('automation_ids', []) - enabled = data.get('enabled', True) - - if not automation_ids or not isinstance(automation_ids, list): - return jsonify({"error": "automation_ids must be a non-empty list"}), 400 - - try: - automation_ids = [int(aid) for aid in automation_ids] - except (ValueError, TypeError): - return jsonify({"error": "automation_ids must contain integers"}), 400 - - db = get_database() - updated = db.bulk_set_enabled(automation_ids, bool(enabled)) - - # Reschedule/cancel affected automations - if automation_engine and updated > 0: - for aid in automation_ids: - auto = db.get_automation(aid) - if auto: - if auto.get('enabled'): - automation_engine.schedule_automation(auto) - else: - automation_engine.cancel_automation(aid) - - return jsonify({"success": True, "updated": updated}) + body, status = _auto_api.bulk_toggle(get_database(), automation_engine, + data.get('automation_ids', []), data.get('enabled', True)) + return jsonify(body), status except Exception as e: logger.error(f"Error bulk toggling automations: {e}") return jsonify({"error": str(e)}), 500 @@ -6669,159 +6412,71 @@ def bulk_toggle_automations(): def delete_automation_endpoint(automation_id): """Delete an automation. System automations cannot be deleted.""" try: - db = get_database() - auto = db.get_automation(automation_id) - if auto and auto.get('is_system'): - return jsonify({"error": "System automations cannot be deleted"}), 403 - if automation_engine: - automation_engine.cancel_automation(automation_id) - success = db.delete_automation(automation_id) - if not success: - return jsonify({"error": "Automation not found"}), 404 - return jsonify({"success": True}) + body, status = _auto_api.delete_automation(get_database(), automation_engine, automation_id) + return jsonify(body), status except Exception as e: logger.error(f"Error deleting automation: {e}") return jsonify({"error": str(e)}), 500 + @app.route('/api/automations//duplicate', methods=['POST']) def duplicate_automation_endpoint(automation_id): """Duplicate an automation. System automations cannot be duplicated.""" try: - db = get_database() - auto = db.get_automation(automation_id) - if not auto: - return jsonify({"error": "Automation not found"}), 404 - if auto.get('is_system'): - return jsonify({"error": "System automations cannot be duplicated"}), 403 profile_id = session.get('profile_id', 1) - new_id = db.create_automation( - name=f"{auto['name']} (Copy)", - trigger_type=auto['trigger_type'], - trigger_config=auto.get('trigger_config', '{}'), - action_type=auto['action_type'], - action_config=auto.get('action_config', '{}'), - profile_id=profile_id, - notify_type=auto.get('notify_type'), - notify_config=auto.get('notify_config', '{}'), - then_actions=auto.get('then_actions', '[]'), - group_name=auto.get('group_name'), - ) - if new_id is None: - return jsonify({"error": "Failed to duplicate automation"}), 500 - if automation_engine: - automation_engine.schedule_automation(new_id) - return jsonify({"success": True, "id": new_id}) + body, status = _auto_api.duplicate_automation(get_database(), automation_engine, profile_id, automation_id) + return jsonify(body), status except Exception as e: logger.error(f"Error duplicating automation: {e}") return jsonify({"error": str(e)}), 500 + @app.route('/api/automations//toggle', methods=['POST']) def toggle_automation_endpoint(automation_id): """Toggle an automation's enabled state.""" try: - db = get_database() - success = db.toggle_automation(automation_id) - if not success: - return jsonify({"error": "Automation not found"}), 404 - - # Reschedule or cancel based on new state - if automation_engine: - auto = db.get_automation(automation_id) - if auto and auto.get('enabled'): - automation_engine.schedule_automation(automation_id) - else: - automation_engine.cancel_automation(automation_id) - - return jsonify({"success": True}) + body, status = _auto_api.toggle_automation(get_database(), automation_engine, automation_id) + return jsonify(body), status except Exception as e: logger.error(f"Error toggling automation: {e}") return jsonify({"error": str(e)}), 500 + @app.route('/api/automations//run', methods=['POST']) def run_automation_endpoint(automation_id): """Manually trigger an automation.""" try: - if not automation_engine: - return jsonify({"error": "Automation engine not available"}), 500 - success = automation_engine.run_now(automation_id, profile_id=get_current_profile_id()) - if not success: - return jsonify({"error": "Automation not found"}), 404 - return jsonify({"success": True}) + body, status = _auto_api.run_automation(automation_engine, automation_id, get_current_profile_id()) + return jsonify(body), status except Exception as e: logger.error(f"Error running automation: {e}") return jsonify({"error": str(e)}), 500 + @app.route('/api/automations/progress', methods=['GET']) def get_automation_progress(): """Get current progress state for all running/recently finished automations.""" try: - with automation_progress_lock: - result = {} - for aid, state in automation_progress_states.items(): - if state['status'] in ('running', 'finished', 'error'): - cp = dict(state) - cp['log'] = list(state['log']) - result[str(aid)] = cp - return jsonify(result) + return jsonify(_auto_progress.get_running_progress()) except Exception as e: return jsonify({"error": str(e)}), 500 + @app.route('/api/automations//history', methods=['GET']) def get_automation_history(automation_id): """Get run history for a specific automation.""" try: limit = request.args.get('limit', 50, type=int) offset = request.args.get('offset', 0, type=int) - db = get_database() - data = db.get_automation_run_history(automation_id, limit=limit, offset=offset) - # Parse log_lines JSON strings for the frontend - for entry in data.get('history', []): - if entry.get('log_lines'): - try: - entry['log_lines'] = json.loads(entry['log_lines']) - except (json.JSONDecodeError, TypeError): - entry['log_lines'] = [] - else: - entry['log_lines'] = [] - if entry.get('result_json'): - try: - entry['result_json'] = json.loads(entry['result_json']) - except (json.JSONDecodeError, TypeError): - pass - data['automation_id'] = automation_id - return jsonify(data) + return jsonify(_auto_api.get_history(get_database(), automation_id, limit=limit, offset=offset)) except Exception as e: logger.error(f"Error getting automation history: {e}") return jsonify({"error": str(e)}), 500 + def _collect_known_signals(): """Collect all signal names used across automations (for autocomplete).""" - signals = set() - try: - db = get_database() - for auto in db.get_automations(): - # Check signal_received triggers - if auto.get('trigger_type') == 'signal_received': - try: - tc = json.loads(auto.get('trigger_config') or '{}') - sig = tc.get('signal_name', '').strip() - if sig: - signals.add(sig) - except (json.JSONDecodeError, TypeError): - pass - # Check fire_signal in then_actions - try: - ta = json.loads(auto.get('then_actions') or '[]') - for item in ta: - if item.get('type') == 'fire_signal': - sig = item.get('config', {}).get('signal_name', '').strip() - if sig: - signals.add(sig) - except (json.JSONDecodeError, TypeError): - pass - except Exception: - pass - return sorted(signals) + return _auto_signals.collect_known_signals(get_database()) @app.route('/api/scripts', methods=['GET']) def list_available_scripts(): @@ -6851,202 +6506,10 @@ def list_available_scripts(): def get_automation_blocks(): """Return available block types for the automation builder sidebar.""" return jsonify({ - "triggers": [ - {"type": "schedule", "label": "Schedule", "icon": "clock", "description": "Run on a timer interval", "available": True, - "config_fields": [ - {"key": "interval", "type": "number", "label": "Every", "default": 6, "min": 1}, - {"key": "unit", "type": "select", "label": "Unit", - "options": [{"value": "minutes", "label": "Minutes"}, {"value": "hours", "label": "Hours"}, {"value": "days", "label": "Days"}], - "default": "hours"} - ]}, - {"type": "daily_time", "label": "Daily Time", "icon": "clock", "description": "Run every day at a specific time", "available": True, - "config_fields": [ - {"key": "time", "type": "time", "label": "At", "default": "03:00"} - ]}, - {"type": "weekly_time", "label": "Weekly Schedule", "icon": "calendar", "description": "Run on specific days of the week at a set time", "available": True, - "config_fields": [ - {"key": "time", "type": "time", "label": "At", "default": "03:00"}, - {"key": "days", "type": "multi_select", "label": "Days", - "options": [{"value": "mon", "label": "Mon"}, {"value": "tue", "label": "Tue"}, {"value": "wed", "label": "Wed"}, - {"value": "thu", "label": "Thu"}, {"value": "fri", "label": "Fri"}, {"value": "sat", "label": "Sat"}, {"value": "sun", "label": "Sun"}]} - ]}, - {"type": "app_started", "label": "App Started", "icon": "power", "description": "When SoulSync starts up", "available": True}, - {"type": "track_downloaded", "label": "Track Downloaded", "icon": "download", "description": "When a track finishes downloading", "available": True, - "has_conditions": True, - "condition_fields": ["artist", "title", "album", "quality"], - "variables": ["artist", "title", "album", "quality"]}, - {"type": "batch_complete", "label": "Batch Complete", "icon": "check-circle", "description": "When an album/playlist download finishes", "available": True, - "has_conditions": True, - "condition_fields": ["playlist_name"], - "variables": ["playlist_name", "total_tracks", "completed_tracks", "failed_tracks"]}, - {"type": "watchlist_new_release", "label": "New Release Found", "icon": "bell", "description": "When watchlist detects new music", "available": True, - "has_conditions": True, - "condition_fields": ["artist"], - "variables": ["artist", "new_tracks", "added_to_wishlist"]}, - {"type": "playlist_synced", "label": "Playlist Synced", "icon": "refresh", "description": "When a playlist sync completes", "available": True, - "has_conditions": True, - "condition_fields": ["playlist_name"], - "variables": ["playlist_name", "total_tracks", "matched_tracks", "synced_tracks", "failed_tracks"]}, - {"type": "playlist_changed", "label": "Playlist Changed", "icon": "edit", "description": "When a mirrored playlist detects track changes from source", "available": True, - "has_conditions": True, - "condition_fields": ["playlist_name"], - "variables": ["playlist_name", "old_count", "new_count", "added", "removed"]}, - {"type": "discovery_completed", "label": "Discovery Complete", "icon": "search", "description": "When playlist track discovery finishes", "available": True, - "has_conditions": True, - "condition_fields": ["playlist_name"], - "variables": ["playlist_name", "total_tracks", "discovered_count", "failed_count", "skipped_count"]}, - # Phase 3 triggers - {"type": "wishlist_processing_completed", "label": "Wishlist Processed", "icon": "check-circle", - "description": "When auto-wishlist processing finishes", "available": True, - "variables": ["tracks_processed", "tracks_found", "tracks_failed"]}, - {"type": "watchlist_scan_completed", "label": "Watchlist Scan Done", "icon": "check-circle", - "description": "When watchlist scan finishes", "available": True, - "variables": ["artists_scanned", "new_tracks_found", "tracks_added"]}, - {"type": "database_update_completed", "label": "Database Updated", "icon": "database", - "description": "When library database refresh finishes", "available": True, - "variables": ["total_artists", "total_albums", "total_tracks"]}, - {"type": "library_scan_completed", "label": "Library Scan Done", "icon": "hard-drive", - "description": "When media library scan finishes", "available": True, - "variables": ["server_type"]}, - {"type": "download_failed", "label": "Download Failed", "icon": "x-circle", - "description": "When a track permanently fails to download", "available": True, - "has_conditions": True, "condition_fields": ["artist", "title", "reason"], - "variables": ["artist", "title", "reason"]}, - {"type": "download_quarantined", "label": "File Quarantined", "icon": "alert-triangle", - "description": "When AcoustID verification fails", "available": True, - "has_conditions": True, "condition_fields": ["artist", "title"], - "variables": ["artist", "title", "reason"]}, - {"type": "wishlist_item_added", "label": "Wishlist Item Added", "icon": "plus-circle", - "description": "When a track is added to wishlist", "available": True, - "has_conditions": True, "condition_fields": ["artist", "title"], - "variables": ["artist", "title", "reason"]}, - {"type": "watchlist_artist_added", "label": "Artist Watched", "icon": "user-plus", - "description": "When an artist is added to watchlist", "available": True, - "has_conditions": True, "condition_fields": ["artist"], - "variables": ["artist", "artist_id"]}, - {"type": "watchlist_artist_removed", "label": "Artist Unwatched", "icon": "user-minus", - "description": "When an artist is removed from watchlist", "available": True, - "has_conditions": True, "condition_fields": ["artist"], - "variables": ["artist", "artist_id"]}, - {"type": "import_completed", "label": "Import Complete", "icon": "upload", - "description": "When album/track import finishes", "available": True, - "has_conditions": True, "condition_fields": ["artist", "album_name"], - "variables": ["track_count", "album_name", "artist"]}, - {"type": "mirrored_playlist_created", "label": "Playlist Mirrored", "icon": "copy", - "description": "When a new playlist is mirrored", "available": True, - "has_conditions": True, "condition_fields": ["playlist_name", "source"], - "variables": ["playlist_name", "source", "track_count"]}, - {"type": "quality_scan_completed", "label": "Quality Scan Done", "icon": "bar-chart", - "description": "When quality scan finishes", "available": True, - "variables": ["quality_met", "low_quality", "total_scanned"]}, - {"type": "duplicate_scan_completed", "label": "Duplicate Scan Done", "icon": "layers", - "description": "When duplicate cleaner finishes", "available": True, - "variables": ["files_scanned", "duplicates_found", "space_freed"]}, - # Signal trigger - {"type": "signal_received", "label": "Signal Received", "icon": "zap", - "description": "When another automation fires a named signal", "available": True, - "config_fields": [ - {"key": "signal_name", "type": "signal_input", "label": "Signal Name"} - ], - "variables": ["signal_name"]}, - # Webhook trigger - {"type": "webhook_received", "label": "Webhook Received", "icon": "globe", - "description": "When an external API request is received (POST /api/v1/request)", "available": True, - "variables": ["query", "request_id", "source"]}, - ], - "actions": [ - {"type": "process_wishlist", "label": "Process Wishlist", "icon": "list", "description": "Retry failed downloads from wishlist", "available": True, - "config_fields": [{"key": "category", "type": "select", "label": "Category", "options": [{"value": "all", "label": "All"}, {"value": "albums", "label": "Albums"}, {"value": "singles", "label": "Singles"}], "default": "all"}]}, - {"type": "scan_watchlist", "label": "Scan Watchlist", "icon": "eye", "description": "Check watched artists for new releases", "available": True}, - {"type": "scan_library", "label": "Scan Library", "icon": "refresh", "description": "Trigger media server library scan", "available": True}, - {"type": "refresh_mirrored", "label": "Refresh Mirrored Playlist", "icon": "copy", "description": "Re-fetch playlist from source and update mirror", "available": True, - "config_fields": [ - {"key": "playlist_id", "type": "mirrored_playlist_select", "label": "Playlist"}, - {"key": "all", "type": "checkbox", "label": "Refresh all mirrored playlists", "default": False} - ]}, - {"type": "sync_playlist", "label": "Sync Playlist", "icon": "sync", "description": "Sync mirrored playlist to media server", "available": True, - "config_fields": [ - {"key": "playlist_id", "type": "mirrored_playlist_select", "label": "Playlist"} - ]}, - {"type": "discover_playlist", "label": "Discover Playlist", "icon": "search", "description": "Find official Spotify/iTunes metadata for mirrored playlist tracks", "available": True, - "config_fields": [ - {"key": "playlist_id", "type": "mirrored_playlist_select", "label": "Playlist"}, - {"key": "all", "type": "checkbox", "label": "Discover all mirrored playlists", "default": False} - ]}, - {"type": "playlist_pipeline", "label": "Playlist Pipeline", "icon": "rocket", - "description": "Full lifecycle: refresh → discover → sync → download missing. One automation for the entire flow.", - "available": True, - "config_fields": [ - {"key": "playlist_id", "type": "mirrored_playlist_select", "label": "Playlist"}, - {"key": "all", "type": "checkbox", "label": "Process all mirrored playlists", "default": False}, - {"key": "skip_wishlist", "type": "checkbox", "label": "Skip wishlist processing", "default": False}, - ]}, - {"type": "notify_only", "label": "Notify Only", "icon": "bell", "description": "No action — just send notification", "available": True}, - # Phase 3 actions - {"type": "start_database_update", "label": "Update Database", "icon": "database", - "description": "Trigger library database refresh", "available": True, - "config_fields": [ - {"key": "full_refresh", "type": "checkbox", "label": "Full refresh (slower)", "default": False} - ]}, - {"type": "run_duplicate_cleaner", "label": "Run Duplicate Cleaner", "icon": "layers", - "description": "Scan for and remove duplicate files", "available": True}, - {"type": "clear_quarantine", "label": "Clear Quarantine", "icon": "trash", - "description": "Delete all quarantined files", "available": True}, - {"type": "cleanup_wishlist", "label": "Clean Up Wishlist", "icon": "filter", - "description": "Remove duplicate/owned tracks from wishlist", "available": True}, - {"type": "update_discovery_pool", "label": "Update Discovery", "icon": "compass", - "description": "Refresh discovery pool with new tracks", "available": True}, - {"type": "start_quality_scan", "label": "Run Quality Scan", "icon": "bar-chart", - "description": "Scan for low-quality audio files", "available": True, - "config_fields": [ - {"key": "scope", "type": "select", "label": "Scope", - "options": [{"value": "watchlist", "label": "Watchlist Artists"}, {"value": "library", "label": "Full Library"}], - "default": "watchlist"} - ]}, - {"type": "backup_database", "label": "Backup Database", "icon": "save", - "description": "Create timestamped database backup", "available": True}, - {"type": "refresh_beatport_cache", "label": "Refresh Beatport Cache", "icon": "music", - "description": "Scrape Beatport homepage and warm the cache", "available": True}, - {"type": "clean_search_history", "label": "Clean Search History", "icon": "trash-2", - "description": "Remove old searches from Soulseek", "available": True}, - {"type": "clean_completed_downloads", "label": "Clean Completed Downloads", "icon": "check-square", - "description": "Clear completed downloads and empty directories", "available": True}, - {"type": "full_cleanup", "label": "Full Cleanup", "icon": "trash", - "description": "Clear quarantine, download queue, import folder, and search history in one sweep", "available": True}, - {"type": "deep_scan_library", "label": "Deep Scan Library", "icon": "search", - "description": "Full library comparison without losing enrichment data", "available": True}, - {"type": "run_script", "label": "Run Script", "icon": "terminal", - "description": "Execute a script from the scripts folder", "available": True}, - {"type": "search_and_download", "label": "Search & Download", "icon": "download", - "description": "Search for a track and download the best match", "available": True, - "config_fields": [ - {"key": "query", "type": "text", "label": "Search Query", - "placeholder": "Artist - Track (leave empty to use trigger's query)"} - ]}, - ], - "notifications": [ - {"type": "discord_webhook", "label": "Discord Webhook", "icon": "message", "description": "Send a Discord notification", "available": True, - "variables": ["time", "name", "run_count", "status"]}, - {"type": "pushbullet", "label": "Pushbullet", "icon": "push", "description": "Push notification to phone/desktop", "available": True, - "variables": ["time", "name", "run_count", "status"]}, - {"type": "telegram", "label": "Telegram", "icon": "message", "description": "Send a Telegram message", "available": True, - "variables": ["time", "name", "run_count", "status"]}, - {"type": "webhook", "label": "Webhook (POST)", "icon": "globe", "description": "Send a POST request to any URL", "available": True, - "variables": ["time", "name", "run_count", "status"]}, - # Signal fire action - {"type": "fire_signal", "label": "Fire Signal", "icon": "zap", - "description": "Fire a signal that other automations can listen for", "available": True, - "config_fields": [ - {"key": "signal_name", "type": "signal_input", "label": "Signal Name"} - ]}, - # Run script then-action - {"type": "run_script", "label": "Run Script", "icon": "terminal", - "description": "Execute a script after the action completes", "available": True, - "config_fields": [ - {"key": "script_name", "type": "script_select", "label": "Script"} - ]}, - ], - "known_signals": _collect_known_signals(), + 'triggers': _auto_blocks.TRIGGERS, + 'actions': _auto_blocks.ACTIONS, + 'notifications': _auto_blocks.NOTIFICATIONS, + 'known_signals': _collect_known_signals(), }) @app.route('/api/mirrored-playlists/list', methods=['GET']) @@ -47981,47 +47444,10 @@ def _emit_scan_status_loop(): def _emit_automation_progress_loop(): """Push automation:progress events every 1 second for running automations.""" - while not globals().get('IS_SHUTTING_DOWN', False): - socketio.sleep(1) - try: - with automation_progress_lock: - active = {} - stale = [] - now = datetime.now() - for aid, state in automation_progress_states.items(): - if state['status'] == 'running': - # Timeout zombie running states after 2 hours - try: - started = datetime.fromisoformat(state.get('started_at', '')) - if (now - started).total_seconds() > 7200: - state['status'] = 'error' - state['phase'] = 'Timed out' - state['finished_at'] = now.isoformat() - state['log'].append({'type': 'error', 'text': 'Timed out after 2 hours'}) - # Emit error state before cleanup so frontend sees it - cp = dict(state) - cp['log'] = list(state['log']) - active[str(aid)] = cp - continue - except (ValueError, TypeError): - pass - cp = dict(state) - cp['log'] = list(state['log']) - active[str(aid)] = cp - elif state['status'] in ('finished', 'error') and state.get('finished_at'): - # Clean up finished states after 60 seconds (frontend already got final emit) - try: - finished_time = datetime.fromisoformat(state['finished_at']) - if (now - finished_time).total_seconds() > 60: - stale.append(aid) - except (ValueError, TypeError): - stale.append(aid) - for aid in stale: - del automation_progress_states[aid] - if active: - socketio.emit('automation:progress', active) - except Exception as e: - logger.debug(f"Error emitting automation progress: {e}") + _auto_progress.emit_progress_loop( + socketio, + is_shutting_down=lambda: globals().get('IS_SHUTTING_DOWN', False), + ) def _emit_repair_progress_loop(): """Push repair:progress events every 1 second for running repair jobs.""" diff --git a/webui/static/helper.js b/webui/static/helper.js index 23a882b0..46994799 100644 --- a/webui/static/helper.js +++ b/webui/static/helper.js @@ -3451,6 +3451,7 @@ const WHATS_NEW = { { title: 'Service Worker for Cover Art + Installable PWA', desc: 'cover art used to re-fetch from the cdn on every library / discover page visit. now a service worker caches images locally — second visit serves art instantly from disk, no network hit. also added a pwa manifest so soulsync can be installed to home screen / desktop as a standalone app (chrome / edge / safari → install soulsync). cache versioned so future strategy changes invalidate cleanly.' }, { title: 'Stats Endpoints Lifted to core/stats', desc: 'internal — moved /api/stats/* and /api/listening-stats/* logic out of web_server.py into core/stats/queries.py with full test coverage. no behavior change. step toward breaking up the web_server.py monolith.' }, { title: 'Search Endpoints Lifted to core/search', desc: 'internal — moved /api/search and /api/enhanced-search/* logic into core/search/ (cache, sources, library_check, stream, basic, orchestrator). 612 fewer lines in web_server.py, 94 new tests. no behavior change.' }, + { title: 'Automation Endpoints Lifted to core/automation', desc: 'internal — moved /api/automations/* CRUD + run + history routes, progress tracking helpers, and signal collection into core/automation/ (api, progress, signals). 383 fewer lines in web_server.py, 72 new tests. action handler registration stays put — those closures are tangled with feature implementations.' }, ], '2.4.0': [ // --- April 26, 2026 — Search & Artists unification + reorganize queue ---