|
|
import io
|
|
|
import warnings
|
|
|
from pathlib import Path
|
|
|
from typing import IO, Optional, Union
|
|
|
from urllib.parse import quote
|
|
|
|
|
|
import numpy as np
|
|
|
import pandas as pd
|
|
|
from fastapi import FastAPI, File, HTTPException, UploadFile
|
|
|
from fastapi.responses import JSONResponse, StreamingResponse
|
|
|
from sklearn.exceptions import DataConversionWarning
|
|
|
|
|
|
# Valid plate-thickness ranges (mm) per model, keyed by
# "<groove code>_<welding position>".  Format is "min-max"; the literal
# "Infinity" means no upper bound.  A thickness outside a model's range is
# rejected (the coefficient becomes NaN) instead of being extrapolated.
MODEL_RANGES = {
    "CAK_仰焊": "12-Infinity",
    "CV17.5_平焊": "3-Infinity",
    "EV15_立焊": "13-Infinity",
    "SX_平焊": "14-Infinity",
    "CA_平焊": "0-60",
    "CA_横焊": "0-75",
    "CAK_平焊": "0-85",
    "CAK_立焊": "0-85",
    "CAV_平焊": "0-68",
    "CAY_平焊": "0-70",
    "CV20_平焊": "4-80",
    "EV17.5_立焊": "0-43",
    "SY_平焊": "0-53",
}
|
|
|
|
|
|
# Silence known-noisy warnings emitted during prediction:
# - sklearn complains when predicting on a bare ndarray without feature names
warnings.filterwarnings("ignore", message="X does not have valid feature names, but.*")
# - numpy deprecation triggered by scalar conversion inside older sklearn code
warnings.filterwarnings("ignore", category=DeprecationWarning,
                        message="Conversion of an array with ndim > 0 to a scalar is deprecated.*")
# - sklearn's DataConversionWarning (e.g. column-vector y passed where 1d expected)
warnings.filterwarnings("ignore", category=DataConversionWarning)
|
|
|
|
|
|
|
|
|
def extract_headers_from_first_three_rows(
    excel_source: Union[str, Path, IO[bytes]], sheet_name: Union[int, str] = 0
):
    """Extract one effective header per column from the first three rows.

    Designed for sheets whose header spans merged cells, so a column's
    title may sit in any of the first three rows.  Per column the
    priority is: row 1 (second row, where merged headers usually land),
    then row 0, then row 2; when all three are blank the column gets a
    fallback name ``Unnamed_<idx>``.

    Blank cells, NA values and the strings 'nan'/'none'/'null' (any
    case) are treated as missing.  If *excel_source* is a seekable
    stream it is rewound before and after reading so callers can reuse it.
    """

    def _is_valid(value) -> bool:
        # A usable header cell: not NA, not blank, not a textual null.
        if pd.isna(value):
            return False
        text = str(value).strip()
        return text != "" and text.lower() not in ("nan", "none", "null")

    if hasattr(excel_source, "seek"):
        excel_source.seek(0)

    # Read only the first three rows, raw (no header inference).
    df_raw = pd.read_excel(excel_source, sheet_name=sheet_name, header=None, nrows=3)

    if hasattr(excel_source, "seek"):
        excel_source.seek(0)

    headers = []
    for col_idx in range(df_raw.shape[1]):
        # Candidate cells in priority order: row 1, row 0, row 2.
        candidates = (
            df_raw.iloc[1, col_idx] if len(df_raw) > 1 else None,
            df_raw.iloc[0, col_idx] if len(df_raw) > 0 else None,
            df_raw.iloc[2, col_idx] if len(df_raw) > 2 else None,
        )
        header = next(
            (str(v).strip() for v in candidates if _is_valid(v)),
            f"Unnamed_{col_idx}",
        )
        headers.append(header)

    print("✅ 提取到的表头:", headers)
    return headers
|
|
|
|
|
|
|
|
|
def read_excel_with_irregular_headers(
    excel_source: Union[str, Path, IO[bytes]], sheet_name: Union[int, str] = 0
):
    """Full read of an irregular sheet: derive headers, then load the body.

    The first three rows are consumed as (possibly merged) header rows;
    data is read from row four onwards and labelled with the extracted
    headers.  Columns that are entirely empty are dropped.
    """
    headers = extract_headers_from_first_three_rows(excel_source, sheet_name)

    # Rewind reusable streams before the second read.
    if hasattr(excel_source, "seek"):
        excel_source.seek(0)

    df = pd.read_excel(
        excel_source,
        sheet_name=sheet_name,
        header=None,
        skiprows=3,      # skip the three header rows
        names=headers,   # apply the extracted titles
    )

    df = df.dropna(how="all", axis=1)  # drop all-empty columns
    print(f"✅ 读取数据形状: {df.shape}")
    return df
|
|
|
|
|
|
|
|
|
def split_by_plus(s: str) -> list[str]:
    """Split *s* on '+', trimming whitespace and dropping empty segments.

    Raises TypeError when *s* is not a string.
    """
    if not isinstance(s, str):
        raise TypeError("输入必须是字符串")

    trimmed = (segment.strip() for segment in s.split("+"))
    return [segment for segment in trimmed if segment]
|
|
|
|
|
|
|
|
|
# %%
import joblib

# Directory holding the serialized models (files matching *3.pkl).
# Resolved relative to this source file so the models are found regardless
# of the process's current working directory.
folder_path = Path(__file__).parent / "degree3"
|
|
|
|
|
|
|
|
|
def _model_name_from_filename(filename: str) -> str:
    """Derive a model's lookup name from its .pkl filename.

    Strips the ``.pkl`` extension, then a trailing ``_model_degree3``
    marker, then anything from ``.xlsx`` onwards (present when the model
    file was named after its source spreadsheet).
    """
    name = filename.removesuffix(".pkl")
    name = name.removesuffix("_model_degree3")
    if ".xlsx" in name:
        name = name.split(".xlsx", 1)[0]
    return name


def load_all_models():
    """Load every '*3.pkl' model under ``folder_path`` into the global
    ``model_list``.

    Each entry is a dict with keys ``name`` (the lookup key used when
    resolving "<groove>_<position>" models), ``model`` (the unpickled
    estimator) and ``filename``.  The global list is rebuilt from scratch
    on every call; files that fail to load are logged and skipped.
    """
    global model_list
    model_list = []  # reset so repeated calls never duplicate entries

    print("[MODEL_LOADER] Attempting to load models...")
    print(
        f"[MODEL_LOADER] Attempting to load models. Initial model_list length: {len(model_list)}"
    )
    print(
        f"[MODEL_LOADER] Target model folder (relative to WORKDIR /app): {folder_path}"
    )
    resolved_folder_path = folder_path.resolve()
    print(f"[MODEL_LOADER] Target model folder (absolute): {resolved_folder_path}")

    if not resolved_folder_path.exists():
        print(
            f"[MODEL_LOADER] ERROR: Model directory not found at {resolved_folder_path}"
        )
        return

    if not resolved_folder_path.is_dir():
        print(f"[MODEL_LOADER] ERROR: Path {resolved_folder_path} is not a directory.")
        return

    print(
        f"[MODEL_LOADER] Model directory found at {resolved_folder_path}. Searching for files with glob pattern: *3.pkl"
    )

    files_found = list(resolved_folder_path.glob("*3.pkl"))
    print(
        f"[MODEL_LOADER] Files found by glob ('{resolved_folder_path}/*3.pkl'): {[str(f.name) for f in files_found]}"
    )

    if not files_found:
        # Nothing matched: dump the directory contents to help diagnose.
        print(
            f"[MODEL_LOADER] WARNING: No files matched the glob pattern '*3.pkl' in {resolved_folder_path}. Listing all items in directory as a fallback check:"
        )
        try:
            all_items = list(resolved_folder_path.iterdir())
            print(
                f"[MODEL_LOADER] All items in {resolved_folder_path}: {[str(item.name) for item in all_items]}"
            )
        except Exception as e:
            print(
                f"[MODEL_LOADER] ERROR: Could not list items in {resolved_folder_path}: {e}"
            )
        return

    # Build into a temporary list; publish to the global only when finished.
    temp_model_list = []
    for file_path_obj in files_found:
        if not file_path_obj.is_file():
            continue
        try:
            model_name = _model_name_from_filename(file_path_obj.name)
            loaded_model = joblib.load(file_path_obj)
            temp_model_list.append(
                {
                    "name": model_name,
                    "model": loaded_model,
                    "filename": file_path_obj.name,
                }
            )
            print(
                f"[MODEL_LOADER] Successfully loaded model from {file_path_obj.name} as: {model_name}"
            )
        except Exception as e:
            print(
                f"[MODEL_LOADER] ERROR: Failed to load model {file_path_obj.name}: {e}"
            )

    model_list = temp_model_list  # publish the freshly loaded registry
    print(
        f"[MODEL_LOADER] Finished loading models. Total models loaded: {len(model_list)}"
    )
    if not model_list:
        print(
            "[MODEL_LOADER] WARNING: model_list is empty after attempting to load models."
        )
|
|
|
|
|
|
|
|
|
def get_welding_coefficient(row) -> float:
    """Compute the welding coefficient (min/m) for one data row.

    The row must expose attributes 长度_m (seam length), 坡口代码 (groove
    code, possibly several codes joined with '+'), 焊接位置 (welding
    position) and 板厚_mm (plate thickness).  For each groove code either
    an explicit "<x> min/m" coefficient is accumulated directly, or a
    loaded model named "<code>_<position>" is evaluated at the given
    thickness and its prediction accumulated.

    Returns NaN when a required model is missing, the thickness is
    outside the model's range in MODEL_RANGES, or prediction fails.
    """
    coefficient = 0.0
    global model_list

    # Debug logging: which models are currently available.
    print(f"[DEBUG] model_list length: {len(model_list)}")
    loaded_model_names = [model["name"] for model in model_list]
    print(f"[DEBUG] Loaded models: {loaded_model_names}")

    model_map = {model["name"]: model for model in model_list}
    length = row.长度_m
    welding_type = row.坡口代码
    welding_position = row.焊接位置
    thickness = row.板厚_mm
    thickness_value = float(thickness)
    thickness_array = np.array([[thickness_value]])

    # Normalize the welding-position text to one of four canonical names.
    try:
        if "横" in welding_position:
            welding_position = "横焊"
        elif "立" in welding_position:
            welding_position = "立焊"
        elif "仰" in welding_position:
            welding_position = "仰焊"
        elif "平" in welding_position:
            welding_position = "平焊"
    except Exception as e:
        # Non-string positions (e.g. NaN) keep their original value.
        print(f"行⚠️ 处理焊接位置时出错: {welding_position}, 错误: {e}")

    # Groove code may be several codes joined with '+'.
    try:
        welding_type = split_by_plus(welding_type)
    except Exception as e:
        print(f"⚠️ 处理坡口代码时出错: {welding_type}, 错误: {e}")
        welding_type = [str(welding_type)]  # guarantee an iterable below

    if isinstance(welding_type, str):
        welding_type = [welding_type]

    for item in welding_type:
        try:
            if "min/m" in item:
                # Already an explicit coefficient: accumulate it directly.
                coefficient += float(item.replace("min/m", "").strip())
            else:
                model_name = f"{item}_{welding_position}"
                selected_model_info = model_map.get(model_name)
                if selected_model_info is None:
                    print(f"⚠️ 处理坡口代码模型不存在: {model_name}, 错误: 模型未加载")
                    coefficient = float('nan')
                    break
                model = selected_model_info["model"]
                # Reject thicknesses outside the model's documented range.
                range_str = MODEL_RANGES.get(model_name, "0-Infinity")
                min_val_str, max_val_str = range_str.split("-")
                min_val = float(min_val_str)
                max_val = None if max_val_str == "Infinity" else float(max_val_str)
                if thickness_value < min_val or (
                    max_val is not None and thickness_value > max_val
                ):
                    print(
                        f"⚠️ 警告: 厚度 {thickness_value}mm 超出模型 '{model_name}' 的适用范围 ({range_str} mm)。"
                    )
                    coefficient = float('nan')
                    break
                else:
                    try:
                        prediction = float(model.predict(thickness_array)[0])
                        coefficient += prediction
                    except Exception as e:
                        print(f"⚠️ 模型预测失败: {model_name}, 错误: {e}")
                        coefficient = float('nan')
                        break
        except Exception as e:
            # Deliberately no break: remaining codes are still attempted,
            # but NaN is absorbing under addition so the result stays NaN.
            coefficient = float('nan')
            print(
                f"⚠️ 处理坡口代码模型不存在: {item}_{welding_position}, 错误: {e}"
            )

    coefficient = float(coefficient)
    print(
        f"长度: {length}, 焊接类型: {welding_type},焊接位置: {welding_position}, 板厚: {thickness}"
    )

    return coefficient
|
|
|
|
|
|
|
|
|
def get_welding_time(row):
    """Welding man-hours for one row: coefficient (min/m) × seam length (m)."""
    return row.焊接系数 * row.长度_m
|
|
|
|
|
|
|
|
|
def save_df_to_excel(df, output_file_path):
    """Write *df* to an Excel file at *output_file_path*, omitting the index column."""
    df.to_excel(output_file_path, index=False)
|
|
|
|
|
|
|
|
|
def build_prediction_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Clean the raw sheet, compute per-row welding coefficient and
    man-hours, and append a grand-total row.

    The last row of the input is assumed to be a pre-existing totals row
    and is dropped before computing.  Column names get (ASCII and
    full-width) parentheses replaced by underscores so the apply
    callbacks can access them as attributes (``row.长度_m`` etc.).
    """
    df = df.copy()
    # Sequential row numbers starting at 1, independent of the source index.
    df["序号"] = df.reset_index(drop=True).index + 1
    df_clean = df.copy()
    df_clean.columns = [
        col.replace("(", "_")
        .replace(")", "_")
        .replace("(", "_")
        .replace(")", "_")
        .strip("_")
        .replace("__", "_")
        for col in df.columns
    ]

    df_without_last = df_clean.iloc[:-1].copy()  # drop the trailing totals row
    # Pass the functions directly — no need for lambda wrappers.
    df_without_last["焊接系数"] = df_without_last.apply(get_welding_coefficient, axis=1)
    df_without_last["焊接工时"] = df_without_last.apply(get_welding_time, axis=1)

    # Fresh totals row: NaN everywhere except the label and the summed hours.
    total_row = {col: np.nan for col in df_without_last.columns}
    total_row["序号"] = "总计"
    total_row["焊接工时"] = df_without_last["焊接工时"].sum()

    result_df = pd.concat([df_without_last, pd.DataFrame([total_row])], ignore_index=True)
    return result_df
|
|
|
|
|
|
|
|
|
def _prepare_excel_source(
|
|
|
excel_source: Union[str, Path, IO[bytes], bytes, bytearray]
|
|
|
) -> tuple[Optional[Path], Optional[bytes]]:
|
|
|
if isinstance(excel_source, Path):
|
|
|
return excel_source, None
|
|
|
if isinstance(excel_source, str):
|
|
|
return Path(excel_source), None
|
|
|
if isinstance(excel_source, (bytes, bytearray)):
|
|
|
return None, bytes(excel_source)
|
|
|
if hasattr(excel_source, "read"):
|
|
|
try:
|
|
|
if hasattr(excel_source, "seek"):
|
|
|
excel_source.seek(0)
|
|
|
data = excel_source.read()
|
|
|
excel_source.seek(0)
|
|
|
else:
|
|
|
data = excel_source.read()
|
|
|
except Exception as exc: # pragma: no cover - 防御性
|
|
|
raise ValueError("无法读取 Excel 源数据") from exc
|
|
|
return None, data
|
|
|
raise TypeError("excel_source 必须是路径、字节流或支持 read() 的对象")
|
|
|
|
|
|
|
|
|
def _make_excel_io(path_like: Optional[Path], excel_bytes: Optional[bytes]):
|
|
|
if path_like is not None:
|
|
|
return path_like
|
|
|
if excel_bytes is None:
|
|
|
raise ValueError("缺少 Excel 数据")
|
|
|
return io.BytesIO(excel_bytes)
|
|
|
|
|
|
|
|
|
def process_excel(
    excel_source: Union[str, Path, IO[bytes], bytes, bytearray],
    sheet_name: Optional[Union[int, str]] = 0,
) -> Union[pd.DataFrame, dict[str, pd.DataFrame]]:
    """Read Excel data and run the full prediction pipeline.

    With ``sheet_name=None`` every worksheet is processed and a
    ``{sheet_name: DataFrame}`` mapping is returned; otherwise a single
    DataFrame for the requested sheet.
    """

    # Reload models before every prediction so a refreshed model folder
    # is picked up without restarting the service.
    load_all_models()

    path_like, excel_bytes = _prepare_excel_source(excel_source)

    def fresh_source():
        # Each read gets its own readable source (path or new BytesIO).
        return _make_excel_io(path_like, excel_bytes)

    if sheet_name is not None:
        frame = read_excel_with_irregular_headers(fresh_source(), sheet_name=sheet_name)
        return build_prediction_dataframe(frame)

    with pd.ExcelFile(fresh_source()) as workbook:
        all_sheet_names = workbook.sheet_names

    results: dict[str, pd.DataFrame] = {}
    for name in all_sheet_names:
        frame = read_excel_with_irregular_headers(fresh_source(), sheet_name=name)
        results[name] = build_prediction_dataframe(frame)
    return results
|
|
|
|
|
|
|
|
|
# Global registry of loaded models; (re)populated by load_all_models().
model_list = []

# Former local test inputs, kept for reference:
# source_excel_path = Path("/root/data/hanjie/1113/rawdata/场景/部件.xlsx")
# source_excel_path = Path("/root/data/hanjie/1113/rawdata/场景/搭载.xlsx")
# source_excel_path = Path("/root/data/hanjie/1113/rawdata/场景/分段.xlsx")
# source_excel_path = Path("/root/data/hanjie/1113/rawdata/场景/零件.xlsx")
# source_excel_path = Path("./all.xlsx")

app = FastAPI(title="Welding Time Prediction API")
|
|
|
|
|
|
|
|
|
@app.on_event("startup")
def startup_event():
    """Warm the global model registry once when the service starts."""
    # NOTE(review): @app.on_event is deprecated in newer FastAPI versions in
    # favour of lifespan handlers — confirm the installed version before migrating.
    load_all_models()
|
|
|
|
|
|
|
|
|
@app.get("/models", summary="获取已加载的模型列表")
def get_models():
    """Debug endpoint: report how many models are loaded plus their names/files."""
    summaries = [
        {"name": entry["name"], "filename": entry["filename"]}
        for entry in model_list
    ]
    return {"total_models": len(summaries), "models": summaries}
|
|
|
|
|
|
|
|
|
def _sanitize_sheet_name(name: Union[str, int], existing: set[str]) -> str:
|
|
|
base = str(name) if str(name).strip() else "Sheet"
|
|
|
base = base[:31]
|
|
|
candidate = base or "Sheet"
|
|
|
counter = 1
|
|
|
while candidate in existing:
|
|
|
suffix = f"_{counter}"
|
|
|
candidate = (base[: 31 - len(suffix)] if base else "Sheet") + suffix
|
|
|
counter += 1
|
|
|
return candidate
|
|
|
|
|
|
|
|
|
def _df_to_records(df: pd.DataFrame) -> list[dict[str, object]]:
|
|
|
cleaned = df.replace({np.inf: None, -np.inf: None})
|
|
|
cleaned = cleaned.where(pd.notnull(cleaned), None)
|
|
|
records: list[dict[str, object]] = []
|
|
|
for raw in cleaned.to_dict(orient="records"):
|
|
|
normalized: dict[str, object] = {}
|
|
|
for key, value in raw.items():
|
|
|
if isinstance(value, np.generic):
|
|
|
value = value.item()
|
|
|
if isinstance(value, float) and (np.isnan(value) or np.isinf(value)):
|
|
|
value = None
|
|
|
normalized[key] = value
|
|
|
records.append(normalized)
|
|
|
return records
|
|
|
|
|
|
|
|
|
@app.post("/predict", summary="上传 Excel 并返回预测结果表")
async def predict(
    file: UploadFile = File(...),
    sheet_name: Optional[str] = None,
    response_format: str = "json",
):
    """Upload an Excel workbook, run predictions, return JSON or an Excel file.

    - sheet_name: None or "all" processes every sheet; a digit string is
      treated as a sheet index, anything else as a sheet name.
    - response_format: "json" (default) or "excel" (streamed .xlsx download).
    """
    # file.filename may be None for some clients — treat that as a bad
    # upload (400) instead of letting .lower() raise AttributeError (500).
    if not file.filename or not file.filename.lower().endswith((".xls", ".xlsx")):
        raise HTTPException(status_code=400, detail="仅支持 Excel 文件 (.xls/.xlsx)")

    contents = await file.read()
    if not contents:
        raise HTTPException(status_code=400, detail="上传文件为空")

    try:
        resolved_sheet: Optional[Union[int, str]] = None
        if sheet_name is not None and sheet_name.lower() != "all":
            resolved_sheet = int(sheet_name) if sheet_name.isdigit() else sheet_name
        result_obj = process_excel(contents, sheet_name=resolved_sheet)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"处理 Excel 时发生错误: {exc}") from exc

    response_format = response_format.lower()
    if response_format not in {"json", "excel"}:
        raise HTTPException(status_code=400, detail="response_format 只支持 'json' 或 'excel'")

    if response_format == "json":
        if isinstance(result_obj, dict):
            payload = {str(name): _df_to_records(df) for name, df in result_obj.items()}
        else:
            key = str(resolved_sheet if resolved_sheet is not None else "Sheet1")
            payload = {key: _df_to_records(result_obj)}
        return JSONResponse(content=payload)

    # Excel response: write all result frames into an in-memory workbook.
    output_stream = io.BytesIO()
    if isinstance(result_obj, dict):
        existing_names: set[str] = set()
        with pd.ExcelWriter(output_stream, engine="openpyxl") as writer:
            for original_name, df in result_obj.items():
                sheet_safe = _sanitize_sheet_name(original_name, existing_names)
                existing_names.add(sheet_safe)
                df.to_excel(writer, sheet_name=sheet_safe, index=False)
    else:
        result_obj.to_excel(output_stream, index=False)

    output_stream.seek(0)

    # RFC 5987-style header: UTF-8 filename* for modern browsers, plain
    # ASCII fallback for the rest.
    download_name = f"{Path(file.filename).stem}_预测结果.xlsx"
    ascii_fallback = "prediction.xlsx"
    quoted_name = quote(download_name)
    content_disposition = (
        f"attachment; filename={ascii_fallback}; filename*=UTF-8''{quoted_name}"
    )

    headers = {"Content-Disposition": content_disposition}

    return StreamingResponse(
        output_stream,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers=headers,
    )
|
|
|
|
|
|
|
|
|
def main():
    """Manual smoke test: verify the model directory, load all models and
    print diagnostics about what was found."""
    print("[TEST] Script started...")

    # Check that the degree3 folder exists next to this file
    print(f"[TEST] Current file path: {__file__}")
    print(f"[TEST] Folder path: {folder_path}")
    print(f"[TEST] Folder exists: {folder_path.exists()}")
    print(f"[TEST] Folder is directory: {folder_path.is_dir()}")

    # List the model files inside the degree3 folder
    if folder_path.exists() and folder_path.is_dir():
        files = list(folder_path.glob("*3.pkl"))
        print(f"[TEST] Number of model files found: {len(files)}")
        for file in files[:10]:  # show at most the first 10 files
            print(f"[TEST] Found model file: {file.name}")

    # Load the models into the global model_list
    load_all_models()

    # Debug info: names of the loaded models
    print(f"[DEBUG] Total models loaded: {len(model_list)}")
    for model in model_list:
        print(f"[DEBUG] Loaded model: {model['name']} from {model['filename']}")

    # Check whether any CV17.5-related models were loaded
    cv175_models = [model['name'] for model in model_list if 'CV17.5' in model['name']]
    print(f"[DEBUG] CV17.5 models found: {cv175_models}")

    # Check whether any flat-position (平焊) models were loaded
    pinghan_models = [model['name'] for model in model_list if '平焊' in model['name']]
    print(f"[DEBUG] 平焊 models found: {pinghan_models}")
    # Commented-out local batch run kept for reference:
    # result_obj = process_excel(source_excel_path, sheet_name=0)
    # output_file_path = source_excel_path.parent.joinpath(
    #     source_excel_path.stem + "_预测结果.xlsx"
    # )
    #
    # if isinstance(result_obj, dict):
    #     with pd.ExcelWriter(output_file_path, engine="openpyxl") as writer:
    #         for sheet_name, df in result_obj.items():
    #             df.to_excel(writer, sheet_name=str(sheet_name)[:31] or "Sheet", index=False)
    # else:
    #     print(len(result_obj))
    #     save_df_to_excel(result_obj, output_file_path)
|
|
|
|
|
|
|
|
|
# Run the smoke test when executed directly (the FastAPI app itself is
# served by an external ASGI server such as uvicorn).
if __name__ == "__main__":
    main()
|