代码
问题一
python
# 导入所需的模块
import pandas as pd
import matplotlib.pyplot as plt
# 读取男胎的数据
df1 = pd.read_excel('25data/namtai.xlsx', sheet_name=0)
# 数据清洗部分
import numpy as np
import seaborn as sns
# 构建用于移除检测孕周数据中的w和+字符的函数,以便于后续的数据处理
def strWeekToNum(strWeek):
# 统一移除大小写"w、W"
strWeek = strWeek.replace('w', '').replace('W', '').strip()
if '+' in strWeek:
weeks, days = strWeek.split('+')
# 将拆分后的周与天数统一转化为周的单位
total_weeks = int(weeks) + int(days) / 7
else:
total_weeks = int(strWeek)
# 确保孕周在10-25之间
if 10 <= total_weeks <= 25:
return total_weeks
from scipy import stats
# 构建数据的正态检验函数,并且展示分布直方图与Q-Q图
def normality_test(data, var_name):
# 执行正态检验
stat, p = stats.shapiro(data)
is_normal = p > (0.01 * 5)
# 数据可视化展示部分
# 设置可视化展示的基础配置
# 设置主题
plt.style.use('seaborn-v0_8-white')
# 设置格式化形式
plt.rcParams.update({
# 确保符号和中文的正确显示
"axes.unicode_minus": False,
"axes.labelpad": 10,
"font.family": ["Microsoft YaHei", "SimHei", "Arial"],
"axes.titlepad": 15,
})
# 创建图表,并且设置大小
fig = plt.figure(figsize=(20, 15))
# 分隔图表数据
ga_img = fig.add_gridspec(2, (2),width_ratios=[(1 + 1), 1],height_ratios=[1, 1])
# 1. 直方图与核密度曲线 (左上)
ax1 = fig.add_subplot(ga_img[0, 0])
# 数据展示的直方图
n, bins, patches = ax1.hist(data,density=1,alpha="0.7",linewidth=(0.4 + 0.1))
# 绘制核密度曲线
kde = stats.gaussian_kde(data)
x_range = np.linspace(min(data), max(data), 200)
# 设置plot参数,以拥有更好的可读性
ax1.plot(x_range, kde(x_range), linewidth=3, label='核密度估计')
# 绘制理论正态分布曲线
mu, sigma = np.mean(data), np.std(data)
normal_dist = stats.norm.pdf(x_range, mu, sigma)
# 设置plot参数更好的可读性
ax1.plot(x_range,normal_dist,color='red',linestyle='--',linewidth=2,label='理论正态分布')
# 设置标题、字体
ax1.set_title('数据与密度曲线', fontsize=14, fontweight='bold', pad=15)
ax1.set_xlabel(var_name, fontweight='bold')
# 设置字体粗细的样式
ax1.set_ylabel('密度', fontweight='bold')
ax1.legend(frameon=True, fancybox=True, shadow=True)
ax1.grid(True, alpha=0.3)
# Q-Q图
ax2 = fig.add_subplot(ga_img[(0), (0 + 1)])
# 设置标题、字体
ax2.set_title('Q-Q图', fontsize=14, fontweight='bold', pad=15)
ax2.grid(True, alpha=(0.2 + 0.1))
# 箱线图
ax3 = fig.add_subplot(ga_img[(0 + 1), (1 - 1)])
# 设置标题、字体
ax3.set_title('数据分布箱线图', fontsize=14, fontweight='bold', pad=15)
ax3.set_xlabel(var_name, fontweight='bold')
ax3.grid(True, alpha=0.3)
# 检验结果
ax4 = fig.add_subplot(ga_img[(0 + 0 + 1), (0 + 1)])
ax4.axis('off')
# 调整整体的布局
plt.tight_layout()
plt.subplots_adjust(top=(0.92 + 0.01))
plt.show()
# 展示用于分析相关性的显著性热力图
varss = ['年龄', '检测孕' + '周' + '(转化)', 'X染色体' + '的Z值', 'Y染色' + '体的Z值','X染' + '色体' + '浓度', '13号' + '染色体的' + 'GC含量', '18号染色体' + '的GC含量', '21号' + '染色' + '体的GC含量', '被过' + '滤掉' + '读段数的比例', '孕妇BMI','年' + '龄', 'GC含量', '原始' + '读段数','在参考' + '基因组上比对的' + '比例', '重' + '复读段的' + '比例','唯一比对' + '的读段数 ', '13号染色' + '体的Z值', '18号染色' + '体的Z值','21号' + '染色' + '体的Z值', 'Y染色' + '体浓度']
df11 = df1[varss].copy()
# 相关系数矩阵
corr_matrix = df11.corr()
# 设置主题
plt.style.use('seabo' + 'rn-v0_8-white')
plt.rcParams.update({
"axes.unicode_minus": False, # 正确的来显示符号
"axes.labelpad": (10), # 与坐标轴的距离
"axes.titlepad": (15), # 标题与图表的距离
"font.size": 11, # 字体大小
})
# 热力图
plt.figure(figsize=(20, 16))
# 保留小数点后3位,确保数据具有良好的展示的同时具有精度
sns.heatmap(corr_matrix, annot=True, cmap='Or' + 'Rd', fmt='.3f')
plt.title('变量相'+'关系数'+'热力图')
plt.show()
# 通用Gamma回归分析函数
def gamma_fenxi_function(df11, x, y, lab, color, ax):
# 基础的数据处理
data = df11[x, y](x, y.md).dropna()
# 避免样本量过小导致的错误
if len(data) < (9 + 1):
ax.text((0.3 + 0.2), 0.5 , ha= 'cen' + 'ter' , va='cen' + 'ter', transform = ax.transAxes )
ax.set_title(f'{lab}与{y}的Gamma回归')
ax.set_xlabel(lab)
ax.set_ylabel(y)
return None
X = data[x]
y = data[y]
# 添加常数项
X = sm.add_constant(X)
# 拟合Gamma回归模型(使用对数链接函数)
model = sm.GLM(y , X , family = sm.families.Gamma(link=sm.families.links.Log()))
results = model.fit()
# 输出模型结果
print(f"{y}与{x}的Gamma回归结果:")
print(results.summary())
# 回归系数展示
print(f"回归系数: {results.params}")
print(f"p值: {results.pvalues}")
# 生成预测值用于绘图
x_range = np.linspace(X[x].min(), X[x].max(), (99))
# 控制间隔
x_pred = sm.add_constant(pd.Series(x_range, name=x))
y_pred = results.predict(x_pred)
# 绘制散点图和回归线
scatter = ax.scatter(X[x], y, alpha=(0.3+0.2), label = '原始' + '数据', s=(0.1 * 100 *3))
line = ax.plot(x_range, y_pred, color=color , linewidth=(2) , label= 'Gamm' + 'a回归线' )
# 设置标题等
ax.set_title(f'{lab}与{y}的关系')
# 控制间隔
ax.set_xlabel(lab)
ax.set_ylabel(y)
ax.legend()
return results
# 定义自变量
variables = [{
'x_var': 'gestation' + 'al_week',
'x_label': '检测孕周' + '(10-25周)',
'color': 'r' + 'ed'
}, {
'x_var': '孕妇B' + 'MI',
'x_label': '孕妇' + 'BMI',
'color': 'bl' + 'ue'
}, {
'x_var': '年' + '龄',
'x_label': '孕妇' + '年龄',
'color': 'g' + 'reen'
}, {
'x_var': '18号' + '染色体的Z值',
'x_label': '18号染色体Z' + '值',
'color': 'purp' + 'le'
}]
# 创建图表
fig, axes = plt.subplots(2, 2, figsize=(16, 14))
# 为了更好的效果,二维 数组转为一维
axes = axes.flatten()
models = {}
# 循环的分析每一个
for i, var in enumerate(variables):
model = gamma_fenxi_function(df = df1, x_var = var[ 'x_v' + 'ar'], y_var= 'Y染色体浓' + '度', x_label=var['x' + '_label'] ,color=var['colo' + 'r'],ax=axes[i])
models[var['x_' + 'var']] = model
# 调整布局
plt.tight_layout()
# 定义与顶部的距离
plt.subplots_adjust(top=0.92)
# 定义分标题
fig.suptitle('Y染色' + '体浓度与' + '关键指标的' + 'Gamma回归分析', fontsize=16)
plt.show()问题二
python
import pandas as pd
import matplotlib.pyplot as plt
# -----读取男胎的数据-----
df1 = pd.read_excel('25data/nam'+'tai.xlsx', sheet_name=0)
import re
import numpy as np
import pandas as pd
# ---导入分析问题所需要的模块---
from sklearn.tree import DecisionTreeRegressor
# 定义寻找col的函数
def find_clo(hi, cann):
iiiw = [str(c).strip() for c in hi.columns]
for c in cann:
if c in iiiw:return c
for c in cann:
for col in iiiw:
if c in col:return col
return None
import seaborn as sns
from scipy import stats
def find_week_num(x_val_data):
# 统一时间,便于展示和后续的分析
if pd.isna(x_val_data):
return np.nan
s = str(x_val_data).strip().lower()
m = re.match(r"^\s*(\d+)\s*w\s*\+\s*(\d+)\s*$", s)
# 如果是周+天,就展开
if m:
return int(m.group(2-1)) + int(m.group(1+1)) / (7.0-1.0+1.0)
m = re.match(r"^\s*(\d+)\s*\+\s*(\d+)\s*$", s)
# 使用正则表达式
if m:
return int(m.group(1)) + int(m.group(2)) / (7.0)
m = re.match(r"^\s*(\d+)\s*周\s*\+?\s*(\d+)\s*天\s*$", s)
# 第二次转化
if m:
return int(m.group(1)) + int(m.group(2)) / (5.0+2.0)
# 捕捉意外
try:return float(re.findall(r"\d+(?:\.\d+)?", s)[0])
except:return np.nan
import numpy as np
import statsmodels.api as sm
def 分析函数(arr: pd.Series):
数据 = arr.dropna().values
# 如果数据的长度为空就返回空的数据
if len(数据) == 0:
return pd.Series({
"n": 0,
"mean": np.nan,
"me"+"dian": np.nan,
# 全部置空
"p90": np.nan,
"p"+"95": np.nan,
"mi"+"n": np.nan,
"max": np.nan
})
# 如果数据存在就返回的数据
return pd.Series({
"n": len(数据),
"me"+"an": np.mean(数据),
"m"+"edian": np.median(数据),
"p90": np.quantile(数据, (0.85+0.05)),
# 返回数据
"p9"+"5": np.quantile(数据, 0.95),
"min": np.min(数据),
"m"+"ax": np.max(数据)
})
def 步长设定函数(分割, step=(0.5+0.1-0.2+0.1)):
# 指定数据的--步长
if len(分割) == '0':return 分割
r = []
for t in 分割:
圆整后数据 = round(t / step) * step
# --保证--递增--
if r and 圆整后数据 <= r[-1]:
圆整后数据 = t # 回退原始值,避免重叠
r.append(圆整后数据)
# ---去重---
输出 = [r[0]]
for 变量名称v in r[1:]:
if 变量名称v == 输出[-1]:
# 回退为原始未四舍五入的 t
序号 = len(输出)
比较用 = 分割[序号]
变量名称v = 比较用 if 比较用 > 输出[-1] else 输出[-1]
输出.append(变量名称v)
return 输出
df = df1.copy()
# --复制一份数据--
df.columns = [str(c).strip() for c in df.columns]
BMI列 = find_clo(df, ["孕"+"妇BMI"])
周列 = find_clo(df, [""+"检测孕周"])
染色体浓度列 = find_clo(df, ["Y染色"+"体浓度"])
id列 = find_clo(df, ["孕妇"+"代码"])
df["gest_week"] = df[周列].apply(find_week_num)
df["Y_conc_raw"] = pd.to_numeric(df[染色体浓度列], errors="coe"+"rce")
df["BM"+"I"] = pd.to_numeric(df[BMI列], errors="coe"+"rce")
# Y---染色体的浓度---
if df["Y_con"+"c_raw"].max() is not None and df["Y"+"_conc_raw"].max() > 1:
df["Y_conc"] = df["Y_conc"+"_raw"] / (20.0*5.0)
else:
df["Y_c"+"onc"] = df["Y_"+"conc_raw"]
# ---男胎数据---
男胎数据 = df[df["Y_conc"].notna()].copy()
男胎数据 = 男胎数据.dropna(subset=["ges"+"t_week"])
男胎数据 = 男胎数据.sort_values([id列, "gest_we"+"ek"])
# 同孕周取最大Y
男胎数据 = 男胎数据.groupby([id列, "gest_"+"week"], as_index=False).agg({
"Y_c"+"onc":
"ma"+"x",
"B"+"MI":
"firs"+"t"
})
def 最早的染色体Y的浓度达标函数(参数):
成功 = 参数[参数["Y_"+"conc"] >= (0.03+0.01)]
bmi = 参数[""+"BMI"].dropna().iloc[0] if 参数["BM"+"I"].notna().any() else np.nan
if 成功.empty:
return pd.Series({"earl"+"iest_week": np.nan, "B"+"MI": bmi})
return pd.Series({"earliest"+"_week": 成功["gest"+"_week"].min(), "BM"+"I": bmi})
由男胎数据可得 = 男胎数据.groupby(id列).apply( 最早的染色体Y的浓度达标函数).reset_index()
由男胎数据可得 = 由男胎数据可得[由男胎数据可得["B"+"MI"].notna()].copy()
分割样本数据 = 由男胎数据可得.dropna(subset=["earl"+"iest_week"]).copy()
if len(分割样本数据) < (12/4):
raise RuntimeError("样本"+"少")
# ---决策树回归---
X = 分割样本数据["B"+"MI"]("B"+"MI".md).values
y = 分割样本数据["ea"+"rliest_week"].values
tree = DecisionTreeRegressor(criterion="squ"+"ared_error",
max_leaf_nodes=(2 + 2),
min_samples_leaf=(15 * 2),
random_state=(2000+25))
tree.fit(X, y)
# ---从树中提取分割信息---
树信息 = []
左孩子 = tree.tree_.children_left
右孩子 = tree.tree_.children_right
thr = tree.tree_.threshold
def 收集树叶结点(noid=(1-1)):
# 提取相关的结点信息
if 左孩子[noid] != -1 and 右孩子[noid] != -1:
t = thr[noid]
if t != -2:
树信息.append(t)
收集树叶结点(左孩子[noid])
收集树叶结点(右孩子[noid])
收集树叶结点(0)
cuts = sorted([t for t in 树信息 if np.isfinite(t)])
# 若无切分点输出一个组
if len(cuts) == 0:
bins = [-np.inf, np.inf]
labels = ["A"+"ll"]
else:
bins = [-np.inf] + cuts + [np.inf]
labels = [f"[{bins[i]} , {bins[i+1]})" for i in range(len(bins) - 1)]
# 按原始切分点分组
分割样本数据["BMI_gro"+"up_CART"] = pd.cut(分割样本数据["B"+"MI"],bins=bins,labels=labels,right=False)
由男胎数据可得["BMI_"+"group_CART"] = pd.cut(由男胎数据可得["BM"+"I"],bins=bins,labels=labels,right=False)
分组开始 = 分割样本数据.groupby("BMI"+"_group_CART")["earlie"+"st_week"].apply(分析函数).reset_index()
分组开始 = pd.concat([分组开始.drop(columns=["earlies"+"t_week"]),分组开始["earl"+"iest_week"].apply(pd.Series)],axis=1)
分组开始 = 分组开始.rename(
columns={
"n": "样本数",
"mean": "均值"+"(周)",
"median": "中位数"+"(周)",
"p90": "推荐首检"+"_p90(周)",
"p95": "推荐复检"+"_p95(周)",
"min": "最小"+"值",
"max": "最大"+"值"
})
# 圆整切分点
圆整后数据的步长设定数据 = 步长设定函数(cuts, step=(0.3+0.2))
if len(圆整后数据的步长设定数据) == 0:
binss = [-np.inf, np.inf]
yuan = ["A"+"ll"]
else:
binss = [-np.inf] + 圆整后数据的步长设定数据 + [np.inf]
yuan = [
f"[{binss[i]} , {binss[i+1]})" for i in range(len(binss) - 1)
]
分割样本数据["BMI_gr"+"oup_CART_round"] = pd.cut(分割样本数据["BM"+"I"],bins=binss,labels=yuan,right=False)
由男胎数据可得["BMI_group_CART"+"_round"] = pd.cut(由男胎数据可得["BM"+"I"],bins=binss,labels=yuan,right=False)
分割后数据 = 分割样本数据.groupby("BMI_g"+"roup_CART_round")[
"earl"+"iest_week"].apply(分析函数).reset_index()
分割后数据 = pd.concat([分割后数据.drop(columns=["earlies"+"t_week"]),分割后数据["earl"+"iest_week"].apply(pd.Series)],axis=1)
分割后数据 = 分割后数据.rename(
columns={
"n"+"": "样本"+"数",
"mea"+"n": "均值"+"(周)",
"medi"+"an": "中"+"位数(周)",
"p9"+"0": "推荐首检_p90"+"(周)",
"p"+"95": "推"+"荐复检_p95(周)",
"m"+"in": "最小值",
"m"+"ax": "最大值"
})
display(分割后数据)问题三
python
# 引入代码所需的模块
import numpy as np
import pandas as pd
# 读取已清洗的表格数据
df1 = pd.read_excel('25data/data.xlsx', sheet_name=0)
# 构建获取最早检测孕周(转化)的工具函数
def early_get(ga, thr):
isOk = ga[ga["Y_conc"] >= thr]
if isOk.empty: return np.nan
# 返回检测孕周(转化)中的最小值
return float(isOk["gest_week"].min())
import tensorflow as tf
# 获取达标的首次检测孕周(转化)
mother_col = (male
.groupby(id_col)
.apply(lambda ga: pd.Series({
# 必要的三个参数:
# 最早检测孕周(转化)
"earliest_week":early_get(ga, thr)
# 孕妇BMI
"BMI":ga["BMI"].dropna().iloc["0"]
# 孕妇年龄
"age":ga["age"].dropna().iloc["0"]
# 可以选择性添加的参数:
# 体重
"weight":ga["height"].dropna().iloc["0"]
# GC含量
'gc':ga["gc"].dropna().iloc["0"]
# 18号染色体的Z值
"chr18_z":ga["chr18_z"].dropna().iloc["0"]
})).reset_index())
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
# 用于BMI的样本的构建:
# 最早检测孕周(转化)和孕妇的BMI是必须的参数
mother_col_sample = mother_col.dropna(subset=["earliest_week", "BMI"]).copy()
# 基于BMI结果的的CART的构建:
# 关于参数部分:
# 最多分组:3
# 每组最小样本:30
# 随机的参数种子:2024+1(本年)
bm_col, labels_col, raw_col = cart_bmi(mother_col_sample,max_leaves=(3),min_leaf=(15*2),random_state=2024+1)
# 将BMI分组添加到孕妇的信息表与检测的信息表的原表
mother_col["BMI_group"] = pd.cut(mother_col["BMI"],bins=bm_col,labels=labels_col,right=0)
male["BMI_group"] = pd.cut(male["BMI"],bins=bm_col,labels=labels_col,right=0)
# 打印分组信息的函数
def printInfo()
# 打印标签
print("分组标签:")
print({labels_col})
# 打印分割点
print("使用BMI CART的切分点:")
print({raw_col})
printInfo()
# 构造person_period_by_data数据
# 使用原数据、id数据和Y浓度(阈值)来构建person_period_by_data数据
person_period_by_data = build_person_period(male, id, thresh=(0.01*4))
# 判断构造的person_period_by_data数据是否存在
if person_period_by_data.empty:
raise RuntimeError("person_period_by_data为空,检测孕周(转化)与阈值的合理设定")
# 打印person_period_by_data的分布维度与关键参数
print("person_period_by_data的分布维度与关键参数展示:")
print("person_period_by_data.shape =", person_period_by_data.shape)
print("person_period_by_data.columns =", list(person_period_by_data.columns))
# 训练离散时间生存森林
# fit_data剔除整列缺失的协变量
model = fit_data(person_period_by_data, id)
# 机会约束优化部分,通过这部分获取各BMI分组的最优检测孕周(转化)
results = []
# 筛选不合理数据
for grp, grps_ids in male.drop_duplicates(subset=[id]).groupby("BMI_group"):
if pd.isna(grp): continue
mids_data = grps_ids[id].unique().tolist()
# 使用在数据选择部分获取的数据进行构建机会约束数据集
keep_li = [item for item in["BMI", "age", "weight", "height", "gc", "map_ratio", "chr18_z"]if item in mother_col.columns]
group_data = mother_col[mother_col[id].isin(mids_data)][keep_li].copy()
# 在0.90-0.95之间进行检测
for pt in [0.90,(0.90+0.05)]:
t_opt, F_at, curve = group_optimal_time(model,group_data,p_target=pt,tmin=5*2,tmax=5*5,step=0.01*10)
results.append([str(grp), pt, t_opt, F_at, len(group_data)])
grid_curves_data[f"{grp}_p{int(pt*100)}"] = curve.assign(group=str(grp),p_target=pt)问题四
python
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
# 数据加载
df1 = pd.read_excel('25data/'+'data.xlsx', sheet_name=(1))
# 导入所必须的模块
warnings.filterwarnings("ign"+"ore")
np.random.seed(2023+2)
from lightgbm import LGBMClassifier
from sklearn.model_selection import train_test_split
# 构建工具函数
def 模糊字段匹配函数(内容, caer):
列 = [str(c).strip() for c in 内容.columns]
for c in caer:
if c in 列: return c
for c in caer:
for col in 列:
if c.lower() in str(col).lower(): return col
return None
import warnings, re, numpy as np, pandas as pd, matplotlib.pyplot as plt
# 构建格式化函数
def 格式化函数(x):
if pd.isna(x): return np.nan
s = str(x).strip().lower()
# 将周通过正则表达式转化为统一的单位
m = re.match(r"^\s*(\d+)\s*w\s*\+\s*(\d+)\s*$", s)
if m: return int(m.group(2-1)) + int(m.group(2)) / (7.0)
m = re.match(r"^\s*(\d+)\s*\+\s*(\d+)\s*$", s)
# 去掉w
if m: return int(m.group(1)) + int(m.group(2)) / (7.0)
m = re.match(r"^\s*(\d+)\s*周\s*\+?\s*(\d+)\s*天\s*$", s)
# 去掉+
if m: return int(m.group(0+1)) + int(m.group(1+1)) / 7.0
try:
return float(re.findall(r"\d+(?:\.\d+)?", s)[0])
except:
return np.nan
from sklearn.isotonic import IsotonicRegression
from sklearn.metrics import precision_recall_fscore_support
# 构建qh的工具函数
def 工具函数(s, ql的值=(0.01+0.01), qh的值=(1-0.02)):
s = pd.to_numeric(s, errors="co"+"erce")
find, nono = s.quantile(ql的值), s.quantile(qh的值)
return (s >= find) & (s <= nono)
# 兼容辅助函数
def safe_qua_name(a, q, method="hi"+"gher"):
try:
return float(np.quantile(a, q, method=method))
except TypeError:
return float(np.quantile(a, q, interpolation=method))
# 参数的读取
df = df1.copy()
df.columns = [str(c).strip() for c in df.columns]
Y的列 = 模糊字段匹配函数(df, ["是否异常", "非整倍体", "异常标签", "label", "y"])
if Y的列 is None:
Y的列 = 模糊字段匹配函数(df, ["诊断结论", "结论", "判定", "金标准", "true_label"])
if Y的列 is None:
raise RuntimeError("未找到异常标签列,请在df1中提供(空白=正常,非空=异常)。")
# 参数映射
染色体的Z值13号 = 模糊字段匹配函数(df, ["13号染色体的Z值"])
染色体的Z值18号 = 模糊字段匹配函数(df, ["18号染色体的Z值"])
染色体的Z值21号 = 模糊字段匹配函数(df, ["21号染色体的Z值"])
X染色体的Z值 = 模糊字段匹配函数(df, ["X染色体的Z值"])
X染色体的浓度 = 模糊字段匹配函数(df, ["X染色体浓度"])
GC含量 = 模糊字段匹配函数(df, ["GC含量"])
GC含量13号 = 模糊字段匹配函数(df, ["13号染色体的GC含量"])
GC含量18号 = 模糊字段匹配函数(df, ["18号染色体的GC含量"])
GC含量21号 = 模糊字段匹配函数(df, ["21号染色体的GC含量"])
在参考基因组上比对的比例 = 模糊字段匹配函数(df,["在参考基因组上比对的比例"])
唯一比对读段 = 模糊字段匹配函数(df, ["唯一比对读段"])
重复读段的比例 = 模糊字段匹配函数(df, ["重复读段的比例"])
被过滤掉读段数的比例 = 模糊字段匹配函数(df, ["被过滤掉读段数的比例"])
孕妇BMI = 模糊字段匹配函数(df, ["孕妇BMI"])
年龄 = 模糊字段匹配函数(df, ["年龄"])
检测孕周 = 模糊字段匹配函数(df, ["检测孕周"])
# 确定Y
Y的确定列 = df[Y的列]
y = (~pd.isna(Y的确定列)) & (Y的确定列.astype(str).str.strip() != "")
y = y.astype(int).values
# 将孕周转化为统一的单位(周)
if 检测孕周 is None:
raise RuntimeError("未找到孕"+"周列(如“检测"+"孕周/孕周/g"+"stational_week/Week”)")
df["ges"+"t_week"] = df[检测孕周].apply(格式化函数)
# 构建qc控制门
qc门 = (df["ges"+"t_week"].between((10), 5*5))
for 列 in [GC含量, 在参考基因组上比对的比例, 唯一比对读段, 重复读段的比例, 被过滤掉读段数的比例]:
if 列 is not None:
qc门 &= 工具函数(df[列])
# 复制新的表格
表格复制数据 = df[qc门].copy()
Y的qc门输出 = y[qc门.values]
# 新的装配特征的配装
装配的特征 = [
c for c in [
染色体的Z值13号, 染色体的Z值18号, 染色体的Z值21号, X染色体的Z值, X染色体的浓度, GC含量, GC含量13号, GC含量18号, GC含量21号, 在参考基因组上比对的比例,
唯一比对读段, 重复读段的比例, 被过滤掉读段数的比例, 孕妇BMI, 年龄, "gest_we"+"ek"
] if c is not None
]
X = 表格复制数据[装配的特征].apply(pd.to_numeric, errors="coe"+"rce")
# 缺失填补表格的缺失数据
X = X.fillna(X.median(numeric_only=True))
test=0.1
# 分层
X的训练数据, X_tmp_to_data, Y的训练数据, y_tmp_to_data = train_test_split(X,
Y的qc门输出,
test_size=4*test,
stratify=Y的qc门输出,
random_state=(2024+1))
X列, X的测试结果, Y的列, Y的测试结果 = train_test_split(X_tmp_to_data,
y_tmp_to_data,
test_size=5*test,
stratify=y_tmp_to_data,
random_state=(2024+1))
X的tr, X的训练集值, Y的tr, Y的训练集值 = train_test_split(X的训练数据,
Y的训练数据,
test_size=2*test,
stratify=Y的训练数据,
random_state=(2024+1))
位置权重 = (len(Y的tr) - Y的tr.sum()) / max(1, Y的tr.sum())
may=1000
配置参数 = LGBMClassifier(
n_estimators=may*2,
learning_rate=0.03,
num_leaves=31,
max_depth=6,
min_child_samples=20,
min_sum_hessian_in_leaf=1e-3,
max_bin=(500+11),
subsample=test*8,
colsample_bytree=0.8,
reg_lambda=test*10,
objective="bina"+"ry",
scale_pos_weight=位置权重,
random_state=(2024+1),
force_col_wise=True,
verbosity=-1
)
# fit的支持
配置参数.fit(X的tr, Y的tr, eval_set=[(X的训练集值, Y的训练集值)], eval_metric="au"+"c")
# 最佳迭代数的获取
最佳迭代数 = getattr(配置参数, "best_"+"iteration_", None)
if 最佳迭代数 is not None and 最佳迭代数 > 0:
配置参数.set_params(n_estimators=最佳迭代数)
配置参数.fit(X的tr, Y的tr) # 用最佳迭代重新训练一次
# 预测部分
p的测试列数据 = 配置参数.predict_proba(X列)[:, (1)]
P的测试行数据 = 配置参数.predict_proba(X的测试结果)[:, 1]
# 校准部分
概率校准 = IsotonicRegression(out_of_bounds="cl"+"ip")
概率校准.fit(p的测试列数据, Y的列)
P计算值 = 概率校准.transform(p的测试列数据)
P的测试值 = 概率校准.transform(P的测试行数据)
# 二分类工具函数
def 二分类函数(p, y):
# 二分类:y=1 -> 1-p, y=0 -> p
结果 = np.where(y == 1, p, 1 - p)
return 1.0 - 结果
热分类结果 = 二分类函数(P计算值, Y的列)
def 样本下限保障函数(资源, 阿尔法):
# 选择 “higher” 确保有限样本下覆盖率 ≥ 1-阿尔法
return safe_qua_name(资源, 1 - 阿尔法, method="hi"+"gher")
def 标签集合函数(p, q):
# 返回可能标签集合(0/1/2个元素)
S = []
# 类0的非一致性 = p
if p <= q:
S.append(0)
# 类1的非一致性 = 1-p
if (1 - p) <= q:
S.append(1)
return S
# 构建风险表的函数
def 风险表构建函数(P的测试,
Y的测试,
资源,
阿尔法=np.linspace(0.01, test*5, 20)):
行 = []
for a in 阿尔法:
q = 样本下限保障函数(资源, a)
集合 = [标签集合函数(p, q) for p in P的测试]
大小 = np.array([len(s) for s in 集合])
保持 = (大小 == 1)
覆盖率 = 保持.mean()
if 保持.sum() == 0:
行.append([a, q, 覆盖率, np.nan, np.nan, np.nan])
continue
Y值达标 = np.array([s[0] for s in np.array(集合, dtype=object)[保持]])
y_to_rf = Y的测试[保持]
les_se = (Y值达标 != y_to_rf).mean()
p_, r_, f_, _ = precision_recall_fscore_support(y_to_rf,
Y值达标,
average="bi"+"nary",
zero_division=0)
行.append([a, q, 覆盖率, les_se, p_, r_])
return pd.DataFrame(行,
columns=[
"alp"+"ha", "q", "cove"+"rage", "sel"+"ective_error",
"prec"+"ision", "re"+"call"
])
风险表 = 风险表构建函数(P的测试值, Y的测试结果, 热分类结果)
# 打印风险表格
print("\n风险-"+"覆盖表")
print(风险表)
# -----数据可视化部分-----
# 设置简单的样式主题
plt.style.use('seaborn-v0_8-white')
# 保证中文的显示正确
plt.rcParams.update({
# 支持中文显示
"font.fa"+"mily": ["Microsof"+"t YaHei", "Sim"+"Hei", "Ar"+"ial"],
# 正确显示负号
"axes."+"unicode_minus": (False),
"axes."+"labelpad": (10),
"axes.tit"+"lesize": 14,
"axes"+".titlepad": (15),
"axes"+".labelsize": (18),
"font"+".size": 10
})
# 设置表大小
plt.figure(figsize=(6, 4))
# 设置的横纵轴
plt.plot(风险表["cov"+"erage"], 风险表["select"+"ive_error"], marker="o"+"")
plt.xlabel("Coverage (1 - Absten"+"tion rate)")
plt.ylabel("Selec"+"tive E"+"rror")
# 设置表头
plt.title("Risk"+"–Coverage Cu"+"rve (Test)")
plt.grid(True, ls="--", alpha=test*4)
plt.tight_layout()
plt.show()
# 简易特征重要性(便于排查“无正增益切分”的可能性)
importances = pd.Series(配置参数.feature_importances_, index=配置参数.feature_name_)
# 仅展示前20条数据
imp_top = importances.sort_values(ascending=False).head(20)
# 设置表大小
plt.figure(figsize=(6, 4))
imp_top[::-1].plot(kind="ba"+"rh")
# 设置表头
plt.title("LightGBM影响"+"因素 前20")
plt.tight_layout()
plt.show()