推送源码

main
pangbai 6 hours ago
parent d4949b0229
commit c32e95878f

@ -0,0 +1,25 @@
# Slim Python 3.11 base image for the welding-time prediction API.
FROM python:3.11-slim
WORKDIR /app
# UTF-8 locale so Chinese column names and log output round-trip correctly.
ENV LC_ALL=C.UTF-8
ENV LANG=C.UTF-8
# Flush stdout/stderr immediately so container logs appear in real time.
ENV PYTHONUNBUFFERED=1
# Switch APT to the Aliyun mirror, then install compilers needed to build
# wheels (gcc/g++) and the OpenMP runtime (libgomp1) used by scikit-learn.
RUN sed -i 's/deb.debian.org/mirrors.aliyun.com/g' /etc/apt/sources.list.d/debian.sources && \
    apt-get update && apt-get install -y --no-install-recommends \
    gcc \
    g++ \
    libgomp1 \
    && rm -rf /var/lib/apt/lists/*
# Install Python dependencies first so this layer is cached across code-only changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple/
# Application code and the pre-trained model files.
COPY predict_with_excel.py .
COPY degree3/ ./degree3/
EXPOSE 8000
CMD ["uvicorn", "predict_with_excel:app", "--host", "0.0.0.0", "--port", "8000"]

@ -0,0 +1,15 @@
# Single-service Compose stack for the welding-time prediction API.
# NOTE(review): the pasted source lost all YAML indentation (every key was
# flush-left, which is invalid); nesting restored per standard Compose layout.
# version: '3.8'
services:
  welding-app:
    build: .
    ports:
      - "8000:8000"
    # Optional bind mounts for live access to models/data/logs:
    # volumes:
    #   - ./degree3:/app/degree3
    #   - ./data:/app/data
    #   - ./logs:/app/logs
    environment:
      - PYTHONUNBUFFERED=1
      - TZ=Asia/Shanghai
    restart: always

@ -0,0 +1,539 @@
import io
import warnings
from pathlib import Path
from typing import IO, Optional, Union
from urllib.parse import quote
import numpy as np
import pandas as pd
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.responses import JSONResponse, StreamingResponse
from sklearn.exceptions import DataConversionWarning
# Applicable plate-thickness range (mm) per model, formatted "min-max";
# "Infinity" means no upper bound. Keys are "<groove code>_<weld position>".
MODEL_RANGES = {
    "CAK_仰焊": "12-Infinity",
    "CV17.5_平焊": "3-Infinity",
    "EV15_立焊": "13-Infinity",
    "SX_平焊": "14-Infinity",
    "CA_平焊": "0-60",
    "CA_横焊": "0-75",
    "CAK_平焊": "0-85",
    "CAK_立焊": "0-85",
    "CAV_平焊": "0-68",
    "CAY_平焊": "0-70",
    "CV20_平焊": "4-80",
    "EV17.5_立焊": "0-43",
    "SY_平焊": "0-53",
}
# Silence warnings triggered by predicting from a bare ndarray with models
# fitted on named features, plus the ndim>0-to-scalar deprecation noise.
warnings.filterwarnings("ignore", message="X does not have valid feature names, but.*")
warnings.filterwarnings("ignore", category=DeprecationWarning,
                        message="Conversion of an array with ndim > 0 to a scalar is deprecated.*")
warnings.filterwarnings("ignore", category=DataConversionWarning)
def extract_headers_from_first_three_rows(
    excel_source: Union[str, Path, IO[bytes]], sheet_name: Union[int, str] = 0
) -> list[str]:
    """Extract one effective header per column from the first THREE rows.

    The original docstring claimed only two rows were inspected; the code
    reads three, so the docs now match. Handles merged-cell layouts where a
    title may land in row 0, 1 or 2:

    - row 1 (second row) is preferred, because merged cells commonly leave
      row 0 blank for some columns;
    - falls back to row 0, then row 2;
    - blank / 'nan' / 'none' / 'null' cells are skipped;
    - a column with no usable title gets ``Unnamed_<col_idx>``.

    Seekable stream sources are rewound before and after reading so the
    caller can re-read them.
    """

    def _is_valid(v) -> bool:
        # A cell counts as a header only if non-empty after stripping and
        # not a textual null marker. Hoisted out of the column loop (it was
        # redefined on every iteration in the original).
        if pd.isna(v):
            return False
        s = str(v).strip()
        return s != "" and s.lower() not in ("nan", "none", "null")

    # Read the first three rows raw, with no header inference.
    if hasattr(excel_source, "seek"):
        excel_source.seek(0)
    df_raw = pd.read_excel(excel_source, sheet_name=sheet_name, header=None, nrows=3)
    if hasattr(excel_source, "seek"):
        excel_source.seek(0)  # rewind for the caller's follow-up read

    headers = []
    for col_idx in range(df_raw.shape[1]):
        val0 = df_raw.iloc[0, col_idx] if len(df_raw) > 0 else None
        val1 = df_raw.iloc[1, col_idx] if len(df_raw) > 1 else None
        val2 = df_raw.iloc[2, col_idx] if len(df_raw) > 2 else None
        # Preference order: row 1 → row 0 → row 2 → synthetic name.
        if _is_valid(val1):
            header = str(val1).strip()
        elif _is_valid(val0):
            header = str(val0).strip()
        elif _is_valid(val2):
            header = str(val2).strip()
        else:
            header = f"Unnamed_{col_idx}"
        headers.append(header)
    print("✅ 提取到的表头:", headers)
    return headers
def read_excel_with_irregular_headers(
    excel_source: Union[str, Path, IO[bytes]], sheet_name: Union[int, str] = 0
):
    """Read a sheet whose column titles span the first three rows.

    Resolves one header per column via extract_headers_from_first_three_rows,
    then loads the data starting at row 4 under those names, and finally
    drops columns that are entirely empty.
    """
    column_names = extract_headers_from_first_three_rows(excel_source, sheet_name)
    if hasattr(excel_source, "seek"):
        excel_source.seek(0)
    frame = pd.read_excel(
        excel_source,
        sheet_name=sheet_name,
        header=None,
        skiprows=3,  # skip the three title rows
        names=column_names,
    )
    frame = frame.dropna(how="all", axis=1)  # discard fully-empty columns
    print(f"✅ 读取数据形状: {frame.shape}")
    return frame
def split_by_plus(s: str) -> list[str]:
    """Split *s* on '+', trimming whitespace and dropping empty pieces.

    Raises TypeError when the input is not a string.
    """
    if not isinstance(s, str):
        raise TypeError("输入必须是字符串")
    trimmed = (piece.strip() for piece in s.split("+"))
    return [piece for piece in trimmed if piece]
# %%
import joblib

# Directory (relative to the working dir, /app inside Docker) that holds
# the serialized "*3.pkl" regression models loaded by load_all_models().
folder_path = Path("./degree3")
def load_all_models():
    """Load every ``*3.pkl`` model from ``folder_path`` into the global ``model_list``.

    Idempotent: returns immediately when models are already present, since
    FastAPI's startup hook is expected to call this exactly once. Every step
    is printed so container logs show exactly why loading failed, if it does.
    """
    global model_list
    # model_list = []
    # ^^^ intentionally NOT cleared: FastAPI's on_event("startup") should run
    # this only once. Uncomment (or add a dedicated reload endpoint) if hot
    # model reloading is needed during development.
    if model_list:  # already loaded — mainly guarded by startup_event
        print(f"[MODEL_LOADER] Models already loaded. Count: {len(model_list)}")
        return
    print(
        f"[MODEL_LOADER] Attempting to load models. Initial model_list length: {len(model_list)}"
    )
    print(
        f"[MODEL_LOADER] Target model folder (relative to WORKDIR /app): {folder_path}"
    )
    resolved_folder_path = folder_path.resolve()
    print(f"[MODEL_LOADER] Target model folder (absolute): {resolved_folder_path}")
    # Fail loudly (but without raising) when the model directory is absent,
    # so the API can still start and report an empty model registry.
    if not resolved_folder_path.exists():
        print(
            f"[MODEL_LOADER] ERROR: Model directory not found at {resolved_folder_path}"
        )
        return
    if not resolved_folder_path.is_dir():
        print(f"[MODEL_LOADER] ERROR: Path {resolved_folder_path} is not a directory.")
        return
    print(
        f"[MODEL_LOADER] Model directory found at {resolved_folder_path}. Searching for files with glob pattern: *3.pkl"
    )
    files_found = list(resolved_folder_path.glob("*3.pkl"))
    print(
        f"[MODEL_LOADER] Files found by glob ('{resolved_folder_path}/*3.pkl'): {[str(f.name) for f in files_found]}"
    )
    if not files_found:
        # Nothing matched — list the directory contents to aid debugging.
        print(
            f"[MODEL_LOADER] WARNING: No files matched the glob pattern '*3.pkl' in {resolved_folder_path}. Listing all items in directory as a fallback check:"
        )
        try:
            all_items = list(resolved_folder_path.iterdir())
            print(
                f"[MODEL_LOADER] All items in {resolved_folder_path}: {[str(item.name) for item in all_items]}"
            )
        except Exception as e:
            print(
                f"[MODEL_LOADER] ERROR: Could not list items in {resolved_folder_path}: {e}"
            )
        return
    temp_model_list = []  # staging list; published to the global only at the end
    for file_path_obj in files_found:
        if file_path_obj.is_file():
            try:
                # Derive the registry name from the filename. Presumably the
                # historical convention was "<name>.xlsx_model_degree3.pkl";
                # the fallbacks below tolerate files without the ".xlsx"
                # marker — TODO confirm against the actual model files.
                model_name_parts = file_path_obj.name.split(".xlsx")
                if len(model_name_parts) > 1:
                    model_name = model_name_parts[0]
                else:
                    model_name_base = file_path_obj.stem
                    if model_name_base.endswith("_model_degree3"):
                        model_name = model_name_base[: -len("_model_degree3")]
                    elif model_name_base.endswith(
                        "3"
                    ):  # tolerate *3.pkl names lacking ".xlsx"
                        model_name = model_name_base[: -len("3")].rstrip(
                            "."
                        )  # drop a possible trailing '.' (from .pkl)
                        if model_name.endswith("_model_degree"):  # trim further
                            model_name = model_name[: -len("_model_degree")]
                    else:
                        model_name = model_name_base
                        print(
                            f"[MODEL_LOADER] WARNING: Filename {file_path_obj.name} does not contain '.xlsx' as expected for name splitting. Using '{model_name}' as model name based on stem."
                        )
                loaded_model = joblib.load(file_path_obj)
                temp_model_list.append(
                    {
                        "name": model_name,
                        "model": loaded_model,
                        "filename": file_path_obj.name,
                    }
                )
                print(
                    f"[MODEL_LOADER] Successfully loaded model from {file_path_obj.name} as: {model_name}"
                )
            except Exception as e:
                # A single corrupt file must not abort loading the rest.
                print(
                    f"[MODEL_LOADER] ERROR: Failed to load model {file_path_obj.name}: {e}"
                )
    model_list = temp_model_list  # publish to the global registry
    print(
        f"[MODEL_LOADER] Finished loading models. Total models loaded: {len(model_list)}"
    )
    if not model_list:
        print(
            f"[MODEL_LOADER] WARNING: model_list is empty after attempting to load models."
        )
def get_welding_coefficient(row):
    """Predict the welding coefficient (min/m) for one data row.

    ``row`` must expose 长度_m, 坡口代码, 焊接位置 and 板厚_mm attributes.
    The groove code may be several codes joined with '+'; each part is either
    a literal "<x> min/m" coefficient (added directly) or the key of a
    thickness→coefficient model in ``model_list``. Returns NaN when a model
    is missing or the thickness is outside the model's applicable range.
    """
    global model_list
    coefficient = 0.0
    model_map = {entry["name"]: entry for entry in model_list}

    length = row.长度_m  # fixed 'lenth' typo (local only; output unchanged)
    welding_type = row.坡口代码
    welding_position = row.焊接位置
    thickness = row.板厚_mm
    thickness_value = float(thickness)
    thickness_array = np.array([[thickness_value]])

    # Normalise the position label to one of the four canonical names.
    # NOTE(review): the membership keys below appeared as "" in the pasted
    # source ('"" in s' is always True, so every row would become 横焊);
    # restored to the single characters matching each assigned label —
    # confirm against the original file.
    try:
        if "横" in welding_position:
            welding_position = "横焊"
        elif "立" in welding_position:
            welding_position = "立焊"
        elif "仰" in welding_position:
            welding_position = "仰焊"
        elif "平" in welding_position:
            welding_position = "平焊"
    except Exception as e:
        # e.g. a NaN position value; keep the original value and continue.
        print(f"行⚠️ 处理焊接位置时出错: {welding_position}, 错误: {e}")

    # Split combined groove codes; fall back to a single-item list so the
    # loop below always has something to iterate.
    try:
        welding_type = split_by_plus(welding_type)
    except Exception as e:
        print(f"⚠️ 处理坡口代码时出错: {welding_type}, 错误: {e}")
        welding_type = [str(welding_type)]
    if isinstance(welding_type, str):
        welding_type = [welding_type]

    for item in welding_type:
        try:
            if "min/m" in item:
                # Already a literal coefficient: accumulate it directly.
                coefficient += float(item.replace("min/m", "").strip())
            else:
                model_name = f"{item}_{welding_position}"
                selected_model_info = model_map.get(model_name)
                if selected_model_info is None:
                    raise KeyError(f"模型 '{model_name}' 未加载")
                model = selected_model_info["model"]
                # Validate thickness against the model's applicable range.
                range_str = MODEL_RANGES.get(model_name, "0-Infinity")
                min_val_str, max_val_str = range_str.split("-")
                min_val = float(min_val_str)
                max_val = None if max_val_str == "Infinity" else float(max_val_str)
                if thickness_value < min_val or (
                    max_val is not None and thickness_value > max_val
                ):
                    print(
                        f"⚠️ 警告: 厚度 {thickness_value}mm 超出模型 '{model_name}' 的适用范围 ({range_str} mm)。"
                    )
                    coefficient = float('nan')
                    break
                coefficient += float(model.predict(thickness_array)[0])
        except Exception as e:
            # Missing/failed model: poison the result with NaN (NaN + x stays
            # NaN for any later parts) but keep iterating, as before.
            coefficient = float('nan')
            print(
                f"⚠️ 处理坡口代码模型不存在: {item}_{welding_position}, 错误: {e}"
            )

    coefficient = float(coefficient)
    print(
        f"长度: {length}, 焊接类型: {welding_type},焊接位置: {welding_position}, 板厚: {thickness}"
    )
    return coefficient
def get_welding_time(row):
    """Welding man-hours for one row: coefficient (min/m) × length (m)."""
    return row.焊接系数 * row.长度_m
def save_df_to_excel(df, output_file_path):
    """Write *df* to an Excel file at *output_file_path*, omitting the index."""
    df.to_excel(output_file_path, index=False)
def build_prediction_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Clean the raw sheet and append 焊接系数 / 焊接工时 columns plus a total row.

    The last row of the input is assumed to be a pre-existing grand-total row
    and is dropped before prediction; a fresh 总计 row with the summed
    man-hours is appended at the end.
    """
    df = df.copy()
    df["序号"] = df.reset_index(drop=True).index + 1

    df_clean = df.copy()
    # Normalise column names (e.g. "长度(m)" → "长度_m") so rows can be
    # accessed as attributes by the predictors.
    # NOTE(review): two replace() targets appeared as "" in the pasted source
    # (str.replace("", "_") inserts '_' between EVERY character, destroying
    # all column names); restored as the full-width parentheses, matching the
    # half-width pair below — confirm against the original file.
    df_clean.columns = [
        col.replace("（", "_")
        .replace("）", "_")
        .replace("(", "_")
        .replace(")", "_")
        .strip("_")
        .replace("__", "_")
        for col in df.columns
    ]

    df_without_last = df_clean.iloc[:-1].copy()  # drop the incoming total row
    df_without_last["焊接系数"] = df_without_last.apply(
        lambda row: get_welding_coefficient(row), axis=1
    )
    df_without_last["焊接工时"] = df_without_last.apply(
        lambda row: get_welding_time(row), axis=1
    )

    # Grand-total row: only 序号 and 焊接工时 are populated, the rest is NaN.
    total_row = {col: np.nan for col in df_without_last.columns}
    total_row["序号"] = "总计"
    total_row["焊接工时"] = df_without_last["焊接工时"].sum()
    result_df = pd.concat([df_without_last, pd.DataFrame([total_row])], ignore_index=True)
    return result_df
def _prepare_excel_source(
excel_source: Union[str, Path, IO[bytes], bytes, bytearray]
) -> tuple[Optional[Path], Optional[bytes]]:
if isinstance(excel_source, Path):
return excel_source, None
if isinstance(excel_source, str):
return Path(excel_source), None
if isinstance(excel_source, (bytes, bytearray)):
return None, bytes(excel_source)
if hasattr(excel_source, "read"):
try:
if hasattr(excel_source, "seek"):
excel_source.seek(0)
data = excel_source.read()
excel_source.seek(0)
else:
data = excel_source.read()
except Exception as exc: # pragma: no cover - 防御性
raise ValueError("无法读取 Excel 源数据") from exc
return None, data
raise TypeError("excel_source 必须是路径、字节流或支持 read() 的对象")
def _make_excel_io(path_like: Optional[Path], excel_bytes: Optional[bytes]):
if path_like is not None:
return path_like
if excel_bytes is None:
raise ValueError("缺少 Excel 数据")
return io.BytesIO(excel_bytes)
def process_excel(
    excel_source: Union[str, Path, IO[bytes], bytes, bytearray],
    sheet_name: Optional[Union[int, str]] = 0,
) -> Union[pd.DataFrame, dict[str, pd.DataFrame]]:
    """Run the full prediction pipeline over an Excel workbook.

    With ``sheet_name=None`` every sheet is processed and a mapping of
    {sheet name: result DataFrame} is returned; otherwise a single result
    DataFrame for the requested sheet.
    """
    path_like, excel_bytes = _prepare_excel_source(excel_source)

    def fresh_source():
        # Reading consumes a stream, so hand out a new source each time.
        return _make_excel_io(path_like, excel_bytes)

    if sheet_name is not None:
        frame = read_excel_with_irregular_headers(fresh_source(), sheet_name=sheet_name)
        return build_prediction_dataframe(frame)

    with pd.ExcelFile(fresh_source()) as workbook:
        all_names = workbook.sheet_names
    return {
        name: build_prediction_dataframe(
            read_excel_with_irregular_headers(fresh_source(), sheet_name=name)
        )
        for name in all_names
    }
# Global registry of loaded models: [{"name", "model", "filename"}, ...].
# Populated once by load_all_models() via the startup hook below.
model_list = []
# source_excel_path = Path("/root/data/hanjie/1113/rawdata/场景/部件.xlsx")
# source_excel_path = Path("/root/data/hanjie/1113/rawdata/场景/搭载.xlsx")
# source_excel_path = Path("/root/data/hanjie/1113/rawdata/场景/分段.xlsx")
# source_excel_path = Path("/root/data/hanjie/1113/rawdata/场景/零件.xlsx")
# source_excel_path = Path("./all.xlsx")
app = FastAPI(title="Welding Time Prediction API")


@app.on_event("startup")
def startup_event():
    # Load all *3.pkl models exactly once when the server starts.
    load_all_models()
def _sanitize_sheet_name(name: Union[str, int], existing: set[str]) -> str:
base = str(name) if str(name).strip() else "Sheet"
base = base[:31]
candidate = base or "Sheet"
counter = 1
while candidate in existing:
suffix = f"_{counter}"
candidate = (base[: 31 - len(suffix)] if base else "Sheet") + suffix
counter += 1
return candidate
def _df_to_records(df: pd.DataFrame) -> list[dict[str, object]]:
cleaned = df.replace({np.inf: None, -np.inf: None})
cleaned = cleaned.where(pd.notnull(cleaned), None)
records: list[dict[str, object]] = []
for raw in cleaned.to_dict(orient="records"):
normalized: dict[str, object] = {}
for key, value in raw.items():
if isinstance(value, np.generic):
value = value.item()
if isinstance(value, float) and (np.isnan(value) or np.isinf(value)):
value = None
normalized[key] = value
records.append(normalized)
return records
@app.post("/predict", summary="上传 Excel 并返回预测结果表")
async def predict(
    file: UploadFile = File(...),
    sheet_name: Optional[str] = None,
    response_format: str = "json",
):
    """Upload an Excel workbook and return welding-time predictions.

    ``sheet_name`` selects one sheet by index (digits) or name; None or
    "all" processes every sheet. ``response_format`` is "json" (default,
    rows keyed by sheet name) or "excel" (streams a result .xlsx back).
    """
    # Basic upload validation: file extension and non-empty payload.
    if not file.filename.lower().endswith((".xls", ".xlsx")):
        raise HTTPException(status_code=400, detail="仅支持 Excel 文件 (.xls/.xlsx)")
    contents = await file.read()
    if not contents:
        raise HTTPException(status_code=400, detail="上传文件为空")
    try:
        # Resolve the sheet selector: None/"all" → all sheets, digits → index.
        resolved_sheet: Optional[Union[int, str]] = None
        if sheet_name is not None and sheet_name.lower() != "all":
            if sheet_name.isdigit():
                resolved_sheet = int(sheet_name)
            else:
                resolved_sheet = sheet_name
        result_obj = process_excel(contents, sheet_name=resolved_sheet)
    except Exception as exc:
        # Surface any pipeline failure as a 500 with the original cause chained.
        raise HTTPException(status_code=500, detail=f"处理 Excel 时发生错误: {exc}") from exc
    response_format = response_format.lower()
    if response_format not in {"json", "excel"}:
        raise HTTPException(status_code=400, detail="response_format 只支持 'json''excel'")
    if response_format == "json":
        # Multi-sheet results are {sheet: rows}; a single-sheet result is
        # wrapped under the resolved sheet key for a uniform payload shape.
        if isinstance(result_obj, dict):
            payload = {str(name): _df_to_records(df) for name, df in result_obj.items()}
        else:
            key = str(resolved_sheet if resolved_sheet is not None else "Sheet1")
            payload = {key: _df_to_records(result_obj)}
        return JSONResponse(content=payload)
    # Excel response: write the result(s) into an in-memory workbook.
    output_stream = io.BytesIO()
    if isinstance(result_obj, dict):
        existing_names: set[str] = set()
        with pd.ExcelWriter(output_stream, engine="openpyxl") as writer:
            for original_name, df in result_obj.items():
                # Keep sheet names valid (≤31 chars) and unique.
                sheet_safe = _sanitize_sheet_name(original_name, existing_names)
                existing_names.add(sheet_safe)
                df.to_excel(writer, sheet_name=sheet_safe, index=False)
    else:
        result_obj.to_excel(output_stream, index=False)
    output_stream.seek(0)
    # RFC 5987-style Content-Disposition: ASCII fallback name plus the
    # UTF-8 percent-encoded real name for browsers that support filename*.
    download_name = f"{Path(file.filename).stem}_预测结果.xlsx"
    ascii_fallback = "prediction.xlsx"
    quoted_name = quote(download_name)
    content_disposition = (
        f"attachment; filename={ascii_fallback}; filename*=UTF-8''{quoted_name}"
    )
    headers = {"Content-Disposition": content_disposition}
    return StreamingResponse(
        output_stream,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers=headers,
    )
def main():
    """Local/CLI entry point: warm-load the models.

    The commented-out batch Excel-to-Excel flow that used to live here has
    been removed (dead code); use the /predict API endpoint — or
    process_excel() directly — for batch processing.
    """
    load_all_models()


if __name__ == "__main__":
    main()

@ -0,0 +1,8 @@
numpy
scikit-learn
joblib
fastapi
uvicorn
pandas
openpyxl
python-multipart
Loading…
Cancel
Save