diff --git a/src/Mod/FreeCADExchange/AutoRouting.py b/src/Mod/FreeCADExchange/AutoRouting.py index 2da8cad..83af255 100644 --- a/src/Mod/FreeCADExchange/AutoRouting.py +++ b/src/Mod/FreeCADExchange/AutoRouting.py @@ -1296,6 +1296,32 @@ def _route_lane_key(start_uuid, end_uuid): return tuple(endpoints) +def _route_segment_key(segment): + if not isinstance(segment, dict): + return None + carrier = segment.get("carrier") or {} + carrier_name = str(carrier.get("name", "") or "").strip() + from_key = tuple(segment.get("from_key", []) or []) + to_key = tuple(segment.get("to_key", []) or []) + if not from_key or not to_key: + return None + return ( + carrier_name, + tuple(sorted((from_key, to_key))), + ) + + +def _route_segment_keys(result): + route_track = result.get("route_track", {}) if isinstance(result, dict) else {} + segments = route_track.get("segments", []) if isinstance(route_track, dict) else [] + keys = [] + for segment in segments or []: + key = _route_segment_key(segment) + if key is not None: + keys.append(key) + return keys + + def bind_wire_task_terminals_from_payload(doc, payload): """Bind local template terminals to QET terminal UUIDs without creating wires.""" if doc is None: @@ -1376,11 +1402,29 @@ def route_eplan_connections_from_payload(doc, payload, options=None, prepared_la report["prepared_layout"] = prepared_layout missing_endpoint_uuids = set() lane_indexes_by_pair = {} + lane_indexes_by_segment = {} def add_status(status): key = str(status or "").strip() or "Unknown" report["route_status_counts"][key] = report["route_status_counts"].get(key, 0) + 1 + def create_route(route_lane_index, item, start_terminal, end_terminal, endpoint_metadata): + return route_eplan_connection_between_terminals( + doc, + start_terminal, + end_terminal, + route_index=route_lane_index, + options=options, + wire_uuid=_wire_item_value(item, "wire_id", "wire_uuid", "id"), + wire_label=_wire_item_value(item, "wire_label", "wire_mark"), + net_uuid=_wire_item_value(item, "net_uuid"), + group_uuid=_wire_item_value(item, "group_uuid"), + wire_mark=_wire_item_value(item, "wire_mark"), + wire_mark_is_manual=bool(item.get("wire_mark_is_manual", False)), + wire_style_id=_wire_item_value(item, "wire_style_id"), + endpoint_metadata=endpoint_metadata, + ) + for item in wires: if not isinstance(item, dict): report["skipped_invalid"] += 1 @@ -1425,21 +1469,33 @@ def route_eplan_connections_from_payload(doc, payload, options=None, prepared_la "end_device_label": _wire_item_value(item, "end_device_label"), "endpoint_label": _wire_item_value(item, "endpoint_label"), } - result = route_eplan_connection_between_terminals( - doc, + result = create_route( + route_lane_index, + item, start_terminal, end_terminal, - route_index=route_lane_index, - options=options, - wire_uuid=_wire_item_value(item, "wire_id", "wire_uuid", "id"), - wire_label=_wire_item_value(item, "wire_label", "wire_mark"), - net_uuid=_wire_item_value(item, "net_uuid"), - group_uuid=_wire_item_value(item, "group_uuid"), - wire_mark=_wire_item_value(item, "wire_mark"), - wire_mark_is_manual=bool(item.get("wire_mark_is_manual", False)), - wire_style_id=_wire_item_value(item, "wire_style_id"), - endpoint_metadata=endpoint_metadata, + endpoint_metadata, ) + route_segment_keys = _route_segment_keys(result) + shared_lane_index = max( + [lane_indexes_by_segment.get(key, 0) for key in route_segment_keys] or [0] + ) + final_lane_index = max(route_lane_index, shared_lane_index) + if final_lane_index != route_lane_index: + initial_wire = result.get("wire") if isinstance(result, dict) else None + try: + result = create_route( + final_lane_index, + item, + start_terminal, + end_terminal, + endpoint_metadata, + ) + except Exception: + if initial_wire is not None: + _remove_routing_connection_objects(doc, [initial_wire]) + raise + route_segment_keys = _route_segment_keys(result) except Exception as exc: error_text = str(exc) report["errors"].append(error_text) @@ -1462,7 +1518,15 @@ def route_eplan_connections_from_payload(doc, payload, options=None, prepared_la } ) continue - lane_indexes_by_pair[lane_key] = route_lane_index + 1 + lane_indexes_by_pair[lane_key] = max( + lane_indexes_by_pair.get(lane_key, 0), + int(result.get("lane", {}).get("index", 0) or 0) + 1, + ) + for segment_key in route_segment_keys: + lane_indexes_by_segment[segment_key] = max( + lane_indexes_by_segment.get(segment_key, 0), + int(result.get("lane", {}).get("index", 0) or 0) + 1, + ) if result["route_status"] == "CollisionWarning": report["collision_warnings"] += 1 add_status(result["route_status"]) diff --git a/tests/python/freecad_exchange_auto_routing_test.py b/tests/python/freecad_exchange_auto_routing_test.py index 776fb89..43d8aaf 100644 --- a/tests/python/freecad_exchange_auto_routing_test.py +++ b/tests/python/freecad_exchange_auto_routing_test.py @@ -1870,6 +1870,54 @@ class AutoRoutingTest(unittest.TestCase): self.assertEqual(0, report["routes"][1]["lane"]["index"]) self.assertEqual(1, report["routes"][2]["lane"]["index"]) + def test_route_eplan_connections_lane_index_increments_for_shared_route_segments(self): + _install_fake_freecad() + terminal_objects, _wiring_objects, routing_network, auto_routing = _reload_modules() + app = sys.modules["FreeCAD"] + doc = FakeDocument() + terminal_objects.ensure_root_group(doc, "project-1") + _terminal(doc, terminal_objects, "TerminalStartA", "terminal-start-a", app.Vector(0, 0, 0)) + _terminal(doc, terminal_objects, "TerminalEndA", "terminal-end-a", app.Vector(100, 0, 0)) + _terminal(doc, terminal_objects, "TerminalStartB", "terminal-start-b", app.Vector(0, 0, 0)) + _terminal(doc, terminal_objects, "TerminalEndB", "terminal-end-b", app.Vector(100, 0, 0)) + routing_network.create_route_carrier( + doc, + [app.Vector(0, 0, 20), app.Vector(100, 0, 20)], + project_uuid="project-1", + kind="WireDuct", + ) + payload = { + "project_uuid": "project-1", + "wires": [ + { + "wire_id": "wire-a", + "start_terminal_uuid": "terminal-start-a", + "end_terminal_uuid": "terminal-end-a", + }, + { + "wire_id": "wire-b", + "start_terminal_uuid": "terminal-start-b", + "end_terminal_uuid": "terminal-end-b", + }, + ], + } + + report = auto_routing.route_eplan_connections_from_payload( + doc, + payload, + options={"lane_spacing": 10.0, "lane_axis": "y"}, + ) + + self.assertEqual(0, report["routes"][0]["lane"]["index"]) + self.assertEqual(1, report["routes"][1]["lane"]["index"]) + routed_group = doc.getObject("QETWiring_04_Routed") + second_wire = [ + wire + for wire in list(getattr(routed_group, "Group", []) or []) + if getattr(wire, "QetWireUuid", "") == "wire-b" + ][0] + self.assertTrue(any(abs(point.y - 10.0) <= 0.001 for point in second_wire.Points[1:-1])) + def test_route_eplan_connections_report_includes_collision_samples(self): _install_fake_freecad() terminal_objects, _wiring_objects, routing_network, auto_routing = _reload_modules()