Generate matplotlib-based Python data visualization scripts that are technically rigorous and clearly structured.
Below is a ready-to-run, standardized visualization script (using only matplotlib + numpy + pandas). It applies a unified style and produces three figures:

- New-user 7-day retention heatmap
- Register → Activate → Pay funnel
- Channel conversion-chain heatmap + segmented (per-channel) conversion trend

If no data is supplied, example data is generated automatically; if CSV files are present, they are read first. The expected columns of each data file are documented inside the script.

Code (save as plot_growth_analytics.py):

```python
# -*- coding: utf-8 -*-
"""
Standardized figures: new-user 7-day retention, Register -> Activate -> Pay funnel,
channel conversion chain, and per-channel trend.

- Dependencies: pandas, numpy, matplotlib
- Data sources:
  1) retention.csv: cohort retention aggregated by signup date
     Columns: cohort_date, size, d1, d2, d3, d4, d5, d6, d7 (d1-d7 are rates in 0-1)
  2) funnel.csv: overall funnel
     Columns: stage, count (stage values: Register, Activate, Pay)
  3) channel.csv: daily per-channel data
     Columns: date, channel, registered, activated, paid
  If a CSV file is missing, mock data is generated automatically.

Outputs:
- retention_heatmap.png
- funnel.png
- channel_chain_trend.png
"""
from __future__ import annotations

from datetime import datetime, timedelta
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
from matplotlib import ticker as mticker

# ---------------------------
# Global style and constants
# ---------------------------
def set_style():
    mpl.rcParams.update({
        "figure.dpi": 120,
        "savefig.dpi": 150,
        "figure.figsize": (12, 7),
        "axes.titlesize": 14,
        "axes.labelsize": 12,
        "axes.grid": True,
        "grid.linestyle": "--",
        "grid.linewidth": 0.5,
        "axes.edgecolor": "#333333",
        "axes.linewidth": 0.8,
        "xtick.labelsize": 10,
        "ytick.labelsize": 10,
        "legend.fontsize": 10,
        "legend.frameon": False,
        # CJK-capable fonts listed first so localized labels also render correctly
        "font.sans-serif": ["Noto Sans CJK SC", "Microsoft YaHei", "SimHei", "Arial", "DejaVu Sans"],
        "axes.unicode_minus": False,
    })

PALETTE = {
    "brand_blue": "#2F6BFF",
    "brand_green": "#2CB67D",
    "brand_orange": "#FF7F3F",
    "brand_purple": "#7C5CFF",
    "gray": "#7A7A7A",
    "bg": "#F7F8FA",
}

OUT_RET = "retention_heatmap.png"
OUT_FUNNEL = "funnel.png"
OUT_CHANNEL = "channel_chain_trend.png"
FILE_RET = "retention.csv"
FILE_FUNNEL = "funnel.csv"
FILE_CHANNEL = "channel.csv"

np.random.seed(42)

# ---------------------------
# Data loading or generation
# ---------------------------
def load_or_mock_retention(path: str | Path) -> pd.DataFrame:
    path = Path(path)
    if path.exists():
        df = pd.read_csv(path)
        # Validate columns
        required = ["cohort_date", "size"] + [f"d{i}" for i in range(1, 8)]
        missing = [c for c in required if c not in df.columns]
        if missing:
            raise ValueError(f"retention.csv is missing columns: {missing}")
        # Type handling
        df["cohort_date"] = pd.to_datetime(df["cohort_date"]).dt.date
        # Safely clip rates into [0, 1]
        for c in [f"d{i}" for i in range(1, 8)]:
            df[c] = pd.to_numeric(df[c], errors="coerce").clip(lower=0, upper=1)
        df["size"] = pd.to_numeric(df["size"], errors="coerce").fillna(0).astype(int)
        df = df.sort_values("cohort_date", ascending=False)
        return df

    # Mock cohorts (last 10 signup days)
    today = datetime.today().date()
    cohorts = [today - timedelta(days=i) for i in range(9, -1, -1)]
    data = []
    base_sizes = np.random.randint(800, 2000, size=len(cohorts))
    for i, c in enumerate(cohorts):
        size = int(base_sizes[i])
        # Decaying retention curve with noise
        d = np.maximum(0, 0.42 - 0.05*np.arange(1, 8) + np.random.normal(0, 0.015, 7))
        d = np.clip(d, 0.02, 0.6)
        data.append([c, size, *d])
    df = pd.DataFrame(data, columns=["cohort_date", "size"] + [f"d{i}" for i in range(1, 8)])
    df = df.sort_values("cohort_date", ascending=False)
    return df

def load_or_mock_funnel(path: str | Path) -> pd.DataFrame:
    path = Path(path)
    if path.exists():
        df = pd.read_csv(path)
        required = ["stage", "count"]
        missing = [c for c in required if c not in df.columns]
        if missing:
            raise ValueError(f"funnel.csv is missing columns: {missing}")
        # Enforce stage order
        order = ["Register", "Activate", "Pay"]
        df["stage"] = pd.Categorical(df["stage"], categories=order, ordered=True)
        df = df.sort_values("stage")
        df["count"] = pd.to_numeric(df["count"], errors="coerce").fillna(0).astype(int)
        return df

    # Mock data
    stages = ["Register", "Activate", "Pay"]
    counts = [50000, 32000, 9000]
    df = pd.DataFrame({"stage": stages, "count": counts})
    df["stage"] = pd.Categorical(df["stage"], categories=stages, ordered=True)
    return df

def load_or_mock_channel(path: str | Path) -> pd.DataFrame:
    path = Path(path)
    if path.exists():
        df = pd.read_csv(path)
        required = ["date", "channel", "registered", "activated", "paid"]
        missing = [c for c in required if c not in df.columns]
        if missing:
            raise ValueError(f"channel.csv is missing columns: {missing}")
        df["date"] = pd.to_datetime(df["date"]).dt.date
        for c in ["registered", "activated", "paid"]:
            df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0).astype(int)
        return df

    # Mock: last 8 weeks x 4 channels
    end = datetime.today().date()
    dates = [end - timedelta(days=i) for i in range(55, -1, -1)]  # 56 days
    channels = ["Channel A", "Channel B", "Channel C", "Channel D"]
    rows = []
    for d in dates:
        for ch in channels:
            base = {"Channel A": 1200, "Channel B": 800, "Channel C": 500, "Channel D": 300}[ch]
            # Periodicity plus noise
            reg = max(0, int(base * (1 + 0.2*np.sin((d.toordinal() % 14)/14*2*np.pi))
                             + np.random.normal(0, base*0.05)))
            act = int(reg * np.clip(0.55 + np.random.normal(0, 0.03), 0.35, 0.85))
            pay = int(act * np.clip(0.28 + np.random.normal(0, 0.02), 0.12, 0.5))
            rows.append([d, ch, reg, act, pay])
    df = pd.DataFrame(rows, columns=["date", "channel", "registered", "activated", "paid"])
    return df

# ---------------------------
# Plotting helpers
# ---------------------------
def fmt_pct(x: float) -> str:
    return f"{x*100:.0f}%"

def pick_text_color(bg_rgb):
    # Choose black/white text based on background luminance
    r, g, b = bg_rgb[:3]
    luminance = 0.2126*r + 0.7152*g + 0.0722*b
    return "black" if luminance > 0.6 else "white"

def plot_retention_heatmap(df_ret: pd.DataFrame, out_path: str = OUT_RET):
    set_style()
    fig, ax = plt.subplots(figsize=(12, 7))

    # Build the matrix (D0 = 100%)
    days = ["D0", "D1", "D2", "D3", "D4", "D5", "D6", "D7"]
    mat = np.c_[np.ones(len(df_ret)), df_ret[[f"d{i}" for i in range(1, 8)]].values]

    cmap = plt.get_cmap("YlGnBu")
    im = ax.imshow(mat, aspect="auto", cmap=cmap, vmin=0, vmax=1)
    ax.set_title("New-User 7-Day Retention (by signup cohort)", pad=12)
    ax.set_xlabel("Day")
    ax.set_ylabel("Signup cohort (cohort_date)")
    ax.set_xticks(range(len(days)), days)
    ax.set_yticks(range(len(df_ret)), [str(d) for d in df_ret["cohort_date"]])

    # Annotate each cell
    for i in range(mat.shape[0]):
        for j in range(mat.shape[1]):
            val = mat[i, j]
            rgba = cmap(val)
            color = pick_text_color(rgba)
            ax.text(j, i, fmt_pct(val), ha="center", va="center", color=color, fontsize=9)

    # Colorbar
    cbar = fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
    cbar.ax.set_ylabel("Retention rate", rotation=-90, va="bottom")

    # Cohort sizes annotated on the right
    for i, size in enumerate(df_ret["size"].tolist()):
        ax.text(len(days)+0.2, i, f"n={size}", va="center", fontsize=9, color=PALETTE["gray"])
    ax.set_xlim(-0.5, len(days)+1.0)
    ax.set_facecolor(PALETTE["bg"])

    plt.tight_layout()
    plt.savefig(out_path, bbox_inches="tight")
    plt.close(fig)

def plot_funnel(df_funnel: pd.DataFrame, out_path: str = OUT_FUNNEL):
    set_style()
    fig, ax = plt.subplots(figsize=(10, 6))

    stages = df_funnel["stage"].tolist()
    counts = df_funnel["count"].astype(float).tolist()
    max_count = max(counts) if counts else 1.0

    # Step-to-step conversion rates
    rate2prev = [1.0]
    for i in range(1, len(counts)):
        denom = counts[i-1] if counts[i-1] > 0 else np.nan
        rate2prev.append((counts[i] / denom) if denom else np.nan)

    # Centered funnel effect: pad both sides
    y = np.arange(len(stages))[::-1]  # Register on top
    left = [(max_count - c)/2 for c in counts]
    colors = [PALETTE["brand_blue"], PALETTE["brand_green"], PALETTE["brand_orange"]]
    bars = ax.barh(y=y, width=counts, left=left, color=colors[:len(counts)],
                   edgecolor="none", alpha=0.9)

    ax.set_title("Register → Activate → Pay Funnel", pad=12)
    ax.set_yticks(y, stages)
    ax.set_xlabel("Users")
    ax.grid(axis="x", linestyle="--", alpha=0.5)

    # Labels: absolute count + conversion from the previous stage
    for i, b in enumerate(bars):
        c = counts[i]
        ax.text(b.get_x() + b.get_width()/2, b.get_y() + b.get_height()/2,
                f"{int(c):,}\n({fmt_pct(rate2prev[i]) if not np.isnan(rate2prev[i]) else 'NA'})",
                ha="center", va="center", color="white", fontsize=10, weight="bold")

    # Auxiliary info: baseline of the first stage
    ax.text(0.98, 0.02, f"Stage-1 baseline={int(max_count):,}", transform=ax.transAxes,
            ha="right", va="bottom", color=PALETTE["gray"])
    ax.set_facecolor(PALETTE["bg"])

    plt.tight_layout()
    plt.savefig(out_path, bbox_inches="tight")
    plt.close(fig)

def plot_channel_chain_and_trend(df_channel: pd.DataFrame, out_path: str = OUT_CHANNEL, topn: int = 4):
    set_style()

    # Aggregate and compute conversion rates
    df = df_channel.copy()
    df["date"] = pd.to_datetime(df["date"])

    # Per-channel totals (for the chain heatmap)
    grp_ch = df.groupby("channel", as_index=False)[["registered", "activated", "paid"]].sum()
    grp_ch = grp_ch.sort_values("registered", ascending=False)
    top_channels = grp_ch["channel"].head(topn).tolist()
    grp_ch = grp_ch[grp_ch["channel"].isin(top_channels)].copy()
    grp_ch["reg_to_act"] = grp_ch["activated"] / grp_ch["registered"].replace(0, np.nan)
    grp_ch["reg_to_pay"] = grp_ch["paid"] / grp_ch["registered"].replace(0, np.nan)
    grp_ch["act_to_pay"] = grp_ch["paid"] / grp_ch["activated"].replace(0, np.nan)

    # Daily trend for the top-N channels
    df_top = df[df["channel"].isin(top_channels)].copy()
    daily = (df_top.groupby(["date", "channel"], as_index=False)
                   .agg(registered=("registered", "sum"), paid=("paid", "sum")))
    daily["pay_rate"] = daily["paid"] / daily["registered"].replace(0, np.nan)
    # Weekly resampling gives a smoother curve (optional); daily granularity is kept here.
    # For weekly: daily = daily.set_index("date").groupby("channel").resample("W")...

    fig = plt.figure(figsize=(14, 6.5))
    gs = fig.add_gridspec(1, 2, width_ratios=[1, 1.2], wspace=0.2)

    # Left: channel conversion-chain heatmap
    ax1 = fig.add_subplot(gs[0, 0])
    stages = ["Reg→Act", "Reg→Pay", "Act→Pay"]
    mat = grp_ch[["reg_to_act", "reg_to_pay", "act_to_pay"]].values
    cmap = plt.get_cmap("Greens")
    im = ax1.imshow(mat, aspect="auto", cmap=cmap, vmin=0, vmax=1)
    ax1.set_title("Channel Conversion Chain (top channels)", pad=12)
    ax1.set_xticks(range(len(stages)), stages)
    ax1.set_yticks(range(len(grp_ch)), grp_ch["channel"].tolist())
    # Annotate each cell
    for i in range(mat.shape[0]):
        for j in range(mat.shape[1]):
            val = mat[i, j]
            rgba = cmap(0 if np.isnan(val) else val)
            color = pick_text_color(rgba)
            text = "NA" if np.isnan(val) else fmt_pct(val)
            ax1.text(j, i, text, ha="center", va="center", color=color, fontsize=10)
    cbar = fig.colorbar(im, ax=ax1, fraction=0.046, pad=0.04)
    cbar.ax.set_ylabel("Conversion rate", rotation=-90, va="bottom")
    ax1.set_facecolor(PALETTE["bg"])

    # Right: segmented (per-channel) conversion trend
    ax2 = fig.add_subplot(gs[0, 1])
    ax2.set_title("Paid Conversion Rate Trend by Channel", pad=12)
    colors = [PALETTE["brand_blue"], PALETTE["brand_green"],
              PALETTE["brand_orange"], PALETTE["brand_purple"]]
    for idx, ch in enumerate(top_channels):
        sub = daily[daily["channel"] == ch].sort_values("date")
        ax2.plot(sub["date"], sub["pay_rate"], label=ch,
                 color=colors[idx % len(colors)], linewidth=2)
    ax2.set_xlabel("Date")
    ax2.set_ylabel("Paid conversion rate")
    ax2.yaxis.set_major_formatter(mticker.PercentFormatter(1.0))
    ax2.grid(True, axis="both", linestyle="--", alpha=0.5)
    ax2.legend(ncol=2, loc="upper left")
    ax2.set_facecolor(PALETTE["bg"])

    plt.tight_layout()
    plt.savefig(out_path, bbox_inches="tight")
    plt.close(fig)

# ---------------------------
# Main
# ---------------------------
def main():
    # Load or mock data
    df_ret = load_or_mock_retention(FILE_RET)
    df_fun = load_or_mock_funnel(FILE_FUNNEL)
    df_ch = load_or_mock_channel(FILE_CHANNEL)

    # Plot
    plot_retention_heatmap(df_ret, OUT_RET)
    plot_funnel(df_fun, OUT_FUNNEL)
    plot_channel_chain_and_trend(df_ch, OUT_CHANNEL)
    print(f"Done:\n- {OUT_RET}\n- {OUT_FUNNEL}\n- {OUT_CHANNEL}")

if __name__ == "__main__":
    main()
```

Usage and data formats

- Run
  - python3 plot_growth_analytics.py
  - If retention.csv, funnel.csv, and channel.csv exist in the same directory they are read first; otherwise example data is generated.
- retention.csv
  - Columns: cohort_date (date), size (cohort size), d1-d7 (day 1-7 retention rates, 0-1)
  - Example row: 2025-08-01, 1500, 0.36, 0.31, 0.28, 0.24, 0.21, 0.19, 0.17
- funnel.csv
  - Columns: stage (Register|Activate|Pay), count (users)
- channel.csv
  - Columns: date (date), channel (channel name), registered, activated, paid
  - Daily absolute counts. The script derives Register→Activate, Register→Pay, and Activate→Pay conversion rates and plots the paid conversion trend per channel.

Standardization highlights (implemented in the script)

- Unified style: font, palette, grid, axis line width, DPI, and margins.
- Color rules: brand colors (blue, green, orange, purple) distinguish metrics or channels; a light gray background improves contrast.
- Value labels: percentages annotated inside heatmap cells and funnel bars; black/white text chosen automatically for contrast.
- Rates: all conversion rates are stored as 0-1 values and rendered as percentages.
- Layout: consistent titles, axis labels, legend placement, and font sizes; figures exported as PNG, ready for reports or dashboards.

Analysis suggestions (paired with the figures above)

- Retention: watch for sharp D1 and D3 drops in recent cohorts and compare segments against product event logs (e.g., day-one key-action completion rate).
- Funnel: slice each stage by attribution dimensions (channel, platform, app version) to locate heterogeneity at the bottleneck stage.
- Channels: treat Register→Pay as the primary target and Register→Activate as the intermediate one; once the heatmap flags a weak channel, use the trend chart to check whether it is structurally low or merely fluctuating, and run before/after window comparisons against the campaign calendar and budget.
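As a companion to the data spec above, here is a minimal, hypothetical sketch (not part of plot_growth_analytics.py; the `raw_events` frame and its columns are invented for illustration) showing one way to build a conforming channel.csv from an event log, plus the optional weekly smoothing the script's comment alludes to:

```python
import numpy as np
import pandas as pd

# Hypothetical raw event log: one row per user event
raw_events = pd.DataFrame({
    "date": ["2025-08-01", "2025-08-01", "2025-08-02"],
    "channel": ["Channel A", "Channel A", "Channel B"],
    "event": ["registered", "activated", "registered"],
})

# Aggregate to the channel.csv contract: one row per date x channel,
# with absolute counts per funnel step
daily = (raw_events.pivot_table(index=["date", "channel"], columns="event",
                                aggfunc="size", fill_value=0)
                   .reindex(columns=["registered", "activated", "paid"], fill_value=0)
                   .reset_index())
daily.to_csv("channel.csv", index=False)

# Optional weekly smoothing of the paid conversion rate (one possible approach)
daily["date"] = pd.to_datetime(daily["date"])
weekly = (daily.set_index("date")
               .groupby("channel")[["registered", "paid"]]
               .resample("W").sum()
               .reset_index())
weekly["pay_rate"] = weekly["paid"] / weekly["registered"].replace(0, np.nan)
```

Weekly sums are aggregated before the rate is computed so that low-volume days do not distort the trend line.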
Below is a self-contained, reproducible Python script that uses matplotlib to visualize experimental and control observations. It includes data loading, preprocessing (NaN handling and robust winsorization), statistical estimation (bootstrap confidence intervals), and three plots: distribution, mean with error intervals, and scatter correlation with a bootstrap-based regression band.

Usage:
- If you have a CSV, provide either:
  - Wide format: columns "control" and "experiment" (paired).
  - Long format: columns "group" (values: "control" or "experiment") and "value". Optional "subject_id" for pairing.
- If no input is provided, the script generates synthetic reproducible data.
- Examples:
  - python visualize_groups.py --input your_data.csv --outdir figures
  - python visualize_groups.py (uses synthetic data)

Required: Python 3.9+, numpy, pandas, matplotlib. Optional: scipy for KDE. If SciPy is not available, the script falls back to histograms without KDE.

Script (save as visualize_groups.py):

```python
from __future__ import annotations

import argparse
import sys
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Optional SciPy
try:
    from scipy.stats import gaussian_kde
    SCIPY_AVAILABLE = True
except Exception:
    SCIPY_AVAILABLE = False


def set_seed(seed: int = 42):
    np.random.seed(seed)


def winsorize_by_mad(x: np.ndarray, zmax: float = 3.0) -> np.ndarray:
    """
    Robust winsorization based on MAD.
    Caps values beyond zmax * MAD from the median.
    """
    x = np.asarray(x).astype(float)
    median = np.median(x)
    mad = np.median(np.abs(x - median))
    if mad == 0:
        return x  # No variability; nothing to winsorize
    z = (x - median) / (1.4826 * mad)  # 1.4826 scales MAD to match std under normality
    x_w = x.copy()
    x_w[z > zmax] = median + zmax * 1.4826 * mad
    x_w[z < -zmax] = median - zmax * 1.4826 * mad
    return x_w


def bootstrap_mean_ci(x: np.ndarray, n_boot: int = 5000, alpha: float = 0.05, seed: int = 123):
    """
    Bootstrap mean and 95% CI (percentile) for a 1D array.
    Returns: mean, ci_low, ci_high, se_boot
    """
    x = np.asarray(x).astype(float)
    x = x[~np.isnan(x)]
    rng = np.random.default_rng(seed)
    boots = rng.choice(x, (n_boot, x.size), replace=True).mean(axis=1)
    mean = x.mean()
    se_boot = boots.std(ddof=1)
    ci_low = np.quantile(boots, alpha / 2)
    ci_high = np.quantile(boots, 1 - alpha / 2)
    return mean, ci_low, ci_high, se_boot


def fisher_r_ci(r: float, n: int, alpha: float = 0.05):
    """
    Fisher z-transformation CI for Pearson correlation r with sample size n.
    Returns (ci_low, ci_high). Assumes |r| < 1 and n > 3.
    """
    r = np.clip(r, -0.999999, 0.999999)
    z = np.arctanh(r)
    se = 1 / np.sqrt(n - 3)
    z_low = z - 1.96 * se
    z_high = z + 1.96 * se
    return np.tanh(z_low), np.tanh(z_high)


def load_data(csv_path: Path | None):
    """
    Load data from CSV, or generate synthetic paired data if csv_path is None.
    Returns: control, experiment, paired (bool)
    """
    if csv_path is None:
        # Synthetic, reproducible paired data
        set_seed(42)
        n = 200
        control = np.random.normal(loc=50, scale=10, size=n)
        effect = 5.0
        experiment = control + effect + np.random.normal(loc=0, scale=8, size=n)
        # Inject some missing values
        miss_idx = np.random.choice(n, size=int(0.05 * n), replace=False)
        control[miss_idx[: len(miss_idx)//2]] = np.nan
        experiment[miss_idx[len(miss_idx)//2:]] = np.nan
        return control, experiment, True

    df = pd.read_csv(csv_path)
    cols = {c.lower(): c for c in df.columns}

    if "control" in cols and "experiment" in cols:
        control = df[cols["control"]].to_numpy(dtype=float)
        experiment = df[cols["experiment"]].to_numpy(dtype=float)
        return control, experiment, True

    if "group" in cols and "value" in cols:
        # Long format; attempt pairing by subject_id if present
        gcol = cols["group"]
        vcol = cols["value"]
        df = df[[gcol, vcol] + ([cols["subject_id"]] if "subject_id" in cols else [])].copy()
        df[gcol] = df[gcol].str.lower().str.strip()
        df = df[df[gcol].isin(["control", "experiment"])]
        if "subject_id" in cols:
            # Pivot to paired wide format
            wide = df.pivot_table(index=cols["subject_id"], columns=gcol, values=vcol, aggfunc="mean")
            control = wide.get("control", pd.Series(index=wide.index, dtype=float)).to_numpy()
            experiment = wide.get("experiment", pd.Series(index=wide.index, dtype=float)).to_numpy()
            return control, experiment, True
        else:
            # Unpaired
            control = df.loc[df[gcol] == "control", vcol].to_numpy(dtype=float)
            experiment = df.loc[df[gcol] == "experiment", vcol].to_numpy(dtype=float)
            return control, experiment, False

    raise ValueError("CSV must contain either (control, experiment) columns or (group, value) [and optionally subject_id].")


def preprocess(control: np.ndarray, experiment: np.ndarray, paired: bool, zmax: float = 3.0):
    """
    - Remove NaN/inf.
    - Robust winsorization by MAD.
    - If paired, keep rows where both are valid.
    """
    control = np.asarray(control, dtype=float)
    experiment = np.asarray(experiment, dtype=float)
    if paired:
        mask = np.isfinite(control) & np.isfinite(experiment)
        control, experiment = control[mask], experiment[mask]
        control = winsorize_by_mad(control, zmax=zmax)
        experiment = winsorize_by_mad(experiment, zmax=zmax)
    else:
        control = control[np.isfinite(control)]
        experiment = experiment[np.isfinite(experiment)]
        control = winsorize_by_mad(control, zmax=zmax)
        experiment = winsorize_by_mad(experiment, zmax=zmax)
    return control, experiment


def kde_or_none(x: np.ndarray, grid: np.ndarray):
    if SCIPY_AVAILABLE and x.size > 1:
        try:
            kde = gaussian_kde(x)
            return kde(grid)
        except Exception:
            return None
    return None


def plot_distributions(control: np.ndarray, experiment: np.ndarray, outdir: Path):
    plt.figure(figsize=(8, 5), dpi=120)
    bins = max(10, int(np.sqrt(control.size + experiment.size)))

    # Histograms
    plt.hist(control, bins=bins, density=True, alpha=0.4, color="#1f77b4",
             label=f"Control (n={control.size})")
    plt.hist(experiment, bins=bins, density=True, alpha=0.4, color="#ff7f0e",
             label=f"Experiment (n={experiment.size})")

    # KDE curves if available
    xmin = min(np.min(control), np.min(experiment))
    xmax = max(np.max(control), np.max(experiment))
    grid = np.linspace(xmin, xmax, 200)
    d_control = kde_or_none(control, grid)
    d_experiment = kde_or_none(experiment, grid)
    if d_control is not None:
        plt.plot(grid, d_control, color="#1f77b4", lw=2)
    if d_experiment is not None:
        plt.plot(grid, d_experiment, color="#ff7f0e", lw=2)

    # Means
    plt.axvline(control.mean(), color="#1f77b4", ls="--", lw=1)
    plt.axvline(experiment.mean(), color="#ff7f0e", ls="--", lw=1)

    plt.title("Distribution: Control vs Experiment")
    plt.xlabel("Value")
    plt.ylabel("Density")
    plt.legend()
    plt.tight_layout()
    fp = outdir / "01_distributions.png"
    plt.savefig(fp, bbox_inches="tight")
    plt.close()
    return fp


def plot_error_intervals(control: np.ndarray, experiment: np.ndarray, paired: bool, outdir: Path):
    m_c, ci_c_low, ci_c_high, se_c = bootstrap_mean_ci(control)
    m_e, ci_e_low, ci_e_high, se_e = bootstrap_mean_ci(experiment)

    # Effect size: mean difference
    if paired and control.size == experiment.size:
        diff = experiment - control
        md, md_low, md_high, _ = bootstrap_mean_ci(diff)
        effect_text = f"Mean diff (paired): {md:.2f} [{md_low:.2f}, {md_high:.2f}]"
    else:
        md = m_e - m_c
        # Unpaired bootstrap of the difference
        rng = np.random.default_rng(123)
        n_boot = 5000
        boots = []
        for _ in range(n_boot):
            b_c = rng.choice(control, control.size, replace=True).mean()
            b_e = rng.choice(experiment, experiment.size, replace=True).mean()
            boots.append(b_e - b_c)
        md_low, md_high = np.quantile(boots, [0.025, 0.975])
        effect_text = f"Mean diff (unpaired): {md:.2f} [{md_low:.2f}, {md_high:.2f}]"

    plt.figure(figsize=(7, 5), dpi=120)
    means = [m_c, m_e]
    ci_lows = [ci_c_low, ci_e_low]
    ci_highs = [ci_c_high, ci_e_high]
    x = np.arange(2)
    colors = ["#1f77b4", "#ff7f0e"]
    plt.errorbar(x, means,
                 yerr=[np.array(means) - np.array(ci_lows),
                       np.array(ci_highs) - np.array(means)],
                 fmt="o", capsize=5, color="black", ecolor="black")
    plt.scatter(x, means, c=colors, s=80, zorder=3)
    plt.xticks(x, ["Control", "Experiment"])
    plt.ylabel("Mean ± 95% CI")
    plt.title("Group Means with 95% Confidence Intervals")
    plt.text(0.5, 0.05, effect_text, transform=plt.gca().transAxes, ha="center")
    plt.tight_layout()
    fp = outdir / "02_error_intervals.png"
    plt.savefig(fp, bbox_inches="tight")
    plt.close()
    return fp


def plot_scatter_correlation(control: np.ndarray, experiment: np.ndarray, paired: bool, outdir: Path):
    if not paired or control.size != experiment.size:
        return None  # Skip if not paired

    x = control
    y = experiment
    n = x.size

    # Pearson r
    x_c = x - x.mean()
    y_c = y - y.mean()
    r = (x_c @ y_c) / (np.sqrt((x_c**2).sum()) * np.sqrt((y_c**2).sum()))
    r_low, r_high = fisher_r_ci(r, n)

    # Regression line and bootstrap band
    beta = np.polyfit(x, y, deg=1)  # y = beta[0]*x + beta[1]
    x_line = np.linspace(x.min(), x.max(), 200)
    y_line = beta[0] * x_line + beta[1]

    # Bootstrap regression uncertainty
    rng = np.random.default_rng(123)
    n_boot = 2000
    y_boot = np.empty((n_boot, x_line.size))
    idx = np.arange(n)
    for i in range(n_boot):
        b = rng.choice(idx, size=n, replace=True)
        xb, yb = x[b], y[b]
        bcoef = np.polyfit(xb, yb, deg=1)
        y_boot[i] = bcoef[0] * x_line + bcoef[1]
    band_low = np.quantile(y_boot, 0.025, axis=0)
    band_high = np.quantile(y_boot, 0.975, axis=0)

    plt.figure(figsize=(7, 6), dpi=120)
    plt.scatter(x, y, alpha=0.7, s=40, color="#2ca02c", edgecolor="white", linewidth=0.5)
    plt.plot(x_line, y_line, color="black", lw=2, label="OLS fit")
    plt.fill_between(x_line, band_low, band_high, color="gray", alpha=0.2,
                     label="95% bootstrap band")
    plt.xlabel("Control")
    plt.ylabel("Experiment")
    plt.title("Scatter and Correlation (Paired)")
    plt.legend()
    plt.text(0.05, 0.02, f"Pearson r = {r:.3f} [{r_low:.3f}, {r_high:.3f}] (95% CI)",
             transform=plt.gca().transAxes)
    plt.tight_layout()
    fp = outdir / "03_scatter_correlation.png"
    plt.savefig(fp, bbox_inches="tight")
    plt.close()
    return fp


def main():
    parser = argparse.ArgumentParser(description="Visualize experimental vs control observations.")
    parser.add_argument("--input", type=str, default=None,
                        help="CSV file path. See header format in the script description.")
    parser.add_argument("--outdir", type=str, default="figures",
                        help="Output directory for figures.")
    parser.add_argument("--zmax", type=float, default=3.0,
                        help="MAD-based winsorization threshold.")
    args = parser.parse_args()

    outdir = Path(args.outdir)
    outdir.mkdir(parents=True, exist_ok=True)
    csv_path = Path(args.input) if args.input else None

    try:
        control, experiment, paired = load_data(csv_path)
    except Exception as e:
        print(f"Error loading data: {e}", file=sys.stderr)
        sys.exit(1)

    control, experiment = preprocess(control, experiment, paired=paired, zmax=args.zmax)

    dist_fp = plot_distributions(control, experiment, outdir)
    err_fp = plot_error_intervals(control, experiment, paired, outdir)
    scat_fp = plot_scatter_correlation(control, experiment, paired, outdir)

    print("Saved figures:")
    print(f"- {dist_fp}")
    print(f"- {err_fp}")
    if scat_fp is not None:
        print(f"- {scat_fp}")
    else:
        print("- Scatter correlation skipped (requires paired data).")


if __name__ == "__main__":
    main()
```

Notes on methodology:
- Preprocessing: NaN/inf values are removed. Robust winsorization uses the MAD to cap extreme outliers without discarding data, which stabilizes estimates and plots.
- Estimation: means and 95% confidence intervals use bootstrap percentiles, which behave reasonably for non-normal distributions. For correlations, Fisher's z-transform provides an analytical 95% CI for r.
- Visualization:
  - Distribution plots show histograms (and KDE when SciPy is available) with mean indicators.
  - The error-interval plot displays mean ± 95% CI per group and annotates the mean difference with its CI (paired or unpaired, based on the data).
  - The scatter-correlation plot requires paired data and includes an OLS regression line with a bootstrap-based 95% uncertainty band.
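To sanity-check the estimation helpers before running them on real data, a short sketch like the following (assuming visualize_groups.py sits in the current directory so its functions are importable) exercises the MAD winsorization and the percentile bootstrap:

```python
import numpy as np
from visualize_groups import winsorize_by_mad, bootstrap_mean_ci

rng = np.random.default_rng(0)
x = rng.normal(100, 15, size=300)
x[:3] = [600.0, -400.0, 550.0]  # plant gross outliers

x_w = winsorize_by_mad(x, zmax=3.0)        # extremes capped at median +/- 3 robust SDs
mean, lo, hi, se = bootstrap_mean_ci(x_w)  # percentile bootstrap, 5000 resamples
print(f"winsorized mean = {mean:.2f}, 95% CI = [{lo:.2f}, {hi:.2f}], bootstrap SE = {se:.2f}")
```

A widening gap between the raw and winsorized estimates is a quick signal that a handful of outliers, rather than the bulk of the data, is driving the mean.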
Turn tabular data into multi-chart comparison scripts in one step; automatically annotate key fluctuations; finish weekly and monthly report visuals quickly with a unified style.
Generate retention, funnel, conversion-chain, and segmented-trend charts; present experiment results and conclusions clearly; standardized figures for review meetings and retrospectives.
Visualize experimental and observational data; output error intervals, distributions, and correlation plots; reproducible, reusable scripts suited to papers and appendices.
Build channel comparison, campaign conversion, and regional distribution charts; export at high resolution for posters, slide decks, and WeChat official accounts; deliver growth retrospectives efficiently.
Generate example scripts and exercise templates by class topic; bilingual (Chinese/English) comments explain each step clearly; let students focus on understanding methods rather than formatting.
Help business and data teams turn a "data description" into a runnable visualization script with clear documentation in the shortest possible time, producing high-quality charts for reporting, decision-making, and retrospectives. Specific goals:
- Lower the barrier from data to chart and cut time spent on repeated debugging and code searches, typically shortening the charting cycle significantly.
- Unify chart conventions (naming, titles, annotations, colors, and layout) to make reports more professional and consistent.
- Generate scripts and explanations from an expert's perspective, balancing accuracy with readability to reduce misunderstanding and bias.
- Automatically match suitable chart types (trend, comparison, distribution, correlation, heatmap, etc.) to your data and scenario.
- Support multilingual output for cross-department and cross-region communication.
- Enable a report-ready chart on the very first trial; a paid upgrade adds style libraries, batch generation, quality checklists, and team collaboration for greater delivery efficiency and brand consistency.
Copy the prompt generated by the template into your usual chat app (such as ChatGPT or Claude) and use it directly in conversation, with no extra development. Suited to quick personal trials and lightweight use cases.
Turn the prompt template into an API: your program can modify template parameters at will and call it through the interface, enabling automation and batch processing (see the sketch below). Suited to developer integration and embedding in business systems.
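For illustration only, a call from Python might look like the sketch below; the endpoint URL, payload fields, and auth header are placeholders rather than a documented API, so substitute the values from your actual integration guide:

```python
import requests  # third-party HTTP client, assumed installed

# Hypothetical endpoint and parameters for rendering a prompt template
resp = requests.post(
    "https://api.example.com/v1/prompt-templates/render",  # placeholder URL
    headers={"Authorization": "Bearer <YOUR_API_KEY>"},    # placeholder credential
    json={"template_id": "<TEMPLATE_ID>", "params": {"language": "en"}},
    timeout=30,
)
resp.raise_for_status()
print(resp.json())
```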
Configure the corresponding server address in your MCP client so your AI application can invoke the prompt template automatically. Suited to advanced users and team collaboration, letting prompts move seamlessly across different AI tools.