Popular roles are more than a source of inspiration: they are efficiency helpers. With carefully curated role prompts you can quickly generate high-quality content, spark new ideas, and find the solution that best fits your needs. Less effort, more direct value!
We keep the role library updated for different user needs, so you can always find the right entry point for inspiration.
Code Optimization: intelligent analysis of code problems with one-pass improvements to performance and readability, so programs run faster and stay easier to maintain.
import json
import logging
import re
import time
import tracemalloc
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Tuple
import io
import os
import tempfile
import unittest
# -------------------------
# Configuration and constants
# -------------------------
logger = logging.getLogger(__name__)
# Call logging.basicConfig in the entry point so this module stays reusable as a library
DEFAULT_LEVEL = "INFO"
USER_ID_RE = re.compile(r"user_id=(\d+)")  # precompiled regex

# Type aliases
Level = str
UserId = int
Key = Tuple[Level, UserId]
Count = int
AggResult = List[Tuple[Key, Count]]

# -------------------------
# Core functionality
# -------------------------
def iter_events(lines: Iterable[str]) -> Iterator[Tuple[Level, UserId]]:
    """
    Parse events from an iterable of text lines, lazily yielding (level, user_id) tuples.
    - Empty lines are skipped.
    - Lines that fail JSON parsing are logged as warnings and skipped.
    - A missing "level" falls back to "INFO".
    - A missing or non-string "message" is treated as having no user_id (uid = -1).
    """
    loads = json.loads  # local binding avoids repeated attribute lookups
    search = USER_ID_RE.search
    for lineno, raw in enumerate(lines, start=1):
        line = raw.strip()
        if not line:
            continue
        try:
            data = loads(line)
        except json.JSONDecodeError as e:
            logger.warning("Skipping invalid JSON at line %d: %s", lineno, e.msg)
            continue
        lvl = data.get("level", DEFAULT_LEVEL)
        # "message" may be missing or non-string
        msg = data.get("message", "")
        if not isinstance(msg, str):
            msg = str(msg) if msg is not None else ""
        m = search(msg)
        uid: UserId = int(m.group(1)) if m else -1
        # Matching the original intent: yield only what aggregation needs,
        # dropping timestamp and other unrelated fields
        yield (str(lvl), uid)

def load_events(path: str) -> Iterator[Tuple[Level, UserId]]:
    """
    Stream events from a file, returning a generator.
    The file is read as UTF-8; undecodable bytes are replaced so processing can
    continue. An OSError (e.g. a missing file) is logged and re-raised.
    """
    try:
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            for item in iter_events(f):
                yield item
    except OSError:
        logger.exception("Failed to open or read file: %s", path)
        raise

def aggregate(events: Iterable[Tuple[Level, UserId]]) -> AggResult:
    """
    Count (level, user_id) pairs and return the counts sorted by key.
    Complexity:
    - counting: O(n)
    - sorting: O(k log k), where k is the number of unique keys
    """
    counts: Dict[Key, int] = defaultdict(int)
    for lvl, uid in events:
        counts[(lvl, uid)] += 1
    # Built-in sort (Timsort), ordered by the (level, uid) key
    return sorted(counts.items(), key=lambda item: item[0])

def main() -> None:
    """
    Main flow: read logs.jsonl -> aggregate -> print.
    Output format is kept strictly as: level user_id count
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s - %(message)s"
    )
    events = load_events("logs.jsonl")
    result = aggregate(events)
    for (lvl, uid), c in result:
        print(lvl, uid, c)

# -------------------------
# Benchmark (not invoked from the main flow by default)
# -------------------------
def _generate_synthetic_logs(path: str, n: int = 1_000_000, unique_users: int = 50_000) -> None:
    """
    Generate synthetic test data in JSON Lines format.
    - level cycles through {INFO, WARN, ERROR, DEBUG}
    - user_id values are non-negative integers
    """
    levels = ("INFO", "WARN", "ERROR", "DEBUG")
    # A simple linear congruential generator avoids the random module's global-lock overhead
    seed = 123456789
    a, c, m = 1103515245, 12345, 2 ** 31

    def fast_rand() -> int:
        nonlocal seed
        seed = (a * seed + c) % m
        return seed

    with open(path, "w", encoding="utf-8") as f:
        for i in range(n):
            uid = fast_rand() % unique_users
            lvl = levels[i % len(levels)]
            msg = f"processed request ok user_id={uid}"
            # Hand-rolled JSON is faster than json.dumps here and is safe because
            # the generated fields never contain characters that need escaping
            f.write(
                '{"timestamp":%d,"level":"%s","message":"%s"}\n'
                % (1700000000 + i, lvl, msg)
            )

def run_benchmark(n: int = 1_000_000, unique_users: int = 50_000) -> None:
    """
    Generate n log lines and measure:
    - end-to-end time: load + aggregate + sort
    - peak memory (tracemalloc)
    """
    logging.basicConfig(level=logging.INFO, format="%(message)s")
    with tempfile.TemporaryDirectory() as td:
        path = os.path.join(td, "bench.jsonl")
        logger.info("Generating synthetic logs: %d lines ...", n)
        t0 = time.perf_counter()
        _generate_synthetic_logs(path, n=n, unique_users=unique_users)
        t1 = time.perf_counter()
        logger.info("Data generation time: %.2fs", t1 - t0)
        logger.info("Running pipeline ...")
        tracemalloc.start()
        t2 = time.perf_counter()
        result = aggregate(load_events(path))
        t3 = time.perf_counter()
        _, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        # Report summary statistics only (no per-key detail)
        logger.info("Unique keys: %d", len(result))
        logger.info("Pipeline time (n=%d): %.2fs", n, t3 - t2)
        logger.info("Peak memory: %.2f MB", peak / (1024 * 1024))

# -------------------------
# Unit tests (critical paths)
# -------------------------
class TestLogAggregation(unittest.TestCase):
    def test_basic_aggregation_and_sort(self):
        data = """\
{"timestamp":1,"level":"INFO","message":"ok user_id=42"}
{"timestamp":2,"level":"ERROR","message":"fail user_id=42"}
{"timestamp":3,"level":"INFO","message":"ok user_id=7"}
{"timestamp":4,"level":"INFO","message":"ok user_id=42"}
"""
        events = list(iter_events(io.StringIO(data)))
        self.assertEqual(events, [("INFO", 42), ("ERROR", 42), ("INFO", 7), ("INFO", 42)])
        result = aggregate(events)
        # Sorted by (level, uid): ERROR before INFO; uid ascending
        expected = [
            (("ERROR", 42), 1),
            (("INFO", 7), 1),
            (("INFO", 42), 2),
        ]
        self.assertEqual(result, expected)

    def test_invalid_json_and_empty_lines(self):
        data = """\

{"timestamp":1,"level":"INFO","message":"ok user_id=1"}
{invalid json
{"timestamp":2,"level":"WARN","message":"missing uid"}
{"timestamp":3,"level":"WARN","message":"uid user_id=1"}
"""
        events = list(iter_events(io.StringIO(data)))
        # The invalid JSON line is skipped; a message without a uid maps to -1
        self.assertEqual(events, [("INFO", 1), ("WARN", -1), ("WARN", 1)])
        result = aggregate(events)
        self.assertEqual(
            result,
            [(("INFO", 1), 1), (("WARN", -1), 1), (("WARN", 1), 1)]
        )

    def test_non_string_message(self):
        data = """\
{"timestamp":1,"level":"INFO","message":null}
{"timestamp":2,"level":"INFO","message":123}
"""
        events = list(iter_events(io.StringIO(data)))
        # A non-string message is treated as having no uid
        self.assertEqual(events, [("INFO", -1), ("INFO", -1)])
        result = aggregate(events)
        self.assertEqual(result, [(("INFO", -1), 2)])

    def test_default_level(self):
        data = """\
{"timestamp":1,"message":"ok user_id=10"}
"""
        events = list(iter_events(io.StringIO(data)))
        self.assertEqual(events, [("INFO", 10)])

if __name__ == "__main__":
    # Main flow (output format strictly preserved: level user_id count)
    main()
    # To run the benchmark or the tests, call manually:
    # run_benchmark(1_000_000, 50_000)
    # unittest.main()
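For a quick end-to-end check without creating logs.jsonl, the pipeline can also be driven from in-memory lines. Below is a minimal sketch using the functions above (the sample log lines are invented for illustration; the unit tests run with python -m unittest):

sample = io.StringIO(
    '{"timestamp":1,"level":"ERROR","message":"boom user_id=7"}\n'
    '{"timestamp":2,"level":"INFO","message":"ok user_id=7"}\n'
    'not json\n'
    '{"timestamp":3,"level":"INFO","message":"ok user_id=7"}\n'
)
for (lvl, uid), c in aggregate(iter_events(sample)):
    print(lvl, uid, c)
# Prints (the invalid line only triggers a warning):
# ERROR 7 1
# INFO 7 2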
Problem analysis
Optimization approach
Optimized code. A complete, runnable implementation (Go 1.20+) follows, including the main code and the tests/benchmarks.
File: main.go
package main
import (
	"container/list"
	"context"
	"crypto/sha256"
	"encoding/binary"
	"encoding/hex"
	"errors"
	"fmt"
	"hash/fnv"
	"io"
	"log"
	"os"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/sync/singleflight"
)
// ========================= LRU with TTL =========================
type lruEntry struct {
key string
value string
expireAt time.Time
elem *list.Element
}
// lruCache is a threadsafe LRU with TTL (entry count bounded).
type lruCache struct {
mu sync.RWMutex
ll *list.List
items map[string]*lruEntry
maxEntries int
ttl time.Duration
}
func newLRU(maxEntries int, ttl time.Duration) *lruCache {
if maxEntries <= 0 {
maxEntries = 1024
}
if ttl <= 0 {
ttl = 10 * time.Minute
}
return &lruCache{
ll: list.New(),
items: make(map[string]*lruEntry, maxEntries),
maxEntries: maxEntries,
ttl: ttl,
}
}
func (c *lruCache) Get(key string) (string, bool) {
now := time.Now()
c.mu.Lock()
defer c.mu.Unlock()
if e, ok := c.items[key]; ok {
// TTL check
if now.After(e.expireAt) {
c.removeElement(e)
return "", false
}
// move to front on hit
c.ll.MoveToFront(e.elem)
return e.value, true
}
return "", false
}
func (c *lruCache) Add(key, value string) {
now := time.Now()
c.mu.Lock()
defer c.mu.Unlock()
// Update if exists
if e, ok := c.items[key]; ok {
e.value = value
e.expireAt = now.Add(c.ttl)
c.ll.MoveToFront(e.elem)
return
}
// Insert new
elem := c.ll.PushFront(key)
ent := &lruEntry{
key: key,
value: value,
expireAt: now.Add(c.ttl),
elem: elem,
}
c.items[key] = ent
// Evict if over capacity
if c.ll.Len() > c.maxEntries {
c.removeOldestLocked()
}
}
func (c *lruCache) removeOldestLocked() {
if back := c.ll.Back(); back != nil {
key := back.Value.(string)
if e, ok := c.items[key]; ok {
c.removeElement(e)
}
}
}
func (c *lruCache) removeElement(e *lruEntry) {
c.ll.Remove(e.elem)
delete(c.items, e.key)
}
// ========================= Fingerprint =========================
// fileFingerprint computes a cheap, content-sensitive fingerprint for a file.
// It combines file size, modtime, and the first up-to-4KB content sample,
// hashed with FNV-1a 64, into a hex key. This gives good (though not
// cryptographic) collision resistance, which is sufficient for cache keying.
func fileFingerprint(path string) (string, error) {
f, err := os.Open(path)
if err != nil {
return "", err
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
return "", err
}
if !fi.Mode().IsRegular() {
return "", fmt.Errorf("not a regular file: %s", path)
}
const sampleSize = 4 * 1024
buf := make([]byte, sampleSize)
n, _ := io.ReadFull(f, buf) // may be < sampleSize, ignore EOF
h := fnv.New64a()
// size
if err := binary.Write(h, binary.LittleEndian, fi.Size()); err != nil {
return "", err
}
// modtime (ns)
if err := binary.Write(h, binary.LittleEndian, fi.ModTime().UnixNano()); err != nil {
return "", err
}
	// Platform-specific identity (dev+ino on Unix) would also detect hard links
	// and renames, but it requires syscall.Stat_t plus build tags, so it is
	// omitted here for portability. Size + mtime + the content sample already
	// provide strong uniqueness.
// content sample
if n > 0 {
if _, err := h.Write(buf[:n]); err != nil {
return "", err
}
}
// Return as fixed-width hex to be stable key
sum := h.Sum64()
key := fmt.Sprintf("%016x", sum)
return key, nil
}
// ========================= Hasher with cache =========================
type hasherCache struct {
	lru      *lruCache
	group    singleflight.Group
	bufPool  sync.Pool
	timeout  time.Duration
	computes int64 // instrumentation for tests
}
type HasherOptions struct {
MaxEntries int
TTL time.Duration
Timeout time.Duration // per-hash best-effort timeout used by background computation
}
func newHasherCache(opt HasherOptions) *hasherCache {
if opt.MaxEntries <= 0 {
opt.MaxEntries = 2048
}
if opt.TTL <= 0 {
opt.TTL = 10 * time.Minute
}
// Background computation timeout: avoid runaway I/O if no ctx provided via wrapper
if opt.Timeout <= 0 {
opt.Timeout = 0 // 0 means no internal timeout; rely on file IO to finish
}
	h := &hasherCache{
		lru:     newLRU(opt.MaxEntries, opt.TTL),
		timeout: opt.Timeout,
	}
	h.bufPool.New = func() any {
		// A pooled 32KB buffer keeps steady-state memory low
		// (versus the original 1MB buffer per call)
		b := make([]byte, 32*1024)
		return &b
	}
	return h
}
// Global hasher instance (thread-safe).
// Adjust MaxEntries/TTL as needed to control memory footprint.
var globalHasher = newHasherCache(HasherOptions{
	MaxEntries: 4096,
	TTL:        10 * time.Minute,
	Timeout:    0, // no internal timeout for background computes; callers can pass a ctx with a timeout to GetHashCtx
})
// GetHashCtx returns SHA-256 hex string of the file content, using a threadsafe LRU+TTL cache and singleflight.
// It uses a content fingerprint as the cache key.
// The hash computation itself uses small chunked reads with buffer pooling for memory efficiency.
func (h *hasherCache) GetHashCtx(ctx context.Context, path string) (string, error) {
// 1) Compute content fingerprint (cheap) for cache key
key, err := fileFingerprint(path)
if err != nil {
return "", err
}
// 2) Fast path: cache hit
if v, ok := h.lru.Get(key); ok {
return v, nil
}
// 3) Singleflight to deduplicate concurrent computes for the same key
// Use DoChan to allow caller cancellation while the compute may continue in background for others.
ch := h.group.DoChan(key, func() (any, error) {
atomic.AddInt64(&h.computes, 1)
// Compute hash with background context (do not bind to a single caller's cancellation).
// Optionally wrap with internal timeout if configured.
callCtx := context.Background()
var cancel context.CancelFunc
if h.timeout > 0 {
callCtx, cancel = context.WithTimeout(context.Background(), h.timeout)
defer cancel()
}
sum, err := hashFileCtx(callCtx, path)
if err != nil {
return "", err
}
// Save in cache
h.lru.Add(key, sum)
return sum, nil
})
select {
case r := <-ch:
if r.Err != nil {
return "", r.Err
}
return r.Val.(string), nil
case <-ctx.Done():
// Caller gave up; background computation may still proceed and populate cache.
return "", ctx.Err()
}
}
// hashFileCtx reads file in small chunks using a pooled buffer and computes SHA-256.
// It checks ctx in the read loop to support cancellation.
func hashFileCtx(ctx context.Context, path string) (string, error) {
f, err := os.Open(path)
if err != nil {
return "", err
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
return "", err
}
if !fi.Mode().IsRegular() {
return "", fmt.Errorf("not a regular file: %s", path)
}
	h := sha256.New()
	// Borrow a pooled 32KB buffer. Note: hashFileCtx is a package-level helper,
	// so it always draws from globalHasher's pool, even when invoked on behalf
	// of another hasherCache instance.
	p := globalHasher.bufPool.Get().(*[]byte)
	buf := *p
	defer func() {
		// Return the buffer to the pool
		globalHasher.bufPool.Put(p)
	}()
for {
// Respect cancellation
select {
case <-ctx.Done():
return "", ctx.Err()
default:
}
n, rerr := f.Read(buf)
if n > 0 {
if _, werr := h.Write(buf[:n]); werr != nil {
return "", werr
}
}
if errors.Is(rerr, io.EOF) {
break
}
if rerr != nil {
return "", rerr
}
}
sum := h.Sum(nil)
return hex.EncodeToString(sum), nil
}
// ========================= Compatibility wrappers (signatures unchanged) =========================
// Original signature retained. Delegates to context-aware implementation.
// Errors are not swallowed internally; due to signature constraints we log and return empty string on error.
func getHash(path string) string {
v, err := globalHasher.GetHashCtx(context.Background(), path)
if err != nil {
// Log the error as we cannot return it per original signature.
log.Printf("getHash error for %s: %v", path, err)
return ""
}
return v
}
// Original signature retained. Delegates to context-aware implementation.
func hashFile(path string) (string, error) {
return hashFileCtx(context.Background(), path)
}
// ========================= Demo main =========================
func main() {
// Example workload
for i := 0; i < 1000; i++ {
fmt.Println(getHash("input.bin"))
}
}
File: cache_test.go
package main
import (
"context"
"crypto/sha256"
"encoding/hex"
"io"
"os"
"path/filepath"
"sync"
"sync/atomic"
"testing"
"time"
)
func writeTempFile(t *testing.T, dir string, name string, size int) string {
t.Helper()
p := filepath.Join(dir, name)
f, err := os.Create(p)
if err != nil {
t.Fatalf("create: %v", err)
}
defer f.Close()
	// Derive a per-file seed from the name so different files get different
	// content; otherwise equal-size files with identical bytes (and possibly
	// equal coarse mtimes) could collide on the same fingerprint key.
	seed := 0
	for _, c := range name {
		seed += int(c)
	}
	const chunk = 4096
	buf := make([]byte, chunk)
	for i := 0; i < size; i += chunk {
		for j := range buf {
			buf[j] = byte((i + j + seed) % 251)
		}
n := chunk
if size-i < chunk {
n = size - i
}
if _, err := f.Write(buf[:n]); err != nil {
t.Fatalf("write: %v", err)
}
}
return p
}
func sha256File(t *testing.T, path string) string {
f, err := os.Open(path)
if err != nil {
t.Fatalf("open: %v", err)
}
defer f.Close()
h := sha256.New()
if _, err := io.Copy(h, f); err != nil {
t.Fatalf("copy: %v", err)
}
return hex.EncodeToString(h.Sum(nil))
}
func TestConcurrentSamePathSingleflight(t *testing.T) {
t.Parallel()
dir := t.TempDir()
p := writeTempFile(t, dir, "a.bin", 2*1024*1024) // 2MB
want := sha256File(t, p)
// Configure a small cache for test isolation
h := newHasherCache(HasherOptions{
MaxEntries: 8,
TTL: time.Minute,
})
var wg sync.WaitGroup
n := 64
got := make([]string, n)
errs := make([]error, n)
for i := 0; i < n; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
			// Each goroutine uses its own context with a generous timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
v, err := h.GetHashCtx(ctx, p)
got[i] = v
errs[i] = err
}(i)
}
wg.Wait()
for i := 0; i < n; i++ {
if errs[i] != nil {
t.Fatalf("goroutine %d error: %v", i, errs[i])
}
if got[i] != want {
t.Fatalf("hash mismatch: got=%s want=%s", got[i], want)
}
}
// Ensure only one compute happened
if c := atomic.LoadInt64(&h.computes); c != 1 {
t.Fatalf("expected 1 compute, got %d", c)
}
}
func TestLRUEvictionAndTTL(t *testing.T) {
t.Parallel()
dir := t.TempDir()
p1 := writeTempFile(t, dir, "f1.bin", 256*1024)
p2 := writeTempFile(t, dir, "f2.bin", 256*1024)
p3 := writeTempFile(t, dir, "f3.bin", 256*1024)
h := newHasherCache(HasherOptions{
MaxEntries: 2, // small capacity
TTL: 300 * time.Millisecond, // short TTL
})
// Fill two entries
v1, err := h.GetHashCtx(context.Background(), p1)
if err != nil {
t.Fatal(err)
}
v2, err := h.GetHashCtx(context.Background(), p2)
if err != nil {
t.Fatal(err)
}
// Access p1 to make it MRU
if _, err := h.GetHashCtx(context.Background(), p1); err != nil {
t.Fatal(err)
}
// Add third, should evict LRU (p2)
if _, err := h.GetHashCtx(context.Background(), p3); err != nil {
t.Fatal(err)
}
// p2 should be a miss and recomputed
v2b, err := h.GetHashCtx(context.Background(), p2)
if err != nil {
t.Fatal(err)
}
if v2b != v2 {
t.Fatalf("hash mismatch after eviction: %s vs %s", v2b, v2)
}
// TTL expiry
time.Sleep(400 * time.Millisecond)
// Both p1 and p3 likely expired; a new Get should recompute
v1b, err := h.GetHashCtx(context.Background(), p1)
if err != nil {
t.Fatal(err)
}
if v1b != v1 {
t.Fatalf("hash mismatch after TTL: %s vs %s", v1b, v1)
}
}
func TestErrorCases(t *testing.T) {
t.Parallel()
// Non-existent
h := newHasherCache(HasherOptions{MaxEntries: 4, TTL: time.Minute})
_, err := h.GetHashCtx(context.Background(), "no_such_file")
if err == nil {
t.Fatalf("expected error for non-existent file")
}
// Directory
dir := t.TempDir()
_, err = h.GetHashCtx(context.Background(), dir)
if err == nil {
t.Fatalf("expected error for directory")
}
}
func TestContextCancellation(t *testing.T) {
t.Parallel()
dir := t.TempDir()
p := writeTempFile(t, dir, "big.bin", 8*1024*1024)
h := newHasherCache(HasherOptions{MaxEntries: 8, TTL: time.Minute})
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
defer cancel()
_, err := h.GetHashCtx(ctx, p)
if err == nil {
t.Fatalf("expected context error")
}
}
File: bench_test.go
package main
import (
"context"
"os"
"path/filepath"
"runtime"
"testing"
"time"
)
func BenchmarkGetHashParallel(b *testing.B) {
dir := b.TempDir()
p := filepath.Join(dir, "big.bin")
// Create a ~16MB file once
const size = 16 * 1024 * 1024
f, err := os.Create(p)
if err != nil {
b.Fatal(err)
}
defer f.Close()
if err := f.Truncate(size); err != nil {
b.Fatal(err)
}
// Use a shared cache with reasonable capacity
h := newHasherCache(HasherOptions{
MaxEntries: 128,
TTL: 5 * time.Minute,
})
	// Configure throughput/allocation reporting before the timed section
	b.SetBytes(size)
	b.ReportAllocs()
	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		ctx := context.Background()
		for pb.Next() {
			_, err := h.GetHashCtx(ctx, p)
			if err != nil {
				b.Fatal(err)
			}
		}
	})
	// Keep the cache reachable until the benchmark completes
	runtime.KeepAlive(h)
}
Summary of changes
Note: because the requirement was to keep interface signatures unchanged, getHash still returns only a string and cannot surface errors directly, so on error it logs and returns an empty string. Upstream callers should prefer the context-aware GetHashCtx to get full error reporting and cancellation control.
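As a usage sketch of the recommended context-aware path (the file name and the 2-second timeout are illustrative, and exampleGetHash is assumed to live in the same package as main.go; the benchmark runs with go test -bench=. -benchmem):

// Sketch: prefer GetHashCtx so each caller controls its own cancellation,
// while the shared singleflight computation keeps running for other waiters.
func exampleGetHash() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	sum, err := globalHasher.GetHashCtx(ctx, "report.bin") // illustrative file name
	if err != nil {
		log.Printf("hash failed: %v", err) // yields ctx.Err() if this caller timed out
		return
	}
	fmt.Println(sum)
}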
Problem analysis
Optimization approach
Optimized code. Note: for easier integration and reading, the single file below contains several package-visible classes. It compiles with Java 8. By default it sorts by the third column (index 2) and prints each row via Arrays.toString(...).
import java.io.*;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.*;
import java.util.function.Consumer;
/**
 * Reporting system: CSV import plus stable per-column sort and output.
 * - Streaming UTF-8 CSV parsing (RFC 4180): quoting, escapes, multi-line fields, BOM, CRLF/LF.
 * - Files that fit in memory: stable O(n log n) TimSort.
 * - Large files: external sort (stable per-chunk sort + k-way merge; the merge
 *   comparator breaks key ties by chunkId to keep the result globally stable).
 * - Explicit exceptions and resource management; modular and testable.
 */
public class Report {
    // Maximum rows kept per chunk (tune per environment)
    private static final int DEFAULT_MAX_ROWS_IN_MEMORY = 200_000;
    public static void main(String[] args) {
        String path = "data.csv";
        int sortIdx = 2; // third column (0-based)
        if (args != null) {
            if (args.length >= 1 && args[0] != null && !args[0].trim().isEmpty()) {
                path = args[0];
            }
            if (args.length >= 2) {
                try {
                    sortIdx = Integer.parseInt(args[1]);
                } catch (NumberFormatException ignore) {
                    // fall back to the default column
                }
            }
        }
        try {
            process(path, sortIdx, DEFAULT_MAX_ROWS_IN_MEMORY, System.out);
        } catch (IOException e) {
            System.err.println("Failed to process file: " + e.getMessage());
            e.printStackTrace(System.err);
            System.exit(1);
        }
    }
    /**
     * Read, sort, and emit.
     */
public static void process(String inputPath, int sortIdx, int maxRowsInMemory, PrintStream out) throws IOException {
Objects.requireNonNull(inputPath, "inputPath");
if (maxRowsInMemory <= 0) maxRowsInMemory = DEFAULT_MAX_ROWS_IN_MEMORY;
Consumer<String[]> printer = row -> out.println(Arrays.toString(row));
Sorter.sortCsvByColumnStable(inputPath, sortIdx, maxRowsInMemory, printer);
}
}
/**
 * Chooses between in-memory and external sorting, guaranteeing a stable sort
 * with bounded memory.
 */
class Sorter {
    private static final Charset UTF8 = StandardCharsets.UTF_8;
    public static void sortCsvByColumnStable(String inputPath, int columnIndex, int maxRowsInMemory,
                                             Consumer<String[]> consumer) throws IOException {
        List<File> chunks = new ArrayList<>();
        int chunkIdCounter = 0;
        List<String[]> buffer = new ArrayList<>(Math.min(maxRowsInMemory, 50_000));
        // Read rows, spilling sorted chunks to disk or keeping everything in memory
try (CsvReader reader = CsvReader.fromFile(inputPath, UTF8)) {
String[] row;
while ((row = reader.nextRow()) != null) {
buffer.add(row);
if (buffer.size() >= maxRowsInMemory) {
File chunk = writeSortedChunk(buffer, columnIndex, chunkIdCounter++);
chunks.add(chunk);
buffer.clear();
}
}
}
        if (chunks.isEmpty()) {
            // Everything fits in memory: stable sort, then emit
            stableSort(buffer, columnIndex);
            for (String[] r : buffer) {
                consumer.accept(r);
            }
            return;
        } else {
            // Chunks were spilled to disk: flush the final partial chunk
            if (!buffer.isEmpty()) {
                File chunk = writeSortedChunk(buffer, columnIndex, chunkIdCounter++);
                chunks.add(chunk);
                buffer.clear();
            }
            // k-way merge and emit
            try {
                mergeChunksAndConsume(chunks, columnIndex, consumer);
            } finally {
                // Clean up temporary files
                for (File f : chunks) {
                    try {
                        Files.deleteIfExists(f.toPath());
                    } catch (IOException ignore) { /* best effort */ }
                }
            }
        }
    }
    private static void stableSort(List<String[]> rows, int columnIndex) {
        Comparator<String[]> cmp = (a, b) -> {
            String ka = keyOf(a, columnIndex);
            String kb = keyOf(b, columnIndex);
            return ka.compareTo(kb);
        };
        // Collections.sort is TimSort in Java 8, which is stable
        Collections.sort(rows, cmp);
    }
    private static File writeSortedChunk(List<String[]> buffer, int columnIndex, int chunkId) throws IOException {
        // Stable-sort each chunk before writing it out
        stableSort(buffer, columnIndex);
        File tmp = File.createTempFile("report_chunk_" + chunkId + "_", ".csv");
        // Write as CSV (RFC 4180)
        try (OutputStream os = new FileOutputStream(tmp);
             OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8);
             BufferedWriter bw = new BufferedWriter(osw)) {
            for (String[] row : buffer) {
                CsvWriter.writeLine(bw, row);
            }
        }
        return tmp;
    }
    private static void mergeChunksAndConsume(List<File> chunks, int columnIndex, Consumer<String[]> consumer) throws IOException {
        PriorityQueue<MergeEntry> pq = new PriorityQueue<>(new Comparator<MergeEntry>() {
            @Override
            public int compare(MergeEntry e1, MergeEntry e2) {
                int c = e1.key.compareTo(e2.key);
                if (c != 0) return c;
                // Crucial: on equal keys, order by chunkId so the merge stays
                // globally stable (rows read earlier win)
                return Integer.compare(e1.chunkId, e2.chunkId);
            }
        });
        List<CsvReader> readers = new ArrayList<>(chunks.size());
        try {
            // Seed the heap with the first row of each chunk
            for (int i = 0; i < chunks.size(); i++) {
                CsvReader r = CsvReader.fromFile(chunks.get(i).getAbsolutePath(), StandardCharsets.UTF_8);
                readers.add(r);
                String[] row = r.nextRow();
                if (row != null) {
                    pq.add(new MergeEntry(i, row, keyOf(row, columnIndex), r));
                }
            }
            while (!pq.isEmpty()) {
                MergeEntry e = pq.poll();
                consumer.accept(e.row);
                String[] next = e.reader.nextRow();
                if (next != null) {
                    pq.add(new MergeEntry(e.chunkId, next, keyOf(next, columnIndex), e.reader));
                }
            }
        } finally {
            // Close every reader
            for (CsvReader r : readers) {
                try {
                    r.close();
                } catch (IOException ignore) { /* best effort */ }
            }
        }
    }
private static String keyOf(String[] row, int idx) {
if (row == null) return "";
return (idx >= 0 && idx < row.length && row[idx] != null) ? row[idx] : "";
}
private static final class MergeEntry {
final int chunkId;
final String[] row;
final String key;
final CsvReader reader;
MergeEntry(int chunkId, String[] row, String key, CsvReader reader) {
this.chunkId = chunkId;
this.row = row;
this.key = key;
this.reader = reader;
}
}
}
/**
 * Minimal RFC 4180 CSV writer (writes single lines only).
 */
class CsvWriter {
public static void writeLine(Writer w, String[] fields) throws IOException {
if (fields == null) {
w.write("\n");
return;
}
for (int i = 0; i < fields.length; i++) {
if (i > 0) w.write(',');
writeField(w, fields[i]);
}
w.write('\n');
}
private static void writeField(Writer w, String field) throws IOException {
if (field == null) field = "";
boolean needQuote = false;
for (int i = 0; i < field.length(); i++) {
char ch = field.charAt(i);
if (ch == '"' || ch == ',' || ch == '\n' || ch == '\r') {
needQuote = true;
break;
}
}
if (!needQuote) {
w.write(field);
return;
}
w.write('"');
for (int i = 0; i < field.length(); i++) {
char ch = field.charAt(i);
if (ch == '"') {
w.write("\"\""); // 转义双引号
} else {
w.write(ch);
}
}
w.write('"');
}
}
/**
 * Streaming CSV reader (UTF-8; supports BOM, quoting, escapes, multi-line
 * fields, CRLF/LF). Follows the basic rules of RFC 4180.
 */
class CsvReader implements Closeable {
    private final Reader in;
    private boolean eof = false;
    private boolean bomHandled = false;
    private int pushed = -2; // one-character pushback buffer; -2 means empty
    // Package-private so tests (e.g. ReportTest) can wrap an arbitrary Reader
    CsvReader(Reader in) {
this.in = in;
}
public static CsvReader fromFile(String path, Charset charset) throws FileNotFoundException {
InputStream is = new FileInputStream(path);
InputStreamReader isr = new InputStreamReader(is, charset);
BufferedReader br = new BufferedReader(isr, 64 * 1024);
return new CsvReader(br);
}
@Override
public void close() throws IOException {
in.close();
}
    public String[] nextRow() throws IOException {
        if (eof) return null;
        List<String> fields = new ArrayList<>(16);
        StringBuilder sb = new StringBuilder(64);
        boolean inQuotes = false;
        boolean fieldStarted = false;
        while (true) {
            int ch = readChar();
            if (ch == -1) {
                eof = true;
                if (inQuotes) {
                    // EOF while still inside quotes: end the field with what was read (lenient)
                    inQuotes = false;
                }
                // If any content or fields were accumulated, commit the final field
                if (sb.length() > 0 || fieldStarted || !fields.isEmpty()) {
                    fields.add(sb.toString());
                    return fields.toArray(new String[0]);
                } else {
                    return null; // true EOF, no row
                }
            }
            // Handle a UTF-8 BOM (first character only)
            if (!bomHandled) {
                bomHandled = true;
                if (ch == 0xFEFF) {
                    // Skip the BOM and keep reading
                    continue;
                }
            }
            if (inQuotes) {
                if (ch == '"') {
                    int pk = readChar();
                    if (pk == '"') {
                        // Escaped double quote
                        sb.append('"');
                    } else {
                        // Closing quote: push back the non-quote character
                        inQuotes = false;
                        unread(pk);
                    }
                } else {
                    sb.append((char) ch);
                }
            } else {
                if (ch == ',') {
                    fields.add(sb.toString());
                    sb.setLength(0);
                    fieldStarted = false;
                } else if (ch == '\r') {
                    // Handle CRLF or a lone CR
                    int pk = readChar();
                    if (pk != '\n') {
                        unread(pk);
                    }
                    fields.add(sb.toString());
                    return fields.toArray(new String[0]);
                } else if (ch == '\n') {
                    fields.add(sb.toString());
                    return fields.toArray(new String[0]);
                } else if (ch == '"') {
                    if (!fieldStarted && sb.length() == 0) {
                        inQuotes = true;
                        fieldStarted = true;
                    } else {
                        // Quote in an illegal position: leniently treat it as a literal character
                        sb.append('"');
                        fieldStarted = true;
                    }
                } else {
                    sb.append((char) ch);
                    fieldStarted = true;
                }
            }
        }
    }
private int readChar() throws IOException {
if (pushed != -2) {
int tmp = pushed;
pushed = -2;
return tmp;
}
return in.read();
}
private void unread(int ch) {
if (ch == -1) return;
if (pushed != -2) throw new IllegalStateException("Pushback buffer overflow");
pushed = ch;
}
}
/**
 * Unit tests (JUnit 4).
 * To run them, add the junit:junit:4.x dependency and include tests in the build.
 */
class ReportTest {
@org.junit.Test
public void testCsvParseBasic() throws Exception {
String s = "a,b,c\n1,2,3\n";
try (CsvReader r = new CsvReader(new StringReader(s))) {
org.junit.Assert.assertArrayEquals(new String[]{"a","b","c"}, r.nextRow());
org.junit.Assert.assertArrayEquals(new String[]{"1","2","3"}, r.nextRow());
org.junit.Assert.assertNull(r.nextRow());
}
}
@org.junit.Test
public void testCsvQuotesAndEscapes() throws Exception {
String s = "\"a,b\",\"\nmultiline\"\"quote\"\"\",x\r\nlast,,";
try (CsvReader r = new CsvReader(new StringReader(s))) {
org.junit.Assert.assertArrayEquals(new String[]{"a,b", "\nmultiline\"quote\"", "x"}, r.nextRow());
org.junit.Assert.assertArrayEquals(new String[]{"last", "", ""}, r.nextRow());
org.junit.Assert.assertNull(r.nextRow());
}
}
    @org.junit.Test
    public void testInMemoryStableSort() throws Exception {
        File f = File.createTempFile("report_test_", ".csv");
        try (Writer w = new OutputStreamWriter(new FileOutputStream(f), StandardCharsets.UTF_8)) {
            w.write("k,b\n");
            w.write("a,1\n");
            w.write("a,2\n");
            w.write("b,0\n");
        }
        List<String[]> out = new ArrayList<>();
        Sorter.sortCsvByColumnStable(f.getAbsolutePath(), 0, 1000, out::add);
        // Every row is sorted by its key, so the header row ("k",...) sorts last.
        // Stability: the two rows with equal key "a" keep their input order.
        org.junit.Assert.assertArrayEquals(new String[]{"a","1"}, out.get(0));
        org.junit.Assert.assertArrayEquals(new String[]{"a","2"}, out.get(1));
        org.junit.Assert.assertArrayEquals(new String[]{"b","0"}, out.get(2));
        org.junit.Assert.assertArrayEquals(new String[]{"k","b"}, out.get(3));
        f.delete();
    }
    @org.junit.Test
    public void testExternalStableSort() throws Exception {
        // Force a tiny threshold so the external-sort path is exercised
        File f = File.createTempFile("report_big_", ".csv");
        try (Writer w = new OutputStreamWriter(new FileOutputStream(f), StandardCharsets.UTF_8)) {
            w.write("k,val\n");
            for (int i = 0; i < 100; i++) {
                w.write("b," + i + "\n");
                w.write("a," + i + "\n");
            }
        }
        List<String[]> out = new ArrayList<>();
        Sorter.sortCsvByColumnStable(f.getAbsolutePath(), 0, 10, out::add);
        // All "a" rows come first (in their original, stable order), then the
        // "b" rows, and the header ("k",...) sorts last.
        for (int i = 0; i < 100; i++) {
            org.junit.Assert.assertEquals("a", out.get(i)[0]);
            org.junit.Assert.assertEquals(String.valueOf(i), out.get(i)[1]);
        }
        org.junit.Assert.assertEquals("k", out.get(200)[0]);
        f.delete();
    }
}
Summary of changes
Overall, this solution keeps memory bounded on large files, guarantees a stable sort, and remains highly maintainable, while significantly improving robustness and performance.
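As a small usage sketch (SorterDemo is a hypothetical helper in the same package; the CSV rows are invented, and the tiny maxRowsInMemory of 2 deliberately forces the external-sort path):

import java.io.*;
import java.nio.charset.StandardCharsets;

class SorterDemo {
    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("demo_", ".csv");
        try (Writer w = new OutputStreamWriter(new FileOutputStream(f), StandardCharsets.UTF_8)) {
            w.write("b,2\n");
            w.write("a,1\n");
            w.write("a,0\n");
        }
        // Sort by column 0; with a threshold of 2 the rows spill into chunks
        // and are recombined by the stable k-way merge.
        Sorter.sortCsvByColumnStable(f.getAbsolutePath(), 0, 2,
                row -> System.out.println(java.util.Arrays.toString(row)));
        // Prints [a, 1], then [a, 0], then [b, 2]: equal keys keep input order
        f.delete();
    }
}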
For the everyday scenario of analyzing inefficient code and rewriting it for better performance and maintainability, this tool addresses the following needs:
Tool name: Code Optimization
What it does: professionally analyzes inefficient code, provides targeted optimization plans and refactoring advice, and helps developers improve performance, readability, and maintainability to safeguard software quality.
Build a complete code-quality workflow, from problem discovery to implemented fixes, keeping code quality high and team collaboration efficient.
Help developers quickly locate inefficient code and get optimization plans, improving program performance and shortening development time.
Give teams code-standardization support, maintain high code quality, reduce technical debt, and lower long-term maintenance costs.
Learn best practices for code improvement from the optimization prompts and quickly build the skill of writing efficient, maintainable code.
Copy the prompt generated from the template into your favorite chat app (ChatGPT, Claude, etc.) and use it directly in conversation, with no extra development. Suited to personal, lightweight use.
Turn the prompt template into an API: your program can modify template parameters and call it through the interface, enabling automation and batch processing. Suited to developer integration and embedding in business systems.
Configure the corresponding server address in an MCP client so your AI application can call the prompt template automatically. Suited to advanced users and team collaboration, letting prompts move seamlessly between AI tools.
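For illustration only, the API-style integration could look like the Python sketch below; the endpoint URL, auth header value, and payload fields are hypothetical placeholders, not this platform's documented interface:

# Hypothetical sketch of calling a prompt-template service over HTTP.
# The URL, token, and JSON fields below are placeholders, not a real API.
import requests

resp = requests.post(
    "https://example.com/api/prompt-template",  # placeholder endpoint
    headers={"Authorization": "Bearer <your-token>"},
    json={"template": "code-optimization", "params": {"code": "..."}},
    timeout=30,
)
resp.raise_for_status()
print(resp.json())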