¥
立即购买

Python SQLAlchemy查询生成器

12 浏览
1 试用
0 购买
Dec 10, 2025更新

本提示词专为Python开发者设计,能够根据具体的数据库操作需求生成准确、高效的SQLAlchemy查询语句。它采用技术文档写作风格,确保代码的精确性、清晰度和专业性,同时提供完整的代码解释和使用说明,帮助开发者理解查询逻辑并正确应用于实际项目中。适用于数据查询、数据操作、表结构管理等多种数据库操作场景。

查询需求分析

  • 数据源与表结构:orders 表(含 order_number、amount、order_date、status、user_id、tenant_id 等字段);users 表(含 id、username)。
  • 查询目标:检索最近 30 天已支付订单(status='paid' 且 order_date >= 当前时间 - 30 天),左联接 users 获取 username。
  • 选择字段:order_number、amount、order_date、username;使用字段白名单防止泄露非预期字段。
  • 过滤条件:可选 amount_min/amount_max 金额区间;支持 tenant_id 隔离。
  • 排序与分页:按 order_date 倒序(并用 id 作为次序保证稳定排序),支持 page/size。
  • 返回格式:分页结构(total、page、size、pages、has_next、has_prev、items)。
  • 安全要求:SQLAlchemy 2.0 风格、参数化查询、无原始拼接 SQL、错误处理与分页限流。

核心代码

from __future__ import annotations

from datetime import datetime, timedelta, timezone
from decimal import Decimal
from typing import Iterable, Optional, Dict, Any, List

from sqlalchemy import (
    create_engine,
    select,
    func,
    and_,
)
from sqlalchemy.orm import (
    DeclarativeBase,
    Mapped,
    mapped_column,
    Session,
)
from sqlalchemy import String, Integer, DateTime, Numeric, ForeignKey
from sqlalchemy.exc import SQLAlchemyError


# ========== 声明式模型(最小必要字段) ==========
class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    username: Mapped[str] = mapped_column(String(255), nullable=False)


class Order(Base):
    __tablename__ = "orders"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    order_number: Mapped[str] = mapped_column(String(64), nullable=False, index=True, unique=True)
    amount: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False)
    order_date: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, index=True)
    status: Mapped[str] = mapped_column(String(32), nullable=False, index=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False, index=True)
    tenant_id: Mapped[int] = mapped_column(Integer, nullable=False, index=True)


# ========== 查询函数 ==========
MAX_PAGE_SIZE = 100  # 防止过大分页导致压力


def _serialize_value(val: Any) -> Any:
    """将值序列化为可 JSON 化的基本类型。"""
    if isinstance(val, datetime):
        # 统一输出 ISO 8601 字符串(含时区或假定 UTC)
        if val.tzinfo is None:
            return val.replace(tzinfo=timezone.utc).isoformat()
        return val.isoformat()
    if isinstance(val, Decimal):
        # 保留精度,转字符串
        return str(val)
    return val


def fetch_paid_orders(
    session: Session,
    *,
    page: int = 1,
    size: int = 20,
    amount_min: Optional[Decimal] = None,
    amount_max: Optional[Decimal] = None,
    tenant_id: Optional[int] = None,
    selected_fields: Optional[Iterable[str]] = None,
) -> Dict[str, Any]:
    """
    检索最近30天已支付订单,左联 users 取 username,支持参数化过滤与分页,返回分页结构。
    - 安全:字段白名单、参数化、限制分页大小。
    """

    # 1) 分页参数校正
    if page < 1:
        page = 1
    if size < 1:
        size = 1
    if size > MAX_PAGE_SIZE:
        size = MAX_PAGE_SIZE

    # 2) 时间边界(以 UTC 计算,避免各数据库 CURRENT_DATE 差异)
    now_utc = datetime.now(timezone.utc)
    cutoff_dt = now_utc - timedelta(days=30)

    # 3) 字段白名单
    allowed_fields = {
        "order_number": Order.order_number,
        "amount": Order.amount,
        "order_date": Order.order_date,
        "username": User.username,
    }
    default_fields = ["order_number", "amount", "order_date", "username"]
    if selected_fields is None:
        selected_fields = default_fields
    # 只保留白名单内字段
    safe_fields = [f for f in selected_fields if f in allowed_fields]
    if not safe_fields:
        raise ValueError("selected_fields 为空或不在白名单中")

    # 显式 label,使 Row 映射键名稳定
    columns = [allowed_fields[f].label(f) for f in safe_fields]

    # 4) 过滤条件
    filters = [
        Order.status == "paid",
        Order.order_date >= cutoff_dt,
    ]
    if tenant_id is not None:
        filters.append(Order.tenant_id == tenant_id)
    if amount_min is not None:
        filters.append(Order.amount >= amount_min)
    if amount_max is not None:
        filters.append(Order.amount <= amount_max)

    # 5) 主查询(左联 users,按下单时间倒序 + 次级 id 倒序保证稳定排序)
    stmt = (
        select(*columns)
        .select_from(Order)
        .join(User, Order.user_id == User.id, isouter=True)
        .where(and_(*filters))
        .order_by(Order.order_date.desc(), Order.id.desc())
        .offset((page - 1) * size)
        .limit(size)
    )

    # 6) 计数查询(对 orders 计数,避免 join 带来的干扰)
    count_stmt = select(func.count(Order.id)).where(and_(*filters))

    try:
        rows = session.execute(stmt).all()
        total = session.execute(count_stmt).scalar_one()
    except SQLAlchemyError as e:
        # 可根据需要记录日志或封装自定义异常
        raise RuntimeError(f"数据库查询失败: {e}") from e

    # 7) 序列化结果
    items: List[Dict[str, Any]] = []
    for row in rows:
        record = {}
        for key, val in row._mapping.items():
            record[key] = _serialize_value(val)
        items.append(record)

    pages = (total + size - 1) // size if size > 0 else 0
    result = {
        "total": total,
        "page": page,
        "size": size,
        "pages": pages,
        "has_next": page < pages,
        "has_prev": page > 1,
        "items": items,
    }
    return result


# ========== 连接与 Session(示例) ==========
def get_session(database_url: str) -> Session:
    """
    基于给定的 database_url 创建会话(请勿在代码仓库中硬编码敏感信息)。
    例: postgresql+psycopg://user:pass@host:5432/dbname
        mysql+aiomysql://  (若使用异步则需改造为 AsyncSession)
        sqlite:///./app.db
    """
    engine = create_engine(database_url, pool_pre_ping=True, future=True)
    return Session(engine)

代码解释

  • Base/User/Order:最小实体定义,包含本查询所需字段。为跨数据库兼容,使用 SQLAlchemy 2.0 声明式映射与类型。
  • MAX_PAGE_SIZE:限制最大分页,避免一次性拉取过多数据。
  • _serialize_value:将 datetime/Decimal 转为可 JSON 化的字符串,确保响应格式统一。
  • fetch_paid_orders:
    • 分页参数规范化,避免非法值;限制 size 最大值。
    • cutoff_dt 在应用层使用 UTC 时间计算最近 30 天,避免不同数据库 CURRENT_DATE/CURRENT_TIMESTAMP 差异导致的兼容问题。
    • 字段白名单 allowed_fields:仅允许输出 order_number、amount、order_date、username;对列使用 label 保证返回键名稳定。
    • 构建 filters:包含 status='paid'、order_date>=cutoff、可选 tenant_id 以及金额区间过滤。
    • 左联接 users 获取 username。
    • 排序使用 order_date desc 和 id desc 作为稳定次序。
    • 计数查询仅基于 orders 表,避免 join 对计数的影响与性能损耗。
    • 使用 session.execute(stmt) 参数化执行,无字符串拼接,防 SQL 注入。
    • items 序列化,返回分页结构:total/page/size/pages/has_next/has_prev/items。
  • get_session:演示如何基于外部传入的 database_url 创建 Session,避免泄露敏感信息。

使用示例

from decimal import Decimal

# 使用前请设置实际的数据库连接 URL(不要将真实凭据提交到仓库或日志)
DATABASE_URL = "postgresql+psycopg://user:password@host:5432/dbname"

def main():
    # 1) 准备会话
    with get_session(DATABASE_URL) as session:
        try:
            # 2) 调用查询函数
            result = fetch_paid_orders(
                session,
                page=1,
                size=20,
                amount_min=Decimal("50.00"),   # 可选:最小金额
                amount_max=Decimal("500.00"),  # 可选:最大金额
                tenant_id=42,                  # 可选:租户隔离
                # selected_fields=["order_number", "amount", "order_date", "username"],  # 可选,不传则默认
            )
            # 3) 处理结果
            print("total:", result["total"])
            for item in result["items"]:
                # item 形如:{"order_number": "...", "amount": "123.45", "order_date": "2025-11-10T12:34:56+00:00", "username": "..."}
                print(item)

        except RuntimeError as e:
            # 异常处理(可记录日志)
            print("查询失败:", e)

if __name__ == "__main__":
    main()

注意事项

  • 时间与时区:
    • 代码使用 UTC 计算最近 30 天,建议数据库层也存储为带时区的 UTC 时间,或在入库时统一转换。
  • 性能与索引建议:
    • 复合索引(根据过滤与排序最常用的组合):
      • orders(status, order_date) 覆盖常见过滤与排序。
      • 如存在强隔离场景,建议 orders(tenant_id, status, order_date) 作为更精确的复合索引。
      • orders(user_id) 支持联接与用户侧筛选扩展。
      • orders(order_number) 已加唯一/索引,便于单单号检索。
    • users(id) 通常为主键索引,已足够支持联接。
    • 若 amount 区间过滤使用频繁,可考虑在统计型场景中增加 orders(amount) 索引(但注意对写入性能的影响)。
  • 分页稳定性:
    • 使用 order_date desc, id desc 组合,防止 order_date 相同记录在翻页过程中顺序不稳定。
  • 字段白名单扩展:
    • 如需返回更多字段,请在 allowed_fields 中显式新增,并谨慎评估敏感信息泄漏风险。
  • 事务与连接池:
    • 读操作默认在一个短事务内完成。对于高并发场景,建议为不同服务设置合理的连接池大小,并开启 pool_pre_ping。
  • 数据库适配:
    • 代码为 SQLAlchemy 2.0+ 通用写法,适配 PostgreSQL/MySQL/SQLite 等主流数据库。
    • 金额字段使用 Numeric(12,2) 与 Decimal 类型,若数据库端为 DECIMAL,请确保精度一致。
  • 安全:
    • 全程使用 ORM/表达式构建器参数化查询,无字符串拼接,避免 SQL 注入。
    • 不要在代码/日志中记录数据库凭据、租户ID映射等敏感信息。
  • 扩展方向:
    • 如需要导出 CSV/Excel,可基于 result["items"] 直接序列化输出。
    • 如需异步支持,可将 Session/engine 替换为 AsyncSession/async_engine 并使用 await 执行查询。

查询需求分析

  • 目标:对 products 表按 sku 批量扣减库存(stock = stock - delta),仅更新 active = true 且 warehouse_id 匹配的记录。
  • 并发一致性:使用事务 + with_for_update 行级锁,避免并发扣减超卖。
  • 业务规则:若库存不足(stock < delta)则跳过该 sku,并返回跳过明细;支持可选 operator_id 写入 last_operator。
  • 输入:payload = [{sku, delta}, ...],warehouse_id,operator_id(可选)。
  • 输出:原始结果(Python 基本类型),包含受影响行数、成功清单、跳过清单,并演示一次性查询更新后的库存。
  • 兼容性:使用 SQLAlchemy 2.0 ORM 风格,避免弃用 API,参数绑定防注入。

核心代码

from __future__ import annotations

from typing import Any, Dict, Iterable, List, Optional, Tuple
from dataclasses import dataclass

from sqlalchemy import create_engine, select, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session, sessionmaker
from sqlalchemy import String, Integer, Boolean


# ---------- ORM 映射 ----------
class Base(DeclarativeBase):
    pass


class Product(Base):
    __tablename__ = "products"

    id: Mapped[int] = mapped_column(primary_key=True)
    sku: Mapped[str] = mapped_column(String(64), index=True, nullable=False)
    warehouse_id: Mapped[int] = mapped_column(Integer, index=True, nullable=False)
    stock: Mapped[int] = mapped_column(Integer, nullable=False)
    active: Mapped[bool] = mapped_column(Boolean, index=True, nullable=False, default=True)
    # 可选的操作人追踪字段
    last_operator: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)


# ---------- 业务函数 ----------
@dataclass
class DecrementResult:
    affected_rows: int
    successes: List[Dict[str, Any]]
    skipped: List[Dict[str, Any]]


def _normalize_payload(payload: Iterable[Dict[str, Any]]) -> Dict[str, int]:
    """
    归并 payload(同一 sku 多次出现则累加 delta),并做基础校验。
    仅保留 delta > 0 的项。
    """
    sku_to_delta: Dict[str, int] = {}
    for i, item in enumerate(payload):
        if not isinstance(item, dict):
            raise ValueError(f"payload[{i}] 必须是字典")
        sku = item.get("sku")
        delta = item.get("delta")
        if not sku or not isinstance(sku, str):
            raise ValueError(f"payload[{i}].sku 必须是非空字符串")
        if not isinstance(delta, int):
            raise ValueError(f"payload[{i}].delta 必须是整数")
        if delta <= 0:
            # 非正扣减没有意义,跳过
            continue
        sku_to_delta[sku] = sku_to_delta.get(sku, 0) + delta
    return sku_to_delta


def decrement_stocks(
    session: Session,
    *,
    warehouse_id: int,
    payload: Iterable[Dict[str, Any]],
    operator_id: Optional[int] = None,
    lock_nowait: bool = False,
) -> DecrementResult:
    """
    批量扣减库存(行锁 + 事务环境中执行)。
    规则:
      - 仅 active = True 且 warehouse_id 匹配的记录参与。
      - 库存不足(stock < delta)跳过。
      - 支持可选 operator_id -> 写入 last_operator。
    并在事务内返回原始结果(纯 Python 类型)。

    要求:调用方应确保在 session.begin() 或外层事务内调用,以获得行锁效果。
    """
    sku_to_delta = _normalize_payload(payload)
    if not sku_to_delta:
        return DecrementResult(affected_rows=0, successes=[], skipped=[])

    skus = list(sku_to_delta.keys())

    # 1) 选中目标行并加行级锁
    stmt = (
        select(Product)
        .where(
            Product.active.is_(True),
            Product.warehouse_id == warehouse_id,
            Product.sku.in_(skus),
        )
        .with_for_update(nowait=lock_nowait)  # 行锁(SQLite 会忽略,见注意事项)
    )
    rows = session.execute(stmt).scalars().all()

    found_skus = set()
    successes: List[Dict[str, Any]] = []
    skipped: List[Dict[str, Any]] = []

    # 2) 逐行检查并更新(已加锁,避免并发超卖)
    for p in rows:
        found_skus.add(p.sku)
        delta = sku_to_delta.get(p.sku, 0)
        if delta <= 0:
            skipped.append(
                {"sku": p.sku, "delta": delta, "reason": "invalid_delta", "current_stock": p.stock}
            )
            continue

        if p.stock >= delta:
            p.stock = p.stock - delta
            if operator_id is not None:
                p.last_operator = operator_id
            successes.append({"sku": p.sku, "delta": delta, "new_stock": p.stock})
        else:
            skipped.append(
                {"sku": p.sku, "delta": delta, "reason": "insufficient_stock", "current_stock": p.stock}
            )

    # 3) 未找到或不满足 active/warehouse 条件的 SKU 视为跳过
    missing_skus = set(skus) - found_skus
    for sku in missing_skus:
        skipped.append(
            {"sku": sku, "delta": sku_to_delta[sku], "reason": "not_found_or_inactive_or_warehouse_mismatch"}
        )

    # 4) 刷新到数据库(仍在事务内,可继续查询最新库存)
    session.flush()

    return DecrementResult(
        affected_rows=len(successes),
        successes=successes,
        skipped=skipped,
    )


def get_stocks(
    session: Session,
    *,
    warehouse_id: int,
    skus: Iterable[str],
) -> List[Dict[str, Any]]:
    """
    一次性查询给定 sku 的当前库存(原始结果)。
    """
    skus = list(set(skus))
    if not skus:
        return []
    stmt = select(Product.sku, Product.stock).where(
        Product.warehouse_id == warehouse_id,
        Product.sku.in_(skus),
    )
    rows = session.execute(stmt).all()
    return [{"sku": sku, "stock": stock} for (sku, stock) in rows]


# ---------- Session 工厂(示例) ----------
# 请替换为你的数据库 URL(勿在代码中硬编码敏感信息;可用环境变量加载)
# engine = create_engine(os.getenv("DATABASE_URL"), pool_pre_ping=True, future=True)
# 这里仅作占位演示,不要在生产中直接使用 sqlite+aiosqlite 或内存数据库
# engine = create_engine("postgresql+psycopg://user:pass@host:5432/dbname", pool_pre_ping=True)

代码解释

  • Product 映射:最小化定义所需字段 sku、warehouse_id、stock、active、last_operator,用于库存扣减与可选操作者记录。
  • _normalize_payload:合并 payload 中相同 sku 的 delta,过滤 delta <= 0,保证输入有效性并减少重复处理。
  • decrement_stocks:
    • 使用 select(...).with_for_update() 对目标行加锁,保证并发下库存检查与更新的原子性。
    • 对每条锁定的产品行,验证 stock >= delta 再扣减;否则加入 skipped,并记录当前库存。
    • 对数据库中未匹配(可能因 inactive 或仓库不符)的 SKU,标记为跳过。
    • session.flush() 确保更新已发往数据库,但仍处于同一事务中,后续可继续查询最新库存。
    • 返回原始的 Python 数据结构(受影响行数、成功与跳过清单)。
  • get_stocks:一次性查询指定 sku 在该仓库的最新库存,返回原始结果列表。
  • with_for_update(nowait=...):提供可选 nowait 行为;若为 True 且行被其它事务锁定,将抛出数据库异常,调用方可捕获以快速失败。

使用示例

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# 1) 创建 Engine / Session
# 替换为你的数据库连接串(请从环境变量读取,避免泄漏敏感信息)
engine = create_engine("postgresql+psycopg://user:pass@host:5432/dbname", pool_pre_ping=True)
SessionLocal = sessionmaker(bind=engine, autoflush=False, expire_on_commit=False)

# 首次建表(如果你还没有表结构,生产中请用迁移工具 Alembic 管理)
# Base.metadata.create_all(engine)

payload = [
    {"sku": "SKU-001", "delta": 3},
    {"sku": "SKU-002", "delta": 5},
    {"sku": "SKU-001", "delta": 2},  # 同 SKU 会被合并为 delta=5
    {"sku": "SKU-NotExist", "delta": 1},
]
warehouse_id = 10
operator_id = 9527

# 2) 正常执行并提交
with SessionLocal.begin() as session:
    # 扣减库存(行锁生效)
    result = decrement_stocks(
        session,
        warehouse_id=warehouse_id,
        payload=payload,
        operator_id=operator_id,
        lock_nowait=False,  # 可改 True 以在遇到锁冲突时立即失败
    )
    print("decrement result:", {
        "affected_rows": result.affected_rows,
        "successes": result.successes,
        "skipped": result.skipped
    })

    # 一次性查询更新后的库存(仍在同一事务内,读取最新值)
    updated_skus = list({item["sku"] for item in payload})
    current_stocks = get_stocks(session, warehouse_id=warehouse_id, skus=updated_skus)
    print("current stocks:", current_stocks)

# 3) 回滚示例(演示在出现业务错误时如何回滚)
try:
    with SessionLocal.begin() as session:
        # 故意传入非法 delta,触发 ValueError
        bad_payload = [{"sku": "SKU-001", "delta": -100}]
        _ = decrement_stocks(
            session,
            warehouse_id=warehouse_id,
            payload=bad_payload,
            operator_id=operator_id,
        )
        # 任何异常抛出都会导致该 with 块自动回滚
except Exception as e:
    print("rolled back due to:", repr(e))

# 4) 事务外的读取(验证提交结果)
with SessionLocal() as session:
    current_stocks = get_stocks(session, warehouse_id=warehouse_id, skus=["SKU-001", "SKU-002", "SKU-NotExist"])
    print("stocks after commit:", current_stocks)

注意事项

  • 事务与行锁:
    • with_for_update 需要在事务中使用;示例通过 with SessionLocal.begin() 确保开启事务并在块结束时自动提交或回滚。
    • 并发环境中,如果不希望等待锁,可以传 lock_nowait=True 并捕获数据库异常(如 Postgres 的 LockNotAvailable)。
  • SQLite 差异:
    • SQLite 会忽略 FOR UPDATE 子句,锁语义不同(更偏向整表/数据库级),不适合高并发扣减场景。生产建议使用支持行级锁的数据库(PostgreSQL、MySQL/InnoDB)。
  • 性能建议:
    • 大批量 SKU(如 > 1k)时 IN 子句可能较长,可分批处理或使用临时表/VALUES 表配合单条 UPDATE+RETURNING(若使用 PostgreSQL 可进一步优化)。当前实现重在跨库兼容和可读性。
  • 数据校验:
    • 代码已过滤 delta <= 0 项。根据需要可增加对 sku 字符串长度、字符集等校验。
  • 返回结果:
    • affected_rows 为成功扣减的记录条数,与 successes 长度一致。
    • skipped 会包含库存不足、未找到/不满足条件和非法 delta 的条目,并尽可能附带 current_stock。
  • 安全:
    • 全程仅使用 SQLAlchemy 参数绑定与 ORM 更新,无原始字符串拼接,避免 SQL 注入风险。
  • 审计字段:
    • 如需记录更新时间,可在模型中添加 updated_at = mapped_column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())。
  • 隔离级别:
    • 默认 READ COMMITTED 对大多数场景足够;若有更严格一致性需求,可在 Engine 层调整隔离级别,但要注意死锁与吞吐的平衡。

查询需求分析

  • 要求:统计近7日活跃用户,按“注册来源”分组,返回每组的总人数、活跃人数与活跃率,并按人数降序取前20。
  • 活跃判定:last_login >= 当前时间-7天,或近7日 user_sessions 计数 >= 2。
  • 过滤条件:仅统计 status='enabled';可选过滤 city 与 app_version。
  • 表结构假设:
    • users(id, status, signup_source, city, app_version, last_login)
    • user_sessions(id, user_id, started_at)
    • 关联条件:users.id == user_sessions.user_id
  • 需要两种实现:子查询方案与CTE方案。
  • 返回格式:字典列表[{source, total_users, active_users, active_rate}, ...]。

若你的列名与上述不同(如注册来源列为 register_source,或会话时间列为 created_at),请在代码中相应替换 Users.signup_source、UserSessions.started_at 等引用。


核心代码

from __future__ import annotations

from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Iterable, List, Optional

from sqlalchemy import (
    create_engine,
    select,
    func,
    case,
    cast,
    Float,
    and_,
    or_,
    ForeignKey,
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session
from sqlalchemy.types import String, Integer, DateTime


# ----------------------------
# ORM 模型定义(按需调整列名)
# ----------------------------
class Base(DeclarativeBase):
    pass


class Users(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    status: Mapped[str] = mapped_column(String(32), index=True)
    signup_source: Mapped[Optional[str]] = mapped_column(String(64), index=True)  # 注册来源
    city: Mapped[Optional[str]] = mapped_column(String(64), index=True)
    app_version: Mapped[Optional[str]] = mapped_column(String(32), index=True)
    last_login: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), index=True)


class UserSessions(Base):
    __tablename__ = "user_sessions"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), index=True)
    started_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), index=True)  # 会话开始时间


# ----------------------------
# 公共工具
# ----------------------------
def _since_7d(now_utc: Optional[datetime] = None) -> datetime:
    """计算近7日时间窗口的起点(UTC)。"""
    now = now_utc or datetime.now(timezone.utc)
    return now - timedelta(days=7)


def _filters(enabled_only=True,
             city: Optional[str] = None,
             app_version: Optional[str] = None):
    """构造 users 表的过滤条件列表。"""
    conds = []
    if enabled_only:
        conds.append(Users.status == "enabled")
    if city is not None:
        conds.append(Users.city == city)
    if app_version is not None:
        conds.append(Users.app_version == app_version)
    return conds


def _rows_to_dicts(rows: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """将 RowMapping 转换为普通字典列表。"""
    return [dict(r) for r in rows]


# ----------------------------
# 方案一:聚合 + 子查询(推荐,一次 join,去重简单)
# ----------------------------
def active_by_source_subquery(
    session: Session,
    city: Optional[str] = None,
    app_version: Optional[str] = None,
    limit: int = 20,
    now_utc: Optional[datetime] = None,
) -> List[Dict[str, Any]]:
    """
    统计近7日活跃用户(按注册来源分组),子查询方案。
    返回:[{source, total_users, active_users, active_rate}, ...]
    """
    since_ts = _since_7d(now_utc)

    # 近7日每个用户的会话计数(子查询),避免多行重复
    sess_7d_sq = (
        select(
            UserSessions.user_id.label("user_id"),
            func.count().label("sess_7d"),
        )
        .where(UserSessions.started_at >= since_ts)
        .group_by(UserSessions.user_id)
        .subquery("sess_7d")
    )

    # 基础用户过滤
    base_user_cond = and_(*_filters(enabled_only=True, city=city, app_version=app_version))

    # 注册来源为空时归为 'unknown'
    source_expr = func.coalesce(Users.signup_source, "unknown").label("source")

    # 定义活跃条件:last_login>=since_ts 或 sess_7d>=2
    # 注意:left join 后 sess_7d 可能为 NULL,需要处理为 false
    active_cond = or_(
        Users.last_login >= since_ts,
        sess_7d_sq.c.sess_7d >= 2,
    )

    # 统计:总人数、活跃人数(均按 distinct users.id 去重)
    # 使用 count(distinct case when ...) 保证去重统计活跃用户
    active_id_expr = case((active_cond, Users.id))
    total_users_expr = func.count(func.distinct(Users.id)).label("total_users")
    active_users_expr = func.count(func.distinct(active_id_expr)).label("active_users")
    active_rate_expr = (
        cast(func.count(func.distinct(active_id_expr)), Float)
        / func.nullif(func.count(func.distinct(Users.id)), 0)
    ).label("active_rate")

    stmt = (
        select(
            source_expr,
            total_users_expr,
            active_users_expr,
            active_rate_expr,
        )
        .select_from(Users)
        .outerjoin(sess_7d_sq, Users.id == sess_7d_sq.c.user_id)
        .where(base_user_cond)
        .group_by(source_expr)
        # 为了跨数据库兼容,order_by 使用完整表达式而非别名
        .order_by(func.count(func.distinct(Users.id)).desc())
        .limit(limit)
    )

    result = session.execute(stmt).mappings().all()
    return _rows_to_dicts(result)


# ----------------------------
# 方案二:CTE(可读性更好,便于扩展)
# ----------------------------
def active_by_source_cte(
    session: Session,
    city: Optional[str] = None,
    app_version: Optional[str] = None,
    limit: int = 20,
    now_utc: Optional[datetime] = None,
) -> List[Dict[str, Any]]:
    """
    统计近7日活跃用户(按注册来源分组),CTE 方案。
    返回:[{source, total_users, active_users, active_rate}, ...]
    """
    since_ts = _since_7d(now_utc)

    base_users_cte = (
        select(
            Users.id.label("user_id"),
            func.coalesce(Users.signup_source, "unknown").label("source"),
            Users.last_login.label("last_login"),
        )
        .where(and_(*_filters(enabled_only=True, city=city, app_version=app_version)))
        .cte("base_users")
    )

    sess_7d_cte = (
        select(
            UserSessions.user_id.label("user_id"),
            func.count().label hoop("sess

示例详情

解决的问题

把零散的数据库需求,快速变成可直接落地的 SQLAlchemy 查询方案。用户只需用自然语言描述要做什么、想要的返回形式、目标表和条件,即可得到:可粘贴即用的高质量代码、逐步解释与注意事项、以及兼顾性能与安全的实现建议。通过标准化输出、清晰注释和错误防护,帮助个人开发者提升交付速度,帮助团队统一风格、降低线上风险,覆盖查询、插入、更新、删除与常见场景的组合操作。支持多类型项目(Web、数据处理、内部工具),既满足“马上能用”的效率诉求,又兼顾“长期可维护”的工程品质。立即试用,付费升级可解锁复杂关联、多表联查、批量操作、分页、异步用法与表结构管理等高级能力。

适用用户

Python后端工程师

快速生成复杂联表查询与分页排序,现成事务封装可直接复用,减少手写SQL与审查时间,加速功能迭代稳定上线。

数据分析工程师

便捷编写聚合、分组与窗口统计,按需导出查询结果,沉淀可复用脚本,为报表与看板提供一致且可追溯的数据口径。

全栈/独立开发者

用少量需求描述即可产出可用数据访问层,统一风格与异常处理,快速搭建后台管理与原型验证,缩短从想法到上线的路径。

特征总结

一键生成SQLAlchemy查询、插入、更新、删除代码,贴合业务语境,开箱可用,直接复制到项目运行。
自动补全关联关系与筛选条件,复杂多表联结也能清晰表达,避免手写SQL带来的隐性风险。
根据输入需求生成完整注释与使用说明,示例调用一步到位,帮助团队快速上手与统一风格。
内置性能与安全检查,自动规避慢查询与注入风险,给出合理索引与筛选建议,守护线上稳定。
兼容常见数据库差异,一次编写多处适配,减少环境切换带来的改动与回归成本。
支持ORM与Core两种风格自由切换,按团队习惯输出代码体例,易读、易测、易维护。
可参数化控制返回结构、排序分页与空值处理,满足报表、服务联调与后台管理等多样场景。
提供事务与错误处理模板,一键套用到增删改查流程,降低异常恢复与数据回滚负担。
向导式问答引导需求澄清,自动生成最优查询方案,减少沟通成本,避免需求理解偏差。
输出结构化文档与代码并行,方便提交评审与保存知识库,提升团队协作与复用效率。

如何使用购买的提示词模板

1. 直接在外部 Chat 应用中使用

将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。

2. 发布为 API 接口调用

把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。

3. 在 MCP Client 中配置使用

在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。

AI 提示词价格
¥20.00元
先用后买,用好了再付款,超安全!

您购买后可以获得什么

获得完整提示词模板
- 共 665 tokens
- 4 个可调节参数
{ 数据库操作描述 } { 查询返回格式 } { 目标数据表 } { 查询条件说明 }
获得社区贡献内容的使用权
- 精选社区优质案例,助您快速上手提示词
使用提示词兑换券,低至 ¥ 9.9
了解兑换券 →
限时半价

不要错过!

半价获取高级提示词-优惠即将到期

17
:
23
小时
:
59
分钟
:
59