feat: 支持导线任务导入与手动布线

dev
Zhaowenlong 1 day ago
parent 58c9eae33c
commit 44c2e448b5

@ -134,13 +134,13 @@ STEP / STP / STE 适合作为模板制作的输入,不建议作为长期带电
1. 正式方式:使用 FCStd 模板,在模板里提前放好 LCS 端子对象。
2. 过渡方式STEP + sidecar JSON在同目录下保存端子槽位坐标。
3. 验证方式:没有模板语义时,临时使用 bbox fallback 生成端子位置。
3. 如果没有模板语义,则跳过端子生成并输出警告,不再用 bbox fallback 猜端子位置。
sidecar 只作为 FreeCAD 端模板辅助文件,不进入第一版数据库绑定主键。
sidecar 里除了端子坐标,还可以继续补端子朝向,例如 `rotation`,让模板端子不只是“有位置”,还可以“有方向”。
FCStd 模板里的 LCS 如果已经带了 Placement 朝向,导入时也要一并保留,这样端子不只是有坐标,还能保留真实出线方向。
FCStd 模板里的 LCS 如果已经带了 Placement 朝向,导入时要把位置和朝向一并读取出来,这样生成的工程端子不只是有坐标,还能保留真实出线方向。
### 3.1 FCStd 设备模板制作约定
@ -172,6 +172,22 @@ FCStd 设备模板用于解决“这个模型本身就带端子语义”的问
这些工程级字段由导入项目时的 `2d_to_3d.json` 和 FreeCADExchange 运行时补齐。这样同一个 FCStd 设备模板才能在多个工程、多台机器、多人之间复用。
### 3.1.1 FCStd 导入运行时约定
FCStd 模板中的端子 LCS 是“模板槽位定义”,不是最终留在工程场景里的端子对象。
导入到 `QETScene`FreeCADExchange 的处理顺序是:
1. 复制 FCStd 中的几何对象到设备组。
2. 读取模板 LCS 的 `QetTemplateSlotName / Label / Placement`
3. 将槽位位置和朝向序列化到设备组属性 `QetTemplateSlotsJson`
4. 删除复制出来的模板 LCS 及其 `OriginFeatures`,避免这些模板对象留在 `App::Part` 中参与变换或选择。
5. 后续由 `TerminalImport.py``terminal_uuid``QETTerminals_*` 下创建工程端子 LCS。
这样做不会改变第一版数据库约束:`QetTemplateSlotsJson` 只是 FreeCAD 文档内部的运行时缓存,不进入 `project_2d3d_symbol_binding``project_2d3d_terminal_binding`
如果同一个设备从 FCStd 切回 STEP / IGES / BREP 等非 FCStd 模型,导入时会清空旧的 `QetTemplateSlotsJson`,避免继续沿用上一次 FCStd 的端子槽位。
### 3.2 模板制作工具目标
后续在 `FreeCADExchange` 中新增模板制作能力,目标是让用户不需要手工给 LCS 添加属性:
@ -416,11 +432,11 @@ ManualWiring.py
端子位置策略:
1. 如果 FCStd 模板已有 `Role="Terminal"` 的 LCS则优先复用模板 LCS。
1. 如果 FCStd 模板已有 `Role="Terminal"` 的 LCS则优先读取模板 LCS 的槽位信息,再生成工程端子 LCS。
2. 如果有 sidecar JSON则按 sidecar 坐标创建。
3. 如果只有 STEP,则先按设备包围盒生成临时端子排列,用于打通流程
3. 如果只有 STEP 且没有 sidecar则不生成端子并在导入报告中提示缺少模板槽位
临时端子排列只用于第一版验证,不作为长期物理端子定义
第一版不再按设备包围盒猜测临时端子,避免把错误端子位置保存进工程
验收:
@ -598,4 +614,6 @@ ManualWiring.py
- 2026-05-20修复 `TemplateAuthoring.py``FreeCADCmd.exe` 命令行模式下导入时误注册 GUI 命令的问题;已在运行目录验证创建 `P1` 模板端子、保存 `.FCStd`、重新打开后端子语义仍可识别。
- 2026-05-20确定方案 2 开发目标:新增“设备模板端子制作”任务面板,让 CAD 工作人员通过输入端子名、选择模型位置、点击按钮完成添加端子、校验端子和保存 FCStd不再依赖 Python 控制台。
- 2026-05-20新增 `TemplateAuthoringPanel.py`,提供“设备模板端子制作”任务面板和 `QET_Template_OpenAuthoringPanel` 命令;面板支持输入端子名、添加端子、校验端子、保存 FCStd并已同步到运行目录验证模块可导入。
- 2026-05-25新增 `WiringImport.py`,把 `2d_to_3d.json` 中的 `wires` 导入为 `QETWiring_01_Tasks` 下的导线任务;`ExchangeBootstrap.py` 已接入启动导入流程。`ManualWiringPanel.py` 增加任务列表、选择导线任务和删除最后折点,按任务生成导线时会把 `wire_id / net_uuid / group_uuid / wire_mark` 写入正式导线对象,并把任务状态更新为 `Routed`。已通过 35 项 `freecad_exchange*_test.py` 单元测试,并安装到 `D:\fc\run-FreeCAD-1.1.1` 运行目录验证 `WiringImport / ManualWiring / ManualWiringPanel / WiringObjects` 可导入。
- 2026-05-25修复 FCStd 设备导入后模板 LCS 留在工程场景的问题;导入时会把模板槽位位置和朝向缓存到设备组 `QetTemplateSlotsJson`,随后删除模板 LCS 及其 `OriginFeatures`,工程端子仍按 `terminal_uuid` 生成到 `QETTerminals_*`。已补单元测试验证 FCStd 导入不保留模板 LCS、切回 STEP 会清空旧槽位缓存,并避免重复访问已删除对象的 `Group / InList / Name`
```

@ -12,6 +12,7 @@ set(FreeCADExchange_Scripts
TemplateInstantiation.py
TerminalImport.py
WiringObjects.py
WiringImport.py
ExchangeWriteBack.py
ManualWiring.py
ManualWiringPanel.py

@ -19,6 +19,10 @@ try:
import WiringObjects
except Exception:
WiringObjects = None
try:
import WiringImport
except Exception:
WiringImport = None
try:
from PySide6 import QtCore, QtWidgets
@ -37,6 +41,7 @@ STATE_PAYLOAD = "_qet_exchange_payload"
STATE_SUMMARY = "_qet_exchange_summary"
STATE_IMPORT_REPORT = "_qet_exchange_import_report"
STATE_TERMINAL_IMPORT_REPORT = "_qet_exchange_terminal_import_report"
STATE_WIRING_IMPORT_REPORT = "_qet_exchange_wiring_import_report"
STATE_WRITEBACK_REPORT = "_qet_exchange_writeback_report"
STATE_IMPORT_SCHEDULED = "_qet_exchange_import_scheduled"
STATE_TREE_FILTER = "_qet_exchange_tree_filter"
@ -371,6 +376,81 @@ def _normalize_terminals(payload):
return normalized
def _optional_string(item, field_name, entry_label):
value = item.get(field_name, "")
if value is None:
return ""
if value and not isinstance(value, str):
raise ExchangeValidationError(
"Field '{0}' in {1} must be a string.".format(field_name, entry_label)
)
return value.strip() if isinstance(value, str) else ""
def _normalize_conductor_uuids(item, entry_label):
values = item.get("conductor_uuids", [])
if values is None:
return []
if not isinstance(values, list):
raise ExchangeValidationError(
"Field 'conductor_uuids' in {0} must be a list.".format(entry_label)
)
normalized = []
for conductor_index, value in enumerate(values):
if not isinstance(value, str):
raise ExchangeValidationError(
"Field 'conductor_uuids[{0}]' in {1} must be a string.".format(
conductor_index,
entry_label,
)
)
value = value.strip()
if value:
normalized.append(value)
return normalized
def _normalize_wires(payload):
wires = payload.get("wires", [])
if wires is None:
return []
if not isinstance(wires, list):
raise ExchangeValidationError("Field 'wires' must be a list.")
normalized = []
for index, item in enumerate(wires):
entry_label = "wire entry #{0}".format(index)
if not isinstance(item, dict):
raise ExchangeValidationError(
"Wire entry #{0} must be an object.".format(index)
)
wire_id = _require_string(item, "wire_id")
wire_mark_is_manual = item.get("wire_mark_is_manual", False)
if not isinstance(wire_mark_is_manual, bool):
raise ExchangeValidationError(
"Field 'wire_mark_is_manual' in {0} must be a bool.".format(
entry_label
)
)
normalized.append(
{
"wire_id": wire_id,
"net_uuid": _optional_string(item, "net_uuid", entry_label),
"group_uuid": _optional_string(item, "group_uuid", entry_label),
"wire_mark": _optional_string(item, "wire_mark", entry_label),
"wire_mark_is_manual": wire_mark_is_manual,
"start_element_uuid": _optional_string(item, "start_element_uuid", entry_label),
"start_terminal_uuid": _optional_string(item, "start_terminal_uuid", entry_label),
"end_element_uuid": _optional_string(item, "end_element_uuid", entry_label),
"end_terminal_uuid": _optional_string(item, "end_terminal_uuid", entry_label),
"start_terminal_display": _optional_string(item, "start_terminal_display", entry_label),
"end_terminal_display": _optional_string(item, "end_terminal_display", entry_label),
"conductor_uuids": _normalize_conductor_uuids(item, entry_label),
}
)
return normalized
def _normalize_device_models(payload):
models = payload.get("device_models", [])
if not isinstance(models, list):
@ -490,6 +570,7 @@ def load_exchange_payload(json_path):
"devices": _normalize_devices(payload),
"terminals": _normalize_terminals(payload),
"device_models": _normalize_device_models(payload),
"wires": _normalize_wires(payload),
}
return normalized
@ -498,6 +579,7 @@ def _build_summary(payload, json_path):
devices = payload["devices"]
terminals = payload["terminals"]
device_models = payload["device_models"]
wires = payload.get("wires", [])
cabinet = payload.get("cabinet")
missing_device_instances = sum(1 for item in devices if not item["instance_id"])
missing_terminal_instances = sum(
@ -512,6 +594,7 @@ def _build_summary(payload, json_path):
"project_uuid": payload["project_uuid"],
"device_count": len(devices),
"terminal_count": len(terminals),
"wire_count": len(wires),
"device_model_count": len(device_models),
"device_models_with_parts": with_model_paths,
"missing_device_instances": missing_device_instances,
@ -537,7 +620,23 @@ def _initialize_wiring_scene(payload):
return None
def _summary_message(summary, import_report=None, terminal_report=None, writeback_report=None):
def _import_wiring_tasks(payload):
if WiringImport is None:
_append_debug_log("wire task import skipped: WiringImport module unavailable")
return None
try:
return WiringImport.import_wire_tasks_from_payload(
payload,
App.ActiveDocument,
)
except Exception as exc:
_append_debug_log("wire task import failed: {0}".format(exc))
_append_debug_log(traceback.format_exc())
return None
def _summary_message(summary, import_report=None, terminal_report=None, writeback_report=None, wiring_report=None):
lines = [
"QET exchange file loaded successfully.",
"",
@ -545,6 +644,7 @@ def _summary_message(summary, import_report=None, terminal_report=None, writebac
"Exchange file: {0}".format(summary["json_path"]),
"Devices: {0}".format(summary["device_count"]),
"Terminals: {0}".format(summary["terminal_count"]),
"Wires: {0}".format(summary["wire_count"]),
"Device models: {0}".format(summary["device_model_count"]),
"Resolved model paths: {0}".format(summary["device_models_with_parts"]),
]
@ -662,6 +762,22 @@ def _summary_message(summary, import_report=None, terminal_report=None, writebac
]
)
if wiring_report:
lines.extend(
[
"",
"3D wire tasks:",
"Imported tasks: {0}".format(wiring_report.get("imported_tasks", 0)),
"Updated tasks: {0}".format(wiring_report.get("updated_tasks", 0)),
]
)
warnings = wiring_report.get("warnings", [])
if warnings:
lines.append("Wire task warnings:")
lines.extend("- {0}".format(item) for item in warnings[:10])
if len(warnings) > 10:
lines.append("- ... ({0} more)".format(len(warnings) - 10))
lines.append("")
lines.append("This step validates the exchange payload and imports devices with valid resolved model paths.")
lines.append("3D terminal import and write-back are enabled.")
@ -780,6 +896,9 @@ def _run_scheduled_device_import(attempt=0):
setattr(App, STATE_TERMINAL_IMPORT_REPORT, terminal_report)
_initialize_wiring_scene(payload)
wiring_report = _import_wiring_tasks(payload)
if wiring_report is not None:
setattr(App, STATE_WIRING_IMPORT_REPORT, wiring_report)
if ExchangeWriteBack is None:
_append_debug_log("write-back skipped: ExchangeWriteBack module unavailable")
@ -806,7 +925,7 @@ def _run_scheduled_device_import(attempt=0):
_show_info(
"QET Exchange",
_summary_message(summary, import_report, terminal_report, writeback_report),
_summary_message(summary, import_report, terminal_report, writeback_report, wiring_report),
)
_append_debug_log("summary dialog shown")

@ -119,6 +119,55 @@ def _selected_terminal():
return None
def _is_wire_task_object(obj):
if obj is None:
return False
return (
"QetWireUuid" in getattr(obj, "PropertiesList", [])
and (getattr(obj, "RouteType", "") or "").strip() == "Task"
)
def _task_label(obj):
if obj is None:
return "未选择"
label = (getattr(obj, "Label", "") or "").strip()
if label:
return label
return (
getattr(obj, "QetWireMark", "")
or getattr(obj, "QetWireLabel", "")
or getattr(obj, "QetWireUuid", "")
or getattr(obj, "Name", "")
or "导线任务"
)
def _iter_terminal_objects(doc):
root = None
try:
root = doc.getObject(TerminalObjects.ROOT_GROUP_NAME)
except Exception:
root = None
if root is not None:
return TerminalObjects.collect_terminal_objects(root)
return [
obj
for obj in list(getattr(doc, "Objects", []) or [])
if TerminalObjects.is_terminal_object(obj)
]
def _find_terminal_by_uuid(doc, terminal_uuid):
target = (terminal_uuid or "").strip()
if not target:
return None
for terminal in _iter_terminal_objects(doc):
if getattr(terminal, "QetTerminalUuid", "").strip() == target:
return terminal
return None
def _terminal_label(obj):
return (
getattr(obj, "Label", "")
@ -267,9 +316,66 @@ def _create_preview_point(doc, waypoint, index):
return preview
def _remove_preview_object(doc, preview):
if preview is None or doc is None:
return
group = None
try:
group = doc.getObject("QETWiring_03_Previews")
except Exception:
group = None
try:
if group is not None and preview in getattr(group, "Group", []):
if hasattr(group, "removeObject"):
group.removeObject(preview)
else:
group.Group = [obj for obj in group.Group if obj is not preview]
except Exception:
pass
try:
if doc.getObject(getattr(preview, "Name", "")) is not None:
doc.removeObject(preview.Name)
except Exception:
pass
def _set_task_route_status(task, status):
if task is None:
return
try:
TerminalObjects.ensure_string_property(
task,
"RouteStatus",
"QET Wiring",
"Wire task route status",
status,
)
except Exception:
try:
task.RouteStatus = status
except Exception:
pass
def _task_wire_kwargs(task):
if task is None:
return {}
return {
"wire_uuid": getattr(task, "QetWireUuid", "").strip(),
"wire_label": getattr(task, "QetWireLabel", "").strip(),
"net_uuid": getattr(task, "QetNetUuid", "").strip(),
"group_uuid": getattr(task, "QetGroupUuid", "").strip(),
"wire_mark": getattr(task, "QetWireMark", "").strip(),
"wire_mark_is_manual": bool(getattr(task, "QetWireMarkIsManual", False)),
}
class ManualWiringController:
def __init__(self, terminal_exit_length=DEFAULT_TERMINAL_EXIT_LENGTH):
self.terminal_exit_length = float(terminal_exit_length or 0.0)
self.current_task = None
self.start_terminal = None
self.waypoints = []
self.preview_objects = []
@ -281,34 +387,52 @@ class ManualWiringController:
self.preview_objects = []
return
group = None
try:
group = doc.getObject("QETWiring_03_Previews")
except Exception:
group = None
for preview in list(self.preview_objects):
try:
if group is not None and preview in getattr(group, "Group", []):
if hasattr(group, "removeObject"):
group.removeObject(preview)
else:
group.Group = [obj for obj in group.Group if obj is not preview]
except Exception:
pass
try:
if doc.getObject(getattr(preview, "Name", "")) is not None:
doc.removeObject(preview.Name)
except Exception:
pass
_remove_preview_object(doc, preview)
self.preview_objects = []
def _reset_route_state(self):
self.current_task = None
self.start_terminal = None
self.waypoints = []
self.last_wire = None
self._clear_preview_objects()
def available_tasks(self):
doc = _active_document()
try:
group = doc.getObject("QETWiring_01_Tasks")
except Exception:
group = None
if group is None:
return []
return [
obj
for obj in list(getattr(group, "Group", []) or [])
if _is_wire_task_object(obj)
]
def set_task_from_object(self, task):
if not _is_wire_task_object(task):
raise ManualWiringPanelError("请选择一个 QET 导线任务。")
doc = _active_document()
start_terminal = _find_terminal_by_uuid(
doc,
getattr(task, "QetStartTerminalUuid", ""),
)
end_terminal = _find_terminal_by_uuid(
doc,
getattr(task, "QetEndTerminalUuid", ""),
)
if start_terminal is None or end_terminal is None:
raise ManualWiringPanelError("导线任务的起点或终点工程端子未找到。")
self._reset_route_state()
self.current_task = task
self.start_terminal = start_terminal
return task
def set_start_from_selection(self):
terminal = _selected_terminal()
if terminal is None:
@ -327,21 +451,42 @@ class ManualWiringController:
self.preview_objects.append(preview)
return waypoint
def delete_last_waypoint(self):
if not self.waypoints:
return None
waypoint = self.waypoints.pop()
preview = self.preview_objects.pop() if self.preview_objects else None
_remove_preview_object(getattr(App, "ActiveDocument", None), preview)
return waypoint
def set_end_from_selection_and_generate(self):
doc = _active_document()
if self.start_terminal is None:
raise ManualWiringPanelError("请先设置起点端子。")
end_terminal = _selected_terminal()
wire_kwargs = {}
if self.current_task is not None:
end_terminal = _find_terminal_by_uuid(
doc,
getattr(self.current_task, "QetEndTerminalUuid", ""),
)
wire_kwargs = _task_wire_kwargs(self.current_task)
else:
end_terminal = _selected_terminal()
if end_terminal is None:
raise ManualWiringPanelError("请先选择一个工程端子,再点击“设为终点并生成”。")
wire = ManualWiring.create_manual_wire(
doc,
self.start_terminal,
end_terminal,
waypoints=list(self.waypoints),
terminal_exit_length=self.terminal_exit_length,
**wire_kwargs,
)
self.last_wire = wire
if self.current_task is not None:
_set_task_route_status(self.current_task, "Routed")
return wire
def clear(self):
@ -364,6 +509,9 @@ class ManualWiringController:
start_text = "未设置"
if self.start_terminal is not None:
start_text = _terminal_label(self.start_terminal)
task_text = "未选择"
if self.current_task is not None:
task_text = _task_label(self.current_task)
wire_text = "未生成"
if self.last_wire is not None:
wire_text = getattr(self.last_wire, "Label", "") or getattr(self.last_wire, "Name", "")
@ -375,7 +523,8 @@ class ManualWiringController:
)
if len(self.waypoints) > 3:
waypoint_text += "..."
return "起点:{0};折点:{1} 个;最近导线:{2};折点明细:{3}".format(
return "任务:{0};起点:{1};折点:{2} 个;最近导线:{3};折点明细:{4}".format(
task_text,
start_text,
len(self.waypoints),
wire_text,
@ -394,14 +543,22 @@ class ManualWiringTaskPanel:
layout = QtWidgets.QVBoxLayout(self.form)
self.task_list = QtWidgets.QListWidget()
self.use_task_button = QtWidgets.QPushButton("选择导线任务")
self.reload_tasks_button = QtWidgets.QPushButton("刷新任务")
self.start_button = QtWidgets.QPushButton("设为起点")
self.waypoint_button = QtWidgets.QPushButton("添加折点")
self.delete_waypoint_button = QtWidgets.QPushButton("删除最后折点")
self.end_button = QtWidgets.QPushButton("设为终点并生成")
self.clear_button = QtWidgets.QPushButton("清除草稿")
self.save_button = QtWidgets.QPushButton("保存并回写")
layout.addWidget(self.task_list)
layout.addWidget(self.use_task_button)
layout.addWidget(self.reload_tasks_button)
layout.addWidget(self.start_button)
layout.addWidget(self.waypoint_button)
layout.addWidget(self.delete_waypoint_button)
layout.addWidget(self.end_button)
layout.addWidget(self.clear_button)
layout.addWidget(self.save_button)
@ -413,12 +570,17 @@ class ManualWiringTaskPanel:
self.status_label.setWordWrap(True)
layout.addWidget(self.status_label)
self.task_objects = []
self.use_task_button.clicked.connect(self.use_selected_task)
self.reload_tasks_button.clicked.connect(self._refresh_task_list)
self.start_button.clicked.connect(self.set_start)
self.waypoint_button.clicked.connect(self.add_waypoint)
self.delete_waypoint_button.clicked.connect(self.delete_last_waypoint)
self.end_button.clicked.connect(self.set_end_and_generate)
self.clear_button.clicked.connect(self.clear)
self.save_button.clicked.connect(self.save_and_write_back)
self._refresh_task_list()
self._refresh_waypoint_list()
self._set_status(self.controller.state_text())
@ -446,6 +608,34 @@ class ManualWiringTaskPanel:
)
)
def _refresh_task_list(self):
self.task_list.clear()
self.task_objects = []
try:
self.task_objects = self.controller.available_tasks()
except Exception as exc:
self._set_error(str(exc))
return
for task in self.task_objects:
self.task_list.addItem(_task_label(task))
def _selected_task_from_list(self):
row = self.task_list.currentRow()
if row < 0 or row >= len(self.task_objects):
return None
return self.task_objects[row]
def use_selected_task(self):
try:
task = self._selected_task_from_list()
if task is None:
raise ManualWiringPanelError("请先在任务列表中选择一条导线任务。")
self.controller.set_task_from_object(task)
self._refresh_waypoint_list()
self._set_status("已选择导线任务:{0}".format(_task_label(task)))
except Exception as exc:
self._set_error(str(exc))
def set_start(self):
try:
terminal = self.controller.set_start_from_selection()
@ -462,9 +652,21 @@ class ManualWiringTaskPanel:
except Exception as exc:
self._set_error(str(exc))
def delete_last_waypoint(self):
try:
removed = self.controller.delete_last_waypoint()
self._refresh_waypoint_list()
if removed is None:
self._set_status("当前没有可删除的折点。")
else:
self._set_status(self.controller.state_text())
except Exception as exc:
self._set_error(str(exc))
def set_end_and_generate(self):
try:
wire = self.controller.set_end_from_selection_and_generate()
self._refresh_task_list()
self._set_status("已生成导线:{0}".format(getattr(wire, "Name", "")))
try:
if Gui is not None:

@ -0,0 +1,208 @@
# FreeCADExchange wire task import helpers.
import json
import FreeCAD as App
import TerminalObjects
import WiringObjects
class WiringImportError(RuntimeError):
pass
def _string_value(item, field_name):
value = item.get(field_name, "")
if value is None:
return ""
if not isinstance(value, str):
return str(value).strip()
return value.strip()
def _bool_value(item, field_name):
return bool(item.get(field_name, False))
def _conductor_uuids(item):
values = item.get("conductor_uuids", [])
if not isinstance(values, list):
return []
return [str(value).strip() for value in values if str(value).strip()]
def _normalize_wire_entry(item, index):
if not isinstance(item, dict):
raise WiringImportError("Wire entry #{0} must be an object.".format(index))
wire_uuid = (
_string_value(item, "wire_id")
or _string_value(item, "wire_uuid")
or _string_value(item, "id")
)
start_terminal_uuid = _string_value(item, "start_terminal_uuid")
end_terminal_uuid = _string_value(item, "end_terminal_uuid")
if not wire_uuid:
raise WiringImportError("Wire entry #{0} is missing wire_id.".format(index))
if not start_terminal_uuid or not end_terminal_uuid:
raise WiringImportError(
"Wire {0} is missing start_terminal_uuid or end_terminal_uuid.".format(
wire_uuid
)
)
wire_mark = _string_value(item, "wire_mark")
wire_label = _string_value(item, "wire_label") or wire_mark or wire_uuid
return {
"wire_uuid": wire_uuid,
"wire_label": wire_label,
"net_uuid": _string_value(item, "net_uuid"),
"group_uuid": _string_value(item, "group_uuid"),
"wire_mark": wire_mark,
"wire_mark_is_manual": _bool_value(item, "wire_mark_is_manual"),
"start_element_uuid": _string_value(item, "start_element_uuid"),
"start_terminal_uuid": start_terminal_uuid,
"start_instance_id": _string_value(item, "start_instance_id"),
"start_terminal_display": _string_value(item, "start_terminal_display"),
"end_element_uuid": _string_value(item, "end_element_uuid"),
"end_terminal_uuid": end_terminal_uuid,
"end_instance_id": _string_value(item, "end_instance_id"),
"end_terminal_display": _string_value(item, "end_terminal_display"),
"conductor_uuids": _conductor_uuids(item),
}
def _unique_object_name(doc, base_name):
name = TerminalObjects.safe_token(base_name)
if doc.getObject(name) is None:
return name
suffix = 1
while doc.getObject("{0}_{1}".format(name, suffix)) is not None:
suffix += 1
return "{0}_{1}".format(name, suffix)
def _task_label(entry):
mark = entry["wire_mark"] or entry["wire_label"] or entry["wire_uuid"]
start = entry["start_terminal_display"] or entry["start_terminal_uuid"]
end = entry["end_terminal_display"] or entry["end_terminal_uuid"]
return "{0} {1} -> {2}".format(mark, start, end)
def _find_task_by_wire_uuid(task_group, wire_uuid):
target = (wire_uuid or "").strip()
if not target:
return None
for candidate in list(getattr(task_group, "Group", []) or []):
if getattr(candidate, "QetWireUuid", "").strip() == target:
return candidate
return None
def _ensure_string_property(obj, prop_name, value, description="QET wire task property"):
TerminalObjects.ensure_string_property(
obj,
prop_name,
"QET Wiring",
description,
value,
)
def _set_task_extra_properties(task, entry):
_ensure_string_property(task, "QetStartElementUuid", entry["start_element_uuid"])
_ensure_string_property(task, "QetEndElementUuid", entry["end_element_uuid"])
_ensure_string_property(task, "QetStartTerminalDisplay", entry["start_terminal_display"])
_ensure_string_property(task, "QetEndTerminalDisplay", entry["end_terminal_display"])
_ensure_string_property(
task,
"QetConductorUuidsJson",
json.dumps(entry["conductor_uuids"], ensure_ascii=False),
)
def _upsert_wire_task(doc, task_group, project_uuid, entry):
task = _find_task_by_wire_uuid(task_group, entry["wire_uuid"])
created = task is None
previous_status = ""
if task is None:
task = doc.addObject(
"App::DocumentObjectGroup",
_unique_object_name(doc, "QETWireTask_{0}".format(entry["wire_uuid"])),
)
task_group.addObject(task)
else:
previous_status = (getattr(task, "RouteStatus", "") or "").strip()
WiringObjects.set_wire_task_semantics(
task,
project_uuid,
entry["wire_uuid"],
entry["wire_label"],
entry["start_terminal_uuid"],
entry["end_terminal_uuid"],
entry["start_instance_id"],
entry["end_instance_id"],
net_uuid=entry["net_uuid"],
group_uuid=entry["group_uuid"],
wire_mark=entry["wire_mark"],
wire_mark_is_manual=entry["wire_mark_is_manual"],
)
_set_task_extra_properties(task, entry)
if previous_status and previous_status != "Task":
TerminalObjects.ensure_string_property(
task,
"RouteStatus",
"QET Wiring",
"Wire task route status",
previous_status,
)
task.Label = _task_label(entry)
return task, created
def import_wire_tasks_from_payload(payload, doc=None):
if doc is None:
doc = getattr(App, "ActiveDocument", None)
if doc is None:
raise WiringImportError("No active FreeCAD document is available.")
if not isinstance(payload, dict):
raise WiringImportError("Exchange payload must be an object.")
project_uuid = _string_value(payload, "project_uuid")
if not project_uuid:
raise WiringImportError("Field 'project_uuid' is required for wire task import.")
wires = payload.get("wires", [])
if wires is None:
wires = []
if not isinstance(wires, list):
raise WiringImportError("Field 'wires' must be a list.")
task_group = WiringObjects.ensure_task_group(doc, project_uuid)
report = {
"project_uuid": project_uuid,
"total_wires": len(wires),
"imported_tasks": 0,
"updated_tasks": 0,
"skipped_invalid": 0,
"warnings": [],
}
for index, item in enumerate(wires):
try:
entry = _normalize_wire_entry(item, index)
except WiringImportError as exc:
report["skipped_invalid"] += 1
report["warnings"].append(str(exc))
continue
_task, created = _upsert_wire_task(doc, task_group, project_uuid, entry)
if created:
report["imported_tasks"] += 1
else:
report["updated_tasks"] += 1
return report

@ -1,5 +1,7 @@
import importlib
import json
import sys
import tempfile
import types
import unittest
from pathlib import Path
@ -46,6 +48,15 @@ def _install_fake_modules():
fake_wiring.initialize_wiring_scene = lambda doc, project_uuid="": calls.append((doc, project_uuid)) or "root"
sys.modules["WiringObjects"] = fake_wiring
wire_calls = []
fake_wiring_import = types.ModuleType("WiringImport")
fake_wiring_import.WiringImportError = RuntimeError
fake_wiring_import.import_wire_tasks_from_payload = (
lambda payload, doc=None: wire_calls.append((payload, doc))
or {"imported_tasks": 1, "updated_tasks": 0, "warnings": []}
)
sys.modules["WiringImport"] = fake_wiring_import
class _QObject:
def __init__(self, *args, **kwargs):
pass
@ -67,12 +78,12 @@ def _install_fake_modules():
fake_pyside.QtWidgets = fake_qt_widgets
sys.modules["PySide6"] = fake_pyside
return fake_freecad, calls
return fake_freecad, calls, wire_calls
class ExchangeBootstrapWiringTest(unittest.TestCase):
def test_initialize_wiring_scene_uses_active_document_and_project_uuid(self):
app, calls = _install_fake_modules()
app, calls, _wire_calls = _install_fake_modules()
sys.modules.pop("ExchangeBootstrap", None)
bootstrap = importlib.import_module("ExchangeBootstrap")
@ -81,6 +92,46 @@ class ExchangeBootstrapWiringTest(unittest.TestCase):
self.assertEqual("root", result)
self.assertEqual([(app.ActiveDocument, "project-1")], calls)
def test_import_wiring_tasks_uses_active_document_and_payload(self):
app, _calls, wire_calls = _install_fake_modules()
sys.modules.pop("ExchangeBootstrap", None)
bootstrap = importlib.import_module("ExchangeBootstrap")
payload = {"project_uuid": "project-1", "wires": [{"wire_id": "wire-1"}]}
result = bootstrap._import_wiring_tasks(payload)
self.assertEqual({"imported_tasks": 1, "updated_tasks": 0, "warnings": []}, result)
self.assertEqual([(payload, app.ActiveDocument)], wire_calls)
def test_load_exchange_payload_keeps_wire_entries(self):
_install_fake_modules()
sys.modules.pop("ExchangeBootstrap", None)
bootstrap = importlib.import_module("ExchangeBootstrap")
payload = {
"schema_version": "1.2",
"project_uuid": "project-1",
"devices": [],
"terminals": [],
"device_models": [],
"wires": [
{
"wire_id": "wire-1",
"wire_mark": "W001",
"start_terminal_uuid": "terminal-a",
"end_terminal_uuid": "terminal-b",
}
],
}
with tempfile.TemporaryDirectory() as temp_dir:
path = Path(temp_dir) / "2d_to_3d.json"
path.write_text(json.dumps(payload), encoding="utf-8")
normalized = bootstrap.load_exchange_payload(str(path))
self.assertEqual(1, len(normalized["wires"]))
self.assertEqual("wire-1", normalized["wires"][0]["wire_id"])
self.assertEqual("W001", normalized["wires"][0]["wire_mark"])
if __name__ == "__main__":
unittest.main()

@ -247,6 +247,39 @@ class ManualWiringPanelTest(unittest.TestCase):
),
)
def test_controller_deletes_last_waypoint_and_preview_point(self):
selection_state = _install_fake_freecad()
terminal_objects, panel = _reload_modules()
app = sys.modules["FreeCAD"]
doc = FakeDocument()
app.ActiveDocument = doc
terminal_objects.ensure_root_group(doc, "project-1")
controller = panel.ManualWiringController(terminal_exit_length=10.0)
for point in [app.Vector(10, 20, 30), app.Vector(40, 50, 60)]:
selection_state["selection_ex"] = [
types.SimpleNamespace(
PickedPoints=[point],
SubObjects=[],
SubElementNames=[],
Object=types.SimpleNamespace(Name="CabinetFace", Label="柜体面"),
)
]
controller.add_waypoint_from_selection()
preview_group = doc.getObject("QETWiring_03_Previews")
self.assertEqual(2, len(controller.waypoints))
self.assertEqual(2, len(controller.preview_objects))
self.assertEqual(2, len(preview_group.Group))
removed = controller.delete_last_waypoint()
self.assertIsNotNone(removed)
self.assertEqual(1, len(controller.waypoints))
self.assertEqual(1, len(controller.preview_objects))
self.assertEqual(1, len(preview_group.Group))
def test_controller_generates_direct_wire_from_waypoint_and_end_selection(self):
selection_state = _install_fake_freecad()
terminal_objects, panel = _reload_modules()
@ -331,6 +364,89 @@ class ManualWiringPanelTest(unittest.TestCase):
self.assertEqual((9.0, 8.0, 17.0), (wire.Points[3].x, wire.Points[3].y, wire.Points[3].z))
self.assertEqual((9.0, 8.0, 7.0), (wire.Points[4].x, wire.Points[4].y, wire.Points[4].z))
def test_controller_generates_wire_from_selected_task(self):
selection_state = _install_fake_freecad()
terminal_objects, panel = _reload_modules()
wiring_objects = importlib.import_module("WiringObjects")
app = sys.modules["FreeCAD"]
doc = FakeDocument()
app.ActiveDocument = doc
root = terminal_objects.ensure_root_group(doc, "project-1")
device = doc.addObject("App::DocumentObjectGroup", "QETDevice_device_a")
root.addObject(device)
terminal_objects.ensure_string_property(device, "QetElementUuid", "QET Exchange", "", "device-a")
terminal_objects.ensure_string_property(device, "QetInstanceId", "QET Exchange", "", "instance-a")
start_terminal = doc.addObject("Part::LocalCoordinateSystem", "TerminalStart")
start_terminal.Placement = app.Placement(
app.Vector(1, 2, 3),
app.Rotation(w_axis=app.Vector(0, 1, 0)),
)
device.addObject(start_terminal)
terminal_objects.set_terminal_semantics(
start_terminal,
"project-1",
"device-a",
"terminal-start",
"instance-a",
label="Start",
)
end_terminal = doc.addObject("Part::LocalCoordinateSystem", "TerminalEnd")
end_terminal.Placement = app.Placement(
app.Vector(9, 8, 7),
app.Rotation(w_axis=app.Vector(0, 0, 1)),
)
device.addObject(end_terminal)
terminal_objects.set_terminal_semantics(
end_terminal,
"project-1",
"device-a",
"terminal-end",
"instance-a",
label="End",
)
task = wiring_objects.create_wire_task(
doc,
"project-1",
"wire-1",
"W001",
"terminal-start",
"terminal-end",
"instance-a",
"instance-a",
net_uuid="net-1",
group_uuid="group-1",
wire_mark="W001",
wire_mark_is_manual=True,
)
controller = panel.ManualWiringController(terminal_exit_length=10.0)
controller.set_task_from_object(task)
selection_state["selection_ex"] = [
types.SimpleNamespace(
PickedPoints=[app.Vector(10, 20, 30)],
SubObjects=[],
SubElementNames=[],
Object=types.SimpleNamespace(Name="CabinetFace", Label="柜体面"),
)
]
controller.add_waypoint_from_selection()
selection_state["selection"] = []
wire = controller.set_end_from_selection_and_generate()
self.assertEqual("wire-1", getattr(wire, "QetWireUuid", ""))
self.assertEqual("net-1", getattr(wire, "QetNetUuid", ""))
self.assertEqual("group-1", getattr(wire, "QetGroupUuid", ""))
self.assertEqual("W001", getattr(wire, "QetWireMark", ""))
self.assertTrue(getattr(wire, "QetWireMarkIsManual", False))
self.assertEqual("terminal-start", getattr(wire, "QetStartTerminalUuid", ""))
self.assertEqual("terminal-end", getattr(wire, "QetEndTerminalUuid", ""))
self.assertEqual("Routed", getattr(task, "RouteStatus", ""))
if __name__ == "__main__":
unittest.main()

@ -0,0 +1,159 @@
import importlib
import sys
import types
import unittest
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parents[2]
MODULE_DIR = REPO_ROOT / "src" / "Mod" / "FreeCADExchange"
if str(MODULE_DIR) not in sys.path:
sys.path.insert(0, str(MODULE_DIR))
def _install_fake_freecad():
class Vector:
def __init__(self, x=0.0, y=0.0, z=0.0):
self.x = float(x)
self.y = float(y)
self.z = float(z)
class Rotation:
pass
class Placement:
def __init__(self, base=None, rotation=None):
self.Base = base or Vector()
self.Rotation = rotation or Rotation()
fake_freecad = types.ModuleType("FreeCAD")
fake_freecad.Vector = Vector
fake_freecad.Rotation = Rotation
fake_freecad.Placement = Placement
fake_freecad.ActiveDocument = None
fake_freecad.Console = types.SimpleNamespace(
PrintMessage=lambda *args, **kwargs: None,
PrintWarning=lambda *args, **kwargs: None,
PrintError=lambda *args, **kwargs: None,
)
sys.modules["FreeCAD"] = fake_freecad
class FakeViewObject:
def __init__(self):
self.Visibility = True
class FakeObject:
def __init__(self, name, type_id):
self.Name = name
self.Label = name
self.TypeId = type_id
self.PropertiesList = []
self.Group = []
self.ViewObject = FakeViewObject()
self.InList = []
def isDerivedFrom(self, type_name):
return self.TypeId == type_name
def addProperty(self, prop_type, prop_name, group_name, description):
if prop_name not in self.PropertiesList:
self.PropertiesList.append(prop_name)
def addObject(self, child):
if child not in self.Group:
self.Group.append(child)
if self not in child.InList:
child.InList.append(self)
class FakeDocument:
def __init__(self):
self.Objects = []
self.Name = "FakeDoc"
def addObject(self, type_name, name):
obj = FakeObject(name, type_name)
self.Objects.append(obj)
return obj
def getObject(self, name):
for obj in self.Objects:
if obj.Name == name:
return obj
return None
def _reload_modules():
for name in ["TerminalObjects", "WiringObjects", "WiringImport"]:
sys.modules.pop(name, None)
terminal_objects = importlib.import_module("TerminalObjects")
wiring_objects = importlib.import_module("WiringObjects")
wiring_import = importlib.import_module("WiringImport")
return terminal_objects, wiring_objects, wiring_import
class WiringImportTest(unittest.TestCase):
def test_import_wire_tasks_creates_and_updates_qet_tasks(self):
_install_fake_freecad()
terminal_objects, _wiring_objects, wiring_import = _reload_modules()
doc = FakeDocument()
terminal_objects.ensure_root_group(doc, "project-1")
payload = {
"project_uuid": "project-1",
"wires": [
{
"wire_id": "wire-1",
"net_uuid": "net-1",
"group_uuid": "group-1",
"wire_mark": "W001",
"wire_mark_is_manual": True,
"start_element_uuid": "device-a",
"start_terminal_uuid": "terminal-a",
"end_element_uuid": "device-b",
"end_terminal_uuid": "terminal-b",
"start_terminal_display": "A1",
"end_terminal_display": "B1",
"conductor_uuids": ["conductor-1"],
}
],
}
report = wiring_import.import_wire_tasks_from_payload(payload, doc)
task_group = doc.getObject("QETWiring_01_Tasks")
self.assertIsNotNone(task_group)
self.assertEqual(1, report["imported_tasks"])
self.assertEqual(0, report["updated_tasks"])
self.assertEqual(1, len(task_group.Group))
task = task_group.Group[0]
self.assertEqual("wire-1", task.QetWireUuid)
self.assertEqual("net-1", task.QetNetUuid)
self.assertEqual("group-1", task.QetGroupUuid)
self.assertEqual("W001", task.QetWireMark)
self.assertTrue(task.QetWireMarkIsManual)
self.assertEqual("terminal-a", task.QetStartTerminalUuid)
self.assertEqual("terminal-b", task.QetEndTerminalUuid)
self.assertEqual("device-a", task.QetStartElementUuid)
self.assertEqual("device-b", task.QetEndElementUuid)
self.assertEqual("A1", task.QetStartTerminalDisplay)
self.assertEqual("B1", task.QetEndTerminalDisplay)
self.assertIn("conductor-1", task.QetConductorUuidsJson)
self.assertEqual("Task", task.RouteType)
self.assertEqual("Task", task.RouteStatus)
payload["wires"][0]["wire_mark"] = "W001-updated"
task.RouteStatus = "Routed"
second_report = wiring_import.import_wire_tasks_from_payload(payload, doc)
self.assertEqual(0, second_report["imported_tasks"])
self.assertEqual(1, second_report["updated_tasks"])
self.assertEqual(1, len(task_group.Group))
self.assertEqual("W001-updated", task_group.Group[0].QetWireMark)
self.assertEqual("Routed", task_group.Group[0].RouteStatus)
if __name__ == "__main__":
unittest.main()
Loading…
Cancel
Save