Merge pull request #6 from ProxySQL/v3.1-vec4

v3.1-vec4: Add NLP search demo and fix data processing issues
pull/5310/head
René Cannaò 4 months ago committed by GitHub
commit a50a5487a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,79 +1,236 @@
### Scripts description
# StackExchange Posts Processor
This is a set of example scripts to show the capabilities of the RESTAPI interface and how to interface with it.
A comprehensive script to extract, process, and index StackExchange posts for search capabilities.
### Prepare ProxySQL
## Features
1. Launch ProxySQL:
- ✅ **Complete Pipeline**: Extracts parent posts and replies from source database
- 📊 **Search Ready**: Creates full-text search indexes and processed text columns
- 🚀 **Efficient**: Batch processing with memory optimization
- 🔍 **Duplicate Prevention**: Skip already processed posts
- 📈 **Progress Tracking**: Real-time statistics and performance metrics
- 🔧 **Flexible**: Configurable source/target databases
- 📝 **Rich Output**: Structured JSON with tags and metadata
## Database Schema
The script creates a comprehensive target table with these columns:
```sql
processed_posts (
PostId BIGINT PRIMARY KEY,
JsonData JSON NOT NULL, -- Complete post data
Embeddings BLOB NULL, -- For future ML embeddings
SearchText LONGTEXT NULL, -- Combined text for search
TitleText VARCHAR(1000) NULL, -- Cleaned title
BodyText LONGTEXT NULL, -- Cleaned body
RepliesText LONGTEXT NULL, -- Combined replies
Tags JSON NULL, -- Extracted tags
CreatedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UpdatedAt TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
-- Indexes
KEY idx_created_at (CreatedAt),
KEY idx_tags ((CAST(Tags AS CHAR(1000)))), -- JSON tag index
FULLTEXT INDEX ft_search (SearchText, TitleText, BodyText, RepliesText)
)
```
## Usage
### Basic Usage
```bash
# Process first 1000 posts
python3 stackexchange_posts.py --limit 1000
# Process with custom batch size
python3 stackexchange_posts.py --limit 10000 --batch-size 500
# Don't skip duplicates (process all posts)
python3 stackexchange_posts.py --limit 1000 --no-skip-duplicates
```
### Advanced Configuration
```bash
# Custom database connections
python3 stackexchange_posts.py \
--source-host 192.168.1.100 \
--source-port 3307 \
--source-user myuser \
--source-password mypass \
--source-db my_stackexchange \
--target-host 192.168.1.200 \
--target-port 3306 \
--target-user search_user \
--target-password search_pass \
--target-db search_db \
--limit 50000 \
--batch-size 1000
```
## Search Examples
Once processed, you can search the data using:
### 1. MySQL Full-Text Search
```sql
-- Basic search
SELECT PostId, Title
FROM processed_posts
WHERE MATCH(SearchText) AGAINST('mysql optimization' IN BOOLEAN MODE)
ORDER BY relevance DESC;
-- Boolean search operators
SELECT PostId, Title
FROM processed_posts
WHERE MATCH(SearchText) AGAINST('+database -oracle' IN BOOLEAN MODE);
-- Proximity search
SELECT PostId, Title
FROM processed_posts
WHERE MATCH(SearchText) AGAINST('"database performance"~5' IN BOOLEAN MODE);
```
./proxysql -M --sqlite3-server --idle-threads -f -c $PROXYSQL_PATH/scripts/datadir/proxysql.cnf -D $PROXYSQL_PATH/scripts/datadir
### 2. Tag-based Search
```sql
-- Search by specific tags
SELECT PostId, Title
FROM processed_posts
WHERE JSON_CONTAINS(Tags, '"mysql"') AND JSON_CONTAINS(Tags, '"performance"');
```
### 3. Filtered Search
```sql
-- Search within date range
SELECT PostId, Title, JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.CreationDate')) as CreationDate
FROM processed_posts
WHERE MATCH(SearchText) AGAINST('python' IN BOOLEAN MODE)
AND JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.CreationDate')) BETWEEN '2023-01-01' AND '2023-12-31';
```
## Performance Tips
1. **Batch Size**: Use larger batches (1000-5000) for better throughput
2. **Memory**: Adjust batch size based on available memory
3. **Indexes**: The script automatically creates necessary indexes
4. **Parallel Processing**: Consider running multiple instances with different offset ranges
## Output Example
```
🚀 StackExchange Posts Processor
==================================================
Source: 127.0.0.1:3306/stackexchange
Target: 127.0.0.1:3306/stackexchange_post
Limit: 1000 posts
Batch size: 100
Skip duplicates: True
==================================================
✅ Connected to source and target databases
✅ Target table created successfully with all search columns
2. Configure ProxySQL:
🔄 Processing batch 1 - posts 1 to 100
⏭️ Skipping 23 duplicate posts
📝 Processing 77 posts...
📊 Batch inserted 77 posts
⏱️ Progress: 100/1000 posts (10.0%)
📈 Total processed: 77, Inserted: 77, Skipped: 23
⚡ Rate: 12.3 posts/sec
🎉 Processing complete!
📊 Total batches: 10
📝 Total processed: 800
✅ Total inserted: 800
⏭️ Total skipped: 200
⏱️ Total time: 45.2 seconds
🚀 Average rate: 17.7 posts/sec
✅ Processing completed successfully!
```
cd $RESTAPI_EXAMPLES_DIR
./proxysql_config.sh
## Troubleshooting
### Common Issues
1. **Table Creation Failed**: Check database permissions
2. **Memory Issues**: Reduce batch size
3. **Slow Performance**: Optimize MySQL configuration
4. **Connection Errors**: Verify database credentials
### Maintenance
```sql
-- Check table status
SHOW TABLE STATUS LIKE 'processed_posts';
-- Rebuild full-text index
ALTER TABLE processed_posts DROP INDEX ft_search,
ADD FULLTEXT INDEX ft_search (SearchText, TitleText, BodyText, RepliesText);
-- Count processed posts
SELECT COUNT(*) FROM processed_posts;
```
3. Install requirements
## Requirements
- Python 3.7+
- mysql-connector-python
- MySQL 5.7+ (for JSON and full-text support)
Install dependencies:
```bash
pip install mysql-connector-python
```
cd $RESTAPI_EXAMPLES_DIR/requirements
./install_requirements.sh
## Other Scripts
The `scripts/` directory also contains other utility scripts:
- `nlp_search_demo.py` - Demonstrate various search techniques on processed posts:
- Full-text search with MySQL
- Boolean search with operators
- Tag-based JSON queries
- Combined search approaches
- Statistics and search analytics
- Data preparation for future semantic search
- `add_mysql_user.sh` - Add/replace MySQL users in ProxySQL
- `change_host_status.sh` - Change host status in ProxySQL
- `flush_query_cache.sh` - Flush ProxySQL query cache
- `kill_idle_backend_conns.py` - Kill idle backend connections
- `proxysql_config.sh` - Configure ProxySQL settings
- `stats_scrapper.py` - Scrape statistics from ProxySQL
## Search Examples
### Using the NLP Search Demo
```bash
# Show search statistics
python3 nlp_search_demo.py --mode stats
# Full-text search
python3 nlp_search_demo.py --mode full-text --query "mysql performance optimization"
# Boolean search with operators
python3 nlp_search_demo.py --mode boolean --query "+database -oracle"
# Search by tags
python3 nlp_search_demo.py --mode tags --tags mysql performance --operator AND
# Combined search with text and tags
python3 nlp_search_demo.py --mode combined --query "python optimization" --tags python
# Prepare data for semantic search
python3 nlp_search_demo.py --mode similarity --query "machine learning"
```
### Query the endpoints
1. Flush Query Cache: `curl -i -X GET http://localhost:6070/sync/flush_query_cache`
2. Change host status:
- Assuming local ProxySQL:
```
curl -i -X POST -d '{ "hostgroup_id": "0", "hostname": "127.0.0.1", "port": 13306, "status": "OFFLINE_HARD" }' http://localhost:6070/sync/change_host_status
```
- Specifying server:
```
curl -i -X POST -d '{ "admin_host": "127.0.0.1", "admin_port": "6032", "admin_user": "radmin", "admin_pass": "radmin", "hostgroup_id": "0", "hostname": "127.0.0.1", "port": 13306, "status": "OFFLINE_HARD" }' http://localhost:6070/sync/change_host_status
```
2. Add or replace MySQL user:
- Assuming local ProxySQL:
```
curl -i -X POST -d '{ "user": "sbtest1", "pass": "sbtest1" }' http://localhost:6070/sync/add_mysql_user
```
- Add user and load to runtime (Assuming local instance):
```
curl -i -X POST -d '{ "user": "sbtest1", "pass": "sbtest1", "to_runtime": 1 }' http://localhost:6070/sync/add_mysql_user
```
- Specifying server:
```
curl -i -X POST -d '{ "admin_host": "127.0.0.1", "admin_port": "6032", "admin_user": "radmin", "admin_pass": "radmin", "user": "sbtest1", "pass": "sbtest1" }' http://localhost:6070/sync/add_mysql_user
```
3. Kill idle backend connections:
- Assuming local ProxySQL:
```
curl -i -X POST -d '{ "timeout": 10 }' http://localhost:6070/sync/kill_idle_backend_conns
```
- Specifying server:
```
curl -i -X POST -d '{ "admin_host": "127.0.0.1", "admin_port": 6032, "admin_user": "radmin", "admin_pass": "radmin", "timeout": 10 }' http://localhost:6070/sync/kill_idle_backend_conns
```
4. Scrap tables from 'stats' schema:
- Assuming local ProxySQL:
```
curl -i -X POST -d '{ "table": "stats_mysql_users" }' http://localhost:6070/sync/scrap_stats
```
- Specifying server:
```
curl -i -X POST -d '{ "admin_host": "127.0.0.1", "admin_port": 6032, "admin_user": "radmin", "admin_pass": "radmin", "table": "stats_mysql_users" }' http://localhost:6070/sync/scrap_stats
```
- Provoke script failure (non-existing table):
```
curl -i -X POST -d '{ "admin_host": "127.0.0.1", "admin_port": 6032, "admin_user": "radmin", "admin_pass": "radmin", "table": "stats_mysql_servers" }' http://localhost:6070/sync/scrap_stats
```
### Scripts doc
- All scripts allows to perform the target operations on a local or remote ProxySQL instance.
- Notice how the unique 'GET' request is for 'QUERY CACHE' flushing, since it doesn't require any parameters.
- Script 'stats_scrapper.py' fails when a table that isn't present in 'stats' schema is queried. This is left as an example of the behavior of a failing script and ProxySQL log output.
## License
Internal use only.

@ -0,0 +1,554 @@
#!/usr/bin/env python3
"""
NLP Search Demo for StackExchange Posts
Demonstrates various search techniques on processed posts:
- Full-text search with MySQL
- Boolean search with operators
- Tag-based JSON queries
- Combined search approaches
- Statistics and search analytics
- Data preparation for future semantic search
"""
import mysql.connector
from mysql.connector import Error, OperationalError
import json
import re
import html
from typing import List, Dict, Any, Set, Tuple
import argparse
import time
import sys
import os
class NLPSearchDemo:
def __init__(self, config: Dict[str, Any]):
self.config = config
self.stop_words = {
'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by',
'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did',
'will', 'would', 'could', 'should', 'may', 'might', 'must', 'can', 'this', 'that', 'these', 'those',
'i', 'you', 'he', 'she', 'it', 'we', 'they', 'me', 'him', 'her', 'us', 'them', 'my', 'your', 'his', 'its', 'our', 'their'
}
def connect(self):
"""Create database connection."""
try:
conn = mysql.connector.connect(**self.config)
print("✅ Connected to database")
return conn
except Error as e:
print(f"❌ Connection error: {e}")
return None
def get_table_stats(self, conn):
"""Get statistics about the processed_posts table."""
cursor = conn.cursor(dictionary=True)
try:
# Basic table stats
cursor.execute("SELECT COUNT(*) as total_posts FROM processed_posts")
total_posts = cursor.fetchone()['total_posts']
cursor.execute("SELECT COUNT(*) as posts_with_tags FROM processed_posts WHERE Tags IS NOT NULL AND Tags != '[]'")
posts_with_tags = cursor.fetchone()['posts_with_tags']
cursor.execute("SELECT MIN(JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.CreationDate'))) as earliest, "
"MAX(JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.CreationDate'))) as latest "
"FROM processed_posts")
date_range = cursor.fetchone()
# Get unique tags
cursor.execute("""
SELECT DISTINCT Tags
FROM processed_posts
WHERE Tags IS NOT NULL AND Tags != '[]'
LIMIT 1000
""")
tags_data = cursor.fetchall()
# Extract all unique tags
all_tags = set()
for row in tags_data:
if row['Tags']:
try:
tags_list = json.loads(row['Tags'])
all_tags.update(tags_list)
except:
pass
print(f"\n📊 Table Statistics:")
print(f" Total posts: {total_posts:,}")
if total_posts > 0:
print(f" Posts with tags: {posts_with_tags:,} ({posts_with_tags/total_posts*100:.1f}%)")
else:
print(f" Posts with tags: {posts_with_tags:,}")
print(f" Date range: {date_range['earliest'][:10]} to {date_range['latest'][:10]}")
print(f" Unique tags: {len(all_tags):,}")
if all_tags:
print(f" Top tags: {', '.join(sorted(list(all_tags))[:20])}")
except Error as e:
print(f"❌ Error getting stats: {e}")
finally:
cursor.close()
def full_text_search(self, conn, query: str, limit: int = 10) -> List[Dict[str, Any]]:
"""Perform full-text search with MySQL."""
cursor = conn.cursor(dictionary=True)
start_time = time.time()
try:
sql = """
SELECT PostId, TitleText, MATCH(SearchText) AGAINST(%s IN NATURAL LANGUAGE MODE) as relevance
FROM processed_posts
WHERE MATCH(SearchText) AGAINST(%s IN NATURAL LANGUAGE MODE)
ORDER BY relevance DESC, CreatedAt DESC LIMIT %s
"""
cursor.execute(sql, (query, query, limit))
results = cursor.fetchall()
search_method = "full-text"
except Error:
sql = """
SELECT PostId, TitleText, CreatedAt
FROM processed_posts
WHERE SearchText LIKE %s OR TitleText LIKE %s OR BodyText LIKE %s
ORDER BY CreatedAt DESC LIMIT %s
"""
search_term = f"%{query}%"
cursor.execute(sql, (search_term, search_term, search_term, limit))
results = cursor.fetchall()
search_method = "LIKE"
elapsed = time.time() - start_time
print(f"🔍 {search_method.title()} search for '{query}' ({elapsed:.3f}s):")
for i, row in enumerate(results, 1):
print(f" {i}. [{row['PostId']}] {row['TitleText'][:80]}...")
print(f"📊 Found {len(results)} results in {elapsed:.3f} seconds")
return results
def boolean_search(self, conn, query: str, limit: int = 10) -> List[Dict[str, Any]]:
"""Perform boolean search with operators."""
cursor = conn.cursor(dictionary=True)
start_time = time.time()
try:
# Try boolean mode first
sql = """
SELECT PostId, TitleText,
MATCH(SearchText) AGAINST(%s IN BOOLEAN MODE) as relevance
FROM processed_posts
WHERE MATCH(SearchText) AGAINST(%s IN BOOLEAN MODE)
ORDER BY relevance DESC, CreatedAt DESC LIMIT %s
"""
cursor.execute(sql, (query, query, limit))
results = cursor.fetchall()
search_method = "boolean"
except Error:
# Fallback to LIKE search
sql = """
SELECT PostId, TitleText, CreatedAt
FROM processed_posts
WHERE SearchText LIKE %s
ORDER BY CreatedAt DESC LIMIT %s
"""
search_term = f"%{query}%"
cursor.execute(sql, (search_term, limit))
results = cursor.fetchall()
search_method = "LIKE"
elapsed = time.time() - start_time
print(f"🔍 Boolean search for '{query}' ({elapsed:.3f}s):")
for i, row in enumerate(results, 1):
print(f" {i}. [{row['PostId']}] {row['TitleText'][:80]}...")
print(f"📊 Found {len(results)} results in {elapsed:.3f} seconds")
return results
def tag_search(self, conn, tags: List[str], operator: str = "AND", limit: int = 10) -> List[Dict[str, Any]]:
"""Search by tags using JSON functions."""
cursor = conn.cursor(dictionary=True)
try:
# Build JSON_CONTAINS conditions
conditions = []
params = []
for tag in tags:
conditions.append(f"JSON_CONTAINS(Tags, %s)")
params.append(f'"{tag}"')
if operator.upper() == "AND":
where_clause = " AND ".join(conditions)
else: # OR
where_clause = " OR ".join(conditions)
sql = f"""
SELECT
PostId,
TitleText,
JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.Tags')) as TagsJson,
CreatedAt
FROM processed_posts
WHERE {where_clause}
ORDER BY CreatedAt DESC
LIMIT %s
"""
start_time = time.time()
cursor.execute(sql, params + [limit])
results = cursor.fetchall()
search_method = "JSON_CONTAINS"
elapsed = time.time() - start_time
tag_str = " AND ".join(tags) if operator == "AND" else " OR ".join(tags)
print(f"🏷️ Tag search for {tag_str} ({elapsed:.3f}s):")
for i, row in enumerate(results, 1):
found_tags = json.loads(row['TagsJson']) if row['TagsJson'] else []
print(f" {i}. [{row['PostId']}] {row['TitleText'][:80]}...")
print(f" All tags: {', '.join(found_tags[:5])}{'...' if len(found_tags) > 5 else ''}")
print()
print(f"📊 Found {len(results)} results in {elapsed:.3f} seconds")
return results
except Error as e:
print(f"❌ Tag search error: {e}")
return []
finally:
cursor.close()
def combined_search(self, conn, search_term: str = None, tags: List[str] = None,
date_from: str = None, date_to: str = None, limit: int = 10) -> List[Dict[str, Any]]:
"""Combined search with full-text, tags, and date filtering."""
cursor = conn.cursor(dictionary=True)
try:
conditions = []
params = []
# Full-text search condition
if search_term:
conditions.append("MATCH(SearchText) AGAINST(%s IN NATURAL LANGUAGE MODE)")
params.append(search_term)
# Tag conditions
if tags:
for tag in tags:
conditions.append("JSON_CONTAINS(Tags, %s)")
params.append(f'"{tag}"')
# Date conditions
if date_from:
conditions.append("JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.CreationDate')) >= %s")
params.append(date_from)
if date_to:
conditions.append("JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.CreationDate')) <= %s")
params.append(date_to)
# Build WHERE clause
where_clause = " AND ".join(conditions) if conditions else "1=1"
# Build SELECT clause dynamically - only include relevance if search_term is provided
if search_term:
select_clause = """
SELECT
PostId,
TitleText,
JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.CreationDate')) as CreationDate,
JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.Tags')) as TagsJson,
MATCH(SearchText) AGAINST(%s IN NATURAL LANGUAGE MODE) as relevance,
CreatedAt
"""
order_clause = "ORDER BY relevance DESC, CreatedAt DESC"
# Add search_term again for the SELECT clause's MATCH
fulltext_params = [search_term] + params + [limit]
else:
select_clause = """
SELECT
PostId,
TitleText,
JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.CreationDate')) as CreationDate,
JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.Tags')) as TagsJson,
CreatedAt
"""
order_clause = "ORDER BY CreatedAt DESC"
fulltext_params = params + [limit]
sql = f"""
{select_clause}
FROM processed_posts
WHERE {where_clause}
{order_clause}
LIMIT %s
"""
start_time = time.time()
try:
# First try full-text search
cursor.execute(sql, fulltext_params)
results = cursor.fetchall()
search_method = "combined"
except Error:
# Fallback to LIKE search
conditions = []
like_params = []
# Add search term condition
if search_term:
conditions.append("(SearchText LIKE %s OR TitleText LIKE %s OR BodyText LIKE %s)")
like_params.extend([f"%{search_term}%"] * 3)
# Add tag conditions
if tags:
for tag in tags:
conditions.append("JSON_CONTAINS(Tags, %s)")
like_params.append(f'"{tag}"')
# Add date conditions
if date_from:
conditions.append("JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.CreationDate')) >= %s")
like_params.append(date_from)
if date_to:
conditions.append("JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.CreationDate')) <= %s")
like_params.append(date_to)
where_clause = " AND ".join(conditions) if conditions else "1=1"
like_sql = f"""
SELECT
PostId,
TitleText,
JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.CreationDate')) as CreationDate,
JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.Tags')) as TagsJson,
CreatedAt
FROM processed_posts
WHERE {where_clause}
ORDER BY CreatedAt DESC
LIMIT %s
"""
like_params.append(limit)
cursor.execute(like_sql, like_params)
results = cursor.fetchall()
search_method = "LIKE"
elapsed = time.time() - start_time
print(f"🔍 {search_method.title()} search ({elapsed:.3f}s):")
print(f" Search term: {search_term or 'None'}")
print(f" Tags: {tags or 'None'}")
print(f" Date range: {date_from or 'beginning'} to {date_to or 'end'}")
print()
for i, row in enumerate(results, 1):
found_tags = json.loads(row['TagsJson']) if row['TagsJson'] else []
relevance = row.get('relevance', 0.0) if search_method == "combined" else "N/A"
print(f" {i}. [{row['PostId']}] {row['TitleText'][:80]}...")
print(f" Tags: {', '.join(found_tags[:3])}{'...' if len(found_tags) > 3 else ''}")
print(f" Created: {row['CreationDate']}")
if search_method == "combined":
print(f" Relevance: {relevance:.3f}")
print()
print(f"📊 Found {len(results)} results in {elapsed:.3f} seconds")
return results
except Error as e:
print(f"❌ Combined search error: {e}")
return []
finally:
cursor.close()
def similarity_search_preparation(self, conn, query: str, limit: int = 20) -> List[Dict[str, Any]]:
"""Prepare data for future semantic search by extracting relevant terms."""
cursor = conn.cursor(dictionary=True)
try:
# Search and return results with text content for future embedding generation
sql = """
SELECT
PostId,
TitleText,
BodyText,
RepliesText,
JSON_UNQUOTE(JSON_EXTRACT(JsonData, '$.Tags')) as TagsJson
FROM processed_posts
WHERE SearchText LIKE %s
ORDER BY CreatedAt DESC
LIMIT %s
"""
search_term = f"%{query}%"
cursor.execute(sql, (search_term, limit))
results = cursor.fetchall()
print(f"🔍 Preparation for semantic search on '{query}':")
print(f" Found {len(results)} relevant posts")
# Extract text for future embeddings
all_text = []
for row in results:
title = row['TitleText'] or ''
body = row['BodyText'] or ''
replies = row['RepliesText'] or ''
combined = f"{title} {body} {replies}".strip()
if combined:
all_text.append(combined)
print(f" Total text length: {sum(len(text) for text in all_text):,} characters")
if all_text:
print(f" Average text length: {sum(len(text) for text in all_text) / len(all_text):,.0f} characters")
return results
except Error as e:
print(f"❌ Similarity search preparation error: {e}")
return []
finally:
cursor.close()
def run_demo(self, mode: str = "stats", **kwargs):
"""Run the search demo with specified mode."""
conn = self.connect()
if not conn:
return
try:
if mode == "stats":
self.get_table_stats(conn)
elif mode == "full-text":
query = kwargs.get('query', '')
limit = kwargs.get('limit', 10)
self.full_text_search(conn, query, limit)
elif mode == "boolean":
query = kwargs.get('query', '')
limit = kwargs.get('limit', 10)
self.boolean_search(conn, query, limit)
elif mode == "tags":
tags = kwargs.get('tags', [])
operator = kwargs.get('operator', 'AND')
limit = kwargs.get('limit', 10)
self.tag_search(conn, tags, operator, limit)
elif mode == "combined":
search_term = kwargs.get('query', None)
tags = kwargs.get('tags', None)
date_from = kwargs.get('date_from', None)
date_to = kwargs.get('date_to', None)
limit = kwargs.get('limit', 10)
self.combined_search(conn, search_term, tags, date_from, date_to, limit)
elif mode == "similarity":
query = kwargs.get('query', '')
limit = kwargs.get('limit', 20)
self.similarity_search_preparation(conn, query, limit)
else:
print(f"❌ Unknown mode: {mode}")
print("Available modes: stats, full-text, boolean, tags, combined, similarity")
finally:
if conn and conn.is_connected():
conn.close()
def main():
# Default configuration (can be overridden by environment variables)
config = {
"host": os.getenv("DB_HOST", "127.0.0.1"),
"port": int(os.getenv("DB_PORT", "3306")),
"user": os.getenv("DB_USER", "stackexchange"),
"password": os.getenv("DB_PASSWORD", "my-password"),
"database": os.getenv("DB_NAME", "stackexchange_post"),
"use_pure": True,
"ssl_disabled": True
}
parser = argparse.ArgumentParser(description="NLP Search Demo for StackExchange Posts")
parser.add_argument("--host", default=config['host'], help="Database host")
parser.add_argument("--port", type=int, default=config['port'], help="Database port")
parser.add_argument("--user", default=config['user'], help="Database user")
parser.add_argument("--password", default=config['password'], help="Database password")
parser.add_argument("--database", default=config['database'], help="Database name")
parser.add_argument("--mode", default="stats",
choices=["stats", "full-text", "boolean", "tags", "combined", "similarity"],
help="Search mode to demonstrate")
parser.add_argument("--limit", type=int, default=10, help="Number of results to return")
parser.add_argument("--operator", default="AND", choices=["AND", "OR"], help="Tag operator")
parser.add_argument("--query", help="Search query for text-based searches")
parser.add_argument("--tags", nargs='+', help="Tags to search for")
parser.add_argument("--date-from", help="Start date (YYYY-MM-DD)")
parser.add_argument("--date-to", help="End date (YYYY-MM-DD)")
parser.add_argument("--stats", action="store_true", help="Show table statistics")
parser.add_argument("--verbose", action="store_true", help="Show detailed output")
args = parser.parse_args()
# Override configuration with command line arguments
config.update({
"host": args.host,
"port": args.port,
"user": args.user,
"password": args.password,
"database": args.database
})
# Handle legacy --stats flag
if args.stats:
args.mode = "stats"
print("🔍 NLP Search Demo for StackExchange Posts")
print("=" * 50)
print(f"Database: {config['host']}:{config['port']}/{config['database']}")
print(f"Mode: {args.mode}")
print("=" * 50)
# Create demo instance and run
demo = NLPSearchDemo(config)
# Prepare kwargs based on mode
kwargs = {
'limit': args.limit,
'operator': args.operator,
'query': args.query,
'tags': args.tags,
'date_from': args.date_from,
'date_to': args.date_to
}
# Remove None values
kwargs = {k: v for k, v in kwargs.items() if v is not None}
# If mode is text-based and no query provided, use the mode as query
if args.mode in ["full-text", "boolean", "similarity"] and not args.query:
# For compatibility with command-line usage like: python3 script.py --full-text "mysql optimization"
if len(sys.argv) > 2 and sys.argv[1] == "--mode" and len(sys.argv) > 4:
# Find the actual query after the mode
mode_index = sys.argv.index("--mode")
if mode_index + 2 < len(sys.argv):
query_index = mode_index + 2
query_parts = []
while query_index < len(sys.argv) and not sys.argv[query_index].startswith("--"):
query_parts.append(sys.argv[query_index])
query_index += 1
if query_parts:
kwargs['query'] = ' '.join(query_parts)
demo.run_demo(args.mode, **kwargs)
if __name__ == "__main__":
main()

@ -0,0 +1,491 @@
#!/usr/bin/env python3
"""
Comprehensive StackExchange Posts Processing Script
Creates target table, extracts data from source, and processes for search.
- Retrieves parent posts (PostTypeId=1) and their replies (PostTypeId=2)
- Combines posts and tags into structured JSON
- Creates search-ready columns with full-text indexes
- Supports batch processing and duplicate checking
- Handles large datasets efficiently
"""
import mysql.connector
from mysql.connector import Error, OperationalError
import json
import re
import html
from typing import List, Dict, Any, Set, Tuple
import argparse
import time
import sys
import os
class StackExchangeProcessor:
def __init__(self, source_config: Dict[str, Any], target_config: Dict[str, Any]):
self.source_config = source_config
self.target_config = target_config
self.stop_words = {
'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by',
'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did',
'will', 'would', 'could', 'should', 'may', 'might', 'must', 'can', 'this', 'that', 'these', 'those',
'i', 'you', 'he', 'she', 'it', 'we', 'they', 'me', 'him', 'her', 'us', 'them', 'my', 'your', 'his', 'its', 'our', 'their'
}
def clean_text(self, text: str) -> str:
"""Clean and normalize text for search indexing."""
if not text:
return ""
# Decode HTML entities
text = html.unescape(text)
# Remove HTML tags
text = re.sub(r'<[^>]+>', ' ', text)
# Normalize whitespace
text = re.sub(r'\s+', ' ', text).strip()
# Convert to lowercase
return text.lower()
def parse_tags(self, tags_string: str) -> Set[str]:
"""Parse HTML-like tags string and extract unique tag values."""
if not tags_string:
return set()
# Extract content between < and > tags
tags = re.findall(r'<([^<>]+)>', tags_string)
return set(tag.strip().lower() for tag in tags if tag.strip())
def create_target_table(self, conn) -> bool:
"""Create the target table with all necessary columns."""
cursor = conn.cursor()
# SQL to create table with all search columns
create_table_sql = """
CREATE TABLE IF NOT EXISTS `processed_posts` (
`PostId` BIGINT NOT NULL,
`JsonData` JSON NOT NULL,
`Embeddings` BLOB NULL,
`SearchText` LONGTEXT NULL COMMENT 'Combined text content for full-text search',
`TitleText` VARCHAR(1000) NULL COMMENT 'Processed title text',
`BodyText` LONGTEXT NULL COMMENT 'Processed body text',
`RepliesText` LONGTEXT NULL COMMENT 'Combined replies text',
`Tags` JSON NULL COMMENT 'Extracted tags',
`CreatedAt` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
`UpdatedAt` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`PostId`),
KEY `idx_created_at` (`CreatedAt`),
-- KEY `idx_tags` ((CAST(Tags AS CHAR(1000) CHARSET utf8mb4))), -- Commented out for compatibility
FULLTEXT INDEX `ft_search` (`SearchText`, `TitleText`, `BodyText`, `RepliesText`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
COMMENT='Structured StackExchange posts data with search capabilities'
"""
try:
cursor.execute(create_table_sql)
conn.commit()
print("✅ Target table created successfully with all search columns")
return True
except Error as e:
print(f"❌ Error creating target table: {e}")
return False
finally:
cursor.close()
def get_parent_posts(self, conn, limit: int = 10, offset: int = 0) -> List[Dict[str, Any]]:
"""Retrieve parent posts (PostTypeId=1) with pagination."""
cursor = conn.cursor(dictionary=True)
query = """
SELECT Id, Title, CreationDate, Body, Tags
FROM Posts
WHERE PostTypeId = 1
ORDER BY Id
LIMIT %s OFFSET %s
"""
try:
cursor.execute(query, (limit, offset))
posts = cursor.fetchall()
return posts
except Error as e:
print(f"Error retrieving parent posts: {e}")
return []
finally:
cursor.close()
def get_child_posts(self, conn, parent_ids: List[int], chunk_size: int = 1000) -> Dict[int, List[str]]:
"""Retrieve child posts for given parent IDs with chunking."""
if not parent_ids:
return {}
parent_to_children = {}
# Process parent IDs in chunks
for i in range(0, len(parent_ids), chunk_size):
chunk = parent_ids[i:i + chunk_size]
cursor = conn.cursor(dictionary=True)
query = """
SELECT ParentId, Body, Id as ReplyId
FROM Posts
WHERE PostTypeId = 2 AND ParentId IN (%s)
ORDER BY ParentId, ReplyId
""" % (','.join(['%s'] * len(chunk)))
try:
cursor.execute(query, chunk)
child_posts = cursor.fetchall()
for child in child_posts:
parent_id = child['ParentId']
if parent_id not in parent_to_children:
parent_to_children[parent_id] = []
parent_to_children[parent_id].append(child['Body'])
except Error as e:
print(f"Error retrieving child posts (chunk {i//chunk_size + 1}): {e}")
finally:
cursor.close()
return parent_to_children
def get_existing_posts(self, conn, post_ids: List[int]) -> Set[int]:
"""Check which post IDs already exist in the target table."""
if not post_ids:
return set()
cursor = conn.cursor()
placeholders = ','.join(['%s'] * len(post_ids))
query = f"SELECT PostId FROM processed_posts WHERE PostId IN ({placeholders})"
try:
cursor.execute(query, post_ids)
existing_ids = {row[0] for row in cursor.fetchall()}
return existing_ids
except Error as e:
print(f"Error checking existing posts: {e}")
return set()
finally:
cursor.close()
def process_post_for_search(self, post_data: Dict[str, Any], replies: List[str], tags: Set[str]) -> Dict[str, str]:
"""Process a post and extract search-ready text."""
# Extract title
title = self.clean_text(post_data.get('Title', ''))
# Extract body
body = self.clean_text(post_data.get('Body', ''))
# Process replies
replies_text = ' '.join([self.clean_text(reply) for reply in replies if reply])
# Combine all text for search
combined_text = f"{title} {body} {replies_text}"
# Add tags to search text
if tags:
combined_text += ' ' + ' '.join(tags)
return {
'title_text': title,
'body_text': body,
'replies_text': replies_text,
'search_text': combined_text,
'tags': list(tags) if tags else []
}
def insert_posts_batch(self, conn, posts_data: List[tuple]) -> int:
"""Insert multiple posts in a batch."""
if not posts_data:
return 0
cursor = conn.cursor()
query = """
INSERT INTO processed_posts (PostId, JsonData, SearchText, TitleText, BodyText, RepliesText, Tags)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
JsonData = VALUES(JsonData),
SearchText = VALUES(SearchText),
TitleText = VALUES(TitleText),
BodyText = VALUES(BodyText),
RepliesText = VALUES(RepliesText),
Tags = VALUES(Tags),
UpdatedAt = CURRENT_TIMESTAMP
"""
try:
cursor.executemany(query, posts_data)
conn.commit()
inserted = cursor.rowcount
print(f" 📊 Batch inserted {inserted} posts")
return inserted
except Error as e:
print(f" ❌ Error in batch insert: {e}")
conn.rollback()
return 0
finally:
cursor.close()
def process_posts(self, limit: int = 10, batch_size: int = 100, skip_duplicates: bool = True) -> Dict[str, int]:
"""Main processing method."""
source_conn = None
target_conn = None
stats = {
'total_batches': 0,
'total_processed': 0,
'total_inserted': 0,
'total_skipped': 0,
'start_time': time.time()
}
try:
# Connect to databases
source_conn = mysql.connector.connect(**self.source_config)
target_conn = mysql.connector.connect(**self.target_config)
print("✅ Connected to source and target databases")
# Create target table
if not self.create_target_table(target_conn):
print("❌ Failed to create target table")
return stats
offset = 0
# Handle limit=0 (process all posts)
total_limit = float('inf') if limit == 0 else limit
while offset < total_limit:
# Calculate current batch size
if limit == 0:
current_batch_size = batch_size
else:
current_batch_size = min(batch_size, limit - offset)
# Get parent posts
parent_posts = self.get_parent_posts(source_conn, current_batch_size, offset)
if not parent_posts:
print("📄 No more parent posts to process")
# Special handling for limit=0 - break when no more posts
if limit == 0:
break
# For finite limits, break when we've processed all posts
if offset >= limit:
break
stats['total_batches'] += 1
print(f"\n🔄 Processing batch {stats['total_batches']} - posts {offset + 1} to {offset + len(parent_posts)}")
# Get parent IDs
parent_ids = [post['Id'] for post in parent_posts]
# Check for duplicates
if skip_duplicates:
existing_posts = self.get_existing_posts(target_conn, parent_ids)
parent_posts = [p for p in parent_posts if p['Id'] not in existing_posts]
duplicates_count = len(parent_ids) - len(parent_posts)
if duplicates_count > 0:
print(f" ⏭️ Skipping {duplicates_count} duplicate posts")
if not parent_posts:
stats['total_skipped'] += len(parent_ids)
offset += current_batch_size
print(f" ✅ All posts skipped (already exist)")
continue
# Get child posts and tags
child_posts_map = self.get_child_posts(source_conn, parent_ids)
# Extract tags from parent posts
all_tags = {}
for post in parent_posts:
tags_from_source = self.parse_tags(post.get('Tags', ''))
all_tags[post['Id']] = tags_from_source
# Process posts
batch_data = []
processed_count = 0
for parent in parent_posts:
post_id = parent['Id']
replies = child_posts_map.get(post_id, [])
tags = all_tags.get(post_id, set())
# Get creation date
creation_date = parent.get('CreationDate')
if creation_date:
creation_date_str = creation_date.isoformat()
else:
creation_date_str = None
# Create JSON structure
post_json = {
"Id": post_id,
"Title": parent['Title'],
"CreationDate": creation_date_str,
"Body": parent['Body'],
"Replies": replies,
"Tags": sorted(list(tags))
}
# Process for search
search_data = self.process_post_for_search(parent, replies, tags)
# Add to batch
batch_data.append((
post_id,
json.dumps(post_json, ensure_ascii=False),
search_data['search_text'],
search_data['title_text'],
search_data['body_text'],
search_data['replies_text'],
json.dumps(search_data['tags'], ensure_ascii=False)
))
processed_count += 1
# Insert batch
if batch_data:
print(f" 📝 Processing {len(batch_data)} posts...")
inserted = self.insert_posts_batch(target_conn, batch_data)
stats['total_inserted'] += inserted
stats['total_processed'] += processed_count
# Advance offset
offset += current_batch_size
# Show progress
elapsed = time.time() - stats['start_time']
if limit == 0:
print(f" ⏱️ Progress: {offset} posts processed")
else:
print(f" ⏱️ Progress: {offset}/{limit} posts ({offset/limit*100:.1f}%)")
print(f" 📈 Total processed: {stats['total_processed']}, "
f"Inserted: {stats['total_inserted']}, "
f"Skipped: {stats['total_skipped']}")
if elapsed > 0:
print(f" ⚡ Rate: {stats['total_processed']/elapsed:.1f} posts/sec")
stats['end_time'] = time.time()
total_time = stats['end_time'] - stats['start_time']
print(f"\n🎉 Processing complete!")
print(f" 📊 Total batches: {stats['total_batches']}")
print(f" 📝 Total processed: {stats['total_processed']}")
print(f" ✅ Total inserted: {stats['total_inserted']}")
print(f" ⏭️ Total skipped: {stats['total_skipped']}")
print(f" ⏱️ Total time: {total_time:.1f} seconds")
if total_time > 0:
print(f" 🚀 Average rate: {stats['total_processed']/total_time:.1f} posts/sec")
return stats
except Error as e:
print(f"❌ Database error: {e}")
return stats
except Exception as e:
print(f"❌ Error: {e}")
return stats
finally:
if source_conn and source_conn.is_connected():
source_conn.close()
if target_conn and target_conn.is_connected():
target_conn.close()
print("\n🔌 Database connections closed")
def main():
# Default configurations (can be overridden by environment variables)
source_config = {
"host": os.getenv("SOURCE_DB_HOST", "127.0.0.1"),
"port": int(os.getenv("SOURCE_DB_PORT", "3306")),
"user": os.getenv("SOURCE_DB_USER", "stackexchange"),
"password": os.getenv("SOURCE_DB_PASSWORD", "my-password"),
"database": os.getenv("SOURCE_DB_NAME", "stackexchange"),
"use_pure": True,
"ssl_disabled": True
}
target_config = {
"host": os.getenv("TARGET_DB_HOST", "127.0.0.1"),
"port": int(os.getenv("TARGET_DB_PORT", "3306")),
"user": os.getenv("TARGET_DB_USER", "stackexchange"),
"password": os.getenv("TARGET_DB_PASSWORD", "my-password"),
"database": os.getenv("TARGET_DB_NAME", "stackexchange_post"),
"use_pure": True,
"ssl_disabled": True
}
parser = argparse.ArgumentParser(description="Comprehensive StackExchange Posts Processing")
parser.add_argument("--source-host", default=source_config['host'], help="Source database host")
parser.add_argument("--source-port", type=int, default=source_config['port'], help="Source database port")
parser.add_argument("--source-user", default=source_config['user'], help="Source database user")
parser.add_argument("--source-password", default=source_config['password'], help="Source database password")
parser.add_argument("--source-db", default=source_config['database'], help="Source database name")
parser.add_argument("--target-host", default=target_config['host'], help="Target database host")
parser.add_argument("--target-port", type=int, default=target_config['port'], help="Target database port")
parser.add_argument("--target-user", default=target_config['user'], help="Target database user")
parser.add_argument("--target-password", default=target_config['password'], help="Target database password")
parser.add_argument("--target-db", default=target_config['database'], help="Target database name")
parser.add_argument("--limit", type=int, default=10, help="Number of parent posts to process")
parser.add_argument("--batch-size", type=int, default=100, help="Batch size for processing")
parser.add_argument("--warning-large-batches", action="store_true", help="Show warnings for batch sizes > 1000")
parser.add_argument("--skip-duplicates", action="store_true", default=True, help="Skip posts that already exist")
parser.add_argument("--no-skip-duplicates", action="store_true", help="Disable duplicate skipping")
parser.add_argument("--verbose", action="store_true", help="Show detailed progress")
args = parser.parse_args()
# Override configurations with command line arguments
source_config.update({
"host": args.source_host,
"port": args.source_port,
"user": args.source_user,
"password": args.source_password,
"database": args.source_db
})
target_config.update({
"host": args.target_host,
"port": args.target_port,
"user": args.target_user,
"password": args.target_password,
"database": args.target_db
})
skip_duplicates = args.skip_duplicates and not args.no_skip_duplicates
# Check for large batch size
if args.warning_large_batches and args.batch_size > 1000:
print(f"⚠️ WARNING: Large batch size ({args.batch_size}) may cause connection issues")
print(" Consider using smaller batches (1000-5000) for better stability")
print("🚀 StackExchange Posts Processor")
print("=" * 50)
print(f"Source: {source_config['host']}:{source_config['port']}/{source_config['database']}")
print(f"Target: {target_config['host']}:{target_config['port']}/{target_config['database']}")
print(f"Limit: {'All posts' if args.limit == 0 else args.limit} posts")
print(f"Batch size: {args.batch_size}")
print(f"Skip duplicates: {skip_duplicates}")
print("=" * 50)
# Create processor and run
processor = StackExchangeProcessor(source_config, target_config)
stats = processor.process_posts(
limit=args.limit,
batch_size=args.batch_size,
skip_duplicates=skip_duplicates
)
if stats['total_processed'] > 0:
print(f"\n✅ Processing completed successfully!")
else:
print(f"\n❌ No posts were processed!")
if __name__ == "__main__":
main()
Loading…
Cancel
Save