From 9422fcc14923a20565226093b684db48a21f5222 Mon Sep 17 00:00:00 2001 From: Zhaowenlong Date: Sat, 30 May 2026 17:42:16 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BF=9D=E7=95=99=E8=87=AA=E5=8A=A8?= =?UTF-8?q?=E5=B8=83=E7=BA=BF=E7=AB=AF=E7=82=B9=E5=85=83=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Mod/FreeCADExchange/AutoRouting.py | 87 +++++++++++++++++-- .../freecad_exchange_auto_routing_test.py | 81 +++++++++++++++++ 2 files changed, 163 insertions(+), 5 deletions(-) diff --git a/src/Mod/FreeCADExchange/AutoRouting.py b/src/Mod/FreeCADExchange/AutoRouting.py index 686e27a..c032e51 100644 --- a/src/Mod/FreeCADExchange/AutoRouting.py +++ b/src/Mod/FreeCADExchange/AutoRouting.py @@ -553,9 +553,46 @@ def _set_string(obj, name, value, description="Routing connection property"): TerminalObjects.ensure_string_property(obj, name, "QET Routing", description, value) -def _route_payload(route_data, collisions, wire_style_id=""): +def _clean_endpoint_metadata(endpoint_metadata): + if not isinstance(endpoint_metadata, dict): + return {} + allowed = ( + "start_element_uuid", + "start_terminal_display", + "start_device_label", + "end_element_uuid", + "end_terminal_display", + "end_device_label", + "endpoint_label", + ) + cleaned = {} + for key in allowed: + value = str(endpoint_metadata.get(key, "") or "").strip() + if value: + cleaned[key] = value + return cleaned + + +def _set_endpoint_metadata(wire, endpoint_metadata): + metadata = _clean_endpoint_metadata(endpoint_metadata) + property_names = { + "start_element_uuid": "QetStartElementUuid", + "start_terminal_display": "QetStartTerminalDisplay", + "start_device_label": "QetStartDeviceLabel", + "end_element_uuid": "QetEndElementUuid", + "end_terminal_display": "QetEndTerminalDisplay", + "end_device_label": "QetEndDeviceLabel", + "endpoint_label": "QetEndpointLabel", + } + for key, prop_name in property_names.items(): + if key in metadata: + _set_string(wire, prop_name, metadata[key], "QET routed wire endpoint metadata") + return metadata + + +def _route_payload(route_data, collisions, wire_style_id="", endpoint_metadata=None): points = route_data.get("points", []) - return { + payload = { "algorithm": route_data.get("algorithm", ""), "length_mm": _route_length(points), "wire_style_id": str(wire_style_id or "").strip(), @@ -566,10 +603,15 @@ def _route_payload(route_data, collisions, wire_style_id=""): "network": route_data.get("network", {}), "route_track": route_data.get("route_track", {}), } + metadata = _clean_endpoint_metadata(endpoint_metadata) + if metadata: + payload["endpoint_metadata"] = metadata + return payload -def _set_routing_connection_metadata(wire, route_data, collisions, wire_style_id=""): +def _set_routing_connection_metadata(wire, route_data, collisions, wire_style_id="", endpoint_metadata=None): length_mm = _route_length(route_data.get("points", [])) + cleaned_endpoint_metadata = _set_endpoint_metadata(wire, endpoint_metadata) _set_string( wire, "QetRouteAlgorithm", @@ -591,7 +633,15 @@ def _set_routing_connection_metadata(wire, route_data, collisions, wire_style_id _set_string( wire, "QetRouteDiagnosticsJson", - json.dumps(_route_payload(route_data, collisions, wire_style_id=wire_style_id), ensure_ascii=False), + json.dumps( + _route_payload( + route_data, + collisions, + wire_style_id=wire_style_id, + endpoint_metadata=cleaned_endpoint_metadata, + ), + ensure_ascii=False, + ), "Routing connection diagnostics", ) if route_data.get("network"): @@ -1024,6 +1074,7 @@ def route_eplan_connection_between_terminals( wire_mark="", wire_mark_is_manual=False, wire_style_id="", + endpoint_metadata=None, ): if doc is None: raise AutoRoutingError("No FreeCAD document is available.") @@ -1096,7 +1147,13 @@ def route_eplan_connection_between_terminals( wire_mark=wire_mark, wire_mark_is_manual=wire_mark_is_manual, ) - _set_routing_connection_metadata(wire, route_data, collisions, wire_style_id=effective_wire_style_id) + _set_routing_connection_metadata( + wire, + route_data, + collisions, + wire_style_id=effective_wire_style_id, + endpoint_metadata=endpoint_metadata, + ) routed_group = WiringObjects.ensure_routed_group(doc, project_uuid) if wire not in getattr(routed_group, "Group", []): @@ -1282,6 +1339,15 @@ def route_eplan_connections_from_payload(doc, payload, options=None, prepared_la lane_key = _route_lane_key(start_uuid, end_uuid) route_lane_index = lane_indexes_by_pair.get(lane_key, 0) try: + endpoint_metadata = { + "start_element_uuid": _wire_item_value(item, "start_element_uuid"), + "start_terminal_display": _wire_item_value(item, "start_terminal_display"), + "start_device_label": _wire_item_value(item, "start_device_label"), + "end_element_uuid": _wire_item_value(item, "end_element_uuid"), + "end_terminal_display": _wire_item_value(item, "end_terminal_display"), + "end_device_label": _wire_item_value(item, "end_device_label"), + "endpoint_label": _wire_item_value(item, "endpoint_label"), + } result = route_eplan_connection_between_terminals( doc, start_terminal, @@ -1295,6 +1361,7 @@ def route_eplan_connections_from_payload(doc, payload, options=None, prepared_la wire_mark=_wire_item_value(item, "wire_mark"), wire_mark_is_manual=bool(item.get("wire_mark_is_manual", False)), wire_style_id=_wire_item_value(item, "wire_style_id"), + endpoint_metadata=endpoint_metadata, ) except Exception as exc: error_text = str(exc) @@ -1338,7 +1405,14 @@ def route_eplan_connections_from_payload(doc, payload, options=None, prepared_la "wire_label": _wire_item_value(item, "wire_label", "wire_mark"), "wire_style_id": _wire_item_value(item, "wire_style_id"), "start_terminal_uuid": start_uuid, + "start_element_uuid": _wire_item_value(item, "start_element_uuid"), + "start_terminal_display": _wire_item_value(item, "start_terminal_display"), + "start_device_label": _wire_item_value(item, "start_device_label"), "end_terminal_uuid": end_uuid, + "end_element_uuid": _wire_item_value(item, "end_element_uuid"), + "end_terminal_display": _wire_item_value(item, "end_terminal_display"), + "end_device_label": _wire_item_value(item, "end_device_label"), + "endpoint_label": _wire_item_value(item, "endpoint_label"), "algorithm": result["algorithm"], "route_status": result["route_status"], "length_mm": route_length, @@ -1561,10 +1635,13 @@ def _wire_tasks_payload(doc): "start_instance_id": (getattr(task, "QetStartInstanceId", "") or "").strip(), "start_terminal_uuid": (getattr(task, "QetStartTerminalUuid", "") or "").strip(), "start_terminal_display": (getattr(task, "QetStartTerminalDisplay", "") or "").strip(), + "start_device_label": (getattr(task, "QetStartDeviceLabel", "") or "").strip(), "end_element_uuid": (getattr(task, "QetEndElementUuid", "") or "").strip(), "end_instance_id": (getattr(task, "QetEndInstanceId", "") or "").strip(), "end_terminal_uuid": (getattr(task, "QetEndTerminalUuid", "") or "").strip(), "end_terminal_display": (getattr(task, "QetEndTerminalDisplay", "") or "").strip(), + "end_device_label": (getattr(task, "QetEndDeviceLabel", "") or "").strip(), + "endpoint_label": (getattr(task, "QetEndpointLabel", "") or "").strip(), } ) return payload diff --git a/tests/python/freecad_exchange_auto_routing_test.py b/tests/python/freecad_exchange_auto_routing_test.py index 0573821..1d9581c 100644 --- a/tests/python/freecad_exchange_auto_routing_test.py +++ b/tests/python/freecad_exchange_auto_routing_test.py @@ -1572,6 +1572,87 @@ class AutoRoutingTest(unittest.TestCase): self.assertEqual(1, route["network"]["carriers"]) self.assertEqual("WireDuct", route["route_track"]["segments"][0]["carrier"]["kind"]) + def test_route_eplan_connections_preserves_endpoint_metadata_on_routed_wire(self): + _install_fake_freecad() + terminal_objects, _wiring_objects, routing_network, auto_routing = _reload_modules() + app = sys.modules["FreeCAD"] + doc = FakeDocument() + terminal_objects.ensure_root_group(doc, "project-1") + _terminal(doc, terminal_objects, "TerminalStart", "terminal-start", app.Vector(0, 0, 0)) + _terminal(doc, terminal_objects, "TerminalEnd", "terminal-end", app.Vector(100, 0, 0)) + routing_network.create_route_carrier( + doc, + [app.Vector(0, 0, 20), app.Vector(100, 0, 20)], + project_uuid="project-1", + kind="WireDuct", + ) + payload = { + "project_uuid": "project-1", + "wires": [ + { + "wire_id": "wire-1", + "start_element_uuid": "device-a", + "start_terminal_uuid": "terminal-start", + "start_terminal_display": "A1", + "end_element_uuid": "device-b", + "end_terminal_uuid": "terminal-end", + "end_terminal_display": "B1", + } + ], + } + + report = auto_routing.route_eplan_connections_from_payload(doc, payload) + routed_group = doc.getObject("QETWiring_04_Routed") + wire = routed_group.Group[0] + diagnostics = json.loads(wire.QetRouteDiagnosticsJson) + + self.assertEqual("device-a", wire.QetStartElementUuid) + self.assertEqual("A1", wire.QetStartTerminalDisplay) + self.assertEqual("device-b", wire.QetEndElementUuid) + self.assertEqual("B1", wire.QetEndTerminalDisplay) + self.assertEqual("device-a", report["routes"][0]["start_element_uuid"]) + self.assertEqual("B1", report["routes"][0]["end_terminal_display"]) + self.assertEqual("A1", diagnostics["endpoint_metadata"]["start_terminal_display"]) + + def test_route_eplan_connection_tasks_preserve_task_endpoint_labels_on_routed_wire(self): + _install_fake_freecad() + terminal_objects, wiring_objects, routing_network, auto_routing = _reload_modules() + app = sys.modules["FreeCAD"] + doc = FakeDocument() + terminal_objects.ensure_root_group(doc, "project-1") + _terminal(doc, terminal_objects, "TerminalStart", "terminal-start", app.Vector(0, 0, 0)) + _terminal(doc, terminal_objects, "TerminalEnd", "terminal-end", app.Vector(100, 0, 0)) + routing_network.create_route_carrier( + doc, + [app.Vector(0, 0, 20), app.Vector(100, 0, 20)], + project_uuid="project-1", + kind="WireDuct", + ) + task = wiring_objects.create_wire_task( + doc, + "project-1", + "wire-1", + "N4111", + "terminal-start", + "terminal-end", + "instance-a", + "instance-b", + ) + terminal_objects.ensure_string_property(task, "QetStartDeviceLabel", "QET Wiring", "", "QF1") + terminal_objects.ensure_string_property(task, "QetEndDeviceLabel", "QET Wiring", "", "X1") + terminal_objects.ensure_string_property(task, "QetEndpointLabel", "QET Wiring", "", "QF1:A1 -> X1:B1") + + report = auto_routing.route_eplan_connection_tasks(doc) + routed_group = doc.getObject("QETWiring_04_Routed") + wire = routed_group.Group[0] + diagnostics = json.loads(wire.QetRouteDiagnosticsJson) + + self.assertEqual("QF1", wire.QetStartDeviceLabel) + self.assertEqual("X1", wire.QetEndDeviceLabel) + self.assertEqual("QF1:A1 -> X1:B1", wire.QetEndpointLabel) + self.assertEqual("QF1:A1 -> X1:B1", report["routes"][0]["endpoint_label"]) + self.assertEqual("QF1", diagnostics["endpoint_metadata"]["start_device_label"]) + def test_route_eplan_connections_records_wire_identity_for_errors(self): _install_fake_freecad() terminal_objects, _wiring_objects, routing_network, auto_routing = _reload_modules()