You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

2295 lines
83 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# FreeCADExchange 3D routing connections.
#
# 第一版不碰 C++,也不把 3D 走线结果写进数据库。
# 它只读取 FreeCAD 文档里的端子、走线网络和几何障碍,
# 然后在 QETWiring_04_Routed 下生成一条可见的折线导线。
import json
import math
import FreeCAD as App
try:
import FreeCADGui as Gui
except ImportError:
Gui = None
import RoutingNetwork
import TerminalObjects
import TemplateSemantics
import WiringObjects
# Baseline routing options; _merged_options() overlays caller overrides on a copy.
DEFAULT_OPTIONS = {
    # Leave each terminal along a short straight stub first so the wire does
    # not start flush against the device housing.
    "terminal_exit_length": 20.0,
    "lane_axis": "auto",
    "lane_spacing": 10.0,
    "segment_reuse_penalty": 200.0,
    # Wire-duct network parameters.
    "use_routing_network": True,
    "network_entry_max_distance": 1000.0,
    "adjoining_duct_tolerance": RoutingNetwork.DEFAULT_ADJOINING_DUCT_TOLERANCE,
    "bend_penalty": 25.0,
    # EPLAN/SOLIDWORKS style: wire ducts / routing paths take top priority;
    # auxiliary surface regions only serve as transition / fallback areas.
    "carrier_kind_cost_factors": {
        "WireDuct": 1.0,
        "WireDuctOpenEnd": 1.0,
        "WiringCutOut": 1.0,
        "RoutingPath": 1.0,
        "UserPath": 1.0,
        "AuxiliaryPath": 2.0,
        "TerminalAccess": 2.0,
        "RoutingRange": 8.0,
    },
    # The trunk must run on the carrier / surface network; routing fails
    # outright when no routing path network exists.
    # Obstacle bounding boxes are inflated by this distance so near-contact
    # risks are detected early.
    "obstacle_clearance": 5.0,
    # Terminal exit/entry segments usually hug the terminal housing or device
    # enclosure, so they are not used as a basis for main-path collision checks.
    "ignore_endpoint_collision_segments": True,
    # Prevents abnormal coordinates, or terminals far from the routing network,
    # from producing an over-long access wire that stretches the FreeCAD view
    # bounding box and gets the model clipped out of sight while rotating.
    "terminal_access_max_distance": 1000.0,
    # First remove routing-network edges that cross obstacle bounding boxes from
    # the Dijkstra graph; if no safe alternative path exists, fall back to the
    # original graph and use CollisionWarning to tell the user the current
    # network is insufficient.
    "avoid_obstacles": True,
    "replace_existing": True,
}
class AutoRoutingError(RuntimeError):
    """Error raised when a routed connection cannot be produced."""
def _merged_options(options):
    """Return a new dict of DEFAULT_OPTIONS overlaid with *options*.

    Anything that is not a dict (including None) is ignored and the plain
    defaults are returned. The copy is shallow.
    """
    if not isinstance(options, dict):
        return dict(DEFAULT_OPTIONS)
    return {**DEFAULT_OPTIONS, **options}
def _vector(point):
    """Coerce *point* into a fresh App.Vector.

    Accepts an App.Vector, a list/tuple with at least three items, a dict with
    optional "x"/"y"/"z" keys (missing keys count as 0.0), or any object that
    exposes x, y and z attributes. Anything else yields the zero vector.
    """
    axes = ("x", "y", "z")
    if isinstance(point, App.Vector):
        return App.Vector(point.x, point.y, point.z)
    if isinstance(point, (list, tuple)) and len(point) >= 3:
        coords = [float(point[0]), float(point[1]), float(point[2])]
    elif isinstance(point, dict):
        coords = [float(point.get(axis, 0.0)) for axis in axes]
    elif all(hasattr(point, axis) for axis in axes):
        coords = [float(getattr(point, axis)) for axis in axes]
    else:
        return App.Vector(0, 0, 0)
    return App.Vector(*coords)
def _distance(left, right):
dx = float(left.x) - float(right.x)
dy = float(left.y) - float(right.y)
dz = float(left.z) - float(right.z)
return (dx * dx + dy * dy + dz * dz) ** 0.5
def _vector_close(left, right, tolerance=0.000001):
    """Return True when the two points are at most *tolerance* apart."""
    separation = _distance(left, right)
    return separation <= tolerance
def _point_payload(point):
return {
"x": float(point.x),
"y": float(point.y),
"z": float(point.z),
}
def _route_length(points):
    """Return the summed length of the polyline through *points*.

    Each point is coerced with ``_vector``; None or fewer than two points
    gives 0.0.
    """
    vectors = [_vector(point) for point in points or []]
    length = 0.0
    for start, end in zip(vectors, vectors[1:]):
        length += _distance(start, end)
    return length
def _is_finite_point(point):
try:
return all(
math.isfinite(float(getattr(point, axis, 0.0)))
for axis in ("x", "y", "z")
)
except Exception:
return False
def _append_unique(points, point):
    """Append *point* (as a vector) to *points* in place.

    Non-finite points are dropped, and so is a point that coincides with the
    current last entry.
    """
    candidate = _vector(point)
    if not _is_finite_point(candidate):
        return
    if points and _vector_close(points[-1], candidate):
        return
    points.append(candidate)
def _axis_value(point, axis):
return float(getattr(point, axis, 0.0))
def _with_axis(point, axis, value):
    """Return a new App.Vector equal to *point* with *axis* set to *value*."""

    def component(name):
        # Only the axes that are kept are read from the source point.
        return float(value) if axis == name else float(getattr(point, name))

    return App.Vector(component("x"), component("y"), component("z"))
def _auto_lane_axis(route_points):
    """Pick the axis along which parallel lanes are offset.

    Sums the absolute travel per axis over all segments. When x dominates the
    lane axis is "y"; when y or z dominates it is "x". Fewer than two points
    gives "y".
    """
    vectors = [_vector(point) for point in route_points or []]
    if len(vectors) < 2:
        return "y"
    travel = {"x": 0.0, "y": 0.0, "z": 0.0}
    for start, end in zip(vectors, vectors[1:]):
        for axis in ("x", "y", "z"):
            travel[axis] += abs(float(getattr(end, axis)) - float(getattr(start, axis)))
    dominant = max(travel, key=travel.get)
    return "y" if dominant == "x" else "x"
def _lane_payload(route_index, options, route_points=None):
opts = options or {}
lane_axis = (opts.get("lane_axis") or "y").lower()
if lane_axis == "auto":
lane_axis = _auto_lane_axis(route_points)
if lane_axis not in {"x", "y", "z"}:
lane_axis = "y"
lane_index = max(int(route_index or 0), 0)
lane_spacing = float(opts.get("lane_spacing", 0.0) or 0.0)
if lane_index <= 0:
lane_offset = 0.0
else:
lane_order = (lane_index + 1) // 2
lane_direction = 1.0 if lane_index % 2 == 1 else -1.0
lane_offset = float(lane_order) * lane_spacing * lane_direction
return {
"index": lane_index,
"axis": lane_axis,
"spacing_mm": lane_spacing,
"offset_mm": lane_offset,
}
def _apply_lane_offset(points, lane):
offset = float((lane or {}).get("offset_mm", 0.0) or 0.0)
if abs(offset) <= 0.000001:
return list(points or [])
axis = (lane or {}).get("axis", "y")
return [
_with_axis(point, axis, _axis_value(point, axis) + offset)
for point in list(points or [])
]
def _orthogonal_points(start_point, end_point, preferred_axis=None):
    """Return an axis-aligned polyline from *start_point* to *end_point*.

    Axes are traversed from the largest to the smallest coordinate difference;
    a valid *preferred_axis* is moved to the end of that order. Coincident
    endpoints return ``[start_point]``.
    """
    if _vector_close(start_point, end_point):
        return [start_point]

    def span(axis):
        return abs(_axis_value(end_point, axis) - _axis_value(start_point, axis))

    # Each leg moves along a single coordinate axis only, so the result is
    # naturally the right-angled polyline typical of cabinet wiring.
    axis_order = sorted(("x", "y", "z"), key=span, reverse=True)
    if preferred_axis in {"x", "y", "z"}:
        axis_order = [axis for axis in axis_order if axis != preferred_axis]
        axis_order.append(preferred_axis)
    path = [start_point]
    cursor = start_point
    for axis in axis_order:
        goal = _axis_value(end_point, axis)
        if abs(_axis_value(cursor, axis) - goal) > 0.000001:
            cursor = _with_axis(cursor, axis, goal)
            _append_unique(path, cursor)
    _append_unique(path, end_point)
    return path
def _append_orthogonal(points, target_point, preferred_axis=None):
    """Extend *points* in place with axis-aligned legs up to *target_point*.

    An empty list simply receives the target point.
    """
    if points:
        legs = _orthogonal_points(points[-1], _vector(target_point), preferred_axis)
        for waypoint in legs[1:]:
            _append_unique(points, waypoint)
    else:
        _append_unique(points, target_point)
def _collinear_points(first, middle, last):
ax = float(middle.x) - float(first.x)
ay = float(middle.y) - float(first.y)
az = float(middle.z) - float(first.z)
bx = float(last.x) - float(middle.x)
by = float(last.y) - float(middle.y)
bz = float(last.z) - float(middle.z)
cross_x = ay * bz - az * by
cross_y = az * bx - ax * bz
cross_z = ax * by - ay * bx
dot = ax * bx + ay * by + az * bz
return (
abs(cross_x) <= 0.000001
and abs(cross_y) <= 0.000001
and abs(cross_z) <= 0.000001
and dot >= -0.000001
)
def _route_point_key(point, tolerance=0.001):
scale = 1.0 / float(tolerance or 0.001)
return (
int(round(float(point.x) * scale)),
int(round(float(point.y) * scale)),
int(round(float(point.z) * scale)),
)
def _simplify_collinear_points(points, preserved_point_keys=None):
    """Drop redundant intermediate points that sit on a straight run.

    Points are first coerced with ``_vector`` and non-finite ones discarded.
    The first two and last two input points are never removed, and neither is
    any point whose ``_route_point_key`` appears in *preserved_point_keys*.

    Returns a new list of vectors; the input is not modified.
    """
    normalized = [_vector(point) for point in points or [] if _is_finite_point(_vector(point))]
    if len(normalized) <= 2:
        return normalized
    # Indices (into ``normalized``) that must survive simplification.
    preserved_indices = {0, 1, len(normalized) - 2, len(normalized) - 1}
    preserved_point_keys = set(preserved_point_keys or [])
    simplified = [normalized[0]]
    # Parallel list: original index of each entry currently in ``simplified``.
    simplified_indices = [0]
    for index, point in enumerate(normalized[1:], start=1):
        _append_unique(simplified, point)
        # _append_unique may skip a duplicate, so only record the index when
        # the list actually grew.
        if len(simplified_indices) < len(simplified):
            simplified_indices.append(index)
        # Collapse the middle point of the trailing triple while it is
        # collinear and not protected.
        while len(simplified) >= 3 and _collinear_points(
            simplified[-3],
            simplified[-2],
            simplified[-1],
        ):
            if (
                simplified_indices[-2] in preserved_indices
                or _route_point_key(simplified[-2]) in preserved_point_keys
            ):
                break
            simplified.pop(-2)
            simplified_indices.pop(-2)
    return simplified
def _important_route_node_keys(network, path_keys, path_result):
edges = network.get("edges", {}) if isinstance(network, dict) else {}
important = {
key
for key in path_keys or []
if len(edges.get(key, []) or []) != 2
}
segments = path_result.get("segments", []) if isinstance(path_result, dict) else []
for index in range(1, len(path_keys or []) - 1):
previous_segment = segments[index - 1] if index - 1 < len(segments) else {}
next_segment = segments[index] if index < len(segments) else {}
previous_carrier = (previous_segment.get("carrier") or {}).get("name", "")
next_carrier = (next_segment.get("carrier") or {}).get("name", "")
if previous_carrier != next_carrier:
important.add(path_keys[index])
return important
def _offset(point, direction, distance):
    """Return *point* moved by *distance* along *direction* as an App.Vector.

    *direction* is used as given; it is not normalized here.
    """
    moved = [
        float(getattr(point, axis)) + float(getattr(direction, axis)) * float(distance)
        for axis in ("x", "y", "z")
    ]
    return App.Vector(*moved)
def _terminal_origin(terminal):
    """Return the origin reported by TerminalObjects for *terminal* as a vector."""
    origin = TerminalObjects.terminal_origin(terminal)
    return _vector(origin)
def _terminal_direction(terminal):
    """Return the terminal's exit direction as a vector; +Z on any failure."""
    try:
        direction = _vector(TerminalObjects.terminal_direction(terminal))
    except Exception:
        direction = App.Vector(0, 0, 1)
    return direction
def _project_uuid(doc, start_terminal=None, end_terminal=None):
for obj in (start_terminal, end_terminal):
value = (getattr(obj, "QetProjectUuid", "") or "").strip()
if value:
return value
try:
return (getattr(TerminalObjects.ensure_root_group(doc), "QetProjectUuid", "") or "").strip()
except Exception:
return ""
def index_terminals(doc):
    """Return {terminal_uuid: terminal_object} for routable engineering terminals.

    Terminals collected under the root group come first, followed by every
    document object that TerminalObjects recognises as a terminal. The first
    object seen for a given UUID wins; blank UUIDs are skipped.
    """
    if doc is None:
        return {}
    try:
        root = doc.getObject(TerminalObjects.ROOT_GROUP_NAME)
    except Exception:
        root = None
    candidates = []
    if root is not None:
        candidates.extend(TerminalObjects.collect_terminal_objects(root))
    for obj in list(getattr(doc, "Objects", []) or []):
        if TerminalObjects.is_terminal_object(obj):
            candidates.append(obj)
    by_uuid = {}
    for terminal in candidates:
        key = (getattr(terminal, "QetTerminalUuid", "") or "").strip()
        if key:
            by_uuid.setdefault(key, terminal)
    return by_uuid
def _normalized_match_token(value):
return (value or "").strip().lower().replace(" ", "")
def _device_group_for_wire_endpoint(doc, instance_id, element_uuid):
    """Find the device group of a wire endpoint.

    Lookup by instance id is tried first; the element UUID is the fallback.
    """
    by_instance = TerminalObjects.find_device_group_by_instance_id(doc, instance_id)
    if by_instance is not None:
        return by_instance
    return TerminalObjects.find_device_group(doc, element_uuid)
def _terminal_match_tokens(obj):
    """Return the distinct, non-empty match tokens of a terminal object.

    Tokens come from QetTemplateSlotName, QetTerminalLabel, Label and Name,
    in that order.
    """
    tokens = []
    for attribute in ("QetTemplateSlotName", "QetTerminalLabel", "Label", "Name"):
        token = _normalized_match_token(getattr(obj, attribute, ""))
        if not token or token in tokens:
            continue
        tokens.append(token)
    return tokens
def _slot_match_tokens(slot):
    """Return the distinct, non-empty match tokens of a template slot dict.

    The slot's "name" token comes before its "label" token.
    """
    tokens = []
    for field in ("name", "label"):
        token = _normalized_match_token(slot.get(field, ""))
        if not token or token in tokens:
            continue
        tokens.append(token)
    return tokens
def _matching_local_terminal(terminal_group, terminal_display, used_objects):
    """Pick an unused local terminal in *terminal_group* for *terminal_display*.

    Only terminals whose UUID is "local" per TerminalObjects and that are not
    in *used_objects* are considered. The first one carrying the display token
    is returned. With an empty display token, the terminal is returned only if
    it is the sole candidate. Otherwise None.
    """
    wanted = _normalized_match_token(terminal_display)
    candidates = []
    for terminal in TerminalObjects.collect_terminal_objects(terminal_group):
        if terminal in used_objects:
            continue
        uuid = (getattr(terminal, "QetTerminalUuid", "") or "").strip()
        if not TerminalObjects.is_local_terminal_uuid(uuid):
            continue
        candidates.append(terminal)
        if wanted and wanted in _terminal_match_tokens(terminal):
            return terminal
    if len(candidates) == 1 and not wanted:
        return candidates[0]
    return None
def _matching_template_slot(device_group, terminal_display, used_slot_tokens):
    """Return the first unused template slot matching *terminal_display*.

    A slot matches when the normalized display token is among its match
    tokens; it is skipped when its first token is already in
    *used_slot_tokens*. Returns None for an empty display token or no match.
    """
    wanted = _normalized_match_token(terminal_display)
    if not wanted:
        return None
    for slot in TemplateSemantics.collect_terminal_hints(device_group):
        tokens = _slot_match_tokens(slot)
        if wanted in tokens:
            primary = tokens[0] if tokens else ""
            if not (primary and primary in used_slot_tokens):
                return slot
    return None
def _slot_placement(slot):
    """Build an App.Placement from a template slot dict.

    ``slot["base"]`` is used when it is an App.Vector, otherwise the origin.
    ``slot["rotation"]`` may be a dict with an App.Vector "axis" and a numeric
    "angle"; anything else, or a construction failure, gives the identity
    rotation.
    """
    base = slot.get("base")
    if not isinstance(base, App.Vector):
        base = App.Vector(0, 0, 0)
    rotation = App.Rotation()
    spec = slot.get("rotation")
    if isinstance(spec, dict):
        axis, angle = spec.get("axis"), spec.get("angle")
        if isinstance(axis, App.Vector) and angle is not None:
            try:
                rotation = App.Rotation(axis, float(angle))
            except Exception:
                rotation = App.Rotation()
    return App.Placement(base, rotation)
def _wire_endpoint_entries(payload):
    """List the distinct non-local wire endpoints found in ``payload["wires"]``.

    Each entry is a dict with "terminal_uuid", "element_uuid", "instance_id"
    and "terminal_display". Non-dict wire items, blank or local terminal
    UUIDs, and UUIDs already emitted are skipped.
    """
    entries = []
    emitted = set()
    for wire in payload.get("wires", []) or []:
        if not isinstance(wire, dict):
            continue
        for side in ("start", "end"):

            def field(suffix, side=side):
                return "{0}_{1}".format(side, suffix)

            terminal_uuid = _wire_item_value(wire, field("terminal_uuid"))
            if not terminal_uuid or TerminalObjects.is_local_terminal_uuid(terminal_uuid):
                continue
            if terminal_uuid in emitted:
                continue
            emitted.add(terminal_uuid)
            entry = {
                "terminal_uuid": terminal_uuid,
                "element_uuid": _wire_item_value(wire, field("element_uuid")),
                "instance_id": _wire_item_value(wire, field("instance_id")),
                "terminal_display": _wire_item_value(
                    wire,
                    field("terminal_display"),
                    field("terminal_label"),
                ),
            }
            entries.append(entry)
    return entries
def _bind_wire_task_terminals(doc, payload):
    """Promote matching local template terminals to QET terminal UUIDs before routing.

    For every wire endpoint in *payload* whose terminal UUID is not yet present
    in the document, the owning device group is located and either an existing
    local terminal is re-labelled ("bound") or a new terminal object is created
    from a matching template slot ("created").

    Returns a report dict with the counters "bound", "created" and "skipped"
    plus a "warnings" list of messages. The document is recomputed
    (best-effort) when anything was bound or created.
    """
    report = {
        "bound": 0,
        "created": 0,
        "skipped": 0,
        "warnings": [],
    }
    if doc is None or not isinstance(payload, dict):
        return report
    # Prefer the payload's project UUID; fall back to the one on the root group.
    project_uuid = (payload.get("project_uuid") or "").strip()
    if not project_uuid:
        try:
            project_uuid = (getattr(TerminalObjects.ensure_root_group(doc), "QetProjectUuid", "") or "").strip()
        except Exception:
            project_uuid = ""
    indexed = index_terminals(doc)
    # Terminals and slot tokens consumed during this call, so one local
    # terminal / slot is not assigned to two endpoints.
    used_objects = set()
    used_slot_tokens = set()
    for entry in _wire_endpoint_entries(payload):
        terminal_uuid = entry["terminal_uuid"]
        # Already present in the document: nothing to bind.
        if terminal_uuid in indexed:
            continue
        device_group = _device_group_for_wire_endpoint(
            doc,
            entry.get("instance_id", ""),
            entry.get("element_uuid", ""),
        )
        if device_group is None:
            report["skipped"] += 1
            report["warnings"].append(
                "端子 {0} 找不到所属 3D 设备实例。".format(terminal_uuid)
            )
            continue
        # Identity values are taken from the device group, not from the entry.
        instance_id = (getattr(device_group, "QetInstanceId", "") or "").strip()
        element_uuid = (getattr(device_group, "QetElementUuid", "") or "").strip()
        terminal_group = TerminalObjects.ensure_terminal_group(
            doc,
            device_group,
            project_uuid=project_uuid,
            instance_id=instance_id,
        )
        terminal_display = entry.get("terminal_display", "")
        terminal_obj = _matching_local_terminal(terminal_group, terminal_display, used_objects)
        if terminal_obj is None:
            # No reusable local terminal: create one from a template slot.
            slot = _matching_template_slot(device_group, terminal_display, used_slot_tokens)
            if slot is None:
                report["skipped"] += 1
                report["warnings"].append(
                    "端子 {0} 没有匹配到模板槽位 {1}".format(
                        terminal_uuid,
                        terminal_display or "<empty>",
                    )
                )
                continue
            slot_name = (slot.get("name") or terminal_display or terminal_uuid).strip()
            terminal_obj = TerminalObjects.create_lcs_object(
                doc,
                "QETTerminal_{0}".format(TerminalObjects.safe_token(terminal_uuid)),
                placement=_slot_placement(slot),
                label=terminal_display or terminal_uuid,
            )
            terminal_group.addObject(terminal_obj)
            # Hide the slot's source object, if any (best-effort; there may be
            # no view object).
            source_obj = slot.get("source_object")
            if source_obj is not None:
                try:
                    source_obj.ViewObject.Visibility = False
                except Exception:
                    pass
            report["created"] += 1
        else:
            slot_name = (getattr(terminal_obj, "QetTemplateSlotName", "") or terminal_display).strip()
            report["bound"] += 1
        # Both paths end by stamping the QET identity onto the terminal object.
        TerminalObjects.set_terminal_semantics(
            terminal_obj,
            project_uuid,
            element_uuid,
            terminal_uuid,
            instance_id,
            label=terminal_display or getattr(terminal_obj, "Label", "") or terminal_uuid,
            slot_name=slot_name,
        )
        used_objects.add(terminal_obj)
        slot_token = _normalized_match_token(slot_name)
        if slot_token:
            used_slot_tokens.add(slot_token)
    if report["bound"] or report["created"]:
        try:
            doc.recompute()
        except Exception:
            pass
    return report
def _wire_object_name(start_terminal, end_terminal, wire_uuid=""):
    """Return the base object name for a routed connection.

    The name is derived from *wire_uuid* when given, otherwise from the two
    terminals' ``QetTerminalUuid`` values.
    """
    if not wire_uuid:
        start_token = TerminalObjects.safe_token(getattr(start_terminal, "QetTerminalUuid", ""))
        end_token = TerminalObjects.safe_token(getattr(end_terminal, "QetTerminalUuid", ""))
        return "QETRoutedConnection_{0}_{1}".format(start_token, end_token)
    return "QETRoutedConnection_{0}".format(TerminalObjects.safe_token(wire_uuid))
def _unique_name(doc, base_name):
    """Return a sanitized object name that is not yet used in *doc*.

    If the sanitized base name is taken, "_1", "_2", ... are tried in turn.
    """
    stem = TerminalObjects.safe_token(base_name)
    candidate = stem
    counter = 0
    while doc.getObject(candidate) is not None:
        counter += 1
        candidate = "{0}_{1}".format(stem, counter)
    return candidate
def _create_wire_geometry(doc, name, points):
    """Create the document object that displays a routed wire polyline.

    Strategies are tried in order and the first one that works is returned:

    1. a ``Part::Feature`` named *name* whose shape is a polygon through
       *points*;
    2. a Draft wire, only when *doc* is the active document;
    3. a bare ``App::FeaturePython`` named *name*.

    In every case the route points are also stored on the object through
    ``_set_points``.
    """
    # Use a plain Part edge for generated auto wires. Draft wires are convenient for
    # editing, but in large imported assemblies they can trigger view-provider redraw
    # glitches while rotating the 3D scene.
    part_obj = None
    try:
        import Part

        # Build the shape before touching the document, so a geometry failure
        # cannot leave an empty object behind.
        shape = Part.makePolygon(points)
        part_obj = doc.addObject("Part::Feature", name)
        part_obj.Shape = shape
        _set_points(part_obj, points)
        return part_obj
    except Exception:
        # If the object was added but could not be completed, remove it again;
        # otherwise it would stay in the document next to the fallback object.
        if part_obj is not None:
            try:
                doc.removeObject(part_obj.Name)
            except Exception:
                pass
    # NOTE(review): presumably guarded because Draft creates its objects in
    # the active document — confirm.
    if getattr(App, "ActiveDocument", None) is doc:
        try:
            import Draft

            obj = Draft.make_wire(
                points,
                closed=False,
                placement=None,
                face=False,
                support=None,
                bs2wire=False,
            )
            if obj is not None:
                try:
                    obj.MakeFace = False
                except Exception:
                    pass
                _set_points(obj, points)
                return obj
        except Exception:
            pass
    obj = doc.addObject("App::FeaturePython", name)
    _set_points(obj, points)
    return obj
def _set_points(obj, points):
try:
if "Points" not in getattr(obj, "PropertiesList", []):
obj.addProperty("App::PropertyVectorList", "Points", "QET Wiring", "Route points")
obj.Points = list(points)
except Exception:
pass
def _set_string(obj, name, value, description="Routing connection property"):
    """Set string property *name* on *obj* in the "QET Routing" group."""
    group = "QET Routing"
    TerminalObjects.ensure_string_property(obj, name, group, description, value)
def _clean_endpoint_metadata(endpoint_metadata):
if not isinstance(endpoint_metadata, dict):
return {}
allowed = (
"start_element_uuid",
"start_terminal_display",
"start_device_label",
"end_element_uuid",
"end_terminal_display",
"end_device_label",
"endpoint_label",
)
cleaned = {}
for key in allowed:
value = str(endpoint_metadata.get(key, "") or "").strip()
if value:
cleaned[key] = value
return cleaned
def _set_endpoint_metadata(wire, endpoint_metadata):
    """Write cleaned endpoint metadata onto *wire* as string properties.

    Only keys that survive ``_clean_endpoint_metadata`` are written. Returns
    the cleaned metadata dict.
    """
    metadata = _clean_endpoint_metadata(endpoint_metadata)
    property_names = (
        ("start_element_uuid", "QetStartElementUuid"),
        ("start_terminal_display", "QetStartTerminalDisplay"),
        ("start_device_label", "QetStartDeviceLabel"),
        ("end_element_uuid", "QetEndElementUuid"),
        ("end_terminal_display", "QetEndTerminalDisplay"),
        ("end_device_label", "QetEndDeviceLabel"),
        ("endpoint_label", "QetEndpointLabel"),
    )
    for key, prop_name in property_names:
        if key not in metadata:
            continue
        _set_string(wire, prop_name, metadata[key], "QET routed wire endpoint metadata")
    return metadata
def _route_payload(route_data, collisions, wire_style_id="", endpoint_metadata=None):
    """Assemble the JSON-serializable diagnostics dict for a routed wire.

    Includes algorithm, length, wire style id, lane, points, collisions and
    their count, network and route track. "endpoint_metadata" is added only
    when the cleaned metadata is non-empty.
    """
    points = route_data.get("points", [])
    payload = dict(
        algorithm=route_data.get("algorithm", ""),
        length_mm=_route_length(points),
        wire_style_id=str(wire_style_id or "").strip(),
        lane=route_data.get("lane", {}),
        points=[_point_payload(point) for point in points],
        collision_count=len(collisions),
        collisions=collisions,
        network=route_data.get("network", {}),
        route_track=route_data.get("route_track", {}),
    )
    metadata = _clean_endpoint_metadata(endpoint_metadata)
    if metadata:
        payload["endpoint_metadata"] = metadata
    return payload
def _set_routing_connection_metadata(wire, route_data, collisions, wire_style_id="", endpoint_metadata=None):
    """Stamp routing results onto *wire* as string properties.

    Writes endpoint metadata, algorithm, length (three decimals), wire style
    id and the diagnostics JSON. The network and route-track JSON properties
    are written only when *route_data* has truthy values for them.
    """
    length_mm = _route_length(route_data.get("points", []))
    cleaned_endpoint_metadata = _set_endpoint_metadata(wire, endpoint_metadata)

    algorithm = route_data.get("algorithm", "")
    _set_string(wire, "QetRouteAlgorithm", algorithm, "Routing connection algorithm used for this wire")

    length_text = "{0:.3f}".format(length_mm)
    _set_string(wire, "QetRouteLengthMm", length_text, "Routing connection length in millimeters")

    style_text = str(wire_style_id or "").strip()
    _set_string(wire, "QetWireStyleId", style_text, "QET wire style ID")

    diagnostics = _route_payload(
        route_data,
        collisions,
        wire_style_id=wire_style_id,
        endpoint_metadata=cleaned_endpoint_metadata,
    )
    _set_string(
        wire,
        "QetRouteDiagnosticsJson",
        json.dumps(diagnostics, ensure_ascii=False),
        "Routing connection diagnostics",
    )

    optional_properties = (
        ("network", "QetRouteNetworkJson", "Route network metadata used by this wire"),
        ("route_track", "QetRouteTrackJson", "Routing carriers passed through by this wire"),
    )
    for key, prop_name, description in optional_properties:
        if route_data.get(key):
            _set_string(
                wire,
                prop_name,
                json.dumps(route_data.get(key, {}), ensure_ascii=False),
                description,
            )
def build_network_route(start_terminal, end_terminal, route_index=0, options=None, doc=None):
    """Route a wire between two terminals along the routing network.

    Returns a dict with "algorithm", "points", "network", "route_track" and
    "lane", or None when the routing network is disabled, no document is
    available, or no route can be found on the network.

    When ``avoid_obstacles`` is enabled and obstacles exist, a graph with the
    obstacle boxes blocked is tried first; if that gives no route, the
    unrestricted graph is used.
    """
    opts = _merged_options(options)
    if not opts.get("use_routing_network", True):
        return None
    if doc is None:
        doc = getattr(start_terminal, "Document", None) or getattr(App, "ActiveDocument", None)
    if doc is None:
        return None
    # Negative exit lengths are clamped to zero.
    exit_length = max(float(opts.get("terminal_exit_length", 0.0) or 0.0), 0.0)
    start_origin = _terminal_origin(start_terminal)
    end_origin = _terminal_origin(end_terminal)
    # Exit points: each origin pushed out along its terminal direction.
    start_exit = _offset(start_origin, _terminal_direction(start_terminal), exit_length)
    end_exit = _offset(end_origin, _terminal_direction(end_terminal), exit_length)

    def route_on_network(network, obstacle_aware=False):
        # Returns the route dict for *network*, or None if it cannot be used.
        if network.get("segment_count", 0) <= 0:
            return None
        start_key, start_distance, start_mode = RoutingNetwork.connect_point_to_network(network, start_exit)
        end_key, end_distance, end_mode = RoutingNetwork.connect_point_to_network(network, end_exit)
        if start_key is None or end_key is None:
            return None
        # A limit of 0 (or less) disables the entry-distance check.
        max_distance = float(opts.get("network_entry_max_distance", 0.0) or 0.0)
        if max_distance > 0.0 and (
            float(start_distance or 0.0) > max_distance
            or float(end_distance or 0.0) > max_distance
        ):
            return None
        path_result = RoutingNetwork.shortest_path_with_carriers(
            network,
            start_key,
            end_key,
            bend_penalty=float(opts.get("bend_penalty", 0.0) or 0.0),
            kind_cost_factors=opts.get("carrier_kind_cost_factors", {}),
            segment_usage_costs=opts.get("segment_usage_costs", {}),
            segment_reuse_penalty=float(opts.get("segment_reuse_penalty", 0.0) or 0.0),
        )
        path_keys = path_result.get("path", []) if isinstance(path_result, dict) else []
        if not path_keys:
            return None
        carrier_points = RoutingNetwork.path_points(network, path_keys)
        if not carrier_points:
            return None
        # Shift the carrier run sideways into this route's lane.
        lane = _lane_payload(route_index, opts, route_points=carrier_points)
        carrier_points = _apply_lane_offset(carrier_points, lane)
        # Assemble: origin -> exit -> orthogonal legs onto the carrier run ->
        # carrier points -> orthogonal legs to the end exit -> end origin.
        points = []
        _append_unique(points, start_origin)
        _append_unique(points, start_exit)
        _append_orthogonal(points, carrier_points[0])
        for point in carrier_points[1:]:
            _append_unique(points, point)
        _append_orthogonal(points, end_exit)
        _append_unique(points, end_origin)
        points = _simplify_collinear_points(
            points,
            preserved_point_keys=_important_route_node_keys(network, path_keys, path_result),
        )
        return {
            "algorithm": "network-dijkstra-v1",
            "points": points,
            "network": {
                "carriers": int(network.get("carrier_count", 0)),
                "segments": int(network.get("segment_count", 0)),
                "bridged_segments": int(network.get("bridged_segment_count", 0)),
                "blocked_segments": int(network.get("blocked_segment_count", 0)),
                "nodes": len(network.get("nodes", {})),
                "entry_distance": float(start_distance or 0.0),
                "exit_distance": float(end_distance or 0.0),
                "entry_point_mode": start_mode,
                "exit_point_mode": end_mode,
                "obstacle_aware": bool(obstacle_aware),
            },
            "route_track": path_result,
            "lane": lane,
        }

    use_obstacle_avoidance = bool(opts.get("avoid_obstacles", True))
    obstacles = []
    if use_obstacle_avoidance:
        obstacles = collect_obstacles(doc, exclude=[start_terminal, end_terminal], options=opts)
    blocked_bboxes = [obstacle["bbox"] for obstacle in obstacles if obstacle.get("bbox")]
    if blocked_bboxes:
        # First attempt: graph built with the obstacle boxes blocked.
        obstacle_aware_network = RoutingNetwork.build_route_graph(
            doc,
            blocked_bboxes=blocked_bboxes,
            adjoining_duct_tolerance=float(opts.get("adjoining_duct_tolerance", 0.0) or 0.0),
        )
        route_data = route_on_network(obstacle_aware_network, obstacle_aware=True)
        if route_data is not None:
            return route_data
    # Fallback (or obstacle avoidance off): route on the unrestricted graph.
    network = RoutingNetwork.build_route_graph(
        doc,
        adjoining_duct_tolerance=float(opts.get("adjoining_duct_tolerance", 0.0) or 0.0),
    )
    return route_on_network(network, obstacle_aware=False)
def _is_group(obj):
try:
return bool(obj.isDerivedFrom("App::DocumentObjectGroup"))
except Exception:
return False
def _is_origin_helper(obj):
type_id = (getattr(obj, "TypeId", "") or "").lower()
name = (getattr(obj, "Name", "") or "").lower()
label = (getattr(obj, "Label", "") or "").lower()
compact_name = "".join(ch for ch in name if not ch.isdigit()).replace("-", "_")
compact_label = "".join(ch for ch in label if not ch.isdigit()).replace("-", "_")
helper_names = {
"origin",
"xy_plane",
"xz_plane",
"yz_plane",
"x_axis",
"y_axis",
"z_axis",
}
if "origin" in type_id or compact_name in helper_names:
return True
return compact_label in helper_names or compact_label.replace(" ", "_") in helper_names
def _bbox_payload(obj, clearance=0.0):
shape = getattr(obj, "Shape", None)
bbox = getattr(shape, "BoundBox", None)
if bbox is None:
return None
return {
"xmin": float(bbox.XMin) - clearance,
"xmax": float(bbox.XMax) + clearance,
"ymin": float(bbox.YMin) - clearance,
"ymax": float(bbox.YMax) + clearance,
"zmin": float(bbox.ZMin) - clearance,
"zmax": float(bbox.ZMax) + clearance,
}
def _collect_group_tree_ids(root):
excluded = set()
stack = [root]
while stack:
obj = stack.pop()
if obj is None or id(obj) in excluded:
continue
excluded.add(id(obj))
stack.extend(list(getattr(obj, "Group", []) or []))
return excluded
def _expanded_obstacle_exclusion_ids(doc, exclude):
excluded = set(id(obj) for obj in (exclude or []) if obj is not None)
endpoint_instance_ids = {
(getattr(obj, "QetInstanceId", "") or "").strip()
for obj in (exclude or [])
if obj is not None and (getattr(obj, "QetInstanceId", "") or "").strip()
}
if not endpoint_instance_ids:
return excluded
for obj in list(getattr(doc, "Objects", []) or []):
instance_id = (getattr(obj, "QetInstanceId", "") or "").strip()
parent_instance_ids = {
(getattr(parent, "QetInstanceId", "") or "").strip()
for parent in list(getattr(obj, "InList", []) or [])
if (getattr(parent, "QetInstanceId", "") or "").strip()
}
if instance_id in endpoint_instance_ids or parent_instance_ids.intersection(endpoint_instance_ids):
excluded.update(_collect_group_tree_ids(obj))
excluded.add(id(obj))
return excluded
def _distance_point_to_bbox(point, bbox):
    """Return the distance from *point* to the axis-aligned box *bbox*.

    A point inside or on the box gives 0.0.
    """
    squared = 0.0
    for axis in ("x", "y", "z"):
        value = _axis_value(point, axis)
        low = float(bbox[axis + "min"])
        high = float(bbox[axis + "max"])
        if value < low:
            gap = low - value
        elif value > high:
            gap = value - high
        else:
            continue
        squared += gap * gap
    return math.sqrt(squared)
def collect_obstacles(doc, exclude=None, options=None):
    """Collect obstacle bounding boxes from the objects of *doc*.

    Skipped are: objects in *exclude* (plus everything sharing their device
    instance), objects with a pass-through obstacle mode, groups, origin
    helpers, LCS-like objects, terminals, route carriers, routed wires,
    objects without a bounding box, and objects whose inflated box lies within
    ``terminal_exit_length + obstacle_clearance`` of an excluded terminal's
    origin.

    Returns a list of dicts with "name", "label", "type_id", "bbox" (inflated
    by ``obstacle_clearance``) and "raw_bbox".
    """
    opts = _merged_options(options)
    excluded = _expanded_obstacle_exclusion_ids(doc, exclude)
    clearance = float(opts.get("obstacle_clearance", 0.0) or 0.0)
    exit_length = max(float(opts.get("terminal_exit_length", 0.0) or 0.0), 0.0)
    endpoint_clearance = exit_length + clearance
    endpoint_points = [
        _terminal_origin(obj)
        for obj in exclude or []
        if obj is not None and TerminalObjects.is_terminal_object(obj)
    ]
    pass_through_modes = {"PassThrough", "WireDuctPassThrough", "SupportSurface"}
    obstacles = []
    for obj in list(getattr(doc, "Objects", []) or []):
        if id(obj) in excluded:
            continue
        mode = (getattr(obj, "QetRoutingObstacleMode", "") or "").strip()
        not_an_obstacle = (
            mode in pass_through_modes
            or _is_group(obj)
            or _is_origin_helper(obj)
            or TerminalObjects.is_lcs_like(obj)
            or TerminalObjects.is_terminal_object(obj)
            or RoutingNetwork.is_route_carrier(obj)
            or WiringObjects.is_routed_wire_object(obj)
        )
        if not_an_obstacle:
            continue
        raw_bbox = _bbox_payload(obj, clearance=0.0)
        bbox = _bbox_payload(obj, clearance=clearance)
        if bbox is None:
            continue
        touches_endpoint = any(
            _distance_point_to_bbox(point, bbox) <= endpoint_clearance
            for point in endpoint_points
        )
        if touches_endpoint:
            continue
        obstacles.append(
            {
                "name": getattr(obj, "Name", ""),
                "label": getattr(obj, "Label", ""),
                "type_id": getattr(obj, "TypeId", ""),
                "bbox": bbox,
                "raw_bbox": raw_bbox or bbox,
            }
        )
    return obstacles
def _segment_intersects_bbox(start, end, bbox):
    """Return True when the segment start -> end touches the box *bbox*."""
    # Slab intersection: parameterize the segment as start + t * (end - start);
    # it hits the box when the per-axis intervals overlap for some t in [0, 1].
    enter = 0.0
    leave = 1.0
    for axis in ("x", "y", "z"):
        origin = _axis_value(start, axis)
        delta = _axis_value(end, axis) - origin
        low = float(bbox[axis + "min"])
        high = float(bbox[axis + "max"])
        if abs(delta) <= 0.000001:
            # Segment is parallel to this slab: it must already lie inside it.
            if origin < low or origin > high:
                return False
            continue
        inverse = 1.0 / delta
        t_low = (low - origin) * inverse
        t_high = (high - origin) * inverse
        near, far = (t_high, t_low) if t_low > t_high else (t_low, t_high)
        enter = max(enter, near)
        leave = min(leave, far)
        if enter > leave:
            return False
    return True
def detect_collisions(points, obstacles, ignored_segment_indices=None):
    """Report every route segment that crosses an obstacle box.

    Segments whose index is in *ignored_segment_indices* are not checked. A
    hit on the inflated "bbox" is "HardIntersection" unless the segment misses
    the obstacle's raw box, in which case it is "ClearanceWarning".

    Returns a list of collision dicts, ordered by segment and then obstacle.
    """
    skipped = set(ignored_segment_indices or [])
    hits = []
    for index in range(max(len(points) - 1, 0)):
        if index in skipped:
            continue
        start, end = points[index], points[index + 1]
        for obstacle in obstacles:
            if not _segment_intersects_bbox(start, end, obstacle["bbox"]):
                continue
            raw_bbox = obstacle.get("raw_bbox") or obstacle.get("bbox") or {}
            if raw_bbox and not _segment_intersects_bbox(start, end, raw_bbox):
                kind = "ClearanceWarning"
            else:
                kind = "HardIntersection"
            hits.append(
                {
                    "segment_index": index,
                    "segment_start": _point_payload(start),
                    "segment_end": _point_payload(end),
                    "collision_kind": kind,
                    "obstacle_name": obstacle.get("name", ""),
                    "obstacle_label": obstacle.get("label", ""),
                    "obstacle_bbox": dict(raw_bbox),
                    "collision_bbox": dict(obstacle.get("bbox", {}) or {}),
                }
            )
    return hits
def _endpoint_collision_segment_indices(points):
segment_count = max(len(points or []) - 1, 0)
if segment_count <= 0:
return set()
ignored = {0}
if segment_count > 1:
ignored.add(segment_count - 1)
return ignored
def _detach_object_from_groups(doc, obj):
parents = list(getattr(obj, "InList", []) or [])
parents.extend(list(getattr(doc, "Objects", []) or []))
for parent in parents:
group = list(getattr(parent, "Group", []) or [])
if obj not in group:
continue
try:
if hasattr(parent, "removeObject"):
parent.removeObject(obj)
else:
parent.Group = [child for child in group if child is not obj]
except Exception:
try:
parent.Group = [child for child in group if child is not obj]
except Exception:
pass
def _matching_existing_routing_connections(doc, start_uuid, end_uuid, wire_uuid=""):
    """Return existing routed-connection objects for the same wire.

    Only objects with ``RouteType`` "RoutedConnection" are considered. With a
    *wire_uuid* the match is on ``QetWireUuid``; without one, on the terminal
    UUID pair in either direction.
    """

    def text(obj, attribute):
        return (getattr(obj, attribute, "") or "").strip()

    matches = []
    for obj in list(WiringObjects.iter_routed_wire_objects(doc)):
        if text(obj, "RouteType") != "RoutedConnection":
            continue
        if wire_uuid:
            keep = text(obj, "QetWireUuid") == wire_uuid
        else:
            first = text(obj, "QetStartTerminalUuid")
            second = text(obj, "QetEndTerminalUuid")
            forward = first == start_uuid and second == end_uuid
            backward = first == end_uuid and second == start_uuid
            keep = forward or backward
        if keep:
            matches.append(obj)
    return matches
def _remove_routing_connection_objects(doc, objects):
removed = 0
for obj in list(objects or []):
try:
_detach_object_from_groups(doc, obj)
doc.removeObject(obj.Name)
removed += 1
except Exception:
pass
return removed
def _remove_existing_routing_connections(doc, start_uuid, end_uuid, wire_uuid=""):
    """Delete the routed connections already present for this wire.

    Returns the number of objects removed.
    """
    stale = _matching_existing_routing_connections(doc, start_uuid, end_uuid, wire_uuid=wire_uuid)
    return _remove_routing_connection_objects(doc, stale)
def _find_task_by_wire_uuid(doc, wire_uuid):
if not wire_uuid:
return None
try:
task_group = doc.getObject("QETWiring_01_Tasks")
except Exception:
task_group = None
if task_group is None:
return None
for task in list(getattr(task_group, "Group", []) or []):
if (getattr(task, "QetWireUuid", "") or "").strip() == wire_uuid:
return task
return None
def _set_task_status(task, status):
if task is None:
return
TerminalObjects.ensure_string_property(
task,
"RouteStatus",
"QET Wiring",
"Wire task route status",
status,
)
def _style_wire(wire, collision_count=0):
try:
wire.ViewObject.Visibility = True
wire.ViewObject.LineWidth = 5.0
if hasattr(wire.ViewObject, "DrawStyle"):
wire.ViewObject.DrawStyle = "Solid"
if hasattr(wire.ViewObject, "DisplayMode"):
wire.ViewObject.DisplayMode = "Wireframe"
if collision_count:
wire.ViewObject.LineColor = (1.0, 0.1, 0.0)
else:
wire.ViewObject.LineColor = (0.0, 0.35, 1.0)
except Exception:
pass
def route_eplan_connection_between_terminals(
    doc,
    start_terminal,
    end_terminal,
    route_index=0,
    options=None,
    wire_uuid="",
    wire_label="",
    net_uuid="",
    group_uuid="",
    wire_mark="",
    wire_mark_is_manual=False,
    wire_style_id="",
    endpoint_metadata=None,
):
    """Route one connection between two terminals and create its wire object.

    The route comes from ``build_network_route``.  The resulting points are
    checked against the collected obstacles, a wire object is created and
    tagged, and (when ``replace_existing`` is enabled) previously routed
    connections for the same wire or terminal pair are removed afterwards.

    Returns:
        A dict with the keys ``wire``, ``route_status``, ``algorithm``,
        ``network``, ``route_track``, ``points``, ``lane``, ``length_mm``,
        ``collision_count`` and ``collisions``.

    Raises:
        AutoRoutingError: if the document is missing, either object is not a
            routable terminal, both terminals are the same, no project UUID is
            found, no route or fewer than two points are produced, or the
            existing connections could not all be removed.
    """
    if doc is None:
        raise AutoRoutingError("No FreeCAD document is available.")
    if not TerminalObjects.is_terminal_object(start_terminal):
        raise AutoRoutingError("Start object is not a routable terminal.")
    if not TerminalObjects.is_terminal_object(end_terminal):
        raise AutoRoutingError("End object is not a routable terminal.")
    if start_terminal == end_terminal:
        raise AutoRoutingError("Start and end terminal must be different.")
    opts = _merged_options(options)
    # An explicit wire_style_id argument wins over the one in the options.
    effective_wire_style_id = str(wire_style_id or opts.get("wire_style_id", "") or "").strip()
    start_uuid = (getattr(start_terminal, "QetTerminalUuid", "") or "").strip()
    end_uuid = (getattr(end_terminal, "QetTerminalUuid", "") or "").strip()
    project_uuid = _project_uuid(doc, start_terminal, end_terminal)
    if not project_uuid:
        raise AutoRoutingError("Project UUID is required for routing connections.")
    route_data = build_network_route(
        start_terminal,
        end_terminal,
        route_index=route_index,
        options=opts,
        doc=doc,
    )
    if route_data is None:
        raise AutoRoutingError(
            "没有可用的布线路径网络;请先生成布线布局空间和布线路径网络。"
        )
    points = route_data.get("points", [])
    if len(points) < 2:
        raise AutoRoutingError("Routing connection produced fewer than two points.")
    # Collision check: the two terminals themselves are never obstacles.
    obstacles = collect_obstacles(doc, exclude=[start_terminal, end_terminal], options=opts)
    ignored_collision_segments = set()
    if opts.get("ignore_endpoint_collision_segments", True):
        ignored_collision_segments = _endpoint_collision_segment_indices(points)
    collisions = detect_collisions(points, obstacles, ignored_segment_indices=ignored_collision_segments)
    status = "CollisionWarning" if collisions else "Routed"
    # Existing matches are collected before the new wire exists and are only
    # removed after the new wire has been created successfully.
    existing_replacements = []
    if opts.get("replace_existing", True):
        existing_replacements = _matching_existing_routing_connections(
            doc,
            start_uuid,
            end_uuid,
            wire_uuid=wire_uuid,
        )
    wire_name = _unique_name(doc, _wire_object_name(start_terminal, end_terminal, wire_uuid))
    wire = None
    try:
        wire = _create_wire_geometry(doc, wire_name, points)
        wire.Label = wire_label or wire_mark or wire_uuid or "QET Routed Connection"
        WiringObjects.set_routed_wire_semantics(
            wire,
            project_uuid,
            wire_uuid,
            wire_label or wire_mark or wire_uuid,
            start_uuid,
            end_uuid,
            (getattr(start_terminal, "QetInstanceId", "") or "").strip(),
            (getattr(end_terminal, "QetInstanceId", "") or "").strip(),
            route_type="RoutedConnection",
            route_status=status,
            route_mode="EplanRoute",
            net_uuid=net_uuid,
            group_uuid=group_uuid,
            wire_mark=wire_mark,
            wire_mark_is_manual=wire_mark_is_manual,
        )
        _set_routing_connection_metadata(
            wire,
            route_data,
            collisions,
            wire_style_id=effective_wire_style_id,
            endpoint_metadata=endpoint_metadata,
        )
        routed_group = WiringObjects.ensure_routed_group(doc, project_uuid)
        if wire not in getattr(routed_group, "Group", []):
            routed_group.addObject(wire)
        try:
            routed_group.ViewObject.Visibility = True
        except Exception:
            # Making the group visible is best-effort.
            pass
        _style_wire(wire, collision_count=len(collisions))
        task = _find_task_by_wire_uuid(doc, wire_uuid)
        _set_task_status(task, status)
    except Exception:
        # Roll back the partially built wire before propagating the error.
        if wire is not None:
            _remove_routing_connection_objects(doc, [wire])
        raise
    if existing_replacements:
        removed_existing = _remove_routing_connection_objects(doc, existing_replacements)
        if removed_existing != len(existing_replacements):
            # Not every old connection could be removed: drop the new wire too.
            if wire is not None:
                _remove_routing_connection_objects(doc, [wire])
            raise AutoRoutingError("Failed to replace existing routed connection.")
    try:
        doc.recompute()
    except Exception:
        # A failing recompute does not invalidate the created wire.
        pass
    return {
        "wire": wire,
        "route_status": status,
        "algorithm": route_data.get("algorithm", ""),
        "network": route_data.get("network", {}),
        "route_track": route_data.get("route_track", {}),
        "points": points,
        "lane": route_data.get("lane", {}),
        "length_mm": _route_length(points),
        "collision_count": len(collisions),
        "collisions": collisions,
    }
def _wire_item_value(item, *names):
if not isinstance(item, dict):
return ""
for name in names:
value = item.get(name, "")
if value is not None and str(value).strip():
return str(value).strip()
return ""
def _route_lane_key(start_uuid, end_uuid):
endpoints = sorted(
value
for value in (
str(start_uuid or "").strip(),
str(end_uuid or "").strip(),
)
if value
)
return tuple(endpoints)
def _route_segment_key(segment):
if not isinstance(segment, dict):
return None
carrier = segment.get("carrier") or {}
carrier_name = str(carrier.get("name", "") or "").strip()
from_key = tuple(segment.get("from_key", []) or [])
to_key = tuple(segment.get("to_key", []) or [])
if not from_key or not to_key:
return None
return (
carrier_name,
tuple(sorted((from_key, to_key))),
)
def _route_segment_keys(result):
    """Return the segment keys of a route result's ``route_track`` entry."""
    if isinstance(result, dict):
        track = result.get("route_track", {})
    else:
        track = {}
    return _route_track_segment_keys(track)
def _route_track_segment_keys(route_track):
segments = route_track.get("segments", []) if isinstance(route_track, dict) else []
keys = []
for segment in segments or []:
key = _route_segment_key(segment)
if key is not None:
keys.append(key)
return keys
def _incoming_wire_uuids(wires):
wire_uuids = set()
for item in wires or []:
if not isinstance(item, dict):
continue
wire_uuid = _wire_item_value(item, "wire_id", "wire_uuid", "id")
if wire_uuid:
wire_uuids.add(wire_uuid)
return wire_uuids
def _existing_routed_segment_usage(doc, excluded_wire_uuids=None):
    """Count how many existing routed connections use each segment key.

    Wires whose ``QetWireUuid`` is in ``excluded_wire_uuids`` are skipped.  A
    wire whose ``QetRouteTrackJson`` cannot be parsed contributes nothing.
    """
    skipped_uuids = set(excluded_wire_uuids or [])
    counts = {}
    for routed in list(WiringObjects.iter_routed_wire_objects(doc)):
        route_type = (getattr(routed, "RouteType", "") or "").strip()
        if route_type != "RoutedConnection":
            continue
        routed_uuid = (getattr(routed, "QetWireUuid", "") or "").strip()
        if routed_uuid and routed_uuid in skipped_uuids:
            continue
        try:
            raw_track = (getattr(routed, "QetRouteTrackJson", "") or "").strip()
            track = json.loads(raw_track or "{}")
        except Exception:
            track = {}
        for key in _route_track_segment_keys(track):
            counts[key] = counts.get(key, 0) + 1
    return counts
def bind_wire_task_terminals_from_payload(doc, payload):
    """Bind local template terminals to QET terminal UUIDs; no wires are created.

    Returns the binding report extended with ``total_wires``,
    ``endpoint_terminals``, ``available_terminals`` and ``local_terminals``.

    Raises:
        AutoRoutingError: if ``doc`` is ``None`` or ``payload`` is not a dict.
    """
    if doc is None:
        raise AutoRoutingError("No FreeCAD document is available.")
    if not isinstance(payload, dict):
        raise AutoRoutingError("Exchange payload must be an object.")
    binding_report = _bind_wire_task_terminals(doc, payload)
    terminals = index_terminals(doc)
    wires = payload.get("wires", []) or []
    endpoints = _wire_endpoint_entries(payload)

    wire_count = len(wires)
    endpoint_count = len(endpoints)
    terminal_count = len(terminals)
    local_count = 0
    for terminal_uuid in terminals:
        if TerminalObjects.is_local_terminal_uuid(terminal_uuid):
            local_count += 1

    binding_report.update(
        total_wires=wire_count,
        endpoint_terminals=endpoint_count,
        available_terminals=terminal_count,
        local_terminals=local_count,
    )
    return binding_report
def format_terminal_binding_report(report):
    """Render a terminal binding report as a multi-line status message.

    The first line carries the counters.  It is followed by the first warning
    (if any) and by a hint when the report lists no wires.
    """
    counter_names = ("bound", "created", "skipped", "available_terminals", "local_terminals")
    counters = [report.get(name, 0) for name in counter_names]
    lines = [
        "工程端子检查/绑定完成:更新 {0} 个,新建 {1} 个,跳过 {2} 个;当前端子 {3} 个,本地端子 {4} 个。".format(*counters)
    ]
    warnings = report.get("warnings", []) or []
    if warnings:
        lines.append("首个问题:{0}".format(warnings[0]))
    if report.get("total_wires", 0) <= 0:
        lines.append("没有导线任务,无法按 QET terminal_uuid 绑定工程端子。")
    return "\n".join(lines)
def route_eplan_connections_from_payload(doc, payload, options=None, prepared_layout=None):
    """Route every wire listed in ``payload["wires"]`` and return a batch report.

    Terminal binding runs first.  Each wire is then routed between the
    terminals found for its ``start_terminal_uuid`` and ``end_terminal_uuid``.
    Wires with missing terminals or routing errors are recorded in the report
    instead of aborting the batch.  The finished report is also written as a
    diagnostic object into the document.

    Args:
        doc: FreeCAD document to route in.
        payload: dict with a ``wires`` list of wire dicts.
        options: optional routing options passed on for each wire.
        prepared_layout: optional dict stored as ``report["prepared_layout"]``.

    Returns:
        The report dict (counters, samples, per-route entries).

    Raises:
        AutoRoutingError: if ``doc`` is ``None`` or ``payload`` is not a dict.
    """
    if doc is None:
        raise AutoRoutingError("No FreeCAD document is available.")
    if not isinstance(payload, dict):
        raise AutoRoutingError("Exchange payload must be an object.")
    # Terminal binding runs before the terminal index is built.
    terminal_binding_report = bind_wire_task_terminals_from_payload(doc, payload)
    terminals = index_terminals(doc)
    local_terminal_count = sum(
        1
        for terminal_uuid in terminals
        if TerminalObjects.is_local_terminal_uuid(terminal_uuid)
    )
    wires = payload.get("wires", []) or []
    report = {
        "total_wires": len(wires),
        "available_terminals": len(terminals),
        "local_terminals": local_terminal_count,
        "auto_bound_terminals": terminal_binding_report["bound"],
        "auto_created_terminals": terminal_binding_report["created"],
        "auto_terminal_binding_warnings": terminal_binding_report["warnings"],
        "routed": 0,
        "collision_warnings": 0,
        "total_length_mm": 0.0,
        "skipped_missing_terminal": 0,
        "skipped_invalid": 0,
        "missing_endpoint_uuids": [],
        "missing_endpoint_samples": [],
        "collision_samples": [],
        "errors": [],
        "error_samples": [],
        "route_status_counts": {},
        "routes": [],
    }
    if isinstance(prepared_layout, dict):
        report["prepared_layout"] = prepared_layout
    missing_endpoint_uuids = set()
    # Next free lane index per terminal pair and per route segment.
    lane_indexes_by_pair = {}
    lane_indexes_by_segment = {}
    # Usage of already routed wires, excluding the wires routed by this batch.
    segment_usage_costs = _existing_routed_segment_usage(
        doc,
        excluded_wire_uuids=_incoming_wire_uuids(wires),
    )

    def add_status(status):
        # Count one wire under the given status; blank statuses become "Unknown".
        key = str(status or "").strip() or "Unknown"
        report["route_status_counts"][key] = report["route_status_counts"].get(key, 0) + 1

    def create_route(route_lane_index, item, start_terminal, end_terminal, endpoint_metadata):
        # Route a single wire item on the given lane index.
        route_options = dict(options or {})
        # The usage table travels inside the item under a private key.
        if isinstance(item, dict) and "__segment_usage_costs" in item:
            route_options["segment_usage_costs"] = item.get("__segment_usage_costs", {})
        return route_eplan_connection_between_terminals(
            doc,
            start_terminal,
            end_terminal,
            route_index=route_lane_index,
            options=route_options,
            wire_uuid=_wire_item_value(item, "wire_id", "wire_uuid", "id"),
            wire_label=_wire_item_value(item, "wire_label", "wire_mark"),
            net_uuid=_wire_item_value(item, "net_uuid"),
            group_uuid=_wire_item_value(item, "group_uuid"),
            wire_mark=_wire_item_value(item, "wire_mark"),
            wire_mark_is_manual=bool(item.get("wire_mark_is_manual", False)),
            wire_style_id=_wire_item_value(item, "wire_style_id"),
            endpoint_metadata=endpoint_metadata,
        )

    for item in wires:
        if not isinstance(item, dict):
            report["skipped_invalid"] += 1
            add_status("Invalid")
            continue
        start_uuid = _wire_item_value(item, "start_terminal_uuid")
        end_uuid = _wire_item_value(item, "end_terminal_uuid")
        start_terminal = terminals.get(start_uuid)
        end_terminal = terminals.get(end_uuid)
        if start_terminal is None or end_terminal is None:
            report["skipped_missing_terminal"] += 1
            add_status("MissingTerminal")
            for terminal_uuid in (start_uuid, end_uuid):
                if terminal_uuid and terminal_uuid not in terminals:
                    missing_endpoint_uuids.add(terminal_uuid)
            # Keep only a few samples so the panel status is not flooded when
            # many wire tasks are affected.
            if len(report["missing_endpoint_samples"]) < 8:
                report["missing_endpoint_samples"].append(
                    {
                        "wire_uuid": _wire_item_value(item, "wire_id", "wire_uuid", "id"),
                        "wire_label": _wire_item_value(item, "wire_label", "wire_mark"),
                        "start_terminal_uuid": start_uuid,
                        "start_found": start_terminal is not None,
                        "start_element_uuid": _wire_item_value(item, "start_element_uuid"),
                        "start_terminal_display": _wire_item_value(item, "start_terminal_display"),
                        "end_terminal_uuid": end_uuid,
                        "end_found": end_terminal is not None,
                        "end_element_uuid": _wire_item_value(item, "end_element_uuid"),
                        "end_terminal_display": _wire_item_value(item, "end_terminal_display"),
                    }
                )
            continue
        lane_key = _route_lane_key(start_uuid, end_uuid)
        route_lane_index = lane_indexes_by_pair.get(lane_key, 0)
        try:
            endpoint_metadata = {
                "start_element_uuid": _wire_item_value(item, "start_element_uuid"),
                "start_terminal_display": _wire_item_value(item, "start_terminal_display"),
                "start_device_label": _wire_item_value(item, "start_device_label"),
                "end_element_uuid": _wire_item_value(item, "end_element_uuid"),
                "end_terminal_display": _wire_item_value(item, "end_terminal_display"),
                "end_device_label": _wire_item_value(item, "end_device_label"),
                "endpoint_label": _wire_item_value(item, "endpoint_label"),
            }
            result = create_route(
                route_lane_index,
                dict(item, __segment_usage_costs=segment_usage_costs),
                start_terminal,
                end_terminal,
                endpoint_metadata,
            )
            route_segment_keys = _route_segment_keys(result)
            # Highest lane index already reserved on any segment of this route.
            shared_lane_index = max(
                [lane_indexes_by_segment.get(key, 0) for key in route_segment_keys] or [0]
            )
            final_lane_index = max(route_lane_index, shared_lane_index)
            if final_lane_index != route_lane_index:
                # Route again on the higher lane index.
                initial_wire = result.get("wire") if isinstance(result, dict) else None
                try:
                    result = create_route(
                        final_lane_index,
                        dict(item, __segment_usage_costs=segment_usage_costs),
                        start_terminal,
                        end_terminal,
                        endpoint_metadata,
                    )
                except Exception:
                    # The second attempt failed: remove the first wire as well.
                    if initial_wire is not None:
                        _remove_routing_connection_objects(doc, [initial_wire])
                    raise
                route_segment_keys = _route_segment_keys(result)
        except Exception as exc:
            error_text = str(exc)
            report["errors"].append(error_text)
            add_status("Error")
            if len(report["error_samples"]) < 8:
                report["error_samples"].append(
                    {
                        "wire_uuid": _wire_item_value(item, "wire_id", "wire_uuid", "id"),
                        "wire_label": _wire_item_value(item, "wire_label", "wire_mark"),
                        "start_terminal_uuid": start_uuid,
                        "start_element_uuid": _wire_item_value(item, "start_element_uuid"),
                        "start_terminal_display": _wire_item_value(item, "start_terminal_display"),
                        "start_device_label": _wire_item_value(item, "start_device_label"),
                        "end_terminal_uuid": end_uuid,
                        "end_element_uuid": _wire_item_value(item, "end_element_uuid"),
                        "end_terminal_display": _wire_item_value(item, "end_terminal_display"),
                        "end_device_label": _wire_item_value(item, "end_device_label"),
                        "endpoint_label": _wire_item_value(item, "endpoint_label"),
                        "error": error_text,
                    }
                )
            continue
        # Reserve the lane after the one just used, per pair and per segment.
        lane_indexes_by_pair[lane_key] = max(
            lane_indexes_by_pair.get(lane_key, 0),
            int(result.get("lane", {}).get("index", 0) or 0) + 1,
        )
        for segment_key in route_segment_keys:
            lane_indexes_by_segment[segment_key] = max(
                lane_indexes_by_segment.get(segment_key, 0),
                int(result.get("lane", {}).get("index", 0) or 0) + 1,
            )
            segment_usage_costs[segment_key] = segment_usage_costs.get(segment_key, 0) + 1
        if result["route_status"] == "CollisionWarning":
            report["collision_warnings"] += 1
        add_status(result["route_status"])
        route_collision_samples = []
        # At most three collisions per route, at most eight in the batch report.
        for collision in list(result.get("collisions", []) or [])[:3]:
            sample = dict(collision)
            sample.update(
                {
                    "wire_uuid": _wire_item_value(item, "wire_id", "wire_uuid", "id"),
                    "wire_label": _wire_item_value(item, "wire_label", "wire_mark"),
                    "start_terminal_uuid": start_uuid,
                    "end_terminal_uuid": end_uuid,
                }
            )
            route_collision_samples.append(sample)
            if len(report["collision_samples"]) < 8:
                report["collision_samples"].append(sample)
        report["routed"] += 1
        route_length = float(result.get("length_mm", 0.0) or 0.0)
        report["total_length_mm"] += route_length
        report["routes"].append(
            {
                "wire_uuid": _wire_item_value(item, "wire_id", "wire_uuid", "id"),
                "wire_label": _wire_item_value(item, "wire_label", "wire_mark"),
                "wire_style_id": _wire_item_value(item, "wire_style_id"),
                "start_terminal_uuid": start_uuid,
                "start_element_uuid": _wire_item_value(item, "start_element_uuid"),
                "start_terminal_display": _wire_item_value(item, "start_terminal_display"),
                "start_device_label": _wire_item_value(item, "start_device_label"),
                "end_terminal_uuid": end_uuid,
                "end_element_uuid": _wire_item_value(item, "end_element_uuid"),
                "end_terminal_display": _wire_item_value(item, "end_terminal_display"),
                "end_device_label": _wire_item_value(item, "end_device_label"),
                "endpoint_label": _wire_item_value(item, "endpoint_label"),
                "algorithm": result["algorithm"],
                "route_status": result["route_status"],
                "length_mm": route_length,
                "lane": result.get("lane", {}),
                "network": result.get("network", {}),
                "route_track": result.get("route_track", {}),
                "collision_count": result["collision_count"],
                "collision_samples": route_collision_samples,
            }
        )
    report["missing_endpoint_uuids"] = sorted(missing_endpoint_uuids)
    _write_routing_connection_batch_diagnostic(doc, report)
    return report
def _missing_endpoint_label(sample, side):
terminal_uuid = str(sample.get("{0}_terminal_uuid".format(side), "") or "").strip()
element_uuid = str(sample.get("{0}_element_uuid".format(side), "") or "").strip()
terminal_display = str(sample.get("{0}_terminal_display".format(side), "") or "").strip()
if element_uuid and terminal_display:
label = "{0}/{1}".format(element_uuid, terminal_display)
elif terminal_display:
label = terminal_display
elif element_uuid:
label = element_uuid
else:
return terminal_uuid
if terminal_uuid and terminal_uuid != label:
return "{0} ({1})".format(label, terminal_uuid)
return label
def _missing_endpoint_side_summary(sample):
missing_sides = []
if sample.get("start_found") is False:
missing_sides.append("起点")
if sample.get("end_found") is False:
missing_sides.append("终点")
if not missing_sides:
return ""
if len(missing_sides) == 2:
return "(缺失:两端)"
return "(缺失:{0}".format(missing_sides[0])
def _wire_sample_text(sample):
return (
str(sample.get("wire_label", "") or "").strip()
or str(sample.get("wire_uuid", "") or "").strip()
or "未知导线"
)
def _endpoint_pair_text(sample):
endpoint_label = str(sample.get("endpoint_label", "") or "").strip()
if endpoint_label:
return endpoint_label
return "{0} -> {1}".format(
_missing_endpoint_label(sample, "start"),
_missing_endpoint_label(sample, "end"),
)
def _route_source_labels(route_track, limit=5):
labels = []
seen = set()
if not isinstance(route_track, dict):
return labels
for segment in route_track.get("segments", []) or []:
carrier = segment.get("carrier", {}) if isinstance(segment, dict) else {}
if not isinstance(carrier, dict):
continue
label = (
str(carrier.get("source_label", "") or "").strip()
or str(carrier.get("source_name", "") or "").strip()
)
if not label or label in seen:
continue
seen.add(label)
labels.append(label)
if len(labels) >= int(limit or 0):
break
return labels
def _route_source_sample_text(report):
for route in report.get("routes", []) or []:
if not isinstance(route, dict):
continue
labels = _route_source_labels(route.get("route_track", {}))
if not labels:
continue
return "路径示例:导线 {0} 经过 {1}".format(
_wire_sample_text(route),
"".join(labels),
)
return ""
def _route_network_metric_max(report, key):
maximum = 0
for route in report.get("routes", []) or []:
if not isinstance(route, dict):
continue
network = route.get("network", {})
if not isinstance(network, dict):
continue
try:
maximum = max(maximum, int(network.get(key, 0) or 0))
except Exception:
continue
return maximum
def _route_lane_summary(report):
max_lane_index = 0
lane_spacing = 0.0
for route in report.get("routes", []) or []:
if not isinstance(route, dict):
continue
lane = route.get("lane", {})
if not isinstance(lane, dict):
continue
try:
lane_index = int(lane.get("index", 0) or 0)
except Exception:
lane_index = 0
if lane_index <= max_lane_index:
continue
max_lane_index = lane_index
try:
lane_spacing = float(lane.get("spacing_mm", 0.0) or 0.0)
except Exception:
lane_spacing = 0.0
if max_lane_index <= 0:
return {}
return {
"max_lane_index": max_lane_index,
"spacing_mm": lane_spacing,
}
def format_eplan_connection_route_report(report):
    """Render a batch routing report as a multi-line status message.

    The first line carries the routed / collision-warning / missing-terminal
    counters.  Further lines are appended only when the report has the
    matching data: status counts, prepared layout counters, total length,
    network metrics, lane summary, a path example, the first error, an error
    sample, a collision sample, terminal binding counters, a terminal
    matching hint and a missing-endpoint example.

    List-like parts (status counts, network metrics) are joined with the
    Chinese enumeration comma so that adjacent entries stay readable.
    """
    list_separator = "、"
    message = "批量生成布线连接完成:routed={0}, collision_warnings={1}, missing_terminals={2}".format(
        report.get("routed", 0),
        report.get("collision_warnings", 0),
        report.get("skipped_missing_terminal", 0),
    )
    status_counts = report.get("route_status_counts", {})
    if isinstance(status_counts, dict) and status_counts:
        status_labels = {
            "Routed": "正常",
            "CollisionWarning": "碰撞告警",
            "Error": "错误",
            "MissingTerminal": "缺失端子",
            "Invalid": "无效任务",
        }

        def status_count_value(value):
            # Non-numeric counts are shown as 0 instead of failing the report.
            try:
                return int(value or 0)
            except Exception:
                return 0

        status_parts = []
        # Known statuses first, in a fixed order ...
        for key in ("Routed", "CollisionWarning", "Error", "MissingTerminal", "Invalid"):
            value = status_count_value(status_counts.get(key, 0))
            if value > 0:
                status_parts.append("{0} {1}".format(status_labels[key], value))
        # ... then any other status under its raw key, sorted.
        for key, value in sorted(status_counts.items()):
            value = status_count_value(value)
            if key in status_labels or value <= 0:
                continue
            status_parts.append("{0} {1}".format(key, value))
        if status_parts:
            message += "\n结果状态:{0}".format(list_separator.join(status_parts))
    prepared_layout = report.get("prepared_layout")
    if isinstance(prepared_layout, dict):
        message += "\n布线布局空间:线槽路径 {0} 条,布线面 {1} 条,端子接入 {2} 条。".format(
            prepared_layout.get("wire_duct_carriers", 0),
            prepared_layout.get("surface_carriers", 0),
            prepared_layout.get("terminal_access_carriers", 0),
        )
    total_length_mm = float(report.get("total_length_mm", 0.0) or 0.0)
    if total_length_mm > 0.0:
        message += "\n布线连接总长度:{0:.1f} mm。".format(total_length_mm)
    bridged_segments = _route_network_metric_max(report, "bridged_segments")
    blocked_segments = _route_network_metric_max(report, "blocked_segments")
    network_parts = []
    if bridged_segments > 0:
        network_parts.append("自动桥接 {0} 段相邻线槽".format(bridged_segments))
    if blocked_segments > 0:
        network_parts.append("避障屏蔽 {0}".format(blocked_segments))
    if network_parts:
        message += "\n路径网络:{0}".format(list_separator.join(network_parts))
    lane_summary = _route_lane_summary(report)
    if lane_summary:
        message += "\n并行错位:最大 lane {0},间距 {1:.1f} mm。".format(
            lane_summary.get("max_lane_index", 0),
            float(lane_summary.get("spacing_mm", 0.0) or 0.0),
        )
    route_source_sample = _route_source_sample_text(report)
    if route_source_sample:
        message += "\n{0}".format(route_source_sample)
    errors = report.get("errors", []) or []
    if errors:
        message += "\n首个错误:{0}".format(str(errors[0]))
    error_sample = (report.get("error_samples") or [None])[0]
    if error_sample:
        # Keep wire, endpoint pair and error text visually separated.
        message += "\n错误示例:导线 {0},{1}:{2}".format(
            _wire_sample_text(error_sample),
            _endpoint_pair_text(error_sample),
            error_sample.get("error", ""),
        )
    collision_sample = (report.get("collision_samples") or [None])[0]
    if collision_sample:
        obstacle_text = (
            collision_sample.get("obstacle_label")
            or collision_sample.get("obstacle_name")
            or "未知对象"
        )
        wire_text = (
            collision_sample.get("wire_label")
            or collision_sample.get("wire_uuid")
            or "未知导线"
        )
        if collision_sample.get("collision_kind") == "ClearanceWarning":
            message += "\n碰撞示例:导线 {0} 进入 {1} 的安全间隙。".format(
                wire_text,
                obstacle_text,
            )
        else:
            message += "\n碰撞示例:导线 {0} 碰到 {1}".format(
                wire_text,
                obstacle_text,
            )
    auto_bound = report.get("auto_bound_terminals", 0)
    auto_created = report.get("auto_created_terminals", 0)
    if auto_bound or auto_created:
        message += "\n已按导线任务绑定 3D 工程端子:更新 {0} 个,新建 {1} 个。".format(
            auto_bound,
            auto_created,
        )
    if report.get("routed", 0) == 0 and report.get("skipped_missing_terminal", 0) > 0:
        message += (
            "\n端子匹配失败:当前 3D 可布线端子 {0} 个,其中本地模板端子 {1} 个;"
            "导线任务引用的 QET terminal_uuid 没有绑定到这些 3D 工程端子。"
        ).format(
            report.get("available_terminals", 0),
            report.get("local_terminals", 0),
        )
        if report.get("local_terminals", 0) > 0:
            message += " 请先从 QET 重新导入/更新工程端子,使端子 UUID 不再是 local:...。"
    sample = (report.get("missing_endpoint_samples") or [None])[0]
    if sample:
        message += "\n缺失示例:{0} -> {1}{2}".format(
            _missing_endpoint_label(sample, "start"),
            _missing_endpoint_label(sample, "end"),
            _missing_endpoint_side_summary(sample),
        )
    return message
def _clear_routing_connection_batch_diagnostics(doc):
    """Remove earlier batch diagnostic objects from the diagnostic group.

    Only members whose ``QetDiagnosticKind`` is ``"RoutingConnectionBatch"``
    are touched.  Every step is best-effort; failures are ignored.

    Returns:
        The number of diagnostic objects counted as removed.
    """
    group = WiringObjects.ensure_diagnostic_group(doc, _project_uuid(doc))
    removed = 0
    for obj in list(getattr(group, "Group", []) or []):
        if (getattr(obj, "QetDiagnosticKind", "") or "").strip() != "RoutingConnectionBatch":
            continue
        try:
            group.removeObject(obj)
        except Exception:
            # Fallback: rebuild the group's member list without this object.
            try:
                group.Group = [
                    candidate
                    for candidate in list(getattr(group, "Group", []) or [])
                    if candidate is not obj
                ]
            except Exception:
                pass
        try:
            # Delete the object from the document if it is still there.
            if doc.getObject(getattr(obj, "Name", "")) is not None:
                doc.removeObject(obj.Name)
                removed += 1
        except Exception:
            pass
    return removed
def _write_routing_connection_batch_diagnostic(doc, report):
if doc is None or not isinstance(report, dict):
return None
project_uuid = _project_uuid(doc)
group = WiringObjects.ensure_diagnostic_group(doc, project_uuid)
_clear_routing_connection_batch_diagnostics(doc)
if (
report.get("total_wires", 0) <= 0
and not report.get("routes")
and not report.get("errors")
and not report.get("missing_endpoint_uuids")
and report.get("collision_warnings", 0) <= 0
):
return None
diagnostic = doc.addObject("App::DocumentObjectGroup", _unique_name(doc, "QETRoutingConnectionDiagnostic"))
diagnostic.Label = "QET Routing Connection Diagnostic"
_set_string(diagnostic, "QetDiagnosticKind", "RoutingConnectionBatch", "QET diagnostic kind")
_set_string(
diagnostic,
"QetDiagnosticJson",
json.dumps(report, ensure_ascii=False),
"QET routing connection batch diagnostic payload",
)
group.addObject(diagnostic)
return diagnostic
def _iter_wire_tasks(doc):
try:
task_group = doc.getObject("QETWiring_01_Tasks")
except Exception:
task_group = None
if task_group is None:
return []
return [
task
for task in list(getattr(task_group, "Group", []) or [])
if (getattr(task, "RouteType", "") or "").strip() == "Task"
]
def _wire_tasks_payload(doc):
    """Build an exchange-style payload from the wire task objects in ``doc``.

    Each task becomes one dict in ``payload["wires"]`` holding the stripped
    string properties of the task plus the boolean ``wire_mark_is_manual``.
    """

    def _text(task, attribute):
        # Missing or falsy properties count as an empty string.
        return (getattr(task, attribute, "") or "").strip()

    payload = {"project_uuid": _project_uuid(doc), "wires": []}
    for task in _iter_wire_tasks(doc):
        entry = {
            "wire_id": _text(task, "QetWireUuid"),
            "wire_label": _text(task, "QetWireLabel"),
            "wire_mark": _text(task, "QetWireMark"),
            "wire_mark_is_manual": bool(getattr(task, "QetWireMarkIsManual", False)),
            "wire_style_id": _text(task, "QetWireStyleId"),
            "net_uuid": _text(task, "QetNetUuid"),
            "group_uuid": _text(task, "QetGroupUuid"),
            "start_element_uuid": _text(task, "QetStartElementUuid"),
            "start_instance_id": _text(task, "QetStartInstanceId"),
            "start_terminal_uuid": _text(task, "QetStartTerminalUuid"),
            "start_terminal_display": _text(task, "QetStartTerminalDisplay"),
            "start_device_label": _text(task, "QetStartDeviceLabel"),
            "end_element_uuid": _text(task, "QetEndElementUuid"),
            "end_instance_id": _text(task, "QetEndInstanceId"),
            "end_terminal_uuid": _text(task, "QetEndTerminalUuid"),
            "end_terminal_display": _text(task, "QetEndTerminalDisplay"),
            "end_device_label": _text(task, "QetEndDeviceLabel"),
            "endpoint_label": _text(task, "QetEndpointLabel"),
        }
        payload["wires"].append(entry)
    return payload
def bind_wire_task_terminals_from_tasks(doc):
    """Bind terminals using a payload built from the document's wire tasks."""
    task_payload = _wire_tasks_payload(doc)
    return bind_wire_task_terminals_from_payload(doc, task_payload)
def route_eplan_connection_tasks(doc, options=None, prepared_layout=None):
    """Route all wire tasks stored in the document and return the batch report."""
    return route_eplan_connections_from_payload(
        doc,
        _wire_tasks_payload(doc),
        options=options,
        prepared_layout=prepared_layout,
    )
def prepare_eplan_layout_space(doc, project_uuid=""):
    """Prepare the FreeCAD document as an EPLAN-style layout space.

    Layout-space source objects and wiring buckets are marked here; the
    routing path network itself is not generated by this step.  In EPLAN
    terms the layout space is the 3D installation context in which the
    network is generated later.

    Raises:
        AutoRoutingError: if ``doc`` is ``None``.
    """
    if doc is None:
        raise AutoRoutingError("No FreeCAD document is available.")
    resolved_uuid = (project_uuid or "").strip()
    if not resolved_uuid:
        resolved_uuid = _project_uuid(doc)
    if not resolved_uuid:
        # Last resort: read the UUID from the root group.
        try:
            root_group = TerminalObjects.ensure_root_group(doc)
            resolved_uuid = (getattr(root_group, "QetProjectUuid", "") or "").strip()
        except Exception:
            resolved_uuid = ""
    return RoutingNetwork.prepare_layout_space_sources_from_document(
        doc, project_uuid=resolved_uuid
    )
def generate_eplan_routing_path_network(doc, project_uuid="", options=None, selection_ex=None):
    """Generate the routing path network for the current layout space.

    Raises:
        AutoRoutingError: if ``doc`` is ``None``.
    """
    if doc is None:
        raise AutoRoutingError("No FreeCAD document is available.")
    opts = _merged_options(options)
    resolved_uuid = (project_uuid or "").strip()
    if not resolved_uuid:
        resolved_uuid = _project_uuid(doc)
    if not resolved_uuid:
        # Last resort: read the UUID from the root group.
        try:
            root_group = TerminalObjects.ensure_root_group(doc)
            resolved_uuid = (getattr(root_group, "QetProjectUuid", "") or "").strip()
        except Exception:
            resolved_uuid = ""
    exit_length = float(opts.get("terminal_exit_length", 20.0) or 0.0)
    access_max_distance = float(opts.get("terminal_access_max_distance", 1000.0) or 0.0)
    duct_tolerance = float(opts.get("adjoining_duct_tolerance", 0.0) or 0.0)
    return RoutingNetwork.create_routing_path_network_from_document(
        doc,
        project_uuid=resolved_uuid,
        selection_ex=selection_ex,
        terminal_exit_length=exit_length,
        terminal_access_max_distance=access_max_distance,
        adjoining_duct_tolerance=duct_tolerance,
    )
def check_eplan_routing_path_network(doc, project_uuid="", options=None):
    """Write and return routing path network diagnostics for the layout space.

    Returns:
        A dict with ``diagnostic``, ``diagnostic_object``, ``ok`` and
        ``issue_count``.

    Raises:
        AutoRoutingError: if ``doc`` is ``None``.
    """
    if doc is None:
        raise AutoRoutingError("No FreeCAD document is available.")
    opts = _merged_options(options)
    resolved_uuid = (project_uuid or "").strip()
    if not resolved_uuid:
        resolved_uuid = _project_uuid(doc)
    if not resolved_uuid:
        # Last resort: read the UUID from the root group.
        try:
            root_group = TerminalObjects.ensure_root_group(doc)
            resolved_uuid = (getattr(root_group, "QetProjectUuid", "") or "").strip()
        except Exception:
            resolved_uuid = ""
    exit_length = float(opts.get("terminal_exit_length", 20.0) or 0.0)
    access_max_distance = float(opts.get("terminal_access_max_distance", 1000.0) or 0.0)
    duct_tolerance = float(opts.get("adjoining_duct_tolerance", 0.0) or 0.0)
    result = RoutingNetwork.write_routing_path_network_diagnostic(
        doc,
        project_uuid=resolved_uuid,
        terminal_exit_length=exit_length,
        terminal_access_max_distance=access_max_distance,
        adjoining_duct_tolerance=duct_tolerance,
    )
    if isinstance(result, dict):
        diagnostic = result.get("diagnostic", {})
        diagnostic_object = result.get("diagnostic_object")
    else:
        diagnostic = {}
        diagnostic_object = None
    if isinstance(diagnostic, dict):
        is_ok = bool(diagnostic.get("ok", False))
        issue_count = len(diagnostic.get("issues", []) or [])
    else:
        is_ok = False
        issue_count = 0
    return {
        "diagnostic": diagnostic,
        "diagnostic_object": diagnostic_object,
        "ok": is_ok,
        "issue_count": issue_count,
    }
def _format_distance_mm(value):
try:
return "{0:.1f} mm".format(float(value))
except Exception:
return "未知距离"
def _format_point_text(point):
if not isinstance(point, dict):
return "未知位置"
try:
return "({0:.1f}, {1:.1f}, {2:.1f})".format(
float(point.get("x", 0.0)),
float(point.get("y", 0.0)),
float(point.get("z", 0.0)),
)
except Exception:
return "未知位置"
def _diagnostic_terminal_text(item):
if not isinstance(item, dict):
return "未知端子"
return (
item.get("terminal_uuid")
or item.get("label")
or item.get("name")
or "未知端子"
)
def _dict_items(value):
if not isinstance(value, list):
return []
return [item for item in value if isinstance(item, dict)]
def format_routing_path_network_report(diagnostic):
    """Return an actionable Chinese summary for routing path network diagnostics.

    Without issues the message reports the carrier / segment / node counts
    and any bridged segments.  With issues it names the first unconnected
    terminal, the first possible break and the first isolated component; if
    none of those lists has entries it falls back to the first raw issue.

    Carrier labels of an isolated component are joined with the Chinese
    enumeration comma so that adjacent labels stay readable.
    """
    if not isinstance(diagnostic, dict):
        return "布线路径网络检查失败:诊断结果无效。"
    summary = diagnostic.get("summary", {}) if isinstance(diagnostic.get("summary", {}), dict) else {}
    issues = _dict_items(diagnostic.get("issues", []) or [])
    if not issues:
        message = "布线路径网络检查通过:{0} 条 carrier / {1} 段 / {2} 个节点。".format(
            summary.get("carriers", 0),
            summary.get("segments", 0),
            summary.get("nodes", 0),
        )
        bridged_segments = int(summary.get("bridged_segments", 0) or 0)
        if bridged_segments > 0:
            message += " 自动桥接 {0} 段相邻线槽。".format(bridged_segments)
        return message
    message = "布线路径网络检查发现 {0} 类问题。".format(len(issues))
    unconnected = _dict_items(diagnostic.get("unconnected_terminals", []) or [])
    if unconnected:
        sample = unconnected[0]
        message += "\n端子未接入:{0},距离最近网络 {1},当前端子接入最大距离 {2}。请重新生成布线路径网络,或补一段线槽/辅助路径到该端子。".format(
            _diagnostic_terminal_text(sample),
            _format_distance_mm(sample.get("nearest_network_distance_mm")),
            _format_distance_mm(sample.get("terminal_access_max_distance_mm")),
        )
    possible_breaks = _dict_items(diagnostic.get("possible_breaks", []) or [])
    if possible_breaks:
        sample = possible_breaks[0]
        carrier = sample.get("carrier", {}) if isinstance(sample.get("carrier", {}), dict) else {}
        carrier_text = carrier.get("label") or carrier.get("name") or "未知线槽"
        message += "\n线槽端点疑似断开:{0} @ {1}。请补齐相邻线槽、开口或辅助路径。".format(
            carrier_text,
            _format_point_text(sample.get("point")),
        )
    isolated = _dict_items(diagnostic.get("isolated_components", []) or [])
    if isolated:
        sample = isolated[0]
        carriers = sample.get("carrier_labels") or sample.get("carrier_names") or []
        # Show at most three carriers, separated so the names do not run together.
        carrier_text = "、".join([str(item) for item in carriers[:3]]) if carriers else "未知 carrier"
        message += "\n存在孤立路径网络:{0}。请用线槽/辅助路径把孤立网络接入主网络。".format(carrier_text)
    if not (unconnected or possible_breaks or isolated):
        first_issue = issues[0]
        message += "\n首个问题:{0} ({1})。".format(
            first_issue.get("code", "unknown"),
            first_issue.get("count", 0),
        )
    return message
def update_eplan_routing_path_network(doc, project_uuid="", options=None, selection_ex=None):
    """Refresh the routing path network ahead of an EPLAN-style Route.

    Forwards every argument unchanged to
    ``generate_eplan_routing_path_network`` and returns its result.
    """
    forwarded = {
        "project_uuid": project_uuid,
        "options": options,
        "selection_ex": selection_ex,
    }
    return generate_eplan_routing_path_network(doc, **forwarded)
def route_eplan_connections(
    doc,
    payload=None,
    options=None,
    project_uuid="",
    selection_ex=None,
    update_network=True,
):
    """Route QET wire tasks through the EPLAN-style routing path network.

    Args:
        doc: FreeCAD document to route in; must not be ``None``.
        payload: Optional exchange payload. When ``None`` the value of
            ``App._qet_exchange_payload`` (if present) is used instead.
        options: Routing options forwarded to the network update and router.
        project_uuid: Forwarded to the routing path network update.
        selection_ex: Forwarded to the routing path network update.
        update_network: When truthy, the routing path network is regenerated
            first and handed to the router as ``prepared_layout``.

    Returns:
        The router's report dict, annotated with ``routing_method``,
        ``routing_path_network_updated`` and, when the network update
        returned a dict, ``routing_path_network``.

    Raises:
        AutoRoutingError: If ``doc`` is ``None``.
    """
    if doc is None:
        raise AutoRoutingError("No FreeCAD document is available.")

    if update_network:
        prepared_network = update_eplan_routing_path_network(
            doc,
            project_uuid=project_uuid,
            options=options,
            selection_ex=selection_ex,
        )
    else:
        prepared_network = None

    # An explicit payload wins; otherwise fall back to the one stashed on App.
    wire_source = payload if payload is not None else getattr(App, "_qet_exchange_payload", None)
    router_kwargs = {"options": options, "prepared_layout": prepared_network}

    # Only a dict payload carrying a non-empty "wires" entry is routed from
    # the payload; anything else is routed from the document's wire tasks.
    if isinstance(wire_source, dict) and wire_source.get("wires"):
        report = route_eplan_connections_from_payload(doc, wire_source, **router_kwargs)
    else:
        report = route_eplan_connection_tasks(doc, **router_kwargs)

    report["routing_method"] = "eplan-route-v1"
    report["routing_path_network_updated"] = bool(update_network)
    if isinstance(prepared_network, dict):
        report["routing_path_network"] = prepared_network
    return report
def wire_task_count(doc):
    """Return the length of the result of ``_iter_wire_tasks(doc)``."""
    tasks = _iter_wire_tasks(doc)
    return len(tasks)
def clear_routing_connections(doc):
    """Remove every routed wire object whose ``RouteType`` is ``RoutedConnection``.

    Removal is best-effort: a failure on one object is reported to the
    FreeCAD console and the remaining objects are still processed. The
    document is recomputed once at the end; a recompute failure is likewise
    reported rather than raised.

    Args:
        doc: FreeCAD document holding the routed wire objects.

    Returns:
        The number of objects that were successfully removed.
    """
    removed = 0
    # Snapshot the iterable first: objects are deleted while we walk it.
    for obj in list(WiringObjects.iter_routed_wire_objects(doc)):
        if (getattr(obj, "RouteType", "") or "").strip() != "RoutedConnection":
            continue
        name = "?"
        try:
            name = obj.Name
            _detach_object_from_groups(doc, obj)
            doc.removeObject(name)
        except Exception as exc:
            # Keep going, but surface the failure instead of hiding a stale
            # routed wire that is still sitting in the document.
            _console_error("移除布线连接 {0} 失败:{1}".format(name, exc))
        else:
            removed += 1
    try:
        doc.recompute()
    except Exception as exc:
        _console_error("清除布线连接后重算文档失败:{0}".format(exc))
    return removed
def _console_message(message):
try:
App.Console.PrintMessage("[FreeCADExchange] {0}\n".format(message))
except Exception:
pass
def _console_error(message):
try:
App.Console.PrintError("[FreeCADExchange] {0}\n".format(message))
except Exception:
pass
class CommandRouteEplanConnections:
    """Command object that routes all wires via ``route_eplan_connections``."""

    def GetResources(self):
        """Return the menu text and tooltip for this command."""
        resources = {}
        resources["MenuText"] = "生成布线连接(全部导线)"
        resources["ToolTip"] = "按布线路径网络生成全部 3D 布线连接"
        return resources

    def IsActive(self):
        """Return ``True`` when ``App.ActiveDocument`` is set."""
        active_doc = getattr(App, "ActiveDocument", None)
        return active_doc is not None

    def Activated(self):
        """Route every wire in the active document and log the outcome.

        All failures are reported to the console; nothing is raised.
        """
        doc = getattr(App, "ActiveDocument", None)
        try:
            stashed = getattr(App, "_qet_exchange_payload", None)
            # Only pass the stashed payload along when it actually has wires.
            has_wires = isinstance(stashed, dict) and stashed.get("wires")
            report = route_eplan_connections(
                doc,
                payload=stashed if has_wires else None,
                update_network=True,
            )
            nothing_to_route = report.get("total_wires", 0) <= 0
            if nothing_to_route:
                _console_error("没有导线任务。生成布线连接需要 QET wires[] 或 QETWiring_01_Tasks。")
            else:
                _console_message(format_eplan_connection_route_report(report))
        except Exception as exc:
            _console_error("批量生成布线连接失败:{0}".format(exc))
# Set once the command has been added to the GUI, so that calling
# register_commands() again does not add it a second time.
_COMMANDS_REGISTERED = False


def register_commands():
    """Add the routing command to the FreeCAD GUI once.

    Does nothing when the command was already registered, when ``Gui`` is
    ``None``, or when ``Gui`` has no ``addCommand`` attribute.
    """
    global _COMMANDS_REGISTERED
    if not _COMMANDS_REGISTERED and Gui is not None and hasattr(Gui, "addCommand"):
        Gui.addCommand("QET_Exchange_RouteEplanConnections", CommandRouteEplanConnections())
        _COMMANDS_REGISTERED = True


# Register as soon as the module is loaded.
register_commands()