From 49a6c58ea8ff90197ca0cd8e708b8c15f05a8e3c Mon Sep 17 00:00:00 2001 From: Broque Thomas Date: Sat, 21 Feb 2026 02:15:43 -0800 Subject: [PATCH] Add Hydrabase P2P mirror worker Introduce a Hydrabase P2P mirror worker and integrate it into the web UI and server flows. Adds core/hydrabase_worker.py: a background thread with a capped queue (1000), enqueue API, rate limiting, basic stats (sent/dropped/errors), and logic to send JSON requests over a provided WebSocket (responses received and discarded). Integrates the worker into web_server.py (import, startup init, status/pause/resume endpoints, and enqueues queries from multiple search endpoints when dev mode is enabled). Adds UI elements, JavaScript polling/toggle logic, and CSS styling for a Hydrabase status button in webui (index.html, static/script.js, static/style.css) to display and control worker state. --- core/hydrabase_worker.py | 145 ++++++++++++++++++++++++++++++++++ web_server.py | 107 ++++++++++++++++++++++++- webui/index.html | 16 ++++ webui/static/script.js | 72 +++++++++++++++++ webui/static/style.css | 165 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 504 insertions(+), 1 deletion(-) create mode 100644 core/hydrabase_worker.py diff --git a/core/hydrabase_worker.py b/core/hydrabase_worker.py new file mode 100644 index 00000000..e9558cce --- /dev/null +++ b/core/hydrabase_worker.py @@ -0,0 +1,145 @@ +""" +Hydrabase P2P Mirror Worker + +Background worker that intercepts search queries and mirrors them to the +Hydrabase P2P network via WebSocket. Fire-and-forget — responses are received +(required by protocol) but discarded. Only processes items when the Hydrabase +WebSocket is connected; items are silently dropped when not connected. +""" + +import json +import logging +import queue +import threading +import time + +logger = logging.getLogger(__name__) + + +class HydrabaseWorker: + def __init__(self, get_ws_and_lock): + """ + Args: + get_ws_and_lock: Callable returning (ws, lock) tuple for the + Hydrabase WebSocket connection. + """ + self.get_ws_and_lock = get_ws_and_lock + + # Worker state + self.running = False + self.paused = False + self.should_stop = False + self.thread = None + + # Queue with cap + self.queue = queue.Queue(maxsize=1000) + + # Statistics + self.stats = { + 'sent': 0, + 'dropped': 0, + 'errors': 0 + } + + def start(self): + if self.running: + return + self.running = True + self.should_stop = False + self.thread = threading.Thread(target=self._run, daemon=True) + self.thread.start() + logger.info("Hydrabase P2P mirror worker started") + + def stop(self): + if not self.running: + return + self.should_stop = True + self.running = False + if self.thread: + self.thread.join(timeout=5) + logger.info("Hydrabase P2P mirror worker stopped") + + def pause(self): + if not self.running: + return + self.paused = True + + def resume(self): + if not self.running: + return + self.paused = False + + def enqueue(self, query, query_type): + """Non-blocking enqueue. Drops oldest item if queue is full.""" + if not query or not self.running: + return + item = {'query': query, 'type': query_type} + try: + self.queue.put_nowait(item) + except queue.Full: + # Drop oldest, then add new + try: + self.queue.get_nowait() + except queue.Empty: + pass + try: + self.queue.put_nowait(item) + except queue.Full: + pass + + def get_stats(self): + is_actually_running = self.running and (self.thread is not None and self.thread.is_alive()) + return { + 'enabled': True, + 'running': is_actually_running and not self.paused, + 'paused': self.paused, + 'queue_size': self.queue.qsize(), + 'stats': self.stats.copy() + } + + def _run(self): + while not self.should_stop: + try: + if self.paused: + time.sleep(1) + continue + + # Non-blocking dequeue with timeout + try: + item = self.queue.get(timeout=1) + except queue.Empty: + continue + + self._process_item(item) + time.sleep(0.5) # Rate limit + + except Exception as e: + logger.error(f"Error in Hydrabase worker loop: {e}") + self.stats['errors'] += 1 + time.sleep(2) + + def _process_item(self, item): + ws, lock = self.get_ws_and_lock() + + if ws is None: + self.stats['dropped'] += 1 + return + + payload = json.dumps({ + 'request': { + 'type': item['type'], + 'query': item['query'] + } + }) + + try: + with lock: + if not ws.connected: + self.stats['dropped'] += 1 + return + ws.send(payload) + ws.recv() # Required by protocol, response discarded + self.stats['sent'] += 1 + except Exception as e: + logger.debug(f"Hydrabase send failed: {e}") + self.stats['dropped'] += 1 diff --git a/web_server.py b/web_server.py index fdf85960..2616e04c 100644 --- a/web_server.py +++ b/web_server.py @@ -71,6 +71,7 @@ from beatport_unified_scraper import BeatportUnifiedScraper from core.musicbrainz_worker import MusicBrainzWorker from core.audiodb_worker import AudioDBWorker from core.deezer_worker import DeezerWorker +from core.hydrabase_worker import HydrabaseWorker # --- Flask App Setup --- base_dir = os.path.abspath(os.path.dirname(__file__)) @@ -3941,6 +3942,12 @@ def enhanced_search(): logger.info(f"Enhanced search initiated for: '{query}'") + # Mirror to Hydrabase P2P network + if hydrabase_worker and dev_mode_enabled: + hydrabase_worker.enqueue(query, 'track') + hydrabase_worker.enqueue(query, 'album') + hydrabase_worker.enqueue(query, 'artist') + try: # Search local database for artists database = get_database() @@ -5578,6 +5585,10 @@ def get_artist_discography(artist_id): # Get optional artist name for fallback searches artist_name = request.args.get('artist_name', '') + # Mirror to Hydrabase P2P network + if hydrabase_worker and dev_mode_enabled and artist_name: + hydrabase_worker.enqueue(artist_name, 'discography') + # Determine which source to use spotify_available = spotify_client and spotify_client.is_spotify_authenticated() @@ -6791,7 +6802,11 @@ def search_match(): if not query: return jsonify({"results": []}) - + + # Mirror to Hydrabase P2P network + if hydrabase_worker and dev_mode_enabled: + hydrabase_worker.enqueue(query, context) + if context == 'artist': # Search for artists artist_matches = spotify_client.search_artists(query, limit=8) @@ -16423,6 +16438,10 @@ def search_spotify(): if not query: return jsonify({"error": "Query parameter 'q' is required"}), 400 + # Mirror to Hydrabase P2P network + if hydrabase_worker and dev_mode_enabled: + hydrabase_worker.enqueue(query, search_type) + # Search using spotify_client tracks = spotify_client.search_tracks(query, limit=limit) @@ -16456,6 +16475,10 @@ def search_spotify_tracks(): if not query: return jsonify({"error": "Query parameter is required"}), 400 + # Mirror to Hydrabase P2P network + if hydrabase_worker and dev_mode_enabled: + hydrabase_worker.enqueue(query, 'track') + # Search using spotify_client tracks = spotify_client.search_tracks(query, limit=limit) @@ -16487,6 +16510,10 @@ def search_itunes_tracks(): if not query: return jsonify({"error": "Query parameter is required"}), 400 + # Mirror to Hydrabase P2P network + if hydrabase_worker and dev_mode_enabled: + hydrabase_worker.enqueue(query, 'track') + # Search using iTunes client itunes_client = iTunesClient() tracks = itunes_client.search_tracks(query, limit=limit) @@ -21804,6 +21831,10 @@ def search_artists_for_playlist(): if not query: return jsonify({"success": False, "error": "Query required"}), 400 + # Mirror to Hydrabase P2P network + if hydrabase_worker and dev_mode_enabled: + hydrabase_worker.enqueue(query, 'artist') + # Search Spotify for artists results = spotify_client.sp.search(q=query, type='artist', limit=10) @@ -26256,6 +26287,76 @@ def deezer_resume(): # ================================================================================================ +# ================================================================================================ +# HYDRABASE P2P MIRROR WORKER +# ================================================================================================ + +# --- Hydrabase Worker Initialization --- +hydrabase_worker = None +try: + def _get_hydrabase_ws_and_lock(): + return (_hydrabase_ws, _hydrabase_lock) + hydrabase_worker = HydrabaseWorker(get_ws_and_lock=_get_hydrabase_ws_and_lock) + hydrabase_worker.start() + print("✅ Hydrabase P2P mirror worker initialized and started") +except Exception as e: + print(f"⚠️ Hydrabase worker initialization failed: {e}") + hydrabase_worker = None + +# --- Hydrabase Worker API Endpoints --- + +@app.route('/api/hydrabase-worker/status', methods=['GET']) +def hydrabase_worker_status(): + """Get Hydrabase P2P mirror worker status for UI polling""" + try: + if hydrabase_worker is None: + return jsonify({ + 'enabled': False, + 'running': False, + 'paused': False, + 'queue_size': 0, + 'stats': {'sent': 0, 'dropped': 0, 'errors': 0} + }), 200 + + status = hydrabase_worker.get_stats() + return jsonify(status), 200 + except Exception as e: + logger.error(f"Error getting Hydrabase worker status: {e}") + return jsonify({'error': str(e)}), 500 + +@app.route('/api/hydrabase-worker/pause', methods=['POST']) +def hydrabase_worker_pause(): + """Pause Hydrabase P2P mirror worker""" + try: + if hydrabase_worker is None: + return jsonify({'error': 'Hydrabase worker not initialized'}), 400 + + hydrabase_worker.pause() + logger.info("Hydrabase worker paused via UI") + return jsonify({'status': 'paused'}), 200 + except Exception as e: + logger.error(f"Error pausing Hydrabase worker: {e}") + return jsonify({'error': str(e)}), 500 + +@app.route('/api/hydrabase-worker/resume', methods=['POST']) +def hydrabase_worker_resume(): + """Resume Hydrabase P2P mirror worker""" + try: + if hydrabase_worker is None: + return jsonify({'error': 'Hydrabase worker not initialized'}), 400 + + hydrabase_worker.resume() + logger.info("Hydrabase worker resumed via UI") + return jsonify({'status': 'running'}), 200 + except Exception as e: + logger.error(f"Error resuming Hydrabase worker: {e}") + return jsonify({'error': str(e)}), 500 + +# ================================================================================================ +# END HYDRABASE P2P MIRROR WORKER +# ================================================================================================ + + # ================================================================================================ # IMPORT / STAGING SYSTEM # ================================================================================================ @@ -26430,6 +26531,10 @@ def import_search_albums(): if not query: return jsonify({'success': False, 'error': 'Missing query parameter'}), 400 + # Mirror to Hydrabase P2P network + if hydrabase_worker and dev_mode_enabled: + hydrabase_worker.enqueue(query, 'album') + limit = min(int(request.args.get('limit', 12)), 50) albums = spotify_client.search_albums(query, limit=limit) diff --git a/webui/index.html b/webui/index.html index bbff5ac4..0d8769de 100644 --- a/webui/index.html +++ b/webui/index.html @@ -246,6 +246,22 @@ + +