Integration with aiomysql
Complete guide to using sqlo with aiomysql for async database operations.
Introduction
aiomysql is an async MySQL client for Python built on top of PyMySQL. sqlo generates standard parameterized SQL queries that work seamlessly with aiomysql.
Installation
pip install sqlo aiomysql
# Or with uv
uv add sqlo aiomysql
Basic Usage
Creating a Connection Pool
import aiomysql
import asyncio
from sqlo import Q
async def create_pool():
"""Create aiomysql connection pool."""
pool = await aiomysql.create_pool(
host='localhost',
port=3306,
user='root',
password='password',
db='mydb',
autocommit=False,
minsize=1,
maxsize=10
)
return pool
# Usage
async def main():
pool = await create_pool()
# Use pool...
pool.close()
await pool.wait_closed()
Simple SELECT Query
async def get_user(pool, user_id: int):
"""Fetch a user by ID."""
query = Q.select("id", "name", "email").from_("users").where("id", user_id)
sql, params = query.build()
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cursor:
await cursor.execute(sql, params)
result = await cursor.fetchone()
return result
# Usage
# user = await get_user(pool, 123)
# print(user) # {'id': 123, 'name': 'Alice', ...}
INSERT Query
async def create_user(pool, name: str, email: str):
"""Create a new user."""
query = Q.insert_into("users").values([{
"name": name,
"email": email,
"created_at": "NOW()"
}])
sql, params = query.build()
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute(sql, params)
await conn.commit()
return cursor.lastrowid
# Usage
# user_id = await create_user(pool, "Bob", "bob@example.com")
UPDATE Query
async def update_user(pool, user_id: int, **updates):
"""Update user fields."""
query = (
Q.update("users")
.set(updates)
.where("id", user_id)
)
sql, params = query.build()
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute(sql, params)
await conn.commit()
return cursor.rowcount
# Usage
# rows_updated = await update_user(pool, 123, name="Alice Updated")
DELETE Query
async def delete_user(pool, user_id: int):
"""Delete a user by ID."""
query = Q.delete_from("users").where("id", user_id)
sql, params = query.build()
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute(sql, params)
await conn.commit()
return cursor.rowcount
# Usage
# deleted = await delete_user(pool, 123)
Advanced Patterns
Generic Query Executor
from typing import List, Dict, Any, Optional
class AsyncDatabase:
"""Async database wrapper using sqlo and aiomysql."""
def __init__(self, pool: aiomysql.Pool):
self.pool = pool
async def fetch_one(self, query) -> Optional[Dict[str, Any]]:
"""Fetch a single row."""
sql, params = query.build()
async with self.pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cursor:
await cursor.execute(sql, params)
return await cursor.fetchone()
async def fetch_all(self, query) -> List[Dict[str, Any]]:
"""Fetch all matching rows."""
sql, params = query.build()
async with self.pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cursor:
await cursor.execute(sql, params)
return await cursor.fetchall()
async def execute(self, query) -> int:
"""Execute a query and return affected row count."""
sql, params = query.build()
async with self.pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute(sql, params)
await conn.commit()
return cursor.rowcount
async def insert(self, query) -> int:
"""Execute INSERT and return last inserted ID."""
sql, params = query.build()
async with self.pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute(sql, params)
await conn.commit()
return cursor.lastrowid
# Usage
async def main():
pool = await create_pool()
db = AsyncDatabase(pool)
# Fetch one
user = await db.fetch_one(
Q.select("*").from_("users").where("id", 123)
)
# Fetch all
users = await db.fetch_all(
Q.select("*").from_("users").where("active", True)
)
# Insert
user_id = await db.insert(
Q.insert_into("users").values([{"name": "Alice", "email": "alice@example.com"}])
)
# Update
updated = await db.execute(
Q.update("users").set({"active": False}).where("id", user_id)
)
pool.close()
await pool.wait_closed()
Transaction Management
async def transfer_funds(pool, from_account: int, to_account: int, amount: float):
"""Transfer funds between accounts within a transaction."""
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
try:
# Start transaction (autocommit is False in pool config)
await conn.begin()
# Deduct from source account
deduct_query = (
Q.update("accounts")
.set({"balance": "balance - %s"})
.where("id", from_account)
)
sql, params = deduct_query.build()
await cursor.execute(sql, (amount,) + params[1:])
# Add to destination account
add_query = (
Q.update("accounts")
.set({"balance": "balance + %s"})
.where("id", to_account)
)
sql, params = add_query.build()
await cursor.execute(sql, (amount,) + params[1:])
# Commit transaction
await conn.commit()
return True
except Exception as e:
# Rollback on error
await conn.rollback()
raise Exception(f"Transfer failed: {e}")
Batch Operations
async def batch_insert_users(pool, users: List[Dict[str, Any]], chunk_size: int = 500):
"""Insert users in batches."""
total_inserted = 0
for i in range(0, len(users), chunk_size):
chunk = users[i:i + chunk_size]
query = Q.insert_into("users").values(chunk)
sql, params = query.build()
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute(sql, params)
await conn.commit()
total_inserted += cursor.rowcount
return total_inserted
# Usage
# users = [{"name": f"User{i}", "email": f"user{i}@example.com"} for i in range(1000)]
# inserted = await batch_insert_users(pool, users)
Pagination Helper
from typing import Tuple
async def paginate(
pool,
base_query,
page: int = 1,
per_page: int = 20
) -> Tuple[List[Dict[str, Any]], int]:
"""
Paginate query results.
Returns: (results, total_count)
"""
# Get total count
count_query = Q.select("COUNT(*) as total").from_(
base_query.as_("subquery")
)
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cursor:
# Get count
sql, params = count_query.build()
await cursor.execute(sql, params)
count_result = await cursor.fetchone()
total = count_result['total']
# Get paginated results
paginated_query = base_query.paginate(page=page, per_page=per_page)
sql, params = paginated_query.build()
await cursor.execute(sql, params)
results = await cursor.fetchall()
return results, total
# Usage
async def get_users_page(pool, page: int = 1):
base_query = Q.select("*").from_("users").where("active", True).order_by("-created_at")
results, total = await paginate(pool, base_query, page=page, per_page=20)
return {
"results": results,
"total": total,
"page": page,
"pages": (total + 19) // 20 # Ceiling division
}
Context Manager Pattern
from contextlib import asynccontextmanager
@asynccontextmanager
async def db_connection(pool):
"""Context manager for database connections."""
conn = await pool.acquire()
try:
yield conn
finally:
pool.release(conn)
@asynccontextmanager
async def db_transaction(pool):
"""Context manager for transactions."""
async with db_connection(pool) as conn:
try:
await conn.begin()
yield conn
await conn.commit()
except Exception:
await conn.rollback()
raise
# Usage
async def create_user_with_profile(pool, user_data: dict, profile_data: dict):
"""Create user and profile in a transaction."""
async with db_transaction(pool) as conn:
async with conn.cursor() as cursor:
# Insert user
user_query = Q.insert_into("users").values([user_data])
sql, params = user_query.build()
await cursor.execute(sql, params)
user_id = cursor.lastrowid
# Insert profile
profile_data['user_id'] = user_id
profile_query = Q.insert_into("profiles").values([profile_data])
sql, params = profile_query.build()
await cursor.execute(sql, params)
return user_id
Error Handling
import aiomysql.err as mysql_errors
async def safe_insert_user(pool, email: str, name: str):
"""Insert user with duplicate key handling."""
query = Q.insert_into("users").values([{
"email": email,
"name": name
}])
sql, params = query.build()
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
try:
await cursor.execute(sql, params)
await conn.commit()
return {"success": True, "id": cursor.lastrowid}
except mysql_errors.IntegrityError as e:
if e.args[0] == 1062: # Duplicate entry
return {"success": False, "error": "Email already exists"}
raise
except Exception as e:
await conn.rollback()
return {"success": False, "error": str(e)}
Connection Pool Best Practices
# Configure pool appropriately
pool = await aiomysql.create_pool(
host='localhost',
port=3306,
user='user',
password='password',
db='database',
autocommit=False, # Explicit transaction control
minsize=5, # Minimum connections
maxsize=20, # Maximum connections
pool_recycle=3600, # Recycle connections every hour
echo=False, # Set True for SQL logging
)
# Use context managers
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
# Query execution
pass
# Clean up on shutdown
pool.close()
await pool.wait_closed()
FastAPI Integration Example
from fastapi import FastAPI, Depends
from typing import Optional
import aiomysql
app = FastAPI()
# Global pool
db_pool: Optional[aiomysql.Pool] = None
@app.on_event("startup")
async def startup():
"""Create database pool on startup."""
global db_pool
db_pool = await aiomysql.create_pool(
host='localhost',
port=3306,
user='user',
password='password',
db='mydb',
minsize=5,
maxsize=20
)
@app.on_event("shutdown")
async def shutdown():
"""Close pool on shutdown."""
if db_pool:
db_pool.close()
await db_pool.wait_closed()
async def get_db():
"""Dependency for database access."""
return AsyncDatabase(db_pool)
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncDatabase = Depends(get_db)):
"""Get user by ID."""
query = Q.select("*").from_("users").where("id", user_id)
user = await db.fetch_one(query)
if user:
return user
return {"error": "User not found"}, 404
@app.post("/users")
async def create_user(name: str, email: str, db: AsyncDatabase = Depends(get_db)):
"""Create a new user."""
query = Q.insert_into("users").values([{
"name": name,
"email": email
}])
user_id = await db.insert(query)
return {"id": user_id, "name": name, "email": email}
@app.get("/users")
async def list_users(page: int = 1, db: AsyncDatabase = Depends(get_db)):
"""List users with pagination."""
base_query = Q.select("*").from_("users").order_by("-created_at")
results, total = await paginate(db_pool, base_query, page=page, per_page=20)
return {
"results": results,
"total": total,
"page": page
}
Performance Tips
Use Connection Pooling: Always use a connection pool, never create connections per request
Batch Operations: Use batch inserts/updates for multiple records
Prepared Statements: sqlo generates parameterized queries automatically
Transaction Management: Group related operations in transactions
Index Usage: Ensure database tables have appropriate indexes
See Also
Batch Operations - Efficient bulk operations
SELECT Queries - Query building
Security Guide - SQL injection prevention