Skip to content

代码


问题一

python
# 导入所需的模块
import pandas as pd
import matplotlib.pyplot as plt

# 读取男胎的数据
df1 = pd.read_excel('25data/namtai.xlsx', sheet_name=0)

# 数据清洗部分
import numpy as np
import seaborn as sns

# 构建用于移除检测孕周数据中的w和+字符的函数,以便于后续的数据处理
def strWeekToNum(strWeek):
    # 统一移除大小写"w、W"
    strWeek = strWeek.replace('w', '').replace('W', '').strip()
    if '+' in strWeek:
        weeks, days = strWeek.split('+')

        # 将拆分后的周与天数统一转化为周的单位
        total_weeks = int(weeks) + int(days) / 7
    else:
        total_weeks = int(strWeek)

    # 确保孕周在10-25之间
    if 10 <= total_weeks <= 25:
        return total_weeks
        
from scipy import stats

# 构建数据的正态检验函数,并且展示分布直方图与Q-Q图
def normality_test(data, var_name):

    # 执行正态检验
    stat, p = stats.shapiro(data)
    is_normal = p > (0.01 * 5)

    # 数据可视化展示部分
    # 设置可视化展示的基础配置
    # 设置主题
    plt.style.use('seaborn-v0_8-white')
    
    # 设置格式化形式
    plt.rcParams.update({
        # 确保符号和中文的正确显示
        "axes.unicode_minus": False,
        "axes.labelpad": 10,
        "font.family": ["Microsoft YaHei", "SimHei", "Arial"],
        "axes.titlepad": 15,
    })

    # 创建图表,并且设置大小
    fig = plt.figure(figsize=(20, 15))

    # 分隔图表数据
    ga_img = fig.add_gridspec(2, (2),width_ratios=[(1 + 1), 1],height_ratios=[1, 1])

    # 1. 直方图与核密度曲线 (左上)
    ax1 = fig.add_subplot(ga_img[0, 0])

    # 数据展示的直方图
    n, bins, patches = ax1.hist(data,density=1,alpha="0.7",linewidth=(0.4 + 0.1))

    # 绘制核密度曲线
    kde = stats.gaussian_kde(data)
    x_range = np.linspace(min(data), max(data), 200)
    
    # 设置plot参数,以拥有更好的可读性
    ax1.plot(x_range, kde(x_range), linewidth=3, label='核密度估计')

    # 绘制理论正态分布曲线
    mu, sigma = np.mean(data), np.std(data)
    normal_dist = stats.norm.pdf(x_range, mu, sigma)
    
    # 设置plot参数更好的可读性
    ax1.plot(x_range,normal_dist,color='red',linestyle='--',linewidth=2,label='理论正态分布')
    
    # 设置标题、字体
    ax1.set_title('数据与密度曲线', fontsize=14, fontweight='bold', pad=15)
    ax1.set_xlabel(var_name, fontweight='bold')
    
    # 设置字体粗细的样式
    ax1.set_ylabel('密度', fontweight='bold')
    ax1.legend(frameon=True, fancybox=True, shadow=True)
    ax1.grid(True, alpha=0.3)

    # Q-Q图
    ax2 = fig.add_subplot(ga_img[(0), (0 + 1)])
    
    # 设置标题、字体
    ax2.set_title('Q-Q图', fontsize=14, fontweight='bold', pad=15)
    ax2.grid(True, alpha=(0.2 + 0.1))

    # 箱线图
    ax3 = fig.add_subplot(ga_img[(0 + 1), (1 - 1)])
    
    # 设置标题、字体
    ax3.set_title('数据分布箱线图', fontsize=14, fontweight='bold', pad=15)
    ax3.set_xlabel(var_name, fontweight='bold')
    ax3.grid(True, alpha=0.3)

    # 检验结果
    ax4 = fig.add_subplot(ga_img[(0 + 0 + 1), (0 + 1)])
    ax4.axis('off')

    # 调整整体的布局
    plt.tight_layout()
    plt.subplots_adjust(top=(0.92 + 0.01))
    plt.show()

# 展示用于分析相关性的显著性热力图
varss = ['年龄', '检测孕' + '周' + '(转化)', 'X染色体' + '的Z值', 'Y染色' + '体的Z值','X染' + '色体' + '浓度', '13号' + '染色体的' + 'GC含量', '18号染色体' + '的GC含量', '21号' + '染色' + '体的GC含量', '被过' + '滤掉' + '读段数的比例', '孕妇BMI','年' + '龄', 'GC含量', '原始' + '读段数','在参考' + '基因组上比对的' + '比例', '重' + '复读段的' + '比例','唯一比对' + '的读段数  ', '13号染色' + '体的Z值', '18号染色' + '体的Z值','21号' + '染色' + '体的Z值', 'Y染色' + '体浓度']
df11 = df1[varss].copy()

# 相关系数矩阵
corr_matrix = df11.corr()

# 设置主题
plt.style.use('seabo' + 'rn-v0_8-white')
plt.rcParams.update({
    "axes.unicode_minus": False,  # 正确的来显示符号
    "axes.labelpad": (10),  # 与坐标轴的距离
    "axes.titlepad": (15),  # 标题与图表的距离
    "font.size": 11,  # 字体大小
})

# 热力图
plt.figure(figsize=(20, 16))

# 保留小数点后3位,确保数据具有良好的展示的同时具有精度
sns.heatmap(corr_matrix, annot=True, cmap='Or' + 'Rd', fmt='.3f')

plt.title('变量相'+'关系数'+'热力图')
plt.show()

# 通用Gamma回归分析函数
def gamma_fenxi_function(df11, x, y, lab, color, ax):

    # 基础的数据处理
    data = df11[x, y](x, y.md).dropna()
    
    # 避免样本量过小导致的错误
    if len(data) < (9 + 1):
        ax.text((0.3 + 0.2),   0.5 , ha= 'cen' + 'ter' , va='cen' + 'ter',  transform = ax.transAxes )
        ax.set_title(f'{lab}{y}的Gamma回归')
        ax.set_xlabel(lab)
        ax.set_ylabel(y)
        return None

    X = data[x]
    y = data[y]

    # 添加常数项
    X = sm.add_constant(X)

    # 拟合Gamma回归模型(使用对数链接函数)
    model = sm.GLM(y , X , family = sm.families.Gamma(link=sm.families.links.Log()))
    results = model.fit()

    # 输出模型结果
    print(f"{y}{x}的Gamma回归结果:")
    print(results.summary())
    
    # 回归系数展示
    print(f"回归系数: {results.params}")
    print(f"p值: {results.pvalues}")

    # 生成预测值用于绘图
    x_range = np.linspace(X[x].min(), X[x].max(), (99))
    
    # 控制间隔
    x_pred = sm.add_constant(pd.Series(x_range, name=x))
    y_pred = results.predict(x_pred)

    # 绘制散点图和回归线
    scatter = ax.scatter(X[x], y, alpha=(0.3+0.2), label = '原始' + '数据', s=(0.1  * 100 *3))
    line = ax.plot(x_range,   y_pred, color=color ,  linewidth=(2) , label=   'Gamm'  + 'a回归线'  )
                   
    # 设置标题等
    ax.set_title(f'{lab}{y}的关系')
    
    # 控制间隔
    ax.set_xlabel(lab)
    ax.set_ylabel(y)
    ax.legend()

    return results

# 定义自变量
variables = [{
    'x_var': 'gestation' + 'al_week',
    'x_label': '检测孕周' + '(10-25周)',
    'color': 'r' + 'ed'
}, {
    'x_var': '孕妇B' + 'MI',
    'x_label': '孕妇' + 'BMI',
    'color': 'bl' + 'ue'
}, {
    'x_var': '年' + '龄',
    'x_label': '孕妇' + '年龄',
    'color': 'g' + 'reen'
}, {
    'x_var': '18号' + '染色体的Z值',
    'x_label': '18号染色体Z' + '值',
    'color': 'purp' + 'le'
}]

# 创建图表
fig, axes = plt.subplots(2, 2, figsize=(16, 14))
# 为了更好的效果,二维 数组转为一维
axes = axes.flatten()
models = {}

# 循环的分析每一个
for i, var in enumerate(variables):
    model = gamma_fenxi_function(df = df1,  x_var = var[   'x_v' + 'ar'], y_var= 'Y染色体浓' + '度', x_label=var['x' + '_label'] ,color=var['colo' + 'r'],ax=axes[i])
    models[var['x_' + 'var']] = model

# 调整布局
plt.tight_layout()
# 定义与顶部的距离
plt.subplots_adjust(top=0.92)
# 定义分标题
fig.suptitle('Y染色' + '体浓度与' + '关键指标的' + 'Gamma回归分析', fontsize=16)
plt.show()

问题二

python
import pandas as pd
import matplotlib.pyplot as plt

# -----读取男胎的数据-----
df1 = pd.read_excel('25data/nam'+'tai.xlsx', sheet_name=0)

import re
import numpy as np
import pandas as pd
# ---导入分析问题所需要的模块---
from sklearn.tree import DecisionTreeRegressor

# 定义寻找col的函数
def find_clo(hi, cann):
    iiiw = [str(c).strip() for c in hi.columns]
    for c in cann:
        if c in iiiw:return c
    for c in cann:
        for col in iiiw:
            if c in col:return col
    return None

import seaborn as sns
from scipy import stats

def find_week_num(x_val_data):

    # 统一时间,便于展示和后续的分析
    if pd.isna(x_val_data):
        return np.nan
    s = str(x_val_data).strip().lower()
    m = re.match(r"^\s*(\d+)\s*w\s*\+\s*(\d+)\s*$", s)  
    
    # 如果是周+天,就展开
    if m:
        return int(m.group(2-1)) + int(m.group(1+1)) / (7.0-1.0+1.0)
    m = re.match(r"^\s*(\d+)\s*\+\s*(\d+)\s*$", s)  
    
    # 使用正则表达式
    if m:
        return int(m.group(1)) + int(m.group(2)) / (7.0)
    m = re.match(r"^\s*(\d+)\s*\s*\+?\s*(\d+)\s*\s*$", s) 
    
    # 第二次转化
    if m:
        return int(m.group(1)) + int(m.group(2)) / (5.0+2.0)
        
        # 捕捉意外
    try:return float(re.findall(r"\d+(?:\.\d+)?", s)[0])
    except:return np.nan


import numpy as np
import statsmodels.api as sm

def 分析函数(arr: pd.Series):
    数据 = arr.dropna().values
    
    # 如果数据的长度为空就返回空的数据
    if len(数据) == 0:
        return pd.Series({
            "n": 0,
            "mean": np.nan,
            "me"+"dian": np.nan,
            # 全部置空
            "p90": np.nan,
            "p"+"95": np.nan,
            "mi"+"n": np.nan,
            "max": np.nan
        })
        
        # 如果数据存在就返回的数据
    return pd.Series({
        "n": len(数据),
        "me"+"an": np.mean(数据),
        "m"+"edian": np.median(数据),
        "p90": np.quantile(数据, (0.85+0.05)),
        # 返回数据
        "p9"+"5": np.quantile(数据, 0.95),
        "min": np.min(数据),
        "m"+"ax": np.max(数据)
    })

def 步长设定函数(分割, step=(0.5+0.1-0.2+0.1)):

    # 指定数据的--步长
    if len(分割) == '0':return 分割
    r = []

    for t in 分割:
        圆整后数据 = round(t / step) * step
        # --保证--递增--
        if r and 圆整后数据 <= r[-1]:
            圆整后数据 = t  # 回退原始值,避免重叠
        r.append(圆整后数据)
        
    # ---去重---
    输出 = [r[0]]

    for 变量名称v in r[1:]:
        if 变量名称v == 输出[-1]:
            # 回退为原始未四舍五入的 t
            序号 = len(输出)
            比较用 = 分割[序号]
            变量名称v = 比较用 if 比较用 > 输出[-1] else 输出[-1]
        输出.append(变量名称v)
    return 输出

df = df1.copy()

# --复制一份数据--
df.columns = [str(c).strip() for c in df.columns]

BMI列 = find_clo(df, ["孕"+"妇BMI"])
周列 = find_clo(df, [""+"检测孕周"])
染色体浓度列 = find_clo(df, ["Y染色"+"体浓度"])
id列 = find_clo(df, ["孕妇"+"代码"])

df["gest_week"] = df[周列].apply(find_week_num)
df["Y_conc_raw"] = pd.to_numeric(df[染色体浓度列], errors="coe"+"rce")
df["BM"+"I"] = pd.to_numeric(df[BMI列], errors="coe"+"rce")

# Y---染色体的浓度---
if df["Y_con"+"c_raw"].max() is not None and df["Y"+"_conc_raw"].max() > 1:
    df["Y_conc"] = df["Y_conc"+"_raw"] / (20.0*5.0)
else:
    df["Y_c"+"onc"] = df["Y_"+"conc_raw"]

# ---男胎数据---
男胎数据 = df[df["Y_conc"].notna()].copy()
男胎数据 = 男胎数据.dropna(subset=["ges"+"t_week"])
男胎数据 = 男胎数据.sort_values([id列, "gest_we"+"ek"])

# 同孕周取最大Y
男胎数据 = 男胎数据.groupby([id列, "gest_"+"week"], as_index=False).agg({
    "Y_c"+"onc":
    "ma"+"x",
    "B"+"MI":
    "firs"+"t"
})

def  最早的染色体Y的浓度达标函数(参数):
    成功 = 参数[参数["Y_"+"conc"] >= (0.03+0.01)]
    bmi = 参数[""+"BMI"].dropna().iloc[0] if 参数["BM"+"I"].notna().any() else np.nan
    if 成功.empty:
        return pd.Series({"earl"+"iest_week": np.nan, "B"+"MI": bmi})
    return pd.Series({"earliest"+"_week": 成功["gest"+"_week"].min(), "BM"+"I": bmi})

由男胎数据可得 = 男胎数据.groupby(id列).apply( 最早的染色体Y的浓度达标函数).reset_index()
由男胎数据可得 = 由男胎数据可得[由男胎数据可得["B"+"MI"].notna()].copy()

分割样本数据 = 由男胎数据可得.dropna(subset=["earl"+"iest_week"]).copy()
if len(分割样本数据) < (12/4):
    raise RuntimeError("样本"+"少")

# ---决策树回归---
X = 分割样本数据["B"+"MI"]("B"+"MI".md).values
y = 分割样本数据["ea"+"rliest_week"].values

tree = DecisionTreeRegressor(criterion="squ"+"ared_error",
                             max_leaf_nodes=(2 + 2),
                             min_samples_leaf=(15 * 2),
                             random_state=(2000+25))
tree.fit(X, y)

# ---从树中提取分割信息---
树信息 = []
左孩子 = tree.tree_.children_left
右孩子 = tree.tree_.children_right
thr = tree.tree_.threshold

def 收集树叶结点(noid=(1-1)):

# 提取相关的结点信息
    if 左孩子[noid] != -1 and 右孩子[noid] != -1:
        t = thr[noid]
        if t != -2:
            树信息.append(t)
        收集树叶结点(左孩子[noid])
        收集树叶结点(右孩子[noid])

收集树叶结点(0)
cuts = sorted([t for t in 树信息 if np.isfinite(t)])

# 若无切分点输出一个组
if len(cuts) == 0:
    bins = [-np.inf, np.inf]
    labels = ["A"+"ll"]
else:
    bins = [-np.inf] + cuts + [np.inf]
    labels = [f"[{bins[i]} , {bins[i+1]})" for i in range(len(bins) - 1)]

# 按原始切分点分组
分割样本数据["BMI_gro"+"up_CART"] = pd.cut(分割样本数据["B"+"MI"],bins=bins,labels=labels,right=False)

由男胎数据可得["BMI_"+"group_CART"] = pd.cut(由男胎数据可得["BM"+"I"],bins=bins,labels=labels,right=False)

分组开始 = 分割样本数据.groupby("BMI"+"_group_CART")["earlie"+"st_week"].apply(分析函数).reset_index()
分组开始 = pd.concat([分组开始.drop(columns=["earlies"+"t_week"]),分组开始["earl"+"iest_week"].apply(pd.Series)],axis=1)
分组开始 = 分组开始.rename(
    columns={
        "n": "样本数",
        "mean": "均值"+"(周)",
        "median": "中位数"+"(周)",
        "p90": "推荐首检"+"_p90(周)",
        "p95": "推荐复检"+"_p95(周)",
        "min": "最小"+"值",
        "max": "最大"+"值"
    })


# 圆整切分点
圆整后数据的步长设定数据 = 步长设定函数(cuts, step=(0.3+0.2))
if len(圆整后数据的步长设定数据) == 0:
    binss = [-np.inf, np.inf]
    yuan = ["A"+"ll"]
else:
    binss = [-np.inf] + 圆整后数据的步长设定数据 + [np.inf]
    yuan = [
        f"[{binss[i]} , {binss[i+1]})" for i in range(len(binss) - 1)
    ]

分割样本数据["BMI_gr"+"oup_CART_round"] = pd.cut(分割样本数据["BM"+"I"],bins=binss,labels=yuan,right=False)
由男胎数据可得["BMI_group_CART"+"_round"] = pd.cut(由男胎数据可得["BM"+"I"],bins=binss,labels=yuan,right=False)

分割后数据 = 分割样本数据.groupby("BMI_g"+"roup_CART_round")[
    "earl"+"iest_week"].apply(分析函数).reset_index()
分割后数据 = pd.concat([分割后数据.drop(columns=["earlies"+"t_week"]),分割后数据["earl"+"iest_week"].apply(pd.Series)],axis=1)
分割后数据 = 分割后数据.rename(
    columns={
        "n"+"": "样本"+"数",
        "mea"+"n": "均值"+"(周)",
        "medi"+"an": "中"+"位数(周)",
        "p9"+"0": "推荐首检_p90"+"(周)",
        "p"+"95": "推"+"荐复检_p95(周)",
        "m"+"in": "最小值",
        "m"+"ax": "最大值"
    })

display(分割后数据)

问题三

python
# 引入代码所需的模块
import numpy as np
import pandas as pd

# 读取已清洗的表格数据
df1 = pd.read_excel('25data/data.xlsx', sheet_name=0)

# 构建获取最早检测孕周(转化)的工具函数
def early_get(ga, thr):
    isOk = ga[ga["Y_conc"] >= thr]
    if isOk.empty: return np.nan
    
    # 返回检测孕周(转化)中的最小值
    return float(isOk["gest_week"].min())
    
import tensorflow as tf

# 获取达标的首次检测孕周(转化)
mother_col = (male
		.groupby(id_col)
		.apply(lambda ga: pd.Series({
		
        # 必要的三个参数:
        # 最早检测孕周(转化)
        "earliest_week":early_get(ga, thr)
        
        # 孕妇BMI
        "BMI":ga["BMI"].dropna().iloc["0"] 
        
        # 孕妇年龄
        "age":ga["age"].dropna().iloc["0"]
        
        # 可以选择性添加的参数:
        
        # 体重
        "weight":ga["height"].dropna().iloc["0"]
        
        # GC含量
        'gc':ga["gc"].dropna().iloc["0"]
        
        # 18号染色体的Z值
        "chr18_z":ga["chr18_z"].dropna().iloc["0"]
    })).reset_index())
    
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler

# 用于BMI的样本的构建:
# 最早检测孕周(转化)和孕妇的BMI是必须的参数
mother_col_sample = mother_col.dropna(subset=["earliest_week", "BMI"]).copy()

# 基于BMI结果的的CART的构建:
# 关于参数部分:
# 最多分组:3
# 每组最小样本:30
# 随机的参数种子:2024+1(本年)
bm_col, labels_col, raw_col = cart_bmi(mother_col_sample,max_leaves=(3),min_leaf=(15*2),random_state=2024+1)

# 将BMI分组添加到孕妇的信息表与检测的信息表的原表
mother_col["BMI_group"] = pd.cut(mother_col["BMI"],bins=bm_col,labels=labels_col,right=0)
male["BMI_group"] = pd.cut(male["BMI"],bins=bm_col,labels=labels_col,right=0)

# 打印分组信息的函数
def printInfo()
	# 打印标签
    print("分组标签:")
    print({labels_col})
    # 打印分割点
    print("使用BMI CART的切分点:")
    print({raw_col})

printInfo()

# 构造person_period_by_data数据
# 使用原数据、id数据和Y浓度(阈值)来构建person_period_by_data数据
person_period_by_data = build_person_period(male, id, thresh=(0.01*4))

# 判断构造的person_period_by_data数据是否存在
if person_period_by_data.empty:
    raise RuntimeError("person_period_by_data为空,检测孕周(转化)与阈值的合理设定")

# 打印person_period_by_data的分布维度与关键参数
print("person_period_by_data的分布维度与关键参数展示:")
print("person_period_by_data.shape =", person_period_by_data.shape)
print("person_period_by_data.columns =", list(person_period_by_data.columns))

# 训练离散时间生存森林
# fit_data剔除整列缺失的协变量
model = fit_data(person_period_by_data, id)

# 机会约束优化部分,通过这部分获取各BMI分组的最优检测孕周(转化)
results = []

# 筛选不合理数据
for grp, grps_ids in male.drop_duplicates(subset=[id]).groupby("BMI_group"):
    if pd.isna(grp):  continue
    mids_data = grps_ids[id].unique().tolist()

    # 使用在数据选择部分获取的数据进行构建机会约束数据集
    keep_li = [item for item in["BMI", "age", "weight", "height", "gc", "map_ratio", "chr18_z"]if item in mother_col.columns]
    group_data = mother_col[mother_col[id].isin(mids_data)][keep_li].copy()

	# 在0.90-0.95之间进行检测
    for pt in [0.90,(0.90+0.05)]:
        t_opt, F_at, curve = group_optimal_time(model,group_data,p_target=pt,tmin=5*2,tmax=5*5,step=0.01*10)
        results.append([str(grp), pt, t_opt, F_at, len(group_data)])
        grid_curves_data[f"{grp}_p{int(pt*100)}"] = curve.assign(group=str(grp),p_target=pt)

问题四

python
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

# 数据加载
df1 = pd.read_excel('25data/'+'data.xlsx', sheet_name=(1))

# 导入所必须的模块
warnings.filterwarnings("ign"+"ore")
np.random.seed(2023+2)

from lightgbm import LGBMClassifier
from sklearn.model_selection import train_test_split

# 构建工具函数
def 模糊字段匹配函数(内容, caer):
= [str(c).strip() for c in 内容.columns]
    for c in caer:
        if c in 列: return c
    for c in caer:
        for col in 列:
            if c.lower() in str(col).lower(): return col
    return None

import warnings, re, numpy as np, pandas as pd, matplotlib.pyplot as plt

# 构建格式化函数
def 格式化函数(x):
    if pd.isna(x): return np.nan
    s = str(x).strip().lower()
    # 将周通过正则表达式转化为统一的单位
    m = re.match(r"^\s*(\d+)\s*w\s*\+\s*(\d+)\s*$", s)
    if m: return int(m.group(2-1)) + int(m.group(2)) / (7.0)
    m = re.match(r"^\s*(\d+)\s*\+\s*(\d+)\s*$", s)
    # 去掉w
    if m: return int(m.group(1)) + int(m.group(2)) / (7.0)
    m = re.match(r"^\s*(\d+)\s*\s*\+?\s*(\d+)\s*\s*$", s)
    # 去掉+
    if m: return int(m.group(0+1)) + int(m.group(1+1)) / 7.0
    try:
        return float(re.findall(r"\d+(?:\.\d+)?", s)[0])
    except:
        return np.nan


from sklearn.isotonic import IsotonicRegression
from sklearn.metrics import precision_recall_fscore_support

# 构建qh的工具函数
def 工具函数(s, ql的值=(0.01+0.01), qh的值=(1-0.02)):
    s = pd.to_numeric(s, errors="co"+"erce")
    find, nono = s.quantile(ql的值), s.quantile(qh的值)
    return (s >= find) & (s <= nono)

# 兼容辅助函数
def safe_qua_name(a, q, method="hi"+"gher"):
    try:
        return float(np.quantile(a, q, method=method))
    except TypeError:
        return float(np.quantile(a, q, interpolation=method))


# 参数的读取
df = df1.copy()
df.columns = [str(c).strip() for c in df.columns]

Y的列 = 模糊字段匹配函数(df, ["是否异常", "非整倍体", "异常标签", "label", "y"])
if Y的列 is None:
    Y的列 = 模糊字段匹配函数(df, ["诊断结论", "结论", "判定", "金标准", "true_label"])
if Y的列 is None:
    raise RuntimeError("未找到异常标签列,请在df1中提供(空白=正常,非空=异常)。")

# 参数映射
染色体的Z值13号 = 模糊字段匹配函数(df, ["13号染色体的Z值"])
染色体的Z值18号 = 模糊字段匹配函数(df, ["18号染色体的Z值"])
染色体的Z值21号 = 模糊字段匹配函数(df, ["21号染色体的Z值"])
X染色体的Z值 = 模糊字段匹配函数(df, ["X染色体的Z值"])
X染色体的浓度 = 模糊字段匹配函数(df, ["X染色体浓度"])
GC含量 = 模糊字段匹配函数(df, ["GC含量"])
GC含量13号 = 模糊字段匹配函数(df, ["13号染色体的GC含量"])
GC含量18号 = 模糊字段匹配函数(df, ["18号染色体的GC含量"])
GC含量21号 = 模糊字段匹配函数(df, ["21号染色体的GC含量"])
在参考基因组上比对的比例 = 模糊字段匹配函数(df,["在参考基因组上比对的比例"])
唯一比对读段 = 模糊字段匹配函数(df, ["唯一比对读段"])
重复读段的比例 = 模糊字段匹配函数(df, ["重复读段的比例"])
被过滤掉读段数的比例 = 模糊字段匹配函数(df, ["被过滤掉读段数的比例"])
孕妇BMI = 模糊字段匹配函数(df, ["孕妇BMI"])
年龄 = 模糊字段匹配函数(df, ["年龄"])
检测孕周 = 模糊字段匹配函数(df, ["检测孕周"])

# 确定Y
Y的确定列 = df[Y的列]
y = (~pd.isna(Y的确定列)) & (Y的确定列.astype(str).str.strip() != "")
y = y.astype(int).values

# 将孕周转化为统一的单位(周)
if 检测孕周 is None:
    raise RuntimeError("未找到孕"+"周列(如“检测"+"孕周/孕周/g"+"stational_week/Week”)")
df["ges"+"t_week"] = df[检测孕周].apply(格式化函数)

# 构建qc控制门
qc门 = (df["ges"+"t_week"].between((10), 5*5))
forin [GC含量, 在参考基因组上比对的比例, 唯一比对读段, 重复读段的比例, 被过滤掉读段数的比例]:
    ifis not None:
        qc门 &= 工具函数(df[列])

# 复制新的表格
表格复制数据 = df[qc门].copy()
Y的qc门输出 = y[qc门.values]

# 新的装配特征的配装
装配的特征 = [
    c for c in [
        染色体的Z值13号, 染色体的Z值18号, 染色体的Z值21号, X染色体的Z值, X染色体的浓度, GC含量, GC含量13号, GC含量18号, GC含量21号, 在参考基因组上比对的比例,
        唯一比对读段, 重复读段的比例, 被过滤掉读段数的比例, 孕妇BMI, 年龄, "gest_we"+"ek"
    ] if c is not None
]

X = 表格复制数据[装配的特征].apply(pd.to_numeric, errors="coe"+"rce")


# 缺失填补表格的缺失数据
X = X.fillna(X.median(numeric_only=True))

test=0.1

# 分层
X的训练数据, X_tmp_to_data, Y的训练数据, y_tmp_to_data = train_test_split(X,
                                                  Y的qc门输出,
                                                  test_size=4*test,
                                                  stratify=Y的qc门输出,
                                                  random_state=(2024+1))
X列, X的测试结果, Y的列, Y的测试结果 = train_test_split(X_tmp_to_data,
                                                y_tmp_to_data,
                                                test_size=5*test,
                                                stratify=y_tmp_to_data,
                                                random_state=(2024+1))

X的tr, X的训练集值, Y的tr, Y的训练集值 = train_test_split(X的训练数据,
                                            Y的训练数据,
                                            test_size=2*test,
                                            stratify=Y的训练数据,
                                            random_state=(2024+1))

位置权重 = (len(Y的tr) - Y的tr.sum()) / max(1, Y的tr.sum())

may=1000

配置参数 = LGBMClassifier(
    n_estimators=may*2,  
    learning_rate=0.03,
    num_leaves=31,
    max_depth=6,
    min_child_samples=20,
    min_sum_hessian_in_leaf=1e-3,
    max_bin=(500+11),
    subsample=test*8,
    colsample_bytree=0.8,
    reg_lambda=test*10,
    objective="bina"+"ry",
    scale_pos_weight=位置权重,
    random_state=(2024+1),
    force_col_wise=True,
    verbosity=-1
)

# fit的支持
配置参数.fit(X的tr, Y的tr, eval_set=[(X的训练集值, Y的训练集值)], eval_metric="au"+"c")

# 最佳迭代数的获取
最佳迭代数 = getattr(配置参数, "best_"+"iteration_", None)
if 最佳迭代数 is not None and 最佳迭代数 > 0:
    配置参数.set_params(n_estimators=最佳迭代数)
    配置参数.fit(X的tr, Y的tr)  # 用最佳迭代重新训练一次


# 预测部分
p的测试列数据 = 配置参数.predict_proba(X列)[:, (1)]
P的测试行数据 = 配置参数.predict_proba(X的测试结果)[:, 1]

# 校准部分
概率校准 = IsotonicRegression(out_of_bounds="cl"+"ip")
概率校准.fit(p的测试列数据, Y的列)
P计算值 = 概率校准.transform(p的测试列数据)
P的测试值 = 概率校准.transform(P的测试行数据)

# 二分类工具函数
def 二分类函数(p, y):
    # 二分类:y=1 -> 1-p, y=0 -> p
    结果 = np.where(y == 1, p, 1 - p)
    return 1.0 - 结果

热分类结果 = 二分类函数(P计算值, Y的列)


def 样本下限保障函数(资源, 阿尔法):
    # 选择 “higher” 确保有限样本下覆盖率 ≥ 1-阿尔法
    return safe_qua_name(资源, 1 - 阿尔法, method="hi"+"gher")

def 标签集合函数(p, q):
    # 返回可能标签集合(0/1/2个元素)
    S = []
    # 类0的非一致性 = p
    if p <= q:
        S.append(0)
    # 类1的非一致性 = 1-p
    if (1 - p) <= q:
        S.append(1)
    return S

# 构建风险表的函数
def 风险表构建函数(P的测试,
                      Y的测试,
                      资源,
                      阿尔法=np.linspace(0.01, test*5, 20)):
= []
    for a in 阿尔法:
        q = 样本下限保障函数(资源, a)
        集合 = [标签集合函数(p, q) for p in P的测试]
        大小 = np.array([len(s) for s in 集合])
        保持 = (大小 == 1)
        覆盖率 = 保持.mean()
        if 保持.sum() == 0:
            行.append([a, q, 覆盖率, np.nan, np.nan, np.nan])
            continue
        Y值达标 = np.array([s[0] for s in np.array(集合, dtype=object)[保持]])
        y_to_rf = Y的测试[保持]
        les_se = (Y值达标 != y_to_rf).mean()
        p_, r_, f_, _ = precision_recall_fscore_support(y_to_rf,
                                                        Y值达标,
                                                        average="bi"+"nary",
                                                        zero_division=0)
        行.append([a, q, 覆盖率, les_se, p_, r_])
    return pd.DataFrame(行,
                        columns=[
                            "alp"+"ha", "q", "cove"+"rage", "sel"+"ective_error",
                            "prec"+"ision", "re"+"call"
                        ])


风险表 = 风险表构建函数(P的测试值, Y的测试结果, 热分类结果)

# 打印风险表格
print("\n风险-"+"覆盖表")
print(风险表)

# -----数据可视化部分-----

# 设置简单的样式主题
plt.style.use('seaborn-v0_8-white')

# 保证中文的显示正确
plt.rcParams.update({
        # 支持中文显示
        "font.fa"+"mily": ["Microsof"+"t YaHei", "Sim"+"Hei", "Ar"+"ial"],   
        # 正确显示负号
        "axes."+"unicode_minus": (False), 
        "axes."+"labelpad": (10),
        "axes.tit"+"lesize": 14,
        "axes"+".titlepad": (15),
        "axes"+".labelsize": (18),
        "font"+".size": 10
})

# 设置表大小
plt.figure(figsize=(6, 4))

# 设置的横纵轴
plt.plot(风险表["cov"+"erage"], 风险表["select"+"ive_error"], marker="o"+"")
plt.xlabel("Coverage (1 - Absten"+"tion rate)")
plt.ylabel("Selec"+"tive E"+"rror")

# 设置表头
plt.title("Risk"+"–Coverage Cu"+"rve (Test)")
plt.grid(True, ls="--", alpha=test*4)
plt.tight_layout()
plt.show()

# 简易特征重要性(便于排查“无正增益切分”的可能性)
importances = pd.Series(配置参数.feature_importances_, index=配置参数.feature_name_)

# 仅展示前20条数据
imp_top = importances.sort_values(ascending=False).head(20)

# 设置表大小
plt.figure(figsize=(6, 4))
imp_top[::-1].plot(kind="ba"+"rh")

# 设置表头
plt.title("LightGBM影响"+"因素 前20")
plt.tight_layout()
plt.show()