You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
SoulSync/tests/test_create_finding_dedup_c...

302 lines
11 KiB

"""Pin the bug fix where `_create_finding` silently dedup-skipped while
the caller's `findings_created` counter incremented anyway, causing
the maintenance job badge to inflate (e.g. "364 findings" displayed
when 0 new pending rows existed in the DB).
Now `_create_finding` returns:
- True — a NEW pending row was inserted.
- False — dedup-skipped (an equivalent row already exists with status
pending/resolved/dismissed) OR a DB error occurred.
Callers must only increment `findings_created` on True. Skipped-dedup
counter exposed separately as `findings_skipped_dedup` for log
transparency.
"""
from __future__ import annotations
import os
import sqlite3
import tempfile
from unittest.mock import MagicMock
import pytest
from core.repair_jobs.base import JobResult
from core.repair_worker import RepairWorker
@pytest.fixture
def repair_worker_with_temp_db():
"""A RepairWorker wired to a temporary SQLite db with the
`repair_findings` table created. Returns the worker; tears the db
down after the test."""
fd, path = tempfile.mkstemp(suffix='.db')
os.close(fd)
conn = sqlite3.connect(path)
conn.execute("""
CREATE TABLE repair_findings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id TEXT NOT NULL,
finding_type TEXT NOT NULL,
severity TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
entity_type TEXT,
entity_id TEXT,
file_path TEXT,
title TEXT,
description TEXT,
details_json TEXT,
user_action TEXT,
resolved_at TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
)
""")
conn.commit()
conn.close()
db_mock = MagicMock()
db_mock._get_connection = lambda: sqlite3.connect(path)
worker = RepairWorker.__new__(RepairWorker)
worker.db = db_mock
yield worker, path
try:
os.remove(path)
except OSError:
pass
def test_create_finding_returns_true_on_first_insert(repair_worker_with_temp_db):
worker, _ = repair_worker_with_temp_db
result = worker._create_finding(
job_id='dup_detector',
finding_type='duplicate_tracks',
severity='info',
entity_type='track',
entity_id='123',
file_path='/foo/bar.mp3',
title='Test finding',
description='desc',
)
assert result is True
def test_create_finding_returns_false_on_dedup_pending(repair_worker_with_temp_db):
worker, _ = repair_worker_with_temp_db
# First insert
worker._create_finding(
job_id='dup_detector', finding_type='duplicate_tracks',
severity='info', entity_type='track', entity_id='123',
file_path='/foo/bar.mp3', title='T', description='D',
)
# Re-call with same args — should dedup
result = worker._create_finding(
job_id='dup_detector', finding_type='duplicate_tracks',
severity='info', entity_type='track', entity_id='123',
file_path='/foo/bar.mp3', title='T', description='D',
)
assert result is False
def test_create_finding_returns_false_on_dedup_dismissed(repair_worker_with_temp_db):
"""The user dismissed a finding in a prior session. A new scan
that re-discovers the same issue must NOT increment the badge —
the row exists with status='dismissed'."""
worker, path = repair_worker_with_temp_db
# Seed the DB with a dismissed finding
conn = sqlite3.connect(path)
conn.execute("""
INSERT INTO repair_findings
(job_id, finding_type, severity, status, entity_type, entity_id,
file_path, title, description)
VALUES (?, ?, 'info', 'dismissed', 'track', '123', '/foo/bar.mp3', 'T', 'D')
""", ('dup_detector', 'duplicate_tracks'))
conn.commit()
conn.close()
result = worker._create_finding(
job_id='dup_detector', finding_type='duplicate_tracks',
severity='info', entity_type='track', entity_id='123',
file_path='/foo/bar.mp3', title='T', description='D',
)
assert result is False
def test_create_finding_returns_false_on_dedup_resolved(repair_worker_with_temp_db):
"""An auto-fix or manual repair previously resolved a finding.
Re-discovery of the same issue should NOT inflate the badge."""
worker, path = repair_worker_with_temp_db
conn = sqlite3.connect(path)
conn.execute("""
INSERT INTO repair_findings
(job_id, finding_type, severity, status, entity_type, entity_id,
file_path, title, description)
VALUES (?, ?, 'info', 'resolved', 'track', '123', '/foo/bar.mp3', 'T', 'D')
""", ('dup_detector', 'duplicate_tracks'))
conn.commit()
conn.close()
result = worker._create_finding(
job_id='dup_detector', finding_type='duplicate_tracks',
severity='info', entity_type='track', entity_id='123',
file_path='/foo/bar.mp3', title='T', description='D',
)
assert result is False
def test_create_finding_inserts_again_when_distinct_entity(repair_worker_with_temp_db):
"""A different entity (different track id) is a NEW finding —
must NOT be dedup-blocked by an unrelated finding's existence."""
worker, _ = repair_worker_with_temp_db
worker._create_finding(
job_id='dup_detector', finding_type='duplicate_tracks',
severity='info', entity_type='track', entity_id='123',
file_path='/foo/bar.mp3', title='T', description='D',
)
result = worker._create_finding(
job_id='dup_detector', finding_type='duplicate_tracks',
severity='info', entity_type='track', entity_id='456',
file_path='/foo/baz.mp3', title='T', description='D',
)
assert result is True
# ---------------------------------------------------------------------------
# JobResult — findings_skipped_dedup field is exposed
# ---------------------------------------------------------------------------
def test_job_result_has_skipped_dedup_field():
result = JobResult()
assert result.findings_skipped_dedup == 0
result.findings_skipped_dedup += 1
assert result.findings_skipped_dedup == 1
# ---------------------------------------------------------------------------
# End-to-end pattern: caller counts only true inserts
# ---------------------------------------------------------------------------
def test_caller_pattern_counts_only_real_inserts(repair_worker_with_temp_db):
"""Simulate a job loop calling create_finding 5 times for the
same finding. Only the FIRST should count toward findings_created;
the remaining 4 should count toward findings_skipped_dedup. Badge
must reflect 1, not 5."""
worker, _ = repair_worker_with_temp_db
result = JobResult()
for _ in range(5):
inserted = worker._create_finding(
job_id='dup_detector', finding_type='duplicate_tracks',
severity='info', entity_type='track', entity_id='123',
file_path='/foo/bar.mp3', title='T', description='D',
)
if inserted:
result.findings_created += 1
else:
result.findings_skipped_dedup += 1
assert result.findings_created == 1
assert result.findings_skipped_dedup == 4
# ---------------------------------------------------------------------------
# _get_pending_count_by_job — feeds the job-card "X pending" badge
# ---------------------------------------------------------------------------
def test_get_pending_count_by_job_returns_per_job_dict(repair_worker_with_temp_db):
"""Pin: helper returns ``{job_id: pending_count}`` based on rows
with status='pending' only. Job-card badge uses this so it shows
CURRENT pending state instead of the historical
``last_run.findings_created`` (which inflates after a bulk-fix
moves all findings to status='resolved')."""
worker, path = repair_worker_with_temp_db
conn = sqlite3.connect(path)
cur = conn.cursor()
# 3 pending duplicate_tracks for dup_detector
cur.executemany("""
INSERT INTO repair_findings (job_id, finding_type, severity, status,
entity_type, entity_id, file_path, title, description)
VALUES (?, 'duplicate_tracks', 'info', 'pending', 'track', ?, ?, 'T', 'D')
""", [
('dup_detector', '1', '/a'),
('dup_detector', '2', '/b'),
('dup_detector', '3', '/c'),
])
# 2 pending missing_cover_art for cover_art_filler
cur.executemany("""
INSERT INTO repair_findings (job_id, finding_type, severity, status,
entity_type, entity_id, file_path, title, description)
VALUES ('cover_art_filler', 'missing_cover_art', 'info', 'pending',
'album', ?, ?, 'T', 'D')
""", [('10', '/x'), ('11', '/y')])
# 1 RESOLVED finding — must NOT be counted
cur.execute("""
INSERT INTO repair_findings (job_id, finding_type, severity, status,
entity_type, entity_id, file_path, title, description)
VALUES ('dup_detector', 'duplicate_tracks', 'info', 'resolved',
'track', '99', '/z', 'T', 'D')
""")
# 1 DISMISSED finding — must NOT be counted
cur.execute("""
INSERT INTO repair_findings (job_id, finding_type, severity, status,
entity_type, entity_id, file_path, title, description)
VALUES ('cover_art_filler', 'missing_cover_art', 'info', 'dismissed',
'album', '88', '/w', 'T', 'D')
""")
conn.commit()
conn.close()
result = worker._get_pending_count_by_job()
assert result == {
'dup_detector': 3,
'cover_art_filler': 2,
}
def test_get_pending_count_by_job_returns_empty_when_no_pending(repair_worker_with_temp_db):
"""Pin: jobs with zero pending findings are absent from the dict
(caller handles missing key as 0)."""
worker, path = repair_worker_with_temp_db
conn = sqlite3.connect(path)
cur = conn.cursor()
cur.execute("""
INSERT INTO repair_findings (job_id, finding_type, severity, status,
entity_type, entity_id, file_path, title, description)
VALUES ('dup_detector', 'duplicate_tracks', 'info', 'resolved',
'track', '1', '/a', 'T', 'D')
""")
conn.commit()
conn.close()
result = worker._get_pending_count_by_job()
assert result == {}
def test_get_pending_count_by_job_handles_db_error_gracefully(repair_worker_with_temp_db):
"""Pin: any DB exception → returns ``{}``. Job-card badge falls
back to historical count via the ``or 0`` safety in JS."""
worker, _ = repair_worker_with_temp_db
# Replace _get_connection with one that raises.
def _raise():
raise sqlite3.OperationalError("simulated")
worker.db._get_connection = _raise
result = worker._get_pending_count_by_job()
assert result == {}