You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
996 lines
36 KiB
Python
996 lines
36 KiB
Python
import json
|
|
import traceback
|
|
import os
|
|
from pathlib import Path
|
|
|
|
import FreeCAD as App
|
|
import FreeCADGui as Gui
|
|
import DeviceImport
|
|
import DevicePreview
|
|
try:
|
|
import ExchangeWriteBack
|
|
except Exception:
|
|
ExchangeWriteBack = None
|
|
try:
|
|
import TerminalImport
|
|
except Exception:
|
|
TerminalImport = None
|
|
try:
|
|
import WiringObjects
|
|
except Exception:
|
|
WiringObjects = None
|
|
try:
|
|
import WiringImport
|
|
except Exception:
|
|
WiringImport = None
|
|
|
|
try:
|
|
from PySide6 import QtCore, QtWidgets
|
|
except ImportError:
|
|
try:
|
|
from PySide2 import QtCore, QtWidgets
|
|
except ImportError:
|
|
from PySide import QtCore
|
|
from PySide import QtGui as QtWidgets
|
|
|
|
|
|
ENV_JSON_PATH = "QET_2D_TO_3D_JSON"
|
|
ENV_SCENE_PATH = "QET_FREECAD_SCENE_FILE"
|
|
STATE_FLAG = "_qet_exchange_bootstrapped"
|
|
STATE_PAYLOAD = "_qet_exchange_payload"
|
|
STATE_SUMMARY = "_qet_exchange_summary"
|
|
STATE_IMPORT_REPORT = "_qet_exchange_import_report"
|
|
STATE_TERMINAL_IMPORT_REPORT = "_qet_exchange_terminal_import_report"
|
|
STATE_WIRING_IMPORT_REPORT = "_qet_exchange_wiring_import_report"
|
|
STATE_WRITEBACK_REPORT = "_qet_exchange_writeback_report"
|
|
STATE_IMPORT_SCHEDULED = "_qet_exchange_import_scheduled"
|
|
STATE_TREE_FILTER = "_qet_exchange_tree_filter"
|
|
STATE_TREE_SIGNAL_CONNECTIONS = "_qet_exchange_tree_signal_connections"
|
|
TREE_FILTER_MARKER = "_qet_exchange_tree_filter_installed"
|
|
TREE_SIGNAL_MARKER = "_qet_exchange_tree_signal_installed"
|
|
IMPORT_READY_DELAY_MS = 1500
|
|
IMPORT_READY_RETRY_DELAY_MS = 1000
|
|
IMPORT_READY_MAX_RETRIES = 10
|
|
|
|
|
|
class ExchangeValidationError(RuntimeError):
|
|
pass
|
|
|
|
|
|
def _debug_log_path():
|
|
local_app_data = os.environ.get("LOCALAPPDATA", "").strip()
|
|
if local_app_data:
|
|
return os.path.join(local_app_data, "QETDeps", "freecad_exchange_bootstrap.log")
|
|
return os.path.join(str(Path.home()), "AppData", "Local", "QETDeps", "freecad_exchange_bootstrap.log")
|
|
|
|
|
|
def _append_debug_log(message):
|
|
try:
|
|
log_path = _debug_log_path()
|
|
os.makedirs(os.path.dirname(log_path), exist_ok=True)
|
|
with open(log_path, "a", encoding="utf-8") as handle:
|
|
handle.write(message + "\n")
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def _reset_debug_log():
|
|
try:
|
|
log_path = _debug_log_path()
|
|
os.makedirs(os.path.dirname(log_path), exist_ok=True)
|
|
with open(log_path, "w", encoding="utf-8") as handle:
|
|
handle.write("[QET Exchange] new debug session\n")
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def _get_main_window():
|
|
try:
|
|
return Gui.getMainWindow()
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _show_info(title, message):
|
|
QtWidgets.QMessageBox.information(_get_main_window(), title, message)
|
|
|
|
|
|
def _show_error(title, message):
|
|
QtWidgets.QMessageBox.critical(_get_main_window(), title, message)
|
|
|
|
|
|
def _terminal_report_not_available():
|
|
return {
|
|
"imported_terminals": 0,
|
|
"updated_terminals": 0,
|
|
"removed_terminals": 0,
|
|
"skipped_terminals": 0,
|
|
"warnings": ["3D terminal import module is not available."],
|
|
}
|
|
|
|
|
|
def _qt_class_name(widget):
|
|
try:
|
|
return widget.metaObject().className()
|
|
except Exception:
|
|
return ""
|
|
|
|
|
|
def _has_tree_widget_parent(widget):
|
|
current = widget
|
|
while current is not None:
|
|
class_name = _qt_class_name(current)
|
|
if "TreeWidget" in class_name or class_name.endswith("QTreeWidget") or class_name.endswith("QTreeView"):
|
|
return True
|
|
current = current.parent()
|
|
return False
|
|
|
|
|
|
class _DeviceTreeDoubleClickFilter(QtCore.QObject):
|
|
def eventFilter(self, watched, event):
|
|
try:
|
|
if event.type() != QtCore.QEvent.MouseButtonDblClick:
|
|
return False
|
|
_append_debug_log(
|
|
"tree double click captured: watched_class={0}".format(
|
|
_qt_class_name(watched)
|
|
)
|
|
)
|
|
QtCore.QTimer.singleShot(0, _open_selected_device_preview_if_needed)
|
|
except Exception as exc:
|
|
_append_debug_log(
|
|
"tree double click filter failed: {0}".format(exc)
|
|
)
|
|
return False
|
|
|
|
|
|
class _DeviceTreeSignalBridge(QtCore.QObject):
|
|
def on_item_double_clicked(self, *args):
|
|
_append_debug_log("tree double click signal received: itemDoubleClicked")
|
|
QtCore.QTimer.singleShot(0, _open_selected_device_preview_if_needed)
|
|
|
|
def on_index_double_clicked(self, *args):
|
|
_append_debug_log("tree double click signal received: doubleClicked")
|
|
QtCore.QTimer.singleShot(0, _open_selected_device_preview_if_needed)
|
|
|
|
|
|
def _install_tree_double_click_filter():
|
|
main_window = _get_main_window()
|
|
if main_window is None:
|
|
return
|
|
if getattr(Gui, STATE_TREE_FILTER, None) is not None:
|
|
tree_filter = getattr(Gui, STATE_TREE_FILTER)
|
|
else:
|
|
tree_filter = _DeviceTreeDoubleClickFilter(main_window)
|
|
setattr(Gui, STATE_TREE_FILTER, tree_filter)
|
|
|
|
if getattr(Gui, STATE_TREE_SIGNAL_CONNECTIONS, None) is not None:
|
|
signal_bridge = getattr(Gui, STATE_TREE_SIGNAL_CONNECTIONS)
|
|
else:
|
|
signal_bridge = _DeviceTreeSignalBridge(main_window)
|
|
setattr(Gui, STATE_TREE_SIGNAL_CONNECTIONS, signal_bridge)
|
|
|
|
installed_count = 0
|
|
signal_count = 0
|
|
for widget in main_window.findChildren(QtWidgets.QWidget):
|
|
class_name = _qt_class_name(widget)
|
|
if "TreeWidget" not in class_name and not class_name.endswith("QTreeWidget") and not class_name.endswith("QTreeView"):
|
|
continue
|
|
|
|
targets = [widget]
|
|
viewport = getattr(widget, "viewport", lambda: None)()
|
|
if viewport is not None:
|
|
targets.append(viewport)
|
|
|
|
for target in targets:
|
|
if target is None:
|
|
continue
|
|
if bool(target.property(TREE_FILTER_MARKER)):
|
|
continue
|
|
target.installEventFilter(tree_filter)
|
|
target.setProperty(TREE_FILTER_MARKER, True)
|
|
installed_count += 1
|
|
_append_debug_log(
|
|
"tree double click filter attached: class={0}".format(
|
|
_qt_class_name(target)
|
|
)
|
|
)
|
|
|
|
if not bool(widget.property(TREE_SIGNAL_MARKER)):
|
|
connected = False
|
|
item_double_clicked = getattr(widget, "itemDoubleClicked", None)
|
|
if item_double_clicked is not None:
|
|
try:
|
|
item_double_clicked.connect(signal_bridge.on_item_double_clicked)
|
|
connected = True
|
|
signal_count += 1
|
|
_append_debug_log(
|
|
"tree double click signal connected: class={0}, signal=itemDoubleClicked".format(
|
|
class_name
|
|
)
|
|
)
|
|
except Exception as exc:
|
|
_append_debug_log(
|
|
"tree double click signal connect failed: class={0}, signal=itemDoubleClicked, error={1}".format(
|
|
class_name, exc
|
|
)
|
|
)
|
|
|
|
view_double_clicked = getattr(widget, "doubleClicked", None)
|
|
if view_double_clicked is not None:
|
|
try:
|
|
view_double_clicked.connect(signal_bridge.on_index_double_clicked)
|
|
connected = True
|
|
signal_count += 1
|
|
_append_debug_log(
|
|
"tree double click signal connected: class={0}, signal=doubleClicked".format(
|
|
class_name
|
|
)
|
|
)
|
|
except Exception as exc:
|
|
_append_debug_log(
|
|
"tree double click signal connect failed: class={0}, signal=doubleClicked, error={1}".format(
|
|
class_name, exc
|
|
)
|
|
)
|
|
|
|
if connected:
|
|
widget.setProperty(TREE_SIGNAL_MARKER, True)
|
|
|
|
_append_debug_log(
|
|
"tree double click filter install pass complete: attached={0}, signal_connections={1}".format(
|
|
installed_count,
|
|
signal_count,
|
|
)
|
|
)
|
|
|
|
|
|
def _open_selected_device_preview_if_needed():
|
|
try:
|
|
selection = Gui.Selection.getSelection()
|
|
_append_debug_log(
|
|
"tree double click selection count={0}".format(len(selection))
|
|
)
|
|
if len(selection) != 1:
|
|
return
|
|
obj = selection[0]
|
|
_append_debug_log(
|
|
"tree double click selected object: name={0}, label={1}, type={2}, doc={3}".format(
|
|
getattr(obj, "Name", ""),
|
|
getattr(obj, "Label", ""),
|
|
getattr(obj, "TypeId", ""),
|
|
getattr(getattr(obj, "Document", None), "Name", ""),
|
|
)
|
|
)
|
|
|
|
device_obj = DevicePreview.find_parent_qet_device_object(obj)
|
|
_append_debug_log(
|
|
"tree double click resolved device object: name={0}, label={1}, doc={2}".format(
|
|
getattr(device_obj, "Name", "") if device_obj else "",
|
|
getattr(device_obj, "Label", "") if device_obj else "",
|
|
getattr(getattr(device_obj, "Document", None), "Name", "") if device_obj else "",
|
|
)
|
|
)
|
|
if device_obj is None:
|
|
return
|
|
|
|
if DevicePreview.is_preview_document_name(
|
|
getattr(getattr(device_obj, "Document", None), "Name", "")
|
|
):
|
|
_append_debug_log("tree double click ignored inside preview document")
|
|
return
|
|
|
|
DevicePreview.open_preview_for_device_object(device_obj)
|
|
_append_debug_log("tree double click preview open requested")
|
|
except Exception as exc:
|
|
_append_debug_log(
|
|
"open selected device preview failed: {0}".format(exc)
|
|
)
|
|
|
|
|
|
def _is_gui_ready():
|
|
main_window = _get_main_window()
|
|
if main_window is None:
|
|
return False
|
|
try:
|
|
return bool(main_window.isVisible())
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def _require_string(payload, field_name):
|
|
value = payload.get(field_name)
|
|
if not isinstance(value, str) or not value.strip():
|
|
raise ExchangeValidationError(
|
|
"Field '{0}' must be a non-empty string.".format(field_name)
|
|
)
|
|
return value.strip()
|
|
|
|
|
|
def _normalize_instance_id(item):
|
|
value = item.get("instance_id", "")
|
|
if value is None:
|
|
return ""
|
|
if not isinstance(value, str):
|
|
raise ExchangeValidationError(
|
|
"Field 'instance_id' must be a string when present."
|
|
)
|
|
return value.strip()
|
|
|
|
|
|
def _normalize_devices(payload):
|
|
devices = payload.get("devices", [])
|
|
if not isinstance(devices, list):
|
|
raise ExchangeValidationError("Field 'devices' must be a list.")
|
|
|
|
normalized = []
|
|
for index, item in enumerate(devices):
|
|
if not isinstance(item, dict):
|
|
raise ExchangeValidationError(
|
|
"Device entry #{0} must be an object.".format(index)
|
|
)
|
|
element_uuid = _require_string(item, "element_uuid")
|
|
display_tag = item.get("display_tag", "")
|
|
if display_tag and not isinstance(display_tag, str):
|
|
raise ExchangeValidationError(
|
|
"Field 'display_tag' in device entry #{0} must be a string.".format(
|
|
index
|
|
)
|
|
)
|
|
normalized.append(
|
|
{
|
|
"element_uuid": element_uuid,
|
|
"instance_id": _normalize_instance_id(item),
|
|
"display_tag": display_tag.strip() if isinstance(display_tag, str) else "",
|
|
}
|
|
)
|
|
return normalized
|
|
|
|
|
|
def _normalize_terminals(payload):
|
|
terminals = payload.get("terminals", [])
|
|
if not isinstance(terminals, list):
|
|
raise ExchangeValidationError("Field 'terminals' must be a list.")
|
|
|
|
normalized = []
|
|
for index, item in enumerate(terminals):
|
|
if not isinstance(item, dict):
|
|
raise ExchangeValidationError(
|
|
"Terminal entry #{0} must be an object.".format(index)
|
|
)
|
|
terminal_uuid = _require_string(item, "terminal_uuid")
|
|
element_uuid = item.get("element_uuid", "")
|
|
if element_uuid and not isinstance(element_uuid, str):
|
|
raise ExchangeValidationError(
|
|
"Field 'element_uuid' in terminal entry #{0} must be a string.".format(
|
|
index
|
|
)
|
|
)
|
|
normalized.append(
|
|
{
|
|
"terminal_uuid": terminal_uuid,
|
|
"instance_id": _normalize_instance_id(item),
|
|
"element_uuid": element_uuid.strip() if isinstance(element_uuid, str) else "",
|
|
"terminal_display": _optional_string(item, "terminal_display", "terminal entry #{0}".format(index)),
|
|
}
|
|
)
|
|
return normalized
|
|
|
|
|
|
def _optional_string(item, field_name, entry_label):
|
|
value = item.get(field_name, "")
|
|
if value is None:
|
|
return ""
|
|
if value and not isinstance(value, str):
|
|
raise ExchangeValidationError(
|
|
"Field '{0}' in {1} must be a string.".format(field_name, entry_label)
|
|
)
|
|
return value.strip() if isinstance(value, str) else ""
|
|
|
|
|
|
def _normalize_conductor_uuids(item, entry_label):
|
|
values = item.get("conductor_uuids", [])
|
|
if values is None:
|
|
return []
|
|
if not isinstance(values, list):
|
|
raise ExchangeValidationError(
|
|
"Field 'conductor_uuids' in {0} must be a list.".format(entry_label)
|
|
)
|
|
normalized = []
|
|
for conductor_index, value in enumerate(values):
|
|
if not isinstance(value, str):
|
|
raise ExchangeValidationError(
|
|
"Field 'conductor_uuids[{0}]' in {1} must be a string.".format(
|
|
conductor_index,
|
|
entry_label,
|
|
)
|
|
)
|
|
value = value.strip()
|
|
if value:
|
|
normalized.append(value)
|
|
return normalized
|
|
|
|
|
|
def _normalize_wires(payload):
|
|
wires = payload.get("wires", [])
|
|
if wires is None:
|
|
return []
|
|
if not isinstance(wires, list):
|
|
raise ExchangeValidationError("Field 'wires' must be a list.")
|
|
|
|
normalized = []
|
|
for index, item in enumerate(wires):
|
|
entry_label = "wire entry #{0}".format(index)
|
|
if not isinstance(item, dict):
|
|
raise ExchangeValidationError(
|
|
"Wire entry #{0} must be an object.".format(index)
|
|
)
|
|
wire_id = _require_string(item, "wire_id")
|
|
wire_mark_is_manual = item.get("wire_mark_is_manual", False)
|
|
if not isinstance(wire_mark_is_manual, bool):
|
|
raise ExchangeValidationError(
|
|
"Field 'wire_mark_is_manual' in {0} must be a bool.".format(
|
|
entry_label
|
|
)
|
|
)
|
|
normalized.append(
|
|
{
|
|
"wire_id": wire_id,
|
|
"net_uuid": _optional_string(item, "net_uuid", entry_label),
|
|
"group_uuid": _optional_string(item, "group_uuid", entry_label),
|
|
"wire_mark": _optional_string(item, "wire_mark", entry_label),
|
|
"wire_mark_is_manual": wire_mark_is_manual,
|
|
"start_element_uuid": _optional_string(item, "start_element_uuid", entry_label),
|
|
"start_terminal_uuid": _optional_string(item, "start_terminal_uuid", entry_label),
|
|
"end_element_uuid": _optional_string(item, "end_element_uuid", entry_label),
|
|
"end_terminal_uuid": _optional_string(item, "end_terminal_uuid", entry_label),
|
|
"start_terminal_display": _optional_string(item, "start_terminal_display", entry_label),
|
|
"end_terminal_display": _optional_string(item, "end_terminal_display", entry_label),
|
|
"conductor_uuids": _normalize_conductor_uuids(item, entry_label),
|
|
}
|
|
)
|
|
return normalized
|
|
|
|
|
|
def _normalize_device_models(payload):
|
|
models = payload.get("device_models", [])
|
|
if not isinstance(models, list):
|
|
raise ExchangeValidationError("Field 'device_models' must be a list.")
|
|
|
|
normalized = []
|
|
for index, item in enumerate(models):
|
|
if not isinstance(item, dict):
|
|
raise ExchangeValidationError(
|
|
"Device model entry #{0} must be an object.".format(index)
|
|
)
|
|
element_uuid = _require_string(item, "element_uuid")
|
|
parts_3d = item.get("parts_3d", "")
|
|
if parts_3d and not isinstance(parts_3d, str):
|
|
raise ExchangeValidationError(
|
|
"Field 'parts_3d' in device model entry #{0} must be a string.".format(
|
|
index
|
|
)
|
|
)
|
|
resolved_model_path = item.get("resolved_model_path", "")
|
|
if resolved_model_path and not isinstance(resolved_model_path, str):
|
|
raise ExchangeValidationError(
|
|
"Field 'resolved_model_path' in device model entry #{0} must be a string.".format(
|
|
index
|
|
)
|
|
)
|
|
|
|
device_id = item.get("device_id")
|
|
if device_id is not None and not isinstance(device_id, int):
|
|
raise ExchangeValidationError(
|
|
"Field 'device_id' in device model entry #{0} must be an integer.".format(
|
|
index
|
|
)
|
|
)
|
|
|
|
normalized.append(
|
|
{
|
|
"element_uuid": element_uuid,
|
|
"device_id": device_id,
|
|
"parts_3d": parts_3d.strip() if isinstance(parts_3d, str) else "",
|
|
"resolved_model_path": (
|
|
resolved_model_path.strip()
|
|
if isinstance(resolved_model_path, str)
|
|
else ""
|
|
),
|
|
}
|
|
)
|
|
return normalized
|
|
|
|
|
|
def _normalize_cabinet(payload):
|
|
cabinet = payload.get("cabinet")
|
|
if cabinet is None:
|
|
return None
|
|
if not isinstance(cabinet, dict):
|
|
raise ExchangeValidationError("Field 'cabinet' must be an object when present.")
|
|
|
|
location_id = cabinet.get("location_id")
|
|
if location_id is not None and not isinstance(location_id, int):
|
|
raise ExchangeValidationError("Field 'location_id' in cabinet must be an integer.")
|
|
|
|
normalized = {
|
|
"location_id": location_id,
|
|
"label": "",
|
|
"name": "",
|
|
"display_text": "",
|
|
"associated_fileset": "",
|
|
"three_d_relative_path": "",
|
|
"resolved_scene_path": "",
|
|
}
|
|
for field_name in (
|
|
"label",
|
|
"name",
|
|
"display_text",
|
|
"associated_fileset",
|
|
"three_d_relative_path",
|
|
"resolved_scene_path",
|
|
):
|
|
value = cabinet.get(field_name, "")
|
|
if value and not isinstance(value, str):
|
|
raise ExchangeValidationError(
|
|
"Field '{0}' in cabinet must be a string.".format(field_name)
|
|
)
|
|
normalized[field_name] = value.strip() if isinstance(value, str) else ""
|
|
return normalized
|
|
|
|
|
|
def load_exchange_payload(json_path):
|
|
try:
|
|
raw_text = Path(json_path).read_text(encoding="utf-8")
|
|
except OSError as exc:
|
|
raise ExchangeValidationError(
|
|
"Failed to read exchange file:\n{0}".format(exc)
|
|
) from exc
|
|
|
|
try:
|
|
payload = json.loads(raw_text)
|
|
except json.JSONDecodeError as exc:
|
|
raise ExchangeValidationError(
|
|
"Exchange JSON is invalid:\n{0}".format(exc)
|
|
) from exc
|
|
|
|
if not isinstance(payload, dict):
|
|
raise ExchangeValidationError("Exchange JSON root must be an object.")
|
|
|
|
project_uuid = _require_string(payload, "project_uuid")
|
|
schema_version = payload.get("schema_version", "1.0")
|
|
if not isinstance(schema_version, str) or not schema_version.strip():
|
|
raise ExchangeValidationError("Field 'schema_version' must be a string.")
|
|
|
|
normalized = {
|
|
"schema_version": schema_version.strip(),
|
|
"project_uuid": project_uuid,
|
|
"generated_at": payload.get("generated_at", ""),
|
|
"source": payload.get("source", {}),
|
|
"cabinet": _normalize_cabinet(payload),
|
|
"devices": _normalize_devices(payload),
|
|
"terminals": _normalize_terminals(payload),
|
|
"device_models": _normalize_device_models(payload),
|
|
"wires": _normalize_wires(payload),
|
|
}
|
|
return normalized
|
|
|
|
|
|
def _build_summary(payload, json_path):
|
|
devices = payload["devices"]
|
|
terminals = payload["terminals"]
|
|
device_models = payload["device_models"]
|
|
wires = payload.get("wires", [])
|
|
cabinet = payload.get("cabinet")
|
|
missing_device_instances = sum(1 for item in devices if not item["instance_id"])
|
|
missing_terminal_instances = sum(
|
|
1 for item in terminals if not item["instance_id"]
|
|
)
|
|
with_model_paths = sum(
|
|
1 for item in device_models if item["resolved_model_path"] or item["parts_3d"]
|
|
)
|
|
|
|
return {
|
|
"json_path": json_path,
|
|
"project_uuid": payload["project_uuid"],
|
|
"device_count": len(devices),
|
|
"terminal_count": len(terminals),
|
|
"wire_count": len(wires),
|
|
"device_model_count": len(device_models),
|
|
"device_models_with_parts": with_model_paths,
|
|
"missing_device_instances": missing_device_instances,
|
|
"missing_terminal_instances": missing_terminal_instances,
|
|
"cabinet": cabinet,
|
|
"scene_path": os.environ.get(ENV_SCENE_PATH, "").strip(),
|
|
}
|
|
|
|
|
|
def _initialize_wiring_scene(payload):
|
|
if WiringObjects is None:
|
|
_append_debug_log("wiring scene initialization skipped: WiringObjects module unavailable")
|
|
return None
|
|
|
|
try:
|
|
return WiringObjects.initialize_wiring_scene(
|
|
App.ActiveDocument,
|
|
payload.get("project_uuid", "") if isinstance(payload, dict) else "",
|
|
)
|
|
except Exception as exc:
|
|
_append_debug_log("wiring scene initialization failed: {0}".format(exc))
|
|
_append_debug_log(traceback.format_exc())
|
|
return None
|
|
|
|
|
|
def _import_wiring_tasks(payload):
|
|
if WiringImport is None:
|
|
_append_debug_log("wire task import skipped: WiringImport module unavailable")
|
|
return None
|
|
|
|
try:
|
|
return WiringImport.import_wire_tasks_from_payload(
|
|
payload,
|
|
App.ActiveDocument,
|
|
)
|
|
except Exception as exc:
|
|
_append_debug_log("wire task import failed: {0}".format(exc))
|
|
_append_debug_log(traceback.format_exc())
|
|
return None
|
|
|
|
|
|
def _summary_message(summary, import_report=None, terminal_report=None, writeback_report=None, wiring_report=None):
|
|
lines = [
|
|
"QET exchange file loaded successfully.",
|
|
"",
|
|
"Project UUID: {0}".format(summary["project_uuid"]),
|
|
"Exchange file: {0}".format(summary["json_path"]),
|
|
"Devices: {0}".format(summary["device_count"]),
|
|
"Terminals: {0}".format(summary["terminal_count"]),
|
|
"Wires: {0}".format(summary["wire_count"]),
|
|
"Device models: {0}".format(summary["device_model_count"]),
|
|
"Resolved model paths: {0}".format(summary["device_models_with_parts"]),
|
|
]
|
|
|
|
cabinet = summary.get("cabinet")
|
|
if isinstance(cabinet, dict):
|
|
cabinet_name = cabinet.get("display_text") or cabinet.get("label") or cabinet.get("name")
|
|
if cabinet_name:
|
|
lines.append("Cabinet: {0}".format(cabinet_name))
|
|
if cabinet.get("associated_fileset"):
|
|
lines.append("Cabinet fileset: {0}".format(cabinet["associated_fileset"]))
|
|
if cabinet.get("three_d_relative_path"):
|
|
lines.append(
|
|
"Cabinet 3D relative path: {0}".format(
|
|
cabinet["three_d_relative_path"]
|
|
)
|
|
)
|
|
if cabinet.get("resolved_scene_path"):
|
|
lines.append(
|
|
"Cabinet resolved scene path: {0}".format(
|
|
cabinet["resolved_scene_path"]
|
|
)
|
|
)
|
|
|
|
if summary["missing_device_instances"]:
|
|
lines.append(
|
|
"Devices without instance_id yet: {0}".format(
|
|
summary["missing_device_instances"]
|
|
)
|
|
)
|
|
if summary["missing_terminal_instances"]:
|
|
lines.append(
|
|
"Terminals without instance_id yet: {0}".format(
|
|
summary["missing_terminal_instances"]
|
|
)
|
|
)
|
|
if summary["scene_path"]:
|
|
lines.append("Scene file: {0}".format(summary["scene_path"]))
|
|
|
|
if import_report:
|
|
lines.extend(
|
|
[
|
|
"",
|
|
"3D device import summary:",
|
|
"Target document: {0}".format(import_report["document_name"]),
|
|
"Imported cabinet models: {0}".format(import_report["cabinet_imported"]),
|
|
"Imported devices: {0}".format(import_report["imported_devices"]),
|
|
"Updated devices: {0}".format(import_report["updated_devices"]),
|
|
]
|
|
)
|
|
if import_report["imported_without_instance_id"]:
|
|
lines.append(
|
|
"Imported without instance_id yet: {0}".format(
|
|
import_report["imported_without_instance_id"]
|
|
)
|
|
)
|
|
if import_report["skipped_missing_model"]:
|
|
lines.append(
|
|
"Skipped without resolved model path: {0}".format(
|
|
import_report["skipped_missing_model"]
|
|
)
|
|
)
|
|
if import_report["skipped_missing_file"]:
|
|
lines.append(
|
|
"Skipped missing model file: {0}".format(
|
|
import_report["skipped_missing_file"]
|
|
)
|
|
)
|
|
if import_report["skipped_unsupported_format"]:
|
|
lines.append(
|
|
"Skipped unsupported model format: {0}".format(
|
|
import_report["skipped_unsupported_format"]
|
|
)
|
|
)
|
|
if import_report["skipped_import_error"]:
|
|
lines.append(
|
|
"Skipped after import errors: {0}".format(
|
|
import_report["skipped_import_error"]
|
|
)
|
|
)
|
|
warnings = import_report.get("warnings", [])
|
|
if warnings:
|
|
lines.append("")
|
|
lines.append("Warnings:")
|
|
lines.extend("- {0}".format(item) for item in warnings[:10])
|
|
if len(warnings) > 10:
|
|
lines.append("- ... ({0} more)".format(len(warnings) - 10))
|
|
|
|
if terminal_report:
|
|
lines.extend(
|
|
[
|
|
"",
|
|
"3D terminal import summary:",
|
|
"Document: {0}".format(terminal_report["document_name"]),
|
|
"Imported terminals: {0}".format(terminal_report["imported_terminals"]),
|
|
"Updated terminals: {0}".format(terminal_report["updated_terminals"]),
|
|
"Removed stale terminals: {0}".format(terminal_report["removed_terminals"]),
|
|
]
|
|
)
|
|
warnings = terminal_report.get("warnings", [])
|
|
if warnings:
|
|
lines.append("Warnings:")
|
|
lines.extend("- {0}".format(item) for item in warnings[:10])
|
|
if len(warnings) > 10:
|
|
lines.append("- ... ({0} more)".format(len(warnings) - 10))
|
|
|
|
if writeback_report:
|
|
lines.extend(
|
|
[
|
|
"",
|
|
"3D write-back:",
|
|
"Output file: {0}".format(writeback_report["output_path"]),
|
|
"Instances written: {0}".format(len(writeback_report["instances"])),
|
|
"Terminals written: {0}".format(len(writeback_report["terminals"])),
|
|
]
|
|
)
|
|
|
|
if wiring_report:
|
|
lines.extend(
|
|
[
|
|
"",
|
|
"3D wire tasks:",
|
|
"Imported tasks: {0}".format(wiring_report.get("imported_tasks", 0)),
|
|
"Updated tasks: {0}".format(wiring_report.get("updated_tasks", 0)),
|
|
]
|
|
)
|
|
warnings = wiring_report.get("warnings", [])
|
|
if warnings:
|
|
lines.append("Wire task warnings:")
|
|
lines.extend("- {0}".format(item) for item in warnings[:10])
|
|
if len(warnings) > 10:
|
|
lines.append("- ... ({0} more)".format(len(warnings) - 10))
|
|
|
|
lines.append("")
|
|
lines.append("This step validates the exchange payload and imports devices with valid resolved model paths.")
|
|
lines.append("3D terminal import and write-back are enabled.")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _run_scheduled_device_import(attempt=0):
|
|
_append_debug_log(
|
|
"scheduled device import invoked: attempt={0}".format(attempt)
|
|
)
|
|
|
|
if not _is_gui_ready():
|
|
if attempt < IMPORT_READY_MAX_RETRIES:
|
|
_append_debug_log(
|
|
"scheduled device import postponed: gui not ready, retrying"
|
|
)
|
|
QtCore.QTimer.singleShot(
|
|
IMPORT_READY_RETRY_DELAY_MS,
|
|
lambda: _run_scheduled_device_import(attempt + 1),
|
|
)
|
|
return
|
|
|
|
_append_debug_log("scheduled device import aborted: gui never became ready")
|
|
_show_error(
|
|
"QET Exchange",
|
|
"FreeCAD main window did not finish initializing before device import.",
|
|
)
|
|
return
|
|
|
|
payload = getattr(App, STATE_PAYLOAD, None)
|
|
summary = getattr(App, STATE_SUMMARY, None)
|
|
if not isinstance(payload, dict) or not isinstance(summary, dict):
|
|
_append_debug_log(
|
|
"scheduled device import aborted: cached payload/summary missing"
|
|
)
|
|
return
|
|
|
|
scene_path = os.environ.get(ENV_SCENE_PATH, "").strip()
|
|
_append_debug_log(
|
|
"scheduled device import starting with scene_path={0}".format(scene_path)
|
|
)
|
|
try:
|
|
import_report = DeviceImport.import_devices_from_payload(payload, scene_path)
|
|
except DeviceImport.DeviceImportError as exc:
|
|
_append_debug_log("device import failed: {0}".format(exc))
|
|
_show_error("QET Exchange", str(exc))
|
|
App.Console.PrintError("[FreeCADExchange] {0}\n".format(exc))
|
|
return
|
|
|
|
except Exception as exc:
|
|
_append_debug_log("unexpected device import exception: {0}".format(exc))
|
|
_append_debug_log(traceback.format_exc())
|
|
_show_error("QET Exchange", "Failed to import 3D devices:\n{0}".format(exc))
|
|
App.Console.PrintError(
|
|
"[FreeCADExchange] Failed to import devices: {0}\n".format(exc)
|
|
)
|
|
return
|
|
|
|
setattr(App, STATE_IMPORT_REPORT, import_report)
|
|
_append_debug_log(
|
|
"device import summary: imported={0}, updated={1}, skipped_missing_model={2}, skipped_missing_file={3}, skipped_import_error={4}".format(
|
|
import_report["imported_devices"],
|
|
import_report["updated_devices"],
|
|
import_report["skipped_missing_model"],
|
|
import_report["skipped_missing_file"],
|
|
import_report["skipped_import_error"],
|
|
)
|
|
)
|
|
|
|
App.Console.PrintMessage(
|
|
"[FreeCADExchange] Loaded exchange payload from {0}\n".format(
|
|
summary["json_path"]
|
|
)
|
|
)
|
|
App.Console.PrintMessage(
|
|
"[FreeCADExchange] Devices: {0}, Terminals: {1}, Device models: {2}\n".format(
|
|
summary["device_count"],
|
|
summary["terminal_count"],
|
|
summary["device_model_count"],
|
|
)
|
|
)
|
|
App.Console.PrintMessage(
|
|
"[FreeCADExchange] Imported devices: {0}, updated: {1}, skipped without model: {2}\n".format(
|
|
import_report["imported_devices"],
|
|
import_report["updated_devices"],
|
|
import_report["skipped_missing_model"],
|
|
)
|
|
)
|
|
if import_report.get("generated_instance_ids"):
|
|
App.Console.PrintMessage(
|
|
"[FreeCADExchange] Generated device instance IDs: {0}\n".format(
|
|
import_report["generated_instance_ids"]
|
|
)
|
|
)
|
|
|
|
if TerminalImport is None:
|
|
_append_debug_log("terminal import skipped: TerminalImport module unavailable")
|
|
terminal_report = _terminal_report_not_available()
|
|
else:
|
|
try:
|
|
terminal_report = TerminalImport.import_terminals_from_payload(payload, scene_path)
|
|
except TerminalImport.TerminalImportError as exc:
|
|
_append_debug_log("terminal import failed: {0}".format(exc))
|
|
_show_error("QET Exchange", str(exc))
|
|
App.Console.PrintError("[FreeCADExchange] {0}\n".format(exc))
|
|
return
|
|
except Exception as exc:
|
|
_append_debug_log("unexpected terminal import exception: {0}".format(exc))
|
|
_append_debug_log(traceback.format_exc())
|
|
_show_error("QET Exchange", "Failed to import 3D terminals:\n{0}".format(exc))
|
|
App.Console.PrintError(
|
|
"[FreeCADExchange] Failed to import terminals: {0}\n".format(exc)
|
|
)
|
|
return
|
|
|
|
setattr(App, STATE_TERMINAL_IMPORT_REPORT, terminal_report)
|
|
|
|
_initialize_wiring_scene(payload)
|
|
wiring_report = _import_wiring_tasks(payload)
|
|
if wiring_report is not None:
|
|
setattr(App, STATE_WIRING_IMPORT_REPORT, wiring_report)
|
|
|
|
if ExchangeWriteBack is None:
|
|
_append_debug_log("write-back skipped: ExchangeWriteBack module unavailable")
|
|
writeback_report = None
|
|
else:
|
|
try:
|
|
writeback_report = ExchangeWriteBack.write_back_document(
|
|
App.ActiveDocument, scene_path=scene_path, payload=payload
|
|
)
|
|
except Exception as exc:
|
|
_append_debug_log("write-back failed after import: {0}".format(exc))
|
|
_append_debug_log(traceback.format_exc())
|
|
writeback_report = None
|
|
else:
|
|
setattr(App, STATE_WRITEBACK_REPORT, writeback_report)
|
|
|
|
App.Console.PrintMessage(
|
|
"[FreeCADExchange] Imported terminals: {0}, updated: {1}, removed: {2}\n".format(
|
|
terminal_report["imported_terminals"],
|
|
terminal_report["updated_terminals"],
|
|
terminal_report["removed_terminals"],
|
|
)
|
|
)
|
|
|
|
_show_info(
|
|
"QET Exchange",
|
|
_summary_message(summary, import_report, terminal_report, writeback_report, wiring_report),
|
|
)
|
|
_append_debug_log("summary dialog shown")
|
|
|
|
|
|
def bootstrap_if_requested():
|
|
if not getattr(App, STATE_FLAG, False):
|
|
_reset_debug_log()
|
|
_append_debug_log("bootstrap_if_requested entered")
|
|
_install_tree_double_click_filter()
|
|
if getattr(App, STATE_FLAG, False):
|
|
_append_debug_log("bootstrap_if_requested skipped: already bootstrapped")
|
|
return
|
|
|
|
json_path = os.environ.get(ENV_JSON_PATH, "").strip()
|
|
_append_debug_log("ENV QET_2D_TO_3D_JSON={0}".format(json_path))
|
|
_append_debug_log("ENV QET_FREECAD_SCENE_FILE={0}".format(os.environ.get(ENV_SCENE_PATH, "").strip()))
|
|
if not json_path:
|
|
_append_debug_log("bootstrap_if_requested skipped: env missing")
|
|
return
|
|
|
|
setattr(App, STATE_FLAG, True)
|
|
_append_debug_log("STATE_FLAG set")
|
|
|
|
if not os.path.isfile(json_path):
|
|
_append_debug_log("exchange file missing: {0}".format(json_path))
|
|
_show_error(
|
|
"QET Exchange",
|
|
"Environment variable {0} points to a missing file:\n{1}".format(
|
|
ENV_JSON_PATH, json_path
|
|
),
|
|
)
|
|
return
|
|
|
|
try:
|
|
payload = load_exchange_payload(json_path)
|
|
except ExchangeValidationError as exc:
|
|
_append_debug_log("payload validation failed: {0}".format(exc))
|
|
_show_error("QET Exchange", str(exc))
|
|
App.Console.PrintError("[FreeCADExchange] {0}\n".format(exc))
|
|
return
|
|
except Exception as exc:
|
|
_append_debug_log("unexpected payload exception: {0}".format(exc))
|
|
_append_debug_log(traceback.format_exc())
|
|
raise
|
|
|
|
summary = _build_summary(payload, json_path)
|
|
_append_debug_log(
|
|
"payload loaded: devices={0}, terminals={1}, models={2}, cabinet={3}".format(
|
|
summary["device_count"],
|
|
summary["terminal_count"],
|
|
summary["device_model_count"],
|
|
"yes" if summary.get("cabinet") else "no",
|
|
)
|
|
)
|
|
setattr(App, STATE_PAYLOAD, payload)
|
|
setattr(App, STATE_SUMMARY, summary)
|
|
if not getattr(App, STATE_IMPORT_SCHEDULED, False):
|
|
setattr(App, STATE_IMPORT_SCHEDULED, True)
|
|
_append_debug_log(
|
|
"device import scheduled after startup delay: {0} ms".format(
|
|
IMPORT_READY_DELAY_MS
|
|
)
|
|
)
|
|
QtCore.QTimer.singleShot(
|
|
IMPORT_READY_DELAY_MS, lambda: _run_scheduled_device_import(0)
|
|
)
|