diff --git a/scripts/README.md b/scripts/README.md index 45d1c3418..897520b52 100644 --- a/scripts/README.md +++ b/scripts/README.md @@ -1,79 +1,236 @@ -### Scripts description +# StackExchange Posts Processor -This is a set of example scripts to show the capabilities of the RESTAPI interface and how to interface with it. +A comprehensive script to extract, process, and index StackExchange posts for search capabilities. -### Prepare ProxySQL +## Features -1. Launch ProxySQL: +- āœ… **Complete Pipeline**: Extracts parent posts and replies from source database +- šŸ“Š **Search Ready**: Creates full-text search indexes and processed text columns +- šŸš€ **Efficient**: Batch processing with memory optimization +- šŸ” **Duplicate Prevention**: Skip already processed posts +- šŸ“ˆ **Progress Tracking**: Real-time statistics and performance metrics +- šŸ”§ **Flexible**: Configurable source/target databases +- šŸ“ **Rich Output**: Structured JSON with tags and metadata +## Database Schema + +The script creates a comprehensive target table with these columns: + +```sql +processed_posts ( + PostId BIGINT PRIMARY KEY, + JsonData JSON NOT NULL, -- Complete post data + Embeddings BLOB NULL, -- For future ML embeddings + SearchText LONGTEXT NULL, -- Combined text for search + TitleText VARCHAR(1000) NULL, -- Cleaned title + BodyText LONGTEXT NULL, -- Cleaned body + RepliesText LONGTEXT NULL, -- Combined replies + Tags JSON NULL, -- Extracted tags + CreatedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UpdatedAt TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + + -- Indexes + KEY idx_created_at (CreatedAt), + KEY idx_tags ((CAST(Tags AS CHAR(1000)))), -- JSON tag index + FULLTEXT INDEX ft_search (SearchText, TitleText, BodyText, RepliesText) +) +``` + +## Usage + +### Basic Usage + +```bash +# Process first 1000 posts +python3 stackexchange_posts.py --limit 1000 + +# Process with custom batch size +python3 stackexchange_posts.py --limit 10000 --batch-size 500 + +# Don't skip duplicates (process all posts) +python3 stackexchange_posts.py --limit 1000 --no-skip-duplicates +``` + +### Advanced Configuration + +```bash +# Custom database connections +python3 stackexchange_posts.py \ + --source-host 192.168.1.100 \ + --source-port 3307 \ + --source-user myuser \ + --source-password mypass \ + --source-db my_stackexchange \ + --target-host 192.168.1.200 \ + --target-port 3306 \ + --target-user search_user \ + --target-password search_pass \ + --target-db search_db \ + --limit 50000 \ + --batch-size 1000 +``` + +## Search Examples + +Once processed, you can search the data using: + +### 1. MySQL Full-Text Search + +```sql +-- Basic search +SELECT PostId, Title +FROM processed_posts +WHERE MATCH(SearchText) AGAINST('mysql optimization' IN BOOLEAN MODE) +ORDER BY relevance DESC; + +-- Boolean search operators +SELECT PostId, Title +FROM processed_posts +WHERE MATCH(SearchText) AGAINST('+database -oracle' IN BOOLEAN MODE); + +-- Proximity search +SELECT PostId, Title +FROM processed_posts +WHERE MATCH(SearchText) AGAINST('"database performance"~5' IN BOOLEAN MODE); ``` -./proxysql -M --sqlite3-server --idle-threads -f -c $PROXYSQL_PATH/scripts/datadir/proxysql.cnf -D $PROXYSQL_PATH/scripts/datadir + +### 2. Tag-based Search + +```sql +-- Search by specific tags +SELECT PostId, Title +FROM processed_posts +WHERE JSON_CONTAINS(Tags, '"mysql"') AND JSON_CONTAINS(Tags, '"performance"'); +``` + +### 3. Filtered Search + +```sql +-- Search within date range +SELECT PostId, Title, JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.CreationDate')) as CreationDate +FROM processed_posts +WHERE MATCH(SearchText) AGAINST('python' IN BOOLEAN MODE) +AND JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.CreationDate')) BETWEEN '2023-01-01' AND '2023-12-31'; +``` + +## Performance Tips + +1. **Batch Size**: Use larger batches (1000-5000) for better throughput +2. **Memory**: Adjust batch size based on available memory +3. **Indexes**: The script automatically creates necessary indexes +4. **Parallel Processing**: Consider running multiple instances with different offset ranges + +## Output Example + ``` +šŸš€ StackExchange Posts Processor +================================================== +Source: 127.0.0.1:3306/stackexchange +Target: 127.0.0.1:3306/stackexchange_post +Limit: 1000 posts +Batch size: 100 +Skip duplicates: True +================================================== + +āœ… Connected to source and target databases +āœ… Target table created successfully with all search columns -2. Configure ProxySQL: +šŸ”„ Processing batch 1 - posts 1 to 100 + ā­ļø Skipping 23 duplicate posts + šŸ“ Processing 77 posts... + šŸ“Š Batch inserted 77 posts + ā±ļø Progress: 100/1000 posts (10.0%) + šŸ“ˆ Total processed: 77, Inserted: 77, Skipped: 23 + ⚔ Rate: 12.3 posts/sec +šŸŽ‰ Processing complete! + šŸ“Š Total batches: 10 + šŸ“ Total processed: 800 + āœ… Total inserted: 800 + ā­ļø Total skipped: 200 + ā±ļø Total time: 45.2 seconds + šŸš€ Average rate: 17.7 posts/sec + +āœ… Processing completed successfully! ``` -cd $RESTAPI_EXAMPLES_DIR -./proxysql_config.sh + +## Troubleshooting + +### Common Issues + +1. **Table Creation Failed**: Check database permissions +2. **Memory Issues**: Reduce batch size +3. **Slow Performance**: Optimize MySQL configuration +4. **Connection Errors**: Verify database credentials + +### Maintenance + +```sql +-- Check table status +SHOW TABLE STATUS LIKE 'processed_posts'; + +-- Rebuild full-text index +ALTER TABLE processed_posts DROP INDEX ft_search, + ADD FULLTEXT INDEX ft_search (SearchText, TitleText, BodyText, RepliesText); + +-- Count processed posts +SELECT COUNT(*) FROM processed_posts; ``` -3. Install requirements +## Requirements + +- Python 3.7+ +- mysql-connector-python +- MySQL 5.7+ (for JSON and full-text support) +Install dependencies: +```bash +pip install mysql-connector-python ``` -cd $RESTAPI_EXAMPLES_DIR/requirements -./install_requirements.sh + +## Other Scripts + +The `scripts/` directory also contains other utility scripts: + +- `nlp_search_demo.py` - Demonstrate various search techniques on processed posts: + - Full-text search with MySQL + - Boolean search with operators + - Tag-based JSON queries + - Combined search approaches + - Statistics and search analytics + - Data preparation for future semantic search + +- `add_mysql_user.sh` - Add/replace MySQL users in ProxySQL +- `change_host_status.sh` - Change host status in ProxySQL +- `flush_query_cache.sh` - Flush ProxySQL query cache +- `kill_idle_backend_conns.py` - Kill idle backend connections +- `proxysql_config.sh` - Configure ProxySQL settings +- `stats_scrapper.py` - Scrape statistics from ProxySQL + +## Search Examples + +### Using the NLP Search Demo + +```bash +# Show search statistics +python3 nlp_search_demo.py --mode stats + +# Full-text search +python3 nlp_search_demo.py --mode full-text --query "mysql performance optimization" + +# Boolean search with operators +python3 nlp_search_demo.py --mode boolean --query "+database -oracle" + +# Search by tags +python3 nlp_search_demo.py --mode tags --tags mysql performance --operator AND + +# Combined search with text and tags +python3 nlp_search_demo.py --mode combined --query "python optimization" --tags python + +# Prepare data for semantic search +python3 nlp_search_demo.py --mode similarity --query "machine learning" ``` -### Query the endpoints - -1. Flush Query Cache: `curl -i -X GET http://localhost:6070/sync/flush_query_cache` -2. Change host status: - - Assuming local ProxySQL: - ``` - curl -i -X POST -d '{ "hostgroup_id": "0", "hostname": "127.0.0.1", "port": 13306, "status": "OFFLINE_HARD" }' http://localhost:6070/sync/change_host_status - ``` - - Specifying server: - ``` - curl -i -X POST -d '{ "admin_host": "127.0.0.1", "admin_port": "6032", "admin_user": "radmin", "admin_pass": "radmin", "hostgroup_id": "0", "hostname": "127.0.0.1", "port": 13306, "status": "OFFLINE_HARD" }' http://localhost:6070/sync/change_host_status - ``` -2. Add or replace MySQL user: - - Assuming local ProxySQL: - ``` - curl -i -X POST -d '{ "user": "sbtest1", "pass": "sbtest1" }' http://localhost:6070/sync/add_mysql_user - ``` - - Add user and load to runtime (Assuming local instance): - ``` - curl -i -X POST -d '{ "user": "sbtest1", "pass": "sbtest1", "to_runtime": 1 }' http://localhost:6070/sync/add_mysql_user - ``` - - Specifying server: - ``` - curl -i -X POST -d '{ "admin_host": "127.0.0.1", "admin_port": "6032", "admin_user": "radmin", "admin_pass": "radmin", "user": "sbtest1", "pass": "sbtest1" }' http://localhost:6070/sync/add_mysql_user - ``` -3. Kill idle backend connections: - - Assuming local ProxySQL: - ``` - curl -i -X POST -d '{ "timeout": 10 }' http://localhost:6070/sync/kill_idle_backend_conns - ``` - - Specifying server: - ``` - curl -i -X POST -d '{ "admin_host": "127.0.0.1", "admin_port": 6032, "admin_user": "radmin", "admin_pass": "radmin", "timeout": 10 }' http://localhost:6070/sync/kill_idle_backend_conns - ``` -4. Scrap tables from 'stats' schema: - - Assuming local ProxySQL: - ``` - curl -i -X POST -d '{ "table": "stats_mysql_users" }' http://localhost:6070/sync/scrap_stats - ``` - - Specifying server: - ``` - curl -i -X POST -d '{ "admin_host": "127.0.0.1", "admin_port": 6032, "admin_user": "radmin", "admin_pass": "radmin", "table": "stats_mysql_users" }' http://localhost:6070/sync/scrap_stats - ``` - - Provoke script failure (non-existing table): - ``` - curl -i -X POST -d '{ "admin_host": "127.0.0.1", "admin_port": 6032, "admin_user": "radmin", "admin_pass": "radmin", "table": "stats_mysql_servers" }' http://localhost:6070/sync/scrap_stats - ``` - -### Scripts doc - -- All scripts allows to perform the target operations on a local or remote ProxySQL instance. -- Notice how the unique 'GET' request is for 'QUERY CACHE' flushing, since it doesn't require any parameters. -- Script 'stats_scrapper.py' fails when a table that isn't present in 'stats' schema is queried. This is left as an example of the behavior of a failing script and ProxySQL log output. +## License + +Internal use only. diff --git a/scripts/nlp_search_demo.py b/scripts/nlp_search_demo.py new file mode 100755 index 000000000..234b87f44 --- /dev/null +++ b/scripts/nlp_search_demo.py @@ -0,0 +1,554 @@ +#!/usr/bin/env python3 +""" +NLP Search Demo for StackExchange Posts + +Demonstrates various search techniques on processed posts: +- Full-text search with MySQL +- Boolean search with operators +- Tag-based JSON queries +- Combined search approaches +- Statistics and search analytics +- Data preparation for future semantic search +""" + +import mysql.connector +from mysql.connector import Error, OperationalError +import json +import re +import html +from typing import List, Dict, Any, Set, Tuple +import argparse +import time +import sys +import os + + +class NLPSearchDemo: + def __init__(self, config: Dict[str, Any]): + self.config = config + self.stop_words = { + 'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by', + 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did', + 'will', 'would', 'could', 'should', 'may', 'might', 'must', 'can', 'this', 'that', 'these', 'those', + 'i', 'you', 'he', 'she', 'it', 'we', 'they', 'me', 'him', 'her', 'us', 'them', 'my', 'your', 'his', 'its', 'our', 'their' + } + + def connect(self): + """Create database connection.""" + try: + conn = mysql.connector.connect(**self.config) + print("āœ… Connected to database") + return conn + except Error as e: + print(f"āŒ Connection error: {e}") + return None + + def get_table_stats(self, conn): + """Get statistics about the processed_posts table.""" + cursor = conn.cursor(dictionary=True) + + try: + # Basic table stats + cursor.execute("SELECT COUNT(*) as total_posts FROM processed_posts") + total_posts = cursor.fetchone()['total_posts'] + + cursor.execute("SELECT COUNT(*) as posts_with_tags FROM processed_posts WHERE Tags IS NOT NULL AND Tags != '[]'") + posts_with_tags = cursor.fetchone()['posts_with_tags'] + + cursor.execute("SELECT MIN(JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.CreationDate'))) as earliest, " + "MAX(JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.CreationDate'))) as latest " + "FROM processed_posts") + date_range = cursor.fetchone() + + # Get unique tags + cursor.execute(""" + SELECT DISTINCT Tags + FROM processed_posts + WHERE Tags IS NOT NULL AND Tags != '[]' + LIMIT 1000 + """) + tags_data = cursor.fetchall() + + # Extract all unique tags + all_tags = set() + for row in tags_data: + if row['Tags']: + try: + tags_list = json.loads(row['Tags']) + all_tags.update(tags_list) + except: + pass + + print(f"\nšŸ“Š Table Statistics:") + print(f" Total posts: {total_posts:,}") + if total_posts > 0: + print(f" Posts with tags: {posts_with_tags:,} ({posts_with_tags/total_posts*100:.1f}%)") + else: + print(f" Posts with tags: {posts_with_tags:,}") + print(f" Date range: {date_range['earliest'][:10]} to {date_range['latest'][:10]}") + print(f" Unique tags: {len(all_tags):,}") + + if all_tags: + print(f" Top tags: {', '.join(sorted(list(all_tags))[:20])}") + + except Error as e: + print(f"āŒ Error getting stats: {e}") + finally: + cursor.close() + + def full_text_search(self, conn, query: str, limit: int = 10) -> List[Dict[str, Any]]: + """Perform full-text search with MySQL.""" + cursor = conn.cursor(dictionary=True) + + start_time = time.time() + try: + sql = """ + SELECT PostId, TitleText, MATCH(SearchText) AGAINST(%s IN NATURAL LANGUAGE MODE) as relevance + FROM processed_posts + WHERE MATCH(SearchText) AGAINST(%s IN NATURAL LANGUAGE MODE) + ORDER BY relevance DESC, CreatedAt DESC LIMIT %s + """ + cursor.execute(sql, (query, query, limit)) + results = cursor.fetchall() + search_method = "full-text" + except Error: + sql = """ + SELECT PostId, TitleText, CreatedAt + FROM processed_posts + WHERE SearchText LIKE %s OR TitleText LIKE %s OR BodyText LIKE %s + ORDER BY CreatedAt DESC LIMIT %s + """ + search_term = f"%{query}%" + cursor.execute(sql, (search_term, search_term, search_term, limit)) + results = cursor.fetchall() + search_method = "LIKE" + + elapsed = time.time() - start_time + + print(f"šŸ” {search_method.title()} search for '{query}' ({elapsed:.3f}s):") + for i, row in enumerate(results, 1): + print(f" {i}. [{row['PostId']}] {row['TitleText'][:80]}...") + + print(f"šŸ“Š Found {len(results)} results in {elapsed:.3f} seconds") + return results + + def boolean_search(self, conn, query: str, limit: int = 10) -> List[Dict[str, Any]]: + """Perform boolean search with operators.""" + cursor = conn.cursor(dictionary=True) + start_time = time.time() + + try: + # Try boolean mode first + sql = """ + SELECT PostId, TitleText, + MATCH(SearchText) AGAINST(%s IN BOOLEAN MODE) as relevance + FROM processed_posts + WHERE MATCH(SearchText) AGAINST(%s IN BOOLEAN MODE) + ORDER BY relevance DESC, CreatedAt DESC LIMIT %s + """ + cursor.execute(sql, (query, query, limit)) + results = cursor.fetchall() + search_method = "boolean" + except Error: + # Fallback to LIKE search + sql = """ + SELECT PostId, TitleText, CreatedAt + FROM processed_posts + WHERE SearchText LIKE %s + ORDER BY CreatedAt DESC LIMIT %s + """ + search_term = f"%{query}%" + cursor.execute(sql, (search_term, limit)) + results = cursor.fetchall() + search_method = "LIKE" + + elapsed = time.time() - start_time + + print(f"šŸ” Boolean search for '{query}' ({elapsed:.3f}s):") + for i, row in enumerate(results, 1): + print(f" {i}. [{row['PostId']}] {row['TitleText'][:80]}...") + + print(f"šŸ“Š Found {len(results)} results in {elapsed:.3f} seconds") + return results + + def tag_search(self, conn, tags: List[str], operator: str = "AND", limit: int = 10) -> List[Dict[str, Any]]: + """Search by tags using JSON functions.""" + cursor = conn.cursor(dictionary=True) + + try: + # Build JSON_CONTAINS conditions + conditions = [] + params = [] + + for tag in tags: + conditions.append(f"JSON_CONTAINS(Tags, %s)") + params.append(f'"{tag}"') + + if operator.upper() == "AND": + where_clause = " AND ".join(conditions) + else: # OR + where_clause = " OR ".join(conditions) + + sql = f""" + SELECT + PostId, + TitleText, + JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.Tags')) as TagsJson, + CreatedAt + FROM processed_posts + WHERE {where_clause} + ORDER BY CreatedAt DESC + LIMIT %s + """ + + start_time = time.time() + cursor.execute(sql, params + [limit]) + results = cursor.fetchall() + search_method = "JSON_CONTAINS" + + elapsed = time.time() - start_time + + tag_str = " AND ".join(tags) if operator == "AND" else " OR ".join(tags) + print(f"šŸ·ļø Tag search for {tag_str} ({elapsed:.3f}s):") + for i, row in enumerate(results, 1): + found_tags = json.loads(row['TagsJson']) if row['TagsJson'] else [] + print(f" {i}. [{row['PostId']}] {row['TitleText'][:80]}...") + print(f" All tags: {', '.join(found_tags[:5])}{'...' if len(found_tags) > 5 else ''}") + print() + + print(f"šŸ“Š Found {len(results)} results in {elapsed:.3f} seconds") + return results + + except Error as e: + print(f"āŒ Tag search error: {e}") + return [] + finally: + cursor.close() + + def combined_search(self, conn, search_term: str = None, tags: List[str] = None, + date_from: str = None, date_to: str = None, limit: int = 10) -> List[Dict[str, Any]]: + """Combined search with full-text, tags, and date filtering.""" + cursor = conn.cursor(dictionary=True) + + try: + conditions = [] + params = [] + + # Full-text search condition + if search_term: + conditions.append("MATCH(SearchText) AGAINST(%s IN NATURAL LANGUAGE MODE)") + params.append(search_term) + + # Tag conditions + if tags: + for tag in tags: + conditions.append("JSON_CONTAINS(Tags, %s)") + params.append(f'"{tag}"') + + # Date conditions + if date_from: + conditions.append("JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.CreationDate')) >= %s") + params.append(date_from) + + if date_to: + conditions.append("JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.CreationDate')) <= %s") + params.append(date_to) + + # Build WHERE clause + where_clause = " AND ".join(conditions) if conditions else "1=1" + + # Build SELECT clause dynamically - only include relevance if search_term is provided + if search_term: + select_clause = """ + SELECT + PostId, + TitleText, + JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.CreationDate')) as CreationDate, + JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.Tags')) as TagsJson, + MATCH(SearchText) AGAINST(%s IN NATURAL LANGUAGE MODE) as relevance, + CreatedAt + """ + order_clause = "ORDER BY relevance DESC, CreatedAt DESC" + # Add search_term again for the SELECT clause's MATCH + fulltext_params = [search_term] + params + [limit] + else: + select_clause = """ + SELECT + PostId, + TitleText, + JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.CreationDate')) as CreationDate, + JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.Tags')) as TagsJson, + CreatedAt + """ + order_clause = "ORDER BY CreatedAt DESC" + fulltext_params = params + [limit] + + sql = f""" + {select_clause} + FROM processed_posts + WHERE {where_clause} + {order_clause} + LIMIT %s + """ + + start_time = time.time() + + try: + # First try full-text search + cursor.execute(sql, fulltext_params) + results = cursor.fetchall() + search_method = "combined" + except Error: + # Fallback to LIKE search + conditions = [] + like_params = [] + + # Add search term condition + if search_term: + conditions.append("(SearchText LIKE %s OR TitleText LIKE %s OR BodyText LIKE %s)") + like_params.extend([f"%{search_term}%"] * 3) + + # Add tag conditions + if tags: + for tag in tags: + conditions.append("JSON_CONTAINS(Tags, %s)") + like_params.append(f'"{tag}"') + + # Add date conditions + if date_from: + conditions.append("JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.CreationDate')) >= %s") + like_params.append(date_from) + + if date_to: + conditions.append("JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.CreationDate')) <= %s") + like_params.append(date_to) + + where_clause = " AND ".join(conditions) if conditions else "1=1" + + like_sql = f""" + SELECT + PostId, + TitleText, + JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.CreationDate')) as CreationDate, + JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.Tags')) as TagsJson, + CreatedAt + FROM processed_posts + WHERE {where_clause} + ORDER BY CreatedAt DESC + LIMIT %s + """ + + like_params.append(limit) + cursor.execute(like_sql, like_params) + results = cursor.fetchall() + search_method = "LIKE" + + elapsed = time.time() - start_time + + print(f"šŸ” {search_method.title()} search ({elapsed:.3f}s):") + print(f" Search term: {search_term or 'None'}") + print(f" Tags: {tags or 'None'}") + print(f" Date range: {date_from or 'beginning'} to {date_to or 'end'}") + print() + + for i, row in enumerate(results, 1): + found_tags = json.loads(row['TagsJson']) if row['TagsJson'] else [] + relevance = row.get('relevance', 0.0) if search_method == "combined" else "N/A" + + print(f" {i}. [{row['PostId']}] {row['TitleText'][:80]}...") + print(f" Tags: {', '.join(found_tags[:3])}{'...' if len(found_tags) > 3 else ''}") + print(f" Created: {row['CreationDate']}") + if search_method == "combined": + print(f" Relevance: {relevance:.3f}") + print() + + print(f"šŸ“Š Found {len(results)} results in {elapsed:.3f} seconds") + return results + + except Error as e: + print(f"āŒ Combined search error: {e}") + return [] + finally: + cursor.close() + + def similarity_search_preparation(self, conn, query: str, limit: int = 20) -> List[Dict[str, Any]]: + """Prepare data for future semantic search by extracting relevant terms.""" + cursor = conn.cursor(dictionary=True) + + try: + # Search and return results with text content for future embedding generation + sql = """ + SELECT + PostId, + TitleText, + BodyText, + RepliesText, + JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.Tags')) as TagsJson + FROM processed_posts + WHERE SearchText LIKE %s + ORDER BY CreatedAt DESC + LIMIT %s + """ + + search_term = f"%{query}%" + cursor.execute(sql, (search_term, limit)) + results = cursor.fetchall() + + print(f"šŸ” Preparation for semantic search on '{query}':") + print(f" Found {len(results)} relevant posts") + + # Extract text for future embeddings + all_text = [] + for row in results: + title = row['TitleText'] or '' + body = row['BodyText'] or '' + replies = row['RepliesText'] or '' + combined = f"{title} {body} {replies}".strip() + if combined: + all_text.append(combined) + + print(f" Total text length: {sum(len(text) for text in all_text):,} characters") + if all_text: + print(f" Average text length: {sum(len(text) for text in all_text) / len(all_text):,.0f} characters") + + return results + + except Error as e: + print(f"āŒ Similarity search preparation error: {e}") + return [] + finally: + cursor.close() + + def run_demo(self, mode: str = "stats", **kwargs): + """Run the search demo with specified mode.""" + conn = self.connect() + if not conn: + return + + try: + if mode == "stats": + self.get_table_stats(conn) + elif mode == "full-text": + query = kwargs.get('query', '') + limit = kwargs.get('limit', 10) + self.full_text_search(conn, query, limit) + elif mode == "boolean": + query = kwargs.get('query', '') + limit = kwargs.get('limit', 10) + self.boolean_search(conn, query, limit) + elif mode == "tags": + tags = kwargs.get('tags', []) + operator = kwargs.get('operator', 'AND') + limit = kwargs.get('limit', 10) + self.tag_search(conn, tags, operator, limit) + elif mode == "combined": + search_term = kwargs.get('query', None) + tags = kwargs.get('tags', None) + date_from = kwargs.get('date_from', None) + date_to = kwargs.get('date_to', None) + limit = kwargs.get('limit', 10) + self.combined_search(conn, search_term, tags, date_from, date_to, limit) + elif mode == "similarity": + query = kwargs.get('query', '') + limit = kwargs.get('limit', 20) + self.similarity_search_preparation(conn, query, limit) + else: + print(f"āŒ Unknown mode: {mode}") + print("Available modes: stats, full-text, boolean, tags, combined, similarity") + finally: + if conn and conn.is_connected(): + conn.close() + + +def main(): + # Default configuration (can be overridden by environment variables) + config = { + "host": os.getenv("DB_HOST", "127.0.0.1"), + "port": int(os.getenv("DB_PORT", "3306")), + "user": os.getenv("DB_USER", "stackexchange"), + "password": os.getenv("DB_PASSWORD", "my-password"), + "database": os.getenv("DB_NAME", "stackexchange_post"), + "use_pure": True, + "ssl_disabled": True + } + + parser = argparse.ArgumentParser(description="NLP Search Demo for StackExchange Posts") + + parser.add_argument("--host", default=config['host'], help="Database host") + parser.add_argument("--port", type=int, default=config['port'], help="Database port") + parser.add_argument("--user", default=config['user'], help="Database user") + parser.add_argument("--password", default=config['password'], help="Database password") + parser.add_argument("--database", default=config['database'], help="Database name") + + parser.add_argument("--mode", default="stats", + choices=["stats", "full-text", "boolean", "tags", "combined", "similarity"], + help="Search mode to demonstrate") + + parser.add_argument("--limit", type=int, default=10, help="Number of results to return") + parser.add_argument("--operator", default="AND", choices=["AND", "OR"], help="Tag operator") + + parser.add_argument("--query", help="Search query for text-based searches") + parser.add_argument("--tags", nargs='+', help="Tags to search for") + parser.add_argument("--date-from", help="Start date (YYYY-MM-DD)") + parser.add_argument("--date-to", help="End date (YYYY-MM-DD)") + + parser.add_argument("--stats", action="store_true", help="Show table statistics") + parser.add_argument("--verbose", action="store_true", help="Show detailed output") + + args = parser.parse_args() + + # Override configuration with command line arguments + config.update({ + "host": args.host, + "port": args.port, + "user": args.user, + "password": args.password, + "database": args.database + }) + + # Handle legacy --stats flag + if args.stats: + args.mode = "stats" + + print("šŸ” NLP Search Demo for StackExchange Posts") + print("=" * 50) + print(f"Database: {config['host']}:{config['port']}/{config['database']}") + print(f"Mode: {args.mode}") + print("=" * 50) + + # Create demo instance and run + demo = NLPSearchDemo(config) + + # Prepare kwargs based on mode + kwargs = { + 'limit': args.limit, + 'operator': args.operator, + 'query': args.query, + 'tags': args.tags, + 'date_from': args.date_from, + 'date_to': args.date_to + } + + # Remove None values + kwargs = {k: v for k, v in kwargs.items() if v is not None} + + # If mode is text-based and no query provided, use the mode as query + if args.mode in ["full-text", "boolean", "similarity"] and not args.query: + # For compatibility with command-line usage like: python3 script.py --full-text "mysql optimization" + if len(sys.argv) > 2 and sys.argv[1] == "--mode" and len(sys.argv) > 4: + # Find the actual query after the mode + mode_index = sys.argv.index("--mode") + if mode_index + 2 < len(sys.argv): + query_index = mode_index + 2 + query_parts = [] + while query_index < len(sys.argv) and not sys.argv[query_index].startswith("--"): + query_parts.append(sys.argv[query_index]) + query_index += 1 + if query_parts: + kwargs['query'] = ' '.join(query_parts) + + demo.run_demo(args.mode, **kwargs) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/scripts/stackexchange_posts.py b/scripts/stackexchange_posts.py new file mode 100755 index 000000000..70584e0a2 --- /dev/null +++ b/scripts/stackexchange_posts.py @@ -0,0 +1,491 @@ +#!/usr/bin/env python3 +""" +Comprehensive StackExchange Posts Processing Script + +Creates target table, extracts data from source, and processes for search. +- Retrieves parent posts (PostTypeId=1) and their replies (PostTypeId=2) +- Combines posts and tags into structured JSON +- Creates search-ready columns with full-text indexes +- Supports batch processing and duplicate checking +- Handles large datasets efficiently +""" + +import mysql.connector +from mysql.connector import Error, OperationalError +import json +import re +import html +from typing import List, Dict, Any, Set, Tuple +import argparse +import time +import sys +import os + +class StackExchangeProcessor: + def __init__(self, source_config: Dict[str, Any], target_config: Dict[str, Any]): + self.source_config = source_config + self.target_config = target_config + self.stop_words = { + 'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by', + 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did', + 'will', 'would', 'could', 'should', 'may', 'might', 'must', 'can', 'this', 'that', 'these', 'those', + 'i', 'you', 'he', 'she', 'it', 'we', 'they', 'me', 'him', 'her', 'us', 'them', 'my', 'your', 'his', 'its', 'our', 'their' + } + + def clean_text(self, text: str) -> str: + """Clean and normalize text for search indexing.""" + if not text: + return "" + + # Decode HTML entities + text = html.unescape(text) + + # Remove HTML tags + text = re.sub(r'<[^>]+>', ' ', text) + + # Normalize whitespace + text = re.sub(r'\s+', ' ', text).strip() + + # Convert to lowercase + return text.lower() + + def parse_tags(self, tags_string: str) -> Set[str]: + """Parse HTML-like tags string and extract unique tag values.""" + if not tags_string: + return set() + + # Extract content between < and > tags + tags = re.findall(r'<([^<>]+)>', tags_string) + return set(tag.strip().lower() for tag in tags if tag.strip()) + + def create_target_table(self, conn) -> bool: + """Create the target table with all necessary columns.""" + cursor = conn.cursor() + + # SQL to create table with all search columns + create_table_sql = """ + CREATE TABLE IF NOT EXISTS `processed_posts` ( + `PostId` BIGINT NOT NULL, + `JsonData` JSON NOT NULL, + `Embeddings` BLOB NULL, + `SearchText` LONGTEXT NULL COMMENT 'Combined text content for full-text search', + `TitleText` VARCHAR(1000) NULL COMMENT 'Processed title text', + `BodyText` LONGTEXT NULL COMMENT 'Processed body text', + `RepliesText` LONGTEXT NULL COMMENT 'Combined replies text', + `Tags` JSON NULL COMMENT 'Extracted tags', + `CreatedAt` TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + `UpdatedAt` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`PostId`), + KEY `idx_created_at` (`CreatedAt`), + -- KEY `idx_tags` ((CAST(Tags AS CHAR(1000) CHARSET utf8mb4))), -- Commented out for compatibility + FULLTEXT INDEX `ft_search` (`SearchText`, `TitleText`, `BodyText`, `RepliesText`) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci + COMMENT='Structured StackExchange posts data with search capabilities' + """ + + try: + cursor.execute(create_table_sql) + conn.commit() + print("āœ… Target table created successfully with all search columns") + return True + except Error as e: + print(f"āŒ Error creating target table: {e}") + return False + finally: + cursor.close() + + def get_parent_posts(self, conn, limit: int = 10, offset: int = 0) -> List[Dict[str, Any]]: + """Retrieve parent posts (PostTypeId=1) with pagination.""" + cursor = conn.cursor(dictionary=True) + query = """ + SELECT Id, Title, CreationDate, Body, Tags + FROM Posts + WHERE PostTypeId = 1 + ORDER BY Id + LIMIT %s OFFSET %s + """ + + try: + cursor.execute(query, (limit, offset)) + posts = cursor.fetchall() + return posts + except Error as e: + print(f"Error retrieving parent posts: {e}") + return [] + finally: + cursor.close() + + def get_child_posts(self, conn, parent_ids: List[int], chunk_size: int = 1000) -> Dict[int, List[str]]: + """Retrieve child posts for given parent IDs with chunking.""" + if not parent_ids: + return {} + + parent_to_children = {} + + # Process parent IDs in chunks + for i in range(0, len(parent_ids), chunk_size): + chunk = parent_ids[i:i + chunk_size] + + cursor = conn.cursor(dictionary=True) + query = """ + SELECT ParentId, Body, Id as ReplyId + FROM Posts + WHERE PostTypeId = 2 AND ParentId IN (%s) + ORDER BY ParentId, ReplyId + """ % (','.join(['%s'] * len(chunk))) + + try: + cursor.execute(query, chunk) + child_posts = cursor.fetchall() + + for child in child_posts: + parent_id = child['ParentId'] + if parent_id not in parent_to_children: + parent_to_children[parent_id] = [] + parent_to_children[parent_id].append(child['Body']) + + except Error as e: + print(f"Error retrieving child posts (chunk {i//chunk_size + 1}): {e}") + finally: + cursor.close() + + return parent_to_children + + def get_existing_posts(self, conn, post_ids: List[int]) -> Set[int]: + """Check which post IDs already exist in the target table.""" + if not post_ids: + return set() + + cursor = conn.cursor() + placeholders = ','.join(['%s'] * len(post_ids)) + query = f"SELECT PostId FROM processed_posts WHERE PostId IN ({placeholders})" + + try: + cursor.execute(query, post_ids) + existing_ids = {row[0] for row in cursor.fetchall()} + return existing_ids + except Error as e: + print(f"Error checking existing posts: {e}") + return set() + finally: + cursor.close() + + def process_post_for_search(self, post_data: Dict[str, Any], replies: List[str], tags: Set[str]) -> Dict[str, str]: + """Process a post and extract search-ready text.""" + # Extract title + title = self.clean_text(post_data.get('Title', '')) + + # Extract body + body = self.clean_text(post_data.get('Body', '')) + + # Process replies + replies_text = ' '.join([self.clean_text(reply) for reply in replies if reply]) + + # Combine all text for search + combined_text = f"{title} {body} {replies_text}" + + # Add tags to search text + if tags: + combined_text += ' ' + ' '.join(tags) + + return { + 'title_text': title, + 'body_text': body, + 'replies_text': replies_text, + 'search_text': combined_text, + 'tags': list(tags) if tags else [] + } + + def insert_posts_batch(self, conn, posts_data: List[tuple]) -> int: + """Insert multiple posts in a batch.""" + if not posts_data: + return 0 + + cursor = conn.cursor() + query = """ + INSERT INTO processed_posts (PostId, JsonData, SearchText, TitleText, BodyText, RepliesText, Tags) + VALUES (%s, %s, %s, %s, %s, %s, %s) + ON DUPLICATE KEY UPDATE + JsonData = VALUES(JsonData), + SearchText = VALUES(SearchText), + TitleText = VALUES(TitleText), + BodyText = VALUES(BodyText), + RepliesText = VALUES(RepliesText), + Tags = VALUES(Tags), + UpdatedAt = CURRENT_TIMESTAMP + """ + + try: + cursor.executemany(query, posts_data) + conn.commit() + inserted = cursor.rowcount + print(f" šŸ“Š Batch inserted {inserted} posts") + return inserted + except Error as e: + print(f" āŒ Error in batch insert: {e}") + conn.rollback() + return 0 + finally: + cursor.close() + + def process_posts(self, limit: int = 10, batch_size: int = 100, skip_duplicates: bool = True) -> Dict[str, int]: + """Main processing method.""" + source_conn = None + target_conn = None + + stats = { + 'total_batches': 0, + 'total_processed': 0, + 'total_inserted': 0, + 'total_skipped': 0, + 'start_time': time.time() + } + + try: + # Connect to databases + source_conn = mysql.connector.connect(**self.source_config) + target_conn = mysql.connector.connect(**self.target_config) + + print("āœ… Connected to source and target databases") + + # Create target table + if not self.create_target_table(target_conn): + print("āŒ Failed to create target table") + return stats + + offset = 0 + # Handle limit=0 (process all posts) + total_limit = float('inf') if limit == 0 else limit + + while offset < total_limit: + # Calculate current batch size + if limit == 0: + current_batch_size = batch_size + else: + current_batch_size = min(batch_size, limit - offset) + + # Get parent posts + parent_posts = self.get_parent_posts(source_conn, current_batch_size, offset) + if not parent_posts: + print("šŸ“„ No more parent posts to process") + # Special handling for limit=0 - break when no more posts + if limit == 0: + break + # For finite limits, break when we've processed all posts + if offset >= limit: + break + + stats['total_batches'] += 1 + print(f"\nšŸ”„ Processing batch {stats['total_batches']} - posts {offset + 1} to {offset + len(parent_posts)}") + + # Get parent IDs + parent_ids = [post['Id'] for post in parent_posts] + + # Check for duplicates + if skip_duplicates: + existing_posts = self.get_existing_posts(target_conn, parent_ids) + parent_posts = [p for p in parent_posts if p['Id'] not in existing_posts] + + duplicates_count = len(parent_ids) - len(parent_posts) + if duplicates_count > 0: + print(f" ā­ļø Skipping {duplicates_count} duplicate posts") + + if not parent_posts: + stats['total_skipped'] += len(parent_ids) + offset += current_batch_size + print(f" āœ… All posts skipped (already exist)") + continue + + # Get child posts and tags + child_posts_map = self.get_child_posts(source_conn, parent_ids) + + # Extract tags from parent posts + all_tags = {} + for post in parent_posts: + tags_from_source = self.parse_tags(post.get('Tags', '')) + all_tags[post['Id']] = tags_from_source + + # Process posts + batch_data = [] + processed_count = 0 + + for parent in parent_posts: + post_id = parent['Id'] + replies = child_posts_map.get(post_id, []) + tags = all_tags.get(post_id, set()) + + # Get creation date + creation_date = parent.get('CreationDate') + if creation_date: + creation_date_str = creation_date.isoformat() + else: + creation_date_str = None + + # Create JSON structure + post_json = { + "Id": post_id, + "Title": parent['Title'], + "CreationDate": creation_date_str, + "Body": parent['Body'], + "Replies": replies, + "Tags": sorted(list(tags)) + } + + # Process for search + search_data = self.process_post_for_search(parent, replies, tags) + + # Add to batch + batch_data.append(( + post_id, + json.dumps(post_json, ensure_ascii=False), + search_data['search_text'], + search_data['title_text'], + search_data['body_text'], + search_data['replies_text'], + json.dumps(search_data['tags'], ensure_ascii=False) + )) + + processed_count += 1 + + # Insert batch + if batch_data: + print(f" šŸ“ Processing {len(batch_data)} posts...") + inserted = self.insert_posts_batch(target_conn, batch_data) + stats['total_inserted'] += inserted + stats['total_processed'] += processed_count + + # Advance offset + offset += current_batch_size + + # Show progress + elapsed = time.time() - stats['start_time'] + if limit == 0: + print(f" ā±ļø Progress: {offset} posts processed") + else: + print(f" ā±ļø Progress: {offset}/{limit} posts ({offset/limit*100:.1f}%)") + print(f" šŸ“ˆ Total processed: {stats['total_processed']}, " + f"Inserted: {stats['total_inserted']}, " + f"Skipped: {stats['total_skipped']}") + if elapsed > 0: + print(f" ⚔ Rate: {stats['total_processed']/elapsed:.1f} posts/sec") + + stats['end_time'] = time.time() + total_time = stats['end_time'] - stats['start_time'] + + print(f"\nšŸŽ‰ Processing complete!") + print(f" šŸ“Š Total batches: {stats['total_batches']}") + print(f" šŸ“ Total processed: {stats['total_processed']}") + print(f" āœ… Total inserted: {stats['total_inserted']}") + print(f" ā­ļø Total skipped: {stats['total_skipped']}") + print(f" ā±ļø Total time: {total_time:.1f} seconds") + if total_time > 0: + print(f" šŸš€ Average rate: {stats['total_processed']/total_time:.1f} posts/sec") + + return stats + + except Error as e: + print(f"āŒ Database error: {e}") + return stats + except Exception as e: + print(f"āŒ Error: {e}") + return stats + finally: + if source_conn and source_conn.is_connected(): + source_conn.close() + if target_conn and target_conn.is_connected(): + target_conn.close() + print("\nšŸ”Œ Database connections closed") + +def main(): + # Default configurations (can be overridden by environment variables) + source_config = { + "host": os.getenv("SOURCE_DB_HOST", "127.0.0.1"), + "port": int(os.getenv("SOURCE_DB_PORT", "3306")), + "user": os.getenv("SOURCE_DB_USER", "stackexchange"), + "password": os.getenv("SOURCE_DB_PASSWORD", "my-password"), + "database": os.getenv("SOURCE_DB_NAME", "stackexchange"), + "use_pure": True, + "ssl_disabled": True + } + + target_config = { + "host": os.getenv("TARGET_DB_HOST", "127.0.0.1"), + "port": int(os.getenv("TARGET_DB_PORT", "3306")), + "user": os.getenv("TARGET_DB_USER", "stackexchange"), + "password": os.getenv("TARGET_DB_PASSWORD", "my-password"), + "database": os.getenv("TARGET_DB_NAME", "stackexchange_post"), + "use_pure": True, + "ssl_disabled": True + } + + parser = argparse.ArgumentParser(description="Comprehensive StackExchange Posts Processing") + parser.add_argument("--source-host", default=source_config['host'], help="Source database host") + parser.add_argument("--source-port", type=int, default=source_config['port'], help="Source database port") + parser.add_argument("--source-user", default=source_config['user'], help="Source database user") + parser.add_argument("--source-password", default=source_config['password'], help="Source database password") + parser.add_argument("--source-db", default=source_config['database'], help="Source database name") + + parser.add_argument("--target-host", default=target_config['host'], help="Target database host") + parser.add_argument("--target-port", type=int, default=target_config['port'], help="Target database port") + parser.add_argument("--target-user", default=target_config['user'], help="Target database user") + parser.add_argument("--target-password", default=target_config['password'], help="Target database password") + parser.add_argument("--target-db", default=target_config['database'], help="Target database name") + + parser.add_argument("--limit", type=int, default=10, help="Number of parent posts to process") + parser.add_argument("--batch-size", type=int, default=100, help="Batch size for processing") + parser.add_argument("--warning-large-batches", action="store_true", help="Show warnings for batch sizes > 1000") + parser.add_argument("--skip-duplicates", action="store_true", default=True, help="Skip posts that already exist") + parser.add_argument("--no-skip-duplicates", action="store_true", help="Disable duplicate skipping") + + parser.add_argument("--verbose", action="store_true", help="Show detailed progress") + + args = parser.parse_args() + + # Override configurations with command line arguments + source_config.update({ + "host": args.source_host, + "port": args.source_port, + "user": args.source_user, + "password": args.source_password, + "database": args.source_db + }) + + target_config.update({ + "host": args.target_host, + "port": args.target_port, + "user": args.target_user, + "password": args.target_password, + "database": args.target_db + }) + + skip_duplicates = args.skip_duplicates and not args.no_skip_duplicates + + # Check for large batch size + if args.warning_large_batches and args.batch_size > 1000: + print(f"āš ļø WARNING: Large batch size ({args.batch_size}) may cause connection issues") + print(" Consider using smaller batches (1000-5000) for better stability") + + print("šŸš€ StackExchange Posts Processor") + print("=" * 50) + print(f"Source: {source_config['host']}:{source_config['port']}/{source_config['database']}") + print(f"Target: {target_config['host']}:{target_config['port']}/{target_config['database']}") + print(f"Limit: {'All posts' if args.limit == 0 else args.limit} posts") + print(f"Batch size: {args.batch_size}") + print(f"Skip duplicates: {skip_duplicates}") + print("=" * 50) + + # Create processor and run + processor = StackExchangeProcessor(source_config, target_config) + stats = processor.process_posts( + limit=args.limit, + batch_size=args.batch_size, + skip_duplicates=skip_duplicates + ) + + if stats['total_processed'] > 0: + print(f"\nāœ… Processing completed successfully!") + else: + print(f"\nāŒ No posts were processed!") + +if __name__ == "__main__": + main() \ No newline at end of file