From 44c2e448b5959d2adf9cec6a9811e2245d4c34b6 Mon Sep 17 00:00:00 2001 From: Zhaowenlong Date: Mon, 25 May 2026 18:47:27 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81=E5=AF=BC=E7=BA=BF?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E5=AF=BC=E5=85=A5=E4=B8=8E=E6=89=8B=E5=8A=A8?= =?UTF-8?q?=E5=B8=83=E7=BA=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...子显示连线保存回写开发文档.md | 28 +- src/Mod/FreeCADExchange/CMakeLists.txt | 1 + src/Mod/FreeCADExchange/ExchangeBootstrap.py | 123 ++++++++- src/Mod/FreeCADExchange/ManualWiringPanel.py | 244 ++++++++++++++++-- src/Mod/FreeCADExchange/WiringImport.py | 208 +++++++++++++++ .../freecad_exchange_bootstrap_wiring_test.py | 55 +++- ...eecad_exchange_manual_wiring_panel_test.py | 116 +++++++++ .../freecad_exchange_wiring_import_test.py | 159 ++++++++++++ 8 files changed, 904 insertions(+), 30 deletions(-) create mode 100644 src/Mod/FreeCADExchange/WiringImport.py create mode 100644 tests/python/freecad_exchange_wiring_import_test.py diff --git a/docs/FreeCAD 端子显示连线保存回写开发文档.md b/docs/FreeCAD 端子显示连线保存回写开发文档.md index 5423e9f..e69117e 100644 --- a/docs/FreeCAD 端子显示连线保存回写开发文档.md +++ b/docs/FreeCAD 端子显示连线保存回写开发文档.md @@ -134,13 +134,13 @@ STEP / STP / STE 适合作为模板制作的输入,不建议作为长期带电 1. 正式方式:使用 FCStd 模板,在模板里提前放好 LCS 端子对象。 2. 过渡方式:STEP + sidecar JSON,在同目录下保存端子槽位坐标。 -3. 验证方式:没有模板语义时,临时使用 bbox fallback 生成端子位置。 +3. 如果没有模板语义,则跳过端子生成并输出警告,不再用 bbox fallback 猜端子位置。 sidecar 只作为 FreeCAD 端模板辅助文件,不进入第一版数据库绑定主键。 sidecar 里除了端子坐标,还可以继续补端子朝向,例如 `rotation`,让模板端子不只是“有位置”,还可以“有方向”。 -FCStd 模板里的 LCS 如果已经带了 Placement 朝向,导入时也要一并保留,这样端子不只是有坐标,还能保留真实出线方向。 +FCStd 模板里的 LCS 如果已经带了 Placement 朝向,导入时要把位置和朝向一并读取出来,这样生成的工程端子不只是有坐标,还能保留真实出线方向。 ### 3.1 FCStd 设备模板制作约定 @@ -172,6 +172,22 @@ FCStd 设备模板用于解决“这个模型本身就带端子语义”的问 这些工程级字段由导入项目时的 `2d_to_3d.json` 和 FreeCADExchange 运行时补齐。这样同一个 FCStd 设备模板才能在多个工程、多台机器、多人之间复用。 +### 3.1.1 FCStd 导入运行时约定 + +FCStd 模板中的端子 LCS 是“模板槽位定义”,不是最终留在工程场景里的端子对象。 + +导入到 `QETScene` 时,FreeCADExchange 的处理顺序是: + +1. 复制 FCStd 中的几何对象到设备组。 +2. 读取模板 LCS 的 `QetTemplateSlotName / Label / Placement`。 +3. 将槽位位置和朝向序列化到设备组属性 `QetTemplateSlotsJson`。 +4. 删除复制出来的模板 LCS 及其 `OriginFeatures`,避免这些模板对象留在 `App::Part` 中参与变换或选择。 +5. 后续由 `TerminalImport.py` 按 `terminal_uuid` 在 `QETTerminals_*` 下创建工程端子 LCS。 + +这样做不会改变第一版数据库约束:`QetTemplateSlotsJson` 只是 FreeCAD 文档内部的运行时缓存,不进入 `project_2d3d_symbol_binding` 或 `project_2d3d_terminal_binding`。 + +如果同一个设备从 FCStd 切回 STEP / IGES / BREP 等非 FCStd 模型,导入时会清空旧的 `QetTemplateSlotsJson`,避免继续沿用上一次 FCStd 的端子槽位。 + ### 3.2 模板制作工具目标 后续在 `FreeCADExchange` 中新增模板制作能力,目标是让用户不需要手工给 LCS 添加属性: @@ -416,11 +432,11 @@ ManualWiring.py 端子位置策略: -1. 如果 FCStd 模板已有 `Role="Terminal"` 的 LCS,则优先复用模板 LCS。 +1. 如果 FCStd 模板已有 `Role="Terminal"` 的 LCS,则优先读取模板 LCS 的槽位信息,再生成工程端子 LCS。 2. 如果有 sidecar JSON,则按 sidecar 坐标创建。 -3. 如果只有 STEP,则先按设备包围盒生成临时端子排列,用于打通流程。 +3. 如果只有 STEP 且没有 sidecar,则不生成端子,并在导入报告中提示缺少模板槽位。 -临时端子排列只用于第一版验证,不作为长期物理端子定义。 +第一版不再按设备包围盒猜测临时端子,避免把错误端子位置保存进工程。 验收: @@ -598,4 +614,6 @@ ManualWiring.py - 2026-05-20:修复 `TemplateAuthoring.py` 在 `FreeCADCmd.exe` 命令行模式下导入时误注册 GUI 命令的问题;已在运行目录验证创建 `P1` 模板端子、保存 `.FCStd`、重新打开后端子语义仍可识别。 - 2026-05-20:确定方案 2 开发目标:新增“设备模板端子制作”任务面板,让 CAD 工作人员通过输入端子名、选择模型位置、点击按钮完成添加端子、校验端子和保存 FCStd,不再依赖 Python 控制台。 - 2026-05-20:新增 `TemplateAuthoringPanel.py`,提供“设备模板端子制作”任务面板和 `QET_Template_OpenAuthoringPanel` 命令;面板支持输入端子名、添加端子、校验端子、保存 FCStd,并已同步到运行目录验证模块可导入。 +- 2026-05-25:新增 `WiringImport.py`,把 `2d_to_3d.json` 中的 `wires` 导入为 `QETWiring_01_Tasks` 下的导线任务;`ExchangeBootstrap.py` 已接入启动导入流程。`ManualWiringPanel.py` 增加任务列表、选择导线任务和删除最后折点,按任务生成导线时会把 `wire_id / net_uuid / group_uuid / wire_mark` 写入正式导线对象,并把任务状态更新为 `Routed`。已通过 35 项 `freecad_exchange*_test.py` 单元测试,并安装到 `D:\fc\run-FreeCAD-1.1.1` 运行目录验证 `WiringImport / ManualWiring / ManualWiringPanel / WiringObjects` 可导入。 +- 2026-05-25:修复 FCStd 设备导入后模板 LCS 留在工程场景的问题;导入时会把模板槽位位置和朝向缓存到设备组 `QetTemplateSlotsJson`,随后删除模板 LCS 及其 `OriginFeatures`,工程端子仍按 `terminal_uuid` 生成到 `QETTerminals_*`。已补单元测试验证 FCStd 导入不保留模板 LCS、切回 STEP 会清空旧槽位缓存,并避免重复访问已删除对象的 `Group / InList / Name`。 ``` diff --git a/src/Mod/FreeCADExchange/CMakeLists.txt b/src/Mod/FreeCADExchange/CMakeLists.txt index 58b3690..34436ae 100644 --- a/src/Mod/FreeCADExchange/CMakeLists.txt +++ b/src/Mod/FreeCADExchange/CMakeLists.txt @@ -12,6 +12,7 @@ set(FreeCADExchange_Scripts TemplateInstantiation.py TerminalImport.py WiringObjects.py + WiringImport.py ExchangeWriteBack.py ManualWiring.py ManualWiringPanel.py diff --git a/src/Mod/FreeCADExchange/ExchangeBootstrap.py b/src/Mod/FreeCADExchange/ExchangeBootstrap.py index 0e66c5c..3a6582a 100644 --- a/src/Mod/FreeCADExchange/ExchangeBootstrap.py +++ b/src/Mod/FreeCADExchange/ExchangeBootstrap.py @@ -19,6 +19,10 @@ try: import WiringObjects except Exception: WiringObjects = None +try: + import WiringImport +except Exception: + WiringImport = None try: from PySide6 import QtCore, QtWidgets @@ -37,6 +41,7 @@ STATE_PAYLOAD = "_qet_exchange_payload" STATE_SUMMARY = "_qet_exchange_summary" STATE_IMPORT_REPORT = "_qet_exchange_import_report" STATE_TERMINAL_IMPORT_REPORT = "_qet_exchange_terminal_import_report" +STATE_WIRING_IMPORT_REPORT = "_qet_exchange_wiring_import_report" STATE_WRITEBACK_REPORT = "_qet_exchange_writeback_report" STATE_IMPORT_SCHEDULED = "_qet_exchange_import_scheduled" STATE_TREE_FILTER = "_qet_exchange_tree_filter" @@ -371,6 +376,81 @@ def _normalize_terminals(payload): return normalized +def _optional_string(item, field_name, entry_label): + value = item.get(field_name, "") + if value is None: + return "" + if value and not isinstance(value, str): + raise ExchangeValidationError( + "Field '{0}' in {1} must be a string.".format(field_name, entry_label) + ) + return value.strip() if isinstance(value, str) else "" + + +def _normalize_conductor_uuids(item, entry_label): + values = item.get("conductor_uuids", []) + if values is None: + return [] + if not isinstance(values, list): + raise ExchangeValidationError( + "Field 'conductor_uuids' in {0} must be a list.".format(entry_label) + ) + normalized = [] + for conductor_index, value in enumerate(values): + if not isinstance(value, str): + raise ExchangeValidationError( + "Field 'conductor_uuids[{0}]' in {1} must be a string.".format( + conductor_index, + entry_label, + ) + ) + value = value.strip() + if value: + normalized.append(value) + return normalized + + +def _normalize_wires(payload): + wires = payload.get("wires", []) + if wires is None: + return [] + if not isinstance(wires, list): + raise ExchangeValidationError("Field 'wires' must be a list.") + + normalized = [] + for index, item in enumerate(wires): + entry_label = "wire entry #{0}".format(index) + if not isinstance(item, dict): + raise ExchangeValidationError( + "Wire entry #{0} must be an object.".format(index) + ) + wire_id = _require_string(item, "wire_id") + wire_mark_is_manual = item.get("wire_mark_is_manual", False) + if not isinstance(wire_mark_is_manual, bool): + raise ExchangeValidationError( + "Field 'wire_mark_is_manual' in {0} must be a bool.".format( + entry_label + ) + ) + normalized.append( + { + "wire_id": wire_id, + "net_uuid": _optional_string(item, "net_uuid", entry_label), + "group_uuid": _optional_string(item, "group_uuid", entry_label), + "wire_mark": _optional_string(item, "wire_mark", entry_label), + "wire_mark_is_manual": wire_mark_is_manual, + "start_element_uuid": _optional_string(item, "start_element_uuid", entry_label), + "start_terminal_uuid": _optional_string(item, "start_terminal_uuid", entry_label), + "end_element_uuid": _optional_string(item, "end_element_uuid", entry_label), + "end_terminal_uuid": _optional_string(item, "end_terminal_uuid", entry_label), + "start_terminal_display": _optional_string(item, "start_terminal_display", entry_label), + "end_terminal_display": _optional_string(item, "end_terminal_display", entry_label), + "conductor_uuids": _normalize_conductor_uuids(item, entry_label), + } + ) + return normalized + + def _normalize_device_models(payload): models = payload.get("device_models", []) if not isinstance(models, list): @@ -490,6 +570,7 @@ def load_exchange_payload(json_path): "devices": _normalize_devices(payload), "terminals": _normalize_terminals(payload), "device_models": _normalize_device_models(payload), + "wires": _normalize_wires(payload), } return normalized @@ -498,6 +579,7 @@ def _build_summary(payload, json_path): devices = payload["devices"] terminals = payload["terminals"] device_models = payload["device_models"] + wires = payload.get("wires", []) cabinet = payload.get("cabinet") missing_device_instances = sum(1 for item in devices if not item["instance_id"]) missing_terminal_instances = sum( @@ -512,6 +594,7 @@ def _build_summary(payload, json_path): "project_uuid": payload["project_uuid"], "device_count": len(devices), "terminal_count": len(terminals), + "wire_count": len(wires), "device_model_count": len(device_models), "device_models_with_parts": with_model_paths, "missing_device_instances": missing_device_instances, @@ -537,7 +620,23 @@ def _initialize_wiring_scene(payload): return None -def _summary_message(summary, import_report=None, terminal_report=None, writeback_report=None): +def _import_wiring_tasks(payload): + if WiringImport is None: + _append_debug_log("wire task import skipped: WiringImport module unavailable") + return None + + try: + return WiringImport.import_wire_tasks_from_payload( + payload, + App.ActiveDocument, + ) + except Exception as exc: + _append_debug_log("wire task import failed: {0}".format(exc)) + _append_debug_log(traceback.format_exc()) + return None + + +def _summary_message(summary, import_report=None, terminal_report=None, writeback_report=None, wiring_report=None): lines = [ "QET exchange file loaded successfully.", "", @@ -545,6 +644,7 @@ def _summary_message(summary, import_report=None, terminal_report=None, writebac "Exchange file: {0}".format(summary["json_path"]), "Devices: {0}".format(summary["device_count"]), "Terminals: {0}".format(summary["terminal_count"]), + "Wires: {0}".format(summary["wire_count"]), "Device models: {0}".format(summary["device_model_count"]), "Resolved model paths: {0}".format(summary["device_models_with_parts"]), ] @@ -662,6 +762,22 @@ def _summary_message(summary, import_report=None, terminal_report=None, writebac ] ) + if wiring_report: + lines.extend( + [ + "", + "3D wire tasks:", + "Imported tasks: {0}".format(wiring_report.get("imported_tasks", 0)), + "Updated tasks: {0}".format(wiring_report.get("updated_tasks", 0)), + ] + ) + warnings = wiring_report.get("warnings", []) + if warnings: + lines.append("Wire task warnings:") + lines.extend("- {0}".format(item) for item in warnings[:10]) + if len(warnings) > 10: + lines.append("- ... ({0} more)".format(len(warnings) - 10)) + lines.append("") lines.append("This step validates the exchange payload and imports devices with valid resolved model paths.") lines.append("3D terminal import and write-back are enabled.") @@ -780,6 +896,9 @@ def _run_scheduled_device_import(attempt=0): setattr(App, STATE_TERMINAL_IMPORT_REPORT, terminal_report) _initialize_wiring_scene(payload) + wiring_report = _import_wiring_tasks(payload) + if wiring_report is not None: + setattr(App, STATE_WIRING_IMPORT_REPORT, wiring_report) if ExchangeWriteBack is None: _append_debug_log("write-back skipped: ExchangeWriteBack module unavailable") @@ -806,7 +925,7 @@ def _run_scheduled_device_import(attempt=0): _show_info( "QET Exchange", - _summary_message(summary, import_report, terminal_report, writeback_report), + _summary_message(summary, import_report, terminal_report, writeback_report, wiring_report), ) _append_debug_log("summary dialog shown") diff --git a/src/Mod/FreeCADExchange/ManualWiringPanel.py b/src/Mod/FreeCADExchange/ManualWiringPanel.py index cc8f0d6..d3c3488 100644 --- a/src/Mod/FreeCADExchange/ManualWiringPanel.py +++ b/src/Mod/FreeCADExchange/ManualWiringPanel.py @@ -119,6 +119,55 @@ def _selected_terminal(): return None +def _is_wire_task_object(obj): + if obj is None: + return False + return ( + "QetWireUuid" in getattr(obj, "PropertiesList", []) + and (getattr(obj, "RouteType", "") or "").strip() == "Task" + ) + + +def _task_label(obj): + if obj is None: + return "未选择" + label = (getattr(obj, "Label", "") or "").strip() + if label: + return label + return ( + getattr(obj, "QetWireMark", "") + or getattr(obj, "QetWireLabel", "") + or getattr(obj, "QetWireUuid", "") + or getattr(obj, "Name", "") + or "导线任务" + ) + + +def _iter_terminal_objects(doc): + root = None + try: + root = doc.getObject(TerminalObjects.ROOT_GROUP_NAME) + except Exception: + root = None + if root is not None: + return TerminalObjects.collect_terminal_objects(root) + return [ + obj + for obj in list(getattr(doc, "Objects", []) or []) + if TerminalObjects.is_terminal_object(obj) + ] + + +def _find_terminal_by_uuid(doc, terminal_uuid): + target = (terminal_uuid or "").strip() + if not target: + return None + for terminal in _iter_terminal_objects(doc): + if getattr(terminal, "QetTerminalUuid", "").strip() == target: + return terminal + return None + + def _terminal_label(obj): return ( getattr(obj, "Label", "") @@ -267,9 +316,66 @@ def _create_preview_point(doc, waypoint, index): return preview +def _remove_preview_object(doc, preview): + if preview is None or doc is None: + return + + group = None + try: + group = doc.getObject("QETWiring_03_Previews") + except Exception: + group = None + + try: + if group is not None and preview in getattr(group, "Group", []): + if hasattr(group, "removeObject"): + group.removeObject(preview) + else: + group.Group = [obj for obj in group.Group if obj is not preview] + except Exception: + pass + try: + if doc.getObject(getattr(preview, "Name", "")) is not None: + doc.removeObject(preview.Name) + except Exception: + pass + + +def _set_task_route_status(task, status): + if task is None: + return + try: + TerminalObjects.ensure_string_property( + task, + "RouteStatus", + "QET Wiring", + "Wire task route status", + status, + ) + except Exception: + try: + task.RouteStatus = status + except Exception: + pass + + +def _task_wire_kwargs(task): + if task is None: + return {} + return { + "wire_uuid": getattr(task, "QetWireUuid", "").strip(), + "wire_label": getattr(task, "QetWireLabel", "").strip(), + "net_uuid": getattr(task, "QetNetUuid", "").strip(), + "group_uuid": getattr(task, "QetGroupUuid", "").strip(), + "wire_mark": getattr(task, "QetWireMark", "").strip(), + "wire_mark_is_manual": bool(getattr(task, "QetWireMarkIsManual", False)), + } + + class ManualWiringController: def __init__(self, terminal_exit_length=DEFAULT_TERMINAL_EXIT_LENGTH): self.terminal_exit_length = float(terminal_exit_length or 0.0) + self.current_task = None self.start_terminal = None self.waypoints = [] self.preview_objects = [] @@ -281,34 +387,52 @@ class ManualWiringController: self.preview_objects = [] return - group = None - try: - group = doc.getObject("QETWiring_03_Previews") - except Exception: - group = None - for preview in list(self.preview_objects): - try: - if group is not None and preview in getattr(group, "Group", []): - if hasattr(group, "removeObject"): - group.removeObject(preview) - else: - group.Group = [obj for obj in group.Group if obj is not preview] - except Exception: - pass - try: - if doc.getObject(getattr(preview, "Name", "")) is not None: - doc.removeObject(preview.Name) - except Exception: - pass + _remove_preview_object(doc, preview) self.preview_objects = [] def _reset_route_state(self): + self.current_task = None self.start_terminal = None self.waypoints = [] self.last_wire = None self._clear_preview_objects() + def available_tasks(self): + doc = _active_document() + try: + group = doc.getObject("QETWiring_01_Tasks") + except Exception: + group = None + if group is None: + return [] + return [ + obj + for obj in list(getattr(group, "Group", []) or []) + if _is_wire_task_object(obj) + ] + + def set_task_from_object(self, task): + if not _is_wire_task_object(task): + raise ManualWiringPanelError("请选择一个 QET 导线任务。") + + doc = _active_document() + start_terminal = _find_terminal_by_uuid( + doc, + getattr(task, "QetStartTerminalUuid", ""), + ) + end_terminal = _find_terminal_by_uuid( + doc, + getattr(task, "QetEndTerminalUuid", ""), + ) + if start_terminal is None or end_terminal is None: + raise ManualWiringPanelError("导线任务的起点或终点工程端子未找到。") + + self._reset_route_state() + self.current_task = task + self.start_terminal = start_terminal + return task + def set_start_from_selection(self): terminal = _selected_terminal() if terminal is None: @@ -327,21 +451,42 @@ class ManualWiringController: self.preview_objects.append(preview) return waypoint + def delete_last_waypoint(self): + if not self.waypoints: + return None + waypoint = self.waypoints.pop() + preview = self.preview_objects.pop() if self.preview_objects else None + _remove_preview_object(getattr(App, "ActiveDocument", None), preview) + return waypoint + def set_end_from_selection_and_generate(self): doc = _active_document() if self.start_terminal is None: raise ManualWiringPanelError("请先设置起点端子。") - end_terminal = _selected_terminal() + + wire_kwargs = {} + if self.current_task is not None: + end_terminal = _find_terminal_by_uuid( + doc, + getattr(self.current_task, "QetEndTerminalUuid", ""), + ) + wire_kwargs = _task_wire_kwargs(self.current_task) + else: + end_terminal = _selected_terminal() if end_terminal is None: raise ManualWiringPanelError("请先选择一个工程端子,再点击“设为终点并生成”。") + wire = ManualWiring.create_manual_wire( doc, self.start_terminal, end_terminal, waypoints=list(self.waypoints), terminal_exit_length=self.terminal_exit_length, + **wire_kwargs, ) self.last_wire = wire + if self.current_task is not None: + _set_task_route_status(self.current_task, "Routed") return wire def clear(self): @@ -364,6 +509,9 @@ class ManualWiringController: start_text = "未设置" if self.start_terminal is not None: start_text = _terminal_label(self.start_terminal) + task_text = "未选择" + if self.current_task is not None: + task_text = _task_label(self.current_task) wire_text = "未生成" if self.last_wire is not None: wire_text = getattr(self.last_wire, "Label", "") or getattr(self.last_wire, "Name", "") @@ -375,7 +523,8 @@ class ManualWiringController: ) if len(self.waypoints) > 3: waypoint_text += ";..." - return "起点:{0};折点:{1} 个;最近导线:{2};折点明细:{3}".format( + return "任务:{0};起点:{1};折点:{2} 个;最近导线:{3};折点明细:{4}".format( + task_text, start_text, len(self.waypoints), wire_text, @@ -394,14 +543,22 @@ class ManualWiringTaskPanel: layout = QtWidgets.QVBoxLayout(self.form) + self.task_list = QtWidgets.QListWidget() + self.use_task_button = QtWidgets.QPushButton("选择导线任务") + self.reload_tasks_button = QtWidgets.QPushButton("刷新任务") self.start_button = QtWidgets.QPushButton("设为起点") self.waypoint_button = QtWidgets.QPushButton("添加折点") + self.delete_waypoint_button = QtWidgets.QPushButton("删除最后折点") self.end_button = QtWidgets.QPushButton("设为终点并生成") self.clear_button = QtWidgets.QPushButton("清除草稿") self.save_button = QtWidgets.QPushButton("保存并回写") + layout.addWidget(self.task_list) + layout.addWidget(self.use_task_button) + layout.addWidget(self.reload_tasks_button) layout.addWidget(self.start_button) layout.addWidget(self.waypoint_button) + layout.addWidget(self.delete_waypoint_button) layout.addWidget(self.end_button) layout.addWidget(self.clear_button) layout.addWidget(self.save_button) @@ -413,12 +570,17 @@ class ManualWiringTaskPanel: self.status_label.setWordWrap(True) layout.addWidget(self.status_label) + self.task_objects = [] + self.use_task_button.clicked.connect(self.use_selected_task) + self.reload_tasks_button.clicked.connect(self._refresh_task_list) self.start_button.clicked.connect(self.set_start) self.waypoint_button.clicked.connect(self.add_waypoint) + self.delete_waypoint_button.clicked.connect(self.delete_last_waypoint) self.end_button.clicked.connect(self.set_end_and_generate) self.clear_button.clicked.connect(self.clear) self.save_button.clicked.connect(self.save_and_write_back) + self._refresh_task_list() self._refresh_waypoint_list() self._set_status(self.controller.state_text()) @@ -446,6 +608,34 @@ class ManualWiringTaskPanel: ) ) + def _refresh_task_list(self): + self.task_list.clear() + self.task_objects = [] + try: + self.task_objects = self.controller.available_tasks() + except Exception as exc: + self._set_error(str(exc)) + return + for task in self.task_objects: + self.task_list.addItem(_task_label(task)) + + def _selected_task_from_list(self): + row = self.task_list.currentRow() + if row < 0 or row >= len(self.task_objects): + return None + return self.task_objects[row] + + def use_selected_task(self): + try: + task = self._selected_task_from_list() + if task is None: + raise ManualWiringPanelError("请先在任务列表中选择一条导线任务。") + self.controller.set_task_from_object(task) + self._refresh_waypoint_list() + self._set_status("已选择导线任务:{0}".format(_task_label(task))) + except Exception as exc: + self._set_error(str(exc)) + def set_start(self): try: terminal = self.controller.set_start_from_selection() @@ -462,9 +652,21 @@ class ManualWiringTaskPanel: except Exception as exc: self._set_error(str(exc)) + def delete_last_waypoint(self): + try: + removed = self.controller.delete_last_waypoint() + self._refresh_waypoint_list() + if removed is None: + self._set_status("当前没有可删除的折点。") + else: + self._set_status(self.controller.state_text()) + except Exception as exc: + self._set_error(str(exc)) + def set_end_and_generate(self): try: wire = self.controller.set_end_from_selection_and_generate() + self._refresh_task_list() self._set_status("已生成导线:{0}".format(getattr(wire, "Name", ""))) try: if Gui is not None: diff --git a/src/Mod/FreeCADExchange/WiringImport.py b/src/Mod/FreeCADExchange/WiringImport.py new file mode 100644 index 0000000..0bf5112 --- /dev/null +++ b/src/Mod/FreeCADExchange/WiringImport.py @@ -0,0 +1,208 @@ +# FreeCADExchange wire task import helpers. + +import json + +import FreeCAD as App + +import TerminalObjects +import WiringObjects + + +class WiringImportError(RuntimeError): + pass + + +def _string_value(item, field_name): + value = item.get(field_name, "") + if value is None: + return "" + if not isinstance(value, str): + return str(value).strip() + return value.strip() + + +def _bool_value(item, field_name): + return bool(item.get(field_name, False)) + + +def _conductor_uuids(item): + values = item.get("conductor_uuids", []) + if not isinstance(values, list): + return [] + return [str(value).strip() for value in values if str(value).strip()] + + +def _normalize_wire_entry(item, index): + if not isinstance(item, dict): + raise WiringImportError("Wire entry #{0} must be an object.".format(index)) + + wire_uuid = ( + _string_value(item, "wire_id") + or _string_value(item, "wire_uuid") + or _string_value(item, "id") + ) + start_terminal_uuid = _string_value(item, "start_terminal_uuid") + end_terminal_uuid = _string_value(item, "end_terminal_uuid") + + if not wire_uuid: + raise WiringImportError("Wire entry #{0} is missing wire_id.".format(index)) + if not start_terminal_uuid or not end_terminal_uuid: + raise WiringImportError( + "Wire {0} is missing start_terminal_uuid or end_terminal_uuid.".format( + wire_uuid + ) + ) + + wire_mark = _string_value(item, "wire_mark") + wire_label = _string_value(item, "wire_label") or wire_mark or wire_uuid + return { + "wire_uuid": wire_uuid, + "wire_label": wire_label, + "net_uuid": _string_value(item, "net_uuid"), + "group_uuid": _string_value(item, "group_uuid"), + "wire_mark": wire_mark, + "wire_mark_is_manual": _bool_value(item, "wire_mark_is_manual"), + "start_element_uuid": _string_value(item, "start_element_uuid"), + "start_terminal_uuid": start_terminal_uuid, + "start_instance_id": _string_value(item, "start_instance_id"), + "start_terminal_display": _string_value(item, "start_terminal_display"), + "end_element_uuid": _string_value(item, "end_element_uuid"), + "end_terminal_uuid": end_terminal_uuid, + "end_instance_id": _string_value(item, "end_instance_id"), + "end_terminal_display": _string_value(item, "end_terminal_display"), + "conductor_uuids": _conductor_uuids(item), + } + + +def _unique_object_name(doc, base_name): + name = TerminalObjects.safe_token(base_name) + if doc.getObject(name) is None: + return name + suffix = 1 + while doc.getObject("{0}_{1}".format(name, suffix)) is not None: + suffix += 1 + return "{0}_{1}".format(name, suffix) + + +def _task_label(entry): + mark = entry["wire_mark"] or entry["wire_label"] or entry["wire_uuid"] + start = entry["start_terminal_display"] or entry["start_terminal_uuid"] + end = entry["end_terminal_display"] or entry["end_terminal_uuid"] + return "{0} {1} -> {2}".format(mark, start, end) + + +def _find_task_by_wire_uuid(task_group, wire_uuid): + target = (wire_uuid or "").strip() + if not target: + return None + for candidate in list(getattr(task_group, "Group", []) or []): + if getattr(candidate, "QetWireUuid", "").strip() == target: + return candidate + return None + + +def _ensure_string_property(obj, prop_name, value, description="QET wire task property"): + TerminalObjects.ensure_string_property( + obj, + prop_name, + "QET Wiring", + description, + value, + ) + + +def _set_task_extra_properties(task, entry): + _ensure_string_property(task, "QetStartElementUuid", entry["start_element_uuid"]) + _ensure_string_property(task, "QetEndElementUuid", entry["end_element_uuid"]) + _ensure_string_property(task, "QetStartTerminalDisplay", entry["start_terminal_display"]) + _ensure_string_property(task, "QetEndTerminalDisplay", entry["end_terminal_display"]) + _ensure_string_property( + task, + "QetConductorUuidsJson", + json.dumps(entry["conductor_uuids"], ensure_ascii=False), + ) + + +def _upsert_wire_task(doc, task_group, project_uuid, entry): + task = _find_task_by_wire_uuid(task_group, entry["wire_uuid"]) + created = task is None + previous_status = "" + if task is None: + task = doc.addObject( + "App::DocumentObjectGroup", + _unique_object_name(doc, "QETWireTask_{0}".format(entry["wire_uuid"])), + ) + task_group.addObject(task) + else: + previous_status = (getattr(task, "RouteStatus", "") or "").strip() + + WiringObjects.set_wire_task_semantics( + task, + project_uuid, + entry["wire_uuid"], + entry["wire_label"], + entry["start_terminal_uuid"], + entry["end_terminal_uuid"], + entry["start_instance_id"], + entry["end_instance_id"], + net_uuid=entry["net_uuid"], + group_uuid=entry["group_uuid"], + wire_mark=entry["wire_mark"], + wire_mark_is_manual=entry["wire_mark_is_manual"], + ) + _set_task_extra_properties(task, entry) + if previous_status and previous_status != "Task": + TerminalObjects.ensure_string_property( + task, + "RouteStatus", + "QET Wiring", + "Wire task route status", + previous_status, + ) + task.Label = _task_label(entry) + return task, created + + +def import_wire_tasks_from_payload(payload, doc=None): + if doc is None: + doc = getattr(App, "ActiveDocument", None) + if doc is None: + raise WiringImportError("No active FreeCAD document is available.") + if not isinstance(payload, dict): + raise WiringImportError("Exchange payload must be an object.") + + project_uuid = _string_value(payload, "project_uuid") + if not project_uuid: + raise WiringImportError("Field 'project_uuid' is required for wire task import.") + + wires = payload.get("wires", []) + if wires is None: + wires = [] + if not isinstance(wires, list): + raise WiringImportError("Field 'wires' must be a list.") + + task_group = WiringObjects.ensure_task_group(doc, project_uuid) + report = { + "project_uuid": project_uuid, + "total_wires": len(wires), + "imported_tasks": 0, + "updated_tasks": 0, + "skipped_invalid": 0, + "warnings": [], + } + + for index, item in enumerate(wires): + try: + entry = _normalize_wire_entry(item, index) + except WiringImportError as exc: + report["skipped_invalid"] += 1 + report["warnings"].append(str(exc)) + continue + + _task, created = _upsert_wire_task(doc, task_group, project_uuid, entry) + if created: + report["imported_tasks"] += 1 + else: + report["updated_tasks"] += 1 + + return report diff --git a/tests/python/freecad_exchange_bootstrap_wiring_test.py b/tests/python/freecad_exchange_bootstrap_wiring_test.py index 0485286..570d5a8 100644 --- a/tests/python/freecad_exchange_bootstrap_wiring_test.py +++ b/tests/python/freecad_exchange_bootstrap_wiring_test.py @@ -1,5 +1,7 @@ import importlib +import json import sys +import tempfile import types import unittest from pathlib import Path @@ -46,6 +48,15 @@ def _install_fake_modules(): fake_wiring.initialize_wiring_scene = lambda doc, project_uuid="": calls.append((doc, project_uuid)) or "root" sys.modules["WiringObjects"] = fake_wiring + wire_calls = [] + fake_wiring_import = types.ModuleType("WiringImport") + fake_wiring_import.WiringImportError = RuntimeError + fake_wiring_import.import_wire_tasks_from_payload = ( + lambda payload, doc=None: wire_calls.append((payload, doc)) + or {"imported_tasks": 1, "updated_tasks": 0, "warnings": []} + ) + sys.modules["WiringImport"] = fake_wiring_import + class _QObject: def __init__(self, *args, **kwargs): pass @@ -67,12 +78,12 @@ def _install_fake_modules(): fake_pyside.QtWidgets = fake_qt_widgets sys.modules["PySide6"] = fake_pyside - return fake_freecad, calls + return fake_freecad, calls, wire_calls class ExchangeBootstrapWiringTest(unittest.TestCase): def test_initialize_wiring_scene_uses_active_document_and_project_uuid(self): - app, calls = _install_fake_modules() + app, calls, _wire_calls = _install_fake_modules() sys.modules.pop("ExchangeBootstrap", None) bootstrap = importlib.import_module("ExchangeBootstrap") @@ -81,6 +92,46 @@ class ExchangeBootstrapWiringTest(unittest.TestCase): self.assertEqual("root", result) self.assertEqual([(app.ActiveDocument, "project-1")], calls) + def test_import_wiring_tasks_uses_active_document_and_payload(self): + app, _calls, wire_calls = _install_fake_modules() + sys.modules.pop("ExchangeBootstrap", None) + bootstrap = importlib.import_module("ExchangeBootstrap") + + payload = {"project_uuid": "project-1", "wires": [{"wire_id": "wire-1"}]} + result = bootstrap._import_wiring_tasks(payload) + + self.assertEqual({"imported_tasks": 1, "updated_tasks": 0, "warnings": []}, result) + self.assertEqual([(payload, app.ActiveDocument)], wire_calls) + + def test_load_exchange_payload_keeps_wire_entries(self): + _install_fake_modules() + sys.modules.pop("ExchangeBootstrap", None) + bootstrap = importlib.import_module("ExchangeBootstrap") + payload = { + "schema_version": "1.2", + "project_uuid": "project-1", + "devices": [], + "terminals": [], + "device_models": [], + "wires": [ + { + "wire_id": "wire-1", + "wire_mark": "W001", + "start_terminal_uuid": "terminal-a", + "end_terminal_uuid": "terminal-b", + } + ], + } + + with tempfile.TemporaryDirectory() as temp_dir: + path = Path(temp_dir) / "2d_to_3d.json" + path.write_text(json.dumps(payload), encoding="utf-8") + normalized = bootstrap.load_exchange_payload(str(path)) + + self.assertEqual(1, len(normalized["wires"])) + self.assertEqual("wire-1", normalized["wires"][0]["wire_id"]) + self.assertEqual("W001", normalized["wires"][0]["wire_mark"]) + if __name__ == "__main__": unittest.main() diff --git a/tests/python/freecad_exchange_manual_wiring_panel_test.py b/tests/python/freecad_exchange_manual_wiring_panel_test.py index 3d54ea2..585c500 100644 --- a/tests/python/freecad_exchange_manual_wiring_panel_test.py +++ b/tests/python/freecad_exchange_manual_wiring_panel_test.py @@ -247,6 +247,39 @@ class ManualWiringPanelTest(unittest.TestCase): ), ) + def test_controller_deletes_last_waypoint_and_preview_point(self): + selection_state = _install_fake_freecad() + terminal_objects, panel = _reload_modules() + app = sys.modules["FreeCAD"] + + doc = FakeDocument() + app.ActiveDocument = doc + terminal_objects.ensure_root_group(doc, "project-1") + + controller = panel.ManualWiringController(terminal_exit_length=10.0) + for point in [app.Vector(10, 20, 30), app.Vector(40, 50, 60)]: + selection_state["selection_ex"] = [ + types.SimpleNamespace( + PickedPoints=[point], + SubObjects=[], + SubElementNames=[], + Object=types.SimpleNamespace(Name="CabinetFace", Label="柜体面"), + ) + ] + controller.add_waypoint_from_selection() + + preview_group = doc.getObject("QETWiring_03_Previews") + self.assertEqual(2, len(controller.waypoints)) + self.assertEqual(2, len(controller.preview_objects)) + self.assertEqual(2, len(preview_group.Group)) + + removed = controller.delete_last_waypoint() + + self.assertIsNotNone(removed) + self.assertEqual(1, len(controller.waypoints)) + self.assertEqual(1, len(controller.preview_objects)) + self.assertEqual(1, len(preview_group.Group)) + def test_controller_generates_direct_wire_from_waypoint_and_end_selection(self): selection_state = _install_fake_freecad() terminal_objects, panel = _reload_modules() @@ -331,6 +364,89 @@ class ManualWiringPanelTest(unittest.TestCase): self.assertEqual((9.0, 8.0, 17.0), (wire.Points[3].x, wire.Points[3].y, wire.Points[3].z)) self.assertEqual((9.0, 8.0, 7.0), (wire.Points[4].x, wire.Points[4].y, wire.Points[4].z)) + def test_controller_generates_wire_from_selected_task(self): + selection_state = _install_fake_freecad() + terminal_objects, panel = _reload_modules() + wiring_objects = importlib.import_module("WiringObjects") + app = sys.modules["FreeCAD"] + + doc = FakeDocument() + app.ActiveDocument = doc + root = terminal_objects.ensure_root_group(doc, "project-1") + device = doc.addObject("App::DocumentObjectGroup", "QETDevice_device_a") + root.addObject(device) + terminal_objects.ensure_string_property(device, "QetElementUuid", "QET Exchange", "", "device-a") + terminal_objects.ensure_string_property(device, "QetInstanceId", "QET Exchange", "", "instance-a") + + start_terminal = doc.addObject("Part::LocalCoordinateSystem", "TerminalStart") + start_terminal.Placement = app.Placement( + app.Vector(1, 2, 3), + app.Rotation(w_axis=app.Vector(0, 1, 0)), + ) + device.addObject(start_terminal) + terminal_objects.set_terminal_semantics( + start_terminal, + "project-1", + "device-a", + "terminal-start", + "instance-a", + label="Start", + ) + + end_terminal = doc.addObject("Part::LocalCoordinateSystem", "TerminalEnd") + end_terminal.Placement = app.Placement( + app.Vector(9, 8, 7), + app.Rotation(w_axis=app.Vector(0, 0, 1)), + ) + device.addObject(end_terminal) + terminal_objects.set_terminal_semantics( + end_terminal, + "project-1", + "device-a", + "terminal-end", + "instance-a", + label="End", + ) + + task = wiring_objects.create_wire_task( + doc, + "project-1", + "wire-1", + "W001", + "terminal-start", + "terminal-end", + "instance-a", + "instance-a", + net_uuid="net-1", + group_uuid="group-1", + wire_mark="W001", + wire_mark_is_manual=True, + ) + + controller = panel.ManualWiringController(terminal_exit_length=10.0) + controller.set_task_from_object(task) + selection_state["selection_ex"] = [ + types.SimpleNamespace( + PickedPoints=[app.Vector(10, 20, 30)], + SubObjects=[], + SubElementNames=[], + Object=types.SimpleNamespace(Name="CabinetFace", Label="柜体面"), + ) + ] + controller.add_waypoint_from_selection() + selection_state["selection"] = [] + + wire = controller.set_end_from_selection_and_generate() + + self.assertEqual("wire-1", getattr(wire, "QetWireUuid", "")) + self.assertEqual("net-1", getattr(wire, "QetNetUuid", "")) + self.assertEqual("group-1", getattr(wire, "QetGroupUuid", "")) + self.assertEqual("W001", getattr(wire, "QetWireMark", "")) + self.assertTrue(getattr(wire, "QetWireMarkIsManual", False)) + self.assertEqual("terminal-start", getattr(wire, "QetStartTerminalUuid", "")) + self.assertEqual("terminal-end", getattr(wire, "QetEndTerminalUuid", "")) + self.assertEqual("Routed", getattr(task, "RouteStatus", "")) + if __name__ == "__main__": unittest.main() diff --git a/tests/python/freecad_exchange_wiring_import_test.py b/tests/python/freecad_exchange_wiring_import_test.py new file mode 100644 index 0000000..967af66 --- /dev/null +++ b/tests/python/freecad_exchange_wiring_import_test.py @@ -0,0 +1,159 @@ +import importlib +import sys +import types +import unittest +from pathlib import Path + + +REPO_ROOT = Path(__file__).resolve().parents[2] +MODULE_DIR = REPO_ROOT / "src" / "Mod" / "FreeCADExchange" +if str(MODULE_DIR) not in sys.path: + sys.path.insert(0, str(MODULE_DIR)) + + +def _install_fake_freecad(): + class Vector: + def __init__(self, x=0.0, y=0.0, z=0.0): + self.x = float(x) + self.y = float(y) + self.z = float(z) + + class Rotation: + pass + + class Placement: + def __init__(self, base=None, rotation=None): + self.Base = base or Vector() + self.Rotation = rotation or Rotation() + + fake_freecad = types.ModuleType("FreeCAD") + fake_freecad.Vector = Vector + fake_freecad.Rotation = Rotation + fake_freecad.Placement = Placement + fake_freecad.ActiveDocument = None + fake_freecad.Console = types.SimpleNamespace( + PrintMessage=lambda *args, **kwargs: None, + PrintWarning=lambda *args, **kwargs: None, + PrintError=lambda *args, **kwargs: None, + ) + sys.modules["FreeCAD"] = fake_freecad + + +class FakeViewObject: + def __init__(self): + self.Visibility = True + + +class FakeObject: + def __init__(self, name, type_id): + self.Name = name + self.Label = name + self.TypeId = type_id + self.PropertiesList = [] + self.Group = [] + self.ViewObject = FakeViewObject() + self.InList = [] + + def isDerivedFrom(self, type_name): + return self.TypeId == type_name + + def addProperty(self, prop_type, prop_name, group_name, description): + if prop_name not in self.PropertiesList: + self.PropertiesList.append(prop_name) + + def addObject(self, child): + if child not in self.Group: + self.Group.append(child) + if self not in child.InList: + child.InList.append(self) + + +class FakeDocument: + def __init__(self): + self.Objects = [] + self.Name = "FakeDoc" + + def addObject(self, type_name, name): + obj = FakeObject(name, type_name) + self.Objects.append(obj) + return obj + + def getObject(self, name): + for obj in self.Objects: + if obj.Name == name: + return obj + return None + + +def _reload_modules(): + for name in ["TerminalObjects", "WiringObjects", "WiringImport"]: + sys.modules.pop(name, None) + terminal_objects = importlib.import_module("TerminalObjects") + wiring_objects = importlib.import_module("WiringObjects") + wiring_import = importlib.import_module("WiringImport") + return terminal_objects, wiring_objects, wiring_import + + +class WiringImportTest(unittest.TestCase): + def test_import_wire_tasks_creates_and_updates_qet_tasks(self): + _install_fake_freecad() + terminal_objects, _wiring_objects, wiring_import = _reload_modules() + + doc = FakeDocument() + terminal_objects.ensure_root_group(doc, "project-1") + payload = { + "project_uuid": "project-1", + "wires": [ + { + "wire_id": "wire-1", + "net_uuid": "net-1", + "group_uuid": "group-1", + "wire_mark": "W001", + "wire_mark_is_manual": True, + "start_element_uuid": "device-a", + "start_terminal_uuid": "terminal-a", + "end_element_uuid": "device-b", + "end_terminal_uuid": "terminal-b", + "start_terminal_display": "A1", + "end_terminal_display": "B1", + "conductor_uuids": ["conductor-1"], + } + ], + } + + report = wiring_import.import_wire_tasks_from_payload(payload, doc) + + task_group = doc.getObject("QETWiring_01_Tasks") + self.assertIsNotNone(task_group) + self.assertEqual(1, report["imported_tasks"]) + self.assertEqual(0, report["updated_tasks"]) + self.assertEqual(1, len(task_group.Group)) + task = task_group.Group[0] + self.assertEqual("wire-1", task.QetWireUuid) + self.assertEqual("net-1", task.QetNetUuid) + self.assertEqual("group-1", task.QetGroupUuid) + self.assertEqual("W001", task.QetWireMark) + self.assertTrue(task.QetWireMarkIsManual) + self.assertEqual("terminal-a", task.QetStartTerminalUuid) + self.assertEqual("terminal-b", task.QetEndTerminalUuid) + self.assertEqual("device-a", task.QetStartElementUuid) + self.assertEqual("device-b", task.QetEndElementUuid) + self.assertEqual("A1", task.QetStartTerminalDisplay) + self.assertEqual("B1", task.QetEndTerminalDisplay) + self.assertIn("conductor-1", task.QetConductorUuidsJson) + self.assertEqual("Task", task.RouteType) + self.assertEqual("Task", task.RouteStatus) + + payload["wires"][0]["wire_mark"] = "W001-updated" + task.RouteStatus = "Routed" + second_report = wiring_import.import_wire_tasks_from_payload(payload, doc) + + self.assertEqual(0, second_report["imported_tasks"]) + self.assertEqual(1, second_report["updated_tasks"]) + self.assertEqual(1, len(task_group.Group)) + self.assertEqual("W001-updated", task_group.Group[0].QetWireMark) + self.assertEqual("Routed", task_group.Group[0].RouteStatus) + + +if __name__ == "__main__": + unittest.main()