Compare commits

..

No commits in common. 'ba9d971ef50a4d6dd9a9cb41238a9cc23dbcd641' and 'aaa2fd365441d90fd54955679f07ec82dbc15940' have entirely different histories.

@ -19,7 +19,7 @@ The wire duct is a gray open duct for cabinet routing:
- Width: `40 mm`
- Height: `40 mm`
It is generated as one FreeCAD tree object, `WireDuct_Body`, with the comb-style side slots and mounting holes cut into the body.
It includes a base plate, two side walls, comb-style side slots, and mounting hole markers.
## DIN Rail

@ -113,60 +113,60 @@ def _create_wire_duct():
y0 = -width / 2.0
light_gray = (0.72, 0.74, 0.74)
dark_gray = (0.18, 0.2, 0.22)
base = Part.makeBox(length, width, wall, App.Vector(x0, y0, 0.0))
left_wall = Part.makeBox(length, wall, height, App.Vector(x0, y0, 0.0))
right_wall = Part.makeBox(length, wall, height, App.Vector(x0, width / 2.0 - wall, 0.0))
body = base.fuse(left_wall).fuse(right_wall)
objects = [
_box(doc, "WireDuct_BasePlate", length, width, wall, x0, y0, 0.0, light_gray),
_box(doc, "WireDuct_LeftWall", length, wall, height, x0, y0, 0.0, light_gray),
_box(doc, "WireDuct_RightWall", length, wall, height, x0, width / 2.0 - wall, 0.0, light_gray),
]
slot_count = 18
slot_pitch = length / slot_count
finger_width = slot_pitch * 0.45
for index in range(slot_count):
center_x = x0 + slot_pitch * (index + 0.5)
body = body.cut(
Part.makeBox(
objects.append(
_box(
doc,
"WireDuct_LeftCombSlot_{0:02d}".format(index + 1),
finger_width,
wall + 0.4,
wall + 0.2,
height - 8.0,
App.Vector(center_x - finger_width / 2.0, y0 - 0.2, 8.0),
center_x - finger_width / 2.0,
y0 - 0.1,
8.0,
dark_gray,
)
)
body = body.cut(
Part.makeBox(
objects.append(
_box(
doc,
"WireDuct_RightCombSlot_{0:02d}".format(index + 1),
finger_width,
wall + 0.4,
wall + 0.2,
height - 8.0,
App.Vector(center_x - finger_width / 2.0, width / 2.0 - wall - 0.2, 8.0),
center_x - finger_width / 2.0,
width / 2.0 - wall - 0.1,
8.0,
dark_gray,
)
)
for center_x in (-60.0, 0.0, 60.0):
body = body.cut(
Part.makeCylinder(
2.2,
wall + 0.4,
App.Vector(center_x, 0.0, -0.2),
App.Vector(0, 0, 1),
)
)
body_obj = doc.addObject("Part::Feature", "WireDuct_Body")
body_obj.Shape = body
_style(body_obj, light_gray, 0)
objects.append(_cylinder_z(doc, "WireDuct_MountHole_{0:g}".format(center_x), 2.2, wall + 0.2, center_x, 0.0, 0.0, dark_gray))
doc.recompute()
fcstd = OUT_DIR / "qet_wire_duct.FCStd"
step = OUT_DIR / "qet_wire_duct.step"
doc.saveAs(str(fcstd))
_export_step([body_obj], step)
_export_step(objects, step)
return {
"name": "wire_duct",
"fcstd": str(fcstd),
"step": str(step),
"dimensions_mm": {"length": length, "width": width, "height": height},
"objects": [body_obj.Name],
"object_count": 1,
"objects": [obj.Name for obj in objects],
}

@ -10,9 +10,49 @@
"height": 40.0
},
"objects": [
"WireDuct_Body"
],
"object_count": 1
"WireDuct_BasePlate",
"WireDuct_LeftWall",
"WireDuct_RightWall",
"WireDuct_LeftCombSlot_01",
"WireDuct_RightCombSlot_01",
"WireDuct_LeftCombSlot_02",
"WireDuct_RightCombSlot_02",
"WireDuct_LeftCombSlot_03",
"WireDuct_RightCombSlot_03",
"WireDuct_LeftCombSlot_04",
"WireDuct_RightCombSlot_04",
"WireDuct_LeftCombSlot_05",
"WireDuct_RightCombSlot_05",
"WireDuct_LeftCombSlot_06",
"WireDuct_RightCombSlot_06",
"WireDuct_LeftCombSlot_07",
"WireDuct_RightCombSlot_07",
"WireDuct_LeftCombSlot_08",
"WireDuct_RightCombSlot_08",
"WireDuct_LeftCombSlot_09",
"WireDuct_RightCombSlot_09",
"WireDuct_LeftCombSlot_10",
"WireDuct_RightCombSlot_10",
"WireDuct_LeftCombSlot_11",
"WireDuct_RightCombSlot_11",
"WireDuct_LeftCombSlot_12",
"WireDuct_RightCombSlot_12",
"WireDuct_LeftCombSlot_13",
"WireDuct_RightCombSlot_13",
"WireDuct_LeftCombSlot_14",
"WireDuct_RightCombSlot_14",
"WireDuct_LeftCombSlot_15",
"WireDuct_RightCombSlot_15",
"WireDuct_LeftCombSlot_16",
"WireDuct_RightCombSlot_16",
"WireDuct_LeftCombSlot_17",
"WireDuct_RightCombSlot_17",
"WireDuct_LeftCombSlot_18",
"WireDuct_RightCombSlot_18",
"WireDuct_MountHole__60",
"WireDuct_MountHole_0",
"WireDuct_MountHole_60"
]
},
{
"name": "din_rail",

@ -1,7 +1,7 @@
ISO-10303-21;
HEADER;
FILE_DESCRIPTION(('FreeCAD Model'),'2;1');
FILE_NAME('Open CASCADE Shape Model','2026-05-31T14:15:58',(''),(''),
FILE_NAME('Open CASCADE Shape Model','2026-05-26T19:26:07',(''),(''),
'Open CASCADE STEP processor 7.8','FreeCAD','Unknown');
FILE_SCHEMA(('AUTOMOTIVE_DESIGN { 1 0 10303 214 1 1 1 1 }'));
ENDSEC;
@ -247,8 +247,8 @@ SHAPE_REPRESENTATION_RELATIONSHIP() );
#226 = ITEM_DEFINED_TRANSFORMATION('','',#11,#15);
#227 = PRODUCT_DEFINITION_SHAPE('Placement','Placement of an item',#228
);
#228 = NEXT_ASSEMBLY_USAGE_OCCURRENCE('1','DINRail_CenterTop','',#5,#63,
$);
#228 = NEXT_ASSEMBLY_USAGE_OCCURRENCE('43','DINRail_CenterTop','',#5,#63
,$);
#229 = PRODUCT_RELATED_PRODUCT_CATEGORY('part',$,(#65));
#230 = SHAPE_DEFINITION_REPRESENTATION(#231,#237);
#231 = PRODUCT_DEFINITION_SHAPE('','',#232);
@ -424,8 +424,8 @@ SHAPE_REPRESENTATION_RELATIONSHIP() );
#395 = ITEM_DEFINED_TRANSFORMATION('','',#11,#19);
#396 = PRODUCT_DEFINITION_SHAPE('Placement','Placement of an item',#397
);
#397 = NEXT_ASSEMBLY_USAGE_OCCURRENCE('2','DINRail_LeftWeb','',#5,#232,$
);
#397 = NEXT_ASSEMBLY_USAGE_OCCURRENCE('44','DINRail_LeftWeb','',#5,#232,
$);
#398 = PRODUCT_RELATED_PRODUCT_CATEGORY('part',$,(#234));
#399 = SHAPE_DEFINITION_REPRESENTATION(#400,#406);
#400 = PRODUCT_DEFINITION_SHAPE('','',#401);
@ -601,8 +601,8 @@ SHAPE_REPRESENTATION_RELATIONSHIP() );
#564 = ITEM_DEFINED_TRANSFORMATION('','',#11,#23);
#565 = PRODUCT_DEFINITION_SHAPE('Placement','Placement of an item',#566
);
#566 = NEXT_ASSEMBLY_USAGE_OCCURRENCE('3','DINRail_RightWeb','',#5,#401,
$);
#566 = NEXT_ASSEMBLY_USAGE_OCCURRENCE('45','DINRail_RightWeb','',#5,#401
,$);
#567 = PRODUCT_RELATED_PRODUCT_CATEGORY('part',$,(#403));
#568 = SHAPE_DEFINITION_REPRESENTATION(#569,#575);
#569 = PRODUCT_DEFINITION_SHAPE('','',#570);
@ -778,7 +778,7 @@ SHAPE_REPRESENTATION_RELATIONSHIP() );
#733 = ITEM_DEFINED_TRANSFORMATION('','',#11,#27);
#734 = PRODUCT_DEFINITION_SHAPE('Placement','Placement of an item',#735
);
#735 = NEXT_ASSEMBLY_USAGE_OCCURRENCE('4','DINRail_LeftFlange','',#5,
#735 = NEXT_ASSEMBLY_USAGE_OCCURRENCE('46','DINRail_LeftFlange','',#5,
#570,$);
#736 = PRODUCT_RELATED_PRODUCT_CATEGORY('part',$,(#572));
#737 = SHAPE_DEFINITION_REPRESENTATION(#738,#744);
@ -955,7 +955,7 @@ SHAPE_REPRESENTATION_RELATIONSHIP() );
#902 = ITEM_DEFINED_TRANSFORMATION('','',#11,#31);
#903 = PRODUCT_DEFINITION_SHAPE('Placement','Placement of an item',#904
);
#904 = NEXT_ASSEMBLY_USAGE_OCCURRENCE('5','DINRail_RightFlange','',#5,
#904 = NEXT_ASSEMBLY_USAGE_OCCURRENCE('47','DINRail_RightFlange','',#5,
#739,$);
#905 = PRODUCT_RELATED_PRODUCT_CATEGORY('part',$,(#741));
#906 = SHAPE_DEFINITION_REPRESENTATION(#907,#913);
@ -1133,8 +1133,8 @@ SHAPE_REPRESENTATION_RELATIONSHIP() );
#1071 = ITEM_DEFINED_TRANSFORMATION('','',#11,#35);
#1072 = PRODUCT_DEFINITION_SHAPE('Placement','Placement of an item',
#1073);
#1073 = NEXT_ASSEMBLY_USAGE_OCCURRENCE('6','DINRail_LeftReturnLip','',#5
,#908,$);
#1073 = NEXT_ASSEMBLY_USAGE_OCCURRENCE('48','DINRail_LeftReturnLip','',
#5,#908,$);
#1074 = PRODUCT_RELATED_PRODUCT_CATEGORY('part',$,(#910));
#1075 = SHAPE_DEFINITION_REPRESENTATION(#1076,#1082);
#1076 = PRODUCT_DEFINITION_SHAPE('','',#1077);
@ -1311,7 +1311,7 @@ SHAPE_REPRESENTATION_RELATIONSHIP() );
#1240 = ITEM_DEFINED_TRANSFORMATION('','',#11,#39);
#1241 = PRODUCT_DEFINITION_SHAPE('Placement','Placement of an item',
#1242);
#1242 = NEXT_ASSEMBLY_USAGE_OCCURRENCE('7','DINRail_RightReturnLip','',
#1242 = NEXT_ASSEMBLY_USAGE_OCCURRENCE('49','DINRail_RightReturnLip','',
#5,#1077,$);
#1243 = PRODUCT_RELATED_PRODUCT_CATEGORY('part',$,(#1079));
#1244 = SHAPE_DEFINITION_REPRESENTATION(#1245,#1251);
@ -1673,8 +1673,8 @@ SHAPE_REPRESENTATION_RELATIONSHIP() );
#1592 = ITEM_DEFINED_TRANSFORMATION('','',#11,#43);
#1593 = PRODUCT_DEFINITION_SHAPE('Placement','Placement of an item',
#1594);
#1594 = NEXT_ASSEMBLY_USAGE_OCCURRENCE('8','DINRail_MountSlot__60','',#5
,#1246,$);
#1594 = NEXT_ASSEMBLY_USAGE_OCCURRENCE('50','DINRail_MountSlot__60','',
#5,#1246,$);
#1595 = PRODUCT_RELATED_PRODUCT_CATEGORY('part',$,(#1248));
#1596 = SHAPE_DEFINITION_REPRESENTATION(#1597,#1603);
#1597 = PRODUCT_DEFINITION_SHAPE('','',#1598);
@ -2034,7 +2034,7 @@ SHAPE_REPRESENTATION_RELATIONSHIP() );
#1944 = ITEM_DEFINED_TRANSFORMATION('','',#11,#47);
#1945 = PRODUCT_DEFINITION_SHAPE('Placement','Placement of an item',
#1946);
#1946 = NEXT_ASSEMBLY_USAGE_OCCURRENCE('9','DINRail_MountSlot_0','',#5,
#1946 = NEXT_ASSEMBLY_USAGE_OCCURRENCE('51','DINRail_MountSlot_0','',#5,
#1598,$);
#1947 = PRODUCT_RELATED_PRODUCT_CATEGORY('part',$,(#1600));
#1948 = SHAPE_DEFINITION_REPRESENTATION(#1949,#1955);
@ -2396,7 +2396,7 @@ SHAPE_REPRESENTATION_RELATIONSHIP() );
#2296 = ITEM_DEFINED_TRANSFORMATION('','',#11,#51);
#2297 = PRODUCT_DEFINITION_SHAPE('Placement','Placement of an item',
#2298);
#2298 = NEXT_ASSEMBLY_USAGE_OCCURRENCE('10','DINRail_MountSlot_60','',#5
#2298 = NEXT_ASSEMBLY_USAGE_OCCURRENCE('52','DINRail_MountSlot_60','',#5
,#1950,$);
#2299 = PRODUCT_RELATED_PRODUCT_CATEGORY('part',$,(#1952));
ENDSEC;

File diff suppressed because it is too large Load Diff

@ -23,9 +23,8 @@ import WiringObjects
DEFAULT_OPTIONS = {
# 端子出来先走一小段,避免导线贴着设备外壳起步。
"terminal_exit_length": 20.0,
"lane_axis": "auto",
"lane_axis": "y",
"lane_spacing": 10.0,
"segment_reuse_penalty": 200.0,
# 线槽网络相关参数。
"use_routing_network": True,
"network_entry_max_distance": 1000.0,
@ -140,30 +139,9 @@ def _with_axis(point, axis, value):
)
def _auto_lane_axis(route_points):
points = [_vector(point) for point in route_points or []]
if len(points) < 2:
return "y"
extents = {"x": 0.0, "y": 0.0, "z": 0.0}
for index in range(len(points) - 1):
start = points[index]
end = points[index + 1]
extents["x"] += abs(float(end.x) - float(start.x))
extents["y"] += abs(float(end.y) - float(start.y))
extents["z"] += abs(float(end.z) - float(start.z))
dominant_axis = max(extents, key=lambda axis: extents[axis])
if dominant_axis == "y":
return "x"
if dominant_axis == "x":
return "y"
return "x"
def _lane_payload(route_index, options, route_points=None):
def _lane_payload(route_index, options):
opts = options or {}
lane_axis = (opts.get("lane_axis") or "y").lower()
if lane_axis == "auto":
lane_axis = _auto_lane_axis(route_points)
if lane_axis not in {"x", "y", "z"}:
lane_axis = "y"
lane_index = max(int(route_index or 0), 0)
@ -228,79 +206,6 @@ def _append_orthogonal(points, target_point, preferred_axis=None):
_append_unique(points, point)
def _collinear_points(first, middle, last):
ax = float(middle.x) - float(first.x)
ay = float(middle.y) - float(first.y)
az = float(middle.z) - float(first.z)
bx = float(last.x) - float(middle.x)
by = float(last.y) - float(middle.y)
bz = float(last.z) - float(middle.z)
cross_x = ay * bz - az * by
cross_y = az * bx - ax * bz
cross_z = ax * by - ay * bx
dot = ax * bx + ay * by + az * bz
return (
abs(cross_x) <= 0.000001
and abs(cross_y) <= 0.000001
and abs(cross_z) <= 0.000001
and dot >= -0.000001
)
def _route_point_key(point, tolerance=0.001):
scale = 1.0 / float(tolerance or 0.001)
return (
int(round(float(point.x) * scale)),
int(round(float(point.y) * scale)),
int(round(float(point.z) * scale)),
)
def _simplify_collinear_points(points, preserved_point_keys=None):
normalized = [_vector(point) for point in points or [] if _is_finite_point(_vector(point))]
if len(normalized) <= 2:
return normalized
preserved_indices = {0, 1, len(normalized) - 2, len(normalized) - 1}
preserved_point_keys = set(preserved_point_keys or [])
simplified = [normalized[0]]
simplified_indices = [0]
for index, point in enumerate(normalized[1:], start=1):
_append_unique(simplified, point)
if len(simplified_indices) < len(simplified):
simplified_indices.append(index)
while len(simplified) >= 3 and _collinear_points(
simplified[-3],
simplified[-2],
simplified[-1],
):
if (
simplified_indices[-2] in preserved_indices
or _route_point_key(simplified[-2]) in preserved_point_keys
):
break
simplified.pop(-2)
simplified_indices.pop(-2)
return simplified
def _important_route_node_keys(network, path_keys, path_result):
edges = network.get("edges", {}) if isinstance(network, dict) else {}
important = {
key
for key in path_keys or []
if len(edges.get(key, []) or []) != 2
}
segments = path_result.get("segments", []) if isinstance(path_result, dict) else []
for index in range(1, len(path_keys or []) - 1):
previous_segment = segments[index - 1] if index - 1 < len(segments) else {}
next_segment = segments[index] if index < len(segments) else {}
previous_carrier = (previous_segment.get("carrier") or {}).get("name", "")
next_carrier = (next_segment.get("carrier") or {}).get("name", "")
if previous_carrier != next_carrier:
important.add(path_keys[index])
return important
def _offset(point, direction, distance):
return App.Vector(
float(point.x) + float(direction.x) * float(distance),
@ -648,46 +553,9 @@ def _set_string(obj, name, value, description="Routing connection property"):
TerminalObjects.ensure_string_property(obj, name, "QET Routing", description, value)
def _clean_endpoint_metadata(endpoint_metadata):
if not isinstance(endpoint_metadata, dict):
return {}
allowed = (
"start_element_uuid",
"start_terminal_display",
"start_device_label",
"end_element_uuid",
"end_terminal_display",
"end_device_label",
"endpoint_label",
)
cleaned = {}
for key in allowed:
value = str(endpoint_metadata.get(key, "") or "").strip()
if value:
cleaned[key] = value
return cleaned
def _set_endpoint_metadata(wire, endpoint_metadata):
metadata = _clean_endpoint_metadata(endpoint_metadata)
property_names = {
"start_element_uuid": "QetStartElementUuid",
"start_terminal_display": "QetStartTerminalDisplay",
"start_device_label": "QetStartDeviceLabel",
"end_element_uuid": "QetEndElementUuid",
"end_terminal_display": "QetEndTerminalDisplay",
"end_device_label": "QetEndDeviceLabel",
"endpoint_label": "QetEndpointLabel",
}
for key, prop_name in property_names.items():
if key in metadata:
_set_string(wire, prop_name, metadata[key], "QET routed wire endpoint metadata")
return metadata
def _route_payload(route_data, collisions, wire_style_id="", endpoint_metadata=None):
def _route_payload(route_data, collisions, wire_style_id=""):
points = route_data.get("points", [])
payload = {
return {
"algorithm": route_data.get("algorithm", ""),
"length_mm": _route_length(points),
"wire_style_id": str(wire_style_id or "").strip(),
@ -698,15 +566,10 @@ def _route_payload(route_data, collisions, wire_style_id="", endpoint_metadata=N
"network": route_data.get("network", {}),
"route_track": route_data.get("route_track", {}),
}
metadata = _clean_endpoint_metadata(endpoint_metadata)
if metadata:
payload["endpoint_metadata"] = metadata
return payload
def _set_routing_connection_metadata(wire, route_data, collisions, wire_style_id="", endpoint_metadata=None):
def _set_routing_connection_metadata(wire, route_data, collisions, wire_style_id=""):
length_mm = _route_length(route_data.get("points", []))
cleaned_endpoint_metadata = _set_endpoint_metadata(wire, endpoint_metadata)
_set_string(
wire,
"QetRouteAlgorithm",
@ -728,15 +591,7 @@ def _set_routing_connection_metadata(wire, route_data, collisions, wire_style_id
_set_string(
wire,
"QetRouteDiagnosticsJson",
json.dumps(
_route_payload(
route_data,
collisions,
wire_style_id=wire_style_id,
endpoint_metadata=cleaned_endpoint_metadata,
),
ensure_ascii=False,
),
json.dumps(_route_payload(route_data, collisions, wire_style_id=wire_style_id), ensure_ascii=False),
"Routing connection diagnostics",
)
if route_data.get("network"):
@ -774,8 +629,8 @@ def build_network_route(start_terminal, end_terminal, route_index=0, options=Non
if network.get("segment_count", 0) <= 0:
return None
start_key, start_distance, start_mode = RoutingNetwork.connect_point_to_network(network, start_exit)
end_key, end_distance, end_mode = RoutingNetwork.connect_point_to_network(network, end_exit)
start_key, start_distance = RoutingNetwork.nearest_node(network, start_exit)
end_key, end_distance = RoutingNetwork.nearest_node(network, end_exit)
if start_key is None or end_key is None:
return None
@ -792,8 +647,6 @@ def build_network_route(start_terminal, end_terminal, route_index=0, options=Non
end_key,
bend_penalty=float(opts.get("bend_penalty", 0.0) or 0.0),
kind_cost_factors=opts.get("carrier_kind_cost_factors", {}),
segment_usage_costs=opts.get("segment_usage_costs", {}),
segment_reuse_penalty=float(opts.get("segment_reuse_penalty", 0.0) or 0.0),
)
path_keys = path_result.get("path", []) if isinstance(path_result, dict) else []
if not path_keys:
@ -802,7 +655,7 @@ def build_network_route(start_terminal, end_terminal, route_index=0, options=Non
carrier_points = RoutingNetwork.path_points(network, path_keys)
if not carrier_points:
return None
lane = _lane_payload(route_index, opts, route_points=carrier_points)
lane = _lane_payload(route_index, opts)
carrier_points = _apply_lane_offset(carrier_points, lane)
points = []
@ -813,10 +666,6 @@ def build_network_route(start_terminal, end_terminal, route_index=0, options=Non
_append_unique(points, point)
_append_orthogonal(points, end_exit)
_append_unique(points, end_origin)
points = _simplify_collinear_points(
points,
preserved_point_keys=_important_route_node_keys(network, path_keys, path_result),
)
return {
"algorithm": "network-dijkstra-v1",
@ -828,8 +677,6 @@ def build_network_route(start_terminal, end_terminal, route_index=0, options=Non
"nodes": len(network.get("nodes", {})),
"entry_distance": float(start_distance or 0.0),
"exit_distance": float(end_distance or 0.0),
"entry_point_mode": start_mode,
"exit_point_mode": end_mode,
"obstacle_aware": bool(obstacle_aware),
},
"route_track": path_result,
@ -1029,19 +876,14 @@ def detect_collisions(points, obstacles, ignored_segment_indices=None):
end = points[index + 1]
for obstacle in obstacles:
if _segment_intersects_bbox(start, end, obstacle["bbox"]):
raw_bbox = obstacle.get("raw_bbox") or obstacle.get("bbox") or {}
collision_kind = "HardIntersection"
if raw_bbox and not _segment_intersects_bbox(start, end, raw_bbox):
collision_kind = "ClearanceWarning"
collisions.append(
{
"segment_index": index,
"segment_start": _point_payload(start),
"segment_end": _point_payload(end),
"collision_kind": collision_kind,
"obstacle_name": obstacle.get("name", ""),
"obstacle_label": obstacle.get("label", ""),
"obstacle_bbox": dict(raw_bbox),
"obstacle_bbox": dict(obstacle.get("raw_bbox") or obstacle.get("bbox") or {}),
"collision_bbox": dict(obstacle.get("bbox", {}) or {}),
}
)
@ -1077,8 +919,8 @@ def _detach_object_from_groups(doc, obj):
pass
def _matching_existing_routing_connections(doc, start_uuid, end_uuid, wire_uuid=""):
matches = []
def _remove_existing_routing_connections(doc, start_uuid, end_uuid, wire_uuid=""):
removed = 0
for obj in list(WiringObjects.iter_routed_wire_objects(doc)):
if (getattr(obj, "RouteType", "") or "").strip() != "RoutedConnection":
continue
@ -1096,13 +938,6 @@ def _matching_existing_routing_connections(doc, start_uuid, end_uuid, wire_uuid=
)
if not same_direction and not reverse_direction:
continue
matches.append(obj)
return matches
def _remove_routing_connection_objects(doc, objects):
removed = 0
for obj in list(objects or []):
try:
_detach_object_from_groups(doc, obj)
doc.removeObject(obj.Name)
@ -1112,13 +947,6 @@ def _remove_routing_connection_objects(doc, objects):
return removed
def _remove_existing_routing_connections(doc, start_uuid, end_uuid, wire_uuid=""):
return _remove_routing_connection_objects(
doc,
_matching_existing_routing_connections(doc, start_uuid, end_uuid, wire_uuid=wire_uuid),
)
def _find_task_by_wire_uuid(doc, wire_uuid):
if not wire_uuid:
return None
@ -1175,7 +1003,6 @@ def route_eplan_connection_between_terminals(
wire_mark="",
wire_mark_is_manual=False,
wire_style_id="",
endpoint_metadata=None,
):
if doc is None:
raise AutoRoutingError("No FreeCAD document is available.")
@ -1194,6 +1021,9 @@ def route_eplan_connection_between_terminals(
if not project_uuid:
raise AutoRoutingError("Project UUID is required for routing connections.")
if opts.get("replace_existing", True):
_remove_existing_routing_connections(doc, start_uuid, end_uuid, wire_uuid=wire_uuid)
route_data = build_network_route(
start_terminal,
end_terminal,
@ -1217,67 +1047,39 @@ def route_eplan_connection_between_terminals(
collisions = detect_collisions(points, obstacles, ignored_segment_indices=ignored_collision_segments)
status = "CollisionWarning" if collisions else "Routed"
existing_replacements = []
if opts.get("replace_existing", True):
existing_replacements = _matching_existing_routing_connections(
doc,
start_uuid,
end_uuid,
wire_uuid=wire_uuid,
)
wire_name = _unique_name(doc, _wire_object_name(start_terminal, end_terminal, wire_uuid))
wire = None
try:
wire = _create_wire_geometry(doc, wire_name, points)
wire.Label = wire_label or wire_mark or wire_uuid or "QET Routed Connection"
WiringObjects.set_routed_wire_semantics(
wire,
project_uuid,
wire_uuid,
wire_label or wire_mark or wire_uuid,
start_uuid,
end_uuid,
(getattr(start_terminal, "QetInstanceId", "") or "").strip(),
(getattr(end_terminal, "QetInstanceId", "") or "").strip(),
route_type="RoutedConnection",
route_status=status,
route_mode="EplanRoute",
net_uuid=net_uuid,
group_uuid=group_uuid,
wire_mark=wire_mark,
wire_mark_is_manual=wire_mark_is_manual,
)
_set_routing_connection_metadata(
wire,
route_data,
collisions,
wire_style_id=effective_wire_style_id,
endpoint_metadata=endpoint_metadata,
)
routed_group = WiringObjects.ensure_routed_group(doc, project_uuid)
if wire not in getattr(routed_group, "Group", []):
routed_group.addObject(wire)
try:
routed_group.ViewObject.Visibility = True
except Exception:
pass
_style_wire(wire, collision_count=len(collisions))
wire = _create_wire_geometry(doc, wire_name, points)
wire.Label = wire_label or wire_mark or wire_uuid or "QET Routed Connection"
WiringObjects.set_routed_wire_semantics(
wire,
project_uuid,
wire_uuid,
wire_label or wire_mark or wire_uuid,
start_uuid,
end_uuid,
(getattr(start_terminal, "QetInstanceId", "") or "").strip(),
(getattr(end_terminal, "QetInstanceId", "") or "").strip(),
route_type="RoutedConnection",
route_status=status,
route_mode="EplanRoute",
net_uuid=net_uuid,
group_uuid=group_uuid,
wire_mark=wire_mark,
wire_mark_is_manual=wire_mark_is_manual,
)
_set_routing_connection_metadata(wire, route_data, collisions, wire_style_id=effective_wire_style_id)
task = _find_task_by_wire_uuid(doc, wire_uuid)
_set_task_status(task, status)
routed_group = WiringObjects.ensure_routed_group(doc, project_uuid)
if wire not in getattr(routed_group, "Group", []):
routed_group.addObject(wire)
try:
routed_group.ViewObject.Visibility = True
except Exception:
if wire is not None:
_remove_routing_connection_objects(doc, [wire])
raise
pass
_style_wire(wire, collision_count=len(collisions))
if existing_replacements:
removed_existing = _remove_routing_connection_objects(doc, existing_replacements)
if removed_existing != len(existing_replacements):
if wire is not None:
_remove_routing_connection_objects(doc, [wire])
raise AutoRoutingError("Failed to replace existing routed connection.")
task = _find_task_by_wire_uuid(doc, wire_uuid)
_set_task_status(task, status)
try:
doc.recompute()
@ -1320,65 +1122,6 @@ def _route_lane_key(start_uuid, end_uuid):
return tuple(endpoints)
def _route_segment_key(segment):
if not isinstance(segment, dict):
return None
carrier = segment.get("carrier") or {}
carrier_name = str(carrier.get("name", "") or "").strip()
from_key = tuple(segment.get("from_key", []) or [])
to_key = tuple(segment.get("to_key", []) or [])
if not from_key or not to_key:
return None
return (
carrier_name,
tuple(sorted((from_key, to_key))),
)
def _route_segment_keys(result):
route_track = result.get("route_track", {}) if isinstance(result, dict) else {}
return _route_track_segment_keys(route_track)
def _route_track_segment_keys(route_track):
segments = route_track.get("segments", []) if isinstance(route_track, dict) else []
keys = []
for segment in segments or []:
key = _route_segment_key(segment)
if key is not None:
keys.append(key)
return keys
def _incoming_wire_uuids(wires):
wire_uuids = set()
for item in wires or []:
if not isinstance(item, dict):
continue
wire_uuid = _wire_item_value(item, "wire_id", "wire_uuid", "id")
if wire_uuid:
wire_uuids.add(wire_uuid)
return wire_uuids
def _existing_routed_segment_usage(doc, excluded_wire_uuids=None):
excluded_wire_uuids = set(excluded_wire_uuids or [])
usage = {}
for wire in list(WiringObjects.iter_routed_wire_objects(doc)):
if (getattr(wire, "RouteType", "") or "").strip() != "RoutedConnection":
continue
wire_uuid = (getattr(wire, "QetWireUuid", "") or "").strip()
if wire_uuid and wire_uuid in excluded_wire_uuids:
continue
try:
route_track = json.loads((getattr(wire, "QetRouteTrackJson", "") or "").strip() or "{}")
except Exception:
route_track = {}
for segment_key in _route_track_segment_keys(route_track):
usage[segment_key] = usage.get(segment_key, 0) + 1
return usage
def bind_wire_task_terminals_from_payload(doc, payload):
"""Bind local template terminals to QET terminal UUIDs without creating wires."""
if doc is None:
@ -1459,36 +1202,11 @@ def route_eplan_connections_from_payload(doc, payload, options=None, prepared_la
report["prepared_layout"] = prepared_layout
missing_endpoint_uuids = set()
lane_indexes_by_pair = {}
lane_indexes_by_segment = {}
segment_usage_costs = _existing_routed_segment_usage(
doc,
excluded_wire_uuids=_incoming_wire_uuids(wires),
)
def add_status(status):
key = str(status or "").strip() or "Unknown"
report["route_status_counts"][key] = report["route_status_counts"].get(key, 0) + 1
def create_route(route_lane_index, item, start_terminal, end_terminal, endpoint_metadata):
route_options = dict(options or {})
if isinstance(item, dict) and "__segment_usage_costs" in item:
route_options["segment_usage_costs"] = item.get("__segment_usage_costs", {})
return route_eplan_connection_between_terminals(
doc,
start_terminal,
end_terminal,
route_index=route_lane_index,
options=route_options,
wire_uuid=_wire_item_value(item, "wire_id", "wire_uuid", "id"),
wire_label=_wire_item_value(item, "wire_label", "wire_mark"),
net_uuid=_wire_item_value(item, "net_uuid"),
group_uuid=_wire_item_value(item, "group_uuid"),
wire_mark=_wire_item_value(item, "wire_mark"),
wire_mark_is_manual=bool(item.get("wire_mark_is_manual", False)),
wire_style_id=_wire_item_value(item, "wire_style_id"),
endpoint_metadata=endpoint_metadata,
)
for item in wires:
if not isinstance(item, dict):
report["skipped_invalid"] += 1
@ -1524,42 +1242,20 @@ def route_eplan_connections_from_payload(doc, payload, options=None, prepared_la
lane_key = _route_lane_key(start_uuid, end_uuid)
route_lane_index = lane_indexes_by_pair.get(lane_key, 0)
try:
endpoint_metadata = {
"start_element_uuid": _wire_item_value(item, "start_element_uuid"),
"start_terminal_display": _wire_item_value(item, "start_terminal_display"),
"start_device_label": _wire_item_value(item, "start_device_label"),
"end_element_uuid": _wire_item_value(item, "end_element_uuid"),
"end_terminal_display": _wire_item_value(item, "end_terminal_display"),
"end_device_label": _wire_item_value(item, "end_device_label"),
"endpoint_label": _wire_item_value(item, "endpoint_label"),
}
result = create_route(
route_lane_index,
dict(item, __segment_usage_costs=segment_usage_costs),
result = route_eplan_connection_between_terminals(
doc,
start_terminal,
end_terminal,
endpoint_metadata,
route_index=route_lane_index,
options=options,
wire_uuid=_wire_item_value(item, "wire_id", "wire_uuid", "id"),
wire_label=_wire_item_value(item, "wire_label", "wire_mark"),
net_uuid=_wire_item_value(item, "net_uuid"),
group_uuid=_wire_item_value(item, "group_uuid"),
wire_mark=_wire_item_value(item, "wire_mark"),
wire_mark_is_manual=bool(item.get("wire_mark_is_manual", False)),
wire_style_id=_wire_item_value(item, "wire_style_id"),
)
route_segment_keys = _route_segment_keys(result)
shared_lane_index = max(
[lane_indexes_by_segment.get(key, 0) for key in route_segment_keys] or [0]
)
final_lane_index = max(route_lane_index, shared_lane_index)
if final_lane_index != route_lane_index:
initial_wire = result.get("wire") if isinstance(result, dict) else None
try:
result = create_route(
final_lane_index,
dict(item, __segment_usage_costs=segment_usage_costs),
start_terminal,
end_terminal,
endpoint_metadata,
)
except Exception:
if initial_wire is not None:
_remove_routing_connection_objects(doc, [initial_wire])
raise
route_segment_keys = _route_segment_keys(result)
except Exception as exc:
error_text = str(exc)
report["errors"].append(error_text)
@ -1570,28 +1266,12 @@ def route_eplan_connections_from_payload(doc, payload, options=None, prepared_la
"wire_uuid": _wire_item_value(item, "wire_id", "wire_uuid", "id"),
"wire_label": _wire_item_value(item, "wire_label", "wire_mark"),
"start_terminal_uuid": start_uuid,
"start_element_uuid": _wire_item_value(item, "start_element_uuid"),
"start_terminal_display": _wire_item_value(item, "start_terminal_display"),
"start_device_label": _wire_item_value(item, "start_device_label"),
"end_terminal_uuid": end_uuid,
"end_element_uuid": _wire_item_value(item, "end_element_uuid"),
"end_terminal_display": _wire_item_value(item, "end_terminal_display"),
"end_device_label": _wire_item_value(item, "end_device_label"),
"endpoint_label": _wire_item_value(item, "endpoint_label"),
"error": error_text,
}
)
continue
lane_indexes_by_pair[lane_key] = max(
lane_indexes_by_pair.get(lane_key, 0),
int(result.get("lane", {}).get("index", 0) or 0) + 1,
)
for segment_key in route_segment_keys:
lane_indexes_by_segment[segment_key] = max(
lane_indexes_by_segment.get(segment_key, 0),
int(result.get("lane", {}).get("index", 0) or 0) + 1,
)
segment_usage_costs[segment_key] = segment_usage_costs.get(segment_key, 0) + 1
lane_indexes_by_pair[lane_key] = route_lane_index + 1
if result["route_status"] == "CollisionWarning":
report["collision_warnings"] += 1
add_status(result["route_status"])
@ -1618,14 +1298,7 @@ def route_eplan_connections_from_payload(doc, payload, options=None, prepared_la
"wire_label": _wire_item_value(item, "wire_label", "wire_mark"),
"wire_style_id": _wire_item_value(item, "wire_style_id"),
"start_terminal_uuid": start_uuid,
"start_element_uuid": _wire_item_value(item, "start_element_uuid"),
"start_terminal_display": _wire_item_value(item, "start_terminal_display"),
"start_device_label": _wire_item_value(item, "start_device_label"),
"end_terminal_uuid": end_uuid,
"end_element_uuid": _wire_item_value(item, "end_element_uuid"),
"end_terminal_display": _wire_item_value(item, "end_terminal_display"),
"end_device_label": _wire_item_value(item, "end_device_label"),
"endpoint_label": _wire_item_value(item, "endpoint_label"),
"algorithm": result["algorithm"],
"route_status": result["route_status"],
"length_mm": route_length,
@ -1641,54 +1314,6 @@ def route_eplan_connections_from_payload(doc, payload, options=None, prepared_la
return report
def _missing_endpoint_label(sample, side):
terminal_uuid = str(sample.get("{0}_terminal_uuid".format(side), "") or "").strip()
element_uuid = str(sample.get("{0}_element_uuid".format(side), "") or "").strip()
terminal_display = str(sample.get("{0}_terminal_display".format(side), "") or "").strip()
if element_uuid and terminal_display:
label = "{0}/{1}".format(element_uuid, terminal_display)
elif terminal_display:
label = terminal_display
elif element_uuid:
label = element_uuid
else:
return terminal_uuid
if terminal_uuid and terminal_uuid != label:
return "{0} ({1})".format(label, terminal_uuid)
return label
def _missing_endpoint_side_summary(sample):
missing_sides = []
if sample.get("start_found") is False:
missing_sides.append("起点")
if sample.get("end_found") is False:
missing_sides.append("终点")
if not missing_sides:
return ""
if len(missing_sides) == 2:
return "(缺失:两端)"
return "(缺失:{0}".format(missing_sides[0])
def _wire_sample_text(sample):
return (
str(sample.get("wire_label", "") or "").strip()
or str(sample.get("wire_uuid", "") or "").strip()
or "未知导线"
)
def _endpoint_pair_text(sample):
endpoint_label = str(sample.get("endpoint_label", "") or "").strip()
if endpoint_label:
return endpoint_label
return "{0} -> {1}".format(
_missing_endpoint_label(sample, "start"),
_missing_endpoint_label(sample, "end"),
)
def format_eplan_connection_route_report(report):
message = "批量生成布线连接完成routed={0}, collision_warnings={1}, missing_terminals={2}".format(
report.get("routed", 0),
@ -1734,13 +1359,6 @@ def format_eplan_connection_route_report(report):
errors = report.get("errors", []) or []
if errors:
message += "\n首个错误:{0}".format(str(errors[0]))
error_sample = (report.get("error_samples") or [None])[0]
if error_sample:
message += "\n错误示例:导线 {0}{1}{2}".format(
_wire_sample_text(error_sample),
_endpoint_pair_text(error_sample),
error_sample.get("error", ""),
)
collision_sample = (report.get("collision_samples") or [None])[0]
if collision_sample:
obstacle_text = (
@ -1748,21 +1366,12 @@ def format_eplan_connection_route_report(report):
or collision_sample.get("obstacle_name")
or "未知对象"
)
wire_text = (
message += "\n碰撞示例:导线 {0} 碰到 {1}".format(
collision_sample.get("wire_label")
or collision_sample.get("wire_uuid")
or "未知导线"
or "未知导线",
obstacle_text,
)
if collision_sample.get("collision_kind") == "ClearanceWarning":
message += "\n碰撞示例:导线 {0} 进入 {1} 的安全间隙。".format(
wire_text,
obstacle_text,
)
else:
message += "\n碰撞示例:导线 {0} 碰到 {1}".format(
wire_text,
obstacle_text,
)
auto_bound = report.get("auto_bound_terminals", 0)
auto_created = report.get("auto_created_terminals", 0)
if auto_bound or auto_created:
@ -1782,10 +1391,9 @@ def format_eplan_connection_route_report(report):
message += " 请先从 QET 重新导入/更新工程端子,使端子 UUID 不再是 local:...。"
sample = (report.get("missing_endpoint_samples") or [None])[0]
if sample:
message += "\n缺失示例:{0} -> {1}{2}".format(
_missing_endpoint_label(sample, "start"),
_missing_endpoint_label(sample, "end"),
_missing_endpoint_side_summary(sample),
message += "\n缺失示例:{0} -> {1}".format(
sample.get("start_terminal_uuid", ""),
sample.get("end_terminal_uuid", ""),
)
return message
@ -1873,13 +1481,10 @@ def _wire_tasks_payload(doc):
"start_instance_id": (getattr(task, "QetStartInstanceId", "") or "").strip(),
"start_terminal_uuid": (getattr(task, "QetStartTerminalUuid", "") or "").strip(),
"start_terminal_display": (getattr(task, "QetStartTerminalDisplay", "") or "").strip(),
"start_device_label": (getattr(task, "QetStartDeviceLabel", "") or "").strip(),
"end_element_uuid": (getattr(task, "QetEndElementUuid", "") or "").strip(),
"end_instance_id": (getattr(task, "QetEndInstanceId", "") or "").strip(),
"end_terminal_uuid": (getattr(task, "QetEndTerminalUuid", "") or "").strip(),
"end_terminal_display": (getattr(task, "QetEndTerminalDisplay", "") or "").strip(),
"end_device_label": (getattr(task, "QetEndDeviceLabel", "") or "").strip(),
"endpoint_label": (getattr(task, "QetEndpointLabel", "") or "").strip(),
}
)
return payload
@ -1961,92 +1566,6 @@ def check_eplan_routing_path_network(doc, project_uuid="", options=None):
}
def _format_distance_mm(value):
try:
return "{0:.1f} mm".format(float(value))
except Exception:
return "未知距离"
def _format_point_text(point):
if not isinstance(point, dict):
return "未知位置"
try:
return "({0:.1f}, {1:.1f}, {2:.1f})".format(
float(point.get("x", 0.0)),
float(point.get("y", 0.0)),
float(point.get("z", 0.0)),
)
except Exception:
return "未知位置"
def _diagnostic_terminal_text(item):
if not isinstance(item, dict):
return "未知端子"
return (
item.get("terminal_uuid")
or item.get("label")
or item.get("name")
or "未知端子"
)
def _dict_items(value):
if not isinstance(value, list):
return []
return [item for item in value if isinstance(item, dict)]
def format_routing_path_network_report(diagnostic):
"""Return an actionable Chinese summary for routing path network diagnostics."""
if not isinstance(diagnostic, dict):
return "布线路径网络检查失败:诊断结果无效。"
summary = diagnostic.get("summary", {}) if isinstance(diagnostic.get("summary", {}), dict) else {}
issues = _dict_items(diagnostic.get("issues", []) or [])
if not issues:
return "布线路径网络检查通过:{0} 条 carrier / {1} 段 / {2} 个节点。".format(
summary.get("carriers", 0),
summary.get("segments", 0),
summary.get("nodes", 0),
)
message = "布线路径网络检查发现 {0} 类问题。".format(len(issues))
unconnected = _dict_items(diagnostic.get("unconnected_terminals", []) or [])
if unconnected:
sample = unconnected[0]
message += "\n端子未接入:{0},距离最近网络 {1}。请重新生成布线路径网络,或补一段线槽/辅助路径到该端子。".format(
_diagnostic_terminal_text(sample),
_format_distance_mm(sample.get("nearest_network_distance_mm")),
)
possible_breaks = _dict_items(diagnostic.get("possible_breaks", []) or [])
if possible_breaks:
sample = possible_breaks[0]
carrier = sample.get("carrier", {}) if isinstance(sample.get("carrier", {}), dict) else {}
carrier_text = carrier.get("label") or carrier.get("name") or "未知线槽"
message += "\n线槽端点疑似断开:{0} @ {1}。请补齐相邻线槽、开口或辅助路径。".format(
carrier_text,
_format_point_text(sample.get("point")),
)
isolated = _dict_items(diagnostic.get("isolated_components", []) or [])
if isolated:
sample = isolated[0]
carriers = sample.get("carrier_labels") or sample.get("carrier_names") or []
carrier_text = "".join([str(item) for item in carriers[:3]]) if carriers else "未知 carrier"
message += "\n存在孤立路径网络:{0}。请用线槽/辅助路径把孤立网络接入主网络。".format(carrier_text)
if not (unconnected or possible_breaks or isolated):
first_issue = issues[0]
message += "\n首个问题:{0} ({1})。".format(
first_issue.get("code", "unknown"),
first_issue.get("count", 0),
)
return message
def update_eplan_routing_path_network(doc, project_uuid="", options=None, selection_ex=None):
"""Update the routing path network before EPLAN-style Route."""
return generate_eplan_routing_path_network(

@ -276,10 +276,25 @@ class AutoRoutingTaskPanel:
try:
result = self.controller.check_routing_path_network()
diagnostic = result.get("diagnostic", {}) if isinstance(result.get("diagnostic", {}), dict) else {}
issues = diagnostic.get("issues", []) or []
summary = diagnostic.get("summary", {}) if isinstance(diagnostic.get("summary", {}), dict) else {}
if not issues:
self._set_status(
"布线路径网络检查通过:{0} 条 carrier / {1} 段 / {2} 个节点。{3}".format(
summary.get("carriers", 0),
summary.get("segments", 0),
summary.get("nodes", 0),
self.controller.summary(),
)
)
return
first_issue = issues[0]
self._set_status(
"{0}{1}".format(
AutoRouting.format_routing_path_network_report(diagnostic),
"\n" + self.controller.summary(),
"布线路径网络检查发现 {0} 类问题:{1} ({2})。{3}".format(
len(issues),
first_issue.get("code", ""),
first_issue.get("count", 0),
self.controller.summary(),
)
)
except Exception as exc:

@ -409,21 +409,7 @@ def _ensure_vector_list_property(obj, prop_name, description):
)
def _ensure_integer_property(obj, prop_name, description, value):
if prop_name not in getattr(obj, "PropertiesList", []):
obj.addProperty(
"App::PropertyInteger",
prop_name,
PROPERTY_GROUP,
description,
)
try:
setattr(obj, prop_name, int(value))
except Exception:
setattr(obj, prop_name, 0)
def _set_route_carrier_semantics(obj, project_uuid="", kind=ROUTE_CARRIER_KIND, capacity=1):
def _set_route_carrier_semantics(obj, project_uuid="", kind=ROUTE_CARRIER_KIND):
TerminalObjects.ensure_string_property(
obj,
"QetRoutingRole",
@ -452,26 +438,9 @@ def _set_route_carrier_semantics(obj, project_uuid="", kind=ROUTE_CARRIER_KIND,
"Whether routing connections can use this path",
True,
)
_ensure_integer_property(
obj,
"QetRouteCarrierCapacity",
"How many routed wires can reuse this carrier segment before detouring is preferred",
capacity,
)
return obj
def _route_carrier_capacity_value(obj, default=1):
for property_name in ("QetRouteCarrierCapacity", "QetWireCapacity"):
try:
value = int(float(getattr(obj, property_name, 0) or 0))
except Exception:
value = 0
if value > 0:
return value
return int(default or 1)
def _set_wire_duct_source_semantics(source):
if source is None:
return
@ -489,12 +458,6 @@ def _set_wire_duct_source_semantics(source):
"How routing connection collision checks should treat this object",
WIRE_DUCT_OBSTACLE_MODE,
)
_ensure_integer_property(
source,
"QetRouteCarrierCapacity",
"How many routed wires can reuse generated wire duct segments before detouring is preferred",
_route_carrier_capacity_value(source, default=1),
)
def _set_support_surface_source_semantics(source):
@ -588,7 +551,11 @@ def _create_carrier_geometry(doc, name, points):
return obj
def _normalized_route_points(points):
def create_route_carrier(doc, points, label="", project_uuid="", kind=ROUTE_CARRIER_KIND):
"""Create a routable carrier from ordered 3D points."""
if doc is None:
raise RoutingNetworkError("No FreeCAD document is available.")
normalized = []
for point in points or []:
vector = _vector(point)
@ -596,48 +563,20 @@ def _normalized_route_points(points):
continue
if not normalized or _distance(normalized[-1], vector) > DEFAULT_NODE_TOLERANCE:
normalized.append(vector)
return normalized
def _set_route_carrier_points(carrier, points):
_ensure_vector_list_property(
carrier,
"Points",
"Ordered centerline points used by the 3D router",
)
carrier.Points = list(points)
try:
import Part
carrier.Shape = Part.makePolygon(points)
except Exception:
pass
def _update_route_carrier(carrier, points, project_uuid="", kind=ROUTE_CARRIER_KIND, capacity=1):
normalized = _normalized_route_points(points)
if len(normalized) < 2:
return False
_set_route_carrier_points(carrier, normalized)
_set_route_carrier_semantics(carrier, project_uuid=project_uuid, kind=kind, capacity=capacity)
_style_route_carrier(carrier, kind)
return True
def create_route_carrier(doc, points, label="", project_uuid="", kind=ROUTE_CARRIER_KIND, capacity=1):
"""Create a routable carrier from ordered 3D points."""
if doc is None:
raise RoutingNetworkError("No FreeCAD document is available.")
normalized = _normalized_route_points(points)
if len(normalized) < 2:
raise RoutingNetworkError("A route carrier requires at least two distinct points.")
name = _unique_name(doc, "QETRouteCarrier")
carrier = _create_carrier_geometry(doc, name, normalized)
carrier.Label = label or "QET Route Carrier"
_set_route_carrier_points(carrier, normalized)
_set_route_carrier_semantics(carrier, project_uuid=project_uuid, kind=kind, capacity=capacity)
_ensure_vector_list_property(
carrier,
"Points",
"Ordered centerline points used by the 3D router",
)
carrier.Points = list(normalized)
_set_route_carrier_semantics(carrier, project_uuid=project_uuid, kind=kind)
group = WiringObjects.ensure_carrier_group(doc, project_uuid)
if carrier not in getattr(group, "Group", []):
@ -1420,40 +1359,6 @@ def _wire_duct_centerline_from_bbox(bbox, margin=DEFAULT_WIRE_DUCT_MARGIN, min_a
).get("centerline", [])
def _sync_wire_duct_source_carriers(doc, source, spec, project_uuid="", capacity=1):
carriers = _live_source_carriers(doc, source)
if not carriers:
return False
desired = [
(spec.get("centerline", []), ROUTE_CARRIER_KIND_WIRE_DUCT),
]
desired.extend(
(points, ROUTE_CARRIER_KIND_WIRE_DUCT_OPEN_END)
for points in (spec.get("open_ends", []) or [])
)
updated = []
for carrier, desired_item in zip(carriers, desired):
points, kind = desired_item
if _update_route_carrier(
carrier,
points,
project_uuid=project_uuid,
kind=kind,
capacity=capacity,
):
updated.append(carrier)
if updated:
_mark_wire_duct_source(source, updated[0], updated)
try:
doc.recompute()
except Exception:
pass
return True
def _wiring_cut_out_points_from_bbox(bbox):
extents = _bbox_extents(bbox)
if not extents:
@ -1494,57 +1399,7 @@ def _wire_duct_sources_from_selection(selection_ex):
return sources
def _route_source_carrier_names(source):
names = []
try:
raw = (getattr(source, "QetRouteCarrierNamesJson", "") or "").strip()
if raw:
parsed = json.loads(raw)
if isinstance(parsed, list):
names.extend(str(item).strip() for item in parsed if str(item).strip())
except Exception:
names = []
carrier_name = (getattr(source, "QetRouteCarrierName", "") or "").strip()
if carrier_name:
names.insert(0, carrier_name)
result = []
seen = set()
for name in names:
if name in seen:
continue
seen.add(name)
result.append(name)
return result
def _live_source_carriers(doc, source):
if doc is None or source is None:
return []
carriers = []
for carrier_name in _route_source_carrier_names(source):
carrier = doc.getObject(carrier_name)
if carrier is not None and is_route_carrier(carrier):
carriers.append(carrier)
return carriers
def _remember_source_carriers(source, carriers):
live_names = [
getattr(carrier, "Name", "")
for carrier in (carriers or [])
if carrier is not None and getattr(carrier, "Name", "")
]
if live_names:
TerminalObjects.ensure_string_property(
source,
"QetRouteCarrierNamesJson",
PROPERTY_GROUP,
"Generated route carriers for this source",
json.dumps(live_names, ensure_ascii=False),
)
def _mark_wire_duct_source(source, carrier, carriers=None):
def _mark_wire_duct_source(source, carrier):
if source is None:
return
try:
@ -1557,7 +1412,6 @@ def _mark_wire_duct_source(source, carrier, carriers=None):
"Generated route carrier for this source",
getattr(carrier, "Name", ""),
)
_remember_source_carriers(source, carriers or ([carrier] if carrier is not None else []))
except Exception:
pass
@ -1617,8 +1471,13 @@ def _mark_terminal_access_source(source, carrier):
def _live_source_carrier(doc, source):
carriers = _live_source_carriers(doc, source)
return carriers[0] if carriers else None
carrier_name = (getattr(source, "QetRouteCarrierName", "") or "").strip()
if not carrier_name or doc is None:
return None
carrier = doc.getObject(carrier_name)
if carrier is not None and is_route_carrier(carrier):
return carrier
return None
def detect_wire_duct_sources(doc, min_aspect=DEFAULT_AUTO_WIRE_DUCT_MIN_ASPECT):
@ -1714,6 +1573,8 @@ def create_wire_duct_carriers_from_document(
"""Auto-detect wire duct objects in the document and create WireDuct centerlines."""
created = []
for index, source in enumerate(detect_wire_duct_sources(doc, min_aspect=min_aspect), start=1):
if _live_source_carrier(doc, source) is not None:
continue
bbox = _bound_box_from_object(source)
if bbox is None:
continue
@ -1726,39 +1587,27 @@ def create_wire_duct_carriers_from_document(
if len(points) < 2:
continue
label = getattr(source, "Label", "") or getattr(source, "Name", "") or "Wire Duct"
capacity = _route_carrier_capacity_value(source, default=1)
if _sync_wire_duct_source_carriers(
doc,
source,
spec,
project_uuid=project_uuid,
capacity=capacity,
):
continue
carrier = create_route_carrier(
doc,
points,
label="QET Auto Wire Duct Centerline {0}".format(label),
project_uuid=project_uuid,
kind=ROUTE_CARRIER_KIND_WIRE_DUCT,
capacity=capacity,
)
source_created = [carrier]
_mark_wire_duct_source(source, carrier)
created.append(carrier)
for end_index, open_end_points in enumerate(spec.get("open_ends", []) or [], start=1):
if len(open_end_points) < 2:
continue
open_end_carrier = create_route_carrier(
doc,
open_end_points,
label="QET Auto Wire Duct Open End {0} {1}".format(label, end_index),
project_uuid=project_uuid,
kind=ROUTE_CARRIER_KIND_WIRE_DUCT_OPEN_END,
capacity=capacity,
created.append(
create_route_carrier(
doc,
open_end_points,
label="QET Auto Wire Duct Open End {0} {1}".format(label, end_index),
project_uuid=project_uuid,
kind=ROUTE_CARRIER_KIND_WIRE_DUCT_OPEN_END,
)
)
source_created.append(open_end_carrier)
created.append(open_end_carrier)
_mark_wire_duct_source(source, carrier, source_created)
return created
@ -2039,39 +1888,27 @@ def create_wire_duct_carriers_from_selection(
if len(points) < 2:
continue
label = getattr(source, "Label", "") or getattr(source, "Name", "") or "Wire Duct"
capacity = _route_carrier_capacity_value(source, default=1)
if _sync_wire_duct_source_carriers(
doc,
source,
spec,
project_uuid=project_uuid,
capacity=capacity,
):
continue
carrier = create_route_carrier(
doc,
points,
label="QET Wire Duct Centerline {0}".format(label),
project_uuid=project_uuid,
kind=ROUTE_CARRIER_KIND_WIRE_DUCT,
capacity=capacity,
)
source_created = [carrier]
_mark_wire_duct_source(source, carrier)
created.append(carrier)
for end_index, open_end_points in enumerate(spec.get("open_ends", []) or [], start=1):
if len(open_end_points) < 2:
continue
open_end_carrier = create_route_carrier(
doc,
open_end_points,
label="QET Wire Duct Open End {0} {1}".format(label, end_index),
project_uuid=project_uuid,
kind=ROUTE_CARRIER_KIND_WIRE_DUCT_OPEN_END,
capacity=capacity,
created.append(
create_route_carrier(
doc,
open_end_points,
label="QET Wire Duct Open End {0} {1}".format(label, end_index),
project_uuid=project_uuid,
kind=ROUTE_CARRIER_KIND_WIRE_DUCT_OPEN_END,
)
)
source_created.append(open_end_carrier)
created.append(open_end_carrier)
_mark_wire_duct_source(source, carrier, source_created)
return created
@ -2340,91 +2177,6 @@ def nearest_point_on_network(network, point):
return nearest_node(network, target)
def connect_point_to_network(network, point):
"""Connect the closest projected point to a route graph and return key/distance/mode."""
if not isinstance(network, dict):
return None, None, "none"
nodes = network.get("nodes", {}) or {}
edges = network.get("edges", {}) or {}
if not nodes or not edges:
return None, None, "none"
tolerance = float(network.get("tolerance", DEFAULT_NODE_TOLERANCE) or DEFAULT_NODE_TOLERANCE)
target = _vector(point)
best = None
seen = set()
for key, neighbors in edges.items():
start = nodes.get(key)
if start is None:
continue
for next_key, _weight, carrier in neighbors:
pair = tuple(sorted((key, next_key)))
if pair in seen:
continue
seen.add(pair)
end = nodes.get(next_key)
if end is None:
continue
projected = _closest_point_on_segment(target, start, end)
distance = _distance(target, projected)
if best is None or distance < best["distance"]:
best = {
"key": key,
"next_key": next_key,
"carrier": carrier,
"point": projected,
"distance": distance,
}
if best is None:
node_key, distance = nearest_node(network, target)
return node_key, distance, "node" if node_key is not None else "none"
projected_key = _point_key(best["point"], tolerance=tolerance)
if projected_key in nodes:
return projected_key, best["distance"], "node"
start_key = best["key"]
end_key = best["next_key"]
start = nodes[start_key]
end = nodes[end_key]
carrier = best["carrier"]
def remove_edge_once(left_key, right_key, fallback_to_pair=False):
neighbors = list(edges.get(left_key, []) or [])
for index, (candidate_key, _weight, candidate_carrier) in enumerate(neighbors):
if candidate_key == right_key and candidate_carrier is carrier:
del neighbors[index]
edges[left_key] = neighbors
return True
if fallback_to_pair:
for index, (candidate_key, _weight, _candidate_carrier) in enumerate(neighbors):
if candidate_key == right_key:
del neighbors[index]
edges[left_key] = neighbors
return True
return False
removed_forward = remove_edge_once(start_key, end_key)
remove_edge_once(end_key, start_key, fallback_to_pair=removed_forward)
nodes[projected_key] = best["point"]
edges[projected_key] = []
added_segments = 0
for left_key, left_point, right_key, right_point in (
(start_key, start, projected_key, best["point"]),
(projected_key, best["point"], end_key, end),
):
weight = _distance(left_point, right_point)
if weight <= tolerance:
continue
edges[left_key].append((right_key, weight, carrier))
edges[right_key].append((left_key, weight, carrier))
added_segments += 1
network["segment_count"] = max(int(network.get("segment_count", 0) or 0) - 1 + added_segments, 0)
return projected_key, best["distance"], "segment_projection"
def _carrier_track_payload(carrier):
return {
"name": getattr(carrier, "Name", ""),
@ -2433,36 +2185,7 @@ def _carrier_track_payload(carrier):
}
def _segment_usage_key(carrier, from_key, to_key):
carrier_name = getattr(carrier, "Name", "") if carrier is not None else ""
return (
carrier_name,
tuple(sorted((from_key, to_key))),
)
def _carrier_capacity(carrier):
if carrier is None:
return 1
for property_name in ("QetRouteCarrierCapacity", "QetWireCapacity"):
try:
value = int(float(getattr(carrier, property_name, 0) or 0))
except Exception:
value = 0
if value > 0:
return value
return 1
def shortest_path_with_carriers(
network,
start_key,
end_key,
bend_penalty=0.0,
kind_cost_factors=None,
segment_usage_costs=None,
segment_reuse_penalty=0.0,
):
def shortest_path_with_carriers(network, start_key, end_key, bend_penalty=0.0, kind_cost_factors=None):
"""Dijkstra search with a small extra cost when route direction changes."""
if start_key is None or end_key is None:
return None
@ -2535,19 +2258,8 @@ def shortest_path_with_carriers(
bend_cost = 0.0
if previous_direction is not None and direction != previous_direction:
bend_cost = float(bend_penalty or 0.0)
usage_cost = 0.0
if segment_usage_costs:
usage_count = float(segment_usage_costs.get(_segment_usage_key(carrier, key, next_key), 0.0) or 0.0)
capacity = float(_carrier_capacity(carrier))
excess_usage = max(usage_count - capacity + 1.0, 0.0)
usage_cost = excess_usage * float(segment_reuse_penalty or 0.0)
next_state = (next_key, direction)
next_cost = (
cost
+ float(weight) * _carrier_cost_factor(carrier, kind_cost_factors)
+ bend_cost
+ usage_cost
)
next_cost = cost + float(weight) * _carrier_cost_factor(carrier, kind_cost_factors) + bend_cost
if next_cost < distances.get(next_state, float("inf")):
distances[next_state] = next_cost
previous[next_state] = {

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save