Recipes and Best Practices

Collection of common patterns and best practices for using sqlo effectively.

CRUD Operations

User Management

from sqlo import Q
from typing import Optional, List, Dict, Any

class UserRepository:
    """Repository pattern for user operations."""
    
    def __init__(self, db_connection):
        self.db = db_connection
    
    def create(self, name: str, email: str) -> int:
        """Create a new user."""
        query = Q.insert_into("users").values([{
            "name": name,
            "email": email,
            "created_at": "NOW()"
        }])
        cursor = self.db.cursor()
        cursor.execute(*query.build())
        self.db.commit()
        return cursor.lastrowid
    
    def get_by_id(self, user_id: int) -> Optional[Dict[str, Any]]:
        """Get user by ID."""
        query = Q.select("*").from_("users").where("id", user_id)
        cursor = self.db.cursor(dictionary=True)
        cursor.execute(*query.build())
        return cursor.fetchone()
    
    def get_by_email(self, email: str) -> Optional[Dict[str, Any]]:
        """Get user by email."""
        query = Q.select("*").from_("users").where("email", email)
        cursor = self.db.cursor(dictionary=True)
        cursor.execute(*query.build())
        return cursor.fetchone()
    
    def list_active(self, limit: int = 100) -> List[Dict[str, Any]]:
        """List active users."""
        query = (
            Q.select("*")
            .from_("users")
            .where("active", True)
            .order_by("-created_at")
            .limit(limit)
        )
        cursor = self.db.cursor(dictionary=True)
        cursor.execute(*query.build())
        return cursor.fetchall()
    
    def update(self, user_id: int, **fields) -> bool:
        """Update user fields."""
        fields["updated_at"] = "NOW()"
        query = Q.update("users").set(fields).where("id", user_id)
        cursor = self.db.cursor()
        cursor.execute(*query.build())
        self.db.commit()
        return cursor.rowcount > 0
    
    def delete(self, user_id: int) -> bool:
        """Delete a user."""
        query = Q.delete_from("users").where("id", user_id)
        cursor = self.db.cursor()
        cursor.execute(*query.build())
        self.db.commit()
        return cursor.rowcount > 0
    
    def soft_delete(self, user_id: int) -> bool:
        """Soft delete (mark as deleted)."""
        return self.update(user_id, deleted_at="NOW()", active=False)

Search and Filtering

Dynamic Search Builder

from typing import Optional

def build_search_query(
    search: Optional[str] = None,
    status: Optional[str] = None,
    min_age: Optional[int] = None,
    max_age: Optional[int] = None,
    tags: Optional[List[str]] = None,
    page: int = 1,
    per_page: int = 20
):
    """Build a search query with optional filters."""
    query = Q.select("*").from_("users")
    
    # Text search
    if search:
        query = query.where_like("name", f"%{search}%")
    
    # Status filter
    if status:
        query = query.where("status", status)
    
    # Age range
    if min_age is not None:
        query = query.where("age >=", min_age)
    if max_age is not None:
        query = query.where("age <=", max_age)
    
    # Tags (using JSON or separate table)
    if tags:
        query = query.where_in("tag_id", tags)
    
    # Pagination
    query = query.order_by("-created_at").paginate(page=page, per_page=per_page)
    
    return query

# Usage
query = build_search_query(
    search="John",
    status="active",
    min_age=18,
    tags=[1, 2, 3],
    page=1
)

Aggregations and Reports

Sales Report

from sqlo import func, Window

def monthly_sales_report(year: int):
    """Generate monthly sales report."""
    query = (
        Q.select(
            Raw(f"DATE_FORMAT(created_at, '%Y-%m')").as_("month"),
            func.count("*").as_("order_count"),
            func.sum("total").as_("total_sales"),
            func.avg("total").as_("avg_order_value"),
            func.min("total").as_("min_order"),
            func.max("total").as_("max_order")
        )
        .from_("orders")
        .where("YEAR(created_at)", year)
        .group_by(Raw("DATE_FORMAT(created_at, '%Y-%m')"))
        .order_by("month")
    )
    return query

Top N per Category

def top_products_by_category(n: int = 5):
    """Get top N products in each category."""
    ranked = (
        Q.select(
            "category_id",
            "product_id",
            "name",
            "sales",
            func.row_number().over(
                Window.partition_by("category_id").and_order_by("-sales")
            ).as_("rank")
        )
        .from_("products")
        .as_("ranked")
    )
    
    query = (
        Q.select("*")
        .with_(ranked)
        .from_("ranked")
        .where("rank <=", n)
    )
    return query

Data Transformation

Pivot Table

def sales_pivot_by_month_and_product():
    """Pivot sales data by month and product."""
    query = Q.select(
        Raw("DATE_FORMAT(created_at, '%Y-%m')").as_("month"),
        Raw("SUM(CASE WHEN product_id = 1 THEN total ELSE 0 END)").as_("product_1"),
        Raw("SUM(CASE WHEN product_id = 2 THEN total ELSE 0 END)").as_("product_2"),
        Raw("SUM(CASE WHEN product_id = 3 THEN total ELSE 0 END)").as_("product_3"),
        func.sum("total").as_("total")
    ).from_("sales").group_by(Raw("DATE_FORMAT(created_at, '%Y-%m')"))
    
    return query

Data Normalization

def normalize_user_data():
    """Extract first_name and last_name from full_name."""
    query = (
        Q.update("users")
        .set({
            "first_name": Raw("SUBSTRING_INDEX(full_name, ' ', 1)"),
            "last_name": Raw("SUBSTRING_INDEX(full_name, ' ', -1)")
        })
        .where_null("first_name")
    )
    return query

Complex Joins

Multi-Level Join

def get_orders_with_details():
    """Get orders with user, product, and category info."""
    query = (
        Q.select(
            "o.id as order_id",
            "u.name as customer_name",
            "u.email",
            "p.name as product_name",
            "c.name as category_name",
            "o.quantity",
            "o.total"
        )
        .from_("orders o")
        .join("users u", "u.id = o.user_id")
        .join("products p", "p.id = o.product_id")
        .join("categories c", "c.id = p.category_id")
        .where("o.created_at >", "2024-01-01")
        .order_by("-o.created_at")
    )
    return query

Self Join

def get_employee_hierarchy():
    """Get employees with their managers."""
    query = (
        Q.select(
            "e.id",
            "e.name as employee_name",
            "e.title",
            "m.name as manager_name",
            "m.title as manager_title"
        )
        .from_("employees e")
        .left_join("employees m", "m.id = e.manager_id")
        .order_by("m.name", "e.name")
    )
    return query

Performance Optimization

Query Result Caching

import hashlib
import json
from functools import lru_cache

class QueryCache:
    """Simple query result cache."""
    
    def __init__(self):
        self._cache = {}
    
    def _get_cache_key(self, sql: str, params: tuple) -> str:
        """Generate cache key from SQL and params."""
        cache_str = f"{sql}:{json.dumps(params)}"
        return hashlib.md5(cache_str.encode()).hexdigest()
    
    def get(self, query):
        """Get cached result if exists."""
        sql, params = query.build()
        key = self._get_cache_key(sql, params)
        return self._cache.get(key)
    
    def set(self, query, result, ttl: int = 300):
        """Cache query result."""
        sql, params = query.build()
        key = self._get_cache_key(sql, params)
        self._cache[key] = result
        # In production, use Redis with TTL
    
    def invalidate(self, pattern: Optional[str] = None):
        """Invalidate cache entries."""
        if pattern is None:
            self._cache.clear()
        else:
            # Implement pattern matching
            pass

# Usage
cache = QueryCache()

def get_popular_products(cache: QueryCache):
    query = Q.select("*").from_("products").where("featured", True).limit(10)
    
    # Check cache
    cached = cache.get(query)
    if cached:
        return cached
    
    # Execute query
    cursor = db.cursor(dictionary=True)
    cursor.execute(*query.build())
    results = cursor.fetchall()
    
    # Cache results
    cache.set(query, results, ttl=600)
    return results

Efficient Pagination

def cursor_based_pagination(last_id: Optional[int] = None, limit: int = 20):
    """Cursor-based pagination (more efficient than OFFSET)."""
    query = Q.select("*").from_("posts").order_by("id").limit(limit)
    
    if last_id:
        query = query.where("id >", last_id)
    
    return query

# Usage
# First page
results = cursor_based_pagination(limit=20)

# Next page (using last ID from previous results)
last_id = results[-1]['id']
next_results = cursor_based_pagination(last_id=last_id, limit=20)

Data Integrity

Unique Constraint Handling

def upsert_user(email: str, name: str):
    """Insert or update user if email exists."""
    query = (
        Q.insert_into("users")
        .values([{"email": email, "name": name}])
        .on_duplicate_key_update({"name": name, "updated_at": "NOW()"})
    )
    return query

Referential Integrity Check

def delete_user_cascade(user_id: int):
    """Delete user and all related records."""
    # Note: This is manual cascade, better to use foreign keys with ON DELETE CASCADE
    
    # Delete related records first
    queries = [
        Q.delete_from("user_sessions").where("user_id", user_id),
        Q.delete_from("user_preferences").where("user_id", user_id),
        Q.delete_from("orders").where("user_id", user_id),
        Q.delete_from("users").where("id", user_id)
    ]
    
    cursor = db.cursor()
    try:
        db.begin()
        for query in queries:
            cursor.execute(*query.build())
        db.commit()
        return True
    except Exception as e:
        db.rollback()
        raise e

Audit Logging

Automatic Audit Trail

def create_audit_trigger(table: str, action: str, user_id: int, record_id: int):
    """Create audit log entry."""
    query = Q.insert_into("audit_log").values([{
        "table_name": table,
        "action": action,
        "user_id": user_id,
        "record_id": record_id,
        "timestamp": "NOW()"
    }])
    return query

def update_with_audit(table: str, record_id: int, user_id: int, **fields):
    """Update record with audit logging."""
    # Update main record
    update_query = Q.update(table).set(fields).where("id", record_id)
    
    # Create audit log
    audit_query = create_audit_trigger(table, "UPDATE", user_id, record_id)
    
    cursor = db.cursor()
    try:
        db.begin()
        cursor.execute(*update_query.build())
        cursor.execute(*audit_query.build())
        db.commit()
        return True
    except Exception:
        db.rollback()
        raise

Testing Helpers

Query Assertion Helpers

def assert_query_equals(query, expected_sql: str, expected_params: tuple):
    """Assert that query generates expected SQL."""
    sql, params = query.build()
    assert sql == expected_sql, f"SQL mismatch:\nGot: {sql}\nExpected: {expected_sql}"
    assert params == expected_params, f"Params mismatch:\nGot: {params}\nExpected: {expected_params}"

# Usage in tests
def test_user_query():
    query = Q.select("*").from_("users").where("id", 123)
    assert_query_equals(
        query,
        "SELECT * FROM `users` WHERE `id` = %s",
        (123,)
    )

Mock Database for Testing

class MockCursor:
    """Mock database cursor for testing."""
    
    def __init__(self, return_value=None):
        self.return_value = return_value
        self.executed_sql = []
        self.executed_params = []
        self.rowcount = 1
        self.lastrowid = 1
    
    def execute(self, sql, params):
        self.executed_sql.append(sql)
        self.executed_params.append(params)
    
    def fetchone(self):
        return self.return_value
    
    def fetchall(self):
        return [self.return_value] if self.return_value else []

# Usage in tests
def test_user_repository():
    mock_cursor = MockCursor(return_value={"id": 1, "name": "Alice"})
    
    # Test query building
    query = Q.select("*").from_("users").where("id", 1)
    mock_cursor.execute(*query.build())
    
    assert "SELECT" in mock_cursor.executed_sql[0]
    assert mock_cursor.executed_params[0] == (1,)

See Also