热门角色不仅是灵感来源,更是你的效率助手。通过精挑细选的角色提示词,你可以快速生成高质量内容、提升创作灵感,并找到最契合你需求的解决方案。让创作更轻松,让价值更直接!
我们根据不同用户需求,持续更新角色库,让你总能找到合适的灵感入口。
本提示词旨在为特定编程任务提供结构化、高质量的解决方案设计与评审。它通过引导用户输入详细的项目背景、具体需求与约束条件,输出包含代码实现、最佳实践、工具选型及潜在风险分析的综合方案,适用于需要严谨、可复用代码的软件开发场景。
下面给出一个面向企业内网的特性开关(Feature Flag)平台的端到端方案与代码骨架,覆盖服务端(TypeScript/Node)、评估侧车与 SDK(Go/TS)、双协议接口(HTTP REST + gRPC)、审计与回滚、RBAC、多租户与多环境、规则评估、灰度发布、事件投递、CLI、可观测性、部署清单(Helm 与 docker-compose)、数据库迁移与种子数据。架构遵循 Hexagonal(端口/适配器),接口契约优先(OpenAPI + protobuf),满足 MIT/Apache-2.0 许可证约束,不使用 AGPL 依赖,支持 OpenTelemetry。
一、总体架构与模块划分
二、目录结构(顶层 mono-repo)
三、核心数据模型(Prisma,PostgreSQL 14)
// SPDX-License-Identifier: Apache-2.0

datasource db { provider = "postgresql"; url = env("DATABASE_URL") }
generator client { provider = "prisma-client-js" }

// A customer organization. Owns environments, memberships and RBAC policies.
model Tenant {
  id           String           @id @default(cuid())
  name         String           @unique
  createdAt    DateTime         @default(now())
  updatedAt    DateTime         @updatedAt
  environments Environment[]
  users        UserTenantRole[]
  rbacPolicies RbacPolicy[]
  quotas       Json?            // free-form quota settings; shape not enforced by the schema
}

// A deployment environment (dev/staging/prod, ...) inside a tenant.
model Environment {
  id          String  @id @default(cuid())
  name        String  // NOTE(review): names can collide within a tenant — consider @@unique([tenantId, name])
  tenantId    String
  tenant      Tenant  @relation(fields: [tenantId], references: [id], onDelete: Cascade)
  flags       Flag[]
  uniqueIndex String? @unique // optional code
}

// A feature flag, unique per (environment, key).
model Flag {
  id          String      @id @default(cuid())
  key         String
  description String?
  enabled     Boolean     @default(false)
  tags        String[]    @default([])
  version     Int         @default(1)
  envId       String
  environment Environment @relation(fields: [envId], references: [id], onDelete: Cascade)
  rules       Rule[]
  variants    Variant[]
  createdBy   String
  updatedBy   String?
  createdAt   DateTime    @default(now())
  updatedAt   DateTime    @updatedAt
  archived    Boolean     @default(false) // soft-archive marker; rows are not deleted
  @@unique([envId, key])
}

// A named variation of a flag with an optional JSON payload.
model Variant {
  id      String @id @default(cuid())
  flagId  String
  flag    Flag   @relation(fields: [flagId], references: [id], onDelete: Cascade)
  name    String
  payload Json?
  weight  Int    @default(0) // 0-10000 basis points for precision
}

// A targeting rule; lower priority value is evaluated first (see engine).
model Rule {
  id       String  @id @default(cuid())
  flagId   String
  flag     Flag    @relation(fields: [flagId], references: [id], onDelete: Cascade)
  priority Int     @default(0)
  type     String  // "attribute_match" | "percentage" | "time_window" | "region" | "composite"
  expr     Json    // DSL JSON for operators
  enabled  Boolean @default(true)
}

// Append-only audit trail; hash/prevHash form a tamper-evident chain.
model AuditLog {
  id        String   @id @default(cuid())
  tenantId  String
  envId     String?
  resource  String   // e.g., "flag:key"
  action    String   // "create"|"update"|"delete"|"rollback"|"freeze"
  actor     String
  payload   Json
  createdAt DateTime @default(now())
  hash      String   // sha256(current)
  prevHash  String?  // sha256(previous)
  immutable Boolean  @default(true)
  @@index([tenantId, envId, createdAt])
}

// Outbound webhook subscription for platform events.
model WebhookEndpoint {
  id         String   @id @default(cuid())
  tenantId   String
  url        String
  secret     String   // encrypted AES-GCM blob
  eventTypes String[] // ["flag.updated","publish.started"]
  enabled    Boolean  @default(true)
  createdAt  DateTime @default(now())
}

// Fine-grained allow/deny policy.
// NOTE(review): allow-vs-deny precedence is decided in the RBAC service — confirm deny-overrides.
model RbacPolicy {
  id        String   @id @default(cuid())
  tenantId  String
  // Fix: back-relation for Tenant.rbacPolicies was missing; without it
  // `prisma validate` rejects the schema (one-sided relation).
  tenant    Tenant   @relation(fields: [tenantId], references: [id], onDelete: Cascade)
  subject   String   // user or group id
  resource  String   // "tenant:*"|"env:ENV_ID"|"flag:FLAG_KEY"
  action    String   // "read"|"write"|"admin"
  effect    String   // "allow"|"deny"
  createdAt DateTime @default(now())
}

// Coarse per-tenant role for a user.
model UserTenantRole {
  id        String   @id @default(cuid())
  tenantId  String
  // Fix: back-relation for Tenant.users was missing (same validation issue).
  tenant    Tenant   @relation(fields: [tenantId], references: [id], onDelete: Cascade)
  userId    String
  role      String   // "admin"|"editor"|"viewer"
  createdAt DateTime @default(now())
}
四、接口契约
openapi: 3.0.3
info:
title: FeatureFlag Platform
version: 1.0.0
servers:
- url: /api/v1
paths:
/tenants/{tenantId}/environments/{envId}/flags:
get:
operationId: listFlags
security: [{ oidc: [] }]
parameters:
        - { name: tenantId, in: path, required: true, schema: { type: string } }
        - { name: envId, in: path, required: true, schema: { type: string } }
responses:
"200": { description: OK, content: { application/json: { schema: { type: array, items: { $ref: "#/components/schemas/Flag" } } } } }
post:
operationId: createFlag
security: [{ oidc: [] }]
requestBody: { required: true, content: { application/json: { schema: { $ref: "#/components/schemas/CreateFlagReq" } } } }
responses:
"201": { description: Created, content: { application/json: { schema: { $ref: "#/components/schemas/Flag" } } } }
/tenants/{tenantId}/environments/{envId}/flags/{key}:
get: { operationId: getFlag, security: [{ oidc: [] }], responses: { "200": { description: OK, content: { application/json: { schema: { $ref: "#/components/schemas/Flag" } } } }, "404": { description: Not Found } } }
put: { operationId: updateFlag, security: [{ oidc: [] }], requestBody: { required: true, content: { application/json: { schema: { $ref: "#/components/schemas/UpdateFlagReq" } } } }, responses: { "200": { description: OK, content: { application/json: { schema: { $ref: "#/components/schemas/Flag" } } } } } }
delete: { operationId: deleteFlag, security: [{ oidc: [] }], responses: { "204": { description: No Content } } }
/evaluation:
post:
operationId: evaluate
security: [{ oidc: [] }]
requestBody: { required: true, content: { application/json: { schema: { $ref: "#/components/schemas/EvalReq" } } } }
responses:
"200": { description: OK, content: { application/json: { schema: { $ref: "#/components/schemas/EvalResp" } } } }
components:
securitySchemes:
oidc:
type: openIdConnect
openIdConnectUrl: https://idp.internal/.well-known/openid-configuration
schemas:
Flag:
type: object
properties:
id: { type: string }
key: { type: string }
enabled: { type: boolean }
tags: { type: array, items: { type: string } }
version: { type: integer }
variants:
type: array
items:
type: object
properties:
name: { type: string }
payload: { type: object }
weight: { type: integer }
rules:
type: array
items:
type: object
properties:
priority: { type: integer }
type: { type: string }
expr: { type: object }
CreateFlagReq:
type: object
required: [key]
properties:
key: { type: string }
description: { type: string }
tags: { type: array, items: { type: string } }
UpdateFlagReq:
type: object
properties:
enabled: { type: boolean }
tags: { type: array, items: { type: string } }
rules: { type: array, items: { type: object } }
EvalReq:
type: object
required: [tenantId, envId, key, user]
properties:
tenantId: { type: string }
envId: { type: string }
key: { type: string }
user:
type: object
properties:
id: { type: string }
attributes: { type: object }
EvalResp:
type: object
properties:
variant: { type: string }
payload: { type: object }
reason: { type: string }
matchedRule: { type: string }
syntax = "proto3";

package featureflag.v1;

option go_package = "github.com/yourorg/featureflag/proto/gen;gen";

// Tenant/environment scope attached to every request.
message TenantEnv {
  string tenant_id = 1;
  string env_id = 2;
}

// Wire representation of a feature flag, including its rules and variants.
message Flag {
  string id = 1;
  string key = 2;
  bool enabled = 3;
  repeated string tags = 4;
  int32 version = 5; // config version (mirrors Flag.version in the DB schema)
  repeated Variant variants = 6;
  repeated Rule rules = 7;
}

message Variant {
  string name = 1;
  string payload_json = 2; // JSON string for simplicity
  int32 weight_bp = 3; // basis points (0-10000)
}

message Rule {
  int32 priority = 1; // lower values evaluate first
  string type = 2; // "attribute_match" | "percentage" | "time_window" | "region" | "composite"
  string expr_json = 3; // rule DSL as a JSON string
  bool enabled = 4;
}

message ListFlagsRequest { TenantEnv scope = 1; }
message ListFlagsResponse { repeated Flag flags = 1; }
message GetFlagRequest { TenantEnv scope = 1; string key = 2; }
message GetFlagResponse { Flag flag = 1; }
message UpsertFlagRequest { TenantEnv scope = 1; Flag flag = 2; }
message UpsertFlagResponse { Flag flag = 1; }
message DeleteFlagRequest { TenantEnv scope = 1; string key = 2; }
message DeleteFlagResponse {}

// Evaluation of one flag for one user.
message EvalRequest {
  TenantEnv scope = 1;
  string key = 2;
  string user_id = 3;
  string attributes_json = 4; // user attributes as a JSON object string
}

message EvalResponse {
  string variant = 1;
  string payload_json = 2;
  string reason = 3;
  string matched_rule = 4;
}

message StreamConfigRequest { TenantEnv scope = 1; }

// One pushed config snapshot on the server->client stream.
message ConfigEnvelope {
  string data_json = 1; // e.g., snapshot of env flags
  int64 version = 2;
}

// CRUD + config streaming for the management plane.
service FlagService {
  rpc ListFlags(ListFlagsRequest) returns (ListFlagsResponse);
  rpc GetFlag(GetFlagRequest) returns (GetFlagResponse);
  rpc UpsertFlag(UpsertFlagRequest) returns (UpsertFlagResponse);
  rpc DeleteFlag(DeleteFlagRequest) returns (DeleteFlagResponse);
  rpc StreamConfig(StreamConfigRequest) returns (stream ConfigEnvelope);
}

// Read-side evaluation (served by the platform or a sidecar).
service EvaluationService {
  rpc Evaluate(EvalRequest) returns (EvalResponse);
}
五、服务端关键代码骨架(TypeScript/Node)
// SPDX-License-Identifier: Apache-2.0
import Fastify from 'fastify';
import { initOtel } from './adapters/observability/otel';
import { registerRoutes } from './adapters/http/routes';
import { PrismaClient } from '@prisma/client';
import Redis from 'ioredis';
import { createAppServices } from './bootstrap/app';
import { registerGrpcServer } from './adapters/grpc/server';

/**
 * Boots the HTTP (Fastify) and gRPC servers on one shared service container.
 * Fixes over the original: graceful shutdown also covers SIGTERM (what
 * Kubernetes sends), the async otel.shutdown() is awaited so spans flush,
 * and the process exits explicitly once cleanup completes.
 */
async function start() {
  const otel = initOtel();
  const app = Fastify({ logger: true });
  const prisma = new PrismaClient();
  const redis = new Redis(process.env.REDIS_URL!);
  const services = createAppServices({ prisma, redis });

  await registerRoutes(app, services);
  await registerGrpcServer(app.server, services); // share server if desired or start separate

  const port = Number(process.env.PORT ?? 8080);
  await app.listen({ port, host: '0.0.0.0' });

  const shutdown = async () => {
    await app.close();
    await prisma.$disconnect();
    redis.disconnect();
    await otel.shutdown();
    process.exit(0);
  };
  process.on('SIGINT', shutdown);
  process.on('SIGTERM', shutdown);
}

start().catch(err => { console.error(err); process.exit(1); });
// SPDX-License-Identifier: Apache-2.0
import crypto from 'crypto';

// The user being evaluated; attributes feed the targeting rules (plan, region, ...).
export type UserContext = { id: string; attributes?: Record<string, any> };
// weightBp is the variant weight in basis points (0-10000).
export type Variant = { name: string; payload?: any; weightBp: number };
// Rule DSL JSON; shape depends on the rule type (see attributeMatch/timeWindow/...).
export type RuleExpr = any;
export type Rule = {
  priority: number; // lower value = evaluated first
  type: 'attribute_match' | 'percentage' | 'time_window' | 'region' | 'composite';
  expr: RuleExpr;
  enabled: boolean;
};
export type Flag = {
  key: string;
  enabled: boolean;
  variants: Variant[];
  rules: Rule[];
};
// matchedRule carries the matching rule's *type* string, not an id.
export type EvalResult = { variant: string; payload?: any; reason: string; matchedRule?: string };
/**
 * Deterministically maps (userId, key) onto a basis-point bucket in [0, 10000).
 * Same user + key always lands in the same bucket (sticky rollouts).
 */
function hashToBucket(userId: string, key: string): number {
  const digest = crypto.createHash('sha256').update(`${userId}:${key}`).digest();
  // First 4 bytes as an unsigned int, reduced to 0..9999.
  return digest.readUInt32BE(0) % 10000;
}
/**
 * ANDs all conditions in the expr over the user's attributes.
 * Supported operators: equals, in, contains, regex. An unknown operator
 * fails the match; an empty/missing condition list matches everything.
 */
function attributeMatch(attrs: Record<string, any>, expr: RuleExpr): boolean {
  const conditions: any[] = expr?.conditions ?? [];
  const holds = (cond: any): boolean => {
    const value = attrs?.[cond.field];
    if (cond.op === 'equals') return value === cond.value;
    if (cond.op === 'in') return Array.isArray(cond.values) && cond.values.includes(value);
    if (cond.op === 'contains') return Array.isArray(value) && value.includes(cond.value);
    if (cond.op === 'regex') return new RegExp(cond.pattern).test(String(value ?? ''));
    return false;
  };
  return conditions.every(holds);
}
/**
 * Checks that `now` lies inside the optional [start, end] window.
 * A missing bound is treated as open-ended; no expr matches always.
 */
function timeWindow(now: Date, expr: RuleExpr): boolean {
  const notBefore = expr?.start ? new Date(expr.start) : undefined;
  const notAfter = expr?.end ? new Date(expr.end) : undefined;
  const tooEarly = notBefore !== undefined && now < notBefore;
  const tooLate = notAfter !== undefined && now > notAfter;
  return !tooEarly && !tooLate;
}
/**
 * Matches the user's region (falling back to country) against the rule's
 * allow-list. Fix: a non-array `regions` value in the DSL (e.g. a plain
 * string) previously threw on `.includes`; it is now treated as an empty
 * allow-list, i.e. no match.
 */
function regionMatch(attrs: Record<string, any>, expr: RuleExpr): boolean {
  const allowed = Array.isArray(expr?.regions) ? expr.regions : [];
  // `||` (not `??`) is intentional here: an empty-string region falls back to country.
  const userRegion = attrs?.region || attrs?.country;
  return allowed.includes(userRegion);
}
export function evaluate(flag: Flag, user: UserContext, now = new Date()): EvalResult {
if (!flag.enabled) return { variant: 'off', reason: 'flag_disabled' };
const sortedRules = [...flag.rules]
.filter(r => r.enabled)
.sort((a, b) => a.priority - b.priority);
const attrs = user.attributes ?? {};
for (const rule of sortedRules) {
let matched = false;
switch (rule.type) {
case 'attribute_match': matched = attributeMatch(attrs, rule.expr); break;
case 'time_window': matched = timeWindow(now, rule.expr); break;
case 'region': matched = regionMatch(attrs, rule.expr); break;
case 'percentage':
const pctBp = Number(rule.expr?.percentageBp ?? 0);
const bucket = hashToBucket(user.id, flag.key);
matched = bucket < pctBp;
break;
case 'composite':
// AND/OR of sub-rules
const clauses = rule.expr?.clauses ?? [];
matched = clauses.every((c: any) => {
switch (c.type) {
case 'attribute_match': return attributeMatch(attrs, c.expr);
case 'time_window': return timeWindow(now, c.expr);
case 'region': return regionMatch(attrs, c.expr);
case 'percentage': return hashToBucket(user.id, flag.key) < Number(c.expr?.percentageBp ?? 0);
default: return false;
}
});
break;
default: matched = false;
}
if (matched) {
// select variant by weights in basis points deterministically
const bucket = hashToBucket(user.id, flag.key + ':variant');
let acc = 0;
for (const v of flag.variants) {
acc += v.weightBp;
if (bucket < acc) return { variant: v.name, payload: v.payload, reason: 'rule_match', matchedRule: rule.type };
}
// fallback: first variant or off
return { variant: flag.variants[0]?.name ?? 'on', payload: flag.variants[0]?.payload, reason: 'rule_match_default', matchedRule: rule.type };
}
}
// default behavior if no rule matches: treat enabled as "on"
return { variant: flag.variants[0]?.name ?? 'on', payload: flag.variants[0]?.payload, reason: 'no_rule_match' };
}
// SPDX-License-Identifier: Apache-2.0
import { FastifyInstance } from 'fastify';
import { requireAuth } from '../auth/oidc';
import { checkRbac } from '../auth/rbac';
import { PrismaClient } from '@prisma/client';
import { evaluate } from '../../application/engine';
/**
 * Registers the REST API. Every route sits behind the global OIDC preHandler
 * (requireAuth) plus a per-route RBAC check: read for listing/evaluation,
 * write for flag creation.
 */
export async function registerRoutes(app: FastifyInstance, services: any) {
  app.addHook('preHandler', requireAuth(services.auth)); // OIDC guard
  app.get('/api/v1/tenants/:tenantId/environments/:envId/flags', async (req, reply) => {
    const { tenantId, envId } = req.params as any;
    // Read access on the environment is sufficient to list its flags.
    await checkRbac(req, { tenantId, resource: `env:${envId}`, action: 'read' });
    const flags = await services.repo.listFlags(tenantId, envId);
    return reply.send(flags);
  });
  app.post('/api/v1/tenants/:tenantId/environments/:envId/flags', async (req, reply) => {
    const { tenantId, envId } = req.params as any;
    await checkRbac(req, { tenantId, resource: `env:${envId}`, action: 'write' });
    const created = await services.repo.createFlag(tenantId, envId, req.body as any, req.user.sub);
    // Audit first, then fan the new snapshot out to subscribers.
    await services.audit.log({ tenantId, envId, resource: `flag:${created.key}`, action: 'create', actor: req.user.sub, payload: created });
    await services.publisher.publishEnv(tenantId, envId); // push config snapshot to Redis channel
    return reply.code(201).send(created);
  });
  app.post('/api/v1/evaluation', async (req, reply) => {
    const body = req.body as any;
    // NOTE(review): body fields are not validated here — presumably covered by
    // a schema elsewhere; confirm before exposing publicly.
    await checkRbac(req, { tenantId: body.tenantId, resource: `env:${body.envId}`, action: 'read' });
    // Cache-first lookup; falls back to the repository on a miss.
    const flag = await services.cache.getFlag(body.tenantId, body.envId, body.key) ?? await services.repo.getFlag(body.tenantId, body.envId, body.key);
    if (!flag) return reply.code(404).send({ message: 'Flag not found' });
    const result = evaluate(flag, body.user);
    return reply.send(result);
  });
}
// SPDX-License-Identifier: Apache-2.0
import { FastifyRequest } from 'fastify';
import { createRemoteJWKSet, jwtVerify } from 'jose';

/**
 * Fastify preHandler guard: verifies the Bearer JWT against the IdP's JWKS
 * and attaches the verified payload as req.user; otherwise replies 401.
 * Fixes: a missing or non-Bearer Authorization header is rejected up front
 * instead of feeding the raw header to jwtVerify, and the scheme check is
 * case-insensitive (auth schemes are case-insensitive per RFC 7235).
 */
export function requireAuth(auth: any) {
  // createRemoteJWKSet caches fetched keys across invocations.
  const JWKS = createRemoteJWKSet(new URL(process.env.OIDC_JWKS_URL!));
  return async (req: FastifyRequest, reply: any) => {
    try {
      const authz = req.headers.authorization ?? '';
      if (!/^Bearer\s+/i.test(authz)) {
        return reply.code(401).send({ message: 'unauthorized' });
      }
      const token = authz.replace(/^Bearer\s+/i, '');
      const { payload } = await jwtVerify(token, JWKS, { issuer: process.env.OIDC_ISSUER, audience: process.env.OIDC_AUDIENCE });
      (req as any).user = payload;
    } catch (e) {
      return reply.code(401).send({ message: 'unauthorized' });
    }
  };
}
// Simplified RBAC check
/**
 * Throws when the caller lacks `action` on `resource` within the tenant.
 * Fix: the thrown error now carries statusCode 403 so Fastify's default
 * error handler returns Forbidden instead of a generic 500.
 */
export async function checkRbac(req: any, input: { tenantId: string; resource: string; action: 'read'|'write'|'admin' }) {
  const userId = req.user.sub;
  // Decision delegated to the injected RBAC service (policies + roles; cached there).
  const allowed = await req.server.services.rbac.isAllowed(userId, input.tenantId, input.resource, input.action);
  if (!allowed) {
    const err: any = new Error('forbidden');
    err.statusCode = 403; // honored by Fastify's default error handler
    throw err;
  }
}
// SPDX-License-Identifier: Apache-2.0

/** Publishes a full environment snapshot onto a per-env Redis pub/sub channel. */
export class ConfigPublisher {
  constructor(private redis: any, private repo: any) {}

  async publishEnv(tenantId: string, envId: string) {
    // Whole-env snapshot (all flags); consumers replace their state wholesale.
    const data = await this.repo.snapshotEnv(tenantId, envId);
    // NOTE(review): Date.now() as a version is not monotonic under clock skew — confirm acceptable.
    const message = JSON.stringify({ version: Date.now(), data });
    const channel = `ff:cfg:${tenantId}:${envId}`;
    await this.redis.publish(channel, message);
  }
}
六、单元测试示例(Vitest,server-ts/test/engine.spec.ts)
// SPDX-License-Identifier: Apache-2.0
import { describe, it, expect } from 'vitest';
import { evaluate } from '../src/application/engine';

describe('evaluation engine', () => {
  // Smoke test: a 30% percentage rule over a 50/50 variant split. Only asserts
  // that a known variant comes back — bucketing is deterministic per user, so
  // the exact variant depends on the sha256 hash of the user id.
  it('percentage rollout and variant selection', () => {
    const flag = {
      key: 'new_ui',
      enabled: true,
      variants: [{ name: 'control', weightBp: 5000 }, { name: 'treatment', weightBp: 5000 }],
      rules: [{ priority: 0, type: 'percentage', expr: { percentageBp: 3000 }, enabled: true }]
    };
    const res1 = evaluate(flag as any, { id: 'userA' });
    const res2 = evaluate(flag as any, { id: 'userB' });
    expect(['control','treatment']).toContain(res1.variant);
    expect(['control','treatment']).toContain(res2.variant);
  });
  // An equals-condition on a user attribute routes to the single 100% variant.
  it('attribute match', () => {
    const flag = {
      key: 'beta_access',
      enabled: true,
      variants: [{ name: 'on', weightBp: 10000 }],
      rules: [{ priority: 0, type: 'attribute_match', expr: { conditions: [{ field: 'plan', op: 'equals', value: 'pro' }] }, enabled: true }]
    };
    const res = evaluate(flag as any, { id: 'u', attributes: { plan: 'pro' } });
    expect(res.variant).toBe('on');
  });
});
七、Go 侧车与 SDK骨架
// SPDX-License-Identifier: Apache-2.0
package main
import (
"context"
"log"
"net"
"os"
"time"
"google.golang.org/grpc"
"go.opentelemetry.io/otel"
"github.com/yourorg/featureflag/sidecar/internal/server"
"github.com/yourorg/featureflag/sidecar/internal/grpcclient"
)
func main() {
// Init OTel (exporter to OTLP endpoint)
// ...
s := grpc.NewServer()
evalServer := server.NewEvalServer()
server.RegisterEvalService(s, evalServer)
go func() {
addr := ":9090"
lis, err := net.Listen("tcp", addr)
if err != nil { log.Fatal(err) }
log.Printf("sidecar listening %s", addr)
if err := s.Serve(lis); err != nil { log.Fatal(err) }
}()
// subscribe config stream from platform
client := grpcclient.New(os.Getenv("PLATFORM_GRPC_ADDR"))
ctx := context.Background()
for {
if err := client.StreamConfig(ctx, os.Getenv("TENANT_ID"), os.Getenv("ENV_ID"), evalServer.ApplySnapshot); err != nil {
log.Printf("stream error: %v; retrying...", err)
time.Sleep(2 * time.Second)
}
}
}
// SPDX-License-Identifier: Apache-2.0
package server
import (
"encoding/json"
"sync"
"time"
pb "github.com/yourorg/featureflag/proto/gen"
)
type Variant struct {
Name string `json:"name"`
Payload json.RawMessage `json:"payload"`
WeightBp int `json:"weightBp"`
}
type Rule struct {
Priority int `json:"priority"`
Type string `json:"type"`
Expr json.RawMessage `json:"expr"`
Enabled bool `json:"enabled"`
}
type Flag struct {
Key string `json:"key"`
Enabled bool `json:"enabled"`
Variants []Variant`json:"variants"`
Rules []Rule `json:"rules"`
}
type EvalServer struct {
mu sync.RWMutex
flags map[string]Flag // key -> Flag for current env
}
func NewEvalServer() *EvalServer {
return &EvalServer{flags: make(map[string]Flag)}
}
func (s *EvalServer) ApplySnapshot(version int64, data []byte) error {
var m map[string]Flag
if err := json.Unmarshal(data, &m); err != nil { return err }
s.mu.Lock()
defer s.mu.Unlock()
s.flags = m
return nil
}
// Evaluate implements pb.EvaluationService
func (s *EvalServer) Evaluate(req *pb.EvalRequest) (*pb.EvalResponse, error) {
s.mu.RLock()
flag, ok := s.flags[req.Key]
s.mu.RUnlock()
if !ok || !flag.Enabled {
return &pb.EvalResponse{Variant: "off", Reason: "flag_disabled"}, nil
}
// Use same hashing and rule logic as TS (omitted for brevity)
// Choose variant deterministically
return &pb.EvalResponse{Variant: "on", Reason: "no_rule_match"}, nil
}
// SPDX-License-Identifier: Apache-2.0
import LRU from 'lru-cache';

export type EvalOptions = {
  timeoutMs?: number;
  fallback?: { variant: string; payload?: any };
  // Fix: the original read opt?.['token'] which is not part of the declared
  // type (a TS error without an index signature); token is now first-class.
  token?: string;
};

/**
 * Thin evaluation client with a per-(scope, flag, user) LRU cache (60s TTL).
 * Prefers the injected gRPC transport; otherwise falls back to HTTP.
 * Any transport error resolves to opt.fallback (default: { variant: 'off' }).
 */
export class FeatureFlagClient {
  private cache = new LRU<string, any>({ max: 1000, ttl: 60_000 });

  constructor(private baseUrl?: string, private grpc?: any) {}

  async evaluate(scope: { tenantId: string; envId: string }, key: string, user: { id: string; attributes?: any }, opt?: EvalOptions) {
    const cacheKey = `${scope.tenantId}:${scope.envId}:${key}:${user.id}`;
    const cached = this.cache.get(cacheKey);
    if (cached) return cached;
    try {
      let res;
      if (this.grpc) {
        res = await this.grpc.evaluate(scope, key, user, opt?.timeoutMs ?? 300);
      } else {
        const r = await fetch(`${this.baseUrl}/api/v1/evaluation`, {
          method: 'POST',
          headers: { 'content-type': 'application/json', authorization: `Bearer ${opt?.token ?? ''}` },
          body: JSON.stringify({ ...scope, key, user })
        });
        // Fix: the original parsed and cached whatever came back, including
        // 4xx/5xx error bodies; non-OK responses now route to the fallback.
        if (!r.ok) throw new Error(`evaluation failed: ${r.status}`);
        res = await r.json();
      }
      this.cache.set(cacheKey, res);
      return res;
    } catch (e) {
      return opt?.fallback ?? { variant: 'off' };
    }
  }
}
// SPDX-License-Identifier: Apache-2.0

// Package sdk is the Go client for the feature-flag platform.
package sdk

import (
	"context"
	"time"
)

// Client evaluates flags via gRPC (preferred) or HTTP, with a small LRU cache.
type Client struct {
	httpBase string
	grpc     EvaluationTransport
	cache    *LRU
}

// New builds a client with a 1000-entry, 1-minute-TTL LRU cache.
func New(httpBase string, grpc EvaluationTransport) *Client {
	return &Client{httpBase: httpBase, grpc: grpc, cache: NewLRU(1000, time.Minute)}
}

// Evaluate returns the variant for (tenant, env, key, user), or fallback on error.
// Fix: the cache key now includes the user id when present — keying per flag
// only would serve one user's cached variant to every other user (the TS SDK
// already keys by user id).
func (c *Client) Evaluate(ctx context.Context, tenantId, envId, key string, user map[string]any, fallback Variant) (Variant, error) {
	ck := tenantId + ":" + envId + ":" + key
	if id, ok := user["id"].(string); ok {
		ck += ":" + id
	}
	if v, ok := c.cache.Get(ck); ok {
		return v.(Variant), nil
	}
	// TODO: call gRPC if available, else HTTP; honor ctx deadline and fallback.
	return fallback, nil
}
八、CLI 示例(TypeScript,cli/src/commands/init.ts、push.ts)
// SPDX-License-Identifier: Apache-2.0
import { Command } from 'commander';
import fetch from 'node-fetch';

/**
 * `init` — creates a sample flag in the given tenant/environment.
 * Requires BASE_URL and TOKEN in the environment.
 * Fix: the HTTP status is now checked — the original ignored the response
 * and printed "Done" even on 4xx/5xx.
 */
export const initCmd = new Command('init')
  .requiredOption('--tenant <id>', 'Tenant ID')
  .requiredOption('--env <id>', 'Environment ID')
  .action(async (opts) => {
    console.log(`Initializing tenant ${opts.tenant} env ${opts.env}`);
    // Create default flag via REST
    const res = await fetch(`${process.env.BASE_URL}/api/v1/tenants/${opts.tenant}/environments/${opts.env}/flags`, {
      method: 'POST',
      headers: { 'content-type': 'application/json', authorization: `Bearer ${process.env.TOKEN}` },
      body: JSON.stringify({ key: 'example', description: 'Sample flag', tags: ['demo'] })
    });
    if (!res.ok) {
      throw new Error(`createFlag failed: ${res.status} ${await res.text()}`);
    }
    console.log('Done');
  });
// SPDX-License-Identifier: Apache-2.0
import { Command } from 'commander';
import fs from 'fs';
import fetch from 'node-fetch';

/**
 * `push` — upserts every flag from a local JSON file ({ flags: [...] }) into
 * the given tenant/environment via PUT. Requires BASE_URL and TOKEN.
 * Fix: each response status is now checked so a failed upsert aborts the run
 * instead of silently printing "Pushed".
 */
export const pushCmd = new Command('push')
  .requiredOption('--tenant <id>')
  .requiredOption('--env <id>')
  .requiredOption('--file <path>')
  .action(async (opts) => {
    const data = JSON.parse(fs.readFileSync(opts.file, 'utf8'));
    for (const f of data.flags) {
      const res = await fetch(`${process.env.BASE_URL}/api/v1/tenants/${opts.tenant}/environments/${opts.env}/flags/${f.key}`, {
        method: 'PUT',
        headers: { 'content-type': 'application/json', authorization: `Bearer ${process.env.TOKEN}` },
        body: JSON.stringify(f)
      });
      if (!res.ok) {
        throw new Error(`updateFlag ${f.key} failed: ${res.status} ${await res.text()}`);
      }
    }
    console.log('Pushed');
  });
九、数据库迁移与种子数据
// SPDX-License-Identifier: Apache-2.0
import { PrismaClient } from '@prisma/client';

const prisma = new PrismaClient();

/** Seeds a demo tenant, a "prod" environment and one 30% percentage-rollout flag. */
async function main() {
  const tenant = await prisma.tenant.upsert({
    where: { name: 'demo' },
    update: {},
    create: { name: 'demo' }
  });
  const env = await prisma.environment.create({ data: { name: 'prod', tenantId: tenant.id } });
  await prisma.flag.create({
    data: {
      key: 'new_ui',
      envId: env.id,
      enabled: true,
      tags: ['beta'],
      // weights are basis points (see Variant.weight in the schema): 50/50 split
      variants: { create: [{ name: 'control', weight: 5000 }, { name: 'treatment', weight: 5000 }] },
      rules: { create: [{ priority: 0, type: 'percentage', expr: { percentageBp: 3000 } as any, enabled: true }] },
      createdBy: 'seed'
    }
  });
}

// Fix: the original only used .finally(), so any seed error vanished and the
// process exited 0; failures now log and set a non-zero exit code.
main()
  .catch((err) => {
    console.error(err);
    process.exitCode = 1;
  })
  .finally(() => prisma.$disconnect());
十、部署清单
version: "3.9"
services:
postgres:
image: postgres:14
environment:
POSTGRES_PASSWORD: postgres
POSTGRES_USER: postgres
POSTGRES_DB: featureflag
ports: ["5432:5432"]
redis:
image: redis:7
ports: ["6379:6379"]
server:
build: ../server-ts
environment:
DATABASE_URL: postgresql://postgres:postgres@postgres:5432/featureflag
REDIS_URL: redis://redis:6379
OIDC_JWKS_URL: http://idp.local/jwks.json
OIDC_ISSUER: http://idp.local/
OIDC_AUDIENCE: featureflag
OTEL_EXPORTER_OTLP_ENDPOINT: http://otel-collector:4317
ports: ["8080:8080", "50051:50051"]
depends_on: [postgres, redis]
image:
repository: harbor.internal/featureflag/server
tag: v1.0.0
env:
DATABASE_URL: postgresql://user:pass@postgresql:5432/featureflag
REDIS_URL: redis://redis:6379
OIDC_JWKS_URL: https://idp.internal/jwks.json
OIDC_ISSUER: https://idp.internal/
OIDC_AUDIENCE: featureflag
OTEL_EXPORTER_OTLP_ENDPOINT: http://otel-collector:4317
resources:
requests: { cpu: "200m", memory: "256Mi" }
limits: { cpu: "1000m", memory: "512Mi" }
apiVersion: apps/v1
kind: Deployment
metadata: { name: featureflag-server }
spec:
replicas: 2
selector: { matchLabels: { app: featureflag-server } }
template:
metadata:
labels: { app: featureflag-server }
annotations:
instrumentation.opentelemetry.io/inject-nodejs: "true"
spec:
containers:
- name: server
image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
ports: [{ containerPort: 8080 }, { containerPort: 50051 }]
env:
- { name: DATABASE_URL, valueFrom: { secretKeyRef: { name: ff-secrets, key: DATABASE_URL } } }
- { name: REDIS_URL, value: "{{ .Values.env.REDIS_URL }}" }
- { name: OTEL_EXPORTER_OTLP_ENDPOINT, value: "{{ .Values.env.OTEL_EXPORTER_OTLP_ENDPOINT }}" }
readinessProbe: { httpGet: { path: /health, port: 8080 }, initialDelaySeconds: 5 }
livenessProbe: { httpGet: { path: /health, port: 8080 }, initialDelaySeconds: 10 }
十一、CI(GitHub Actions)
name: ci
on: [push]
jobs:
build-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with: { node-version: '20' }
      - name: Install
        working-directory: server-ts
        run: npm ci
      - name: Lint
        working-directory: server-ts
        run: npm run lint
      - name: Test
        working-directory: server-ts
        run: npm test -- --coverage
- name: Build image
run: docker build -t harbor.internal/featureflag/server:${{ github.sha }} server-ts
- name: Login Harbor
        run: echo "$HARBOR_PASS" | docker login harbor.internal -u "$HARBOR_USER" --password-stdin
env:
HARBOR_USER: ${{ secrets.HARBOR_USER }}
HARBOR_PASS: ${{ secrets.HARBOR_PASS }}
- name: Push
run: docker push harbor.internal/featureflag/server:${{ github.sha }}
十二、关键非功能实现建议
十三、接口与代码生成
十四、示例规则 DSL(expr JSON)
十五、Webhook 与事件投递
十六、RBAC 模型建议
十七、示例加密适配器(AES-GCM)
十八、本地开发脚手架
十九、许可证与合规
二十、后续扩展建议
以上骨架与代码示例可直接作为项目起点。你可以先运行 docker-compose 启动 Postgres 与 Redis,执行 Prisma 迁移与种子,启动 server-ts,再运行 sidecar-go 订阅配置流与本地评估,然后用 SDK 或 CLI 进行验证。需要我把上述骨架打包为可下载的最小工作示例时,请进一步说明你偏好的包管理与代码生成工具版本。
下面给出一套可复用的“在线 + 批量”推理服务模板与实践指南,围绕 FastAPI + Pydantic 构建,支持模型版本仓、A/B 分流、批量作业、监控与追踪、签名校验、数据漂移检测等。示例遵循 MIT 许可,仅使用常见开源依赖(BSD/MIT/Apache-2.0),不包含任何外部数据上传逻辑,适配容器化、S3 兼容对象存储、RabbitMQ、Prometheus/Grafana、Airflow 调度。
一、项目结构(建议)
二、依赖与环境隔离
[project]
name = "inference-service"
version = "0.1.0"
description = "Reusable online & batch inference template"
license = { text = "MIT" }
requires-python = ">=3.10"
dependencies = [
"fastapi==0.115.5",
"uvicorn[standard]==0.32.0",
"pydantic==2.9.2",
"pydantic-settings==2.6.1",
"orjson==3.10.7",
"numpy==2.1.3",
"scikit-learn==1.5.2",
"aioboto3==13.1.1",
"boto3==1.35.57",
"aio-pika==9.4.1",
"prometheus-client==0.21.0",
"structlog==24.4.0",
"opentelemetry-api==1.27.0",
"opentelemetry-sdk==1.27.0",
"opentelemetry-instrumentation-fastapi==0.48b0",
"cryptography==43.0.1",
"xxhash==3.5.0"
]
[project.optional-dependencies]
gpu = ["torch==2.5.1"]
dev = ["pytest==8.3.3", "httpx==0.27.2", "pytest-asyncio==0.24.0", "ruff==0.7.3", "mypy==1.13.0", "bandit==1.7.9", "types-requests"]
s3 = ["aioboto3==13.1.1", "boto3==1.35.57"]
rabbit = ["aio-pika==9.4.1"]
[tool.uv] # uv 锁定与构建
dev-dependencies = ["pre-commit==4.0.1"]
name: inference-service
channels: [conda-forge, defaults]
dependencies:
- python=3.10
- pip=24.2
- pip:
- inference-service @ file://.
- -r requirements.txt # 如需,也可以让 uv 生成 requirements 静态快照
三、Docker 多阶段与非 root docker/Dockerfile
# syntax=docker/dockerfile:1.7-labs
FROM python:3.11-slim AS base
ENV PYTHONDONTWRITEBYTECODE=1 PYTHONUNBUFFERED=1 PIP_DISABLE_PIP_VERSION_CHECK=1 \
    UV_LINK_MODE=copy UV_SYSTEM_PYTHON=1
RUN apt-get update && apt-get install -y --no-install-recommends build-essential curl \
    && rm -rf /var/lib/apt/lists/*

FROM base AS builder
RUN curl -LsSf https://astral.sh/uv/install.sh | sh
WORKDIR /app
# Dependencies first, so this layer is cached independently of src changes.
COPY pyproject.toml uv.lock ./
RUN /root/.cargo/bin/uv sync --frozen --no-dev
# Second sync installs the project itself on top of the cached deps layer.
COPY src ./src
RUN /root/.cargo/bin/uv sync --frozen --no-dev

FROM base AS runtime
# Run as non-root.
RUN useradd -m -u 10001 appuser
WORKDIR /app
COPY --from=builder /app/.venv /app/.venv
ENV PATH="/app/.venv/bin:$PATH"
COPY src ./src
COPY docker/gunicorn_conf.py ./gunicorn_conf.py
ENV GUNICORN_CMD_ARGS="--config gunicorn_conf.py"
# Avoid oversubscribing CPUs via BLAS/OpenMP thread pools.
# Fix: this note used to sit inline on the ENV instruction, where Docker does
# not treat mid-line `#` as a comment and the build fails.
ENV OMP_NUM_THREADS=1 MKL_NUM_THREADS=1
EXPOSE 8000
USER appuser
CMD ["uvicorn", "inference_service.app:create_app", "--factory", "--host", "0.0.0.0", "--port", "8000"]
四、Docker Compose 一键起服 docker-compose.yml
version: "3.9"
services:
app:
build: ./docker
image: inference-service:latest
environment:
APP_ENV: dev
S3_ENDPOINT: http://minio:9000
S3_ACCESS_KEY: minioadmin
S3_SECRET_KEY: minioadmin
S3_BUCKET: models
RABBIT_URL: amqp://guest:guest@rabbitmq:5672/
PROMETHEUS_SCRAPE: "true"
MODEL_MANIFEST_S3_URI: s3://models/manifest.json
AB_CONFIG: '{"weights":{"v1":70,"v2":30}}'
depends_on: [rabbitmq, minio]
ports: ["8000:8000"]
worker:
image: inference-service:latest
    command: ["python", "-m", "inference_service.batch_worker"]  # uv is not installed in the runtime image; the venv python is already on PATH
environment:
S3_ENDPOINT: http://minio:9000
S3_ACCESS_KEY: minioadmin
S3_SECRET_KEY: minioadmin
S3_BUCKET: datasets
RABBIT_URL: amqp://guest:guest@rabbitmq:5672/
depends_on: [rabbitmq, minio]
rabbitmq:
image: rabbitmq:3.13-management
ports: ["15672:15672", "5672:5672"]
minio:
image: minio/minio:RELEASE.2024-10-13T13-34-11Z
command: server /data --console-address ":9001"
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
ports: ["9000:9000", "9001:9001"]
volumes: ["minio_data:/data"]
volumes:
minio_data:
五、核心代码片段
from pydantic_settings import BaseSettings
from pydantic import Field
from typing import Optional


class Settings(BaseSettings):
    """Runtime configuration, loaded from environment variables (see aliases)."""

    app_env: str = Field(default="prod", alias="APP_ENV")
    s3_endpoint: str = Field(..., alias="S3_ENDPOINT")
    s3_access_key: str = Field(..., alias="S3_ACCESS_KEY")
    s3_secret_key: str = Field(..., alias="S3_SECRET_KEY")
    s3_bucket: str = Field(..., alias="S3_BUCKET")
    rabbit_url: str = Field(..., alias="RABBIT_URL")
    model_manifest_s3_uri: str = Field(..., alias="MODEL_MANIFEST_S3_URI")
    ab_config: Optional[str] = Field(default=None, alias="AB_CONFIG")
    prometheus_scrape: bool = Field(default=True, alias="PROMETHEUS_SCRAPE")
    allow_gpu: bool = Field(default=False, alias="ALLOW_GPU")

    # Pydantic v2: `class Config` is deprecated — use model_config instead.
    # protected_namespaces=() silences the warning caused by the
    # `model_manifest_s3_uri` field (fields starting with `model_`).
    model_config = {"extra": "ignore", "protected_namespaces": ()}


# NOTE(review): instantiating at import time raises if required env vars are
# missing — confirm the fail-fast behavior is intended.
settings = Settings()
from pydantic import BaseModel, Field, conlist, AwareDatetime
from typing import List, Optional, Literal, Dict


class HealthResp(BaseModel):
    """Health-check payload."""

    # `model_active` starts with Pydantic v2's protected `model_` prefix;
    # clearing protected_namespaces keeps the wire field name without warnings.
    model_config = {"protected_namespaces": ()}

    status: Literal["ok"] = "ok"
    version: str
    model_active: str


class PredictRequest(BaseModel):
    """Single online prediction request."""

    model_config = {"protected_namespaces": ()}  # allows the `model_tag` field

    request_id: Optional[str] = None
    model_tag: Optional[str] = None
    features: conlist(float, min_length=3, max_length=1024)  # example: at least 3 dims
    extra: Optional[Dict[str, str]] = None


class PredictResponse(BaseModel):
    """Prediction result with per-stage latency breakdown."""

    model_config = {"protected_namespaces": ()}

    request_id: str
    model_tag: str
    score: float = Field(ge=0.0, le=1.0)
    latency_ms: float
    breakdown_ms: Dict[str, float]


class BatchRequest(BaseModel):
    """Batch job submission."""

    model_config = {"protected_namespaces": ()}

    job_id: Optional[str] = None
    input_s3_uri: str  # s3://bucket/path/input.csv
    output_s3_uri: str
    model_tag: Optional[str] = None
    callback_url: Optional[str] = None


class BatchStatus(BaseModel):
    """Batch job status, as returned by the status endpoint."""

    job_id: str
    status: Literal["queued", "running", "done", "failed"]
    message: Optional[str] = None


class DriftReport(BaseModel):
    """PSI drift report for one model tag."""

    model_config = {"protected_namespaces": ()}

    model_tag: str
    psi: float
    threshold: float
    exceeded: bool
    ts: AwareDatetime
import structlog
import re
PII_KEYS = {"name", "email", "phone", "id_number"}
def _mask_value(v: str) -> str:
if not isinstance(v, str):
return v
if "@" in v:
return re.sub(r"(^.).*(@.*$)", r"*\2", v)
return v[:1] + "*" * max(0, len(v) - 2) + v[-1:] if len(v) > 2 else "**"
def pii_filter(logger, method, event_dict):
for k in list(event_dict.keys()):
if k.lower() in PII_KEYS:
event_dict[k] = _mask_value(str(event_dict[k]))
return event_dict
def configure_logging():
    """Configure structlog for JSON output with ISO timestamps and PII masking.

    Returns a bound structlog logger ready for use.
    """
    pipeline = [
        structlog.processors.TimeStamper(fmt="iso"),
        pii_filter,
        structlog.processors.JSONRenderer(),
    ]
    structlog.configure(processors=pipeline)
    return structlog.get_logger()
import time
import uuid
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from pydantic import ValidationError
from .metrics import REQ_LATENCY, REQ_COUNT, ERR_COUNT
class RequestContextMiddleware(BaseHTTPMiddleware):
    """Attach an x-request-id header to every response and record request metrics.

    Unhandled exceptions are converted into plain-text 422/500 responses so the
    latency/count metrics are still recorded for failed requests.
    """

    async def dispatch(self, request: Request, call_next):
        # Propagate the caller's correlation id, or mint a new one.
        req_id = request.headers.get("x-request-id", str(uuid.uuid4()))
        start = time.perf_counter()
        try:
            response = await call_next(request)
        except ValidationError as ve:
            # NOTE(review): FastAPI normally converts request validation errors
            # before they reach middleware; kept as a defensive catch.
            ERR_COUNT.labels(type="validation").inc()
            response = Response(
                status_code=422, content=f"validation_error:{ve.errors()}", media_type="text/plain"
            )
        except Exception:
            ERR_COUNT.labels(type="server").inc()
            response = Response(status_code=500, content="internal_error", media_type="text/plain")
        finally:
            # Metrics only here. The original `return response` inside finally
            # both swallowed the except-branch responses and raised
            # UnboundLocalError (response never assigned) after an exception.
            REQ_LATENCY.observe((time.perf_counter() - start) * 1000)
            REQ_COUNT.inc()
        response.headers["x-request-id"] = req_id
        return response
from prometheus_client import Counter, Histogram, Gauge
# Prometheus metric registry for the inference service.
REQ_COUNT = Counter("http_requests_total", "Total HTTP requests")
ERR_COUNT = Counter("errors_total", "Errors", labelnames=("type",))
# Fix: the middleware does `from .metrics import REQ_LATENCY` but this metric
# was never defined, making the whole app fail at import time.
REQ_LATENCY = Histogram(
    "http_request_latency_ms", "HTTP request latency ms",
    buckets=(1, 5, 10, 20, 40, 60, 80, 100, 200, 500),
)
LAT_HIST = Histogram("predict_latency_ms", "Predict latency ms", buckets=(1,5,10,20,40,60,80,100,200,500))
STAGE_HIST = Histogram("stage_latency_ms", "Stage latency ms", labelnames=("stage",))
MODEL_QPS = Counter("model_infer_total", "Model infer count", labelnames=("model",))
MODEL_ERR = Counter("model_infer_errors_total", "Model infer errors", labelnames=("model", "type"))
DRIFT_PSI = Gauge("model_drift_psi", "PSI value", labelnames=("model",))
import aioboto3
import io
from urllib.parse import urlparse
from .config import settings
def _parse_s3_uri(uri: str):
p = urlparse(uri)
return (p.netloc, p.path.lstrip("/"))
def s3_session():
    # Factory for a fresh aioboto3 session; endpoint/credentials are supplied
    # per-client call (see get_object_bytes / put_object_bytes).
    return aioboto3.Session()
async def get_object_bytes(s3_uri: str) -> bytes:
    """Download an object from S3/MinIO and return its full body as bytes."""
    bucket, key = _parse_s3_uri(s3_uri)
    credentials = dict(
        endpoint_url=settings.s3_endpoint,
        aws_secret_access_key=settings.s3_secret_key,
        aws_access_key_id=settings.s3_access_key,
    )
    async with s3_session().client("s3", **credentials) as s3:
        resp = await s3.get_object(Bucket=bucket, Key=key)
        return await resp["Body"].read()
async def put_object_bytes(s3_uri: str, data: bytes):
    """Upload raw bytes to the given s3:// URI on S3/MinIO."""
    bucket, key = _parse_s3_uri(s3_uri)
    credentials = dict(
        endpoint_url=settings.s3_endpoint,
        aws_secret_access_key=settings.s3_secret_key,
        aws_access_key_id=settings.s3_access_key,
    )
    async with s3_session().client("s3", **credentials) as s3:
        await s3.put_object(Bucket=bucket, Key=key, Body=data)
import hashlib
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import padding
def sha256(data: bytes) -> str:
    """Hex-encoded SHA-256 digest of data."""
    digest = hashlib.sha256()
    digest.update(data)
    return digest.hexdigest()
def verify_signature(data: bytes, signature: bytes, public_pem: bytes) -> bool:
    """Verify an RSA PKCS#1 v1.5 / SHA-256 signature over data.

    Returns True on success. NOTE(review): despite the bool return type this
    never returns False — cryptography's verify() raises InvalidSignature on
    mismatch, so callers must treat an exception (not a falsy return) as
    verification failure.
    """
    public_key = serialization.load_pem_public_key(public_pem)
    public_key.verify(
        signature,
        data,
        padding.PKCS1v15(),
        hashes.SHA256(),
    )
    return True
import random
import xxhash
from typing import Dict, Optional
class WeightedRouter:
    """Cumulative-weight variant picker for A/B traffic splitting.

    `weights` maps variant name -> non-negative integer weight. pick() is
    random when no seed is given, and deterministic (sticky) for a given seed
    string via xxhash bucketing.
    """

    def __init__(self, weights: Dict[str, int]):
        # Fix: an empty or all-zero weight table previously crashed later in
        # pick() (randint(1, 0) / modulo by zero); fail fast here instead.
        self.weights = weights
        self.variants = list(weights.keys())
        self.buckets = []
        cumulative = 0
        for name in self.variants:
            w = weights[name]
            if w < 0:
                raise ValueError(f"negative weight for variant {name}")
            cumulative += w
            self.buckets.append((cumulative, name))
        self.total = cumulative
        if self.total <= 0:
            raise ValueError("sum of weights must be positive")

    def pick(self, seed: Optional[str] = None) -> str:
        """Return a variant name; identical seeds always map to the same variant."""
        if seed is None:
            r = random.randint(1, self.total)
        else:
            r = (xxhash.xxh32(seed).intdigest() % self.total) + 1
        for upper, name in self.buckets:
            if r <= upper:
                return name
        return self.variants[-1]
import asyncio
import json
from typing import Dict, Optional
from .storage import get_object_bytes
from .crypto import sha256, verify_signature
from .predictor import SklearnPredictor
from .metrics import MODEL_ERR
from .config import settings
class ModelRegistry:
    """In-memory cache of loaded predictors, backed by an S3 manifest.

    The manifest maps model tags to artifact URIs plus optional sha256
    checksum and detached signature URI.
    """

    def __init__(self):
        # tag -> loaded predictor cache.
        self._models: Dict[str, SklearnPredictor] = {}
        # Serializes load() so the same artifact is not fetched twice.
        self._lock = asyncio.Lock()
        self._manifest: dict = {}

    async def refresh_manifest(self):
        """Re-read the manifest JSON from S3 (called at startup / on demand)."""
        data = await get_object_bytes(settings.model_manifest_s3_uri)
        self._manifest = json.loads(data.decode("utf-8"))

    async def load(self, tag: str):
        """Return the cached predictor for `tag`, loading and verifying it if needed.

        Raises KeyError for an unknown tag and ValueError on integrity failure.
        """
        async with self._lock:
            if tag in self._models:
                return self._models[tag]
            # Fix: a manifest that was never loaded (no "models" key) now yields
            # the clear "model tag not found" error instead of a bare KeyError.
            meta = self._manifest.get("models", {}).get(tag)
            if not meta:
                raise KeyError(f"model tag not found: {tag}")
            blob = await get_object_bytes(meta["artifact"])
            if "sha256" in meta and sha256(blob) != meta["sha256"]:
                raise ValueError("checksum mismatch")
            if "signature" in meta and "public_key" in self._manifest:
                sig = await get_object_bytes(meta["signature"])
                pub = await get_object_bytes(self._manifest["public_key"])
                # verify_signature raises on mismatch; the explicit check also
                # guards against a future False-returning implementation.
                if not verify_signature(blob, sig, pub):
                    raise ValueError("signature verification failed")
            predictor = SklearnPredictor.from_bytes(blob, device="cpu")
            self._models[tag] = predictor
            return predictor

    def get(self, tag: str) -> Optional[SklearnPredictor]:
        """Cached predictor or None; performs no I/O."""
        return self._models.get(tag)

    async def hot_swap(self, tag: str):
        """Pre-load `tag` into memory; the traffic shift itself is done by routing,
        so no downtime is needed."""
        await self.load(tag)

# Process-wide singleton used by the API routes and the batch worker.
registry = ModelRegistry()
manifest.json 示例(S3 上):
{
"public_key": "s3://models/keys/public.pem",
"models": {
"v1": {"artifact": "s3://models/v1/model.pkl", "sha256": "abc123...", "signature": "s3://models/v1/model.sig"},
"v2": {"artifact": "s3://models/v2/model.pkl", "sha256": "def456...", "signature": "s3://models/v2/model.sig"}
}
}
import numpy as np
from typing import List
def normalize_minmax(x: List[float]):
    """Min-max scale a vector into [0, 1] as float32.

    Constant vectors map to all zeros; an empty input yields an empty array.

    >>> normalize_minmax([0, 5, 10])
    array([0. , 0.5, 1. ], dtype=float32)
    """
    arr = np.asarray(x, dtype=np.float32)
    # Fix: .min()/.max() raise on an empty array. The original doctest also
    # omitted the float32 dtype suffix, so `python -m doctest` (run by the
    # Makefile `test` target) failed.
    if arr.size == 0:
        return arr
    mn, mx = arr.min(), arr.max()
    if mx == mn:
        return np.zeros_like(arr)
    return (arr - mn) / (mx - mn)
import io, pickle, time
import numpy as np
from typing import Dict, Any, Optional
class SklearnPredictor:
    """Thin wrapper around a pickled scikit-learn binary classifier."""
    def __init__(self, model):
        # Underlying estimator; must expose predict_proba.
        self.model = model
    @classmethod
    def from_bytes(cls, blob: bytes, device: str = "cpu"):
        # SECURITY: pickle.loads executes arbitrary code embedded in the blob.
        # Only acceptable because artifacts are checksum/signature-verified in
        # ModelRegistry.load before reaching here — never feed untrusted bytes.
        # `device` keeps interface parity with TorchPredictor but is unused.
        model = pickle.loads(blob)
        return cls(model)
    def predict_proba(self, feats: np.ndarray) -> float:
        # Assumes a binary classifier; returns the positive-class probability.
        proba = self.model.predict_proba(feats.reshape(1, -1))[0, 1]
        return float(proba)
# Optional GPU / TorchScript path (illustrative)
try:
    import torch

    class TorchPredictor:
        """TorchScript-based predictor mirroring SklearnPredictor's interface."""

        def __init__(self, model, device="cpu"):
            self.model = model.to(device)
            self.device = device

        @classmethod
        def from_bytes(cls, blob: bytes, device="cpu"):
            """Deserialize a torch.jit artifact from bytes onto `device`."""
            buffer = io.BytesIO(blob)
            model = torch.jit.load(buffer, map_location=device)
            model.eval()
            return cls(model, device)

        @torch.inference_mode()
        def predict(self, feats: np.ndarray) -> float:
            """Run the model on a single feature vector; sigmoid → scalar score."""
            x = torch.from_numpy(feats).float().to(self.device)
            y = self.model(x)
            return float(torch.sigmoid(y).item())
except ImportError:
    # Fix: catching bare Exception hid real bugs inside the class body (any
    # typo would silently disable the GPU path); only a missing torch
    # installation should trigger the fallback.
    TorchPredictor = None
def clip_score(s: float, lo: float = 0.0, hi: float = 1.0) -> float:
    """Clamp `s` into the closed interval [lo, hi]."""
    capped = min(hi, s)
    return max(lo, capped)
import numpy as np
from .metrics import DRIFT_PSI
def psi(expected: np.ndarray, actual: np.ndarray, bins=10):
    """Population Stability Index between two samples.

    Bins are the quantile edges of `expected`; `actual` is histogrammed with
    the same edges. Ratios are clipped to [1e-6, 1] to avoid log(0).
    Returns 0.0 for a degenerate (constant) expected distribution.
    """
    quantiles = np.linspace(0, 1, bins + 1)
    # Fix: tied data produces duplicate quantile edges, and np.histogram then
    # raises "bins must increase monotonically"; dedupe the edges first.
    cuts = np.unique(np.quantile(expected, quantiles))
    if cuts.size < 2:
        return 0.0
    e_hist, _ = np.histogram(expected, bins=cuts)
    a_hist, _ = np.histogram(actual, bins=cuts)
    e_rat = np.clip(e_hist / max(1, e_hist.sum()), 1e-6, 1)
    a_rat = np.clip(a_hist / max(1, a_hist.sum()), 1e-6, 1)
    return float(np.sum((a_rat - e_rat) * np.log(a_rat / e_rat)))
def report_and_export(model_tag: str, expected: np.ndarray, actual: np.ndarray, threshold=0.2) -> dict:
    """Compute PSI, export it to the Prometheus gauge, and return a report dict."""
    value = psi(expected, actual)
    DRIFT_PSI.labels(model=model_tag).set(value)
    report = {
        "model_tag": model_tag,
        "psi": value,
        "threshold": threshold,
        "exceeded": value > threshold,
    }
    return report
import time
import numpy as np
from fastapi import APIRouter, Depends
from fastapi.responses import ORJSONResponse, PlainTextResponse
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
from .schemas import HealthResp, PredictRequest, PredictResponse, BatchRequest, BatchStatus
from .model_registry import registry
from .routing import WeightedRouter
from .preprocessing import normalize_minmax
from .postprocessing import clip_score
from .metrics import LAT_HIST, STAGE_HIST, MODEL_QPS, MODEL_ERR
from .config import settings
from .batch_worker import enqueue_job
router = APIRouter()
# NOTE(review): A/B weights are hard-coded at 70/30 between v1/v2; consider
# sourcing them from AB_CONFIG (settings.ab_config) so traffic can be shifted
# without a redeploy.
router_ab = WeightedRouter(weights={"v1":70, "v2":30})
@router.get("/health", response_model=HealthResp)
async def health():
    """Liveness probe: reports version "0.1.0" and model_active "dynamic"."""
    resp = HealthResp(version="0.1.0", model_active="dynamic")
    return resp
@router.post("/predict", response_model=PredictResponse)
async def predict(req: PredictRequest):
    """Single online prediction with A/B routing, lazy model load and stage timing.

    Returns 404 when the requested/routed model tag is not in the manifest.
    """
    # Local import: this module's import list does not include HTTPException.
    from fastapi import HTTPException
    t0 = time.perf_counter()
    # Sticky routing by request_id when available; else fall back to a feature-derived seed.
    tag = req.model_tag or router_ab.pick(seed=req.request_id or str(req.features[0]))
    t_pre = time.perf_counter()
    feats = normalize_minmax(req.features)
    t_infer_s = time.perf_counter()
    model = registry.get(tag)
    if not model:
        try:
            model = await registry.load(tag)
        except KeyError:
            # Fix: an unknown tag previously bubbled up as an unhandled
            # KeyError (HTTP 500); surface it as a clear client-side 404.
            raise HTTPException(status_code=404, detail=f"unknown model tag: {tag}")
    try:
        score = model.predict_proba(np.asarray(feats))
    except Exception:
        MODEL_ERR.labels(model=tag, type="infer").inc()
        raise
    score = clip_score(score)
    t1 = time.perf_counter()
    MODEL_QPS.labels(model=tag).inc()
    total_ms = (t1 - t0)*1000
    LAT_HIST.observe(total_ms)
    STAGE_HIST.labels("preprocess").observe((t_infer_s - t_pre)*1000)
    STAGE_HIST.labels("inference").observe((t1 - t_infer_s)*1000)
    return ORJSONResponse(
        PredictResponse(
            request_id=req.request_id or "",
            model_tag=tag, score=score,
            latency_ms=total_ms,
            breakdown_ms={"preprocess": (t_infer_s-t_pre)*1000, "inference": (t1-t_infer_s)*1000}
        ).model_dump()
    )
@router.post("/batch", response_model=BatchStatus)
async def batch(req: BatchRequest):
    """Enqueue an asynchronous batch-scoring job and report it as queued."""
    new_job_id = await enqueue_job(req)
    return BatchStatus(job_id=new_job_id, status="queued")
@router.get("/metrics")
async def metrics():
    """Prometheus scrape endpoint (text exposition format)."""
    payload = generate_latest()
    return PlainTextResponse(payload, media_type=CONTENT_TYPE_LATEST)
import asyncio, csv, io, json, uuid
import aio_pika
import numpy as np
from .config import settings
from .model_registry import registry
from .preprocessing import normalize_minmax
from .postprocessing import clip_score
from .storage import get_object_bytes, put_object_bytes
# Durable queue shared by the API enqueue path and the worker.
QUEUE = "batch_jobs"

async def enqueue_job(req) -> str:
    """Publish a batch job to RabbitMQ and return its (possibly generated) job id.

    NOTE(review): opens a fresh connection per call — fine at low batch rates,
    but a shared connection/channel would be cheaper under load.
    """
    job_id = req.job_id or str(uuid.uuid4())
    payload = req.model_dump()
    payload["job_id"] = job_id
    conn = await aio_pika.connect_robust(settings.rabbit_url)
    async with conn:
        ch = await conn.channel()
        # Idempotent declare so publishing works even before any worker started;
        # the returned queue object itself is not needed (unused local removed).
        await ch.declare_queue(QUEUE, durable=True)
        await ch.default_exchange.publish(
            aio_pika.Message(body=json.dumps(payload).encode(), delivery_mode=aio_pika.DeliveryMode.PERSISTENT),
            routing_key=QUEUE,
        )
    return job_id
async def worker():
    """Consume batch jobs from RabbitMQ indefinitely, acking each after processing."""
    connection = await aio_pika.connect_robust(settings.rabbit_url)
    async with connection:
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=8)
        queue = await channel.declare_queue(QUEUE, durable=True)
        async with queue.iterator() as messages:
            async for message in messages:
                async with message.process():
                    await process_job(json.loads(message.body.decode()))
async def process_job(job: dict):
    """Score one batch job: read a CSV from S3, score row-by-row, write results back.

    Input CSV: one sample per line, comma-separated floats.
    Output: one score per line, preserving input order.
    """
    tag = job.get("model_tag") or "v1"
    model = registry.get(tag) or await registry.load(tag)
    raw = await get_object_bytes(job["input_s3_uri"])
    reader = csv.reader(io.StringIO(raw.decode()))
    scores = []
    for row in reader:
        # Fix: a blank line (e.g. the file's trailing newline) yields an empty
        # row and previously crashed on float(""); skip it instead.
        if not row:
            continue
        feats = normalize_minmax([float(x) for x in row])
        scores.append(clip_score(model.predict_proba(np.asarray(feats))))
    out = "\n".join(map(str, scores)).encode()
    await put_object_bytes(job["output_s3_uri"], out)
    # TODO: when job["callback_url"] is set, trigger the intranet HTTP callback
    # with the completion status — not implemented yet.
if __name__ == "__main__":
    # Entry point: run the RabbitMQ consumer loop (blocks until cancelled).
    asyncio.run(worker())
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from .api import router
from .middleware import RequestContextMiddleware
from .logging import configure_logging
from .model_registry import registry
from .config import settings
def create_app() -> FastAPI:
    """Application factory: logging, middleware, routes, and the startup hook.

    Intended for `uvicorn inference_service.app:create_app --factory`.
    """
    # Called for its side effects (JSON renderer + PII masking); the returned
    # logger is not used directly here.
    log = configure_logging()
    app = FastAPI(title="Inference Service", version="0.1.0")
    app.add_middleware(RequestContextMiddleware)
    # NOTE(review): wildcard CORS is wide open; acceptable on a trusted
    # intranet, but tighten allow_origins before any broader exposure.
    app.add_middleware(
        CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]
    )
    app.include_router(router)
    @app.on_event("startup")
    async def _startup():
        # Fetch the model manifest once at boot so /predict can lazy-load models.
        await registry.refresh_manifest()
        # Optional warm-up: pre-load the main-traffic model
        # await registry.load("v1")
    return app
六、API 契约文档(概要)
提示:FastAPI 自动生成 OpenAPI 文档,可在 /docs 查看契约细节。
七、灰度与回滚方案
八、批量离线作业入口(Airflow 对接)
九、错误处理与数据校验
十、性能优化建议(目标:CPU p95 ≤ 60ms,200 RPS 单实例)
十一、监控与追踪
十二、模型打包与签名
十三、数据漂移与阈值告警
十四、单元测试与 doctest

tests/test_preprocess.py
from inference_service.preprocessing import normalize_minmax
import numpy as np
def test_normalize():
    """Check all three endpoints of the scaled vector, not just the midpoint,
    so a constant or inverted scaling would also be caught."""
    out = normalize_minmax([0, 5, 10])
    assert np.isclose(out[0], 0.0)
    assert np.isclose(out[1], 0.5)
    assert np.isclose(out[2], 1.0)
tests/test_postprocess.py
from inference_service.postprocessing import clip_score
def test_clip():
    """Out-of-range values clamp to the bounds; in-range values pass through."""
    assert clip_score(1.2) == 1.0
    assert clip_score(-0.1) == 0.0
    assert clip_score(0.5) == 0.5
tests/test_api_smoke.py
import pytest, httpx, asyncio
from inference_service.app import create_app
from asgiref.wsgi import WsgiToAsgi
@pytest.mark.asyncio
async def test_predict_smoke():
    """In-process smoke test of the /predict route.

    Fix: httpx removed the `AsyncClient(app=...)` shortcut (deprecated since
    0.27, removed in 0.28); route through ASGITransport explicitly.
    NOTE(review): ASGITransport does not run startup/lifespan events, so the
    manifest is not fetched here — /predict will need the model pre-loaded or
    mocked for this to return 200; confirm against CI setup.
    """
    app = create_app()
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as c:
        resp = await c.post("/predict", json={"features":[1,2,3]})
        assert resp.status_code == 200
十五、本地开发脚手架 Makefile
# Developer workflow targets (uv + ruff/mypy + pytest + docker compose).
.PHONY: setup lint test run docker-build compose-up smoke
# Install locked dependencies and git hooks.
setup:
uv sync --all-extras --frozen
pre-commit install
# Static checks: ruff lint + mypy type check.
lint:
ruff check .
mypy src
# Unit tests plus doctests embedded in preprocessing.py.
test:
pytest -q
python -m doctest -v src/inference_service/preprocessing.py
# Run the API locally with auto-reload (app factory mode).
run:
uv run uvicorn inference_service.app:create_app --factory --reload
docker-build:
docker build -t inference-service:latest -f docker/Dockerfile .
compose-up:
docker compose up -d --build
# End-to-end smoke test against a running local instance.
smoke:
bash scripts/smoke_test.sh
.pre-commit-config.yaml
# Pre-commit hooks: lint (ruff), format (black), security scan (bandit).
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.7.3
hooks: [{id: ruff}]
- repo: https://github.com/psf/black
rev: 24.10.0
hooks: [{id: black}]
- repo: https://github.com/PyCQA/bandit
rev: 1.7.9
hooks: [{id: bandit}]
scripts/smoke_test.sh
#!/usr/bin/env bash
# Smoke test: health endpoint plus a single /predict round-trip.
# Fix: `set -e` alone does not fail the script when `curl` fails inside a
# pipeline (jq's exit status wins); -o pipefail closes that gap, and -u
# makes undefined variables hard errors.
set -euo pipefail
curl -sf http://localhost:8000/health | jq .
curl -sf -X POST http://localhost:8000/predict -H 'content-type: application/json' \
  -d '{"features":[1,2,3],"request_id":"smoke-1"}' | jq .
十六、压力测试指导
十七、热更新无中断
十八、安全与合规
十九、GPU 分支(可选)
二十、冷启动与复现
这套模板提供了端到端的落地骨架。你可以直接 clone(或复制上述文件结构与代码片段)后,接入你们的模型与特征逻辑,按需在 manifest.json 中管理版本与签名,在 docker-compose 环境完成一键起服与冒烟/压测,随后对接内网 Prometheus/Grafana 与 Airflow 即可。若你提供具体模型与特征定义,我可以进一步补全示例模型的打包脚本和 manifest 生成器,以及生产级 gunicorn/反向代理配置。
为开发者提供高效的编程支持和指导,解决日常编程中的痛点问题,并帮助他们优化代码质量、加速调试流程,选择合适的开发工具和技术方案。
快速上手多种编程语言,获得实时指导和代码优化建议,轻松完成小型项目或单功能实现。
提升编码效率与代码质量,高效解决工作中遇到的技术问题并优化当前项目开发流程。
找到适合团队或项目目标的工具、库与框架,推动团队实施更高效的开发方案。
将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。
把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。
在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。
半价获取高级提示词-优惠即将到期