热门角色不仅是灵感来源,更是你的效率助手。通过精挑细选的角色提示词,你可以快速生成高质量内容、提升创作灵感,并找到最契合你需求的解决方案。让创作更轻松,让价值更直接!
我们根据不同用户需求,持续更新角色库,让你总能找到合适的灵感入口。
专业分析Python代码片段,识别性能瓶颈并提供具体的优化建议,包括效率提升、运行加速和资源消耗减少。优化方案保持功能不变,旨在生成更高效、更专业的代码,适用于开发人员、代码审查和性能调优场景。
以下是针对给定代码的性能问题定位与可行优化建议,并给出等价但更高效的实现。优化重点围绕减少数据库 I/O 与连接开销、去除不必要的阻塞与对象创建、复用计算和缓存结果,从而提升执行速度、降低 I/O、提升缓存效率并减少 GC 压力。
主要问题与优化要点
N+1 查询与连接风暴
正则重复编译与多次匹配
Python 级循环与中间对象过多
无意义的阻塞等待
可选的轻量级缓存
SQLite 访问细节(可选)
优化后代码(保持返回格式不变;仅使用标准库与轻量改动) from flask import Flask, request, jsonify import sqlite3, re from collections import OrderedDict
app = Flask(name) SKU_PATTERN = r'[A-Z0-9]{3,8}' SKU_RE = re.compile(SKU_PATTERN)
_PRICE_CACHE = OrderedDict() _PRICE_CACHE_MAX = 10000 # 可按内存与命中率调整
def get_db(): # 保持对其他代码兼容的接口 conn = sqlite3.connect('orders.db') # 行为保持一致(可以通过索引或键访问),不强依赖 row_factory 时可去掉以减少一点开销 conn.row_factory = sqlite3.Row # 只读查询为主时,一些较安全的优化(可按实际环境决定是否保留) # conn.execute('PRAGMA synchronous = NORMAL') # conn.execute('PRAGMA temp_store = MEMORY') # conn.execute('PRAGMA optimize') return conn
def _evict_cache_if_needed(): # 简单的容量控制 while len(_PRICE_CACHE) > _PRICE_CACHE_MAX: _PRICE_CACHE.popitem(last=False)
def _fetch_prices_for_skus(conn, missing_skus): # 对缓存缺失的 SKU 进行分批查询,避免超过 SQLite 参数上限(通常 999) if not missing_skus: return cur = conn.cursor() skus = list(missing_skus) chunk_size = 900 for i in range(0, len(skus), chunk_size): chunk = skus[i:i + chunk_size] placeholders = ','.join('?' * len(chunk)) sql = f'SELECT sku, price FROM sku_price WHERE sku IN ({placeholders})' cur.execute(sql, chunk) for row in cur.fetchall(): # row 可通过索引或键访问(Row 支持两种方式) sku = row[0] price = row[1] # 更新缓存并维持 LRU 语义 if sku in _PRICE_CACHE: _PRICE_CACHE.move_to_end(sku) _PRICE_CACHE[sku] = price _evict_cache_if_needed()
@app.get('/orders/summary') def orders_summary(): payload = request.get_json(force=True) or {} orders = payload.get('orders', [])
# 预先抽取每个订单的 SKU 列表,同时汇总全部需要查询的 SKU
# 使用预编译正则的 bound method,减少属性查找
re_match = SKU_RE.match # 保持与原逻辑一致的 match 语义
per_order_skus = []
all_skus = []
for order in orders:
items = order.get('items', [])
skus = [item['sku'] for item in items
if isinstance(item, dict)
and isinstance(item.get('sku'), str)
and re_match(item['sku'])]
per_order_skus.append((order.get('id'), skus))
all_skus.extend(skus)
# 去重后批量查询(请求级别仅一次连接)
unique_skus = set(all_skus)
prices_map = {}
if unique_skus:
conn = get_db()
try:
# 命中缓存的优先从缓存获取
cached = {sku: _PRICE_CACHE[sku] for sku in unique_skus if sku in _PRICE_CACHE}
prices_map.update(cached)
# 对缓存缺失进行分批查询并回填缓存
missing = unique_skus - cached.keys()
if missing:
_fetch_prices_for_skus(conn, missing)
# 将缺失部分补齐到本次请求的映射中
for sku in missing:
if sku in _PRICE_CACHE:
prices_map[sku] = _PRICE_CACHE[sku]
finally:
conn.close()
# 按订单计算小计与总计(内建 sum 在 C 层,减少 Python 循环与对象创建)
details = []
total = 0.0
for order_id, skus in per_order_skus:
order_total = sum(prices_map.get(s, 0.0) for s in skus)
total += order_total
details.append({'order_id': order_id, 'total': order_total})
# 直接返回,无需二次 JSON 序列化
return jsonify({'total': total, 'details': details})
if name == 'main': app.run()
这些改动如何提升性能(对应优化目标)
预期效果与注意事项
可选进一步优化(按需采纳)
下面给出针对代码的性能瓶颈的具体优化建议,以及保持功能等价的高效实现。优化同时面向内存、CPU 与 I/O。
主要优化点与原因
在保持相同最终输出的前提下,“is_api”列在原代码里没有参与统计与输出,因此默认不再构建它;若强制需要保留此副产物,可在分块内部用 .str.startswith('/api/') 快速计算并丢弃,不影响最终结果。
优化后代码 说明:保持同样的输入/输出行为,打印的 JSON 前 1000 字符一致;统计逻辑保持与原代码相同(slow_ratio = (slow + very_slow) / count)。
import os import json import pandas as pd import numpy as np
def analyze_logs(file_paths, chunksize=1_000_000): # 仅读取必要列,减少 I/O 与内存 usecols = ['user_id', 'timestamp', 'latency_ms', 'path'] dtype = { 'user_id': 'int64', # 若 user_id 可能缺失,可改为 'Int64' 'latency_ms': 'int32', # 典型范围可容纳,节省内存 'path': 'object' }
# 累加器:MultiIndex(date, user_id) -> count, slow
acc = None
for path in file_paths:
# 流式分块读取,避免构建巨型 DataFrame
for chunk in pd.read_csv(
path,
usecols=usecols,
dtype=dtype,
chunksize=chunksize,
memory_map=True
):
# 矢量化日期解析(缓存加速重复值),按天归一
ts = pd.to_datetime(chunk['timestamp'], errors='coerce', cache=True)
date = ts.dt.floor('D') # datetime64[ns],更省内存
# 若必须保留与原代码同等副产物,可启用下一行(默认关闭以提升性能)
# chunk['is_api'] = chunk['path'].astype(str).str.startswith('/api/')
# 直接将慢请求定义为 latency_ms >= 300(包含 'slow' 与 'very_slow' 桶)
is_slow = (chunk['latency_ms'].values >= 300)
# 在分块内先矢量化聚合,减少中间数据体积
g = pd.DataFrame({
'date': date.values,
'user_id': chunk['user_id'].values,
'count': np.ones(len(chunk), dtype=np.int64),
'slow': is_slow.astype(np.int64)
}).groupby(['date', 'user_id'], sort=False).sum()
# 累加到总计(避免 Python 字典级循环,使用向量化加法)
if acc is None:
acc = g
else:
acc = acc.add(g, fill_value=0)
# add 可能生成 float,强制回到整数
acc = acc.astype({'count': 'int64', 'slow': 'int64'})
if acc is None:
print('[]')
return
# 按 (date, user_id) 排序并计算 slow_ratio
acc = acc.sort_index(level=['date', 'user_id'])
out = acc.reset_index()
out['date'] = out['date'].dt.strftime('%Y-%m-%d')
# 保持与原逻辑一致的保护(count 至少为 1)
out['slow_ratio'] = (out['slow'] / out['count'].clip(lower=1)).astype(float)
# 生成与原代码等价的输出结构
result = out[['user_id', 'date', 'count', 'slow_ratio']].to_dict(orient='records')
print(json.dumps(result)[:1000])
if name == 'main': files = [os.path.join('logs', name) for name in os.listdir('logs') if name.endswith('.csv')] # 可按机器内存与列宽调整分块大小,一般 2e5~1e6 行较稳妥 analyze_logs(files, chunksize=500_000)
效果与收益预估
可选微调建议
下面给出问题定位、优化方向与改动原因,随后附上在“最小化改动”前提下的优化版代码。该版本保持输入输出结构与数值语义一致(softmax 结果仅因浮点误差可能有极微小差异),但显著减少 I/O、降低 Python 层开销、提升并发与吞吐。
一、主要性能瓶颈与优化点
二、优化改动细节与收益
三、优化后代码(最小化改动,功能等价) 说明:
代码: import re import torch import numpy as np # 仅为兼容保留导入,不在热路径使用 import time # 仅为兼容保留导入
class Predictor: TOKEN_RE = re.compile(r'[A-Za-z0-9]+')
def __init__(self, model_path):
self.model_path = model_path
self._model = None
self._vocab = None
self._vocab_map = None
self._weights = None
# 充分利用 2 核 CPU
try:
torch.set_num_threads(2)
except Exception:
pass
# 推理默认无需梯度
torch.set_grad_enabled(False)
def _ensure_loaded(self):
if self._model is None:
m = torch.load(self.model_path, map_location='cpu')
self._model = m
self._vocab = m.get('vocab', [])
# 词表映射用于 O(1) 查找
self._vocab_map = {w: i for i, w in enumerate(self._vocab)}
# 常驻内存的权重张量,float32,CPU
self._weights = torch.tensor(m['weights'], dtype=torch.float32, device='cpu')
def _load(self):
# 保留原方法名,改为缓存式加载,避免重复 I/O
self._ensure_loaded()
return self._model
def tokenize(self, text):
# 与原逻辑等价:按非字母数字切分,转小写
return self.TOKEN_RE.findall(text.lower())
def featurize(self, tokens, vocab):
# 兼容原签名,但内部使用缓存好的映射以避免 O(N^2)
# 若传入的 vocab 与缓存不同(理论上不会发生),则就地构造一次映射
if self._vocab is not None and vocab is self._vocab:
vocab_map = self._vocab_map
V = len(self._vocab)
else:
vocab_map = {w: i for i, w in enumerate(vocab)}
V = len(vocab)
idxs = [vocab_map[t] for t in tokens if t in vocab_map]
vec = torch.zeros(V, dtype=torch.float32)
if idxs:
idx_tensor = torch.tensor(idxs, dtype=torch.int64)
vec.index_add_(0, idx_tensor, torch.ones(len(idxs), dtype=torch.float32))
return vec.unsqueeze(0)
def _featurize_batch(self, batch_tokens):
# 将一批 tokens 转为 B×V 稀疏累加到稠密 BOW
V = len(self._vocab)
B = len(batch_tokens)
x = torch.zeros(B, V, dtype=torch.float32)
for i, tokens in enumerate(batch_tokens):
idxs = [self._vocab_map[t] for t in tokens if t in self._vocab_map]
if not idxs:
continue
idx_tensor = torch.tensor(idxs, dtype=torch.int64)
x[i].index_add_(0, idx_tensor, torch.ones(len(idxs), dtype=torch.float32))
return x
def predict(self, texts):
# 一次性加载与缓存模型
model = self._load()
# 批量分词
batch_tokens = [self.tokenize(text) for text in texts]
# 批量特征化
x = self._featurize_batch(batch_tokens)
# 矢量化矩阵乘与 softmax(数学等价于原 exp/sum)
with torch.inference_mode():
logits = x.matmul(self._weights.t()) # 形状: [B, C]
exp_logits = torch.exp(logits)
probs = exp_logits / exp_logits.sum(dim=1, keepdim=True)
probs_list = probs.tolist()
outputs = [{'text': text, 'probs': p} for text, p in zip(texts, probs_list)]
return outputs
if name == 'main': p = Predictor('model.pt') texts = ['Quick brown fox jumps over the lazy dog'] * 128 print(p.predict(texts)[0])
四、与优化目标的对照
五、可选进一步优化(保持行为不变)
这版代码在你的约束条件下通常能将 128 条文本批推理的端到端延迟显著下降(I/O 从每条一次降为一次,softmax/特征构建矢量化,CPU 2 核并行),QPS 也会显著提升。
分析用户提供的Python代码片段,帮助用户发现潜在的性能瓶颈并提出切实可行的优化建议,以实现代码效率提升,同时维持原有功能的正确性,最终推动用户在开发过程中提升代码质量与性能。
需要为已有Python项目进行性能优化,以提升代码运行效率、减少计算资源浪费。
在大规模数据计算中使用Python,可以通过性能优化加快数据处理与分析速度。
帮助编程初学者学习和理解如何编写高效代码,提供优化建议作为教学案例。
将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。
把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。
在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。
半价获取高级提示词-优惠即将到期