热门角色不仅是灵感来源,更是你的效率助手。通过精挑细选的角色提示词,你可以快速生成高质量内容、提升创作灵感,并找到最契合你需求的解决方案。让创作更轻松,让价值更直接!
我们根据不同用户需求,持续更新角色库,让你总能找到合适的灵感入口。
本提示词专为Python开发者设计,能够根据具体的数据库操作需求生成准确、高效的SQLAlchemy查询语句。它采用技术文档写作风格,确保代码的精确性、清晰度和专业性,同时提供完整的代码解释和使用说明,帮助开发者理解查询逻辑并正确应用于实际项目中。适用于数据查询、数据操作、表结构管理等多种数据库操作场景。
查询需求分析
核心代码
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from typing import Iterable, Optional, Dict, Any, List
from sqlalchemy import (
create_engine,
select,
func,
and_,
)
from sqlalchemy.orm import (
DeclarativeBase,
Mapped,
mapped_column,
Session,
)
from sqlalchemy import String, Integer, DateTime, Numeric, ForeignKey
from sqlalchemy.exc import SQLAlchemyError
# ========== 声明式模型(最小必要字段) ==========
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
username: Mapped[str] = mapped_column(String(255), nullable=False)
class Order(Base):
__tablename__ = "orders"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
order_number: Mapped[str] = mapped_column(String(64), nullable=False, index=True, unique=True)
amount: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False)
order_date: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, index=True)
status: Mapped[str] = mapped_column(String(32), nullable=False, index=True)
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False, index=True)
tenant_id: Mapped[int] = mapped_column(Integer, nullable=False, index=True)
# ========== 查询函数 ==========
MAX_PAGE_SIZE = 100 # 防止过大分页导致压力
def _serialize_value(val: Any) -> Any:
"""将值序列化为可 JSON 化的基本类型。"""
if isinstance(val, datetime):
# 统一输出 ISO 8601 字符串(含时区或假定 UTC)
if val.tzinfo is None:
return val.replace(tzinfo=timezone.utc).isoformat()
return val.isoformat()
if isinstance(val, Decimal):
# 保留精度,转字符串
return str(val)
return val
def fetch_paid_orders(
session: Session,
*,
page: int = 1,
size: int = 20,
amount_min: Optional[Decimal] = None,
amount_max: Optional[Decimal] = None,
tenant_id: Optional[int] = None,
selected_fields: Optional[Iterable[str]] = None,
) -> Dict[str, Any]:
"""
检索最近30天已支付订单,左联 users 取 username,支持参数化过滤与分页,返回分页结构。
- 安全:字段白名单、参数化、限制分页大小。
"""
# 1) 分页参数校正
if page < 1:
page = 1
if size < 1:
size = 1
if size > MAX_PAGE_SIZE:
size = MAX_PAGE_SIZE
# 2) 时间边界(以 UTC 计算,避免各数据库 CURRENT_DATE 差异)
now_utc = datetime.now(timezone.utc)
cutoff_dt = now_utc - timedelta(days=30)
# 3) 字段白名单
allowed_fields = {
"order_number": Order.order_number,
"amount": Order.amount,
"order_date": Order.order_date,
"username": User.username,
}
default_fields = ["order_number", "amount", "order_date", "username"]
if selected_fields is None:
selected_fields = default_fields
# 只保留白名单内字段
safe_fields = [f for f in selected_fields if f in allowed_fields]
if not safe_fields:
raise ValueError("selected_fields 为空或不在白名单中")
# 显式 label,使 Row 映射键名稳定
columns = [allowed_fields[f].label(f) for f in safe_fields]
# 4) 过滤条件
filters = [
Order.status == "paid",
Order.order_date >= cutoff_dt,
]
if tenant_id is not None:
filters.append(Order.tenant_id == tenant_id)
if amount_min is not None:
filters.append(Order.amount >= amount_min)
if amount_max is not None:
filters.append(Order.amount <= amount_max)
# 5) 主查询(左联 users,按下单时间倒序 + 次级 id 倒序保证稳定排序)
stmt = (
select(*columns)
.select_from(Order)
.join(User, Order.user_id == User.id, isouter=True)
.where(and_(*filters))
.order_by(Order.order_date.desc(), Order.id.desc())
.offset((page - 1) * size)
.limit(size)
)
# 6) 计数查询(对 orders 计数,避免 join 带来的干扰)
count_stmt = select(func.count(Order.id)).where(and_(*filters))
try:
rows = session.execute(stmt).all()
total = session.execute(count_stmt).scalar_one()
except SQLAlchemyError as e:
# 可根据需要记录日志或封装自定义异常
raise RuntimeError(f"数据库查询失败: {e}") from e
# 7) 序列化结果
items: List[Dict[str, Any]] = []
for row in rows:
record = {}
for key, val in row._mapping.items():
record[key] = _serialize_value(val)
items.append(record)
pages = (total + size - 1) // size if size > 0 else 0
result = {
"total": total,
"page": page,
"size": size,
"pages": pages,
"has_next": page < pages,
"has_prev": page > 1,
"items": items,
}
return result
# ========== 连接与 Session(示例) ==========
def get_session(database_url: str) -> Session:
"""
基于给定的 database_url 创建会话(请勿在代码仓库中硬编码敏感信息)。
例: postgresql+psycopg://user:pass@host:5432/dbname
mysql+aiomysql:// (若使用异步则需改造为 AsyncSession)
sqlite:///./app.db
"""
engine = create_engine(database_url, pool_pre_ping=True, future=True)
return Session(engine)
代码解释
使用示例
from decimal import Decimal
# 使用前请设置实际的数据库连接 URL(不要将真实凭据提交到仓库或日志)
DATABASE_URL = "postgresql+psycopg://user:password@host:5432/dbname"
def main():
# 1) 准备会话
with get_session(DATABASE_URL) as session:
try:
# 2) 调用查询函数
result = fetch_paid_orders(
session,
page=1,
size=20,
amount_min=Decimal("50.00"), # 可选:最小金额
amount_max=Decimal("500.00"), # 可选:最大金额
tenant_id=42, # 可选:租户隔离
# selected_fields=["order_number", "amount", "order_date", "username"], # 可选,不传则默认
)
# 3) 处理结果
print("total:", result["total"])
for item in result["items"]:
# item 形如:{"order_number": "...", "amount": "123.45", "order_date": "2025-11-10T12:34:56+00:00", "username": "..."}
print(item)
except RuntimeError as e:
# 异常处理(可记录日志)
print("查询失败:", e)
if __name__ == "__main__":
main()
注意事项
查询需求分析
核心代码
from __future__ import annotations
from typing import Any, Dict, Iterable, List, Optional, Tuple
from dataclasses import dataclass
from sqlalchemy import create_engine, select, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session, sessionmaker
from sqlalchemy import String, Integer, Boolean
# ---------- ORM 映射 ----------
class Base(DeclarativeBase):
pass
class Product(Base):
__tablename__ = "products"
id: Mapped[int] = mapped_column(primary_key=True)
sku: Mapped[str] = mapped_column(String(64), index=True, nullable=False)
warehouse_id: Mapped[int] = mapped_column(Integer, index=True, nullable=False)
stock: Mapped[int] = mapped_column(Integer, nullable=False)
active: Mapped[bool] = mapped_column(Boolean, index=True, nullable=False, default=True)
# 可选的操作人追踪字段
last_operator: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
# ---------- 业务函数 ----------
@dataclass
class DecrementResult:
affected_rows: int
successes: List[Dict[str, Any]]
skipped: List[Dict[str, Any]]
def _normalize_payload(payload: Iterable[Dict[str, Any]]) -> Dict[str, int]:
"""
归并 payload(同一 sku 多次出现则累加 delta),并做基础校验。
仅保留 delta > 0 的项。
"""
sku_to_delta: Dict[str, int] = {}
for i, item in enumerate(payload):
if not isinstance(item, dict):
raise ValueError(f"payload[{i}] 必须是字典")
sku = item.get("sku")
delta = item.get("delta")
if not sku or not isinstance(sku, str):
raise ValueError(f"payload[{i}].sku 必须是非空字符串")
if not isinstance(delta, int):
raise ValueError(f"payload[{i}].delta 必须是整数")
if delta <= 0:
# 非正扣减没有意义,跳过
continue
sku_to_delta[sku] = sku_to_delta.get(sku, 0) + delta
return sku_to_delta
def decrement_stocks(
session: Session,
*,
warehouse_id: int,
payload: Iterable[Dict[str, Any]],
operator_id: Optional[int] = None,
lock_nowait: bool = False,
) -> DecrementResult:
"""
批量扣减库存(行锁 + 事务环境中执行)。
规则:
- 仅 active = True 且 warehouse_id 匹配的记录参与。
- 库存不足(stock < delta)跳过。
- 支持可选 operator_id -> 写入 last_operator。
并在事务内返回原始结果(纯 Python 类型)。
要求:调用方应确保在 session.begin() 或外层事务内调用,以获得行锁效果。
"""
sku_to_delta = _normalize_payload(payload)
if not sku_to_delta:
return DecrementResult(affected_rows=0, successes=[], skipped=[])
skus = list(sku_to_delta.keys())
# 1) 选中目标行并加行级锁
stmt = (
select(Product)
.where(
Product.active.is_(True),
Product.warehouse_id == warehouse_id,
Product.sku.in_(skus),
)
.with_for_update(nowait=lock_nowait) # 行锁(SQLite 会忽略,见注意事项)
)
rows = session.execute(stmt).scalars().all()
found_skus = set()
successes: List[Dict[str, Any]] = []
skipped: List[Dict[str, Any]] = []
# 2) 逐行检查并更新(已加锁,避免并发超卖)
for p in rows:
found_skus.add(p.sku)
delta = sku_to_delta.get(p.sku, 0)
if delta <= 0:
skipped.append(
{"sku": p.sku, "delta": delta, "reason": "invalid_delta", "current_stock": p.stock}
)
continue
if p.stock >= delta:
p.stock = p.stock - delta
if operator_id is not None:
p.last_operator = operator_id
successes.append({"sku": p.sku, "delta": delta, "new_stock": p.stock})
else:
skipped.append(
{"sku": p.sku, "delta": delta, "reason": "insufficient_stock", "current_stock": p.stock}
)
# 3) 未找到或不满足 active/warehouse 条件的 SKU 视为跳过
missing_skus = set(skus) - found_skus
for sku in missing_skus:
skipped.append(
{"sku": sku, "delta": sku_to_delta[sku], "reason": "not_found_or_inactive_or_warehouse_mismatch"}
)
# 4) 刷新到数据库(仍在事务内,可继续查询最新库存)
session.flush()
return DecrementResult(
affected_rows=len(successes),
successes=successes,
skipped=skipped,
)
def get_stocks(
session: Session,
*,
warehouse_id: int,
skus: Iterable[str],
) -> List[Dict[str, Any]]:
"""
一次性查询给定 sku 的当前库存(原始结果)。
"""
skus = list(set(skus))
if not skus:
return []
stmt = select(Product.sku, Product.stock).where(
Product.warehouse_id == warehouse_id,
Product.sku.in_(skus),
)
rows = session.execute(stmt).all()
return [{"sku": sku, "stock": stock} for (sku, stock) in rows]
# ---------- Session 工厂(示例) ----------
# 请替换为你的数据库 URL(勿在代码中硬编码敏感信息;可用环境变量加载)
# engine = create_engine(os.getenv("DATABASE_URL"), pool_pre_ping=True, future=True)
# 这里仅作占位演示,不要在生产中直接使用 sqlite+aiosqlite 或内存数据库
# engine = create_engine("postgresql+psycopg://user:pass@host:5432/dbname", pool_pre_ping=True)
代码解释
使用示例
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# 1) 创建 Engine / Session
# 替换为你的数据库连接串(请从环境变量读取,避免泄漏敏感信息)
engine = create_engine("postgresql+psycopg://user:pass@host:5432/dbname", pool_pre_ping=True)
SessionLocal = sessionmaker(bind=engine, autoflush=False, expire_on_commit=False)
# 首次建表(如果你还没有表结构,生产中请用迁移工具 Alembic 管理)
# Base.metadata.create_all(engine)
payload = [
{"sku": "SKU-001", "delta": 3},
{"sku": "SKU-002", "delta": 5},
{"sku": "SKU-001", "delta": 2}, # 同 SKU 会被合并为 delta=5
{"sku": "SKU-NotExist", "delta": 1},
]
warehouse_id = 10
operator_id = 9527
# 2) 正常执行并提交
with SessionLocal.begin() as session:
# 扣减库存(行锁生效)
result = decrement_stocks(
session,
warehouse_id=warehouse_id,
payload=payload,
operator_id=operator_id,
lock_nowait=False, # 可改 True 以在遇到锁冲突时立即失败
)
print("decrement result:", {
"affected_rows": result.affected_rows,
"successes": result.successes,
"skipped": result.skipped
})
# 一次性查询更新后的库存(仍在同一事务内,读取最新值)
updated_skus = list({item["sku"] for item in payload})
current_stocks = get_stocks(session, warehouse_id=warehouse_id, skus=updated_skus)
print("current stocks:", current_stocks)
# 3) 回滚示例(演示在出现业务错误时如何回滚)
try:
with SessionLocal.begin() as session:
# 故意传入非法 delta,触发 ValueError
bad_payload = [{"sku": "SKU-001", "delta": -100}]
_ = decrement_stocks(
session,
warehouse_id=warehouse_id,
payload=bad_payload,
operator_id=operator_id,
)
# 任何异常抛出都会导致该 with 块自动回滚
except Exception as e:
print("rolled back due to:", repr(e))
# 4) 事务外的读取(验证提交结果)
with SessionLocal() as session:
current_stocks = get_stocks(session, warehouse_id=warehouse_id, skus=["SKU-001", "SKU-002", "SKU-NotExist"])
print("stocks after commit:", current_stocks)
注意事项
查询需求分析
若你的列名与上述不同(如注册来源列为 register_source,或会话时间列为 created_at),请在代码中相应替换 Users.signup_source、UserSessions.started_at 等引用。
核心代码
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Iterable, List, Optional
from sqlalchemy import (
create_engine,
select,
func,
case,
cast,
Float,
and_,
or_,
ForeignKey,
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session
from sqlalchemy.types import String, Integer, DateTime
# ----------------------------
# ORM 模型定义(按需调整列名)
# ----------------------------
class Base(DeclarativeBase):
pass
class Users(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
status: Mapped[str] = mapped_column(String(32), index=True)
signup_source: Mapped[Optional[str]] = mapped_column(String(64), index=True) # 注册来源
city: Mapped[Optional[str]] = mapped_column(String(64), index=True)
app_version: Mapped[Optional[str]] = mapped_column(String(32), index=True)
last_login: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), index=True)
class UserSessions(Base):
__tablename__ = "user_sessions"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), index=True)
started_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), index=True) # 会话开始时间
# ----------------------------
# 公共工具
# ----------------------------
def _since_7d(now_utc: Optional[datetime] = None) -> datetime:
"""计算近7日时间窗口的起点(UTC)。"""
now = now_utc or datetime.now(timezone.utc)
return now - timedelta(days=7)
def _filters(enabled_only=True,
city: Optional[str] = None,
app_version: Optional[str] = None):
"""构造 users 表的过滤条件列表。"""
conds = []
if enabled_only:
conds.append(Users.status == "enabled")
if city is not None:
conds.append(Users.city == city)
if app_version is not None:
conds.append(Users.app_version == app_version)
return conds
def _rows_to_dicts(rows: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""将 RowMapping 转换为普通字典列表。"""
return [dict(r) for r in rows]
# ----------------------------
# 方案一:聚合 + 子查询(推荐,一次 join,去重简单)
# ----------------------------
def active_by_source_subquery(
session: Session,
city: Optional[str] = None,
app_version: Optional[str] = None,
limit: int = 20,
now_utc: Optional[datetime] = None,
) -> List[Dict[str, Any]]:
"""
统计近7日活跃用户(按注册来源分组),子查询方案。
返回:[{source, total_users, active_users, active_rate}, ...]
"""
since_ts = _since_7d(now_utc)
# 近7日每个用户的会话计数(子查询),避免多行重复
sess_7d_sq = (
select(
UserSessions.user_id.label("user_id"),
func.count().label("sess_7d"),
)
.where(UserSessions.started_at >= since_ts)
.group_by(UserSessions.user_id)
.subquery("sess_7d")
)
# 基础用户过滤
base_user_cond = and_(*_filters(enabled_only=True, city=city, app_version=app_version))
# 注册来源为空时归为 'unknown'
source_expr = func.coalesce(Users.signup_source, "unknown").label("source")
# 定义活跃条件:last_login>=since_ts 或 sess_7d>=2
# 注意:left join 后 sess_7d 可能为 NULL,需要处理为 false
active_cond = or_(
Users.last_login >= since_ts,
sess_7d_sq.c.sess_7d >= 2,
)
# 统计:总人数、活跃人数(均按 distinct users.id 去重)
# 使用 count(distinct case when ...) 保证去重统计活跃用户
active_id_expr = case((active_cond, Users.id))
total_users_expr = func.count(func.distinct(Users.id)).label("total_users")
active_users_expr = func.count(func.distinct(active_id_expr)).label("active_users")
active_rate_expr = (
cast(func.count(func.distinct(active_id_expr)), Float)
/ func.nullif(func.count(func.distinct(Users.id)), 0)
).label("active_rate")
stmt = (
select(
source_expr,
total_users_expr,
active_users_expr,
active_rate_expr,
)
.select_from(Users)
.outerjoin(sess_7d_sq, Users.id == sess_7d_sq.c.user_id)
.where(base_user_cond)
.group_by(source_expr)
# 为了跨数据库兼容,order_by 使用完整表达式而非别名
.order_by(func.count(func.distinct(Users.id)).desc())
.limit(limit)
)
result = session.execute(stmt).mappings().all()
return _rows_to_dicts(result)
# ----------------------------
# 方案二:CTE(可读性更好,便于扩展)
# ----------------------------
def active_by_source_cte(
session: Session,
city: Optional[str] = None,
app_version: Optional[str] = None,
limit: int = 20,
now_utc: Optional[datetime] = None,
) -> List[Dict[str, Any]]:
"""
统计近7日活跃用户(按注册来源分组),CTE 方案。
返回:[{source, total_users, active_users, active_rate}, ...]
"""
since_ts = _since_7d(now_utc)
base_users_cte = (
select(
Users.id.label("user_id"),
func.coalesce(Users.signup_source, "unknown").label("source"),
Users.last_login.label("last_login"),
)
.where(and_(*_filters(enabled_only=True, city=city, app_version=app_version)))
.cte("base_users")
)
sess_7d_cte = (
select(
UserSessions.user_id.label("user_id"),
func.count().label hoop("sess
把零散的数据库需求,快速变成可直接落地的 SQLAlchemy 查询方案。用户只需用自然语言描述要做什么、想要的返回形式、目标表和条件,即可得到:可粘贴即用的高质量代码、逐步解释与注意事项、以及兼顾性能与安全的实现建议。通过标准化输出、清晰注释和错误防护,帮助个人开发者提升交付速度,帮助团队统一风格、降低线上风险,覆盖查询、插入、更新、删除与常见场景的组合操作。支持多类型项目(Web、数据处理、内部工具),既满足“马上能用”的效率诉求,又兼顾“长期可维护”的工程品质。立即试用,付费升级可解锁复杂关联、多表联查、批量操作、分页、异步用法与表结构管理等高级能力。
快速生成复杂联表查询与分页排序,现成事务封装可直接复用,减少手写SQL与审查时间,加速功能迭代稳定上线。
便捷编写聚合、分组与窗口统计,按需导出查询结果,沉淀可复用脚本,为报表与看板提供一致且可追溯的数据口径。
用少量需求描述即可产出可用数据访问层,统一风格与异常处理,快速搭建后台管理与原型验证,缩短从想法到上线的路径。
将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。
把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。
在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。
半价获取高级提示词-优惠即将到期