You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1384 lines
48 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# FreeCADExchange 3D automatic wiring.
#
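# This first version does not touch C++ and does not write 3D routing results
# into the database. It only reads the terminals, routing networks and geometric
# obstacles found in the FreeCAD document, then generates a visible polyline
# wire under QETWiring_04_Routed.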
import json
import math
import FreeCAD as App
try:
import FreeCADGui as Gui
except ImportError:
Gui = None
import RoutingNetwork
import TerminalObjects
import TemplateSemantics
import WiringObjects
# Default auto-routing option values; _merged_options() overlays caller-supplied
# options on top of a copy of this dict.
DEFAULT_OPTIONS = {
    # Run a short stub out of the terminal first so the wire does not start
    # flush against the device housing.
    "terminal_exit_length": 20.0,
    # Without a wire-duct network, fall back to an orthogonal polyline that is
    # raised/offset along these axes.
    "clearance_axis": "z",
    "clearance": 80.0,
    "lane_axis": "y",
    "lane_spacing": 10.0,
    # Wire-duct network parameters.
    "use_routing_network": True,
    "network_entry_max_distance": 1000.0,
    "bend_penalty": 25.0,
    # EPLAN/SOLIDWORKS style: wire ducts / routing paths have top priority;
    # auxiliary regions only serve as transition / fallback areas.
    "carrier_kind_cost_factors": {
        "WireDuct": 1.0,
        "RoutingPath": 1.0,
        "UserPath": 1.0,
        "AuxiliaryPath": 2.0,
        "TerminalAccess": 2.0,
        "RoutingRange": 8.0,
        "SurfaceGrid": 8.0,
    },
    # By default no long free-floating fallback wire is generated any more; the
    # trunk must follow the carrier / surface network.
    "allow_floating_fallback": False,
    # Obstacle bounding boxes are inflated by this distance so near-contact
    # risks are detected early.
    "obstacle_clearance": 5.0,
    # Prevents an overlong access segment when coordinates are abnormal or a
    # terminal is too far from the routing network; such a segment would blow up
    # the FreeCAD view bounding box and clip the model out of sight on rotation.
    "terminal_access_max_distance": 1000.0,
    # First remove routing-network edges that cross obstacle bounding boxes from
    # the Dijkstra graph; if no safe alternative path exists, fall back to the
    # original graph and use CollisionWarning to tell the user that the current
    # network is insufficient.
    "avoid_obstacles": True,
    "replace_existing": True,
}
class AutoRoutingError(RuntimeError):
    """Error raised by the auto-routing entry points when a request cannot be carried out."""
def _merged_options(options):
    """Return a new dict holding DEFAULT_OPTIONS overlaid with ``options``.

    ``options`` is applied only when it is a dict; any other value is ignored.
    The merge is shallow: nested values are shared, not copied.
    """
    overrides = options if isinstance(options, dict) else {}
    return {**DEFAULT_OPTIONS, **overrides}
def _vector(point):
    """Coerce ``point`` into a new ``App.Vector``.

    Accepted inputs, checked in this order: an ``App.Vector``, a list/tuple with
    at least three items, a dict with optional ``x``/``y``/``z`` keys (missing
    keys count as 0.0), or any object exposing ``x``, ``y`` and ``z``
    attributes. Anything else yields the zero vector.
    """
    if isinstance(point, App.Vector):
        return App.Vector(point.x, point.y, point.z)

    if isinstance(point, (list, tuple)) and len(point) >= 3:
        first, second, third = point[0], point[1], point[2]
        return App.Vector(float(first), float(second), float(third))

    if isinstance(point, dict):
        components = [float(point.get(axis, 0.0)) for axis in ("x", "y", "z")]
        return App.Vector(*components)

    has_xyz = hasattr(point, "x") and hasattr(point, "y") and hasattr(point, "z")
    if has_xyz:
        return App.Vector(float(point.x), float(point.y), float(point.z))

    return App.Vector(0, 0, 0)
def _distance(left, right):
    """Return the Euclidean distance between two objects exposing x/y/z attributes."""
    gap_x, gap_y, gap_z = [
        float(getattr(left, axis)) - float(getattr(right, axis))
        for axis in ("x", "y", "z")
    ]
    squared = gap_x * gap_x + gap_y * gap_y + gap_z * gap_z
    return squared ** 0.5
def _vector_close(left, right, tolerance=0.000001):
    """Return True when the two points are no further apart than ``tolerance``."""
    separation = _distance(left, right)
    return separation <= tolerance
def _point_payload(point):
    """Return ``{"x": ..., "y": ..., "z": ...}`` with the point's coordinates as floats."""
    return {axis: float(getattr(point, axis)) for axis in ("x", "y", "z")}
def _route_length(points):
    """Return the summed length of the polyline through ``points``.

    Each entry is first coerced with ``_vector``; ``None`` or an empty sequence
    gives 0.0.
    """
    vertices = [_vector(item) for item in points or []]
    length = 0.0
    # Walk consecutive vertex pairs in order so the accumulation order is stable.
    for head, tail in zip(vertices, vertices[1:]):
        length += _distance(head, tail)
    return length
def _is_finite_point(point):
    """Return True when x, y and z of ``point`` are all finite numbers.

    A missing coordinate attribute counts as 0.0; any conversion error makes
    the point non-finite.
    """
    try:
        for axis in ("x", "y", "z"):
            coordinate = float(getattr(point, axis, 0.0))
            if not math.isfinite(coordinate):
                return False
    except Exception:
        return False
    return True
def _append_unique(points, point):
    """Append ``point`` (coerced via ``_vector``) to ``points`` in place.

    Non-finite points are dropped, and so is a point that coincides with the
    current last entry of ``points``.
    """
    candidate = _vector(point)
    if not _is_finite_point(candidate):
        return
    if points and _vector_close(points[-1], candidate):
        return
    points.append(candidate)
def _axis_value(point, axis):
    """Return the coordinate of ``point`` named by ``axis`` as a float (0.0 if absent)."""
    raw = getattr(point, axis, 0.0)
    return float(raw)
def _with_axis(point, axis, value):
    """Return a new ``App.Vector`` equal to ``point`` with one coordinate replaced.

    ``axis`` selects the coordinate ("x", "y" or "z") that takes ``value``; any
    other axis name yields a plain copy of ``point``.
    """
    components = []
    for name in ("x", "y", "z"):
        if name == axis:
            components.append(float(value))
        else:
            components.append(float(getattr(point, name)))
    return App.Vector(*components)
def _orthogonal_points(start_point, end_point, preferred_axis=None):
    """Return an axis-aligned polyline from ``start_point`` to ``end_point``.

    Every leg moves along a single coordinate axis. Axes are visited from the
    largest to the smallest coordinate difference; a valid ``preferred_axis``
    is moved to the end of that order. Coincident endpoints give
    ``[start_point]``.
    """
    if _vector_close(start_point, end_point):
        return [start_point]

    def span(axis):
        return abs(_axis_value(end_point, axis) - _axis_value(start_point, axis))

    ordered_axes = sorted(("x", "y", "z"), key=span, reverse=True)
    if preferred_axis in {"x", "y", "z"}:
        ordered_axes = [axis for axis in ordered_axes if axis != preferred_axis]
        ordered_axes += [preferred_axis]

    path = [start_point]
    cursor = start_point
    for axis in ordered_axes:
        goal = _axis_value(end_point, axis)
        # Only emit a corner when this axis actually has distance left to cover.
        if abs(_axis_value(cursor, axis) - goal) > 0.000001:
            cursor = _with_axis(cursor, axis, goal)
            _append_unique(path, cursor)
    _append_unique(path, end_point)
    return path
def _append_orthogonal(points, target_point, preferred_axis=None):
    """Extend ``points`` in place with axis-aligned legs that reach ``target_point``.

    When ``points`` is empty the target is simply appended.
    """
    if points:
        legs = _orthogonal_points(points[-1], _vector(target_point), preferred_axis)
        # legs[0] is the current last point, so skip it.
        for corner in legs[1:]:
            _append_unique(points, corner)
    else:
        _append_unique(points, target_point)
def _offset(point, direction, distance):
    """Return ``point + direction * distance`` as a new ``App.Vector``.

    ``direction`` is used as given; it is not normalised here.
    """
    shifted = [
        float(getattr(point, axis)) + float(getattr(direction, axis)) * float(distance)
        for axis in ("x", "y", "z")
    ]
    return App.Vector(*shifted)
def _terminal_origin(terminal):
    """Return the terminal origin reported by ``TerminalObjects`` as an ``App.Vector``."""
    origin = TerminalObjects.terminal_origin(terminal)
    return _vector(origin)
def _terminal_direction(terminal):
    """Return the terminal direction as an ``App.Vector``; +Z when it cannot be read."""
    try:
        direction = TerminalObjects.terminal_direction(terminal)
        return _vector(direction)
    except Exception:
        return App.Vector(0, 0, 1)
def _project_uuid(doc, start_terminal=None, end_terminal=None):
    """Resolve the QET project UUID for a routing request.

    The first non-blank ``QetProjectUuid`` on the start or end terminal wins;
    otherwise the value on the root group is used. Returns "" when the root
    group lookup fails.
    """
    terminal_values = (
        (getattr(candidate, "QetProjectUuid", "") or "").strip()
        for candidate in (start_terminal, end_terminal)
    )
    from_terminal = next((value for value in terminal_values if value), "")
    if from_terminal:
        return from_terminal
    try:
        root_group = TerminalObjects.ensure_root_group(doc)
        return (getattr(root_group, "QetProjectUuid", "") or "").strip()
    except Exception:
        return ""
def index_terminals(doc):
    """Map each terminal UUID in ``doc`` to its terminal object.

    Terminals collected under the root group are considered first, followed by
    every document object that ``TerminalObjects.is_terminal_object`` accepts.
    The first object seen for a UUID is kept; objects with a blank UUID are
    ignored. Returns ``{}`` when ``doc`` is None.
    """
    if doc is None:
        return {}

    try:
        root_group = doc.getObject(TerminalObjects.ROOT_GROUP_NAME)
    except Exception:
        root_group = None

    candidates = []
    if root_group is not None:
        candidates += TerminalObjects.collect_terminal_objects(root_group)
    for obj in list(getattr(doc, "Objects", []) or []):
        if TerminalObjects.is_terminal_object(obj):
            candidates.append(obj)

    by_uuid = {}
    for terminal in candidates:
        key = (getattr(terminal, "QetTerminalUuid", "") or "").strip()
        if key:
            # setdefault keeps the first terminal registered for a UUID.
            by_uuid.setdefault(key, terminal)
    return by_uuid
def _normalized_match_token(value):
    """Normalise a name for matching: trimmed, lower-cased, with spaces removed.

    Falsy input (``None``, "") yields "".
    """
    text = value or ""
    trimmed = text.strip()
    return trimmed.lower().replace(" ", "")
def _device_group_for_wire_endpoint(doc, instance_id, element_uuid):
    """Find the device group of a wire endpoint, by instance id first, then element UUID."""
    by_instance = TerminalObjects.find_device_group_by_instance_id(doc, instance_id)
    if by_instance is not None:
        return by_instance
    return TerminalObjects.find_device_group(doc, element_uuid)
def _terminal_match_tokens(obj):
    """Return the distinct, non-empty match tokens of a terminal object.

    Tokens come from ``QetTemplateSlotName``, ``QetTerminalLabel``, ``Label``
    and ``Name`` in that order, each normalised with ``_normalized_match_token``.
    """
    attribute_names = ("QetTemplateSlotName", "QetTerminalLabel", "Label", "Name")
    raw_values = [getattr(obj, attribute, "") for attribute in attribute_names]
    ordered = []
    for raw in raw_values:
        token = _normalized_match_token(raw)
        if not token or token in ordered:
            continue
        ordered.append(token)
    return ordered
def _slot_match_tokens(slot):
    """Return the distinct, non-empty match tokens of a template slot dict.

    Tokens come from the slot's ``name`` and ``label`` entries, in that order.
    """
    raw_values = [slot.get(key, "") for key in ("name", "label")]
    ordered = []
    for raw in raw_values:
        token = _normalized_match_token(raw)
        if not token or token in ordered:
            continue
        ordered.append(token)
    return ordered
def _matching_local_terminal(terminal_group, terminal_display, used_objects):
    """Pick an unused local terminal in ``terminal_group`` for ``terminal_display``.

    Only terminals that are not in ``used_objects`` and whose UUID is a local
    UUID are considered. The first one carrying the normalised display token
    is returned. When no display token is given, the single remaining local
    terminal is returned if there is exactly one. Otherwise None.
    """
    wanted = _normalized_match_token(terminal_display)
    unbound_locals = []
    for candidate in TerminalObjects.collect_terminal_objects(terminal_group):
        if candidate in used_objects:
            continue
        candidate_uuid = (getattr(candidate, "QetTerminalUuid", "") or "").strip()
        if TerminalObjects.is_local_terminal_uuid(candidate_uuid):
            unbound_locals.append(candidate)
            if wanted and wanted in _terminal_match_tokens(candidate):
                return candidate
    if wanted or len(unbound_locals) != 1:
        return None
    return unbound_locals[0]
def _matching_template_slot(device_group, terminal_display, used_slot_tokens):
    """Return the first unused template slot of ``device_group`` matching the display name.

    A slot matches when the normalised display token is among its match tokens.
    A slot whose first token is already in ``used_slot_tokens`` is skipped.
    Returns None for a blank display name or when nothing matches.
    """
    wanted = _normalized_match_token(terminal_display)
    if not wanted:
        return None
    for slot in TemplateSemantics.collect_terminal_hints(device_group):
        tokens = _slot_match_tokens(slot)
        if wanted not in tokens:
            continue
        primary = tokens[0] if tokens else ""
        if not primary or primary not in used_slot_tokens:
            return slot
    return None
def _slot_placement(slot):
    """Build an ``App.Placement`` from a template slot dict.

    ``slot["base"]`` is used when it is an ``App.Vector`` (origin otherwise).
    ``slot["rotation"]`` is honoured when it is a dict with an ``App.Vector``
    ``axis`` and a non-None ``angle``; an unusable rotation falls back to the
    default ``App.Rotation()``.
    """
    origin = slot.get("base")
    if not isinstance(origin, App.Vector):
        origin = App.Vector(0, 0, 0)

    orientation = App.Rotation()
    spec = slot.get("rotation")
    if isinstance(spec, dict):
        turn_axis = spec.get("axis")
        turn_angle = spec.get("angle")
        if isinstance(turn_axis, App.Vector) and turn_angle is not None:
            try:
                orientation = App.Rotation(turn_axis, float(turn_angle))
            except Exception:
                orientation = App.Rotation()

    return App.Placement(origin, orientation)
def _wire_endpoint_entries(payload):
    """List the distinct non-local endpoint terminals referenced by ``payload["wires"]``.

    Each wire dict contributes its start and end endpoint. Endpoints with a
    blank or local terminal UUID, and UUIDs already seen, are skipped. Every
    entry carries ``terminal_uuid``, ``element_uuid``, ``instance_id`` and
    ``terminal_display`` (falling back to the ``*_terminal_label`` key).
    """
    collected = []
    visited = set()
    for wire in payload.get("wires", []) or []:
        if not isinstance(wire, dict):
            continue
        for side in ("start", "end"):
            uuid_value = _wire_item_value(wire, side + "_terminal_uuid")
            if not uuid_value:
                continue
            if TerminalObjects.is_local_terminal_uuid(uuid_value):
                continue
            if uuid_value in visited:
                continue
            visited.add(uuid_value)
            entry = {
                "terminal_uuid": uuid_value,
                "element_uuid": _wire_item_value(wire, side + "_element_uuid"),
                "instance_id": _wire_item_value(wire, side + "_instance_id"),
                "terminal_display": _wire_item_value(
                    wire,
                    side + "_terminal_display",
                    side + "_terminal_label",
                ),
            }
            collected.append(entry)
    return collected
def _bind_wire_task_terminals(doc, payload):
    """Promote matching local template terminals to QET terminal UUIDs before routing.

    For every distinct non-local endpoint terminal UUID in ``payload["wires"]``
    that is not yet indexed in ``doc``, the owning device group is looked up.
    An unused local terminal of that device is re-bound to the QET UUID, or,
    failing that, a new terminal is created from a matching template slot.

    Used slot names are tracked per device group. Different devices built from
    the same template carry identical slot names, so a slot consumed on one
    device must not block the same-named slot on another.

    Returns a dict with ``bound``, ``created`` and ``skipped`` counters plus a
    ``warnings`` list. The document is recomputed (best effort) when anything
    was bound or created.
    """
    report = {
        "bound": 0,
        "created": 0,
        "skipped": 0,
        "warnings": [],
    }
    if doc is None or not isinstance(payload, dict):
        return report

    project_uuid = (payload.get("project_uuid") or "").strip()
    if not project_uuid:
        try:
            project_uuid = (
                getattr(TerminalObjects.ensure_root_group(doc), "QetProjectUuid", "") or ""
            ).strip()
        except Exception:
            project_uuid = ""

    indexed = index_terminals(doc)
    used_objects = set()
    # Slot tokens already consumed, keyed by device group.
    used_slot_tokens_by_device = {}

    for entry in _wire_endpoint_entries(payload):
        terminal_uuid = entry["terminal_uuid"]
        if terminal_uuid in indexed:
            continue

        device_group = _device_group_for_wire_endpoint(
            doc,
            entry.get("instance_id", ""),
            entry.get("element_uuid", ""),
        )
        if device_group is None:
            report["skipped"] += 1
            report["warnings"].append(
                "端子 {0} 找不到所属 3D 设备实例。".format(terminal_uuid)
            )
            continue

        # Prefer the object name as key; fall back to object identity.
        device_key = (getattr(device_group, "Name", "") or "") or id(device_group)
        used_slot_tokens = used_slot_tokens_by_device.setdefault(device_key, set())

        instance_id = (getattr(device_group, "QetInstanceId", "") or "").strip()
        element_uuid = (getattr(device_group, "QetElementUuid", "") or "").strip()
        terminal_group = TerminalObjects.ensure_terminal_group(
            doc,
            device_group,
            project_uuid=project_uuid,
            instance_id=instance_id,
        )
        terminal_display = entry.get("terminal_display", "")
        terminal_obj = _matching_local_terminal(terminal_group, terminal_display, used_objects)

        if terminal_obj is None:
            slot = _matching_template_slot(device_group, terminal_display, used_slot_tokens)
            if slot is None:
                report["skipped"] += 1
                report["warnings"].append(
                    "端子 {0} 没有匹配到模板槽位 {1}".format(
                        terminal_uuid,
                        terminal_display or "<empty>",
                    )
                )
                continue
            slot_name = (slot.get("name") or terminal_display or terminal_uuid).strip()
            terminal_obj = TerminalObjects.create_lcs_object(
                doc,
                "QETTerminal_{0}".format(TerminalObjects.safe_token(terminal_uuid)),
                placement=_slot_placement(slot),
                label=terminal_display or terminal_uuid,
            )
            terminal_group.addObject(terminal_obj)
            # Hide the template object the slot came from, if any (best effort).
            source_obj = slot.get("source_object")
            if source_obj is not None:
                try:
                    source_obj.ViewObject.Visibility = False
                except Exception:
                    pass
            report["created"] += 1
        else:
            slot_name = (
                getattr(terminal_obj, "QetTemplateSlotName", "") or terminal_display
            ).strip()
            report["bound"] += 1

        TerminalObjects.set_terminal_semantics(
            terminal_obj,
            project_uuid,
            element_uuid,
            terminal_uuid,
            instance_id,
            label=terminal_display or getattr(terminal_obj, "Label", "") or terminal_uuid,
            slot_name=slot_name,
        )
        used_objects.add(terminal_obj)
        slot_token = _normalized_match_token(slot_name)
        if slot_token:
            used_slot_tokens.add(slot_token)

    if report["bound"] or report["created"]:
        try:
            doc.recompute()
        except Exception:
            # Recompute is best effort; the bindings themselves are already in place.
            pass
    return report
def _wire_object_name(start_terminal, end_terminal, wire_uuid=""):
    """Return the base object name for an auto wire.

    The name is derived from ``wire_uuid`` when given, otherwise from the two
    terminals' ``QetTerminalUuid`` values.
    """
    if not wire_uuid:
        endpoint_tokens = [
            TerminalObjects.safe_token(getattr(terminal, "QetTerminalUuid", ""))
            for terminal in (start_terminal, end_terminal)
        ]
        return "QETAutoWire_{0}_{1}".format(*endpoint_tokens)
    return "QETAutoWire_{0}".format(TerminalObjects.safe_token(wire_uuid))
def _unique_name(doc, base_name):
    """Return a sanitised object name not yet used in ``doc``.

    The sanitised ``base_name`` is used as is when free; otherwise ``_1``,
    ``_2``, ... is appended until ``doc.getObject`` finds nothing.
    """
    stem = TerminalObjects.safe_token(base_name)
    candidate = stem
    counter = 0
    while doc.getObject(candidate) is not None:
        counter += 1
        candidate = "{0}_{1}".format(stem, counter)
    return candidate
def _create_wire_geometry(doc, name, points):
    """Create the document object that carries an auto-routed wire polyline.

    Three strategies are tried in order:

    1. A plain ``Part::Feature`` whose shape is a polygon through ``points``.
       Draft wires are convenient for editing, but in large imported
       assemblies they can trigger view-provider redraw glitches while
       rotating the 3D scene.
    2. A Draft wire, only when ``doc`` is the active document.
    3. A bare ``App::FeaturePython`` object.

    Every variant also gets the ``Points`` property via ``_set_points``.
    The polygon is built before the Part feature is added, and a feature that
    fails during setup is removed, so a failed first attempt does not leave an
    orphan object in the document.
    """
    try:
        import Part

        shape = Part.makePolygon(points)
    except Exception:
        shape = None

    if shape is not None:
        obj = None
        try:
            obj = doc.addObject("Part::Feature", name)
            obj.Shape = shape
            _set_points(obj, points)
            return obj
        except Exception:
            # Drop the half-initialised feature before trying the fallbacks.
            if obj is not None:
                try:
                    doc.removeObject(obj.Name)
                except Exception:
                    pass

    if getattr(App, "ActiveDocument", None) is doc:
        try:
            import Draft

            obj = Draft.make_wire(
                points,
                closed=False,
                placement=None,
                face=False,
                support=None,
                bs2wire=False,
            )
            if obj is not None:
                try:
                    obj.MakeFace = False
                except Exception:
                    pass
                _set_points(obj, points)
                return obj
        except Exception:
            pass

    obj = doc.addObject("App::FeaturePython", name)
    _set_points(obj, points)
    return obj
def _set_points(obj, points):
    """Store ``points`` on ``obj.Points``, adding the property first when missing.

    Best effort: any failure is swallowed.
    """
    try:
        existing = getattr(obj, "PropertiesList", [])
        if "Points" not in existing:
            obj.addProperty(
                "App::PropertyVectorList",
                "Points",
                "QET Wiring",
                "Auto route points",
            )
        obj.Points = list(points)
    except Exception:
        pass
def _set_string(obj, name, value, description="Auto-routing property"):
    """Set string property ``name`` on ``obj`` in the "QET Routing" property group."""
    property_group = "QET Routing"
    TerminalObjects.ensure_string_property(obj, name, property_group, description, value)
def _route_payload(route_data, collisions, wire_style_id=""):
    """Assemble the diagnostics dict for one routed wire.

    It holds the algorithm name, route length, wire style id, route points,
    the collision list with its count, and the network metadata.
    """
    route_points = route_data.get("points", [])
    payload = {}
    payload["algorithm"] = route_data.get("algorithm", "")
    payload["length_mm"] = _route_length(route_points)
    payload["wire_style_id"] = str(wire_style_id or "").strip()
    payload["points"] = [_point_payload(vertex) for vertex in route_points]
    payload["collision_count"] = len(collisions)
    payload["collisions"] = collisions
    payload["network"] = route_data.get("network", {})
    return payload
def _set_auto_metadata(wire, route_data, collisions, wire_style_id=""):
    """Write the auto-routing result onto ``wire`` as string properties.

    Sets the algorithm name, the route length (three decimals), the wire style
    id and the JSON diagnostics. The network JSON property is written only
    when ``route_data`` carries non-empty network metadata.
    """
    total_mm = _route_length(route_data.get("points", []))

    algorithm = route_data.get("algorithm", "")
    _set_string(
        wire,
        "QetAutoRouteAlgorithm",
        algorithm,
        "Auto-routing algorithm used for this wire",
    )

    length_text = "{0:.3f}".format(total_mm)
    _set_string(
        wire,
        "QetAutoRouteLengthMm",
        length_text,
        "Auto route length in millimeters",
    )

    style_text = str(wire_style_id or "").strip()
    _set_string(wire, "QetWireStyleId", style_text, "QET wire style ID")

    diagnostics_json = json.dumps(
        _route_payload(route_data, collisions, wire_style_id=wire_style_id),
        ensure_ascii=False,
    )
    _set_string(
        wire,
        "QetAutoRouteDiagnosticsJson",
        diagnostics_json,
        "Auto-routing diagnostics",
    )

    if route_data.get("network"):
        network_json = json.dumps(route_data.get("network", {}), ensure_ascii=False)
        _set_string(
            wire,
            "QetAutoRouteNetworkJson",
            network_json,
            "Route network metadata used by this wire",
        )
def build_orthogonal_route(start_terminal, end_terminal, route_index=0, options=None):
    """Build an axis-aligned route between two terminals without a routing network.

    The wire leaves each terminal along its direction for
    ``terminal_exit_length``, moves along ``clearance_axis`` to a level
    ``clearance`` beyond the higher of the two exits, and is shifted along
    ``lane_axis`` by ``route_index * lane_spacing``. Invalid axis options fall
    back to "z" (clearance) and "y" (lane).

    Returns a dict with ``algorithm`` ("orthogonal-v1"), ``points`` and an
    empty ``network``.
    """
    opts = _merged_options(options)
    start_origin = _terminal_origin(start_terminal)
    end_origin = _terminal_origin(end_terminal)
    stub_length = max(float(opts.get("terminal_exit_length", 0.0) or 0.0), 0.0)
    start_exit = _offset(start_origin, _terminal_direction(start_terminal), stub_length)
    end_exit = _offset(end_origin, _terminal_direction(end_terminal), stub_length)

    def axis_option(key, fallback):
        chosen = (opts.get(key) or fallback).lower()
        return chosen if chosen in {"x", "y", "z"} else fallback

    clearance_axis = axis_option("clearance_axis", "z")
    lane_axis = axis_option("lane_axis", "y")

    higher_exit = max(
        _axis_value(start_exit, clearance_axis),
        _axis_value(end_exit, clearance_axis),
    )
    clearance_level = higher_exit + float(opts.get("clearance", 0.0) or 0.0)
    lane_shift = float(route_index or 0) * float(opts.get("lane_spacing", 0.0) or 0.0)

    def on_lane(exit_point):
        # Raise the exit point to the clearance level, then shift it onto this route's lane.
        raised = _with_axis(exit_point, clearance_axis, clearance_level)
        return _with_axis(raised, lane_axis, _axis_value(raised, lane_axis) + lane_shift)

    start_lane = on_lane(start_exit)
    end_lane = on_lane(end_exit)

    waypoints = []
    _append_unique(waypoints, start_origin)
    _append_unique(waypoints, start_exit)
    _append_orthogonal(waypoints, start_lane, preferred_axis=clearance_axis)
    _append_orthogonal(waypoints, end_lane)
    _append_orthogonal(waypoints, end_exit, preferred_axis=clearance_axis)
    _append_unique(waypoints, end_origin)

    return {
        "algorithm": "orthogonal-v1",
        "points": waypoints,
        "network": {},
    }
def build_network_route(start_terminal, end_terminal, route_index=0, options=None, doc=None):
    """Route between two terminals along the document's routing network.

    Returns None when network routing is disabled, no document can be
    resolved, the network has no segments, either terminal exit is further
    than ``network_entry_max_distance`` from its nearest node, or no path
    exists. With ``avoid_obstacles`` enabled, a graph built with the obstacle
    bounding boxes blocked is tried first; the unrestricted graph is the
    fallback.

    On success returns a dict with ``algorithm`` ("network-dijkstra-v1"),
    ``points`` and ``network`` statistics. ``route_index`` is accepted but not
    used by this function.
    """
    opts = _merged_options(options)
    if not opts.get("use_routing_network", True):
        return None
    if doc is None:
        doc = getattr(start_terminal, "Document", None) or getattr(App, "ActiveDocument", None)
    if doc is None:
        return None

    stub_length = max(float(opts.get("terminal_exit_length", 0.0) or 0.0), 0.0)
    start_origin = _terminal_origin(start_terminal)
    end_origin = _terminal_origin(end_terminal)
    start_exit = _offset(start_origin, _terminal_direction(start_terminal), stub_length)
    end_exit = _offset(end_origin, _terminal_direction(end_terminal), stub_length)

    def route_on_network(network, obstacle_aware=False):
        if network.get("segment_count", 0) <= 0:
            return None

        entry_key, entry_gap = RoutingNetwork.nearest_node(network, start_exit)
        leave_key, leave_gap = RoutingNetwork.nearest_node(network, end_exit)
        if entry_key is None or leave_key is None:
            return None

        # A positive limit rejects terminals that sit too far from the network.
        limit = float(opts.get("network_entry_max_distance", 0.0) or 0.0)
        if limit > 0.0:
            if float(entry_gap or 0.0) > limit or float(leave_gap or 0.0) > limit:
                return None

        node_keys = RoutingNetwork.shortest_path(
            network,
            entry_key,
            leave_key,
            bend_penalty=float(opts.get("bend_penalty", 0.0) or 0.0),
            kind_cost_factors=opts.get("carrier_kind_cost_factors", {}),
        )
        if not node_keys:
            return None
        trunk = RoutingNetwork.path_points(network, node_keys)
        if not trunk:
            return None

        waypoints = []
        _append_unique(waypoints, start_origin)
        _append_unique(waypoints, start_exit)
        _append_orthogonal(waypoints, trunk[0])
        for vertex in trunk[1:]:
            _append_unique(waypoints, vertex)
        _append_orthogonal(waypoints, end_exit)
        _append_unique(waypoints, end_origin)

        statistics = {
            "carriers": int(network.get("carrier_count", 0)),
            "segments": int(network.get("segment_count", 0)),
            "blocked_segments": int(network.get("blocked_segment_count", 0)),
            "nodes": len(network.get("nodes", {})),
            "entry_distance": float(entry_gap or 0.0),
            "exit_distance": float(leave_gap or 0.0),
            "obstacle_aware": bool(obstacle_aware),
        }
        return {
            "algorithm": "network-dijkstra-v1",
            "points": waypoints,
            "network": statistics,
        }

    if bool(opts.get("avoid_obstacles", True)):
        obstacles = collect_obstacles(doc, exclude=[start_terminal, end_terminal], options=opts)
    else:
        obstacles = []
    blocked_boxes = [entry["bbox"] for entry in obstacles if entry.get("bbox")]

    if blocked_boxes:
        restricted = RoutingNetwork.build_route_graph(doc, blocked_bboxes=blocked_boxes)
        safe_route = route_on_network(restricted, obstacle_aware=True)
        if safe_route is not None:
            return safe_route

    unrestricted = RoutingNetwork.build_route_graph(doc)
    return route_on_network(unrestricted, obstacle_aware=False)
def _is_group(obj):
    """Return True when ``obj`` reports deriving from ``App::DocumentObjectGroup``.

    Any failure while asking (for example a missing method) counts as False.
    """
    try:
        verdict = bool(obj.isDerivedFrom("App::DocumentObjectGroup"))
    except Exception:
        verdict = False
    return verdict
def _is_origin_helper(obj):
    """Return True for origin helper objects (origin, base planes, base axes).

    An object qualifies when its ``TypeId`` contains "origin", or when its
    ``Name`` or ``Label`` matches a known helper name after lower-casing,
    dropping digits and turning dashes into underscores. For the label, spaces
    are additionally tried as underscores.
    """
    known_helpers = {
        "origin",
        "xy_plane",
        "xz_plane",
        "yz_plane",
        "x_axis",
        "y_axis",
        "z_axis",
    }

    def compact(text):
        without_digits = "".join(ch for ch in text if not ch.isdigit())
        return without_digits.replace("-", "_")

    type_id = (getattr(obj, "TypeId", "") or "").lower()
    name_key = compact((getattr(obj, "Name", "") or "").lower())
    label_key = compact((getattr(obj, "Label", "") or "").lower())

    if "origin" in type_id:
        return True
    if name_key in known_helpers:
        return True
    if label_key in known_helpers:
        return True
    return label_key.replace(" ", "_") in known_helpers
def _bbox_payload(obj, clearance=0.0):
    """Return the bounding box of ``obj.Shape`` as a dict, inflated by ``clearance``.

    The result has the keys ``xmin``/``xmax``/``ymin``/``ymax``/``zmin``/``zmax``.
    Returns None when the object has no shape bounding box, or when the box is
    unusable as an obstacle: a non-finite bound, or a minimum above its
    maximum (as a void box of an empty shape would report). Validity is
    checked on the raw bounds, before ``clearance`` is applied.
    """
    shape = getattr(obj, "Shape", None)
    bbox = getattr(shape, "BoundBox", None)
    if bbox is None:
        return None

    bounds = {
        "x": (float(bbox.XMin), float(bbox.XMax)),
        "y": (float(bbox.YMin), float(bbox.YMax)),
        "z": (float(bbox.ZMin), float(bbox.ZMax)),
    }
    for low, high in bounds.values():
        if not (math.isfinite(low) and math.isfinite(high)):
            return None
        if low > high:
            return None

    return {
        "xmin": bounds["x"][0] - clearance,
        "xmax": bounds["x"][1] + clearance,
        "ymin": bounds["y"][0] - clearance,
        "ymax": bounds["y"][1] + clearance,
        "zmin": bounds["z"][0] - clearance,
        "zmax": bounds["z"][1] + clearance,
    }
def collect_obstacles(doc, exclude=None, options=None):
    """Collect inflated bounding boxes of document objects that count as obstacles.

    Skipped: objects listed in ``exclude``; objects whose
    ``QetRoutingObstacleMode`` is "PassThrough", "WireDuctPassThrough" or
    "SupportSurface"; groups; origin helpers; LCS-like and terminal objects;
    route carriers; routed wires; objects without a bounding box. Boxes are
    inflated by the ``obstacle_clearance`` option.

    Returns a list of dicts with ``name``, ``label``, ``type_id`` and ``bbox``.
    """
    opts = _merged_options(options)
    skip_ids = {id(item) for item in (exclude or []) if item is not None}
    margin = float(opts.get("obstacle_clearance", 0.0) or 0.0)
    non_blocking_modes = {"PassThrough", "WireDuctPassThrough", "SupportSurface"}

    found = []
    for candidate in list(getattr(doc, "Objects", []) or []):
        if id(candidate) in skip_ids:
            continue
        mode = (getattr(candidate, "QetRoutingObstacleMode", "") or "").strip()
        if mode in non_blocking_modes:
            continue
        if _is_group(candidate) or _is_origin_helper(candidate):
            continue
        if TerminalObjects.is_lcs_like(candidate) or TerminalObjects.is_terminal_object(candidate):
            continue
        if RoutingNetwork.is_route_carrier(candidate) or WiringObjects.is_routed_wire_object(candidate):
            continue
        box = _bbox_payload(candidate, clearance=margin)
        if box is not None:
            found.append(
                {
                    "name": getattr(candidate, "Name", ""),
                    "label": getattr(candidate, "Label", ""),
                    "type_id": getattr(candidate, "TypeId", ""),
                    "bbox": box,
                }
            )
    return found
def _segment_intersects_bbox(start, end, bbox):
    """Return True when the segment ``start``-``end`` touches the axis-aligned ``bbox``.

    Slab test: the segment is parameterised as ``start + t * (end - start)``
    and intersects when some ``t`` in [0, 1] lies inside all three axis slabs.
    ``bbox`` is a dict with ``xmin``/``xmax``/``ymin``/``ymax``/``zmin``/``zmax``.
    """
    enter = 0.0
    leave = 1.0
    for axis in ("x", "y", "z"):
        origin = _axis_value(start, axis)
        target = _axis_value(end, axis)
        step = target - origin
        lower = float(bbox[axis + "min"])
        upper = float(bbox[axis + "max"])

        if abs(step) <= 0.000001:
            # Segment is parallel to this slab: it must already lie inside it.
            if origin < lower or origin > upper:
                return False
            continue

        inverse = 1.0 / step
        near = (lower - origin) * inverse
        far = (upper - origin) * inverse
        if near > far:
            near, far = far, near
        enter = max(enter, near)
        leave = min(leave, far)
        if enter > leave:
            return False
    return True
def detect_collisions(points, obstacles):
    """List every (segment, obstacle) pair where a route segment hits an obstacle box.

    Segments are the consecutive pairs of ``points``. Each hit is a dict with
    ``segment_index``, ``obstacle_name`` and ``obstacle_label``.
    """
    hits = []
    for index in range(len(points) - 1):
        head, tail = points[index], points[index + 1]
        hits.extend(
            {
                "segment_index": index,
                "obstacle_name": obstacle.get("name", ""),
                "obstacle_label": obstacle.get("label", ""),
            }
            for obstacle in obstacles
            if _segment_intersects_bbox(head, tail, obstacle["bbox"])
        )
    return hits
def _remove_existing_auto_routes(doc, start_uuid, end_uuid, wire_uuid=""):
    """Remove earlier auto-suggested routes between the same two terminals.

    A routed wire is removed when its ``RouteType`` is "AutoSuggested", its
    ``QetWireUuid`` equals ``wire_uuid`` (only checked when ``wire_uuid`` is
    given), and its endpoints match the two UUIDs in either direction.
    Returns the number of objects removed; removal failures are ignored.
    """

    def text_of(obj, attribute):
        return (getattr(obj, attribute, "") or "").strip()

    removed_count = 0
    for routed in list(WiringObjects.iter_routed_wire_objects(doc)):
        if text_of(routed, "RouteType") != "AutoSuggested":
            continue
        if wire_uuid and text_of(routed, "QetWireUuid") != wire_uuid:
            continue

        forward = (
            text_of(routed, "QetStartTerminalUuid") == start_uuid
            and text_of(routed, "QetEndTerminalUuid") == end_uuid
        )
        backward = (
            text_of(routed, "QetStartTerminalUuid") == end_uuid
            and text_of(routed, "QetEndTerminalUuid") == start_uuid
        )
        if not (forward or backward):
            continue

        try:
            doc.removeObject(routed.Name)
            removed_count += 1
        except Exception:
            pass
    return removed_count
def _find_task_by_wire_uuid(doc, wire_uuid):
    """Return the member of the "QETWiring_01_Tasks" group whose ``QetWireUuid`` matches.

    Returns None for a blank ``wire_uuid``, a missing task group, or no match.
    """
    if not wire_uuid:
        return None
    try:
        tasks = doc.getObject("QETWiring_01_Tasks")
    except Exception:
        tasks = None
    if tasks is None:
        return None

    members = list(getattr(tasks, "Group", []) or [])
    return next(
        (
            member
            for member in members
            if (getattr(member, "QetWireUuid", "") or "").strip() == wire_uuid
        ),
        None,
    )
def _set_task_status(task, status):
    """Record ``status`` in the task's ``RouteStatus`` string property; no-op for None."""
    if task is not None:
        TerminalObjects.ensure_string_property(
            task,
            "RouteStatus",
            "QET Wiring",
            "Wire task route status",
            status,
        )
def _style_wire(wire, collision_count=0):
    """Apply the view styling of an auto wire (best effort).

    The wire is made visible with line width 5.0, a solid draw style and
    wireframe display mode where the view object has those attributes. The
    line is red when ``collision_count`` is non-zero, blue otherwise. Any
    failure is ignored.
    """
    try:
        view = wire.ViewObject
        view.Visibility = True
        view.LineWidth = 5.0
        if hasattr(view, "DrawStyle"):
            view.DrawStyle = "Solid"
        if hasattr(view, "DisplayMode"):
            view.DisplayMode = "Wireframe"
        view.LineColor = (1.0, 0.1, 0.0) if collision_count else (0.0, 0.35, 1.0)
    except Exception:
        pass
def route_between_terminals(
    doc,
    start_terminal,
    end_terminal,
    route_index=0,
    options=None,
    wire_uuid="",
    wire_label="",
    net_uuid="",
    group_uuid="",
    wire_mark="",
    wire_mark_is_manual=False,
    wire_style_id="",
):
    """Auto-route one wire between two terminals and add it to the document.

    The route follows the routing network; the orthogonal fallback is used
    only when the ``allow_floating_fallback`` option is set. With
    ``replace_existing`` set, earlier auto routes for the same endpoints are
    removed first. The new wire gets routed-wire semantics and auto-routing
    metadata, joins the routed group, is styled, and the matching wire task
    (if any) receives the route status.

    Raises:
        AutoRoutingError: no document, an endpoint is not a routable terminal,
            both endpoints are the same object, no project UUID can be
            resolved, no usable route exists, or fewer than two points result.

    Returns:
        dict with ``wire``, ``route_status`` ("Routed" or "CollisionWarning"),
        ``algorithm``, ``network``, ``points``, ``collision_count`` and
        ``collisions``.
    """
    if doc is None:
        raise AutoRoutingError("No FreeCAD document is available.")
    if not TerminalObjects.is_terminal_object(start_terminal):
        raise AutoRoutingError("Start object is not a routable terminal.")
    if not TerminalObjects.is_terminal_object(end_terminal):
        raise AutoRoutingError("End object is not a routable terminal.")
    if start_terminal == end_terminal:
        raise AutoRoutingError("Start and end terminal must be different.")

    opts = _merged_options(options)
    # An explicit argument wins over a style id supplied through the options.
    effective_wire_style_id = str(wire_style_id or opts.get("wire_style_id", "") or "").strip()
    start_uuid = (getattr(start_terminal, "QetTerminalUuid", "") or "").strip()
    end_uuid = (getattr(end_terminal, "QetTerminalUuid", "") or "").strip()

    project_uuid = _project_uuid(doc, start_terminal, end_terminal)
    if not project_uuid:
        raise AutoRoutingError("Project UUID is required for auto-routing.")

    if opts.get("replace_existing", True):
        _remove_existing_auto_routes(doc, start_uuid, end_uuid, wire_uuid=wire_uuid)

    route_data = build_network_route(
        start_terminal,
        end_terminal,
        route_index=route_index,
        options=opts,
        doc=doc,
    )
    if route_data is None:
        if not opts.get("allow_floating_fallback", False):
            raise AutoRoutingError(
                "没有可用的线槽/路由路径网络;请先自动识别线槽生成路径,或选择线槽实体生成中心路径。"
            )
        route_data = build_orthogonal_route(
            start_terminal,
            end_terminal,
            route_index=route_index,
            options=opts,
        )

    points = route_data.get("points", [])
    if len(points) < 2:
        raise AutoRoutingError("Auto-routing produced fewer than two points.")

    obstacles = collect_obstacles(doc, exclude=[start_terminal, end_terminal], options=opts)
    collisions = detect_collisions(points, obstacles)
    status = "CollisionWarning" if collisions else "Routed"

    base_name = _wire_object_name(start_terminal, end_terminal, wire_uuid)
    wire = _create_wire_geometry(doc, _unique_name(doc, base_name), points)
    wire.Label = wire_label or wire_mark or wire_uuid or "QET Auto Wire"

    start_instance = (getattr(start_terminal, "QetInstanceId", "") or "").strip()
    end_instance = (getattr(end_terminal, "QetInstanceId", "") or "").strip()
    WiringObjects.set_routed_wire_semantics(
        wire,
        project_uuid,
        wire_uuid,
        wire_label or wire_mark or wire_uuid,
        start_uuid,
        end_uuid,
        start_instance,
        end_instance,
        route_type="AutoSuggested",
        route_status=status,
        route_mode="Auto",
        net_uuid=net_uuid,
        group_uuid=group_uuid,
        wire_mark=wire_mark,
        wire_mark_is_manual=wire_mark_is_manual,
    )
    _set_auto_metadata(wire, route_data, collisions, wire_style_id=effective_wire_style_id)

    routed_group = WiringObjects.ensure_routed_group(doc, project_uuid)
    if wire not in getattr(routed_group, "Group", []):
        routed_group.addObject(wire)
    try:
        routed_group.ViewObject.Visibility = True
    except Exception:
        pass
    _style_wire(wire, collision_count=len(collisions))

    _set_task_status(_find_task_by_wire_uuid(doc, wire_uuid), status)

    try:
        doc.recompute()
    except Exception:
        pass

    return {
        "wire": wire,
        "route_status": status,
        "algorithm": route_data.get("algorithm", ""),
        "network": route_data.get("network", {}),
        "points": points,
        "collision_count": len(collisions),
        "collisions": collisions,
    }
def _wire_item_value(item, *names):
    """Return the first non-blank value among ``names`` in ``item`` as a stripped string.

    Values are converted with ``str``. ``None`` values and values that are
    blank after stripping are skipped. Returns "" when ``item`` is not a dict
    or nothing qualifies.
    """
    if isinstance(item, dict):
        for key in names:
            raw = item.get(key, "")
            if raw is None:
                continue
            text = str(raw).strip()
            if text:
                return text
    return ""
def bind_wire_task_terminals_from_payload(doc, payload):
    """Bind local template terminals to QET terminal UUIDs without creating wires.

    Runs the terminal binding for ``payload`` and extends its report with
    ``total_wires``, ``endpoint_terminals``, ``available_terminals`` and
    ``local_terminals`` counts taken after binding.

    Raises:
        AutoRoutingError: ``doc`` is None or ``payload`` is not a dict.
    """
    if doc is None:
        raise AutoRoutingError("No FreeCAD document is available.")
    if not isinstance(payload, dict):
        raise AutoRoutingError("Exchange payload must be an object.")

    summary = _bind_wire_task_terminals(doc, payload)
    terminals = index_terminals(doc)
    wires = payload.get("wires", []) or []
    endpoints = _wire_endpoint_entries(payload)

    local_uuids = [
        terminal_uuid
        for terminal_uuid in terminals
        if TerminalObjects.is_local_terminal_uuid(terminal_uuid)
    ]
    summary["total_wires"] = len(wires)
    summary["endpoint_terminals"] = len(endpoints)
    summary["available_terminals"] = len(terminals)
    summary["local_terminals"] = len(local_uuids)
    return summary
def format_terminal_binding_report(report):
    """Render a terminal binding report as a user-facing message.

    The summary line is followed by the first warning (if any) and by a hint
    when the report contains no wire tasks.
    """
    parts = [
        "工程端子检查/绑定完成:更新 {0} 个,新建 {1} 个,跳过 {2} 个;当前端子 {3} 个,本地端子 {4} 个。".format(
            report.get("bound", 0),
            report.get("created", 0),
            report.get("skipped", 0),
            report.get("available_terminals", 0),
            report.get("local_terminals", 0),
        )
    ]
    warnings = report.get("warnings", []) or []
    if warnings:
        parts.append("\n首个问题:{0}".format(warnings[0]))
    if report.get("total_wires", 0) <= 0:
        parts.append("\n没有导线任务,无法按 QET terminal_uuid 绑定工程端子。")
    return "".join(parts)
def route_all_from_payload(doc, payload, options=None):
    """Auto-route every wire listed in ``payload["wires"]``.

    Terminals are bound first. Each wire dict is then routed between the
    terminals named by its start/end terminal UUIDs. Wires that are not dicts,
    wires whose terminals are missing, and wires whose routing raises are
    counted or recorded in the report instead of aborting the batch. The
    finished report is handed to ``_write_auto_route_batch_diagnostic`` and
    returned.

    Raises:
        AutoRoutingError: ``doc`` is None or ``payload`` is not a dict.
    """
    if doc is None:
        raise AutoRoutingError("No FreeCAD document is available.")
    if not isinstance(payload, dict):
        raise AutoRoutingError("Exchange payload must be an object.")

    binding = bind_wire_task_terminals_from_payload(doc, payload)
    terminals = index_terminals(doc)
    local_uuids = [
        terminal_uuid
        for terminal_uuid in terminals
        if TerminalObjects.is_local_terminal_uuid(terminal_uuid)
    ]
    wires = payload.get("wires", []) or []

    report = {
        "total_wires": len(wires),
        "available_terminals": len(terminals),
        "local_terminals": len(local_uuids),
        "auto_bound_terminals": binding["bound"],
        "auto_created_terminals": binding["created"],
        "auto_terminal_binding_warnings": binding["warnings"],
        "routed": 0,
        "collision_warnings": 0,
        "skipped_missing_terminal": 0,
        "skipped_invalid": 0,
        "missing_endpoint_uuids": [],
        "missing_endpoint_samples": [],
        "errors": [],
        "routes": [],
    }
    unresolved = set()

    for position, wire in enumerate(wires):
        if not isinstance(wire, dict):
            report["skipped_invalid"] += 1
            continue

        start_uuid = _wire_item_value(wire, "start_terminal_uuid")
        end_uuid = _wire_item_value(wire, "end_terminal_uuid")
        start_terminal = terminals.get(start_uuid)
        end_terminal = terminals.get(end_uuid)
        wire_uuid = _wire_item_value(wire, "wire_id", "wire_uuid", "id")
        wire_label = _wire_item_value(wire, "wire_label", "wire_mark")

        if start_terminal is None or end_terminal is None:
            report["skipped_missing_terminal"] += 1
            for endpoint_uuid in (start_uuid, end_uuid):
                if endpoint_uuid and endpoint_uuid not in terminals:
                    unresolved.add(endpoint_uuid)
            # Keep only a few samples so the panel status is not flooded by
            # a large number of wire tasks.
            samples = report["missing_endpoint_samples"]
            if len(samples) < 8:
                samples.append(
                    {
                        "wire_uuid": wire_uuid,
                        "wire_label": wire_label,
                        "start_terminal_uuid": start_uuid,
                        "start_found": start_terminal is not None,
                        "start_element_uuid": _wire_item_value(wire, "start_element_uuid"),
                        "start_terminal_display": _wire_item_value(wire, "start_terminal_display"),
                        "end_terminal_uuid": end_uuid,
                        "end_found": end_terminal is not None,
                        "end_element_uuid": _wire_item_value(wire, "end_element_uuid"),
                        "end_terminal_display": _wire_item_value(wire, "end_terminal_display"),
                    }
                )
            continue

        try:
            outcome = route_between_terminals(
                doc,
                start_terminal,
                end_terminal,
                route_index=position,
                options=options,
                wire_uuid=wire_uuid,
                wire_label=wire_label,
                net_uuid=_wire_item_value(wire, "net_uuid"),
                group_uuid=_wire_item_value(wire, "group_uuid"),
                wire_mark=_wire_item_value(wire, "wire_mark"),
                wire_mark_is_manual=bool(wire.get("wire_mark_is_manual", False)),
                wire_style_id=_wire_item_value(wire, "wire_style_id"),
            )
        except Exception as exc:
            report["errors"].append(str(exc))
            continue

        if outcome["route_status"] == "CollisionWarning":
            report["collision_warnings"] += 1
        report["routed"] += 1
        report["routes"].append(
            {
                "wire_uuid": wire_uuid,
                "algorithm": outcome["algorithm"],
                "route_status": outcome["route_status"],
                "collision_count": outcome["collision_count"],
            }
        )

    report["missing_endpoint_uuids"] = sorted(unresolved)
    _write_auto_route_batch_diagnostic(doc, report)
    return report
def format_route_all_report(report):
    """Render a batch auto-routing report dict as a multi-line status message.

    The first line always carries the routed / collision-warning /
    missing-terminal counters. Further segments are appended only when the
    report holds the matching data: prepared layout counts, the first error,
    auto-bound terminal counts, and a terminal-matching hint when nothing was
    routed and at least one wire was skipped for a missing terminal.

    Args:
        report: mapping read exclusively through ``.get``.

    Returns:
        The assembled message string.
    """
    segments = [
        "批量自动布线完成routed={0}, collision_warnings={1}, missing_terminals={2}".format(
            report.get("routed", 0),
            report.get("collision_warnings", 0),
            report.get("skipped_missing_terminal", 0),
        )
    ]

    # Layout statistics are only present when the caller attached them.
    layout = report.get("prepared_layout")
    if isinstance(layout, dict):
        segments.append(
            "\n布线布局空间:线槽路径 {0} 条,布线面 {1} 条,端子接入 {2} 条。".format(
                layout.get("wire_duct_carriers", 0),
                layout.get("surface_carriers", 0),
                layout.get("terminal_access_carriers", 0),
            )
        )

    # Only the first error is shown; the rest stay in the report itself.
    error_list = report.get("errors") or []
    if error_list:
        segments.append("\n首个错误:{0}".format(str(error_list[0])))

    bound_count = report.get("auto_bound_terminals", 0)
    created_count = report.get("auto_created_terminals", 0)
    if bound_count or created_count:
        segments.append(
            "\n已按导线任务绑定 3D 工程端子:更新 {0} 个,新建 {1} 个。".format(
                bound_count,
                created_count,
            )
        )

    # Nothing routed while wires were skipped for missing terminals: explain
    # the terminal-matching failure and show one example pair of endpoints.
    if report.get("routed", 0) == 0 and report.get("skipped_missing_terminal", 0) > 0:
        segments.append(
            (
                "\n端子匹配失败:当前 3D 可布线端子 {0} 个,其中本地模板端子 {1} 个;"
                "导线任务引用的 QET terminal_uuid 没有绑定到这些 3D 工程端子。"
            ).format(
                report.get("available_terminals", 0),
                report.get("local_terminals", 0),
            )
        )
        if report.get("local_terminals", 0) > 0:
            segments.append(" 请先从 QET 重新导入/更新工程端子,使端子 UUID 不再是 local:...。")
        first_sample = (report.get("missing_endpoint_samples") or [None])[0]
        if first_sample:
            segments.append(
                "\n缺失示例:{0} -> {1}".format(
                    first_sample.get("start_terminal_uuid", ""),
                    first_sample.get("end_terminal_uuid", ""),
                )
            )

    return "".join(segments)
def _clear_auto_route_batch_diagnostics(doc):
    """Remove every ``AutoRouteBatch`` diagnostic object from the diagnostic group.

    Each matching object is first detached from the group (falling back to
    rewriting the group's ``Group`` list when ``removeObject`` fails) and then
    deleted from the document. All steps are best-effort: failures are
    swallowed so one stubborn object does not stop the cleanup.

    Returns:
        Number of objects that were actually removed from the document.
    """
    diagnostic_group = WiringObjects.ensure_diagnostic_group(doc, _project_uuid(doc))
    removed_count = 0
    # Iterate over a snapshot because the group is modified inside the loop.
    for member in list(getattr(diagnostic_group, "Group", []) or []):
        kind = (getattr(member, "QetDiagnosticKind", "") or "").strip()
        if kind == "AutoRouteBatch":
            try:
                diagnostic_group.removeObject(member)
            except Exception:
                # Fallback: rebuild the membership list without this object.
                try:
                    current_members = list(getattr(diagnostic_group, "Group", []) or [])
                    diagnostic_group.Group = [
                        other for other in current_members if other is not member
                    ]
                except Exception:
                    pass
            try:
                still_in_document = doc.getObject(getattr(member, "Name", "")) is not None
                if still_in_document:
                    doc.removeObject(member.Name)
                    removed_count += 1
            except Exception:
                pass
    return removed_count
def _write_auto_route_batch_diagnostic(doc, report):
    """Store a batch report as a diagnostic object when it contains problems.

    A diagnostic is written only if the report lists errors, missing endpoint
    UUIDs, or a positive collision-warning count. Existing ``AutoRouteBatch``
    diagnostics are cleared before the new one is added.

    Args:
        doc: document receiving the diagnostic object; ``None`` is a no-op.
        report: batch report dict, serialised to JSON into the diagnostic.

    Returns:
        The created diagnostic object, or ``None`` when nothing was written.
    """
    if doc is None or not isinstance(report, dict):
        return None
    # NOTE(review): a clean report returns here before the clear step below,
    # so a diagnostic left by an earlier run stays in place — confirm intended.
    if not report.get("errors") and not report.get("missing_endpoint_uuids"):
        if report.get("collision_warnings", 0) <= 0:
            return None

    owning_project = _project_uuid(doc)
    diagnostic_group = WiringObjects.ensure_diagnostic_group(doc, owning_project)
    _clear_auto_route_batch_diagnostics(doc)

    entry = doc.addObject(
        "App::DocumentObjectGroup",
        _unique_name(doc, "QETAutoRouteDiagnostic"),
    )
    entry.Label = "QET Auto Route Diagnostic"
    _set_string(entry, "QetDiagnosticKind", "AutoRouteBatch", "QET diagnostic kind")
    # ensure_ascii=False keeps non-ASCII text readable in the stored payload.
    payload_json = json.dumps(report, ensure_ascii=False)
    _set_string(
        entry,
        "QetDiagnosticJson",
        payload_json,
        "QET auto-routing batch diagnostic payload",
    )
    diagnostic_group.addObject(entry)
    return entry
def _iter_wire_tasks(doc):
try:
task_group = doc.getObject("QETWiring_01_Tasks")
except Exception:
task_group = None
if task_group is None:
return []
return [
task
for task in list(getattr(task_group, "Group", []) or [])
if (getattr(task, "RouteType", "") or "").strip() == "Task"
]
def _wire_tasks_payload(doc):
    """Build a ``{"project_uuid": ..., "wires": [...]}`` payload from the task objects.

    Every wire entry copies the task's ``Qet*`` attributes into the payload
    keys below. Text attributes are stripped and default to ``""``;
    ``wire_mark_is_manual`` is coerced to ``bool``.
    """
    # Payload key -> task attribute, in the order the keys appear in an entry.
    field_map = (
        ("wire_id", "QetWireUuid"),
        ("wire_label", "QetWireLabel"),
        ("wire_mark", "QetWireMark"),
        ("wire_mark_is_manual", "QetWireMarkIsManual"),
        ("wire_style_id", "QetWireStyleId"),
        ("net_uuid", "QetNetUuid"),
        ("group_uuid", "QetGroupUuid"),
        ("start_element_uuid", "QetStartElementUuid"),
        ("start_instance_id", "QetStartInstanceId"),
        ("start_terminal_uuid", "QetStartTerminalUuid"),
        ("start_terminal_display", "QetStartTerminalDisplay"),
        ("end_element_uuid", "QetEndElementUuid"),
        ("end_instance_id", "QetEndInstanceId"),
        ("end_terminal_uuid", "QetEndTerminalUuid"),
        ("end_terminal_display", "QetEndTerminalDisplay"),
    )

    result = {"project_uuid": _project_uuid(doc), "wires": []}
    for wire_task in _iter_wire_tasks(doc):
        entry = {}
        for key, attribute in field_map:
            if key == "wire_mark_is_manual":
                # The only non-text field: stored as a plain boolean.
                entry[key] = bool(getattr(wire_task, attribute, False))
            else:
                entry[key] = (getattr(wire_task, attribute, "") or "").strip()
        result["wires"].append(entry)
    return result
def bind_wire_task_terminals_from_tasks(doc):
    """Bind terminals using a payload built from the document's own wire tasks.

    Returns whatever ``bind_wire_task_terminals_from_payload`` returns.
    """
    task_payload = _wire_tasks_payload(doc)
    return bind_wire_task_terminals_from_payload(doc, task_payload)
def route_all_tasks(doc, options=None):
    """Route every wire task found in the document.

    The tasks are converted to a payload and handed to
    ``route_all_from_payload`` together with ``options``; its report is
    returned unchanged.
    """
    task_payload = _wire_tasks_payload(doc)
    return route_all_from_payload(doc, task_payload, options=options)
def prepare_eplan_style_layout(doc, project_uuid="", options=None):
    """Prepare the whole document for production auto-routing.

    The EPLAN/SW workflow is "route the wiring layout space": the user is not
    expected to pick faces, draw sketches, or add an access line for every
    terminal by hand. This entry point therefore generates, in one step, the
    wire-duct centre paths, the routable surfaces inside the cabinet, and the
    automatic access carriers from terminals to the routing network (the
    generation itself is delegated to ``RoutingNetwork``).

    Args:
        doc: FreeCAD document to prepare.
        project_uuid: explicit project UUID; when blank, it is resolved from
            the document and then from the terminal root group.
        options: optional overrides merged over the default options.

    Returns:
        The result of ``RoutingNetwork.create_layout_space_from_document``.

    Raises:
        AutoRoutingError: if ``doc`` is ``None``.
    """
    if doc is None:
        raise AutoRoutingError("No FreeCAD document is available.")

    settings = _merged_options(options)

    # Resolution order: explicit argument, document, terminal root group.
    resolved_uuid = (project_uuid or "").strip()
    if not resolved_uuid:
        resolved_uuid = _project_uuid(doc)
    if not resolved_uuid:
        try:
            root_group = TerminalObjects.ensure_root_group(doc)
            resolved_uuid = (getattr(root_group, "QetProjectUuid", "") or "").strip()
        except Exception:
            resolved_uuid = ""

    # Falsy option values (None, 0, "") collapse to 0.0 before conversion.
    exit_length = float(settings.get("terminal_exit_length", 20.0) or 0.0)
    access_limit = float(settings.get("terminal_access_max_distance", 1000.0) or 0.0)
    return RoutingNetwork.create_layout_space_from_document(
        doc,
        project_uuid=resolved_uuid,
        terminal_exit_length=exit_length,
        terminal_access_max_distance=access_limit,
    )
def wire_task_count(doc):
    """Return how many wire-task objects the document currently holds."""
    wire_tasks = _iter_wire_tasks(doc)
    return len(wire_tasks)
def clear_auto_routes(doc):
    """Delete every routed wire object whose ``RouteType`` is ``AutoSuggested``.

    Removal and the final recompute are best-effort; failures are ignored.

    Returns:
        Number of objects successfully removed from the document.
    """
    deleted = 0
    # Snapshot first: objects are removed from the document while looping.
    for routed_wire in list(WiringObjects.iter_routed_wire_objects(doc)):
        route_type = (getattr(routed_wire, "RouteType", "") or "").strip()
        if route_type == "AutoSuggested":
            try:
                doc.removeObject(routed_wire.Name)
                deleted += 1
            except Exception:
                pass
    try:
        doc.recompute()
    except Exception:
        pass
    return deleted
def _console_message(message):
try:
App.Console.PrintMessage("[FreeCADExchange] {0}\n".format(message))
except Exception:
pass
def _console_error(message):
try:
App.Console.PrintError("[FreeCADExchange] {0}\n".format(message))
except Exception:
pass
class CommandAutoRouteAll:
    """GUI command that prepares the layout and routes every wire in one step."""

    def GetResources(self):
        """Return the menu text and tooltip shown for this command."""
        resources = {}
        resources["MenuText"] = "一键自动布线(全部导线)"
        resources["ToolTip"] = "自动识别线槽/安装板并生成全部 3D 布线路径"
        return resources

    def IsActive(self):
        """The command is available only while a document is active."""
        active_doc = getattr(App, "ActiveDocument", None)
        return active_doc is not None

    def Activated(self):
        """Prepare the layout, route all wires, and print the outcome.

        Wires come from ``App._qet_exchange_payload`` when it is a dict with a
        non-empty ``wires`` entry; otherwise from the document's task objects.
        Every failure is reported on the console instead of being raised.
        """
        doc = getattr(App, "ActiveDocument", None)
        try:
            layout_summary = prepare_eplan_style_layout(doc)
            exchange_payload = getattr(App, "_qet_exchange_payload", None)
            use_payload = isinstance(exchange_payload, dict) and exchange_payload.get("wires")
            if not use_payload:
                batch_report = route_all_tasks(doc)
            else:
                batch_report = route_all_from_payload(doc, exchange_payload)
            # Attach the layout counts so the formatted report can show them.
            batch_report["prepared_layout"] = layout_summary
            if batch_report.get("total_wires", 0) <= 0:
                _console_error("没有导线任务。一键自动布线需要 QET wires[] 或 QETWiring_01_Tasks。")
            else:
                _console_message(format_route_all_report(batch_report))
        except Exception as failure:
            _console_error("批量自动布线失败:{0}".format(failure))
# Set to True once the GUI command has been added, so repeated calls are no-ops.
_COMMANDS_REGISTERED = False


def register_commands():
    """Register the auto-route command with the FreeCAD GUI, at most once.

    Does nothing when the command is already registered, when ``Gui`` is
    ``None``, or when ``Gui`` has no ``addCommand`` attribute.
    """
    global _COMMANDS_REGISTERED
    if _COMMANDS_REGISTERED or Gui is None or not hasattr(Gui, "addCommand"):
        return
    auto_route_command = CommandAutoRouteAll()
    Gui.addCommand("QET_Exchange_AutoRouteAll", auto_route_command)
    _COMMANDS_REGISTERED = True


# Register as soon as the module is imported.
register_commands()