From fef4298c86fb210b31e2684082b338f4ce1bc1aa Mon Sep 17 00:00:00 2001 From: zhanghao <2024138486@qq.com> Date: Tue, 23 Jun 2026 17:01:23 +0800 Subject: [PATCH] =?UTF-8?q?feature/=E5=AF=BC=E7=BA=BF=E6=A0=B7=E5=BC=8F?= =?UTF-8?q?=E5=8F=98=E6=9B=B4=E5=90=8C=E6=AD=A5=E5=88=B0LightWork=203D-zh-?= =?UTF-8?q?0623?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Mod/FreeCADExchange/ExchangeBootstrap.py | 187 ++++++++++++++++ src/Mod/FreeCADExchange/WiringImport.py | 213 ++++++++++++++++++- 2 files changed, 399 insertions(+), 1 deletion(-) diff --git a/src/Mod/FreeCADExchange/ExchangeBootstrap.py b/src/Mod/FreeCADExchange/ExchangeBootstrap.py index 782d36d..0dd3865 100644 --- a/src/Mod/FreeCADExchange/ExchangeBootstrap.py +++ b/src/Mod/FreeCADExchange/ExchangeBootstrap.py @@ -53,11 +53,15 @@ STATE_WRITEBACK_REPORT = "_qet_exchange_writeback_report" STATE_IMPORT_SCHEDULED = "_qet_exchange_import_scheduled" STATE_TREE_FILTER = "_qet_exchange_tree_filter" STATE_TREE_SIGNAL_CONNECTIONS = "_qet_exchange_tree_signal_connections" +STATE_SIGNAL_TIMER = "_qet_exchange_signal_timer" +STATE_SIGNAL_REVISION = "_qet_exchange_signal_revision" +STATE_JSON_PATH = "_qet_exchange_json_path" TREE_FILTER_MARKER = "_qet_exchange_tree_filter_installed" TREE_SIGNAL_MARKER = "_qet_exchange_tree_signal_installed" IMPORT_READY_DELAY_MS = 1500 IMPORT_READY_RETRY_DELAY_MS = 1000 IMPORT_READY_MAX_RETRIES = 10 +SIGNAL_POLL_INTERVAL_MS = 1500 class ExchangeValidationError(RuntimeError): @@ -678,6 +682,18 @@ def _normalize_conductor_uuids(item, entry_label): return normalized +def _normalize_wire_style(item, entry_label): + value = item.get("wire_style", {}) + if value is None: + return {} + if not isinstance(value, dict): + _append_debug_log( + "Field 'wire_style' in {0} is not an object; ignored.".format(entry_label) + ) + return {} + return dict(value) + + def _normalize_wires(payload): wires = payload.get("wires", []) if wires is None: @@ -709,6 +725,7 @@ def _normalize_wires(payload): "wire_mark": _optional_string(item, "wire_mark", entry_label), "wire_mark_is_manual": wire_mark_is_manual, "wire_style_id": _optional_text(item, "wire_style_id"), + "wire_style": _normalize_wire_style(item, entry_label), "start_element_uuid": _optional_string(item, "start_element_uuid", entry_label), "start_instance_id": _optional_string(item, "start_instance_id", entry_label), "start_terminal_uuid": _optional_string(item, "start_terminal_uuid", entry_label), @@ -996,6 +1013,160 @@ def _import_wiring_tasks(payload): return None +def _signal_path_for_json(json_path): + try: + return str(Path(json_path).with_name("qet_exchange_signal.json")) + except Exception: + return "" + + +def _load_exchange_signal(signal_path): + path = Path(signal_path) + if not path.is_file(): + return None + try: + raw_text = path.read_text(encoding="utf-8") + except OSError as exc: + _append_debug_log("exchange signal read failed: {0}".format(exc)) + return None + try: + signal_payload = json.loads(raw_text) + except Exception as exc: + _append_debug_log("exchange signal JSON parse failed: {0}".format(exc)) + return None + if not isinstance(signal_payload, dict): + _append_debug_log("exchange signal ignored: root is not an object") + return None + return signal_payload + + +def _signal_revision(signal_payload): + try: + return int(signal_payload.get("revision", 0) or 0) + except Exception: + return 0 + + +def reload_exchange_payload_from_signal(json_path, signal_payload): + if WiringImport is None: + _append_debug_log("signal reload skipped: WiringImport module unavailable") + return None + + current_payload = getattr(App, STATE_PAYLOAD, None) + expected_project_uuid = "" + if isinstance(current_payload, dict): + expected_project_uuid = str(current_payload.get("project_uuid", "") or "").strip() + signal_project_uuid = str(signal_payload.get("project_uuid", "") or "").strip() + if expected_project_uuid and signal_project_uuid and signal_project_uuid != expected_project_uuid: + message = "exchange signal project_uuid mismatch: signal={0}, current={1}".format( + signal_project_uuid, + expected_project_uuid, + ) + _append_debug_log(message) + App.Console.PrintError("[FreeCADExchange] {0}\n".format(message)) + return None + + payload = load_exchange_payload(json_path) + summary = _build_summary(payload, json_path) + setattr(App, STATE_PAYLOAD, payload) + setattr(App, STATE_SUMMARY, summary) + + wiring_report = WiringImport.import_wire_tasks_from_payload(payload, App.ActiveDocument) + style_report = WiringImport.apply_wire_styles_from_payload(payload, App.ActiveDocument) + setattr(App, STATE_WIRING_IMPORT_REPORT, wiring_report) + try: + App.ActiveDocument.recompute() + except Exception as exc: + _append_debug_log("signal reload recompute failed: {0}".format(exc)) + + revision = _signal_revision(signal_payload) + _append_debug_log( + "exchange signal reload complete: revision={0}, tasks_updated={1}, routed_matched={2}, view_updates={3}".format( + revision, + wiring_report.get("updated_tasks", 0) if isinstance(wiring_report, dict) else 0, + style_report.get("routed_wires_matched", 0) if isinstance(style_report, dict) else 0, + style_report.get("updated_view", 0) if isinstance(style_report, dict) else 0, + ) + ) + App.Console.PrintMessage( + "[FreeCADExchange] Reloaded QET wire styles: revision {0}, tasks updated {1}, routed wires matched {2}, view updates {3}\n".format( + revision, + wiring_report.get("updated_tasks", 0) if isinstance(wiring_report, dict) else 0, + style_report.get("routed_wires_matched", 0) if isinstance(style_report, dict) else 0, + style_report.get("updated_view", 0) if isinstance(style_report, dict) else 0, + ) + ) + return {"wiring_report": wiring_report, "style_report": style_report} + + +def _check_exchange_signal(json_path): + signal_path = _signal_path_for_json(json_path) + if not signal_path: + return + signal_payload = _load_exchange_signal(signal_path) + if not isinstance(signal_payload, dict): + return + + event_name = str(signal_payload.get("event", "") or "").strip() + if event_name and event_name not in {"wire_style_changed"}: + _append_debug_log("exchange signal event treated as reload: {0}".format(event_name)) + + revision = _signal_revision(signal_payload) + if revision <= 0: + return + last_revision = int(getattr(App, STATE_SIGNAL_REVISION, 0) or 0) + if revision <= last_revision: + return + + try: + reload_exchange_payload_from_signal(json_path, signal_payload) + except ExchangeValidationError as exc: + _append_debug_log("exchange signal reload validation failed: {0}".format(exc)) + App.Console.PrintError("[FreeCADExchange] Signal reload failed: {0}\n".format(exc)) + return + except Exception as exc: + _append_debug_log("exchange signal reload failed: {0}".format(exc)) + _append_debug_log(traceback.format_exc()) + App.Console.PrintError("[FreeCADExchange] Signal reload failed: {0}\n".format(exc)) + return + + setattr(App, STATE_SIGNAL_REVISION, revision) + + +def _install_exchange_signal_timer(json_path): + json_path = str(json_path or "").strip() + if not json_path: + return + existing_timer = getattr(App, STATE_SIGNAL_TIMER, None) + if existing_timer is not None: + try: + existing_timer.stop() + except Exception: + pass + + signal_path = _signal_path_for_json(json_path) + initial_signal = _load_exchange_signal(signal_path) if signal_path else None + if isinstance(initial_signal, dict): + setattr(App, STATE_SIGNAL_REVISION, _signal_revision(initial_signal)) + else: + setattr(App, STATE_SIGNAL_REVISION, 0) + + # 中文注释:保持 QTimer 引用,避免 Python 对象被回收;FreeCAD 后续收到 QET 信号后 + # 只重载 wiring task / routed wire 样式,不重新创建全部设备。 + timer = QtCore.QTimer() + timer.setInterval(SIGNAL_POLL_INTERVAL_MS) + timer.timeout.connect(lambda: _check_exchange_signal(json_path)) + timer.start() + setattr(App, STATE_SIGNAL_TIMER, timer) + setattr(App, STATE_JSON_PATH, json_path) + _append_debug_log( + "exchange signal timer installed: path={0}, interval_ms={1}".format( + signal_path, + SIGNAL_POLL_INTERVAL_MS, + ) + ) + + DEFAULT_SCENE_FILE_NAME = "QETScene.FCStd" @@ -1648,6 +1819,21 @@ def _run_scheduled_device_import(attempt=0): wiring_report = _import_wiring_tasks(payload) if wiring_report is not None: setattr(App, STATE_WIRING_IMPORT_REPORT, wiring_report) + if WiringImport is not None: + try: + style_report = WiringImport.apply_wire_styles_from_payload( + payload, + App.ActiveDocument, + ) + _append_debug_log( + "initial routed wire style apply: matched={0}, view_updates={1}".format( + style_report.get("routed_wires_matched", 0), + style_report.get("updated_view", 0), + ) + ) + except Exception as exc: + _append_debug_log("initial routed wire style apply failed: {0}".format(exc)) + _append_debug_log(traceback.format_exc()) _log_document_state( "scheduled device import after wiring import", App.ActiveDocument, @@ -1768,6 +1954,7 @@ def bootstrap_if_requested(): ) setattr(App, STATE_PAYLOAD, payload) setattr(App, STATE_SUMMARY, summary) + _install_exchange_signal_timer(json_path) if not getattr(App, STATE_IMPORT_SCHEDULED, False): setattr(App, STATE_IMPORT_SCHEDULED, True) _append_debug_log( diff --git a/src/Mod/FreeCADExchange/WiringImport.py b/src/Mod/FreeCADExchange/WiringImport.py index e0272c1..928e16b 100644 --- a/src/Mod/FreeCADExchange/WiringImport.py +++ b/src/Mod/FreeCADExchange/WiringImport.py @@ -1,6 +1,7 @@ # FreeCADExchange wire task import helpers. import json +import re import FreeCAD as App @@ -42,6 +43,86 @@ def _conductor_uuids(item): return [str(value).strip() for value in values if str(value).strip()] +def _wire_style_payload(item): + wire_style = item.get("wire_style", {}) + if not isinstance(wire_style, dict): + return {} + return dict(wire_style) + + +def _canonical_wire_style_json(wire_style): + if not isinstance(wire_style, dict): + wire_style = {} + return json.dumps(wire_style, ensure_ascii=False, sort_keys=True) + + +def _style_text(wire_style, key): + if not isinstance(wire_style, dict): + return "" + value = wire_style.get(key, "") + if value is None: + return "" + return str(value).strip() + + +def _style_float_text(wire_style, key): + text = _style_text(wire_style, key) + if not text: + return "" + try: + return "{0:g}".format(float(text)) + except Exception: + return text + + +def _parse_line_color(value): + text = str(value or "").strip() + if not text: + return None + + match = re.fullmatch(r"#?([0-9a-fA-F]{6})", text) + if match: + raw = match.group(1) + return tuple(int(raw[index:index + 2], 16) / 255.0 for index in (0, 2, 4)) + + if "," in text: + try: + numbers = [float(part.strip()) for part in text.split(",")] + except Exception: + numbers = [] + if len(numbers) == 3: + if max(numbers) > 1.0: + numbers = [number / 255.0 for number in numbers] + return tuple(max(0.0, min(1.0, number)) for number in numbers) + + return None + + +def _line_width_value(wire_style): + try: + value = float(_style_text(wire_style, "line_width") or 0) + except Exception: + value = 0.0 + if value <= 0.0: + return None + return max(0.1, min(20.0, value)) + + +def _geometry_style_fields(wire_style): + return { + key: _style_text(wire_style, key) + for key in ("diameter_mm", "bend_radius_mm", "bend_radius_factor", "area_or_spec") + } + + +def _json_dict(text): + try: + value = json.loads(text or "{}") + except Exception: + return {} + return value if isinstance(value, dict) else {} + + def _device_display_map(payload): labels = {} for item in payload.get("devices", []) or []: @@ -99,6 +180,7 @@ def _normalize_wire_entry(item, index, device_labels=None): _endpoint_text(start_device_label, start_terminal_display, start_terminal_uuid), _endpoint_text(end_device_label, end_terminal_display, end_terminal_uuid), ) + wire_style = _wire_style_payload(item) return { "wire_uuid": wire_uuid, "wire_label": wire_label, @@ -107,6 +189,7 @@ def _normalize_wire_entry(item, index, device_labels=None): "wire_mark": wire_mark, "wire_mark_is_manual": _bool_value(item, "wire_mark_is_manual"), "wire_style_id": _int_text_value(item, "wire_style_id"), + "wire_style": wire_style, "start_element_uuid": start_element_uuid, "start_terminal_uuid": start_terminal_uuid, "start_instance_id": _string_value(item, "start_instance_id"), @@ -157,10 +240,84 @@ def _ensure_string_property(obj, prop_name, value, description="QET wire task pr ) +def _ensure_bool_property(obj, prop_name, value, description="QET wire task property"): + TerminalObjects.ensure_bool_property( + obj, + prop_name, + "QET Wiring", + description, + value, + ) + + +def _set_wire_style_properties(obj, wire_style_id, wire_style): + style = wire_style if isinstance(wire_style, dict) else {} + new_style_json = _canonical_wire_style_json(style) + old_style_json = (getattr(obj, "QetWireStyleJson", "") or "").strip() + old_style = _json_dict(old_style_json) + changed = old_style_json != new_style_json + + _ensure_string_property(obj, "QetWireStyleId", str(wire_style_id or "").strip()) + _ensure_string_property( + obj, + "QetWireStyleJson", + new_style_json, + "QET wire style JSON from 2D payload", + ) + _ensure_string_property(obj, "QetWireStyleName", _style_text(style, "name")) + _ensure_string_property(obj, "QetWireLineColor", _style_text(style, "line_color")) + _ensure_string_property(obj, "QetWireLineWidth", _style_float_text(style, "line_width")) + _ensure_string_property(obj, "QetWireDiameterMm", _style_float_text(style, "diameter_mm")) + _ensure_string_property(obj, "QetWireBendRadiusMm", _style_float_text(style, "bend_radius_mm")) + + if old_style_json and changed and _geometry_style_fields(old_style) != _geometry_style_fields(style): + _ensure_bool_property( + obj, + "QetWireStyleAffectsGeometry", + True, + "Whether latest QET wire style changes geometry-sensitive fields", + ) + elif "QetWireStyleAffectsGeometry" not in getattr(obj, "PropertiesList", []): + _ensure_bool_property( + obj, + "QetWireStyleAffectsGeometry", + False, + "Whether latest QET wire style changes geometry-sensitive fields", + ) + return changed + + +def _apply_wire_style_view(obj, wire_style): + changed = False + view = getattr(obj, "ViewObject", None) + if view is None: + return changed + + color = _parse_line_color(_style_text(wire_style, "line_color")) + if color is not None and hasattr(view, "LineColor"): + try: + if tuple(getattr(view, "LineColor", ())) != tuple(color): + view.LineColor = color + changed = True + except Exception: + pass + + width = _line_width_value(wire_style) + if width is not None and hasattr(view, "LineWidth"): + try: + if float(getattr(view, "LineWidth", 0.0) or 0.0) != width: + view.LineWidth = width + changed = True + except Exception: + pass + + return changed + + def _set_task_extra_properties(task, entry): _ensure_string_property(task, "QetStartElementUuid", entry["start_element_uuid"]) _ensure_string_property(task, "QetEndElementUuid", entry["end_element_uuid"]) - _ensure_string_property(task, "QetWireStyleId", entry["wire_style_id"]) + _set_wire_style_properties(task, entry["wire_style_id"], entry["wire_style"]) _ensure_string_property(task, "QetStartTerminalDisplay", entry["start_terminal_display"]) _ensure_string_property(task, "QetEndTerminalDisplay", entry["end_terminal_display"]) _ensure_string_property(task, "QetStartDeviceLabel", entry["start_device_label"]) @@ -258,3 +415,57 @@ def import_wire_tasks_from_payload(payload, doc=None): report["updated_tasks"] += 1 return report + + +def apply_wire_styles_from_payload(payload, doc=None): + if doc is None: + doc = getattr(App, "ActiveDocument", None) + if doc is None: + raise WiringImportError("No active FreeCAD document is available.") + if not isinstance(payload, dict): + raise WiringImportError("Exchange payload must be an object.") + + style_by_wire = {} + for item in payload.get("wires", []) or []: + if not isinstance(item, dict): + continue + wire_uuid = ( + _string_value(item, "wire_id") + or _string_value(item, "wire_uuid") + or _string_value(item, "id") + ) + if not wire_uuid: + continue + style_by_wire[wire_uuid] = { + "wire_style_id": _int_text_value(item, "wire_style_id"), + "wire_style": _wire_style_payload(item), + } + + report = { + "total_styles": len(style_by_wire), + "routed_wires_seen": 0, + "routed_wires_matched": 0, + "updated_properties": 0, + "updated_view": 0, + "warnings": [], + } + + for obj in list(WiringObjects.iter_routed_wire_objects(doc)): + report["routed_wires_seen"] += 1 + wire_uuid = (getattr(obj, "QetWireUuid", "") or "").strip() + if not wire_uuid or wire_uuid not in style_by_wire: + continue + report["routed_wires_matched"] += 1 + entry = style_by_wire[wire_uuid] + prop_changed = _set_wire_style_properties( + obj, + entry["wire_style_id"], + entry["wire_style"], + ) + view_changed = _apply_wire_style_view(obj, entry["wire_style"]) + if prop_changed: + report["updated_properties"] += 1 + if view_changed: + report["updated_view"] += 1 + + return report