You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

3073 lines
101 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# FreeCADExchange route carrier network helpers.
#
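# This module only manages the routing network inside the FreeCAD document;
# it does not write to the database.
# First-version approach: the user or a template marks wire duct / rail
# centerlines as carriers, and the routing connection algorithm then runs a
# shortest-path search along those carriers.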
import heapq
import json
import math
import FreeCAD as App
import TerminalObjects
import WiringObjects
# Marker written to the "QetRoutingRole" property of every route carrier.
ROUTING_ROLE = "RoutingCarrier"

# Carrier kinds. ROUTE_CARRIER_KIND is the default kind for new carriers.
ROUTE_CARRIER_KIND = "RoutingPath"
ROUTE_CARRIER_KIND_WIRE_DUCT = "WireDuct"
ROUTE_CARRIER_KIND_WIRE_DUCT_OPEN_END = "WireDuctOpenEnd"
ROUTE_CARRIER_KIND_WIRING_CUT_OUT = "WiringCutOut"
ROUTE_CARRIER_KIND_AUXILIARY_PATH = "AuxiliaryPath"
ROUTE_CARRIER_KIND_ROUTING_RANGE = "RoutingRange"
ROUTE_CARRIER_KIND_TERMINAL_ACCESS = "TerminalAccess"

# Kinds that are also written to "QetRoutingSourceKind" on source objects.
# NOTE(review): the consumer of this set is not visible in this section.
MANAGED_ROUTE_SOURCE_KINDS = {
    ROUTE_CARRIER_KIND_WIRE_DUCT,
    ROUTE_CARRIER_KIND_WIRING_CUT_OUT,
    ROUTE_CARRIER_KIND_ROUTING_RANGE,
}
# Property group name passed to addProperty for every routing property.
PROPERTY_GROUP = "QET Routing"

# Distance below which two points / lengths are treated as equal.
DEFAULT_NODE_TOLERANCE = 0.001

# Numeric defaults for carrier generation.
# NOTE(review): presumably millimetres (cf. "QetWiringCutOutBridgeExtensionMm");
# confirm against the callers, most of which are outside this section.
DEFAULT_SURFACE_LANE_SPACING = 100.0
DEFAULT_SURFACE_OFFSET = 5.0
DEFAULT_SURFACE_MARGIN = 20.0
DEFAULT_WIRE_DUCT_MARGIN = 20.0
DEFAULT_WIRE_DUCT_OPEN_END_MIN_LENGTH = 20.0
DEFAULT_ROUTE_PATH_FACE_OFFSET = 2.0
DEFAULT_AUTO_WIRE_DUCT_MIN_ASPECT = 2.5
DEFAULT_TERMINAL_ACCESS_MAX_DISTANCE = 1000.0
DEFAULT_ADJOINING_DUCT_TOLERANCE = 5.0
DEFAULT_WIRING_CUT_OUT_BRIDGE_EXTENSION = 20.0

# Values written to the "QetRoutingObstacleMode" property of source objects.
WIRE_DUCT_OBSTACLE_MODE = "PassThrough"
SUPPORT_SURFACE_OBSTACLE_MODE = "SupportSurface"
# Lower-case substrings that hint an object is a wire duct. They are matched
# against the lower-cased name/label text of the object.
WIRE_DUCT_NAME_KEYWORDS = (
    "wire duct",
    "wiring duct",
    "cable duct",
    "cable tray",
    "trunking",
    "wireway",
    "线槽",  # wire duct
    "走线槽",  # wiring duct
    "走线",  # wiring / routing
    "电缆槽",  # cable trough
    "配线槽",  # distribution wiring duct
)
# Lower-case substrings that veto wire duct detection even when a duct
# keyword or semantic hint is present.
WIRE_DUCT_EXCLUDE_KEYWORDS = (
    "cabinet",
    "door",
    "panel",
    "backplate",
    "base plate",
    "mounting plate",
    "机柜",  # cabinet
    "柜体",  # cabinet body
    "门板",  # door panel
    "安装板",  # mounting plate
    "背板",  # back plate
    "底板",  # base plate
)
# Lower-case substrings that hint an object is a wiring cut-out (an opening
# that wires may pass through).
WIRING_CUT_OUT_NAME_KEYWORDS = (
    "wiring cut-out",
    "wiring cutout",
    "wire cut-out",
    "wire cutout",
    "cable cut-out",
    "cable cutout",
    "through hole",
    "pass-through",
    "passthrough",
    "穿线孔",  # wire threading hole
    "过线孔",  # wire pass-through hole
    "开孔",  # opening / cut-out
    "过线",  # wire pass-through
)
# Lower-case substrings that hint an object is a flat support surface
# (mounting plate, door, cabinet face).
SUPPORT_SURFACE_NAME_KEYWORDS = (
    "mounting plate",
    "base plate",
    "back plate",
    "backplate",
    "panel",
    "door panel",
    "rear door",
    "front door",
    "cabinet face",
    "cabinet panel",
    "\u5b89\u88c5\u677f",  # mounting plate
    "\u80cc\u677f",  # back plate
    "\u5e95\u677f",  # base plate
    "\u95e8\u677f",  # door panel
    "\u67dc\u9762",  # cabinet face
)
# Lower-cased "QetCarrierKind" values that mark an object as a support surface.
SUPPORT_SURFACE_CARRIER_KINDS = {
    "cabinet",
    "panel",
    "cabinet_face",
    "mounting_plate",
    "routing_range",
}
# Per-kind cost factor table.
# NOTE(review): presumably a length multiplier used by the path search, so
# larger values make a kind less attractive; the consumer is outside this
# section, confirm there.
DEFAULT_KIND_COST_FACTORS = {
    ROUTE_CARRIER_KIND_WIRE_DUCT: 1.0,
    ROUTE_CARRIER_KIND_WIRE_DUCT_OPEN_END: 1.0,
    ROUTE_CARRIER_KIND_WIRING_CUT_OUT: 1.0,
    ROUTE_CARRIER_KIND: 1.0,
    ROUTE_CARRIER_KIND_AUXILIARY_PATH: 2.0,
    ROUTE_CARRIER_KIND_TERMINAL_ACCESS: 2.0,
    ROUTE_CARRIER_KIND_ROUTING_RANGE: 8.0,
    "UserPath": 1.0,
}
# View styling per carrier kind: "color" is an RGB tuple of floats and "width"
# the line width. The ROUTE_CARRIER_KIND entry doubles as the fallback style
# for kinds that are not listed here.
ROUTE_CARRIER_VIEW_STYLES = {
    ROUTE_CARRIER_KIND_WIRE_DUCT: {
        "color": (1.0, 0.55, 0.0),
        "width": 4.0,
    },
    ROUTE_CARRIER_KIND_WIRE_DUCT_OPEN_END: {
        "color": (1.0, 0.72, 0.2),
        "width": 3.0,
    },
    ROUTE_CARRIER_KIND_WIRING_CUT_OUT: {
        "color": (0.0, 0.72, 0.85),
        "width": 3.0,
    },
    ROUTE_CARRIER_KIND_ROUTING_RANGE: {
        "color": (0.0, 0.65, 0.35),
        "width": 1.0,
    },
    ROUTE_CARRIER_KIND_TERMINAL_ACCESS: {
        "color": (0.65, 0.2, 1.0),
        "width": 2.0,
    },
    ROUTE_CARRIER_KIND_AUXILIARY_PATH: {
        "color": (0.45, 0.45, 0.45),
        "width": 2.0,
    },
    ROUTE_CARRIER_KIND: {
        "color": (0.0, 0.45, 0.85),
        "width": 2.0,
    },
}
class RoutingNetworkError(RuntimeError):
    """Error raised by the routing helpers for unusable points or documents."""
class _SimpleBoundBox:
def __init__(self, xmin, xmax, ymin, ymax, zmin, zmax):
self.XMin = float(xmin)
self.XMax = float(xmax)
self.YMin = float(ymin)
self.YMax = float(ymax)
self.ZMin = float(zmin)
self.ZMax = float(zmax)
class _PointVertex:
def __init__(self, point):
self.Point = point
class _BBoxFace:
    """Lightweight face substitute built from corner points and a normal.

    Provides the attributes the face helpers in this module read:
    ``ShapeType``, ``Vertexes``, ``QetSurfaceUAxis``, ``CenterOfMass``,
    ``BoundBox`` and ``normalAt``.
    """

    ShapeType = "Face"

    def __init__(self, points, normal):
        self.Vertexes = [_PointVertex(corner) for corner in points]
        self._normal = normal
        # The first edge (corner 0 -> corner 1) is exposed as the U axis hint.
        if len(points) > 1:
            self.QetSurfaceUAxis = _subtract(points[1], points[0])
        else:
            self.QetSurfaceUAxis = None
        self.CenterOfMass = _average_points(points)
        x_values = [corner.x for corner in points]
        y_values = [corner.y for corner in points]
        z_values = [corner.z for corner in points]
        self.BoundBox = _SimpleBoundBox(
            min(x_values),
            max(x_values),
            min(y_values),
            max(y_values),
            min(z_values),
            max(z_values),
        )

    def normalAt(self, _u, _v):
        """Return the stored normal; the surface parameters are ignored."""
        return self._normal
def _vector(point):
    """Convert ``point`` into a new ``App.Vector``.

    Accepts an ``App.Vector`` (copied), a list/tuple with at least three
    items, a dict with optional ``x``/``y``/``z`` keys (missing keys count as
    0.0) or any object exposing ``x``, ``y`` and ``z`` attributes.

    Raises:
        RoutingNetworkError: if ``point`` matches none of these shapes.
    """
    if isinstance(point, App.Vector):
        return App.Vector(point.x, point.y, point.z)
    if isinstance(point, (list, tuple)) and len(point) >= 3:
        return App.Vector(*(float(point[index]) for index in range(3)))
    if isinstance(point, dict):
        return App.Vector(*(float(point.get(axis, 0.0)) for axis in ("x", "y", "z")))
    if hasattr(point, "x") and hasattr(point, "y") and hasattr(point, "z"):
        return App.Vector(float(point.x), float(point.y), float(point.z))
    raise RoutingNetworkError("Route carrier point must be a 3D point.")
def _distance(left, right):
dx = float(left.x) - float(right.x)
dy = float(left.y) - float(right.y)
dz = float(left.z) - float(right.z)
return (dx * dx + dy * dy + dz * dz) ** 0.5
def _add(left, right):
    """Return the component-wise sum of two points as a new ``App.Vector``."""
    return App.Vector(
        *(
            float(getattr(left, axis)) + float(getattr(right, axis))
            for axis in ("x", "y", "z")
        )
    )
def _subtract(left, right):
    """Return ``left - right`` component-wise as a new ``App.Vector``."""
    return App.Vector(
        *(
            float(getattr(left, axis)) - float(getattr(right, axis))
            for axis in ("x", "y", "z")
        )
    )
def _scale(vector, factor):
    """Return ``vector`` multiplied by the scalar ``factor`` as a new ``App.Vector``."""
    return App.Vector(
        *(
            float(getattr(vector, axis)) * float(factor)
            for axis in ("x", "y", "z")
        )
    )
def _closest_point_on_segment(point, start, end):
    """Return the point on segment ``start``-``end`` closest to ``point``.

    All three inputs are converted with ``_vector``. A segment shorter than
    ``DEFAULT_NODE_TOLERANCE`` collapses to its start point.
    """
    probe = _vector(point)
    segment_start = _vector(start)
    segment_end = _vector(end)
    direction = _subtract(segment_end, segment_start)
    squared_length = _dot(direction, direction)
    if squared_length <= DEFAULT_NODE_TOLERANCE * DEFAULT_NODE_TOLERANCE:
        return segment_start
    # Projection parameter along the segment, clamped to [0, 1] so the result
    # never leaves the segment.
    fraction = _dot(_subtract(probe, segment_start), direction) / squared_length
    fraction = max(0.0, min(1.0, fraction))
    return _add(segment_start, _scale(direction, fraction))
def _dot(left, right):
return (
float(left.x) * float(right.x)
+ float(left.y) * float(right.y)
+ float(left.z) * float(right.z)
)
def _cross(left, right):
    """Return the cross product ``left x right`` as a new ``App.Vector``."""
    x_component = float(left.y) * float(right.z) - float(left.z) * float(right.y)
    y_component = float(left.z) * float(right.x) - float(left.x) * float(right.z)
    z_component = float(left.x) * float(right.y) - float(left.y) * float(right.x)
    return App.Vector(x_component, y_component, z_component)
def _normalize(vector):
length = _distance(vector, App.Vector(0, 0, 0))
if length <= DEFAULT_NODE_TOLERANCE:
return None
return _scale(vector, 1.0 / length)
def _direction_key(left, right, tolerance=DEFAULT_NODE_TOLERANCE):
    """Return a hashable integer key for the direction from ``left`` to ``right``.

    The unit direction is scaled by 1000 and rounded per component. Points
    closer than ``tolerance`` yield ``(0, 0, 0)``.
    """
    delta = (
        float(right.x) - float(left.x),
        float(right.y) - float(left.y),
        float(right.z) - float(left.z),
    )
    length = (delta[0] * delta[0] + delta[1] * delta[1] + delta[2] * delta[2]) ** 0.5
    if length <= tolerance:
        return (0, 0, 0)
    return tuple(int(round(component / length * 1000.0)) for component in delta)
def _dominant_axis(vector):
components = {
"x": abs(float(getattr(vector, "x", 0.0))),
"y": abs(float(getattr(vector, "y", 0.0))),
"z": abs(float(getattr(vector, "z", 0.0))),
}
axis = max(components, key=components.get)
if components[axis] <= 0.000001:
return None
return axis
def _axis_value(point, axis):
return float(getattr(point, axis, 0.0))
def _set_axis(point, axis, value):
    """Return a copy of ``point`` with the ``axis`` coordinate set to ``value``.

    An ``axis`` other than "x", "y" or "z" yields an unchanged copy.
    """
    coordinates = [
        float(value) if axis == name else float(getattr(point, name))
        for name in ("x", "y", "z")
    ]
    return App.Vector(*coordinates)
def _bound_box_from_object(obj):
    """Return a bounding box for ``obj``, or ``None`` when none can be found.

    Lookup order: ``obj.Shape.BoundBox``, then ``obj.BoundBox``, then the
    union of the bounding boxes of ``obj.Group`` children (recursively),
    returned as a ``_SimpleBoundBox``.
    """
    if obj is None:
        return None
    shape = getattr(obj, "Shape", None)
    for holder in (shape, obj):
        direct = getattr(holder, "BoundBox", None)
        if direct is not None:
            return direct

    union = None
    for child in list(getattr(obj, "Group", []) or []):
        child_box = _bound_box_from_object(child)
        if child_box is None:
            continue
        if union is None:
            # First usable child seeds the union.
            union = _SimpleBoundBox(
                child_box.XMin,
                child_box.XMax,
                child_box.YMin,
                child_box.YMax,
                child_box.ZMin,
                child_box.ZMax,
            )
        else:
            union = _SimpleBoundBox(
                min(union.XMin, child_box.XMin),
                max(union.XMax, child_box.XMax),
                min(union.YMin, child_box.YMin),
                max(union.YMax, child_box.YMax),
                min(union.ZMin, child_box.ZMin),
                max(union.ZMax, child_box.ZMax),
            )
    return union
def _bbox_center(bbox):
    """Return the center of ``bbox`` as an ``App.Vector``."""
    center_x = (float(bbox.XMin) + float(bbox.XMax)) * 0.5
    center_y = (float(bbox.YMin) + float(bbox.YMax)) * 0.5
    center_z = (float(bbox.ZMin) + float(bbox.ZMax)) * 0.5
    return App.Vector(center_x, center_y, center_z)
def _average_points(points):
    """Return the arithmetic mean of ``points`` (the origin when empty)."""
    samples = list(points or [])
    if not samples:
        return App.Vector(0, 0, 0)
    accumulator = App.Vector(0, 0, 0)
    for sample in samples:
        accumulator = _add(accumulator, sample)
    return _scale(accumulator, 1.0 / len(samples))
def _bbox_extent(bbox, axis):
    """Return the absolute size of ``bbox`` along ``axis``."""
    lower, upper = _bbox_axis_range(bbox, axis)
    span = upper - lower
    return abs(span)
def _point_key(point, tolerance=DEFAULT_NODE_TOLERANCE):
    """Return an integer grid key for ``point`` with cell size ``tolerance``.

    A falsy ``tolerance`` falls back to ``DEFAULT_NODE_TOLERANCE``.
    """
    factor = 1.0 / float(tolerance or DEFAULT_NODE_TOLERANCE)
    return tuple(
        int(round(float(getattr(point, axis)) * factor))
        for axis in ("x", "y", "z")
    )
def _point_payload(point):
return {
"x": float(point.x),
"y": float(point.y),
"z": float(point.z),
}
def _is_finite_point(point):
try:
return all(
math.isfinite(float(getattr(point, axis, 0.0)))
for axis in ("x", "y", "z")
)
except Exception:
return False
def _unique_name(doc, base_name):
    """Return an object name derived from ``base_name`` that is unused in ``doc``.

    The name is first passed through ``TerminalObjects.safe_token``. If it is
    taken, ``_1``, ``_2``, ... suffixes are tried until a free one is found.
    """
    token = TerminalObjects.safe_token(base_name)
    if doc.getObject(token) is None:
        return token
    index = 1
    candidate = "{0}_{1}".format(token, index)
    while doc.getObject(candidate) is not None:
        index += 1
        candidate = "{0}_{1}".format(token, index)
    return candidate
def _ensure_vector_list_property(obj, prop_name, description):
if prop_name not in getattr(obj, "PropertiesList", []):
obj.addProperty(
"App::PropertyVectorList",
prop_name,
PROPERTY_GROUP,
description,
)
def _ensure_integer_property(obj, prop_name, description, value):
if prop_name not in getattr(obj, "PropertiesList", []):
obj.addProperty(
"App::PropertyInteger",
prop_name,
PROPERTY_GROUP,
description,
)
try:
setattr(obj, prop_name, int(value))
except Exception:
setattr(obj, prop_name, 0)
def _ensure_float_property(obj, prop_name, description, value):
if prop_name not in getattr(obj, "PropertiesList", []):
obj.addProperty(
"App::PropertyFloat",
prop_name,
PROPERTY_GROUP,
description,
)
try:
setattr(obj, prop_name, float(value))
except Exception:
setattr(obj, prop_name, 0.0)
def _wiring_cut_out_bridge_extension_value(source, default=DEFAULT_WIRING_CUT_OUT_BRIDGE_EXTENSION):
    """Return the wiring cut-out bridge extension of ``source`` as a float >= 0.

    Reads ``source.QetWiringCutOutBridgeExtensionMm`` and falls back to
    ``default`` when the attribute is missing, cannot be converted, or is not
    a finite number. Negative and non-finite results are clamped to 0.0.
    """
    try:
        value = float(getattr(source, "QetWiringCutOutBridgeExtensionMm", default) or 0.0)
    except Exception:
        value = float(default or 0.0)
    if not math.isfinite(value):
        # NaN slips past a plain "< 0.0" clamp (every comparison with NaN is
        # False) and infinity is not a usable length, so treat both like an
        # unreadable value.
        try:
            value = float(default or 0.0)
        except Exception:
            value = 0.0
    if not math.isfinite(value) or value < 0.0:
        return 0.0
    return value
def _set_route_carrier_semantics(obj, project_uuid="", kind=ROUTE_CARRIER_KIND, capacity=1):
    """Stamp ``obj`` with the properties that make it a routable carrier.

    Sets the routing role, carrier kind, project UUID, the ``CanRouteWire``
    flag (True) and the carrier capacity, then returns ``obj``.
    """
    string_properties = (
        ("QetRoutingRole", "Routing role marker", ROUTING_ROLE),
        ("QetRouteCarrierKind", "Route carrier kind", kind),
        ("QetProjectUuid", "Project UUID for this route carrier", project_uuid),
    )
    for prop_name, description, value in string_properties:
        TerminalObjects.ensure_string_property(
            obj,
            prop_name,
            PROPERTY_GROUP,
            description,
            value,
        )
    TerminalObjects.ensure_bool_property(
        obj,
        "CanRouteWire",
        PROPERTY_GROUP,
        "Whether routing connections can use this path",
        True,
    )
    _ensure_integer_property(
        obj,
        "QetRouteCarrierCapacity",
        "How many routed wires can reuse this carrier segment before detouring is preferred",
        capacity,
    )
    return obj
def _route_carrier_capacity_value(obj, default=1):
for property_name in ("QetRouteCarrierCapacity", "QetWireCapacity"):
try:
value = int(float(getattr(obj, property_name, 0) or 0))
except Exception:
value = 0
if value > 0:
return value
return int(default or 1)
def _set_wire_duct_source_semantics(source):
    """Tag ``source`` as a wire duct routing source (no-op for ``None``).

    Writes the source kind and obstacle mode and re-writes the carrier
    capacity with the value currently resolved from ``source`` (default 1).
    """
    if source is None:
        return
    string_properties = (
        ("QetRoutingSourceKind", "Routing source kind", ROUTE_CARRIER_KIND_WIRE_DUCT),
        (
            "QetRoutingObstacleMode",
            "How routing connection collision checks should treat this object",
            WIRE_DUCT_OBSTACLE_MODE,
        ),
    )
    for prop_name, description, value in string_properties:
        TerminalObjects.ensure_string_property(
            source,
            prop_name,
            PROPERTY_GROUP,
            description,
            value,
        )
    current_capacity = _route_carrier_capacity_value(source, default=1)
    _ensure_integer_property(
        source,
        "QetRouteCarrierCapacity",
        "How many routed wires can reuse generated wire duct segments before detouring is preferred",
        current_capacity,
    )
def _set_support_surface_source_semantics(source):
    """Tag ``source`` as a routing-range support surface (no-op for ``None``)."""
    if source is None:
        return
    string_properties = (
        ("QetRoutingSourceKind", "Routing source kind", ROUTE_CARRIER_KIND_ROUTING_RANGE),
        (
            "QetRoutingObstacleMode",
            "How routing connection collision checks should treat this object",
            SUPPORT_SURFACE_OBSTACLE_MODE,
        ),
    )
    for prop_name, description, value in string_properties:
        TerminalObjects.ensure_string_property(
            source,
            prop_name,
            PROPERTY_GROUP,
            description,
            value,
        )
def _set_wiring_cut_out_source_semantics(source, bridge_extension=DEFAULT_WIRING_CUT_OUT_BRIDGE_EXTENSION):
    """Tag ``source`` as a wiring cut-out routing source (no-op for ``None``).

    Writes the source kind and obstacle mode and re-writes the bridge
    extension with the value resolved from ``source``, falling back to
    ``bridge_extension``.
    """
    if source is None:
        return
    string_properties = (
        ("QetRoutingSourceKind", "Routing source kind", ROUTE_CARRIER_KIND_WIRING_CUT_OUT),
        (
            "QetRoutingObstacleMode",
            "How routing connection collision checks should treat this object",
            WIRE_DUCT_OBSTACLE_MODE,
        ),
    )
    for prop_name, description, value in string_properties:
        TerminalObjects.ensure_string_property(
            source,
            prop_name,
            PROPERTY_GROUP,
            description,
            value,
        )
    resolved_extension = _wiring_cut_out_bridge_extension_value(source, default=bridge_extension)
    _ensure_float_property(
        source,
        "QetWiringCutOutBridgeExtensionMm",
        "How far the generated wiring cut-out carrier extends beyond each side of the opening",
        resolved_extension,
    )
def _style_route_carrier(carrier, kind):
    """Apply the view style registered for ``kind`` to ``carrier``.

    Unknown kinds use the ``ROUTE_CARRIER_KIND`` style. Styling is best
    effort: any failure (for example a missing view object) is ignored.
    """
    style = ROUTE_CARRIER_VIEW_STYLES.get(kind)
    if not style:
        style = ROUTE_CARRIER_VIEW_STYLES[ROUTE_CARRIER_KIND]
    try:
        view = carrier.ViewObject
        view.Visibility = True
        view.LineWidth = float(style.get("width", 2.0))
        view.LineColor = style.get("color", (0.0, 0.45, 0.85))
        # Keep routing helper geometry on stable solid-line rendering.
        # Dashed/dotted Coin3D line rendering can make large FreeCAD scenes
        # disappear while rotating.
        if hasattr(view, "DrawStyle"):
            view.DrawStyle = "Solid"
        if hasattr(view, "DisplayMode"):
            view.DisplayMode = "Wireframe"
    except Exception:
        pass
def _create_carrier_geometry(doc, name, points):
    """Create the document object that represents a carrier polyline.

    Tries, in order:

    1. a ``Part::Feature`` named ``name`` whose shape is a polygon through
       ``points``;
    2. a Draft wire, only when ``doc`` is the active document;
    3. a bare ``App::FeaturePython`` named ``name``.

    Returns the created document object.
    """
    # Use a simple Part edge shape by default. It is less feature-rich than a
    # Draft Wire, but much more stable for large 3D scenes while rotating the
    # view.
    try:
        import Part

        # Build the shape before touching the document, so a geometry failure
        # cannot leave an empty Part::Feature behind.
        shape = Part.makePolygon(points)
    except Exception:
        shape = None
    if shape is not None:
        feature = None
        try:
            feature = doc.addObject("Part::Feature", name)
            feature.Shape = shape
            return feature
        except Exception:
            # Remove the half-built object before falling back; otherwise the
            # document would hold both it and the fallback object.
            if feature is not None:
                try:
                    doc.removeObject(feature.Name)
                except Exception:
                    pass
    if getattr(App, "ActiveDocument", None) is doc:
        try:
            import Draft

            wire = Draft.make_wire(
                points,
                closed=False,
                placement=None,
                face=False,
                support=None,
                bs2wire=False,
            )
            if wire is not None:
                try:
                    wire.MakeFace = False
                except Exception:
                    pass
                return wire
        except Exception:
            pass
    return doc.addObject("App::FeaturePython", name)
def _normalized_route_points(points):
    """Return ``points`` as vectors without non-finite or repeated entries.

    A point is dropped when it is not finite or lies within
    ``DEFAULT_NODE_TOLERANCE`` of the previously kept point.
    """
    kept = []
    for raw_point in points or []:
        candidate = _vector(raw_point)
        if not _is_finite_point(candidate):
            continue
        if kept and _distance(kept[-1], candidate) <= DEFAULT_NODE_TOLERANCE:
            continue
        kept.append(candidate)
    return kept
def _set_route_carrier_points(carrier, points):
    """Store ``points`` on ``carrier.Points`` and refresh its polygon shape.

    The ``Points`` property is created when missing. Updating ``Shape`` is
    best effort and silently skipped on failure.
    """
    _ensure_vector_list_property(
        carrier,
        "Points",
        "Ordered centerline points used by the 3D router",
    )
    carrier.Points = list(points)
    try:
        import Part

        polygon = Part.makePolygon(points)
        carrier.Shape = polygon
    except Exception:
        pass
def _update_route_carrier(carrier, points, project_uuid="", kind=ROUTE_CARRIER_KIND, capacity=1):
    """Rewrite the points, semantics and style of an existing carrier.

    Returns False, leaving ``carrier`` untouched, when fewer than two
    distinct points remain after normalization; True otherwise.
    """
    usable_points = _normalized_route_points(points)
    if len(usable_points) < 2:
        return False
    _set_route_carrier_points(carrier, usable_points)
    _set_route_carrier_semantics(
        carrier,
        project_uuid=project_uuid,
        kind=kind,
        capacity=capacity,
    )
    _style_route_carrier(carrier, kind)
    return True
def create_route_carrier(doc, points, label="", project_uuid="", kind=ROUTE_CARRIER_KIND, capacity=1):
    """Create a routable carrier from ordered 3D points.

    The carrier is added to the carrier group obtained from
    ``WiringObjects.ensure_carrier_group``, styled for ``kind``, and the
    document is recomputed (best effort).

    Raises:
        RoutingNetworkError: if ``doc`` is None or fewer than two distinct
            points remain after normalization.
    """
    if doc is None:
        raise RoutingNetworkError("No FreeCAD document is available.")
    usable_points = _normalized_route_points(points)
    if len(usable_points) < 2:
        raise RoutingNetworkError("A route carrier requires at least two distinct points.")

    carrier_name = _unique_name(doc, "QETRouteCarrier")
    carrier = _create_carrier_geometry(doc, carrier_name, usable_points)
    carrier.Label = label or "QET Route Carrier"
    _set_route_carrier_points(carrier, usable_points)
    _set_route_carrier_semantics(
        carrier,
        project_uuid=project_uuid,
        kind=kind,
        capacity=capacity,
    )

    carrier_group = WiringObjects.ensure_carrier_group(doc, project_uuid)
    if carrier not in getattr(carrier_group, "Group", []):
        carrier_group.addObject(carrier)
    _style_route_carrier(carrier, kind)
    try:
        doc.recompute()
    except Exception:
        pass
    return carrier
def is_route_carrier(obj):
    """Return True when ``obj`` carries the routing role and may route wires."""
    if obj is None:
        return False
    role = (getattr(obj, "QetRoutingRole", "") or "").strip()
    if role != ROUTING_ROLE:
        return False
    return bool(getattr(obj, "CanRouteWire", False))
def _carrier_points(obj):
    """Return the ordered points of a carrier as vectors.

    Prefers the stored ``Points`` property, then ``Shape.OrderedVertexes``,
    then ``Shape.Vertexes``. Returns an empty list when none is available.
    """
    stored = list(getattr(obj, "Points", []) or [])
    if stored:
        return [_vector(point) for point in stored]
    shape = getattr(obj, "Shape", None)
    for attribute in ("OrderedVertexes", "Vertexes"):
        vertexes = getattr(shape, attribute, None)
        if vertexes:
            return [
                _vector(vertex.Point)
                for vertex in vertexes
                if getattr(vertex, "Point", None) is not None
            ]
    return []
def _segment_axis(start, end, tolerance=DEFAULT_NODE_TOLERANCE):
    """Return the single axis along which the segment varies, else ``None``.

    ``None`` covers both degenerate segments and segments that change along
    more than one axis by more than ``tolerance``.
    """
    changing = []
    for axis in ("x", "y", "z"):
        if abs(_axis_value(start, axis) - _axis_value(end, axis)) > tolerance:
            changing.append(axis)
    return changing[0] if len(changing) == 1 else None
def _between(value, first, second, tolerance=DEFAULT_NODE_TOLERANCE):
    """Return True when ``value`` lies between ``first`` and ``second``.

    The bounds may be given in either order and are widened by ``tolerance``.
    """
    slack = float(tolerance)
    lower_bound = min(float(first), float(second)) - slack
    upper_bound = max(float(first), float(second)) + slack
    return lower_bound <= float(value) <= upper_bound
def _dedupe_points(points, tolerance=DEFAULT_NODE_TOLERANCE):
    """Return ``points`` without entries that share a grid key, keeping order.

    The first point of each ``_point_key`` cell wins.
    """
    unique = []
    known_keys = set()
    for candidate in points:
        cell = _point_key(candidate, tolerance=tolerance)
        if cell not in known_keys:
            known_keys.add(cell)
            unique.append(candidate)
    return unique
def _orthogonal_segment_intersections(
    first_start,
    first_end,
    second_start,
    second_end,
    tolerance=DEFAULT_NODE_TOLERANCE,
):
    """Return the shared points of two axis-aligned segments.

    Both segments must each vary along exactly one axis, otherwise the result
    is empty. Collinear segments yield the ends of their overlap (one point
    when the overlap is shorter than ``tolerance``). Perpendicular segments
    in a common plane yield their single crossing point.
    """
    axis_a = _segment_axis(first_start, first_end, tolerance=tolerance)
    axis_b = _segment_axis(second_start, second_end, tolerance=tolerance)
    if axis_a is None or axis_b is None:
        return []

    if axis_a == axis_b:
        # Parallel segments only touch when they lie on the same line.
        for other_axis in ("x", "y", "z"):
            if other_axis == axis_a:
                continue
            gap = abs(_axis_value(first_start, other_axis) - _axis_value(second_start, other_axis))
            if gap > tolerance:
                return []
        a_from = _axis_value(first_start, axis_a)
        a_to = _axis_value(first_end, axis_a)
        b_from = _axis_value(second_start, axis_b)
        b_to = _axis_value(second_end, axis_b)
        shared_low = max(min(a_from, a_to), min(b_from, b_to))
        shared_high = min(max(a_from, a_to), max(b_from, b_to))
        if shared_high < shared_low - tolerance:
            return []
        if abs(shared_high - shared_low) <= tolerance:
            return [_set_axis(first_start, axis_a, shared_low)]
        return [
            _set_axis(first_start, axis_a, shared_low),
            _set_axis(first_start, axis_a, shared_high),
        ]

    leftover_axes = [axis for axis in ("x", "y", "z") if axis not in {axis_a, axis_b}]
    if len(leftover_axes) != 1:
        return []
    plane_axis = leftover_axes[0]
    plane_a = _axis_value(first_start, plane_axis)
    plane_b = _axis_value(second_start, plane_axis)
    # Perpendicular segments must share the plane spanned by their two axes.
    if abs(plane_a - plane_b) > tolerance:
        return []

    crossing_on_a = _axis_value(second_start, axis_a)
    crossing_on_b = _axis_value(first_start, axis_b)
    if not _between(
        crossing_on_a,
        _axis_value(first_start, axis_a),
        _axis_value(first_end, axis_a),
        tolerance,
    ):
        return []
    if not _between(
        crossing_on_b,
        _axis_value(second_start, axis_b),
        _axis_value(second_end, axis_b),
        tolerance,
    ):
        return []
    coordinates = {
        axis_a: crossing_on_a,
        axis_b: crossing_on_b,
        plane_axis: (plane_a + plane_b) * 0.5,
    }
    return [App.Vector(coordinates["x"], coordinates["y"], coordinates["z"])]
def _sorted_segment_points(start, end, points, tolerance=DEFAULT_NODE_TOLERANCE):
    """Return de-duplicated ``points`` ordered from ``start`` towards ``end``.

    Axis-aligned segments are ordered by the coordinate on their axis; any
    other segment is ordered by distance from ``start``.
    """
    unique_points = _dedupe_points(points, tolerance=tolerance)
    axis = _segment_axis(start, end, tolerance=tolerance)
    if axis is None:
        return sorted(unique_points, key=lambda candidate: _distance(start, candidate))
    descending = _axis_value(start, axis) > _axis_value(end, axis)
    return sorted(
        unique_points,
        key=lambda candidate: _axis_value(candidate, axis),
        reverse=descending,
    )
def _segment_intersects_bbox_payload(start, end, bbox):
    """Return True when segment ``start``-``end`` touches the box ``bbox``.

    ``bbox`` must be a dict with ``xmin``/``xmax``/``ymin``/``ymax``/
    ``zmin``/``zmax`` entries. Anything else, or any error while reading it,
    counts as "no intersection".
    """
    if not isinstance(bbox, dict):
        return False
    enter = 0.0
    leave = 1.0
    try:
        for axis in ("x", "y", "z"):
            origin = _axis_value(start, axis)
            target = _axis_value(end, axis)
            travel = target - origin
            slab_low = float(bbox[axis + "min"])
            slab_high = float(bbox[axis + "max"])
            if abs(travel) <= DEFAULT_NODE_TOLERANCE:
                # Segment runs parallel to this slab: it must start inside it.
                if origin < slab_low or origin > slab_high:
                    return False
                continue
            inverse = 1.0 / travel
            first_hit = (slab_low - origin) * inverse
            second_hit = (slab_high - origin) * inverse
            if first_hit > second_hit:
                first_hit, second_hit = second_hit, first_hit
            # Narrow the parameter window in which the segment is inside
            # every slab seen so far.
            enter = max(enter, first_hit)
            leave = min(leave, second_hit)
            if enter > leave:
                return False
    except Exception:
        return False
    return True
def _segment_hits_blocked_bbox(start, end, blocked_bboxes):
    """Return True when the segment touches any box in ``blocked_bboxes``."""
    return any(
        _segment_intersects_bbox_payload(start, end, blocked)
        for blocked in blocked_bboxes or []
    )
def collect_route_carriers(doc):
    """Return every route carrier in ``doc`` without duplicates.

    Members of the "QETWiring_02_Carriers" group come first, followed by the
    remaining document objects. Returns an empty list when ``doc`` is None.
    """
    if doc is None:
        return []
    try:
        carrier_group = doc.getObject("QETWiring_02_Carriers")
    except Exception:
        carrier_group = None

    pool = []
    if carrier_group is not None:
        pool.extend(list(getattr(carrier_group, "Group", []) or []))
    pool.extend(list(getattr(doc, "Objects", []) or []))

    carriers = []
    visited = set()
    for candidate in pool:
        if candidate is None:
            continue
        marker = id(candidate)
        if marker in visited:
            continue
        visited.add(marker)
        if is_route_carrier(candidate):
            carriers.append(candidate)
    return carriers
def _detach_from_groups(doc, obj):
for parent in list(getattr(obj, "InList", []) or []):
group = list(getattr(parent, "Group", []) or [])
if obj not in group:
continue
try:
if hasattr(parent, "removeObject"):
parent.removeObject(obj)
else:
parent.Group = [child for child in group if child is not obj]
except Exception:
try:
parent.Group = [child for child in group if child is not obj]
except Exception:
pass
for parent in list(getattr(doc, "Objects", []) or []):
group = list(getattr(parent, "Group", []) or [])
if obj not in group:
continue
try:
if hasattr(parent, "removeObject"):
parent.removeObject(obj)
else:
parent.Group = [child for child in group if child is not obj]
except Exception:
try:
parent.Group = [child for child in group if child is not obj]
except Exception:
pass
def _remove_route_carriers(doc, carriers):
    """Delete the route carriers in ``carriers`` from ``doc``.

    Entries that are None or not route carriers are skipped. Each carrier is
    detached from its groups before removal. Returns how many objects were
    removed; removal errors are ignored.
    """
    removed_count = 0
    for carrier in list(carriers or []):
        if carrier is None or not is_route_carrier(carrier):
            continue
        _detach_from_groups(doc, carrier)
        try:
            still_present = doc.getObject(getattr(carrier, "Name", "")) is not None
            if still_present:
                doc.removeObject(carrier.Name)
                removed_count += 1
        except Exception:
            pass
    return removed_count
def clear_route_carriers(doc):
    """Delete generated route carriers while keeping terminals and routed wires.

    Returns the number of removed carriers. The document recompute that
    follows is best effort.
    """
    carriers = collect_route_carriers(doc)
    removed_count = _remove_route_carriers(doc, carriers)
    try:
        doc.recompute()
    except Exception:
        pass
    return removed_count
def _shape_center(shape):
    """Return the bounding-box center of ``shape``, or ``None`` without a box."""
    bbox = getattr(shape, "BoundBox", None)
    if bbox is None:
        return None
    center_x = (float(bbox.XMin) + float(bbox.XMax)) * 0.5
    center_y = (float(bbox.YMin) + float(bbox.YMax)) * 0.5
    center_z = (float(bbox.ZMin) + float(bbox.ZMax)) * 0.5
    return App.Vector(center_x, center_y, center_z)
def _edge_points(edge):
    """Return the two end points of ``edge`` as vectors.

    Uses the first and last vertex when both carry a ``Point``; otherwise
    evaluates the edge at its first and last parameter. Returns an empty list
    when neither works.
    """
    vertexes = list(getattr(edge, "Vertexes", []) or [])
    if len(vertexes) >= 2:
        head = getattr(vertexes[0], "Point", None)
        tail = getattr(vertexes[-1], "Point", None)
        if head is not None and tail is not None:
            return [_vector(head), _vector(tail)]
    try:
        head = edge.valueAt(edge.FirstParameter)
        tail = edge.valueAt(edge.LastParameter)
        return [_vector(head), _vector(tail)]
    except Exception:
        return []
def _is_route_path_source_object(obj):
if obj is None:
return False
type_id = (getattr(obj, "TypeId", "") or "").lower()
if "sketch" in type_id:
return True
if list(getattr(obj, "Points", []) or []):
return True
shape = getattr(obj, "Shape", None)
if shape is None:
return False
# SOLIDWORKS/EPLAN 的 routing path 是草图/线槽路径,不是把实体零件的全部边都当路径。
# 所以只有纯线状对象才允许整对象转换;带 Face/Solid 的实体必须显式选中边。
faces = list(getattr(shape, "Faces", []) or [])
solids = list(getattr(shape, "Solids", []) or [])
shells = list(getattr(shape, "Shells", []) or [])
if faces or solids or shells:
return False
return bool(list(getattr(shape, "Edges", []) or []))
def _routing_source_text(obj):
return " ".join(
str(value or "")
for value in (
getattr(obj, "Name", ""),
getattr(obj, "Label", ""),
getattr(obj, "QetCarrierKind", ""),
getattr(obj, "QetCarrierRoleLabel", ""),
getattr(obj, "QetRoutingSourceKind", ""),
)
).lower()
def _bbox_aspect_ratio(bbox):
    """Return the ratio of the largest to the second-largest extent of ``bbox``.

    Returns 0.0 when the largest extent is within ``DEFAULT_NODE_TOLERANCE``
    and infinity when only the second-largest extent is.
    """
    ranked = sorted(
        (_bbox_extent(bbox, axis) for axis in ("x", "y", "z")),
        reverse=True,
    )
    if not ranked or ranked[0] <= DEFAULT_NODE_TOLERANCE:
        return 0.0
    if len(ranked) < 2 or ranked[1] <= DEFAULT_NODE_TOLERANCE:
        return float("inf")
    longest, runner_up = ranked[0], ranked[1]
    return longest / runner_up
def _is_wire_duct_candidate(obj, min_aspect=DEFAULT_AUTO_WIRE_DUCT_MIN_ASPECT):
    """Return True when ``obj`` looks like a wire duct.

    Rejected outright: route carriers, terminal objects, objects with a
    non-empty ``RouteType`` and objects whose text matches an exclude
    keyword. The rest need a semantic hint or a name keyword, plus a bounding
    box whose aspect ratio is at least ``min_aspect``.
    """
    if obj is None:
        return False
    if is_route_carrier(obj) or TerminalObjects.is_terminal_object(obj):
        return False
    if (getattr(obj, "RouteType", "") or "").strip():
        return False

    text = _routing_source_text(obj)
    if any(keyword in text for keyword in WIRE_DUCT_EXCLUDE_KEYWORDS):
        return False

    source_kind = (getattr(obj, "QetRoutingSourceKind", "") or "").strip()
    tagged = (
        source_kind == ROUTE_CARRIER_KIND_WIRE_DUCT
        or (getattr(obj, "QetCarrierKind", "") or "").strip().lower() == "wire_duct"
    )
    named = any(keyword in text for keyword in WIRE_DUCT_NAME_KEYWORDS)
    if not (tagged or named):
        return False

    bbox = _bound_box_from_object(obj)
    if bbox is None:
        return False
    # Auto-detection only accepts clearly elongated objects, to avoid
    # misclassifying cabinets, door panels and mounting plates as wire ducts.
    return _bbox_aspect_ratio(bbox) >= float(min_aspect or 1.0)
def _bbox_extents(bbox):
    """Return the size of ``bbox`` per axis as an ``{"x", "y", "z"}`` dict."""
    extents = {}
    for axis in ("x", "y", "z"):
        extents[axis] = _bbox_extent(bbox, axis)
    return extents
def _is_thin_surface_bbox(bbox, min_surface_extent=50.0, max_thickness=40.0, thickness_ratio=0.2):
    """Return True when ``bbox`` describes a thin, plate-like volume.

    The middle extent must reach ``min_surface_extent``, and the smallest
    extent must not exceed the lesser of ``max_thickness`` and
    ``thickness_ratio`` times the longest extent.
    """
    ordered = sorted(_bbox_extents(bbox).values())
    if len(ordered) < 3 or ordered[-1] <= DEFAULT_NODE_TOLERANCE:
        return False
    thickness, middle, longest = ordered[0], ordered[1], ordered[2]
    if middle < float(min_surface_extent or 0.0):
        return False
    thickness_limit = min(
        float(max_thickness or 0.0),
        longest * float(thickness_ratio or 0.0),
    )
    return thickness <= max(thickness_limit, DEFAULT_NODE_TOLERANCE)
def _is_support_surface_candidate(obj):
    """Return True when ``obj`` looks like a flat support surface.

    Rejected outright: route carriers, terminal objects, objects with a
    non-empty ``RouteType`` and objects whose text matches a wire duct
    keyword. The rest need a semantic hint or a name keyword, plus a thin
    bounding box.
    """
    if obj is None:
        return False
    if is_route_carrier(obj) or TerminalObjects.is_terminal_object(obj):
        return False
    if (getattr(obj, "RouteType", "") or "").strip():
        return False

    text = _routing_source_text(obj)
    if any(keyword in text for keyword in WIRE_DUCT_NAME_KEYWORDS):
        return False

    carrier_kind = (getattr(obj, "QetCarrierKind", "") or "").strip().lower()
    source_kind = (getattr(obj, "QetRoutingSourceKind", "") or "").strip()
    tagged = (
        source_kind == ROUTE_CARRIER_KIND_ROUTING_RANGE
        or carrier_kind in SUPPORT_SURFACE_CARRIER_KINDS
    )
    named = any(keyword in text for keyword in SUPPORT_SURFACE_NAME_KEYWORDS)
    if not (tagged or named):
        return False

    bbox = _bound_box_from_object(obj)
    if bbox is None:
        return False
    return _is_thin_surface_bbox(bbox)
def _is_wiring_cut_out_candidate(obj):
    """Return True when ``obj`` looks like a wiring cut-out.

    Rejected outright: route carriers, terminal objects and objects with a
    non-empty ``RouteType``. The rest need a semantic hint or a name keyword,
    plus a resolvable bounding box.
    """
    if obj is None:
        return False
    if is_route_carrier(obj) or TerminalObjects.is_terminal_object(obj):
        return False
    if (getattr(obj, "RouteType", "") or "").strip():
        return False

    source_kind = (getattr(obj, "QetRoutingSourceKind", "") or "").strip()
    carrier_kind = (getattr(obj, "QetCarrierKind", "") or "").strip().lower()
    tagged = (
        source_kind == ROUTE_CARRIER_KIND_WIRING_CUT_OUT
        or carrier_kind in {"wiring_cut_out", "wiring_cutout", "wire_cutout"}
    )
    text = _routing_source_text(obj)
    named = any(keyword in text for keyword in WIRING_CUT_OUT_NAME_KEYWORDS)
    if not (tagged or named):
        return False
    return _bound_box_from_object(obj) is not None
def _support_face_from_bbox(bbox):
    """Build a ``_BBoxFace`` for the upper side of ``bbox`` along its thin axis.

    The thinnest axis becomes the face normal (positive direction). The four
    corners lie at the maximum value of that axis, starting along the longer
    in-plane axis.
    """
    extents = _bbox_extents(bbox)
    thin_axis = min(extents, key=extents.get)
    in_plane_axes = sorted(
        [axis for axis in ("x", "y", "z") if axis != thin_axis],
        key=lambda axis: _bbox_extent(bbox, axis),
        reverse=True,
    )
    face_level = _bbox_axis_range(bbox, thin_axis)[1]
    normal = App.Vector(
        *(1.0 if thin_axis == axis else 0.0 for axis in ("x", "y", "z"))
    )
    long_axis, short_axis = in_plane_axes[0], in_plane_axes[1]
    long_low, long_high = _bbox_axis_range(bbox, long_axis)
    short_low, short_high = _bbox_axis_range(bbox, short_axis)

    # Corners in loop order, so consecutive corners share an edge.
    corner_values = (
        (long_low, short_low),
        (long_high, short_low),
        (long_high, short_high),
        (long_low, short_high),
    )
    corners = []
    for long_value, short_value in corner_values:
        coordinates = {
            thin_axis: face_level,
            long_axis: long_value,
            short_axis: short_value,
        }
        corners.append(App.Vector(coordinates["x"], coordinates["y"], coordinates["z"]))
    return _BBoxFace(corners, normal)
def _points_from_selection_item(selection_item):
    """Collect route points from one selection entry.

    Sources, in order: picked points, selected sub-objects (edge end points,
    vertex points, or the bounding-box center of anything else), and all
    edges of the selected object when it qualifies as a route path source.
    If nothing was found, the bounding-box center of the object's shape is
    used. Consecutive points within ``DEFAULT_NODE_TOLERANCE`` are merged.
    """
    collected = [
        _vector(picked)
        for picked in list(getattr(selection_item, "PickedPoints", []) or [])
    ]
    for sub_object in list(getattr(selection_item, "SubObjects", []) or []):
        shape_type = (getattr(sub_object, "ShapeType", "") or "").lower()
        if shape_type == "edge":
            collected.extend(_edge_points(sub_object))
        elif shape_type == "vertex":
            vertex_point = getattr(sub_object, "Point", None)
            if vertex_point is not None:
                collected.append(_vector(vertex_point))
        else:
            sub_center = _shape_center(sub_object)
            if sub_center is not None:
                collected.append(sub_center)

    source = getattr(selection_item, "Object", None)
    source_shape = getattr(source, "Shape", None)
    if source_shape is not None and _is_route_path_source_object(source):
        for edge in list(getattr(source_shape, "Edges", []) or []):
            collected.extend(_edge_points(edge))
    if not collected:
        shape_center = _shape_center(source_shape)
        if shape_center is not None:
            collected.append(shape_center)

    merged = []
    for candidate in collected:
        if merged and _distance(merged[-1], candidate) <= DEFAULT_NODE_TOLERANCE:
            continue
        merged.append(candidate)
    return merged
def _support_face_from_selection(selection_ex):
for item in selection_ex or []:
for sub_object in list(getattr(item, "SubObjects", []) or []):
if (getattr(sub_object, "ShapeType", "") or "").lower() == "face":
return sub_object
return None
def _selection_item_is_only_support_face(selection_item):
sub_objects = list(getattr(selection_item, "SubObjects", []) or [])
if not sub_objects:
return False
return all(
(getattr(sub_object, "ShapeType", "") or "").lower() == "face"
for sub_object in sub_objects
)
def _face_normal(face):
try:
return _vector(face.normalAt(0.5, 0.5))
except Exception:
pass
try:
return _vector(face.normalAt(0.0, 0.0))
except Exception:
pass
return None
def _bbox_axis_range(bbox, axis):
if axis == "x":
return float(bbox.XMin), float(bbox.XMax)
if axis == "y":
return float(bbox.YMin), float(bbox.YMax)
return float(bbox.ZMin), float(bbox.ZMax)
def _surface_grid_values(min_value, max_value, spacing, margin):
    """Return evenly spaced lane coordinates between two bounds.

    The range is shrunk by ``margin`` on both sides; if that inverts it, the
    full range is used. The first and last value are always the range ends,
    with intermediate values every ``spacing`` (at least 1.0; a falsy value
    uses ``DEFAULT_SURFACE_LANE_SPACING``).
    """
    lower = float(min_value) + float(margin)
    upper = float(max_value) - float(margin)
    if upper < lower:
        # Margins overlap: fall back to the full range.
        lower = float(min_value)
        upper = float(max_value)
    if abs(upper - lower) <= DEFAULT_NODE_TOLERANCE:
        return [lower]

    step = max(float(spacing or DEFAULT_SURFACE_LANE_SPACING), 1.0)
    lanes = [lower]
    position = lower + step
    while position < upper - DEFAULT_NODE_TOLERANCE:
        lanes.append(position)
        position += step
    if abs(lanes[-1] - upper) > DEFAULT_NODE_TOLERANCE:
        lanes.append(upper)
    return lanes
def _face_points(face):
    """Return the corner points of ``face`` as vectors.

    Uses the vertex points when any exist; otherwise the eight corners of the
    face's bounding box. Returns an empty list when neither is available.
    """
    vertex_points = [
        _vector(vertex_point)
        for vertex_point in (
            getattr(vertex, "Point", None)
            for vertex in list(getattr(face, "Vertexes", []) or [])
        )
        if vertex_point is not None
    ]
    if vertex_points:
        return vertex_points

    bbox = getattr(face, "BoundBox", None)
    if bbox is None:
        return []
    corners = []
    for x_name in ("XMin", "XMax"):
        for y_name in ("YMin", "YMax"):
            for z_name in ("ZMin", "ZMax"):
                corners.append(
                    App.Vector(
                        getattr(bbox, x_name),
                        getattr(bbox, y_name),
                        getattr(bbox, z_name),
                    )
                )
    return corners
def _face_origin(face, fallback_points):
    """Pick a reference origin for a face.

    Preference order: the face's ``CenterOfMass``, the mean of
    ``fallback_points``, the bounding-box center, and finally the zero vector.
    """
    centroid = getattr(face, "CenterOfMass", None)
    if centroid is not None:
        return _vector(centroid)
    if not fallback_points:
        box = getattr(face, "BoundBox", None)
        if box is None:
            return App.Vector(0, 0, 0)
        return _bbox_center(box)
    accumulated = App.Vector(0, 0, 0)
    for sample in fallback_points:
        accumulated = _add(accumulated, sample)
    return _scale(accumulated, 1.0 / len(fallback_points))
def _face_u_axis(face, normal, points, origin):
    """Choose a unit in-plane U axis for a face.

    Resolution order:
      1. ``face.QetSurfaceUAxis`` projected into the plane, when it is set and
         the projection can be normalized.
      2. The longest in-plane span between any two of ``points``.
      3. A world X seed (world Y when the normal is nearly parallel to X)
         projected into the plane.

    ``origin`` is accepted for signature compatibility and is not used.
    Returns whatever ``_normalize`` returns for the chosen candidate.
    """
    explicit_axis = getattr(face, "QetSurfaceUAxis", None)
    if explicit_axis is not None:
        explicit_axis = _vector(explicit_axis)
        in_plane = _subtract(explicit_axis, _scale(normal, _dot(explicit_axis, normal)))
        normalized = _normalize(in_plane)
        if normalized is not None:
            return normalized

    point_list = list(points or [])
    zero = App.Vector(0, 0, 0)
    best = None
    best_length = 0.0
    # Each unordered pair is visited once. Self-pairs have zero length and a
    # mirrored pair has the same length as the forward one, so the first
    # strictly-longest candidate is the same as with a full ordered scan.
    for index, left in enumerate(point_list):
        for right in point_list[index + 1:]:
            candidate = _subtract(right, left)
            # Drop the component along the normal so only the in-plane span counts.
            candidate = _subtract(candidate, _scale(normal, _dot(candidate, normal)))
            length = _distance(candidate, zero)
            if length > best_length:
                best = candidate
                best_length = length
    if best is not None and best_length > DEFAULT_NODE_TOLERANCE:
        return _normalize(best)

    seed = App.Vector(1, 0, 0)
    if abs(_dot(normal, seed)) > 0.9:
        # The normal is almost parallel to X; seed with Y instead.
        seed = App.Vector(0, 1, 0)
    candidate = _subtract(seed, _scale(normal, _dot(seed, normal)))
    return _normalize(candidate)
def _surface_face_grid_points(face, spacing, offset, margin):
    """Build a grid of polylines over a face, lifted by ``offset`` along its normal.

    Returns a list of point lists: first one polyline per grid row, then one
    per grid column. Returns an empty list when the face has no usable normal,
    points or axes, or when either direction yields fewer than two lanes.
    """
    unit_normal = _normalize(_face_normal(face))
    if unit_normal is None:
        return []
    corners = _face_points(face)
    anchor = _face_origin(face, corners)
    if not corners:
        return []
    u_dir = _face_u_axis(face, unit_normal, corners, anchor)
    if u_dir is None:
        return []
    v_dir = _normalize(_cross(unit_normal, u_dir))
    if v_dir is None:
        return []

    # Express every face point in the (u, v) frame centered on the anchor.
    offsets = [_subtract(corner, anchor) for corner in corners]
    u_coords = [_dot(relative, u_dir) for relative in offsets]
    v_coords = [_dot(relative, v_dir) for relative in offsets]
    u_lanes = _surface_grid_values(min(u_coords), max(u_coords), spacing, margin)
    v_lanes = _surface_grid_values(min(v_coords), max(v_coords), spacing, margin)
    if len(u_lanes) < 2 or len(v_lanes) < 2:
        return []

    lifted_anchor = _add(anchor, _scale(unit_normal, float(offset or 0.0)))
    rows = [
        [
            _add(
                _add(lifted_anchor, _scale(u_dir, u_value)),
                _scale(v_dir, v_value),
            )
            for u_value in u_lanes
        ]
        for v_value in v_lanes
    ]
    columns = [list(column) for column in zip(*rows)]
    # Both rows and columns must become carriers so Dijkstra can switch between
    # horizontal and vertical travel at the grid intersections.
    return rows + columns
def _project_points_to_face(points, face, offset=DEFAULT_ROUTE_PATH_FACE_OFFSET):
    """Project points onto a plane parallel to ``face`` at distance ``offset``.

    When the face has no usable normal the points are returned as a new list,
    unchanged. With no points the result is an empty list.
    """
    unit_normal = _normalize(_face_normal(face))
    if unit_normal is None:
        return list(points or [])
    anchor = _face_origin(face, _face_points(face))
    heights = [_dot(_subtract(point, anchor), unit_normal) for point in points or []]
    if not heights:
        return []
    # Keep the side of the face the polyline was originally on, so it is not
    # projected onto the back of the panel.
    mean_height = sum(heights) / float(len(heights))
    lift = abs(float(offset or 0.0))
    if mean_height < 0.0:
        lift = -lift
    return [
        _subtract(point, _scale(unit_normal, height - lift))
        for point, height in zip(points, heights)
    ]
def create_carriers_from_selection(doc, selection_ex, project_uuid="", kind=ROUTE_CARRIER_KIND):
    """Create one route carrier per selected item that yields at least two points.

    If the selection contains a support face, items consisting only of faces
    are skipped and the remaining items' points are projected onto that face.
    Returns the list of created carriers.
    """
    support_face = _support_face_from_selection(selection_ex)
    has_support_face = support_face is not None
    carriers = []
    for position, item in enumerate(selection_ex or [], start=1):
        if has_support_face and _selection_item_is_only_support_face(item):
            continue
        path_points = _points_from_selection_item(item)
        if len(path_points) < 2:
            continue
        if has_support_face:
            # A support face was selected together with a sketch/wire: project
            # the sketch points onto the support face plane first. Draft only
            # records working-plane coordinates and does not snap to the panel.
            path_points = _project_points_to_face(path_points, support_face)
        carrier = create_route_carrier(
            doc,
            path_points,
            label="QET Route Carrier {0}".format(position),
            project_uuid=project_uuid,
            kind=kind,
        )
        carriers.append(carrier)
    return carriers
def _wire_duct_centerline_spec_from_bbox(bbox, margin=DEFAULT_WIRE_DUCT_MARGIN, min_aspect=1.5):
    """Derive a wire-duct centerline and open-end cross segments from a bounding box.

    The longest bbox axis is taken as the duct direction. The centerline runs
    through the bbox center along that axis, pulled in by ``margin`` at both
    ends (the margin is dropped when the duct is not longer than twice the
    margin).

    Returns a dict with:
      * "centerline": two points, or [] when the box is degenerate or not
        elongated enough (longest / second-longest extent below ``min_aspect``).
      * "open_ends": two-point segments across each centerline endpoint.
      * "main_axis": only present when a centerline was produced.
    """
    extents = {
        axis: _bbox_extent(bbox, axis)
        for axis in ("x", "y", "z")
    }
    main_axis = max(extents, key=extents.get)
    sorted_extents = sorted(extents.values(), reverse=True)
    # A box with no measurable length cannot carry a centerline.
    if sorted_extents[0] <= DEFAULT_NODE_TOLERANCE:
        return {"centerline": [], "open_ends": []}
    if len(sorted_extents) > 1 and sorted_extents[1] > DEFAULT_NODE_TOLERANCE:
        # Reject shapes whose longest side is not clearly dominant.
        if sorted_extents[0] / sorted_extents[1] < float(min_aspect or 1.0):
            return {"centerline": [], "open_ends": []}
    low, high = _bbox_axis_range(bbox, main_axis)
    center = _bbox_center(bbox)
    usable_margin = max(float(margin or 0.0), 0.0)
    # Applying the margin on both ends would consume the whole duct: skip it.
    if abs(high - low) <= usable_margin * 2.0:
        usable_margin = 0.0
    start = _set_axis(center, main_axis, low + usable_margin)
    end = _set_axis(center, main_axis, high - usable_margin)
    if _distance(start, end) <= DEFAULT_NODE_TOLERANCE:
        return {"centerline": [], "open_ends": []}
    # Remaining axes, widest first; the widest one carries the open-end segments.
    cross_axes = sorted(
        [axis for axis in ("x", "y", "z") if axis != main_axis],
        key=lambda axis: _bbox_extent(bbox, axis),
        reverse=True,
    )
    open_ends = []
    if cross_axes:
        cross_axis = cross_axes[0]
        cross_extent = _bbox_extent(bbox, cross_axis)
        # Half length is capped at half the cross extent; within that cap it is
        # the larger of the margin and half the minimum open-end length.
        half_length = max(
            min(cross_extent * 0.5, float(margin or DEFAULT_WIRE_DUCT_MARGIN)),
            min(cross_extent * 0.5, DEFAULT_WIRE_DUCT_OPEN_END_MIN_LENGTH * 0.5),
        )
        if half_length > DEFAULT_NODE_TOLERANCE:
            for endpoint in (start, end):
                # Segment centered on the centerline endpoint along the cross axis.
                open_ends.append(
                    [
                        _set_axis(endpoint, cross_axis, _axis_value(center, cross_axis) - half_length),
                        _set_axis(endpoint, cross_axis, _axis_value(center, cross_axis) + half_length),
                    ]
                )
    return {
        "centerline": [start, end],
        "open_ends": open_ends,
        "main_axis": main_axis,
    }
def _wire_duct_centerline_from_bbox(bbox, margin=DEFAULT_WIRE_DUCT_MARGIN, min_aspect=1.5):
    """Return only the centerline points of the wire-duct spec for ``bbox``."""
    spec = _wire_duct_centerline_spec_from_bbox(
        bbox,
        margin=margin,
        min_aspect=min_aspect,
    )
    centerline = spec.get("centerline", [])
    return centerline
def _sync_wire_duct_source_carriers(doc, source, spec, project_uuid="", capacity=1):
    """Refresh the carriers already generated for a wire-duct source.

    Returns False when the source has no live carriers, so the caller should
    create them. Otherwise returns True, even if no carrier was updated.

    Existing carriers are paired positionally with the desired geometry
    (centerline first, then open ends). ``zip`` stops at the shorter list, so
    surplus carriers or surplus desired segments are left untouched here.
    """
    carriers = _live_source_carriers(doc, source)
    if not carriers:
        return False
    desired = [
        (spec.get("centerline", []), ROUTE_CARRIER_KIND_WIRE_DUCT),
    ]
    desired.extend(
        (points, ROUTE_CARRIER_KIND_WIRE_DUCT_OPEN_END)
        for points in (spec.get("open_ends", []) or [])
    )
    updated = []
    for carrier, desired_item in zip(carriers, desired):
        points, kind = desired_item
        # NOTE(review): assumes _update_route_carrier returns a truthy value on
        # success - confirm against its definition.
        if _update_route_carrier(
            carrier,
            points,
            project_uuid=project_uuid,
            kind=kind,
            capacity=capacity,
        ):
            updated.append(carrier)
    if updated:
        # Re-register the source with only the carriers that were updated.
        _mark_wire_duct_source(source, updated[0], updated)
        try:
            doc.recompute()
        except Exception:
            pass
    return True
def _wiring_cut_out_points_from_bbox(bbox, bridge_extension=0.0):
    """Return the two endpoints of a pass-through segment for a cut-out bbox.

    The segment runs through the bbox center along the thinnest axis. A
    zero-thickness box borrows the largest of the other extents as its length.
    Both ends are pushed outwards by ``bridge_extension`` (negative values are
    treated as 0). Returns [] when no extents are available or the segment is
    degenerate.
    """
    extents = _bbox_extents(bbox)
    if not extents:
        return []
    through_axis = min(extents, key=extents.get)
    near, far = _bbox_axis_range(bbox, through_axis)
    middle = _bbox_center(bbox)
    if abs(far - near) <= DEFAULT_NODE_TOLERANCE:
        lateral_extents = [
            _bbox_extent(bbox, axis)
            for axis in ("x", "y", "z")
            if axis != through_axis
        ]
        substitute = max(lateral_extents or [DEFAULT_WIRE_DUCT_OPEN_END_MIN_LENGTH])
        near = _axis_value(middle, through_axis) - substitute * 0.5
        far = _axis_value(middle, through_axis) + substitute * 0.5
    reach = max(float(bridge_extension or 0.0), 0.0)
    entry = _set_axis(middle, through_axis, near - reach)
    exit_point = _set_axis(middle, through_axis, far + reach)
    if _distance(entry, exit_point) <= DEFAULT_NODE_TOLERANCE:
        return []
    return [entry, exit_point]
def _wire_duct_sources_from_selection(selection_ex):
sources = []
seen = set()
for item in selection_ex or []:
obj = getattr(item, "Object", None)
if obj is not None and id(obj) not in seen:
seen.add(id(obj))
sources.append(obj)
continue
for sub_object in list(getattr(item, "SubObjects", []) or []):
if sub_object is None or id(sub_object) in seen:
continue
seen.add(id(sub_object))
sources.append(sub_object)
return sources
def _route_source_carrier_names(source):
names = []
try:
raw = (getattr(source, "QetRouteCarrierNamesJson", "") or "").strip()
if raw:
parsed = json.loads(raw)
if isinstance(parsed, list):
names.extend(str(item).strip() for item in parsed if str(item).strip())
except Exception:
names = []
carrier_name = (getattr(source, "QetRouteCarrierName", "") or "").strip()
if carrier_name:
names.insert(0, carrier_name)
result = []
seen = set()
for name in names:
if name in seen:
continue
seen.add(name)
result.append(name)
return result
def _live_source_carriers(doc, source):
if doc is None or source is None:
return []
carriers = []
for carrier_name in _route_source_carrier_names(source):
carrier = doc.getObject(carrier_name)
if carrier is not None and is_route_carrier(carrier):
carriers.append(carrier)
return carriers
def _source_kind_value(source):
return (getattr(source, "QetRoutingSourceKind", "") or "").strip()
def _set_route_carrier_source_metadata(carrier, source, source_kind=""):
if carrier is None or source is None:
return
source_name = (getattr(source, "Name", "") or "").strip()
if not source_name:
return
kind = (source_kind or _source_kind_value(source)).strip()
TerminalObjects.ensure_string_property(
carrier,
"QetRouteSourceName",
PROPERTY_GROUP,
"FreeCAD source object name that generated this route carrier",
source_name,
)
TerminalObjects.ensure_string_property(
carrier,
"QetRouteSourceLabel",
PROPERTY_GROUP,
"FreeCAD source object label that generated this route carrier",
getattr(source, "Label", "") or source_name,
)
TerminalObjects.ensure_string_property(
carrier,
"QetRouteSourceKind",
PROPERTY_GROUP,
"Routing source kind that generated this route carrier",
kind,
)
def _remember_source_carriers(source, carriers):
    """Store the names of ``carriers`` on ``source`` and back-link each carrier.

    The names of all named, non-None carriers are written to the source's
    ``QetRouteCarrierNamesJson`` property as a JSON list. When that list is
    non-empty, source metadata is also stamped onto every carrier.
    """
    live_names = [
        getattr(carrier, "Name", "")
        for carrier in (carriers or [])
        if carrier is not None and getattr(carrier, "Name", "")
    ]
    if live_names:
        source_kind = _source_kind_value(source)
        for carrier in carriers or []:
            # The callee ignores None carriers and sources without a name.
            _set_route_carrier_source_metadata(carrier, source, source_kind=source_kind)
    TerminalObjects.ensure_string_property(
        source,
        "QetRouteCarrierNamesJson",
        PROPERTY_GROUP,
        "Generated route carriers for this source",
        json.dumps(live_names, ensure_ascii=False),
    )
def _mark_wire_duct_source(source, carrier, carriers=None):
if source is None:
return
try:
_set_wire_duct_source_semantics(source)
if carrier is not None:
TerminalObjects.ensure_string_property(
source,
"QetRouteCarrierName",
PROPERTY_GROUP,
"Generated route carrier for this source",
getattr(carrier, "Name", ""),
)
_remember_source_carriers(source, carriers or ([carrier] if carrier is not None else []))
except Exception:
pass
def _mark_support_surface_source(source, carriers):
if source is None or not carriers:
return
try:
_set_support_surface_source_semantics(source)
TerminalObjects.ensure_string_property(
source,
"QetRouteCarrierName",
PROPERTY_GROUP,
"Generated route carrier for this source",
getattr(carriers[0], "Name", ""),
)
_remember_source_carriers(source, carriers)
except Exception:
pass
def _mark_wiring_cut_out_source(source, carrier, bridge_extension=DEFAULT_WIRING_CUT_OUT_BRIDGE_EXTENSION):
    """Tag ``source`` as a wiring cut-out and remember its single carrier.

    Nothing happens when the source or the carrier is missing. Best effort:
    exceptions raised while tagging are swallowed.
    """
    if source is not None and carrier is not None:
        try:
            _set_wiring_cut_out_source_semantics(source, bridge_extension=bridge_extension)
            carrier_name = getattr(carrier, "Name", "")
            TerminalObjects.ensure_string_property(
                source,
                "QetRouteCarrierName",
                PROPERTY_GROUP,
                "Generated route carrier for this source",
                carrier_name,
            )
            _remember_source_carriers(source, [carrier])
        except Exception:
            pass
def _mark_terminal_access_source(source, carrier):
if source is None or carrier is None:
return
try:
TerminalObjects.ensure_string_property(
source,
"QetRoutingSourceKind",
PROPERTY_GROUP,
"Routing source kind",
ROUTE_CARRIER_KIND_TERMINAL_ACCESS,
)
TerminalObjects.ensure_string_property(
source,
"QetRouteCarrierName",
PROPERTY_GROUP,
"Generated route carrier for this source",
getattr(carrier, "Name", ""),
)
_remember_source_carriers(source, [carrier])
except Exception:
pass
def _live_source_carrier(doc, source):
    """Return the first live carrier recorded on ``source``, or None if there is none."""
    for carrier in _live_source_carriers(doc, source):
        return carrier
    return None
def _source_is_valid_for_kind(source, source_kind):
    """Check whether ``source`` still qualifies as a source of ``source_kind``.

    Kinds without a dedicated candidate check are always considered valid.
    """
    validators = (
        (ROUTE_CARRIER_KIND_WIRE_DUCT, _is_wire_duct_candidate),
        (ROUTE_CARRIER_KIND_ROUTING_RANGE, _is_support_surface_candidate),
        (ROUTE_CARRIER_KIND_WIRING_CUT_OUT, _is_wiring_cut_out_candidate),
    )
    for known_kind, is_candidate in validators:
        if source_kind == known_kind:
            return is_candidate(source)
    return True
def _clear_invalid_source_route_metadata(source):
for property_name in (
"QetRouteCarrierName",
"QetRouteCarrierNamesJson",
"QetRoutingObstacleMode",
):
if property_name not in getattr(source, "PropertiesList", []) and not getattr(source, property_name, ""):
continue
TerminalObjects.ensure_string_property(
source,
property_name,
PROPERTY_GROUP,
"Cleared invalid routing source metadata",
"",
)
def _document_object_by_name(doc, name):
if doc is None or not name:
return None
try:
return doc.getObject(name)
except Exception:
return None
def cleanup_invalid_source_carriers(doc):
    """Remove generated carriers whose source object is missing or invalid.

    Pass 1 removes carriers of a managed source kind whose recorded source
    name no longer resolves to a document object. Pass 2 visits document
    objects tagged with a managed source kind that record carrier names but no
    longer qualify for that kind; their live carriers are removed and their
    routing metadata is cleared.

    Returns the accumulated removal count; the document is recomputed when
    that count is non-zero.
    """
    if doc is None:
        return 0
    removed_total = 0

    # Pass 1: carriers whose source object has disappeared.
    for carrier in list(collect_route_carriers(doc)):
        origin_name = (getattr(carrier, "QetRouteSourceName", "") or "").strip()
        origin_kind = (getattr(carrier, "QetRouteSourceKind", "") or "").strip()
        is_managed = origin_kind in MANAGED_ROUTE_SOURCE_KINDS and bool(origin_name)
        if is_managed and _document_object_by_name(doc, origin_name) is None:
            removed_total += _remove_route_carriers(doc, [carrier])

    # Pass 2: sources that no longer qualify for their recorded kind.
    for candidate in list(getattr(doc, "Objects", []) or []):
        if candidate is None or is_route_carrier(candidate):
            continue
        recorded_kind = _source_kind_value(candidate)
        if recorded_kind not in MANAGED_ROUTE_SOURCE_KINDS:
            continue
        if not _route_source_carrier_names(candidate):
            continue
        if _source_is_valid_for_kind(candidate, recorded_kind):
            continue
        removed_total += _remove_route_carriers(doc, _live_source_carriers(doc, candidate))
        _clear_invalid_source_route_metadata(candidate)

    if removed_total:
        try:
            doc.recompute()
        except Exception:
            pass
    return removed_total
def detect_wire_duct_sources(doc, min_aspect=DEFAULT_AUTO_WIRE_DUCT_MIN_ASPECT):
    """Return the document objects accepted by the wire-duct candidate check.

    Each object is tested at most once, in document order.
    """
    visited_ids = set()
    matches = []
    for candidate in list(getattr(doc, "Objects", []) or []):
        marker = id(candidate)
        if marker not in visited_ids:
            visited_ids.add(marker)
            if _is_wire_duct_candidate(candidate, min_aspect=min_aspect):
                matches.append(candidate)
    return matches
def detect_support_surface_sources(doc):
    """Return the document objects accepted by the support-surface candidate check.

    Each object is tested at most once, in document order.
    """
    visited_ids = set()
    matches = []
    for candidate in list(getattr(doc, "Objects", []) or []):
        marker = id(candidate)
        if marker not in visited_ids:
            visited_ids.add(marker)
            if _is_support_surface_candidate(candidate):
                matches.append(candidate)
    return matches
def detect_wiring_cut_out_sources(doc):
    """Return the document objects accepted by the wiring cut-out candidate check.

    Each object is tested at most once, in document order.
    """
    visited_ids = set()
    matches = []
    for candidate in list(getattr(doc, "Objects", []) or []):
        marker = id(candidate)
        if marker not in visited_ids:
            visited_ids.add(marker)
            if _is_wiring_cut_out_candidate(candidate):
                matches.append(candidate)
    return matches
def prepare_layout_space_sources_from_document(doc, project_uuid=""):
    """Normalize the current FreeCAD document as an EPLAN-style layout space.

    No routing path network is generated here. The function ensures the
    wiring root group exists, drops carriers of invalid sources, then tags
    detected wire ducts, support surfaces and wiring cut-outs with their
    source semantics (best effort per object) and recomputes the document.

    Returns a summary dict with the number of detected sources per kind, the
    number of routable terminals and the current network summary.

    Raises RoutingNetworkError when ``doc`` is None.
    """
    if doc is None:
        raise RoutingNetworkError("No FreeCAD document is available.")
    WiringObjects.ensure_wiring_root_group(doc, project_uuid)
    cleanup_invalid_source_carriers(doc)

    # Detect everything first; tagging happens only afterwards.
    duct_sources = detect_wire_duct_sources(doc)
    surface_sources = detect_support_surface_sources(doc)
    cut_out_sources = detect_wiring_cut_out_sources(doc)

    tagging_plan = (
        (duct_sources, _set_wire_duct_source_semantics),
        (surface_sources, _set_support_surface_source_semantics),
        (cut_out_sources, _set_wiring_cut_out_source_semantics),
    )
    for group, apply_semantics in tagging_plan:
        for source in group:
            try:
                apply_semantics(source)
            except Exception:
                pass

    try:
        doc.recompute()
    except Exception:
        pass

    summary = {
        "wire_duct_sources": len(duct_sources),
        "support_surface_sources": len(surface_sources),
        "wiring_cut_out_sources": len(cut_out_sources),
        "routable_terminals": len(_collect_routable_terminals(doc)),
        "existing_network": network_summary(doc),
    }
    return summary
def create_wire_duct_carriers_from_document(
    doc,
    project_uuid="",
    margin=DEFAULT_WIRE_DUCT_MARGIN,
    min_aspect=DEFAULT_AUTO_WIRE_DUCT_MIN_ASPECT,
):
    """Auto-detect wire ducts in the document and create their carriers.

    Each detected duct gets a WireDuct centerline plus WireDuctOpenEnd
    segments. Ducts that already have live carriers are synchronized in place
    instead. Returns only the newly created carriers.
    """
    cleanup_invalid_source_carriers(doc)
    created = []
    for source in detect_wire_duct_sources(doc, min_aspect=min_aspect):
        bbox = _bound_box_from_object(source)
        if bbox is None:
            continue
        spec = _wire_duct_centerline_spec_from_bbox(
            bbox,
            margin=margin,
            min_aspect=min_aspect,
        )
        centerline = spec.get("centerline", [])
        if len(centerline) < 2:
            continue
        source_label = getattr(source, "Label", "") or getattr(source, "Name", "") or "Wire Duct"
        lane_capacity = _route_carrier_capacity_value(source, default=1)
        already_synced = _sync_wire_duct_source_carriers(
            doc,
            source,
            spec,
            project_uuid=project_uuid,
            capacity=lane_capacity,
        )
        if already_synced:
            continue
        main_carrier = create_route_carrier(
            doc,
            centerline,
            label="QET Auto Wire Duct Centerline {0}".format(source_label),
            project_uuid=project_uuid,
            kind=ROUTE_CARRIER_KIND_WIRE_DUCT,
            capacity=lane_capacity,
        )
        generated = [main_carrier]
        open_end_number = 0
        for open_end_points in spec.get("open_ends", []) or []:
            # Numbering counts every open end, including skipped ones.
            open_end_number += 1
            if len(open_end_points) < 2:
                continue
            generated.append(
                create_route_carrier(
                    doc,
                    open_end_points,
                    label="QET Auto Wire Duct Open End {0} {1}".format(source_label, open_end_number),
                    project_uuid=project_uuid,
                    kind=ROUTE_CARRIER_KIND_WIRE_DUCT_OPEN_END,
                    capacity=lane_capacity,
                )
            )
        created.extend(generated)
        _mark_wire_duct_source(source, main_carrier, generated)
    return created
def create_wiring_cut_out_carriers_from_document(
    doc,
    project_uuid="",
    bridge_extension=DEFAULT_WIRING_CUT_OUT_BRIDGE_EXTENSION,
):
    """Create pass-through route carriers for wiring cut-out objects.

    For every detected cut-out a two-point segment through its bounding box is
    computed. A cut-out that already has a live carrier gets that carrier
    updated in place; otherwise a new WiringCutOut carrier is created and
    linked to the source. Returns only the newly created carriers.
    """
    cleanup_invalid_source_carriers(doc)
    created = []
    for source in detect_wiring_cut_out_sources(doc):
        bbox = _bound_box_from_object(source)
        if bbox is None:
            continue
        # A per-source bridge extension takes precedence over the argument default.
        source_bridge_extension = _wiring_cut_out_bridge_extension_value(source, default=bridge_extension)
        points = _wiring_cut_out_points_from_bbox(bbox, bridge_extension=source_bridge_extension)
        if len(points) < 2:
            continue
        live_carrier = _live_source_carrier(doc, source)
        if live_carrier is not None:
            # NOTE(review): assumes _update_route_carrier returns a truthy value
            # on success - confirm against its definition.
            if _update_route_carrier(
                live_carrier,
                points,
                project_uuid=project_uuid,
                kind=ROUTE_CARRIER_KIND_WIRING_CUT_OUT,
            ):
                _mark_wiring_cut_out_source(source, live_carrier, bridge_extension=source_bridge_extension)
                try:
                    doc.recompute()
                except Exception:
                    pass
            # An existing carrier is never duplicated, even if the update failed.
            continue
        label = getattr(source, "Label", "") or getattr(source, "Name", "") or "Wiring Cut-Out"
        carrier = create_route_carrier(
            doc,
            points,
            label="QET Auto Wiring Cut-Out {0}".format(label),
            project_uuid=project_uuid,
            kind=ROUTE_CARRIER_KIND_WIRING_CUT_OUT,
        )
        _mark_wiring_cut_out_source(source, carrier, bridge_extension=source_bridge_extension)
        created.append(carrier)
    return created
def create_surface_carriers_from_document(
    doc,
    project_uuid="",
    spacing=DEFAULT_SURFACE_LANE_SPACING,
    offset=DEFAULT_SURFACE_OFFSET,
    margin=DEFAULT_SURFACE_MARGIN,
):
    """Auto-detect thin support panels and create low-priority RoutingRange grids.

    For each detected support surface a grid of polylines is computed from its
    bounding-box support face. Sources that already own live carriers are
    synchronized: existing carriers are updated positionally, missing grid
    lines get new carriers and surplus carriers are removed. Sources without
    live carriers get a full set of new carriers. Returns only the newly
    created carriers.
    """
    cleanup_invalid_source_carriers(doc)
    created = []
    for source in detect_support_surface_sources(doc):
        bbox = _bound_box_from_object(source)
        if bbox is None:
            continue
        support_face = _support_face_from_bbox(bbox)
        grids = _surface_face_grid_points(
            support_face,
            spacing=spacing,
            offset=offset,
            margin=margin,
        )
        live_carriers = _live_source_carriers(doc, source)
        if live_carriers:
            # Synchronize path: reuse existing carriers for the first grid lines.
            updated = []
            for carrier, points in zip(live_carriers, grids):
                # NOTE(review): assumes _update_route_carrier returns a truthy
                # value on success - confirm against its definition.
                if _update_route_carrier(
                    carrier,
                    points,
                    project_uuid=project_uuid,
                    kind=ROUTE_CARRIER_KIND_ROUTING_RANGE,
                ):
                    updated.append(carrier)
            source_created = []
            label = getattr(source, "Label", "") or getattr(source, "Name", "") or "Support Surface"
            # Grid lines beyond the existing carriers need brand-new carriers;
            # numbering continues after the existing ones.
            for index, points in enumerate(grids[len(live_carriers):], start=len(live_carriers) + 1):
                if len(points) < 2:
                    continue
                carrier = create_route_carrier(
                    doc,
                    points,
                    label="QET Auto Support Surface Route {0} {1}".format(label, index),
                    project_uuid=project_uuid,
                    kind=ROUTE_CARRIER_KIND_ROUTING_RANGE,
                )
                source_created.append(carrier)
                created.append(carrier)
            # Existing carriers beyond the new grid size are stale: remove them.
            for stale_carrier in live_carriers[len(grids):]:
                _detach_from_groups(doc, stale_carrier)
                try:
                    if doc.getObject(getattr(stale_carrier, "Name", "")) is not None:
                        doc.removeObject(stale_carrier.Name)
                except Exception:
                    pass
            current_carriers = updated + source_created
            # The source is re-marked only when at least one existing carrier
            # was updated; carriers that were not updated are not re-recorded.
            if updated:
                _mark_support_surface_source(source, current_carriers)
                try:
                    doc.recompute()
                except Exception:
                    pass
            continue
        # Fresh path: no live carriers yet, create one per grid line.
        source_created = []
        label = getattr(source, "Label", "") or getattr(source, "Name", "") or "Support Surface"
        for index, points in enumerate(grids, start=1):
            if len(points) < 2:
                continue
            carrier = create_route_carrier(
                doc,
                points,
                label="QET Auto Support Surface Route {0} {1}".format(label, index),
                project_uuid=project_uuid,
                kind=ROUTE_CARRIER_KIND_ROUTING_RANGE,
            )
            source_created.append(carrier)
            created.append(carrier)
        _mark_support_surface_source(source, source_created)
    return created
def _collect_routable_terminals(doc):
terminals = []
seen = set()
root = None
try:
root = doc.getObject(TerminalObjects.ROOT_GROUP_NAME)
except Exception:
root = None
if root is not None:
terminals.extend(TerminalObjects.collect_terminal_objects(root))
terminals.extend(
obj
for obj in list(getattr(doc, "Objects", []) or [])
if TerminalObjects.is_terminal_object(obj)
)
result = []
for terminal in terminals:
if terminal is None or id(terminal) in seen:
continue
seen.add(id(terminal))
result.append(terminal)
return result
def _terminal_exit_point(terminal, exit_length):
    """Return the point ``exit_length`` away from the terminal origin along its direction.

    A direction that cannot be normalized falls back to +Z. Negative or falsy
    lengths are treated as 0.
    """
    start = _vector(TerminalObjects.terminal_origin(terminal))
    heading = _normalize(_vector(TerminalObjects.terminal_direction(terminal)))
    if heading is None:
        heading = App.Vector(0, 0, 1)
    reach = max(float(exit_length or 0.0), 0.0)
    return _add(start, _scale(heading, reach))
def _orthogonal_access_points(start, end):
    """Create a Manhattan path so access carriers can join the routing graph.

    The path moves one axis at a time from ``start`` to ``end``, taking the
    axis with the largest start-to-end difference first. Moves not longer than
    the node tolerance are skipped.
    """
    start = _vector(start)
    end = _vector(end)

    def axis_gap(axis, reference):
        return abs(_axis_value(end, axis) - _axis_value(reference, axis))

    ordered_axes = sorted(
        ("x", "y", "z"),
        key=lambda axis: axis_gap(axis, start),
        reverse=True,
    )
    path = [start]
    cursor = start
    for axis in ordered_axes:
        if axis_gap(axis, cursor) > DEFAULT_NODE_TOLERANCE:
            cursor = _set_axis(cursor, axis, _axis_value(end, axis))
            if _distance(path[-1], cursor) > DEFAULT_NODE_TOLERANCE:
                path.append(cursor)
    # Per-axis leftovers can still add up to more than the tolerance.
    if _distance(path[-1], end) > DEFAULT_NODE_TOLERANCE:
        path.append(end)
    return path
def create_terminal_access_carriers_from_document(
    doc,
    project_uuid="",
    terminal_exit_length=20.0,
    max_distance=DEFAULT_TERMINAL_ACCESS_MAX_DISTANCE,
):
    """Connect every engineering terminal to the generated route network.

    One-click routing in EPLAN/SW does not ask the user to draw an auxiliary
    line for every terminal by hand; terminals are attached to the routing
    network automatically first. This function generates short TerminalAccess
    carriers so that Dijkstra can later enter the wire ducts / routing
    surfaces from the terminal entry.

    All existing TerminalAccess carriers are removed first. Returns the newly
    created carriers, or [] when the network has no segments.
    """
    # TerminalAccess depends directly on current terminal placement, so regenerate it
    # every time the layout space is prepared. This keeps one-click routing predictable
    # after devices or terminals are moved in FreeCAD.
    for carrier in list(collect_route_carriers(doc)):
        if (getattr(carrier, "QetRouteCarrierKind", "") or "").strip() != ROUTE_CARRIER_KIND_TERMINAL_ACCESS:
            continue
        _detach_from_groups(doc, carrier)
        try:
            if doc.getObject(getattr(carrier, "Name", "")) is not None:
                doc.removeObject(carrier.Name)
        except Exception:
            pass
    try:
        doc.recompute()
    except Exception:
        pass
    # Build the graph only after the old access carriers are gone.
    network = build_route_graph(doc)
    if network.get("segment_count", 0) <= 0:
        return []
    created = []
    for terminal in _collect_routable_terminals(doc):
        # A terminal that still has a live recorded carrier is skipped.
        if _live_source_carrier(doc, terminal) is not None:
            continue
        exit_point = _terminal_exit_point(terminal, terminal_exit_length)
        nearest_point, distance = nearest_point_on_network(network, exit_point)
        if nearest_point is None:
            continue
        # A falsy max_distance disables the distance limit.
        if max_distance and float(distance or 0.0) > float(max_distance):
            continue
        # The exit point already lies on the network: no access carrier needed.
        if float(distance or 0.0) <= DEFAULT_NODE_TOLERANCE:
            continue
        points = _orthogonal_access_points(exit_point, nearest_point)
        if len(points) < 2:
            continue
        label = getattr(terminal, "Label", "") or getattr(terminal, "Name", "") or "Terminal"
        carrier = create_route_carrier(
            doc,
            points,
            label="QET Auto Terminal Access {0}".format(label),
            project_uuid=project_uuid,
            kind=ROUTE_CARRIER_KIND_TERMINAL_ACCESS,
        )
        _mark_terminal_access_source(terminal, carrier)
        created.append(carrier)
    return created
def create_routing_path_network_from_document(
    doc,
    project_uuid="",
    selection_ex=None,
    terminal_exit_length=20.0,
    terminal_access_max_distance=DEFAULT_TERMINAL_ACCESS_MAX_DISTANCE,
):
    """Generate the EPLAN-style routing path network for the layout space.

    The selection is only a hint for wire ducts that cannot be detected from
    names or semantics. The whole document is still scanned afterwards, which
    matches EPLAN's "generate routing path network for the layout space".

    Returns a summary dict with the number of carriers created per kind, the
    layout-space preparation summary and the resulting network summary.
    """

    def count_kind(carriers, wanted_kind):
        # Count carriers whose stripped QetRouteCarrierKind equals wanted_kind.
        return sum(
            1
            for carrier in carriers
            if (getattr(carrier, "QetRouteCarrierKind", "") or "").strip() == wanted_kind
        )

    layout_space = prepare_layout_space_sources_from_document(
        doc,
        project_uuid=project_uuid,
    )

    selected_wire_ducts = []
    if selection_ex:
        selected_wire_ducts = create_wire_duct_carriers_from_selection(
            doc,
            selection_ex,
            project_uuid=project_uuid,
        )
    wire_ducts = create_wire_duct_carriers_from_document(doc, project_uuid=project_uuid)
    cut_outs = create_wiring_cut_out_carriers_from_document(doc, project_uuid=project_uuid)
    surfaces = create_surface_carriers_from_document(doc, project_uuid=project_uuid)
    terminal_access = create_terminal_access_carriers_from_document(
        doc,
        project_uuid=project_uuid,
        terminal_exit_length=terminal_exit_length,
        max_distance=terminal_access_max_distance,
    )

    every_wire_duct = list(selected_wire_ducts) + list(wire_ducts)
    main_total = count_kind(every_wire_duct, ROUTE_CARRIER_KIND_WIRE_DUCT)
    selected_main_total = count_kind(selected_wire_ducts, ROUTE_CARRIER_KIND_WIRE_DUCT)
    open_end_total = count_kind(every_wire_duct, ROUTE_CARRIER_KIND_WIRE_DUCT_OPEN_END)

    return {
        "wire_duct_carriers": main_total,
        "selected_wire_duct_carriers": selected_main_total,
        "wire_duct_open_end_carriers": open_end_total,
        "wiring_cut_out_carriers": len(cut_outs),
        "surface_carriers": len(surfaces),
        "terminal_access_carriers": len(terminal_access),
        "layout_space": layout_space,
        "network": network_summary(doc),
    }
def create_wire_duct_carriers_from_selection(
    doc,
    selection_ex,
    project_uuid="",
    margin=DEFAULT_WIRE_DUCT_MARGIN,
    min_aspect=1.5,
):
    """Create WireDuct centerline carriers from selected duct-like solids.

    Each selected source gets a centerline plus WireDuctOpenEnd segments.
    Sources that already have live carriers are synchronized in place instead.
    Returns only the newly created carriers.
    """
    cleanup_invalid_source_carriers(doc)
    created = []
    for source in _wire_duct_sources_from_selection(selection_ex):
        bbox = _bound_box_from_object(source)
        if bbox is None:
            continue
        spec = _wire_duct_centerline_spec_from_bbox(
            bbox,
            margin=margin,
            min_aspect=min_aspect,
        )
        centerline = spec.get("centerline", [])
        if len(centerline) < 2:
            continue
        source_label = getattr(source, "Label", "") or getattr(source, "Name", "") or "Wire Duct"
        lane_capacity = _route_carrier_capacity_value(source, default=1)
        already_synced = _sync_wire_duct_source_carriers(
            doc,
            source,
            spec,
            project_uuid=project_uuid,
            capacity=lane_capacity,
        )
        if already_synced:
            continue
        main_carrier = create_route_carrier(
            doc,
            centerline,
            label="QET Wire Duct Centerline {0}".format(source_label),
            project_uuid=project_uuid,
            kind=ROUTE_CARRIER_KIND_WIRE_DUCT,
            capacity=lane_capacity,
        )
        generated = [main_carrier]
        open_end_number = 0
        for open_end_points in spec.get("open_ends", []) or []:
            # Numbering counts every open end, including skipped ones.
            open_end_number += 1
            if len(open_end_points) < 2:
                continue
            generated.append(
                create_route_carrier(
                    doc,
                    open_end_points,
                    label="QET Wire Duct Open End {0} {1}".format(source_label, open_end_number),
                    project_uuid=project_uuid,
                    kind=ROUTE_CARRIER_KIND_WIRE_DUCT_OPEN_END,
                    capacity=lane_capacity,
                )
            )
        created.extend(generated)
        _mark_wire_duct_source(source, main_carrier, generated)
    return created
def create_surface_carriers_from_selection(
    doc,
    selection_ex,
    project_uuid="",
    spacing=DEFAULT_SURFACE_LANE_SPACING,
    offset=DEFAULT_SURFACE_OFFSET,
    margin=DEFAULT_SURFACE_MARGIN,
):
    """Create a supported route grid on selected planar cabinet/panel faces.

    For each selection item, grids are first built from its selected faces.
    If that produced no carriers, the item's whole object is used instead,
    provided it is a support-surface candidate without live carriers yet.
    Returns every created carrier.
    """
    created = []
    for item in selection_ex or []:
        item_created = []
        for sub_object in list(getattr(item, "SubObjects", []) or []):
            shape_type = (getattr(sub_object, "ShapeType", "") or "").lower()
            if shape_type != "face":
                continue
            grids = _surface_face_grid_points(
                sub_object,
                spacing=spacing,
                offset=offset,
                margin=margin,
            )
            for index, points in enumerate(grids, start=1):
                if len(points) < 2:
                    continue
                # Face-level carriers are not linked back to a source object.
                carrier = create_route_carrier(
                    doc,
                    points,
                    label="QET Surface Route {0}".format(index),
                    project_uuid=project_uuid,
                    kind=ROUTE_CARRIER_KIND_ROUTING_RANGE,
                )
                item_created.append(carrier)
                created.append(carrier)
        # Explicit faces produced carriers: skip the object-level fallback.
        if item_created:
            continue
        obj = getattr(item, "Object", None)
        if not _is_support_surface_candidate(obj):
            continue
        # Do not regenerate a grid for an object that already owns carriers.
        if _live_source_carrier(doc, obj) is not None:
            continue
        bbox = _bound_box_from_object(obj)
        if bbox is None:
            continue
        support_face = _support_face_from_bbox(bbox)
        grids = _surface_face_grid_points(
            support_face,
            spacing=spacing,
            offset=offset,
            margin=margin,
        )
        source_created = []
        label = getattr(obj, "Label", "") or getattr(obj, "Name", "") or "Support Surface"
        for index, points in enumerate(grids, start=1):
            if len(points) < 2:
                continue
            carrier = create_route_carrier(
                doc,
                points,
                label="QET Surface Route {0} {1}".format(label, index),
                project_uuid=project_uuid,
                kind=ROUTE_CARRIER_KIND_ROUTING_RANGE,
            )
            source_created.append(carrier)
            created.append(carrier)
        _mark_support_surface_source(obj, source_created)
    return created
def _carrier_cost_factor(carrier, kind_cost_factors=None):
    """Return the cost multiplier for a carrier based on its kind.

    The default per-kind factors are overlaid with ``kind_cost_factors`` when
    that is a dict. Unknown kinds use 1.0, the result is never below 0.01, and
    a factor that cannot be converted to float yields 1.0.
    """
    carrier_kind = (getattr(carrier, "QetRouteCarrierKind", "") or "").strip()
    merged_factors = dict(DEFAULT_KIND_COST_FACTORS)
    if isinstance(kind_cost_factors, dict):
        merged_factors.update(kind_cost_factors)
    try:
        factor = float(merged_factors.get(carrier_kind, 1.0))
    except Exception:
        return 1.0
    return max(factor, 0.01)
def build_route_graph(
    doc,
    tolerance=DEFAULT_NODE_TOLERANCE,
    blocked_bboxes=None,
    adjoining_duct_tolerance=DEFAULT_ADJOINING_DUCT_TOLERANCE,
):
    """Build an undirected graph from every enabled route carrier.

    Carrier polylines are cut into straight segments, the segments are split
    wherever ``_orthogonal_segment_intersections`` reports a crossing, and
    every resulting piece longer than *tolerance* becomes two directed edge
    entries unless it hits one of *blocked_bboxes*. Afterwards, endpoints of
    different wire-duct carriers are bridged when the gap between them is
    larger than *tolerance* but not larger than *adjoining_duct_tolerance*.

    Returns a dict with ``nodes`` (key -> point), ``edges`` (key -> list of
    ``(neighbor_key, weight, carrier)`` tuples), ``carriers``,
    ``carrier_count``, ``segment_count``, ``bridged_segment_count``,
    ``blocked_segment_count`` and ``tolerance``.
    """
    nodes = {}
    edges = {}
    carriers = collect_route_carriers(doc)
    blocked_bboxes = list(blocked_bboxes or [])
    segment_count = 0
    blocked_segment_count = 0
    bridged_segment_count = 0

    def ensure_node(point):
        # Register the point under its tolerance key the first time it is seen.
        key = _point_key(point, tolerance=tolerance)
        if key in nodes:
            return key
        nodes[key] = point
        edges[key] = []
        return key

    # Step 1: one record per non-degenerate straight piece of each carrier.
    segments = []
    for carrier in carriers:
        polyline = _carrier_points(carrier)
        if len(polyline) < 2:
            continue
        for index in range(1, len(polyline)):
            start = polyline[index - 1]
            end = polyline[index]
            if _distance(start, end) > tolerance:
                segments.append(
                    {
                        "carrier": carrier,
                        "start": start,
                        "end": end,
                        "points": [start, end],
                    }
                )

    # Step 2: several wire ducts often touch or cross geometrically without
    # sharing endpoint coordinates. Split those carrier segments at the
    # intersection points so Dijkstra can change direction there, which
    # matches CAD routing path behavior.
    for position, first in enumerate(segments):
        for second in segments[position + 1:]:
            crossings = _orthogonal_segment_intersections(
                first["start"],
                first["end"],
                second["start"],
                second["end"],
                tolerance=tolerance,
            )
            if crossings:
                first["points"].extend(crossings)
                second["points"].extend(crossings)

    # Step 3: turn every split piece into graph edges.
    duct_endpoints = []
    for segment in segments:
        ordered = _sorted_segment_points(
            segment["start"],
            segment["end"],
            segment["points"],
            tolerance=tolerance,
        )
        if len(ordered) < 2:
            continue
        carrier = segment["carrier"]
        carrier_kind = (getattr(carrier, "QetRouteCarrierKind", "") or "").strip()
        if carrier_kind == ROUTE_CARRIER_KIND_WIRE_DUCT:
            # Remember wire-duct segment ends as bridge candidates for step 4.
            for endpoint in (ordered[0], ordered[-1]):
                endpoint_key = ensure_node(endpoint)
                duct_endpoints.append((endpoint_key, nodes[endpoint_key], carrier))
        previous_key = ensure_node(ordered[0])
        for point in ordered[1:]:
            current_key = ensure_node(point)
            tail_point = nodes[previous_key]
            head_point = nodes[current_key]
            weight = _distance(tail_point, head_point)
            if weight > tolerance:
                if _segment_hits_blocked_bbox(tail_point, head_point, blocked_bboxes):
                    blocked_segment_count += 1
                else:
                    edges[previous_key].append((current_key, weight, carrier))
                    edges[current_key].append((previous_key, weight, carrier))
                    segment_count += 1
            previous_key = current_key

    # Step 4: bridge wire-duct endpoints that nearly touch.
    adjoining_limit = max(float(adjoining_duct_tolerance or 0.0), 0.0)
    bridged_pairs = set()
    if adjoining_limit > tolerance:
        for position, (left_key, left_point, left_carrier) in enumerate(duct_endpoints):
            for right_key, right_point, right_carrier in duct_endpoints[position + 1:]:
                if left_key == right_key or left_carrier is right_carrier:
                    continue
                pair = tuple(sorted((left_key, right_key)))
                if pair in bridged_pairs:
                    continue
                gap = _distance(left_point, right_point)
                if gap <= tolerance or gap > adjoining_limit:
                    continue
                already_linked = any(
                    neighbor_key == right_key
                    for neighbor_key, _weight, _carrier in edges.get(left_key, [])
                )
                if already_linked:
                    continue
                if _segment_hits_blocked_bbox(left_point, right_point, blocked_bboxes):
                    blocked_segment_count += 1
                    continue
                # Each direction of the bridge is tagged with the carrier it leaves.
                edges[left_key].append((right_key, gap, left_carrier))
                edges[right_key].append((left_key, gap, right_carrier))
                segment_count += 1
                bridged_segment_count += 1
                bridged_pairs.add(pair)

    return {
        "nodes": nodes,
        "edges": edges,
        "carriers": carriers,
        "carrier_count": len(carriers),
        "segment_count": segment_count,
        "bridged_segment_count": bridged_segment_count,
        "blocked_segment_count": blocked_segment_count,
        "tolerance": tolerance,
    }
def nearest_node(network, point):
    """Return ``(node_key, distance)`` for the graph node closest to *point*.

    ``(None, None)`` is returned when *network* is not a dict or holds no
    nodes. When several nodes are equally close, the first one in the
    ``nodes`` mapping's iteration order wins.
    """
    nodes = network.get("nodes", {}) if isinstance(network, dict) else {}
    if not nodes:
        return None, None
    target = _vector(point)
    measured = (
        (_distance(target, node_point), key)
        for key, node_point in nodes.items()
    )
    # Compare on distance only so ties keep the earliest node.
    best_distance, best_key = min(measured, key=lambda entry: entry[0])
    return best_key, best_distance
def nearest_point_on_network(network, point):
    """Return the closest point on any route-network edge.

    The point may lie in the middle of a carrier segment. If a TerminalAccess
    carrier ends there, the next graph build will split the crossed segment at
    that point and create an EPLAN-like jump-in routing point.

    Returns ``(closest_point, distance)``. ``(None, None)`` is returned when
    *network* is not a dict or has no nodes or no edge table. When the edge
    table holds no usable edge, the stored point of the nearest node is
    returned instead, so the first element is always a point or ``None``
    and never a node key.
    """
    if not isinstance(network, dict):
        return None, None
    nodes = network.get("nodes", {}) or {}
    edges = network.get("edges", {}) or {}
    if not nodes or not edges:
        return None, None
    target = _vector(point)
    best_point = None
    best_distance = None
    seen = set()
    for key, neighbors in edges.items():
        start = nodes.get(key)
        if start is None:
            continue
        for next_key, _weight, _carrier in neighbors:
            # Every undirected edge is stored twice; measure it only once.
            pair = tuple(sorted((key, next_key)))
            if pair in seen:
                continue
            seen.add(pair)
            end = nodes.get(next_key)
            if end is None:
                continue
            candidate = _closest_point_on_segment(target, start, end)
            distance = _distance(target, candidate)
            if best_distance is None or distance < best_distance:
                best_point = candidate
                best_distance = distance
    if best_point is not None:
        return best_point, best_distance
    # No usable edge (for example every segment was blocked): fall back to the
    # nearest node, translating its key into coordinates so the return type
    # matches the edge-projection case above.
    node_key, node_distance = nearest_node(network, target)
    if node_key is None:
        return None, None
    return nodes.get(node_key), node_distance
def connect_point_to_network(network, point):
    """Connect the closest projected point to a route graph and return key/distance/mode.

    The closest point on any edge is located. If that projection coincides
    with an existing node (same key, or within the network tolerance of one
    of the edge's end nodes), that node is reused and the graph is left
    untouched. Otherwise the edge is split in place: *network* is mutated so
    the projection becomes a new node linked to both former end nodes, and
    ``segment_count`` grows by one.

    Returns ``(node_key, distance, mode)`` where *mode* is ``"node"``,
    ``"segment_projection"`` or ``"none"`` (no usable network), and
    *distance* is the distance from *point* to the projection.
    """
    if not isinstance(network, dict):
        return None, None, "none"
    nodes = network.get("nodes", {}) or {}
    edges = network.get("edges", {}) or {}
    if not nodes or not edges:
        return None, None, "none"
    tolerance = float(network.get("tolerance", DEFAULT_NODE_TOLERANCE) or DEFAULT_NODE_TOLERANCE)
    target = _vector(point)

    best = None
    seen = set()
    for key, neighbors in edges.items():
        start = nodes.get(key)
        if start is None:
            continue
        for next_key, _weight, carrier in neighbors:
            # Every undirected edge is stored twice; evaluate it only once.
            pair = tuple(sorted((key, next_key)))
            if pair in seen:
                continue
            seen.add(pair)
            end = nodes.get(next_key)
            if end is None:
                continue
            projected = _closest_point_on_segment(target, start, end)
            distance = _distance(target, projected)
            if best is None or distance < best["distance"]:
                best = {
                    "key": key,
                    "next_key": next_key,
                    "carrier": carrier,
                    "point": projected,
                    "distance": distance,
                }

    if best is None:
        node_key, distance = nearest_node(network, target)
        return node_key, distance, "node" if node_key is not None else "none"

    projected_point = best["point"]
    projected_key = _point_key(projected_point, tolerance=tolerance)
    if projected_key in nodes:
        return projected_key, best["distance"], "node"

    start_key = best["key"]
    end_key = best["next_key"]
    start = nodes[start_key]
    end = nodes[end_key]
    carrier = best["carrier"]

    # A projection that sits within tolerance of an end node can still get a
    # different key. Splitting there would drop the original edge without
    # re-linking that end node, so snap to the end node instead.
    for endpoint_key, endpoint in ((start_key, start), (end_key, end)):
        if _distance(projected_point, endpoint) <= tolerance:
            return endpoint_key, best["distance"], "node"

    def remove_edge_once(left_key, right_key, fallback_to_pair=False):
        # Drop one directed entry left -> right, preferring the one that
        # carries the selected carrier. Bridge edges may carry a different
        # carrier in the opposite direction, hence the optional fallback.
        neighbors = list(edges.get(left_key, []) or [])
        for index, (candidate_key, _weight, candidate_carrier) in enumerate(neighbors):
            if candidate_key == right_key and candidate_carrier is carrier:
                del neighbors[index]
                edges[left_key] = neighbors
                return True
        if fallback_to_pair:
            for index, (candidate_key, _weight, _candidate_carrier) in enumerate(neighbors):
                if candidate_key == right_key:
                    del neighbors[index]
                    edges[left_key] = neighbors
                    return True
        return False

    removed_forward = remove_edge_once(start_key, end_key)
    remove_edge_once(end_key, start_key, fallback_to_pair=removed_forward)

    nodes[projected_key] = projected_point
    edges[projected_key] = []
    added_segments = 0
    halves = (
        (start_key, start, projected_key, projected_point),
        (projected_key, projected_point, end_key, end),
    )
    for left_key, left_point, right_key, right_point in halves:
        weight = _distance(left_point, right_point)
        edges[left_key].append((right_key, weight, carrier))
        edges[right_key].append((left_key, weight, carrier))
        added_segments += 1
    # One edge was replaced by the halves that were just added.
    network["segment_count"] = max(int(network.get("segment_count", 0) or 0) - 1 + added_segments, 0)
    return projected_key, best["distance"], "segment_projection"
def _carrier_track_payload(carrier):
    """Describe *carrier* as a plain dict for route and diagnostic payloads.

    Always contains ``name``, ``label`` and ``kind`` (falling back to
    ``ROUTE_CARRIER_KIND`` when the carrier has no kind). ``source_name``,
    ``source_label`` and ``source_kind`` are added only when the matching
    ``QetRouteSource*`` property is non-empty after stripping whitespace.
    """

    def text(property_name):
        # Missing or None properties are treated as empty strings.
        return (getattr(carrier, property_name, "") or "").strip()

    payload = {
        "name": getattr(carrier, "Name", ""),
        "label": getattr(carrier, "Label", ""),
        "kind": text("QetRouteCarrierKind") or ROUTE_CARRIER_KIND,
    }
    optional_fields = (
        ("source_name", "QetRouteSourceName"),
        ("source_label", "QetRouteSourceLabel"),
        ("source_kind", "QetRouteSourceKind"),
    )
    for payload_key, property_name in optional_fields:
        value = text(property_name)
        if not value:
            continue
        payload[payload_key] = value
    return payload
def _segment_usage_key(carrier, from_key, to_key):
    """Return the direction-independent usage key of one carrier segment.

    The key is ``(carrier_name, (low_key, high_key))``: the two node keys are
    sorted so both travel directions map to the same entry, and the carrier
    name is ``""`` when *carrier* is ``None``.
    """
    if carrier is None:
        carrier_name = ""
    else:
        carrier_name = getattr(carrier, "Name", "")
    ordered_keys = tuple(sorted((from_key, to_key)))
    return carrier_name, ordered_keys
def _carrier_capacity(carrier):
    """Return how many wires *carrier* accepts before reuse is penalised.

    ``QetRouteCarrierCapacity`` is consulted first, then ``QetWireCapacity``;
    the first one that converts to a positive integer wins. Missing,
    non-numeric or non-positive values are ignored, and 1 is returned when
    no property qualifies or *carrier* is ``None``.
    """
    if carrier is None:
        return 1

    def read_capacity(property_name):
        # Any conversion problem simply means "no capacity configured here".
        try:
            return int(float(getattr(carrier, property_name, 0) or 0))
        except Exception:
            return 0

    candidates = (
        read_capacity(property_name)
        for property_name in ("QetRouteCarrierCapacity", "QetWireCapacity")
    )
    return next((value for value in candidates if value > 0), 1)
def shortest_path_with_carriers(
    network,
    start_key,
    end_key,
    bend_penalty=0.0,
    kind_cost_factors=None,
    segment_usage_costs=None,
    segment_reuse_penalty=0.0,
):
    """Dijkstra search with a small extra cost when route direction changes.

    The search state is ``(node_key, incoming_direction)`` so that the bend
    penalty can depend on the edge used to reach a node. The cost of an edge
    is ``weight * carrier cost factor`` plus *bend_penalty* when the
    direction differs from the previous edge, plus *segment_reuse_penalty*
    for each unit by which the recorded usage in *segment_usage_costs*
    reaches or exceeds the carrier capacity.

    Returns ``None`` when either key is ``None`` or no path exists.
    Otherwise returns a dict with ``path`` (node keys), ``segments``
    (per-edge payloads), ``carrier_names``, ``carrier_kinds`` and ``cost``.
    The same keys are present when *start_key* equals *end_key*, so callers
    can read them without special-casing the zero-length route.
    """
    if start_key is None or end_key is None:
        return None
    if start_key == end_key:
        return {
            "path": [start_key],
            "segments": [],
            "carrier_names": [],
            "carrier_kinds": {},
            "cost": 0.0,
        }

    nodes = network.get("nodes", {})
    edges = network.get("edges", {})
    # Loop-invariant conversions, done once instead of per relaxed edge.
    bend_cost_value = float(bend_penalty or 0.0)
    reuse_cost_value = float(segment_reuse_penalty or 0.0)

    def build_result(final_state, total_cost):
        # Walk the predecessor chain back to the start and describe each edge.
        path = [final_state[0]]
        segments = []
        current_state = final_state
        while current_state in previous:
            previous_entry = previous[current_state]
            previous_state = previous_entry["state"]
            previous_key = previous_state[0]
            current_key = current_state[0]
            segments.append(
                {
                    "from_key": list(previous_key),
                    "to_key": list(current_key),
                    "from": _point_payload(nodes[previous_key]),
                    "to": _point_payload(nodes[current_key]),
                    "length_mm": float(previous_entry.get("weight", 0.0) or 0.0),
                    "carrier": _carrier_track_payload(previous_entry.get("carrier")),
                }
            )
            current_state = previous_state
            path.append(current_state[0])
        path.reverse()
        segments.reverse()

        carrier_names = []
        carrier_kinds = {}
        for segment in segments:
            carrier_info = segment.get("carrier", {})
            name = carrier_info.get("name", "")
            if name and name not in carrier_names:
                carrier_names.append(name)
            kind = carrier_info.get("kind", "") or ROUTE_CARRIER_KIND
            carrier_kinds[kind] = carrier_kinds.get(kind, 0) + 1
        return {
            "path": path,
            "segments": segments,
            "carrier_names": carrier_names,
            "carrier_kinds": carrier_kinds,
            "cost": float(total_cost),
        }

    queue = []
    counter = 0  # Tie-breaker so heap entries never compare node keys.
    distances = {(start_key, None): 0.0}
    previous = {}
    heapq.heappush(queue, (0.0, counter, start_key, None))
    while queue:
        cost, _counter, key, previous_direction = heapq.heappop(queue)
        state = (key, previous_direction)
        if cost > distances.get(state, float("inf")):
            # Stale heap entry: a cheaper way to this state was found later.
            continue
        if key == end_key:
            return build_result(state, cost)
        for next_key, weight, carrier in edges.get(key, []):
            direction = _direction_key(nodes[key], nodes[next_key])
            bend_cost = 0.0
            if previous_direction is not None and direction != previous_direction:
                bend_cost = bend_cost_value
            usage_cost = 0.0
            if segment_usage_costs:
                usage_count = float(
                    segment_usage_costs.get(_segment_usage_key(carrier, key, next_key), 0.0) or 0.0
                )
                capacity = float(_carrier_capacity(carrier))
                # Penalise once this wire would exceed the carrier capacity.
                excess_usage = max(usage_count - capacity + 1.0, 0.0)
                usage_cost = excess_usage * reuse_cost_value
            next_state = (next_key, direction)
            next_cost = (
                cost
                + float(weight) * _carrier_cost_factor(carrier, kind_cost_factors)
                + bend_cost
                + usage_cost
            )
            if next_cost < distances.get(next_state, float("inf")):
                distances[next_state] = next_cost
                previous[next_state] = {
                    "state": state,
                    "carrier": carrier,
                    "weight": weight,
                }
                counter += 1
                heapq.heappush(queue, (next_cost, counter, next_key, direction))
    return None
def path_points(network, path_keys):
    """Translate node keys into their stored points, preserving order.

    Keys that are not present in the network's ``nodes`` mapping are skipped.
    A non-dict *network* or an empty/``None`` *path_keys* yields ``[]``.
    """
    if isinstance(network, dict):
        nodes = network.get("nodes", {})
    else:
        nodes = {}
    resolved = []
    for key in path_keys or []:
        if key in nodes:
            resolved.append(nodes[key])
    return resolved
def network_summary(doc):
    """Build the route graph for *doc* with default settings and summarise it."""
    return _network_summary_from_graph(build_route_graph(doc))
def _network_summary_from_graph(network):
    """Condense a route graph dict into counts.

    Returns a dict with ``carriers``, ``segments``, ``bridged_segments``,
    ``blocked_segments`` and ``nodes`` counts, plus ``kinds``: the number of
    carriers per ``QetRouteCarrierKind`` (carriers without a kind are counted
    under ``ROUTE_CARRIER_KIND``).
    """
    kind_counts = {}
    for carrier in network.get("carriers", []) or []:
        raw_kind = getattr(carrier, "QetRouteCarrierKind", "") or ""
        kind = raw_kind.strip() or ROUTE_CARRIER_KIND
        kind_counts[kind] = kind_counts.get(kind, 0) + 1

    def counter(field):
        # Counters missing from the graph dict are reported as zero.
        return int(network.get(field, 0))

    summary = {}
    summary["carriers"] = counter("carrier_count")
    summary["segments"] = counter("segment_count")
    summary["bridged_segments"] = counter("bridged_segment_count")
    summary["blocked_segments"] = counter("blocked_segment_count")
    summary["nodes"] = len(network.get("nodes", {}))
    summary["kinds"] = kind_counts
    return summary
def _route_graph_components(network):
    """Split the route graph into connected components.

    Each component is reported as a dict with its ``index``, the number of
    ``nodes`` and undirected ``segments``, the sorted ``carrier_names`` seen
    on its edges, ``carrier_kinds`` (counted once per stored edge entry, so
    an undirected edge contributes once per direction) and
    ``has_terminal_access``.
    """
    nodes = network.get("nodes", {}) or {}
    edges = network.get("edges", {}) or {}
    visited = set()
    components = []
    for root_key in nodes:
        if root_key in visited:
            continue
        visited.add(root_key)
        pending = [root_key]
        member_keys = []
        undirected_edges = set()
        carrier_payloads = {}
        kind_counts = {}
        # Depth-first walk over everything reachable from root_key.
        while pending:
            key = pending.pop()
            member_keys.append(key)
            for next_key, _weight, carrier in edges.get(key, []) or []:
                undirected_edges.add(tuple(sorted((key, next_key))))
                if carrier is not None:
                    carrier_name = getattr(carrier, "Name", "")
                    if carrier_name:
                        carrier_payloads[carrier_name] = _carrier_track_payload(carrier)
                    kind = (getattr(carrier, "QetRouteCarrierKind", "") or "").strip() or ROUTE_CARRIER_KIND
                    kind_counts[kind] = kind_counts.get(kind, 0) + 1
                if next_key in visited:
                    continue
                visited.add(next_key)
                pending.append(next_key)
        has_terminal_access = any(
            described.get("kind") == ROUTE_CARRIER_KIND_TERMINAL_ACCESS
            for described in carrier_payloads.values()
        )
        components.append(
            {
                "index": len(components),
                "nodes": len(member_keys),
                "segments": len(undirected_edges),
                "carrier_names": sorted(carrier_payloads.keys()),
                "carrier_kinds": kind_counts,
                "has_terminal_access": has_terminal_access,
            }
        )
    return components
def _wire_duct_endpoint_breaks(network):
    """List wire-duct carrier endpoints that have at most one graph edge.

    For every carrier of kind ``ROUTE_CARRIER_KIND_WIRE_DUCT`` the first and
    last carrier point are looked up in the graph. An endpoint whose node has
    zero or one edge entries is reported as a dict with the ``carrier``
    payload, the ``point`` payload and the ``degree``.
    """
    nodes = network.get("nodes", {}) or {}
    edges = network.get("edges", {}) or {}
    findings = []
    for carrier in network.get("carriers", []) or []:
        carrier_kind = (getattr(carrier, "QetRouteCarrierKind", "") or "").strip()
        if carrier_kind != ROUTE_CARRIER_KIND_WIRE_DUCT:
            continue
        polyline = _carrier_points(carrier)
        if len(polyline) < 2:
            continue
        for endpoint in (polyline[0], polyline[-1]):
            node_key = _point_key(
                endpoint,
                tolerance=network.get("tolerance", DEFAULT_NODE_TOLERANCE),
            )
            degree = len(edges.get(node_key, []) or [])
            if degree <= 1:
                findings.append(
                    {
                        "carrier": _carrier_track_payload(carrier),
                        # Prefer the stored node point; fall back to the raw endpoint.
                        "point": _point_payload(nodes.get(node_key, endpoint)),
                        "degree": degree,
                    }
                )
    return findings
def _terminal_diagnostic_payload(terminal):
    """Return the identifying fields of *terminal* for diagnostic output.

    ``name`` and ``label`` are copied as-is; ``terminal_uuid`` and
    ``instance_id`` are stripped strings that default to ``""`` when the
    property is missing or empty.
    """

    def stripped(property_name):
        return (getattr(terminal, property_name, "") or "").strip()

    payload = {}
    payload["name"] = getattr(terminal, "Name", "")
    payload["label"] = getattr(terminal, "Label", "")
    payload["terminal_uuid"] = stripped("QetTerminalUuid")
    payload["instance_id"] = stripped("QetInstanceId")
    return payload
def diagnose_routing_path_network(
    doc,
    terminal_exit_length=20.0,
    terminal_access_max_distance=DEFAULT_TERMINAL_ACCESS_MAX_DISTANCE,
):
    """Inspect the generated routing path network without routing wires.

    The graph is built with default settings and checked for three things:
    more than one connected component, terminals that are neither attached
    through a live access carrier nor sitting directly on the network (or
    that are farther away than *terminal_access_max_distance*), and
    wire-duct endpoints with at most one edge.

    Returns a dict with ``summary``, ``component_count``, ``components``,
    ``isolated_components``, ``unconnected_terminals``, ``possible_breaks``,
    ``issues`` and ``ok`` (true when no issue was recorded).

    Raises ``RoutingNetworkError`` when *doc* is ``None``.
    """
    if doc is None:
        raise RoutingNetworkError("No FreeCAD document is available.")

    network = build_route_graph(doc)
    components = _route_graph_components(network)
    summary = _network_summary_from_graph(network)
    # A single component is the healthy case; only a split network is reported.
    isolated_components = components if len(components) > 1 else []

    max_distance = max(float(terminal_access_max_distance or 0.0), 0.0)
    unconnected_terminals = []
    for terminal in _collect_routable_terminals(doc):
        exit_point = _terminal_exit_point(terminal, terminal_exit_length)
        nearest_point, distance = nearest_point_on_network(network, exit_point)
        access_carrier = _live_source_carrier(doc, terminal)
        access_live = access_carrier is not None and is_route_carrier(access_carrier)

        has_anchor = nearest_point is not None
        gap = float(distance or 0.0) if has_anchor else 0.0
        # A max_distance of zero disables the distance limit.
        too_far = (not has_anchor) or (max_distance > 0.0 and gap > max_distance)
        connected_directly = has_anchor and gap <= DEFAULT_NODE_TOLERANCE
        if not too_far and (access_live or connected_directly):
            continue

        entry = _terminal_diagnostic_payload(terminal)
        entry.update(
            {
                "access_carrier": "" if access_carrier is None else getattr(access_carrier, "Name", ""),
                "nearest_network_distance_mm": float(distance) if distance is not None else None,
                "nearest_network_point": _point_payload(nearest_point) if has_anchor else None,
                "code": "terminal_access_too_far" if access_live else "terminal_access_missing",
            }
        )
        unconnected_terminals.append(entry)

    possible_breaks = _wire_duct_endpoint_breaks(network)

    checks = (
        (
            "warning",
            "isolated_network_components",
            "Routing path network contains isolated components.",
            isolated_components,
        ),
        (
            "error",
            "unconnected_terminals",
            "Some terminals are not connected to the routing path network.",
            unconnected_terminals,
        ),
        (
            "warning",
            "wire_duct_endpoint_breaks",
            "Some wire duct endpoints have no adjacent network connection.",
            possible_breaks,
        ),
    )
    issues = [
        {
            "severity": severity,
            "code": code,
            "message": message,
            "count": len(findings),
        }
        for severity, code, message, findings in checks
        if findings
    ]

    return {
        "summary": summary,
        "component_count": len(components),
        "components": components,
        "isolated_components": isolated_components,
        "unconnected_terminals": unconnected_terminals,
        "possible_breaks": possible_breaks,
        "issues": issues,
        "ok": not issues,
    }
def _highlight_routing_network_diagnostics(doc, diagnostic):
    """Recolour document objects that *diagnostic* flags as problematic.

    Unconnected terminals and carriers with endpoint breaks are drawn red
    with line width 4.0; carriers belonging to isolated components are drawn
    orange with line width 3.0. Objects are matched by ``Name``. Failures
    while touching an object's ``ViewObject`` are ignored so highlighting
    stays best-effort.
    """
    isolated_carriers = set()
    for component in diagnostic.get("isolated_components", []) or []:
        isolated_carriers.update(component.get("carrier_names", []) or [])

    unconnected_terminal_names = set()
    for item in diagnostic.get("unconnected_terminals", []) or []:
        if item.get("name", ""):
            unconnected_terminal_names.add(item.get("name", ""))

    break_carriers = set()
    for item in diagnostic.get("possible_breaks", []) or []:
        if item.get("carrier", {}).get("name", ""):
            break_carriers.add(item.get("carrier", {}).get("name", ""))

    red = (1.0, 0.0, 0.0)
    orange = (1.0, 0.35, 0.0)
    for obj in list(getattr(doc, "Objects", []) or []):
        name = getattr(obj, "Name", "")
        try:
            # Errors outrank the "isolated" warning when an object is in both.
            if name in unconnected_terminal_names or name in break_carriers:
                color, width = red, 4.0
            elif name in isolated_carriers:
                color, width = orange, 3.0
            else:
                continue
            obj.ViewObject.LineColor = color
            obj.ViewObject.LineWidth = width
        except Exception:
            pass
def _clear_routing_path_network_diagnostics(doc, group):
    """Delete earlier routing-path-network diagnostic objects from *group*.

    Only members whose ``QetDiagnosticKind`` is ``"RoutingPathNetwork"`` are
    touched: each is detached from its groups and, when still present in
    *doc*, removed from the document. Errors during removal are ignored.
    Returns the number of objects removed.
    """
    removed = 0
    members = list(getattr(group, "Group", []) or [])
    for member in members:
        kind = (getattr(member, "QetDiagnosticKind", "") or "").strip()
        if kind == "RoutingPathNetwork":
            _detach_from_groups(doc, member)
            try:
                still_present = doc.getObject(getattr(member, "Name", "")) is not None
                if still_present:
                    doc.removeObject(member.Name)
                    removed += 1
            except Exception:
                pass
    return removed
def write_routing_path_network_diagnostic(
    doc,
    project_uuid="",
    terminal_exit_length=20.0,
    terminal_access_max_distance=DEFAULT_TERMINAL_ACCESS_MAX_DISTANCE,
):
    """Run the routing path network diagnosis and store it in the document.

    Previous diagnostic objects in the diagnostic group are cleared, a new
    ``App::DocumentObjectGroup`` is created carrying the diagnostic kind and
    the JSON-encoded result as string properties, the affected objects are
    highlighted, and a recompute is attempted (failures are ignored).

    Returns a dict with ``diagnostic`` (the result dict) and
    ``diagnostic_object`` (the created document object).
    """
    diagnostic = diagnose_routing_path_network(
        doc,
        terminal_exit_length=terminal_exit_length,
        terminal_access_max_distance=terminal_access_max_distance,
    )

    group = WiringObjects.ensure_diagnostic_group(doc, project_uuid)
    _clear_routing_path_network_diagnostics(doc, group)

    object_name = _unique_name(doc, "QETRoutingPathNetworkDiagnostic")
    holder = doc.addObject("App::DocumentObjectGroup", object_name)
    holder.Label = "QET Routing Path Network Diagnostic"

    # The kind marker is what the clear step above looks for on the next run.
    TerminalObjects.ensure_string_property(
        holder,
        "QetDiagnosticKind",
        PROPERTY_GROUP,
        "QET diagnostic kind",
        "RoutingPathNetwork",
    )
    encoded = json.dumps(diagnostic, ensure_ascii=False)
    TerminalObjects.ensure_string_property(
        holder,
        "QetDiagnosticJson",
        PROPERTY_GROUP,
        "QET routing path network diagnostic payload",
        encoded,
    )

    group.addObject(holder)
    _highlight_routing_network_diagnostics(doc, diagnostic)
    try:
        doc.recompute()
    except Exception:
        pass

    result = {"diagnostic": diagnostic}
    result["diagnostic_object"] = holder
    return result
def carrier_payload(carrier):
    """Return *carrier* as a plain dict: ``name``, ``label``, ``kind`` and ``points``.

    ``kind`` is the raw ``QetRouteCarrierKind`` value (``""`` when missing);
    ``points`` holds one point payload per carrier point, in order.
    """
    payload = {}
    payload["name"] = getattr(carrier, "Name", "")
    payload["label"] = getattr(carrier, "Label", "")
    payload["kind"] = getattr(carrier, "QetRouteCarrierKind", "")
    point_payloads = []
    for point in _carrier_points(carrier):
        point_payloads.append(_point_payload(point))
    payload["points"] = point_payloads
    return payload