From 33ae1d8e31a8e7fccc86189ace033f8d6d17d330 Mon Sep 17 00:00:00 2001 From: zhanghao <2024138486@qq.com> Date: Fri, 29 May 2026 09:58:17 +0800 Subject: [PATCH] Improve QET scene sync and cabinet reuse --- docs/2D-3D数据传递待办.md | 427 ++++++++++++++++++ src/Mod/FreeCADExchange/CMakeLists.txt | 2 + src/Mod/FreeCADExchange/DeviceImport.py | 116 ++++- src/Mod/FreeCADExchange/ExchangeBootstrap.py | 110 ++++- src/Mod/FreeCADExchange/InitGui.py | 14 + src/Mod/FreeCADExchange/StaleObjectActions.py | 294 ++++++++++++ src/Mod/FreeCADExchange/StaleObjectSync.py | 291 ++++++++++++ src/Mod/FreeCADExchange/TerminalImport.py | 15 - src/Mod/FreeCADExchange/WiringImport.py | 42 -- ...eecad_exchange_device_import_fcstd_test.py | 87 ++++ ...ecad_exchange_stale_object_actions_test.py | 184 ++++++++ ...freecad_exchange_stale_object_sync_test.py | 224 +++++++++ .../freecad_exchange_wiring_import_test.py | 13 +- 13 files changed, 1741 insertions(+), 78 deletions(-) create mode 100644 docs/2D-3D数据传递待办.md create mode 100644 src/Mod/FreeCADExchange/StaleObjectActions.py create mode 100644 src/Mod/FreeCADExchange/StaleObjectSync.py create mode 100644 tests/python/freecad_exchange_stale_object_actions_test.py create mode 100644 tests/python/freecad_exchange_stale_object_sync_test.py diff --git a/docs/2D-3D数据传递待办.md b/docs/2D-3D数据传递待办.md new file mode 100644 index 0000000..c7da569 --- /dev/null +++ b/docs/2D-3D数据传递待办.md @@ -0,0 +1,427 @@ +# 2D / 3D 数据传递待办 + +本文只记录 QET 与 FreeCAD 第一版协同中,和数据传递、持久化、打开流程相关的待办事项。 + +第一版继续遵守当前约束: + +- 2D 电气语义以 QET 为准。 +- 3D 空间状态以 FreeCAD 文档为准。 +- 3D 位姿、装配关系、端子空间位置、导线几何不写入数据库。 +- 数据库只依赖: + - `project_2d3d_symbol_binding` + - `project_2d3d_terminal_binding` +- 设备绑定只依赖: + - `project_uuid` + - `element_uuid` + - `instance_id` +- 端子绑定只依赖: + - `project_uuid` + - `terminal_uuid` + - `instance_id` +- 第一版 3D 端子绑定唯一依据是 `terminal_uuid`。 + +## 1. QET 读取 3d_to_2d.json + +### 1.1 当前状态 + +FreeCAD 侧已经可以生成: + +```text +/.qet_freecad/3d_to_2d.json +``` + +该文件用于把 FreeCAD 创建或维护的 3D 实例 ID 回传给 QET。 + +当前最关键缺口是: + +> QET 侧还需要读取 `3d_to_2d.json`,并把里面的实例绑定写回项目运行库。 + +### 1.2 要写回的表 + +设备实例绑定写入: + +```text +project_2d3d_symbol_binding(project_uuid, element_uuid, instance_id) +``` + +端子实例绑定写入: + +```text +project_2d3d_terminal_binding(project_uuid, terminal_uuid, instance_id) +``` + +### 1.3 为什么要做 + +第一次打开 FreeCAD 时,QET 可能只知道: + +```text +element_uuid +terminal_uuid +resolved_model_path +``` + +此时 `instance_id` 可能为空。FreeCAD 创建 3D 设备实例后,会生成并保存 `QetInstanceId`。 + +QET 读取 `3d_to_2d.json` 后,就能记住: + +```text +element_uuid -> instance_id +terminal_uuid -> instance_id +``` + +这样下次 QET 再导出 `2d_to_3d.json` 时,可以带上已有 `instance_id`,FreeCAD 就能复用已有 3D 实例,而不是重复创建。 + +## 2. 3D 工程文件持久化 + +### 2.1 当前约定 + +第一版 3D 工程本体是: + +```text +/.qet_freecad/scene.FCStd +``` + +交换目录建议保持为: + +```text +/.qet_freecad/ + 2d_to_3d.json + 3d_to_2d.json + scene.FCStd + logs/ +``` + +### 2.2 FreeCAD 文档负责保存 + +`scene.FCStd` 保存: + +- 设备 3D 实例对象 +- 设备位姿 +- 柜内装配关系 +- 端子 LCS 和空间位置 +- 手动导线几何 +- 自动布线结果 +- 走线路径和路由网络 + +这些内容不进入数据库。 + +### 2.3 QET 要做什么 + +QET 下次打开同一个工程或同一个机柜时,应优先打开已有: + +```text +/.qet_freecad/scene.FCStd +``` + +不要重新创建空场景。 + +如果后续支持一个项目多个机柜,则 QET 还需要保存或推导: + +```text +某个机柜 -> 对应哪个 scene.FCStd +``` + +第一版可以先使用默认工程文件: + +```text +.qet_freecad/scene.FCStd +``` + +## 3. 2D 原理图更新后同步到 3D + +### 3.1 当前推荐方式 + +第一版不做实时同步。 + +推荐在用户点击 QET 的 `3D视图` 时执行一次同步: + +```text +QET 重新导出 2d_to_3d.json +FreeCAD 打开已有 scene.FCStd +FreeCAD 根据最新 JSON 增量更新 3D 文档 +``` + +### 3.2 FreeCAD 更新依据 + +FreeCAD 应按下面字段识别对象: + +```text +设备:element_uuid / instance_id +端子:terminal_uuid / instance_id +``` + +处理策略: + +- `instance_id` 已存在,并且 FreeCAD 文档中找到对象:复用并更新语义。 +- `instance_id` 为空:创建新的 3D 实例。 +- 2D 新增设备或端子:FreeCAD 新增对应 3D 对象。 +- 2D 已删除但 FreeCAD 仍存在的对象:需要定义失效处理策略。 + +### 3.3 删除或失效对象策略 + +后续需要明确: + +- 直接删除 3D 对象 +- 标记为失效 +- 隐藏但保留 +- 提示用户确认后删除 + +第一版建议先采用保守策略: + +```text +标记失效或提示用户,不自动删除已有 3D 装配和接线。 +``` + +## 4. 3D 保存后 2D 再打开时打开保存的工程 + +这和“2D 原理图更新后同步到 3D”不是一件事。 + +### 4.1 打开保存的工程解决的问题 + +它解决的是: + +```text +QET 应该打开哪个 FreeCAD 文件? +``` + +第一版默认打开: + +```text +/.qet_freecad/scene.FCStd +``` + +### 4.2 2D 更新同步解决的问题 + +它解决的是: + +```text +打开已有 scene.FCStd 后,FreeCAD 应该按最新 2D 原理图新增、更新或标记哪些设备、端子、导线任务? +``` + +### 4.3 两者关系 + +完整动作通常发生在同一个入口中: + +```text +用户点击 3D视图 +QET 读取上一轮 3d_to_2d.json +QET 更新绑定表 +QET 重新导出 2d_to_3d.json +QET 启动 FreeCAD 并打开已有 scene.FCStd +FreeCAD 根据 2d_to_3d.json 增量更新文档 +``` + +### 4.4 3D视图入口确认流程 + +当前确认的 `3D视图` 入口顺序为: + +```text +用户点击 3D视图 +1. QET 检查 .qet_freecad/3d_to_2d.json 是否存在 +2. 如果存在,读取并校验 project_uuid +3. 把 instances[] 写入 project_2d3d_symbol_binding +4. 把 terminals[] 写入 project_2d3d_terminal_binding +5. 然后重新导出最新 2d_to_3d.json +6. 启动 FreeCAD,打开已有 scene.FCStd +``` + +这个顺序的关键点是: + +```text +先读取 3D 回写并更新绑定表 +再导出新的 2D -> 3D 快照 +``` + +否则本次导出的 `2d_to_3d.json` 仍然拿不到上一轮 FreeCAD 生成的 `instance_id`。 + +## 5. 设备 3D 资产传递 + +### 5.1 当前状态 + +QET -> FreeCAD 方向已经具备设备资产传递。 + +`2d_to_3d.json` 中已有: + +```text +device_models[] + element_uuid + device_id + parts_3d + resolved_model_path +``` + +FreeCAD 使用 `resolved_model_path` 导入 3D 模型。 + +### 5.2 后续要求 + +后续不是从零实现,而是继续稳定以下规则: + +- 优先使用 `device_3d_asset`。 +- `device_attribute.parts_3d` 只作为兼容或回退字段。 +- QET 导出给 FreeCAD 的关键字段是 `resolved_model_path`。 +- FreeCAD 不反查 QET 数据库来寻找模型路径。 +- `.FCStd` 应作为正式可复用设备资产格式。 +- STEP / STP / STE 更适合作为制作 FCStd 模板的原始几何输入。 + +## 6. 导线数据传递 + +### 6.1 当前状态 + +QET -> FreeCAD 方向已经具备导线任务传递。 + +`2d_to_3d.json` 中已有: + +```text +wires[] + wire_id + net_uuid + group_uuid + wire_mark + wire_mark_is_manual + start_element_uuid + start_terminal_uuid + end_element_uuid + end_terminal_uuid + start_terminal_display + end_terminal_display +``` + +FreeCAD 侧已经可以把 `wires[]` 导入为导线任务。 + +### 6.2 后续可能扩展 + +如果后续 QET 需要读取 3D 布线状态,可以扩展 `3d_to_2d.json`,例如: + +```text +wire_id +route_status +route_type +length +diagnostics +``` + +### 6.3 第一版不做 + +第一版不把下面内容写进数据库: + +- 3D 路径点 +- 导线空间几何 +- 线束位姿 +- 线槽内具体排布 + +这些仍保存在: + +```text +scene.FCStd +``` + +## 7. 推荐实施步骤 + +### 步骤 1:补 QET 读取 3d_to_2d.json + +在 QET 侧新增或接入一个读取流程: + +```text +读取 /.qet_freecad/3d_to_2d.json +校验 project_uuid +读取 instances[] +读取 terminals[] +``` + +校验失败时不写库,并给出提示或日志。 + +### 步骤 2:写回两张绑定表 + +对 `instances[]` 执行 upsert: + +```text +project_2d3d_symbol_binding: + project_uuid + element_uuid + instance_id +``` + +对 `terminals[]` 执行 upsert: + +```text +project_2d3d_terminal_binding: + project_uuid + terminal_uuid + instance_id +``` + +第一版不要写入旧 3D 场景表,也不要写入位姿字段。 + +### 步骤 3:把读取回写接到 3D视图入口前 + +用户点击 `3D视图` 时,建议先执行: + +```text +读取上一轮 3d_to_2d.json +更新绑定表 +重新导出 2d_to_3d.json +启动 FreeCAD +``` + +这样新导出的 `2d_to_3d.json` 就能带上最新 `instance_id`。 + +### 步骤 4:确认 scene.FCStd 打开策略 + +QET 启动 FreeCAD 时: + +- 如果 `.qet_freecad/scene.FCStd` 已存在,打开它。 +- 如果不存在,允许 FreeCAD 创建新的工程文档。 +- 后续如果接入机柜维度,则按机柜映射选择对应 FCStd。 + +### 步骤 5:确认 FreeCAD 增量更新策略 + +FreeCAD 打开 `scene.FCStd` 并读取新的 `2d_to_3d.json` 后: + +- 已存在实例:复用。 +- 新设备:创建。 +- 新端子:创建。 +- 失效设备或端子:先标记或提示,不建议第一版自动删除。 + +### 步骤 6:保留设备资产和导线任务现有链路 + +设备 3D 资产继续走: + +```text +device_3d_asset -> resolved_model_path -> FreeCAD +``` + +导线任务继续走: + +```text +QET wires[] -> FreeCAD QETWiring_01_Tasks +``` + +第一版暂不要求 QET 读取 3D 导线几何。 + +### 步骤 7:后续再扩展 3D 布线状态回传 + +等最小绑定闭环稳定后,再考虑在 `3d_to_2d.json` 中增加: + +```text +routed_wires[] + wire_id + route_status + route_type + length + diagnostics +``` + +该扩展只作为状态、统计、诊断回传,不作为第一版数据库几何持久化。 + +## 8. 第一版完成标准 + +第一版数据传递闭环完成后,应满足: + +1. 第一次从 QET 打开 FreeCAD,`instance_id` 可以为空。 +2. FreeCAD 创建 3D 实例并保存 `scene.FCStd`。 +3. FreeCAD 生成 `3d_to_2d.json`。 +4. QET 能读取 `3d_to_2d.json`。 +5. QET 能写入两张绑定表。 +6. 第二次点击 `3D视图` 时,QET 导出的 `2d_to_3d.json` 中已有 `instance_id`。 +7. FreeCAD 打开已有 `scene.FCStd`,复用已有 3D 实例。 +8. 3D 位姿、装配、导线几何仍只保存在 `scene.FCStd`。 diff --git a/src/Mod/FreeCADExchange/CMakeLists.txt b/src/Mod/FreeCADExchange/CMakeLists.txt index 05e6f87..020ba62 100644 --- a/src/Mod/FreeCADExchange/CMakeLists.txt +++ b/src/Mod/FreeCADExchange/CMakeLists.txt @@ -13,6 +13,8 @@ set(FreeCADExchange_Scripts TerminalImport.py WiringObjects.py WiringImport.py + StaleObjectSync.py + StaleObjectActions.py RoutingNetwork.py AutoRouting.py AutoRoutingPanel.py diff --git a/src/Mod/FreeCADExchange/DeviceImport.py b/src/Mod/FreeCADExchange/DeviceImport.py index d20890a..d4fc078 100644 --- a/src/Mod/FreeCADExchange/DeviceImport.py +++ b/src/Mod/FreeCADExchange/DeviceImport.py @@ -21,6 +21,7 @@ import TerminalObjects ROOT_GROUP_NAME = "QETExchangeDevices" ROOT_GROUP_LABEL = "QET Exchange Devices" CABINET_MODEL_GROUP_NAME = "QETCabinetModel" +CABINET_GROUP_PREFIX = "QETCabinet_" DEVICE_GROUP_PREFIX = "QETDevice_" TERMINAL_GROUP_PREFIX = "QETTerminals_" WIRE_GROUP_PREFIX = "QETWires_" @@ -81,6 +82,13 @@ def _native_path(value): return os.path.normpath(os.path.expandvars(os.path.expanduser(text))) +def _normalized_path_key(value): + text = _native_path(value) + if not text: + return "" + return os.path.normcase(os.path.normpath(text)) + + def _existing_object_names(doc): return {obj.Name for obj in doc.Objects} @@ -318,13 +326,79 @@ def _ensure_root_group(doc, cabinet=None, project_uuid=""): return root -def _ensure_cabinet_model_group(doc, root_group): - group = doc.getObject(CABINET_MODEL_GROUP_NAME) +def _cabinet_instance_id(cabinet): + if not isinstance(cabinet, dict): + return "" + for field_name in ("cabinet_instance_id", "cabinet_uuid", "location_id"): + value = cabinet.get(field_name) + if value is None: + continue + text = str(value).strip() + if text: + return text + return "default" + + +def _cabinet_group_label(cabinet): + label = _cabinet_label_text(cabinet) + if label: + return label + return "3D机柜" + + +def _find_cabinet_group(doc, cabinet_instance_id): + target_id = (cabinet_instance_id or "").strip() + preferred_name = CABINET_GROUP_PREFIX + _safe_token(target_id or "default") + group = doc.getObject(preferred_name) + if group is not None: + return group + + legacy_group = doc.getObject(CABINET_MODEL_GROUP_NAME) + if legacy_group is not None: + legacy_instance_id = getattr(legacy_group, "QetCabinetInstanceId", "").strip() + if not target_id or not legacy_instance_id or legacy_instance_id == target_id: + return legacy_group + + for candidate in getattr(doc, "Objects", []) or []: + if "QetCabinetInstanceId" not in getattr(candidate, "PropertiesList", []): + continue + if getattr(candidate, "QetCabinetInstanceId", "").strip() == target_id: + return candidate + return None + + +def _ensure_cabinet_model_group(doc, root_group, cabinet=None, project_uuid=""): + cabinet_instance_id = _cabinet_instance_id(cabinet) + group = _find_cabinet_group(doc, cabinet_instance_id) if group is None: - group = doc.addObject("App::DocumentObjectGroup", CABINET_MODEL_GROUP_NAME) - group.Label = "3D机柜" + group = doc.addObject( + "App::DocumentObjectGroup", + CABINET_GROUP_PREFIX + _safe_token(cabinet_instance_id or "default"), + ) + group.Label = _cabinet_group_label(cabinet) if group not in getattr(root_group, "Group", []): root_group.addObject(group) + _ensure_string_property( + group, + "QetCabinetInstanceId", + "QET Exchange", + "Cabinet instance id from QET exchange", + cabinet_instance_id, + ) + _ensure_string_property( + group, + "QetCabinetResolvedScenePath", + "QET Exchange", + "Resolved local cabinet scene path from QET exchange", + cabinet.get("resolved_scene_path", "") if isinstance(cabinet, dict) else "", + ) + _ensure_string_property( + group, + "QetProjectUuid", + "QET Exchange", + "Project UUID from QET exchange", + project_uuid, + ) return group @@ -565,6 +639,14 @@ def _clear_group_contents(doc, group): _remove_object_tree(doc, child) +def _existing_group_objects(doc, group): + result = [] + for child in list(getattr(group, "Group", []) or []): + if _object_exists(doc, child): + result.append(child) + return result + + def _is_exchange_sidecar_group(obj): child_name = _object_name(obj) if child_name.startswith(TERMINAL_GROUP_PREFIX) or child_name.startswith(WIRE_GROUP_PREFIX): @@ -816,7 +898,22 @@ def _import_cabinet_model(doc, root_group, cabinet, report): ) return - cabinet_group = _ensure_cabinet_model_group(doc, root_group) + project_uuid = getattr(root_group, "QetProjectUuid", "").strip() + cabinet_group = _ensure_cabinet_model_group(doc, root_group, cabinet, project_uuid) + existing_model_objects = _existing_group_objects(doc, cabinet_group) + previous_path = getattr(cabinet_group, "QetCabinetResolvedScenePath", "").strip() + same_source = _normalized_path_key(previous_path) == _normalized_path_key(resolved_scene_path) + if existing_model_objects and same_source: + report.setdefault("cabinet_reused", 0) + report["cabinet_reused"] += 1 + _append_debug_log( + "DeviceImport cabinet import skipped: reused existing cabinet group for instance_id={0}".format( + getattr(cabinet_group, "QetCabinetInstanceId", "").strip() + ) + ) + return + + had_existing_model = bool(existing_model_objects) _clear_group_contents(doc, cabinet_group) _ensure_string_property( cabinet_group, @@ -839,6 +936,12 @@ def _import_cabinet_model(doc, root_group, cabinet, report): use_link_group=True, ) report["cabinet_imported"] += 1 + if had_existing_model: + report.setdefault("cabinet_reimported", 0) + report["cabinet_reimported"] += 1 + else: + report.setdefault("cabinet_added", 0) + report["cabinet_added"] += 1 _append_debug_log("DeviceImport cabinet import succeeded") except Exception as exc: report["cabinet_skipped_import_error"] += 1 @@ -870,6 +973,9 @@ def import_devices_from_payload(payload, scene_path=""): "skipped_unsupported_format": 0, "skipped_import_error": 0, "cabinet_imported": 0, + "cabinet_added": 0, + "cabinet_reimported": 0, + "cabinet_reused": 0, "cabinet_skipped_missing_model": 0, "cabinet_skipped_missing_file": 0, "cabinet_skipped_unsupported_format": 0, diff --git a/src/Mod/FreeCADExchange/ExchangeBootstrap.py b/src/Mod/FreeCADExchange/ExchangeBootstrap.py index 4dd6302..45081e1 100644 --- a/src/Mod/FreeCADExchange/ExchangeBootstrap.py +++ b/src/Mod/FreeCADExchange/ExchangeBootstrap.py @@ -23,6 +23,10 @@ try: import WiringImport except Exception: WiringImport = None +try: + import StaleObjectSync +except Exception: + StaleObjectSync = None try: from PySide6 import QtCore, QtWidgets @@ -42,6 +46,7 @@ STATE_SUMMARY = "_qet_exchange_summary" STATE_IMPORT_REPORT = "_qet_exchange_import_report" STATE_TERMINAL_IMPORT_REPORT = "_qet_exchange_terminal_import_report" STATE_WIRING_IMPORT_REPORT = "_qet_exchange_wiring_import_report" +STATE_STALE_SYNC_REPORT = "_qet_exchange_stale_sync_report" STATE_WRITEBACK_REPORT = "_qet_exchange_writeback_report" STATE_IMPORT_SCHEDULED = "_qet_exchange_import_scheduled" STATE_TREE_FILTER = "_qet_exchange_tree_filter" @@ -653,19 +658,76 @@ def _import_wiring_tasks(payload): return None -def _summary_message(summary, import_report=None, terminal_report=None, writeback_report=None, wiring_report=None): +def _scene_path_from_exchange_context(): + scene_path = os.environ.get(ENV_SCENE_PATH, "").strip() + if scene_path: + return scene_path + + json_path = os.environ.get(ENV_JSON_PATH, "").strip() + if not json_path: + return "" + + exchange_dir = Path(json_path).parent + for file_name in ("QETScene.FCStd", "scene.FCStd"): + candidate = exchange_dir / file_name + if candidate.is_file(): + resolved = str(candidate) + os.environ[ENV_SCENE_PATH] = resolved + _append_debug_log( + "QET_FREECAD_SCENE_FILE inferred from exchange directory: {0}".format( + resolved + ) + ) + return resolved + return "" + + +def _mark_stale_objects(payload): + if StaleObjectSync is None: + _append_debug_log("stale object sync skipped: StaleObjectSync module unavailable") + return None + + try: + return StaleObjectSync.mark_stale_objects_from_payload( + payload, + App.ActiveDocument, + ) + except Exception as exc: + _append_debug_log("stale object sync failed: {0}".format(exc)) + _append_debug_log(traceback.format_exc()) + return None + + +def _summary_message(summary, import_report=None, terminal_report=None, writeback_report=None, wiring_report=None, stale_report=None): lines = [ "QET exchange file loaded successfully.", - "", - "Project UUID: {0}".format(summary["project_uuid"]), - "Exchange file: {0}".format(summary["json_path"]), - "Devices: {0}".format(summary["device_count"]), - "Terminals: {0}".format(summary["terminal_count"]), - "Wires: {0}".format(summary["wire_count"]), - "Device models: {0}".format(summary["device_model_count"]), - "Resolved model paths: {0}".format(summary["device_models_with_parts"]), ] + if import_report or stale_report: + lines.extend( + [ + "", + "同步结果:", + "新增机柜:{0}".format(import_report.get("cabinet_added", 0) if import_report else 0), + "失效机柜:{0}".format(stale_report.get("stale_cabinets", 0) if stale_report else 0), + "新增设备:{0}".format(import_report.get("imported_devices", 0) if import_report else 0), + "失效设备:{0}".format(stale_report.get("stale_devices", 0) if stale_report else 0), + ] + ) + + lines.extend( + [ + "", + "Project UUID: {0}".format(summary["project_uuid"]), + "Exchange file: {0}".format(summary["json_path"]), + "Devices: {0}".format(summary["device_count"]), + "Terminals: {0}".format(summary["terminal_count"]), + "Wires: {0}".format(summary["wire_count"]), + "Device models: {0}".format(summary["device_model_count"]), + "Resolved model paths: {0}".format(summary["device_models_with_parts"]), + ] + ) + cabinet = summary.get("cabinet") if isinstance(cabinet, dict): cabinet_name = cabinet.get("display_text") or cabinet.get("label") or cabinet.get("name") @@ -708,6 +770,9 @@ def _summary_message(summary, import_report=None, terminal_report=None, writebac "3D device import summary:", "Target document: {0}".format(import_report["document_name"]), "Imported cabinet models: {0}".format(import_report["cabinet_imported"]), + "Added cabinets: {0}".format(import_report.get("cabinet_added", 0)), + "Reimported cabinets: {0}".format(import_report.get("cabinet_reimported", 0)), + "Reused cabinets: {0}".format(import_report.get("cabinet_reused", 0)), "Imported devices: {0}".format(import_report["imported_devices"]), "Updated devices: {0}".format(import_report["updated_devices"]), ] @@ -795,6 +860,24 @@ def _summary_message(summary, import_report=None, terminal_report=None, writebac if len(warnings) > 10: lines.append("- ... ({0} more)".format(len(warnings) - 10)) + if stale_report: + lines.extend( + [ + "", + "3D stale object sync:", + "Active cabinets: {0}".format(stale_report.get("active_cabinets", 0)), + "Stale cabinets: {0}".format(stale_report.get("stale_cabinets", 0)), + "Active devices: {0}".format(stale_report.get("active_devices", 0)), + "Stale devices: {0}".format(stale_report.get("stale_devices", 0)), + "Active terminals: {0}".format(stale_report.get("active_terminals", 0)), + "Stale terminals: {0}".format(stale_report.get("stale_terminals", 0)), + "Active wire tasks: {0}".format(stale_report.get("active_wire_tasks", 0)), + "Stale wire tasks: {0}".format(stale_report.get("stale_wire_tasks", 0)), + "Active routed wires: {0}".format(stale_report.get("active_routed_wires", 0)), + "Stale routed wires: {0}".format(stale_report.get("stale_routed_wires", 0)), + ] + ) + lines.append("") lines.append("This step validates the exchange payload and imports devices with valid resolved model paths.") lines.append("3D terminal import and write-back are enabled.") @@ -832,7 +915,7 @@ def _run_scheduled_device_import(attempt=0): ) return - scene_path = os.environ.get(ENV_SCENE_PATH, "").strip() + scene_path = _scene_path_from_exchange_context() _append_debug_log( "scheduled device import starting with scene_path={0}".format(scene_path) ) @@ -917,6 +1000,10 @@ def _run_scheduled_device_import(attempt=0): if wiring_report is not None: setattr(App, STATE_WIRING_IMPORT_REPORT, wiring_report) + stale_report = _mark_stale_objects(payload) + if stale_report is not None: + setattr(App, STATE_STALE_SYNC_REPORT, stale_report) + if ExchangeWriteBack is None: _append_debug_log("write-back skipped: ExchangeWriteBack module unavailable") writeback_report = None @@ -942,7 +1029,7 @@ def _run_scheduled_device_import(attempt=0): _show_info( "QET Exchange", - _summary_message(summary, import_report, terminal_report, writeback_report, wiring_report), + _summary_message(summary, import_report, terminal_report, writeback_report, wiring_report, stale_report), ) _append_debug_log("summary dialog shown") @@ -988,6 +1075,7 @@ def bootstrap_if_requested(): _append_debug_log(traceback.format_exc()) raise + _scene_path_from_exchange_context() summary = _build_summary(payload, json_path) _append_debug_log( "payload loaded: devices={0}, terminals={1}, models={2}, cabinet={3}".format( diff --git a/src/Mod/FreeCADExchange/InitGui.py b/src/Mod/FreeCADExchange/InitGui.py index e233a56..9294258 100644 --- a/src/Mod/FreeCADExchange/InitGui.py +++ b/src/Mod/FreeCADExchange/InitGui.py @@ -19,6 +19,9 @@ COMMANDS = [ "QET_Exchange_AutoRouteSelected", "QET_Exchange_AutoRouteAll", "QET_Exchange_OpenAutoRoutingPanel", + "QET_Exchange_HideStaleObjects", + "QET_Exchange_ShowStaleObjects", + "QET_Exchange_SummarizeStaleObjects", ] @@ -70,6 +73,7 @@ def _register_exchange_commands( auto_routing_panel = safe_import("AutoRoutingPanel") manual_wiring = safe_import("ManualWiring") manual_wiring_panel = safe_import("ManualWiringPanel") + stale_object_actions = safe_import("StaleObjectActions") template_authoring = safe_import("TemplateAuthoring") template_authoring_panel = safe_import("TemplateAuthoringPanel") template_instantiation = safe_import("TemplateInstantiation") @@ -134,6 +138,16 @@ def _register_exchange_commands( ) ) + try: + if stale_object_actions is not None: + stale_object_actions.register_commands() + except Exception: + append_init_log( + "InitGui failed to register stale object commands:\n{0}".format( + traceback_module.format_exc() + ) + ) + try: if template_authoring is not None: template_authoring.register_commands() diff --git a/src/Mod/FreeCADExchange/StaleObjectActions.py b/src/Mod/FreeCADExchange/StaleObjectActions.py new file mode 100644 index 0000000..87cc1ad --- /dev/null +++ b/src/Mod/FreeCADExchange/StaleObjectActions.py @@ -0,0 +1,294 @@ +# FreeCADExchange stale object user actions. + +import FreeCAD as App + +try: + import FreeCADGui as Gui +except ImportError: + Gui = None + +try: + from PySide6 import QtWidgets +except ImportError: + try: + from PySide2 import QtWidgets + except ImportError: + try: + from PySide import QtGui as QtWidgets + except ImportError: + QtWidgets = None + +import TerminalObjects + + +SYNC_STATUS_STALE = "Stale" +CABINET_GROUP_PREFIX = "QETCabinet_" + + +def _active_document(doc=None): + if doc is not None: + return doc + return getattr(App, "ActiveDocument", None) + + +def _has_property(obj, prop_name): + return prop_name in getattr(obj, "PropertiesList", []) + + +def _sync_status(obj): + if not _has_property(obj, "QetSyncStatus"): + return "" + return (getattr(obj, "QetSyncStatus", "") or "").strip() + + +def _is_stale_object(obj): + return _sync_status(obj) == SYNC_STATUS_STALE + + +def _is_device(obj): + name = getattr(obj, "Name", "") + return name.startswith(TerminalObjects.DEVICE_GROUP_PREFIX) and _has_property( + obj, + "QetElementUuid", + ) + + +def _is_cabinet(obj): + name = getattr(obj, "Name", "") + return ( + (name.startswith(CABINET_GROUP_PREFIX) or name == "QETCabinetModel") + and _has_property(obj, "QetCabinetInstanceId") + ) + + +def _is_terminal(obj): + return _has_property(obj, "QetTerminalUuid") or ( + (getattr(obj, "Role", "") or "").strip() == TerminalObjects.TERMINAL_ROLE + ) + + +def _is_wire_task(obj): + return ( + _has_property(obj, "QetWireUuid") + and (getattr(obj, "RouteType", "") or "").strip() == "Task" + ) + + +def _is_routed_wire(obj): + if not _has_property(obj, "QetWireUuid"): + return False + if (getattr(obj, "RouteType", "") or "").strip() == "Task": + return False + for parent in getattr(obj, "InList", []) or []: + if getattr(parent, "Name", "") == "QETWiring_04_Routed": + return True + return False + + +def _category(obj): + if _is_cabinet(obj): + return "cabinets" + if _is_device(obj): + return "devices" + if _is_terminal(obj): + return "terminals" + if _is_wire_task(obj): + return "wire_tasks" + if _is_routed_wire(obj): + return "routed_wires" + return "other" + + +def iter_stale_objects(doc=None): + doc = _active_document(doc) + if doc is None: + return + for obj in list(getattr(doc, "Objects", []) or []): + if _is_stale_object(obj): + yield obj + + +def _iter_object_tree(obj): + seen = set() + + def visit(item): + if item is None or id(item) in seen: + return + seen.add(id(item)) + yield item + for child in list(getattr(item, "Group", []) or []): + for nested in visit(child): + yield nested + + for result in visit(obj): + yield result + + +def _set_visible(obj, visible): + view_object = getattr(obj, "ViewObject", None) + if view_object is None or not hasattr(view_object, "Visibility"): + return False + view_object.Visibility = bool(visible) + return True + + +def summarize_stale_objects(doc=None): + doc = _active_document(doc) + report = { + "document_name": getattr(doc, "Name", "") if doc is not None else "", + "total": 0, + "cabinets": 0, + "devices": 0, + "terminals": 0, + "wire_tasks": 0, + "routed_wires": 0, + "other": 0, + "objects": [], + } + if doc is None: + return report + + for obj in iter_stale_objects(doc): + category = _category(obj) + report["total"] += 1 + report[category] += 1 + report["objects"].append( + { + "name": getattr(obj, "Name", ""), + "label": getattr(obj, "Label", ""), + "category": category, + "reason": getattr(obj, "QetSyncReason", ""), + } + ) + return report + + +def set_stale_objects_visibility(visible, doc=None): + doc = _active_document(doc) + report = summarize_stale_objects(doc) + report["visible"] = bool(visible) + report["affected_objects"] = 0 + if doc is None: + return report + + affected_ids = set() + for stale_obj in iter_stale_objects(doc): + for obj in _iter_object_tree(stale_obj): + if id(obj) in affected_ids: + continue + if _set_visible(obj, visible): + affected_ids.add(id(obj)) + report["affected_objects"] = len(affected_ids) + try: + doc.recompute() + except Exception: + pass + return report + + +def hide_stale_objects(doc=None): + return set_stale_objects_visibility(False, doc) + + +def show_stale_objects(doc=None): + return set_stale_objects_visibility(True, doc) + + +def _format_summary(report): + return ( + "失效对象统计:\n" + "总数:{total}\n" + "机柜:{cabinets}\n" + "设备:{devices}\n" + "端子:{terminals}\n" + "导线任务:{wire_tasks}\n" + "已布线:{routed_wires}\n" + "其他:{other}" + ).format(**report) + + +def _notify(title, message): + try: + App.Console.PrintMessage("[FreeCADExchange] {0}\n".format(message.replace("\n", ";"))) + except Exception: + pass + if QtWidgets is not None: + try: + QtWidgets.QMessageBox.information(None, title, message) + except Exception: + pass + + +class CommandHideStaleObjects: + def GetResources(self): + return { + "MenuText": "隐藏失效对象", + "ToolTip": "隐藏所有 QetSyncStatus=Stale 的 QET 3D 对象", + } + + def IsActive(self): + return App.ActiveDocument is not None + + def Activated(self): + report = hide_stale_objects(App.ActiveDocument) + _notify( + "QET 失效对象", + "已隐藏 {0} 个可见对象。\n\n{1}".format( + report["affected_objects"], + _format_summary(report), + ), + ) + + +class CommandShowStaleObjects: + def GetResources(self): + return { + "MenuText": "显示失效对象", + "ToolTip": "显示所有 QetSyncStatus=Stale 的 QET 3D 对象", + } + + def IsActive(self): + return App.ActiveDocument is not None + + def Activated(self): + report = show_stale_objects(App.ActiveDocument) + _notify( + "QET 失效对象", + "已显示 {0} 个对象。\n\n{1}".format( + report["affected_objects"], + _format_summary(report), + ), + ) + + +class CommandSummarizeStaleObjects: + def GetResources(self): + return { + "MenuText": "统计失效对象", + "ToolTip": "统计当前文档中 QetSyncStatus=Stale 的 QET 3D 对象", + } + + def IsActive(self): + return App.ActiveDocument is not None + + def Activated(self): + report = summarize_stale_objects(App.ActiveDocument) + _notify("QET 失效对象", _format_summary(report)) + + +_COMMANDS_REGISTERED = False + + +def register_commands(): + global _COMMANDS_REGISTERED + if _COMMANDS_REGISTERED: + return + if Gui is None or not hasattr(Gui, "addCommand"): + return + Gui.addCommand("QET_Exchange_HideStaleObjects", CommandHideStaleObjects()) + Gui.addCommand("QET_Exchange_ShowStaleObjects", CommandShowStaleObjects()) + Gui.addCommand("QET_Exchange_SummarizeStaleObjects", CommandSummarizeStaleObjects()) + _COMMANDS_REGISTERED = True + + +register_commands() diff --git a/src/Mod/FreeCADExchange/StaleObjectSync.py b/src/Mod/FreeCADExchange/StaleObjectSync.py new file mode 100644 index 0000000..2642408 --- /dev/null +++ b/src/Mod/FreeCADExchange/StaleObjectSync.py @@ -0,0 +1,291 @@ +# FreeCADExchange stale object marking helpers. + +import FreeCAD as App + +import TerminalObjects + +try: + import WiringObjects +except Exception: + WiringObjects = None + + +SYNC_STATUS_ACTIVE = "Active" +SYNC_STATUS_STALE = "Stale" +SYNC_GROUP = "QET Sync" +CABINET_GROUP_PREFIX = "QETCabinet_" + + +def _string_value(item, field_name): + if not isinstance(item, dict): + return "" + value = item.get(field_name, "") + if value is None: + return "" + return str(value).strip() + + +def _set_status(obj, status, reason=""): + TerminalObjects.ensure_string_property( + obj, + "QetSyncStatus", + SYNC_GROUP, + "Latest 2D/3D synchronization status", + status, + ) + TerminalObjects.ensure_string_property( + obj, + "QetSyncReason", + SYNC_GROUP, + "Why the object has this synchronization status", + reason, + ) + + +def _payload_identity_sets(payload): + cabinet_instance_ids = set() + device_element_uuids = set() + device_instance_ids = set() + terminal_uuids = set() + wire_uuids = set() + + cabinet = payload.get("cabinet") + if isinstance(cabinet, dict): + for field_name in ("cabinet_instance_id", "cabinet_uuid", "location_id"): + value = _string_value(cabinet, field_name) + if value: + cabinet_instance_ids.add(value) + break + + for item in payload.get("devices", []) or []: + element_uuid = _string_value(item, "element_uuid") + instance_id = _string_value(item, "instance_id") + if element_uuid: + device_element_uuids.add(element_uuid) + if instance_id: + device_instance_ids.add(instance_id) + + for item in payload.get("terminals", []) or []: + terminal_uuid = _string_value(item, "terminal_uuid") + if terminal_uuid: + terminal_uuids.add(terminal_uuid) + + for item in payload.get("wires", []) or []: + wire_uuid = ( + _string_value(item, "wire_id") + or _string_value(item, "wire_uuid") + or _string_value(item, "id") + ) + if wire_uuid: + wire_uuids.add(wire_uuid) + + return { + "cabinet_instance_ids": cabinet_instance_ids, + "device_element_uuids": device_element_uuids, + "device_instance_ids": device_instance_ids, + "terminal_uuids": terminal_uuids, + "wire_uuids": wire_uuids, + } + + +def _iter_cabinet_groups(doc): + root = doc.getObject(TerminalObjects.ROOT_GROUP_NAME) + candidates = [] + if root is not None: + candidates.extend(list(getattr(root, "Group", []) or [])) + if not candidates: + candidates.extend(list(getattr(doc, "Objects", []) or [])) + + seen = set() + for obj in candidates: + name = getattr(obj, "Name", "") + if not name.startswith(CABINET_GROUP_PREFIX) and name != "QETCabinetModel": + continue + if "QetCabinetInstanceId" not in getattr(obj, "PropertiesList", []): + continue + if id(obj) in seen: + continue + seen.add(id(obj)) + yield obj + + +def _iter_device_groups(doc): + root = doc.getObject(TerminalObjects.ROOT_GROUP_NAME) + candidates = [] + if root is not None: + candidates.extend(list(getattr(root, "Group", []) or [])) + if not candidates: + candidates.extend(list(getattr(doc, "Objects", []) or [])) + + seen = set() + for obj in candidates: + name = getattr(obj, "Name", "") + if not name.startswith(TerminalObjects.DEVICE_GROUP_PREFIX): + continue + if "QetElementUuid" not in getattr(obj, "PropertiesList", []): + continue + if id(obj) in seen: + continue + seen.add(id(obj)) + yield obj + + +def _mark_device(device_group, identity_sets): + element_uuid = (getattr(device_group, "QetElementUuid", "") or "").strip() + instance_id = (getattr(device_group, "QetInstanceId", "") or "").strip() + active = False + if element_uuid and element_uuid in identity_sets["device_element_uuids"]: + active = True + if instance_id and instance_id in identity_sets["device_instance_ids"]: + active = True + + if active: + _set_status(device_group, SYNC_STATUS_ACTIVE) + return "active" + + _set_status( + device_group, + SYNC_STATUS_STALE, + "This 3D device is not present in the latest 2D exchange payload.", + ) + return "stale" + + +def _mark_cabinet(cabinet_group, identity_sets): + cabinet_instance_id = (getattr(cabinet_group, "QetCabinetInstanceId", "") or "").strip() + active = bool(cabinet_instance_id and cabinet_instance_id in identity_sets["cabinet_instance_ids"]) + if active: + _set_status(cabinet_group, SYNC_STATUS_ACTIVE) + return "active" + + _set_status( + cabinet_group, + SYNC_STATUS_STALE, + "This 3D cabinet is not present in the latest 2D exchange payload.", + ) + return "stale" + + +def _mark_terminals(device_group, identity_sets): + report = {"active": 0, "stale": 0} + terminal_group = TerminalObjects.find_child_group_by_kind( + device_group, + TerminalObjects.TERMINAL_GROUP_KIND, + ) + for terminal in TerminalObjects.collect_terminal_objects(terminal_group): + terminal_uuid = (getattr(terminal, "QetTerminalUuid", "") or "").strip() + if TerminalObjects.is_local_terminal_uuid(terminal_uuid): + _set_status(terminal, SYNC_STATUS_ACTIVE) + report["active"] += 1 + continue + if terminal_uuid and terminal_uuid in identity_sets["terminal_uuids"]: + _set_status(terminal, SYNC_STATUS_ACTIVE) + report["active"] += 1 + continue + _set_status( + terminal, + SYNC_STATUS_STALE, + "This 3D terminal is not present in the latest 2D exchange payload.", + ) + report["stale"] += 1 + return report + + +def _iter_wire_task_objects(doc): + task_group = doc.getObject("QETWiring_01_Tasks") + if task_group is None: + return [] + + result = [] + for obj in list(getattr(task_group, "Group", []) or []): + if (getattr(obj, "RouteType", "") or "").strip() != "Task": + continue + if "QetWireUuid" not in getattr(obj, "PropertiesList", []): + continue + result.append(obj) + return result + + +def _iter_routed_wire_objects(doc): + if WiringObjects is None: + return [] + try: + return WiringObjects.iter_routed_wire_objects(doc) + except Exception: + return [] + + +def _mark_wire_object(obj, identity_sets, stale_reason): + wire_uuid = (getattr(obj, "QetWireUuid", "") or "").strip() + if wire_uuid and wire_uuid in identity_sets["wire_uuids"]: + _set_status(obj, SYNC_STATUS_ACTIVE) + return "active" + + _set_status(obj, SYNC_STATUS_STALE, stale_reason) + return "stale" + + +def mark_stale_objects_from_payload(payload, doc=None): + if doc is None: + doc = getattr(App, "ActiveDocument", None) + if doc is None: + raise RuntimeError("No active FreeCAD document is available.") + if not isinstance(payload, dict): + raise RuntimeError("Exchange payload must be an object.") + + identity_sets = _payload_identity_sets(payload) + report = { + "active_cabinets": 0, + "stale_cabinets": 0, + "active_devices": 0, + "stale_devices": 0, + "active_terminals": 0, + "stale_terminals": 0, + "active_wire_tasks": 0, + "stale_wire_tasks": 0, + "active_routed_wires": 0, + "stale_routed_wires": 0, + "warnings": [], + } + + for cabinet_group in _iter_cabinet_groups(doc): + cabinet_status = _mark_cabinet(cabinet_group, identity_sets) + if cabinet_status == "active": + report["active_cabinets"] += 1 + else: + report["stale_cabinets"] += 1 + + for device_group in _iter_device_groups(doc): + device_status = _mark_device(device_group, identity_sets) + if device_status == "active": + report["active_devices"] += 1 + else: + report["stale_devices"] += 1 + + terminal_report = _mark_terminals(device_group, identity_sets) + report["active_terminals"] += terminal_report["active"] + report["stale_terminals"] += terminal_report["stale"] + + for task in _iter_wire_task_objects(doc): + status = _mark_wire_object( + task, + identity_sets, + "This 3D wire task is not present in the latest 2D exchange payload.", + ) + if status == "active": + report["active_wire_tasks"] += 1 + else: + report["stale_wire_tasks"] += 1 + + for wire in _iter_routed_wire_objects(doc): + status = _mark_wire_object( + wire, + identity_sets, + "This routed 3D wire is not present in the latest 2D exchange payload.", + ) + if status == "active": + report["active_routed_wires"] += 1 + else: + report["stale_routed_wires"] += 1 + + return report diff --git a/src/Mod/FreeCADExchange/TerminalImport.py b/src/Mod/FreeCADExchange/TerminalImport.py index fc297d4..e4b6435 100644 --- a/src/Mod/FreeCADExchange/TerminalImport.py +++ b/src/Mod/FreeCADExchange/TerminalImport.py @@ -556,21 +556,6 @@ def import_terminals_from_payload(payload, scene_path=""): _hide_object(source_obj) report["reused_template_hints"] += 1 - for terminal_uuid, terminal_obj in list(existing_by_uuid.items()): - if terminal_uuid in used_uuids: - continue - if terminal_obj in used_objects: - continue - if TerminalObjects.is_local_terminal_uuid(terminal_uuid): - continue - report["warnings"].append( - "Removed stale terminal {0} from device {1}.".format( - terminal_uuid, device_element_uuid - ) - ) - TerminalObjects.remove_object_tree(doc, terminal_obj) - report["removed_terminals"] += 1 - doc.recompute() if Gui is not None: try: diff --git a/src/Mod/FreeCADExchange/WiringImport.py b/src/Mod/FreeCADExchange/WiringImport.py index b9c9eef..f2ba041 100644 --- a/src/Mod/FreeCADExchange/WiringImport.py +++ b/src/Mod/FreeCADExchange/WiringImport.py @@ -143,40 +143,6 @@ def _find_task_by_wire_uuid(task_group, wire_uuid): return None -def _remove_task_object(doc, task_group, task): - try: - if task in list(getattr(task_group, "Group", []) or []): - task_group.removeObject(task) - except Exception: - try: - task_group.Group = [ - candidate - for candidate in list(getattr(task_group, "Group", []) or []) - if candidate is not task - ] - except Exception: - pass - try: - doc.removeObject(getattr(task, "Name", "")) - except Exception: - pass - - -def _remove_stale_wire_tasks(doc, task_group, active_wire_uuids): - active = {(wire_uuid or "").strip() for wire_uuid in active_wire_uuids if (wire_uuid or "").strip()} - removed = 0 - for candidate in list(getattr(task_group, "Group", []) or []): - wire_uuid = (getattr(candidate, "QetWireUuid", "") or "").strip() - route_type = (getattr(candidate, "RouteType", "") or "").strip() - if not wire_uuid or route_type != "Task": - continue - if wire_uuid in active: - continue - _remove_task_object(doc, task_group, candidate) - removed += 1 - return removed - - def _ensure_string_property(obj, prop_name, value, description="QET wire task property"): TerminalObjects.ensure_string_property( obj, @@ -273,7 +239,6 @@ def import_wire_tasks_from_payload(payload, doc=None): } device_labels = _device_display_map(payload) - active_wire_uuids = [] for index, item in enumerate(wires): try: entry = _normalize_wire_entry(item, index, device_labels=device_labels) @@ -282,17 +247,10 @@ def import_wire_tasks_from_payload(payload, doc=None): report["warnings"].append(str(exc)) continue - active_wire_uuids.append(entry["wire_uuid"]) _task, created = _upsert_wire_task(doc, task_group, project_uuid, entry) if created: report["imported_tasks"] += 1 else: report["updated_tasks"] += 1 - report["removed_stale_tasks"] = _remove_stale_wire_tasks( - doc, - task_group, - active_wire_uuids, - ) - return report diff --git a/tests/python/freecad_exchange_device_import_fcstd_test.py b/tests/python/freecad_exchange_device_import_fcstd_test.py index 6f20916..5e53b91 100644 --- a/tests/python/freecad_exchange_device_import_fcstd_test.py +++ b/tests/python/freecad_exchange_device_import_fcstd_test.py @@ -268,6 +268,93 @@ class FcstdDeviceImportTest(unittest.TestCase): self.assertIs(app.ActiveDocument, scene_doc) self.assertIn("QETScene", app.set_active_document_calls) + def test_legacy_singleton_cabinet_group_is_reused_without_reimport(self): + with tempfile.TemporaryDirectory() as temp_dir: + cabinet_path = Path(temp_dir) / "cabinet.step" + cabinet_path.write_text("fake step placeholder", encoding="utf-8") + _install_fake_freecad(None) + + device_import, _ = _reload_modules() + + doc = FakeDocument("QETScene") + root_group = device_import._ensure_root_group( + doc, + {"location_id": 42, "resolved_scene_path": str(cabinet_path)}, + "project-1", + ) + legacy_group = doc.addObject("App::DocumentObjectGroup", "QETCabinetModel") + root_group.addObject(legacy_group) + device_import._ensure_string_property( + legacy_group, + "QetCabinetResolvedScenePath", + "QET Exchange", + "", + str(cabinet_path), + ) + existing_model = doc.addObject("Part::Feature", "CabinetBody") + legacy_group.addObject(existing_model) + + import_calls = [] + + def fake_import_model(doc_arg, group_arg, model_path, merge=False, use_link_group=True): + import_calls.append((doc_arg, group_arg, model_path, merge, use_link_group)) + return [] + + device_import._import_model_into_group = fake_import_model + + report = { + "cabinet_imported": 0, + "cabinet_reused": 0, + "cabinet_skipped_missing_model": 0, + "cabinet_skipped_missing_file": 0, + "cabinet_skipped_unsupported_format": 0, + "cabinet_skipped_import_error": 0, + "warnings": [], + } + cabinet = { + "location_id": 42, + "resolved_scene_path": str(cabinet_path), + "display_text": "Main Cabinet", + } + + device_import._import_cabinet_model(doc, root_group, cabinet, report) + + self.assertEqual([], import_calls) + self.assertEqual(0, report["cabinet_imported"]) + self.assertEqual(1, report["cabinet_reused"]) + self.assertIs(doc.getObject("QETCabinetModel"), legacy_group) + self.assertEqual("42", legacy_group.QetCabinetInstanceId) + self.assertEqual(str(cabinet_path), legacy_group.QetCabinetResolvedScenePath) + self.assertEqual([existing_model], legacy_group.Group) + + def test_cabinet_groups_are_separated_by_instance_id(self): + _install_fake_freecad(None) + device_import, _ = _reload_modules() + + doc = FakeDocument("QETScene") + root_group = device_import._ensure_root_group(doc, None, "project-1") + + group_a = device_import._ensure_cabinet_model_group( + doc, + root_group, + {"location_id": 1, "resolved_scene_path": r"D:\cabinet.step", "display_text": "A"}, + "project-1", + ) + group_b = device_import._ensure_cabinet_model_group( + doc, + root_group, + {"location_id": 2, "resolved_scene_path": r"D:\cabinet.step", "display_text": "B"}, + "project-1", + ) + + self.assertIsNot(group_a, group_b) + self.assertEqual("QETCabinet_1", group_a.Name) + self.assertEqual("QETCabinet_2", group_b.Name) + self.assertEqual("1", group_a.QetCabinetInstanceId) + self.assertEqual("2", group_b.QetCabinetInstanceId) + self.assertIn(group_a, root_group.Group) + self.assertIn(group_b, root_group.Group) + def test_fcstd_import_preserves_template_slots_without_live_template_lcs(self): source = FakeDocument("Source", r"D:\models\breaker.FCStd") _install_fake_freecad(source) diff --git a/tests/python/freecad_exchange_stale_object_actions_test.py b/tests/python/freecad_exchange_stale_object_actions_test.py new file mode 100644 index 0000000..cf246cf --- /dev/null +++ b/tests/python/freecad_exchange_stale_object_actions_test.py @@ -0,0 +1,184 @@ +import importlib +import sys +import types +import unittest +from pathlib import Path + + +REPO_ROOT = Path(__file__).resolve().parents[2] +MODULE_DIR = REPO_ROOT / "src" / "Mod" / "FreeCADExchange" +if str(MODULE_DIR) not in sys.path: + sys.path.insert(0, str(MODULE_DIR)) + + +def _install_fake_freecad(): + fake_freecad = types.ModuleType("FreeCAD") + fake_freecad.ActiveDocument = None + fake_freecad.Console = types.SimpleNamespace( + PrintMessage=lambda *args, **kwargs: None, + PrintWarning=lambda *args, **kwargs: None, + PrintError=lambda *args, **kwargs: None, + ) + sys.modules["FreeCAD"] = fake_freecad + + fake_gui = types.ModuleType("FreeCADGui") + fake_gui.commands = {} + fake_gui.addCommand = lambda name, command: fake_gui.commands.setdefault(name, command) + sys.modules["FreeCADGui"] = fake_gui + + +class FakeViewObject: + def __init__(self): + self.Visibility = True + + +class FakeObject: + def __init__(self, name, type_id): + self.Name = name + self.Label = name + self.TypeId = type_id + self.PropertiesList = [] + self.Group = [] + self.InList = [] + self.ViewObject = FakeViewObject() + + def isDerivedFrom(self, type_name): + if type_name == "App::DocumentObjectGroup": + return self.TypeId == "App::DocumentObjectGroup" + return self.TypeId == type_name + + def addProperty(self, prop_type, prop_name, group_name, description): + if prop_name not in self.PropertiesList: + self.PropertiesList.append(prop_name) + + def addObject(self, child): + if child not in self.Group: + self.Group.append(child) + if self not in child.InList: + child.InList.append(self) + + +class FakeDocument: + def __init__(self): + self.Name = "FakeDoc" + self.Objects = [] + self.recomputed = False + + def addObject(self, type_name, name): + obj = FakeObject(name, type_name) + self.Objects.append(obj) + return obj + + def getObject(self, name): + for obj in self.Objects: + if obj.Name == name: + return obj + return None + + def recompute(self): + self.recomputed = True + + +def _reload_modules(): + for name in ["TerminalObjects", "StaleObjectActions"]: + sys.modules.pop(name, None) + terminal_objects = importlib.import_module("TerminalObjects") + stale_object_actions = importlib.import_module("StaleObjectActions") + return terminal_objects, stale_object_actions + + +def _set_stale(terminal_objects, obj): + terminal_objects.ensure_string_property( + obj, + "QetSyncStatus", + "QET Sync", + "Latest 2D/3D synchronization status", + "Stale", + ) + + +class StaleObjectActionsTest(unittest.TestCase): + def test_hides_and_shows_stale_object_trees(self): + _install_fake_freecad() + terminal_objects, stale_object_actions = _reload_modules() + + doc = FakeDocument() + sys.modules["FreeCAD"].ActiveDocument = doc + + cabinet = doc.addObject("App::DocumentObjectGroup", "QETCabinet_1") + terminal_objects.ensure_string_property(cabinet, "QetCabinetInstanceId", "QET Exchange", "", "1") + _set_stale(terminal_objects, cabinet) + + cabinet_model = doc.addObject("Part::Feature", "CabinetModel") + cabinet.addObject(cabinet_model) + + device = doc.addObject("App::DocumentObjectGroup", "QETDevice_device_stale") + terminal_objects.ensure_string_property(device, "QetElementUuid", "QET Exchange", "", "device-stale") + _set_stale(terminal_objects, device) + + model_child = doc.addObject("Part::Feature", "ImportedModel") + device.addObject(model_child) + + terminal = doc.addObject("Part::LocalCoordinateSystem", "QETTerminal_terminal_stale") + terminal_objects.set_terminal_semantics( + terminal, + "project-1", + "device-stale", + "terminal-stale", + "instance-stale", + ) + _set_stale(terminal_objects, terminal) + + task = doc.addObject("Part::Feature", "QETWireTask_wire_stale") + terminal_objects.ensure_string_property(task, "QetWireUuid", "QET Wiring", "", "wire-stale") + terminal_objects.ensure_string_property(task, "RouteType", "QET Wiring", "", "Task") + _set_stale(terminal_objects, task) + + routed_group = doc.addObject("App::DocumentObjectGroup", "QETWiring_04_Routed") + routed_wire = doc.addObject("Part::Feature", "QETWire_wire_stale") + terminal_objects.ensure_string_property(routed_wire, "QetWireUuid", "QET Wiring", "", "wire-stale") + routed_group.addObject(routed_wire) + _set_stale(terminal_objects, routed_wire) + + report = stale_object_actions.hide_stale_objects(doc) + + self.assertEqual(5, report["total"]) + self.assertEqual(1, report["cabinets"]) + self.assertEqual(1, report["devices"]) + self.assertEqual(1, report["terminals"]) + self.assertEqual(1, report["wire_tasks"]) + self.assertEqual(1, report["routed_wires"]) + self.assertFalse(cabinet.ViewObject.Visibility) + self.assertFalse(cabinet_model.ViewObject.Visibility) + self.assertFalse(device.ViewObject.Visibility) + self.assertFalse(model_child.ViewObject.Visibility) + self.assertFalse(terminal.ViewObject.Visibility) + self.assertFalse(task.ViewObject.Visibility) + self.assertFalse(routed_wire.ViewObject.Visibility) + self.assertTrue(doc.recomputed) + + show_report = stale_object_actions.show_stale_objects(doc) + + self.assertEqual(7, show_report["affected_objects"]) + self.assertTrue(cabinet.ViewObject.Visibility) + self.assertTrue(cabinet_model.ViewObject.Visibility) + self.assertTrue(device.ViewObject.Visibility) + self.assertTrue(model_child.ViewObject.Visibility) + self.assertTrue(terminal.ViewObject.Visibility) + self.assertTrue(task.ViewObject.Visibility) + self.assertTrue(routed_wire.ViewObject.Visibility) + + def test_registers_toolbar_commands(self): + _install_fake_freecad() + _terminal_objects, stale_object_actions = _reload_modules() + + stale_object_actions.register_commands() + + commands = sys.modules["FreeCADGui"].commands + self.assertIn("QET_Exchange_HideStaleObjects", commands) + self.assertIn("QET_Exchange_ShowStaleObjects", commands) + self.assertIn("QET_Exchange_SummarizeStaleObjects", commands) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/python/freecad_exchange_stale_object_sync_test.py b/tests/python/freecad_exchange_stale_object_sync_test.py new file mode 100644 index 0000000..d569cb4 --- /dev/null +++ b/tests/python/freecad_exchange_stale_object_sync_test.py @@ -0,0 +1,224 @@ +import importlib +import sys +import types +import unittest +from pathlib import Path + + +REPO_ROOT = Path(__file__).resolve().parents[2] +MODULE_DIR = REPO_ROOT / "src" / "Mod" / "FreeCADExchange" +if str(MODULE_DIR) not in sys.path: + sys.path.insert(0, str(MODULE_DIR)) + + +def _install_fake_freecad(): + fake_freecad = types.ModuleType("FreeCAD") + fake_freecad.ActiveDocument = None + fake_freecad.Console = types.SimpleNamespace( + PrintMessage=lambda *args, **kwargs: None, + PrintWarning=lambda *args, **kwargs: None, + PrintError=lambda *args, **kwargs: None, + ) + sys.modules["FreeCAD"] = fake_freecad + + +class FakeViewObject: + def __init__(self): + self.Visibility = True + + +class FakeObject: + def __init__(self, name, type_id): + self.Name = name + self.Label = name + self.TypeId = type_id + self.PropertiesList = [] + self.Group = [] + self.InList = [] + self.ViewObject = FakeViewObject() + + def isDerivedFrom(self, type_name): + return self.TypeId == type_name + + def addProperty(self, prop_type, prop_name, group_name, description): + if prop_name not in self.PropertiesList: + self.PropertiesList.append(prop_name) + + def addObject(self, child): + if child not in self.Group: + self.Group.append(child) + if self not in child.InList: + child.InList.append(self) + + def removeObject(self, child): + if child in self.Group: + self.Group.remove(child) + if self in child.InList: + child.InList.remove(self) + + +class FakeDocument: + def __init__(self): + self.Name = "FakeDoc" + self.Objects = [] + + def addObject(self, type_name, name): + obj = FakeObject(name, type_name) + self.Objects.append(obj) + return obj + + def getObject(self, name): + for obj in self.Objects: + if obj.Name == name: + return obj + return None + + +def _reload_modules(): + for name in [ + "DeviceImport", + "TerminalObjects", + "WiringObjects", + "StaleObjectSync", + ]: + sys.modules.pop(name, None) + terminal_objects = importlib.import_module("TerminalObjects") + wiring_objects = importlib.import_module("WiringObjects") + stale_object_sync = importlib.import_module("StaleObjectSync") + return terminal_objects, wiring_objects, stale_object_sync + + +class StaleObjectSyncTest(unittest.TestCase): + def test_marks_devices_terminals_and_wires_missing_from_payload_as_stale(self): + _install_fake_freecad() + terminal_objects, wiring_objects, stale_object_sync = _reload_modules() + + doc = FakeDocument() + root = terminal_objects.ensure_root_group(doc, "project-1") + + active_cabinet = doc.addObject("App::DocumentObjectGroup", "QETCabinet_1") + active_cabinet.addProperty("App::PropertyString", "QetCabinetInstanceId", "QET Exchange", "") + active_cabinet.QetCabinetInstanceId = "1" + root.addObject(active_cabinet) + + stale_cabinet = doc.addObject("App::DocumentObjectGroup", "QETCabinet_2") + stale_cabinet.addProperty("App::PropertyString", "QetCabinetInstanceId", "QET Exchange", "") + stale_cabinet.QetCabinetInstanceId = "2" + root.addObject(stale_cabinet) + + active_device = doc.addObject("App::Part", "QETDevice_device_active") + active_device.addProperty("App::PropertyString", "QetElementUuid", "QET Exchange", "") + active_device.QetElementUuid = "device-active" + active_device.addProperty("App::PropertyString", "QetInstanceId", "QET Exchange", "") + active_device.QetInstanceId = "instance-active" + root.addObject(active_device) + + stale_device = doc.addObject("App::Part", "QETDevice_device_stale") + stale_device.addProperty("App::PropertyString", "QetElementUuid", "QET Exchange", "") + stale_device.QetElementUuid = "device-stale" + stale_device.addProperty("App::PropertyString", "QetInstanceId", "QET Exchange", "") + stale_device.QetInstanceId = "instance-stale" + root.addObject(stale_device) + + active_terminals = terminal_objects.ensure_terminal_group( + doc, + active_device, + project_uuid="project-1", + instance_id="instance-active", + ) + active_terminal = doc.addObject("Part::LocalCoordinateSystem", "QETTerminal_terminal_active") + terminal_objects.set_terminal_semantics( + active_terminal, + "project-1", + "device-active", + "terminal-active", + "instance-active", + ) + active_terminals.addObject(active_terminal) + + stale_terminals = terminal_objects.ensure_terminal_group( + doc, + stale_device, + project_uuid="project-1", + instance_id="instance-stale", + ) + stale_terminal = doc.addObject("Part::LocalCoordinateSystem", "QETTerminal_terminal_stale") + terminal_objects.set_terminal_semantics( + stale_terminal, + "project-1", + "device-stale", + "terminal-stale", + "instance-stale", + ) + stale_terminals.addObject(stale_terminal) + + active_task = wiring_objects.create_wire_task( + doc, + "project-1", + "wire-active", + "wire-active", + "terminal-active", + "terminal-b", + "instance-active", + "instance-b", + ) + stale_task = wiring_objects.create_wire_task( + doc, + "project-1", + "wire-stale", + "wire-stale", + "terminal-stale", + "terminal-c", + "instance-stale", + "instance-c", + ) + + payload = { + "project_uuid": "project-1", + "cabinet": { + "location_id": 1, + }, + "devices": [ + { + "element_uuid": "device-active", + "instance_id": "instance-active", + } + ], + "terminals": [ + { + "terminal_uuid": "terminal-active", + "instance_id": "instance-active", + "element_uuid": "device-active", + } + ], + "wires": [ + { + "wire_id": "wire-active", + "start_terminal_uuid": "terminal-active", + "end_terminal_uuid": "terminal-b", + } + ], + } + + report = stale_object_sync.mark_stale_objects_from_payload(payload, doc) + + self.assertEqual(1, report["active_cabinets"]) + self.assertEqual(1, report["stale_cabinets"]) + self.assertEqual(1, report["active_devices"]) + self.assertEqual(1, report["stale_devices"]) + self.assertEqual(1, report["active_terminals"]) + self.assertEqual(1, report["stale_terminals"]) + self.assertEqual(1, report["active_wire_tasks"]) + self.assertEqual(1, report["stale_wire_tasks"]) + self.assertEqual("Active", active_cabinet.QetSyncStatus) + self.assertEqual("Stale", stale_cabinet.QetSyncStatus) + self.assertEqual("Active", active_device.QetSyncStatus) + self.assertEqual("Stale", stale_device.QetSyncStatus) + self.assertEqual("Active", active_terminal.QetSyncStatus) + self.assertEqual("Stale", stale_terminal.QetSyncStatus) + self.assertEqual("Active", active_task.QetSyncStatus) + self.assertEqual("Stale", stale_task.QetSyncStatus) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/python/freecad_exchange_wiring_import_test.py b/tests/python/freecad_exchange_wiring_import_test.py index 7036e00..d63ac4c 100644 --- a/tests/python/freecad_exchange_wiring_import_test.py +++ b/tests/python/freecad_exchange_wiring_import_test.py @@ -209,7 +209,7 @@ class WiringImportTest(unittest.TestCase): self.assertEqual("W001-updated", task_group.Group[0].QetWireMark) self.assertEqual("Routed", task_group.Group[0].RouteStatus) - def test_reimport_removes_stale_wire_tasks_not_in_current_payload(self): + def test_reimport_keeps_stale_wire_tasks_for_sync_marking(self): _install_fake_freecad() terminal_objects, wiring_objects, wiring_import = _reload_modules() @@ -242,10 +242,13 @@ class WiringImportTest(unittest.TestCase): report = wiring_import.import_wire_tasks_from_payload(payload, doc) self.assertEqual(1, report["imported_tasks"]) - self.assertEqual(1, report["removed_stale_tasks"]) - self.assertIsNone(doc.getObject(stale_task.Name)) - self.assertEqual(1, len(task_group.Group)) - self.assertEqual("direction:new-wire:0", task_group.Group[0].QetWireUuid) + self.assertEqual(0, report["removed_stale_tasks"]) + self.assertIsNotNone(doc.getObject(stale_task.Name)) + self.assertEqual(2, len(task_group.Group)) + self.assertEqual( + {"conductor:old-wire", "direction:new-wire:0"}, + {task.QetWireUuid for task in task_group.Group}, + ) if __name__ == "__main__":