You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1012 lines
36 KiB
Python

import json
import traceback
import os
from pathlib import Path
import FreeCAD as App
import FreeCADGui as Gui
import DeviceImport
import DevicePreview
try:
import ExchangeWriteBack
except Exception:
ExchangeWriteBack = None
try:
import TerminalImport
except Exception:
TerminalImport = None
try:
import WiringObjects
except Exception:
WiringObjects = None
try:
import WiringImport
except Exception:
WiringImport = None
try:
from PySide6 import QtCore, QtWidgets
except ImportError:
try:
from PySide2 import QtCore, QtWidgets
except ImportError:
from PySide import QtCore
from PySide import QtGui as QtWidgets
ENV_JSON_PATH = "QET_2D_TO_3D_JSON"
ENV_SCENE_PATH = "QET_FREECAD_SCENE_FILE"
STATE_FLAG = "_qet_exchange_bootstrapped"
STATE_PAYLOAD = "_qet_exchange_payload"
STATE_SUMMARY = "_qet_exchange_summary"
STATE_IMPORT_REPORT = "_qet_exchange_import_report"
STATE_TERMINAL_IMPORT_REPORT = "_qet_exchange_terminal_import_report"
STATE_WIRING_IMPORT_REPORT = "_qet_exchange_wiring_import_report"
STATE_WRITEBACK_REPORT = "_qet_exchange_writeback_report"
STATE_IMPORT_SCHEDULED = "_qet_exchange_import_scheduled"
STATE_TREE_FILTER = "_qet_exchange_tree_filter"
STATE_TREE_SIGNAL_CONNECTIONS = "_qet_exchange_tree_signal_connections"
TREE_FILTER_MARKER = "_qet_exchange_tree_filter_installed"
TREE_SIGNAL_MARKER = "_qet_exchange_tree_signal_installed"
IMPORT_READY_DELAY_MS = 1500
IMPORT_READY_RETRY_DELAY_MS = 1000
IMPORT_READY_MAX_RETRIES = 10
class ExchangeValidationError(RuntimeError):
pass
def _debug_log_path():
local_app_data = os.environ.get("LOCALAPPDATA", "").strip()
if local_app_data:
return os.path.join(local_app_data, "QETDeps", "freecad_exchange_bootstrap.log")
return os.path.join(str(Path.home()), "AppData", "Local", "QETDeps", "freecad_exchange_bootstrap.log")
def _append_debug_log(message):
try:
log_path = _debug_log_path()
os.makedirs(os.path.dirname(log_path), exist_ok=True)
with open(log_path, "a", encoding="utf-8") as handle:
handle.write(message + "\n")
except Exception:
pass
def _reset_debug_log():
try:
log_path = _debug_log_path()
os.makedirs(os.path.dirname(log_path), exist_ok=True)
with open(log_path, "w", encoding="utf-8") as handle:
handle.write("[QET Exchange] new debug session\n")
except Exception:
pass
def _get_main_window():
try:
return Gui.getMainWindow()
except Exception:
return None
def _show_info(title, message):
QtWidgets.QMessageBox.information(_get_main_window(), title, message)
def _show_error(title, message):
QtWidgets.QMessageBox.critical(_get_main_window(), title, message)
def _terminal_report_not_available():
return {
"imported_terminals": 0,
"updated_terminals": 0,
"removed_terminals": 0,
"skipped_terminals": 0,
"warnings": ["3D terminal import module is not available."],
}
def _qt_class_name(widget):
try:
return widget.metaObject().className()
except Exception:
return ""
def _has_tree_widget_parent(widget):
current = widget
while current is not None:
class_name = _qt_class_name(current)
if "TreeWidget" in class_name or class_name.endswith("QTreeWidget") or class_name.endswith("QTreeView"):
return True
current = current.parent()
return False
class _DeviceTreeDoubleClickFilter(QtCore.QObject):
def eventFilter(self, watched, event):
try:
if event.type() != QtCore.QEvent.MouseButtonDblClick:
return False
_append_debug_log(
"tree double click captured: watched_class={0}".format(
_qt_class_name(watched)
)
)
QtCore.QTimer.singleShot(0, _open_selected_device_preview_if_needed)
except Exception as exc:
_append_debug_log(
"tree double click filter failed: {0}".format(exc)
)
return False
class _DeviceTreeSignalBridge(QtCore.QObject):
def on_item_double_clicked(self, *args):
_append_debug_log("tree double click signal received: itemDoubleClicked")
QtCore.QTimer.singleShot(0, _open_selected_device_preview_if_needed)
def on_index_double_clicked(self, *args):
_append_debug_log("tree double click signal received: doubleClicked")
QtCore.QTimer.singleShot(0, _open_selected_device_preview_if_needed)
def _install_tree_double_click_filter():
main_window = _get_main_window()
if main_window is None:
return
if getattr(Gui, STATE_TREE_FILTER, None) is not None:
tree_filter = getattr(Gui, STATE_TREE_FILTER)
else:
tree_filter = _DeviceTreeDoubleClickFilter(main_window)
setattr(Gui, STATE_TREE_FILTER, tree_filter)
if getattr(Gui, STATE_TREE_SIGNAL_CONNECTIONS, None) is not None:
signal_bridge = getattr(Gui, STATE_TREE_SIGNAL_CONNECTIONS)
else:
signal_bridge = _DeviceTreeSignalBridge(main_window)
setattr(Gui, STATE_TREE_SIGNAL_CONNECTIONS, signal_bridge)
installed_count = 0
signal_count = 0
for widget in main_window.findChildren(QtWidgets.QWidget):
class_name = _qt_class_name(widget)
if "TreeWidget" not in class_name and not class_name.endswith("QTreeWidget") and not class_name.endswith("QTreeView"):
continue
targets = [widget]
viewport = getattr(widget, "viewport", lambda: None)()
if viewport is not None:
targets.append(viewport)
for target in targets:
if target is None:
continue
if bool(target.property(TREE_FILTER_MARKER)):
continue
target.installEventFilter(tree_filter)
target.setProperty(TREE_FILTER_MARKER, True)
installed_count += 1
_append_debug_log(
"tree double click filter attached: class={0}".format(
_qt_class_name(target)
)
)
if not bool(widget.property(TREE_SIGNAL_MARKER)):
connected = False
item_double_clicked = getattr(widget, "itemDoubleClicked", None)
if item_double_clicked is not None:
try:
item_double_clicked.connect(signal_bridge.on_item_double_clicked)
connected = True
signal_count += 1
_append_debug_log(
"tree double click signal connected: class={0}, signal=itemDoubleClicked".format(
class_name
)
)
except Exception as exc:
_append_debug_log(
"tree double click signal connect failed: class={0}, signal=itemDoubleClicked, error={1}".format(
class_name, exc
)
)
view_double_clicked = getattr(widget, "doubleClicked", None)
if view_double_clicked is not None:
try:
view_double_clicked.connect(signal_bridge.on_index_double_clicked)
connected = True
signal_count += 1
_append_debug_log(
"tree double click signal connected: class={0}, signal=doubleClicked".format(
class_name
)
)
except Exception as exc:
_append_debug_log(
"tree double click signal connect failed: class={0}, signal=doubleClicked, error={1}".format(
class_name, exc
)
)
if connected:
widget.setProperty(TREE_SIGNAL_MARKER, True)
_append_debug_log(
"tree double click filter install pass complete: attached={0}, signal_connections={1}".format(
installed_count,
signal_count,
)
)
def _open_selected_device_preview_if_needed():
try:
selection = Gui.Selection.getSelection()
_append_debug_log(
"tree double click selection count={0}".format(len(selection))
)
if len(selection) != 1:
return
obj = selection[0]
_append_debug_log(
"tree double click selected object: name={0}, label={1}, type={2}, doc={3}".format(
getattr(obj, "Name", ""),
getattr(obj, "Label", ""),
getattr(obj, "TypeId", ""),
getattr(getattr(obj, "Document", None), "Name", ""),
)
)
device_obj = DevicePreview.find_parent_qet_device_object(obj)
_append_debug_log(
"tree double click resolved device object: name={0}, label={1}, doc={2}".format(
getattr(device_obj, "Name", "") if device_obj else "",
getattr(device_obj, "Label", "") if device_obj else "",
getattr(getattr(device_obj, "Document", None), "Name", "") if device_obj else "",
)
)
if device_obj is None:
return
if DevicePreview.is_preview_document_name(
getattr(getattr(device_obj, "Document", None), "Name", "")
):
_append_debug_log("tree double click ignored inside preview document")
return
DevicePreview.open_preview_for_device_object(device_obj)
_append_debug_log("tree double click preview open requested")
except Exception as exc:
_append_debug_log(
"open selected device preview failed: {0}".format(exc)
)
def _is_gui_ready():
main_window = _get_main_window()
if main_window is None:
return False
try:
return bool(main_window.isVisible())
except Exception:
return False
def _require_string(payload, field_name):
value = payload.get(field_name)
if not isinstance(value, str) or not value.strip():
raise ExchangeValidationError(
"Field '{0}' must be a non-empty string.".format(field_name)
)
return value.strip()
def _normalize_instance_id(item):
value = item.get("instance_id", "")
if value is None:
return ""
if not isinstance(value, str):
raise ExchangeValidationError(
"Field 'instance_id' must be a string when present."
)
return value.strip()
def _normalize_devices(payload):
devices = payload.get("devices", [])
if not isinstance(devices, list):
raise ExchangeValidationError("Field 'devices' must be a list.")
normalized = []
for index, item in enumerate(devices):
if not isinstance(item, dict):
raise ExchangeValidationError(
"Device entry #{0} must be an object.".format(index)
)
element_uuid = _require_string(item, "element_uuid")
display_tag = item.get("display_tag", "")
if display_tag and not isinstance(display_tag, str):
raise ExchangeValidationError(
"Field 'display_tag' in device entry #{0} must be a string.".format(
index
)
)
device_terminals = item.get("terminals", [])
if device_terminals is None:
device_terminals = []
if not isinstance(device_terminals, list):
raise ExchangeValidationError(
"Field 'terminals' in device entry #{0} must be a list.".format(index)
)
normalized_terminals = []
for terminal_index, terminal_item in enumerate(device_terminals):
terminal_entry_label = "device entry #{0} terminal entry #{1}".format(
index, terminal_index
)
if not isinstance(terminal_item, dict):
raise ExchangeValidationError(
"{0} must be an object.".format(terminal_entry_label.capitalize())
)
terminal_uuid = _require_string(terminal_item, "terminal_uuid")
terminal_element_uuid = _optional_string(
terminal_item, "element_uuid", terminal_entry_label
) or element_uuid
normalized_terminals.append(
{
"terminal_uuid": terminal_uuid,
"instance_id": _normalize_instance_id(terminal_item)
or _normalize_instance_id(item),
"element_uuid": terminal_element_uuid,
"terminal_display": _optional_string(
terminal_item, "terminal_display", terminal_entry_label
),
}
)
normalized.append(
{
"element_uuid": element_uuid,
"instance_id": _normalize_instance_id(item),
"display_tag": display_tag.strip() if isinstance(display_tag, str) else "",
"terminals": normalized_terminals,
}
)
return normalized
def _normalize_terminals(devices):
normalized = []
for device in devices:
for terminal in device.get("terminals", []) or []:
normalized.append(dict(terminal))
return normalized
def _optional_string(item, field_name, entry_label):
value = item.get(field_name, "")
if value is None:
return ""
if value and not isinstance(value, str):
raise ExchangeValidationError(
"Field '{0}' in {1} must be a string.".format(field_name, entry_label)
)
return value.strip() if isinstance(value, str) else ""
def _normalize_conductor_uuids(item, entry_label):
values = item.get("conductor_uuids", [])
if values is None:
return []
if not isinstance(values, list):
raise ExchangeValidationError(
"Field 'conductor_uuids' in {0} must be a list.".format(entry_label)
)
normalized = []
for conductor_index, value in enumerate(values):
if not isinstance(value, str):
raise ExchangeValidationError(
"Field 'conductor_uuids[{0}]' in {1} must be a string.".format(
conductor_index,
entry_label,
)
)
value = value.strip()
if value:
normalized.append(value)
return normalized
def _normalize_wires(payload):
wires = payload.get("wires", [])
if wires is None:
return []
if not isinstance(wires, list):
raise ExchangeValidationError("Field 'wires' must be a list.")
normalized = []
for index, item in enumerate(wires):
entry_label = "wire entry #{0}".format(index)
if not isinstance(item, dict):
raise ExchangeValidationError(
"Wire entry #{0} must be an object.".format(index)
)
wire_id = _require_string(item, "wire_id")
wire_mark_is_manual = item.get("wire_mark_is_manual", False)
if not isinstance(wire_mark_is_manual, bool):
raise ExchangeValidationError(
"Field 'wire_mark_is_manual' in {0} must be a bool.".format(
entry_label
)
)
normalized.append(
{
"wire_id": wire_id,
"net_uuid": _optional_string(item, "net_uuid", entry_label),
"group_uuid": _optional_string(item, "group_uuid", entry_label),
"wire_mark": _optional_string(item, "wire_mark", entry_label),
"wire_mark_is_manual": wire_mark_is_manual,
"start_element_uuid": _optional_string(item, "start_element_uuid", entry_label),
"start_instance_id": _optional_string(item, "start_instance_id", entry_label),
"start_terminal_uuid": _optional_string(item, "start_terminal_uuid", entry_label),
"end_element_uuid": _optional_string(item, "end_element_uuid", entry_label),
"end_instance_id": _optional_string(item, "end_instance_id", entry_label),
"end_terminal_uuid": _optional_string(item, "end_terminal_uuid", entry_label),
"start_terminal_display": _optional_string(item, "start_terminal_display", entry_label),
"end_terminal_display": _optional_string(item, "end_terminal_display", entry_label),
"conductor_uuids": _normalize_conductor_uuids(item, entry_label),
}
)
return normalized
def _normalize_device_models(payload):
models = payload.get("device_models", [])
if not isinstance(models, list):
raise ExchangeValidationError("Field 'device_models' must be a list.")
normalized = []
for index, item in enumerate(models):
if not isinstance(item, dict):
raise ExchangeValidationError(
"Device model entry #{0} must be an object.".format(index)
)
element_uuid = _require_string(item, "element_uuid")
parts_3d = item.get("parts_3d", "")
if parts_3d and not isinstance(parts_3d, str):
raise ExchangeValidationError(
"Field 'parts_3d' in device model entry #{0} must be a string.".format(
index
)
)
resolved_model_path = item.get("resolved_model_path", "")
if resolved_model_path and not isinstance(resolved_model_path, str):
raise ExchangeValidationError(
"Field 'resolved_model_path' in device model entry #{0} must be a string.".format(
index
)
)
device_id = item.get("device_id")
if device_id is not None and not isinstance(device_id, int):
raise ExchangeValidationError(
"Field 'device_id' in device model entry #{0} must be an integer.".format(
index
)
)
normalized.append(
{
"element_uuid": element_uuid,
"device_id": device_id,
"parts_3d": parts_3d.strip() if isinstance(parts_3d, str) else "",
"resolved_model_path": (
resolved_model_path.strip()
if isinstance(resolved_model_path, str)
else ""
),
}
)
return normalized
def _normalize_cabinet(payload):
cabinet = payload.get("cabinet")
if cabinet is None:
return None
if not isinstance(cabinet, dict):
raise ExchangeValidationError("Field 'cabinet' must be an object when present.")
location_id = cabinet.get("location_id")
if location_id is not None and not isinstance(location_id, int):
raise ExchangeValidationError("Field 'location_id' in cabinet must be an integer.")
normalized = {
"location_id": location_id,
"label": "",
"name": "",
"display_text": "",
"associated_fileset": "",
"three_d_relative_path": "",
"resolved_scene_path": "",
}
for field_name in (
"label",
"name",
"display_text",
"associated_fileset",
"three_d_relative_path",
"resolved_scene_path",
):
value = cabinet.get(field_name, "")
if value and not isinstance(value, str):
raise ExchangeValidationError(
"Field '{0}' in cabinet must be a string.".format(field_name)
)
normalized[field_name] = value.strip() if isinstance(value, str) else ""
return normalized
def load_exchange_payload(json_path):
try:
raw_text = Path(json_path).read_text(encoding="utf-8")
except OSError as exc:
raise ExchangeValidationError(
"Failed to read exchange file:\n{0}".format(exc)
) from exc
try:
payload = json.loads(raw_text)
except json.JSONDecodeError as exc:
raise ExchangeValidationError(
"Exchange JSON is invalid:\n{0}".format(exc)
) from exc
if not isinstance(payload, dict):
raise ExchangeValidationError("Exchange JSON root must be an object.")
project_uuid = _require_string(payload, "project_uuid")
schema_version = payload.get("schema_version", "1.0")
if not isinstance(schema_version, str) or not schema_version.strip():
raise ExchangeValidationError("Field 'schema_version' must be a string.")
normalized_devices = _normalize_devices(payload)
normalized = {
"schema_version": schema_version.strip(),
"project_uuid": project_uuid,
"generated_at": payload.get("generated_at", ""),
"source": payload.get("source", {}),
"cabinet": _normalize_cabinet(payload),
"devices": normalized_devices,
"terminals": _normalize_terminals(normalized_devices),
"device_models": _normalize_device_models(payload),
"wires": _normalize_wires(payload),
}
return normalized
def _build_summary(payload, json_path):
devices = payload["devices"]
terminals = payload["terminals"]
device_models = payload["device_models"]
wires = payload.get("wires", [])
cabinet = payload.get("cabinet")
missing_device_instances = sum(1 for item in devices if not item["instance_id"])
missing_terminal_instances = sum(
1 for item in terminals if not item["instance_id"]
)
with_model_paths = sum(
1 for item in device_models if item["resolved_model_path"] or item["parts_3d"]
)
return {
"json_path": json_path,
"project_uuid": payload["project_uuid"],
"device_count": len(devices),
"terminal_count": len(terminals),
"wire_count": len(wires),
"device_model_count": len(device_models),
"device_models_with_parts": with_model_paths,
"missing_device_instances": missing_device_instances,
"missing_terminal_instances": missing_terminal_instances,
"cabinet": cabinet,
"scene_path": os.environ.get(ENV_SCENE_PATH, "").strip(),
}
def _initialize_wiring_scene(payload):
if WiringObjects is None:
_append_debug_log("wiring scene initialization skipped: WiringObjects module unavailable")
return None
try:
return WiringObjects.initialize_wiring_scene(
App.ActiveDocument,
payload.get("project_uuid", "") if isinstance(payload, dict) else "",
)
except Exception as exc:
_append_debug_log("wiring scene initialization failed: {0}".format(exc))
_append_debug_log(traceback.format_exc())
return None
def _import_wiring_tasks(payload):
if WiringImport is None:
_append_debug_log("wire task import skipped: WiringImport module unavailable")
return None
try:
return WiringImport.import_wire_tasks_from_payload(
payload,
App.ActiveDocument,
)
except Exception as exc:
_append_debug_log("wire task import failed: {0}".format(exc))
_append_debug_log(traceback.format_exc())
return None
def _summary_message(summary, import_report=None, terminal_report=None, writeback_report=None, wiring_report=None):
lines = [
"QET exchange file loaded successfully.",
"",
"Project UUID: {0}".format(summary["project_uuid"]),
"Exchange file: {0}".format(summary["json_path"]),
"Devices: {0}".format(summary["device_count"]),
"Terminals: {0}".format(summary["terminal_count"]),
"Wires: {0}".format(summary["wire_count"]),
"Device models: {0}".format(summary["device_model_count"]),
"Resolved model paths: {0}".format(summary["device_models_with_parts"]),
]
cabinet = summary.get("cabinet")
if isinstance(cabinet, dict):
cabinet_name = cabinet.get("display_text") or cabinet.get("label") or cabinet.get("name")
if cabinet_name:
lines.append("Cabinet: {0}".format(cabinet_name))
if cabinet.get("associated_fileset"):
lines.append("Cabinet fileset: {0}".format(cabinet["associated_fileset"]))
if cabinet.get("three_d_relative_path"):
lines.append(
"Cabinet 3D relative path: {0}".format(
cabinet["three_d_relative_path"]
)
)
if cabinet.get("resolved_scene_path"):
lines.append(
"Cabinet resolved scene path: {0}".format(
cabinet["resolved_scene_path"]
)
)
if summary["missing_device_instances"]:
lines.append(
"Devices without instance_id yet: {0}".format(
summary["missing_device_instances"]
)
)
if summary["missing_terminal_instances"]:
lines.append(
"Terminals without instance_id yet: {0}".format(
summary["missing_terminal_instances"]
)
)
if summary["scene_path"]:
lines.append("Scene file: {0}".format(summary["scene_path"]))
if import_report:
lines.extend(
[
"",
"3D device import summary:",
"Target document: {0}".format(import_report["document_name"]),
"Imported cabinet models: {0}".format(import_report["cabinet_imported"]),
"Imported devices: {0}".format(import_report["imported_devices"]),
"Updated devices: {0}".format(import_report["updated_devices"]),
]
)
if import_report["imported_without_instance_id"]:
lines.append(
"Imported without instance_id yet: {0}".format(
import_report["imported_without_instance_id"]
)
)
if import_report["skipped_missing_model"]:
lines.append(
"Skipped without resolved model path: {0}".format(
import_report["skipped_missing_model"]
)
)
if import_report["skipped_missing_file"]:
lines.append(
"Skipped missing model file: {0}".format(
import_report["skipped_missing_file"]
)
)
if import_report["skipped_unsupported_format"]:
lines.append(
"Skipped unsupported model format: {0}".format(
import_report["skipped_unsupported_format"]
)
)
if import_report["skipped_import_error"]:
lines.append(
"Skipped after import errors: {0}".format(
import_report["skipped_import_error"]
)
)
warnings = import_report.get("warnings", [])
if warnings:
lines.append("")
lines.append("Warnings:")
lines.extend("- {0}".format(item) for item in warnings[:10])
if len(warnings) > 10:
lines.append("- ... ({0} more)".format(len(warnings) - 10))
if terminal_report:
lines.extend(
[
"",
"3D terminal import summary:",
"Document: {0}".format(terminal_report["document_name"]),
"Imported terminals: {0}".format(terminal_report["imported_terminals"]),
"Updated terminals: {0}".format(terminal_report["updated_terminals"]),
"Removed stale terminals: {0}".format(terminal_report["removed_terminals"]),
]
)
warnings = terminal_report.get("warnings", [])
if warnings:
lines.append("Warnings:")
lines.extend("- {0}".format(item) for item in warnings[:10])
if len(warnings) > 10:
lines.append("- ... ({0} more)".format(len(warnings) - 10))
if writeback_report:
lines.extend(
[
"",
"3D write-back:",
"Output file: {0}".format(writeback_report["output_path"]),
"Instances written: {0}".format(len(writeback_report["instances"])),
"Terminals written: {0}".format(len(writeback_report["terminals"])),
]
)
if wiring_report:
lines.extend(
[
"",
"3D wire tasks:",
"Imported tasks: {0}".format(wiring_report.get("imported_tasks", 0)),
"Updated tasks: {0}".format(wiring_report.get("updated_tasks", 0)),
]
)
warnings = wiring_report.get("warnings", [])
if warnings:
lines.append("Wire task warnings:")
lines.extend("- {0}".format(item) for item in warnings[:10])
if len(warnings) > 10:
lines.append("- ... ({0} more)".format(len(warnings) - 10))
lines.append("")
lines.append("This step validates the exchange payload and imports devices with valid resolved model paths.")
lines.append("3D terminal import and write-back are enabled.")
return "\n".join(lines)
def _run_scheduled_device_import(attempt=0):
_append_debug_log(
"scheduled device import invoked: attempt={0}".format(attempt)
)
if not _is_gui_ready():
if attempt < IMPORT_READY_MAX_RETRIES:
_append_debug_log(
"scheduled device import postponed: gui not ready, retrying"
)
QtCore.QTimer.singleShot(
IMPORT_READY_RETRY_DELAY_MS,
lambda: _run_scheduled_device_import(attempt + 1),
)
return
_append_debug_log("scheduled device import aborted: gui never became ready")
_show_error(
"QET Exchange",
"FreeCAD main window did not finish initializing before device import.",
)
return
payload = getattr(App, STATE_PAYLOAD, None)
summary = getattr(App, STATE_SUMMARY, None)
if not isinstance(payload, dict) or not isinstance(summary, dict):
_append_debug_log(
"scheduled device import aborted: cached payload/summary missing"
)
return
scene_path = os.environ.get(ENV_SCENE_PATH, "").strip()
_append_debug_log(
"scheduled device import starting with scene_path={0}".format(scene_path)
)
try:
import_report = DeviceImport.import_devices_from_payload(payload, scene_path)
except DeviceImport.DeviceImportError as exc:
_append_debug_log("device import failed: {0}".format(exc))
_show_error("QET Exchange", str(exc))
App.Console.PrintError("[FreeCADExchange] {0}\n".format(exc))
return
except Exception as exc:
_append_debug_log("unexpected device import exception: {0}".format(exc))
_append_debug_log(traceback.format_exc())
_show_error("QET Exchange", "Failed to import 3D devices:\n{0}".format(exc))
App.Console.PrintError(
"[FreeCADExchange] Failed to import devices: {0}\n".format(exc)
)
return
setattr(App, STATE_IMPORT_REPORT, import_report)
_append_debug_log(
"device import summary: imported={0}, updated={1}, skipped_missing_model={2}, skipped_missing_file={3}, skipped_import_error={4}".format(
import_report["imported_devices"],
import_report["updated_devices"],
import_report["skipped_missing_model"],
import_report["skipped_missing_file"],
import_report["skipped_import_error"],
)
)
App.Console.PrintMessage(
"[FreeCADExchange] Loaded exchange payload from {0}\n".format(
summary["json_path"]
)
)
App.Console.PrintMessage(
"[FreeCADExchange] Devices: {0}, Terminals: {1}, Device models: {2}\n".format(
summary["device_count"],
summary["terminal_count"],
summary["device_model_count"],
)
)
App.Console.PrintMessage(
"[FreeCADExchange] Imported devices: {0}, updated: {1}, skipped without model: {2}\n".format(
import_report["imported_devices"],
import_report["updated_devices"],
import_report["skipped_missing_model"],
)
)
if import_report.get("generated_instance_ids"):
App.Console.PrintMessage(
"[FreeCADExchange] Generated device instance IDs: {0}\n".format(
import_report["generated_instance_ids"]
)
)
if TerminalImport is None:
_append_debug_log("terminal import skipped: TerminalImport module unavailable")
terminal_report = _terminal_report_not_available()
else:
try:
terminal_report = TerminalImport.import_terminals_from_payload(payload, scene_path)
except TerminalImport.TerminalImportError as exc:
_append_debug_log("terminal import failed: {0}".format(exc))
_show_error("QET Exchange", str(exc))
App.Console.PrintError("[FreeCADExchange] {0}\n".format(exc))
return
except Exception as exc:
_append_debug_log("unexpected terminal import exception: {0}".format(exc))
_append_debug_log(traceback.format_exc())
_show_error("QET Exchange", "Failed to import 3D terminals:\n{0}".format(exc))
App.Console.PrintError(
"[FreeCADExchange] Failed to import terminals: {0}\n".format(exc)
)
return
setattr(App, STATE_TERMINAL_IMPORT_REPORT, terminal_report)
_initialize_wiring_scene(payload)
wiring_report = _import_wiring_tasks(payload)
if wiring_report is not None:
setattr(App, STATE_WIRING_IMPORT_REPORT, wiring_report)
if ExchangeWriteBack is None:
_append_debug_log("write-back skipped: ExchangeWriteBack module unavailable")
writeback_report = None
else:
try:
writeback_report = ExchangeWriteBack.write_back_document(
App.ActiveDocument, scene_path=scene_path, payload=payload
)
except Exception as exc:
_append_debug_log("write-back failed after import: {0}".format(exc))
_append_debug_log(traceback.format_exc())
writeback_report = None
else:
setattr(App, STATE_WRITEBACK_REPORT, writeback_report)
App.Console.PrintMessage(
"[FreeCADExchange] Imported terminals: {0}, updated: {1}, removed: {2}\n".format(
terminal_report["imported_terminals"],
terminal_report["updated_terminals"],
terminal_report["removed_terminals"],
)
)
_show_info(
"QET Exchange",
_summary_message(summary, import_report, terminal_report, writeback_report, wiring_report),
)
_append_debug_log("summary dialog shown")
def bootstrap_if_requested():
if not getattr(App, STATE_FLAG, False):
_reset_debug_log()
_append_debug_log("bootstrap_if_requested entered")
_install_tree_double_click_filter()
if getattr(App, STATE_FLAG, False):
_append_debug_log("bootstrap_if_requested skipped: already bootstrapped")
return
json_path = os.environ.get(ENV_JSON_PATH, "").strip()
_append_debug_log("ENV QET_2D_TO_3D_JSON={0}".format(json_path))
_append_debug_log("ENV QET_FREECAD_SCENE_FILE={0}".format(os.environ.get(ENV_SCENE_PATH, "").strip()))
if not json_path:
_append_debug_log("bootstrap_if_requested skipped: env missing")
return
setattr(App, STATE_FLAG, True)
_append_debug_log("STATE_FLAG set")
if not os.path.isfile(json_path):
_append_debug_log("exchange file missing: {0}".format(json_path))
_show_error(
"QET Exchange",
"Environment variable {0} points to a missing file:\n{1}".format(
ENV_JSON_PATH, json_path
),
)
return
try:
payload = load_exchange_payload(json_path)
except ExchangeValidationError as exc:
_append_debug_log("payload validation failed: {0}".format(exc))
_show_error("QET Exchange", str(exc))
App.Console.PrintError("[FreeCADExchange] {0}\n".format(exc))
return
except Exception as exc:
_append_debug_log("unexpected payload exception: {0}".format(exc))
_append_debug_log(traceback.format_exc())
raise
summary = _build_summary(payload, json_path)
_append_debug_log(
"payload loaded: devices={0}, terminals={1}, models={2}, cabinet={3}".format(
summary["device_count"],
summary["terminal_count"],
summary["device_model_count"],
"yes" if summary.get("cabinet") else "no",
)
)
setattr(App, STATE_PAYLOAD, payload)
setattr(App, STATE_SUMMARY, summary)
if not getattr(App, STATE_IMPORT_SCHEDULED, False):
setattr(App, STATE_IMPORT_SCHEDULED, True)
_append_debug_log(
"device import scheduled after startup delay: {0} ms".format(
IMPORT_READY_DELAY_MS
)
)
QtCore.QTimer.singleShot(
IMPORT_READY_DELAY_MS, lambda: _run_scheduled_device_import(0)
)