mirror of https://github.com/Nezreka/SoulSync.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
157 lines
5.9 KiB
157 lines
5.9 KiB
"""Generic Flask routes for enrichment-bubble status / pause / resume.
|
|
|
|
Replaces 30 near-identical per-service routes that web_server.py used
|
|
to hand-roll. The blueprint reads the registry in ``core.enrichment.services``
|
|
and dispatches:
|
|
|
|
GET /api/enrichment/<service_id>/status
|
|
POST /api/enrichment/<service_id>/pause
|
|
POST /api/enrichment/<service_id>/resume
|
|
|
|
A 404 is returned for unknown service ids. Per-service quirks (Spotify
|
|
rate-limit guard, auto-pause token cleanup, persisted-pause config keys)
|
|
are encoded as data on the ``EnrichmentService`` descriptor — there is
|
|
no branching on service id inside this module.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Any, Callable, Optional
|
|
|
|
from flask import Blueprint, jsonify
|
|
|
|
from core.enrichment.services import EnrichmentService, get_service
|
|
from utils.logging_config import get_logger
|
|
|
|
|
|
logger = get_logger("enrichment.api")
|
|
|
|
|
|
# Hooks the host wires up so the blueprint can persist pause state and
|
|
# clean up auto-pause / yield-override sets without circular imports.
|
|
_config_set: Optional[Callable[[str, Any], None]] = None
|
|
_auto_paused_discard: Optional[Callable[[str], None]] = None
|
|
_yield_override_add: Optional[Callable[[str], None]] = None
|
|
|
|
|
|
def configure(
|
|
*,
|
|
config_set: Optional[Callable[[str, Any], None]] = None,
|
|
auto_paused_discard: Optional[Callable[[str], None]] = None,
|
|
yield_override_add: Optional[Callable[[str], None]] = None,
|
|
) -> None:
|
|
"""Wire host-side mutators that the generic routes call after pause/resume.
|
|
|
|
Each is optional — pass None for hosts that don't have a corresponding
|
|
mechanism (e.g. tests).
|
|
"""
|
|
global _config_set, _auto_paused_discard, _yield_override_add
|
|
_config_set = config_set
|
|
_auto_paused_discard = auto_paused_discard
|
|
_yield_override_add = yield_override_add
|
|
|
|
|
|
def _persist_paused(service: EnrichmentService, paused: bool) -> None:
|
|
if not service.config_paused_key or _config_set is None:
|
|
return
|
|
try:
|
|
_config_set(service.config_paused_key, paused)
|
|
except Exception as e:
|
|
logger.warning(
|
|
"Persisting pause flag for %s failed: %s", service.id, e
|
|
)
|
|
|
|
|
|
def _drop_auto_pause_marker(service: EnrichmentService) -> None:
|
|
if service.auto_pause_token is None or _auto_paused_discard is None:
|
|
return
|
|
try:
|
|
_auto_paused_discard(service.auto_pause_token)
|
|
except Exception as e:
|
|
logger.debug("auto-pause marker discard: %s", e)
|
|
|
|
|
|
def _add_yield_override(service: EnrichmentService) -> None:
|
|
if service.auto_pause_token is None or _yield_override_add is None:
|
|
return
|
|
try:
|
|
_yield_override_add(service.auto_pause_token)
|
|
except Exception as e:
|
|
logger.debug("yield override add: %s", e)
|
|
|
|
|
|
def create_blueprint() -> Blueprint:
|
|
"""Build the Flask blueprint — call once during host startup."""
|
|
bp = Blueprint('enrichment_api', __name__)
|
|
|
|
@bp.route('/api/enrichment/<service_id>/status', methods=['GET'])
|
|
def enrichment_status(service_id: str):
|
|
service = get_service(service_id)
|
|
if service is None:
|
|
return jsonify({'error': f'Unknown enrichment service: {service_id}'}), 404
|
|
try:
|
|
worker = service.get_worker()
|
|
if worker is None:
|
|
return jsonify(service.fallback_status()), 200
|
|
return jsonify(worker.get_stats()), 200
|
|
except Exception as e:
|
|
logger.error("Error getting %s enrichment status: %s", service.id, e)
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
@bp.route('/api/enrichment/<service_id>/pause', methods=['POST'])
|
|
def enrichment_pause(service_id: str):
|
|
service = get_service(service_id)
|
|
if service is None:
|
|
return jsonify({'error': f'Unknown enrichment service: {service_id}'}), 404
|
|
worker = service.get_worker()
|
|
if worker is None:
|
|
return jsonify({
|
|
'error': f'{service.display_name} enrichment worker not initialized',
|
|
}), 400
|
|
try:
|
|
worker.pause()
|
|
_persist_paused(service, True)
|
|
_drop_auto_pause_marker(service)
|
|
logger.info("%s worker paused via UI", service.display_name)
|
|
return jsonify({'status': 'paused'}), 200
|
|
except Exception as e:
|
|
logger.error("Error pausing %s worker: %s", service.id, e)
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
@bp.route('/api/enrichment/<service_id>/resume', methods=['POST'])
|
|
def enrichment_resume(service_id: str):
|
|
service = get_service(service_id)
|
|
if service is None:
|
|
return jsonify({'error': f'Unknown enrichment service: {service_id}'}), 404
|
|
worker = service.get_worker()
|
|
if worker is None:
|
|
return jsonify({
|
|
'error': f'{service.display_name} enrichment worker not initialized',
|
|
}), 400
|
|
# Pre-resume guard (e.g. Spotify rate-limit ban). Returns
|
|
# (http_status, error_message) when blocking, None when ok.
|
|
if service.pre_resume_check is not None:
|
|
try:
|
|
blocked = service.pre_resume_check()
|
|
except Exception as e:
|
|
logger.error("Pre-resume check for %s raised: %s", service.id, e)
|
|
blocked = None
|
|
if blocked is not None:
|
|
http_status, message = blocked
|
|
payload: dict = {'error': message}
|
|
if http_status == 429:
|
|
payload['rate_limited'] = True
|
|
return jsonify(payload), http_status
|
|
try:
|
|
worker.resume()
|
|
_persist_paused(service, False)
|
|
_drop_auto_pause_marker(service)
|
|
_add_yield_override(service)
|
|
logger.info("%s worker resumed via UI", service.display_name)
|
|
return jsonify({'status': 'running'}), 200
|
|
except Exception as e:
|
|
logger.error("Error resuming %s worker: %s", service.id, e)
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
return bp
|