You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1129 lines
35 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
from pathlib import Path
import uuid
import FreeCAD as App
try:
import FreeCADGui as Gui
except ImportError:
Gui = None
try:
import ImportGui
except ImportError:
ImportGui = None
import DevicePreview
import TemplateSemantics
import TerminalObjects
# Internal object name and default label of the top-level exchange group
# (see _ensure_root_group).
ROOT_GROUP_NAME = "QETExchangeDevices"
ROOT_GROUP_LABEL = "QET Exchange Devices"
# Fixed object name that _find_cabinet_group checks as a fallback when no
# per-instance cabinet group is found by its prefixed name.
CABINET_MODEL_GROUP_NAME = "QETCabinetModel"
# Object-name prefixes; a sanitized id produced by _safe_token is appended.
CABINET_GROUP_PREFIX = "QETCabinet_"
DEVICE_GROUP_PREFIX = "QETDevice_"
TERMINAL_GROUP_PREFIX = "QETTerminals_"
WIRE_GROUP_PREFIX = "QETWires_"
# Values written to the QetGroupKind property of a device's child groups.
GROUP_KIND_TERMINALS = "Terminals"
GROUP_KIND_WIRES = "Wires"
class DeviceImportError(RuntimeError):
    """Raised when a document or model file needed by the import cannot be opened."""
def _debug_log_path():
    """Return the path of the exchange bootstrap debug log.

    The log lives in ``QETDeps`` below ``%LOCALAPPDATA%`` when that variable
    holds a non-blank value, otherwise below ``~/AppData/Local``.
    """
    base_dir = os.environ.get("LOCALAPPDATA", "").strip()
    if not base_dir:
        base_dir = os.path.join(str(Path.home()), "AppData", "Local")
    return os.path.join(base_dir, "QETDeps", "freecad_exchange_bootstrap.log")
def _append_debug_log(message):
    """Append *message* and a newline to the debug log, best effort.

    Every exception is swallowed on purpose so that a logging problem can
    never interrupt the import itself.
    """
    try:
        target = _debug_log_path()
        target_dir = os.path.dirname(target)
        os.makedirs(target_dir, exist_ok=True)
        with open(target, mode="a", encoding="utf-8") as log_file:
            log_file.write(message + "\n")
    except Exception:
        pass
def _safe_token(value):
    """Reduce *value* to a token made of alphanumerics and underscores.

    Surrounding whitespace is stripped first; a missing or blank value
    yields ``"unknown"``. Every character for which ``str.isalnum()`` is
    false is replaced by ``"_"``.
    """
    stripped = (value or "").strip()
    if not stripped:
        return "unknown"
    return "".join(ch if ch.isalnum() else "_" for ch in stripped)
def _unique_object_name(doc, base_name):
    """Return an object name derived from *base_name* that *doc* does not use yet.

    The sanitized base is returned as-is when free; otherwise ``_1``,
    ``_2``, ... is appended until ``doc.getObject`` finds nothing.
    """
    stem = _safe_token(base_name) or "QETObject"
    candidate = stem
    counter = 0
    while doc.getObject(candidate) is not None:
        counter += 1
        candidate = "{0}_{1}".format(stem, counter)
    return candidate
def _native_path(value):
    """Return *value* as a normalized local path, or ``""`` when blank.

    ``~`` and environment variables are expanded before the path is run
    through ``os.path.normpath``.
    """
    raw = (value or "").strip()
    if raw:
        expanded = os.path.expandvars(os.path.expanduser(raw))
        return os.path.normpath(expanded)
    return ""
def _normalized_path_key(value):
    """Return a case-normalized form of *value* for comparing paths.

    Blank input yields ``""``.
    """
    native = _native_path(value)
    return os.path.normcase(os.path.normpath(native)) if native else ""
def _existing_object_names(doc):
    """Return the set of ``Name`` values of all objects currently in *doc*."""
    names = set()
    for obj in doc.Objects:
        names.add(obj.Name)
    return names
def _new_objects_since(doc, before_names):
    """Return the objects of *doc* whose ``Name`` is not in *before_names*.

    Document order is preserved.
    """
    added = []
    for obj in doc.Objects:
        if obj.Name in before_names:
            continue
        added.append(obj)
    return added
def _top_level_imported_objects(imported_objects):
    """Return the members of *imported_objects* that no other member contains.

    An object counts as nested when one of its ``InList`` parents is itself
    in the batch, or when another batch member lists it in its ``Group``.
    Input order is preserved.
    """
    known_names = {obj.Name for obj in imported_objects}
    nested = set()
    for obj in imported_objects:
        parents = list(getattr(obj, "InList", []) or [])
        if any(getattr(parent, "Name", None) in known_names for parent in parents):
            nested.add(obj.Name)
        for member in list(getattr(obj, "Group", []) or []):
            if getattr(member, "Name", None) in known_names:
                nested.add(member.Name)
    return [obj for obj in imported_objects if obj.Name not in nested]
def _top_level_document_objects(doc):
    """Return the objects of *doc* that are not contained in another object of *doc*."""
    all_objects = list(getattr(doc, "Objects", []) or [])
    return _top_level_imported_objects(all_objects)
def _ensure_string_property(obj, prop_name, group_name, description, value):
    """Set string property *prop_name* on *obj*, adding it first if missing.

    A falsy *value* is stored as ``""``.
    """
    known_properties = getattr(obj, "PropertiesList", [])
    if prop_name not in known_properties:
        obj.addProperty("App::PropertyString", prop_name, group_name, description)
    setattr(obj, prop_name, value if value else "")
def _ensure_bool_property(obj, prop_name, group_name, description, value):
    """Set boolean property *prop_name* on *obj*, adding it first if missing.

    *value* is coerced with ``bool()`` before it is stored.
    """
    known_properties = getattr(obj, "PropertiesList", [])
    if prop_name not in known_properties:
        obj.addProperty("App::PropertyBool", prop_name, group_name, description)
    flag = bool(value)
    setattr(obj, prop_name, flag)
def _ensure_child_group(doc, parent_group, element_uuid, instance_id, name_prefix, label, group_kind, project_uuid=""):
    """Find or create a tagged child group below *parent_group* and return it.

    The group is looked up by its preferred object name
    (``name_prefix`` + sanitized UUID). Failing that, the first member of
    *parent_group* with a matching ``QetGroupKind`` (and matching
    ``QetElementUuid`` when a UUID was given) is reused. Otherwise a new
    ``App::DocumentObjectGroup`` is created. The group is attached to the
    parent, relabelled and stamped with the exchange properties. Blank
    ``project_uuid`` / ``element_uuid`` / ``instance_id`` arguments fall
    back to the values already stored on the group.
    """
    wanted_uuid = (element_uuid or "").strip()
    object_name = name_prefix + _safe_token(wanted_uuid)

    child = doc.getObject(object_name)
    if child is None:
        for member in getattr(parent_group, "Group", []) or []:
            if getattr(member, "QetGroupKind", "").strip() == group_kind and (
                not wanted_uuid
                or getattr(member, "QetElementUuid", "").strip() == wanted_uuid
            ):
                child = member
                break
    if child is None:
        child = doc.addObject("App::DocumentObjectGroup", object_name)
    if child not in getattr(parent_group, "Group", []):
        parent_group.addObject(child)
    child.Label = label

    # Resolve every fallback before writing anything back to the group.
    project_uuid = (project_uuid or "").strip() or getattr(child, "QetProjectUuid", "").strip()
    element_uuid = wanted_uuid or getattr(child, "QetElementUuid", "").strip()
    instance_id = (instance_id or "").strip() or getattr(child, "QetInstanceId", "").strip()

    stamped_properties = (
        ("QetGroupKind", "FreeCADExchange group kind", group_kind),
        ("QetProjectUuid", "Project UUID from QET exchange", project_uuid),
        ("QetElementUuid", "Parent element UUID from QET exchange", element_uuid),
        ("QetInstanceId", "Parent instance id from QET exchange", instance_id),
    )
    for prop_name, description, prop_value in stamped_properties:
        _ensure_string_property(child, prop_name, "QET Exchange", description, prop_value)
    return child
def _ensure_document(scene_path):
    """Return the FreeCAD document to import into and make it active.

    When *scene_path* names an existing file, an already-open document
    with the same (case-normalized) file name is reused, otherwise the file
    is opened. When there is no such file, the document returned by
    ``DevicePreview.find_main_exchange_document`` is reused, or a new
    document named after the scene file stem is created.

    Raises:
        DeviceImportError: the existing scene file could not be opened.
    """
    stem = Path(scene_path).stem if scene_path else "QETScene"
    doc_name = _safe_token(stem)[:48] or "QETScene"
    scene_file = _native_path(scene_path)

    if not (scene_file and os.path.isfile(scene_file)):
        doc = DevicePreview.find_main_exchange_document(doc_name)
        if doc is None:
            doc = App.newDocument(doc_name)
        _activate_document(doc)
        return doc

    wanted_key = os.path.normcase(os.path.normpath(scene_file))
    for open_doc in App.listDocuments().values():
        open_path = getattr(open_doc, "FileName", "") or ""
        if open_path and os.path.normcase(os.path.normpath(open_path)) == wanted_key:
            _activate_document(open_doc)
            return open_doc

    failure = "Cannot open existing FreeCAD scene file: {0}".format(scene_file)
    try:
        doc = App.openDocument(scene_file)
    except Exception as exc:
        raise DeviceImportError(failure) from exc
    if doc is None:
        raise DeviceImportError(failure)
    _activate_document(doc)
    return doc
def _activate_document(doc):
    """Make *doc* the active document in App and, when possible, in the GUI.

    Each step is attempted independently and failures are ignored, so the
    function also works when ``Gui`` is ``None``.
    """
    if doc is None:
        return

    set_active = getattr(App, "setActiveDocument", None)
    if callable(set_active):
        try:
            set_active(doc.Name)
        except Exception:
            pass

    try:
        App.ActiveDocument = doc
    except Exception:
        pass

    try:
        gui_doc = Gui.getDocument(doc.Name)
        Gui.ActiveDocument = gui_doc
    except Exception:
        pass
def _cabinet_label_text(cabinet):
    """Return a display label for *cabinet*.

    The first non-blank value among ``display_text``, ``label`` and
    ``name`` wins; ``"QET Cabinet"`` is used when *cabinet* is not a dict
    or none of the keys holds text.
    """
    fallback = "QET Cabinet"
    if not isinstance(cabinet, dict):
        return fallback
    for key in ("display_text", "label", "name"):
        text = (cabinet.get(key) or "").strip()
        if text:
            return text
    return fallback
def _ensure_root_group(doc, cabinet=None, project_uuid=""):
    """Find or create the root exchange group and refresh its metadata.

    The group is labelled from *cabinet* when it is a dict, otherwise with
    ``ROOT_GROUP_LABEL``. The cabinet string properties are rewritten on
    every call (to ``""`` without a cabinet dict). A blank *project_uuid*
    keeps the value already stored on the group.
    """
    has_cabinet = isinstance(cabinet, dict)

    root = doc.getObject(ROOT_GROUP_NAME)
    if root is None:
        root = doc.addObject("App::DocumentObjectGroup", ROOT_GROUP_NAME)
    root.Label = _cabinet_label_text(cabinet) if has_cabinet else ROOT_GROUP_LABEL

    # (property name, description, cabinet key) for the plain string fields.
    cabinet_fields = (
        ("QetCabinetLabel", "Cabinet label from QET exchange", "label"),
        ("QetCabinetName", "Cabinet name from QET exchange", "name"),
        ("QetCabinetDisplayText", "Cabinet display text from QET exchange", "display_text"),
        ("QetCabinetFileSet", "Associated fileset from QET exchange", "associated_fileset"),
        ("QetCabinetRelativePath", "Relative 3D cabinet path from QET exchange", "three_d_relative_path"),
        (
            "QetCabinetResolvedScenePath",
            "Resolved local cabinet scene path from QET exchange",
            "resolved_scene_path",
        ),
    )
    for prop_name, description, key in cabinet_fields:
        field_value = cabinet.get(key, "") if has_cabinet else ""
        _ensure_string_property(root, prop_name, "QET Exchange", description, field_value)

    # The location id is stringified before it is stored.
    location_text = str(cabinet.get("location_id") or "") if has_cabinet else ""
    _ensure_string_property(
        root,
        "QetCabinetLocationId",
        "QET Exchange",
        "Cabinet location id from QET exchange",
        location_text,
    )

    project_uuid = (project_uuid or "").strip() or getattr(root, "QetProjectUuid", "").strip()
    _ensure_string_property(
        root,
        "QetProjectUuid",
        "QET Exchange",
        "Project UUID from QET exchange",
        project_uuid,
    )
    return root
def _cabinet_instance_id(cabinet):
    """Return the identifier used to tell cabinet instances apart.

    The first non-blank stringified value among ``cabinet_instance_id``,
    ``cabinet_uuid`` and ``location_id`` is returned. A dict with none of
    them yields ``"default"``; a non-dict yields ``""``.
    """
    if not isinstance(cabinet, dict):
        return ""
    for key in ("cabinet_instance_id", "cabinet_uuid", "location_id"):
        raw = cabinet.get(key)
        if raw is not None:
            text = str(raw).strip()
            if text:
                return text
    return "default"
def _cabinet_group_label(cabinet):
    """Return the label for a cabinet model group.

    Uses ``_cabinet_label_text`` and falls back to a fixed label should
    that ever return an empty string.
    """
    text = _cabinet_label_text(cabinet)
    return text if text else "3D机柜"
def _find_cabinet_group(doc, cabinet_instance_id):
    """Return the existing cabinet group for *cabinet_instance_id*, or ``None``.

    Lookup order: the prefixed object name; the fixed
    ``CABINET_MODEL_GROUP_NAME`` object, accepted when either id is blank
    or both ids match; finally any object whose ``QetCabinetInstanceId``
    property equals the requested id.
    """
    wanted = (cabinet_instance_id or "").strip()

    by_name = doc.getObject(CABINET_GROUP_PREFIX + _safe_token(wanted or "default"))
    if by_name is not None:
        return by_name

    legacy = doc.getObject(CABINET_MODEL_GROUP_NAME)
    if legacy is not None:
        legacy_id = getattr(legacy, "QetCabinetInstanceId", "").strip()
        # Rejected only when both ids are known and they differ.
        if not (wanted and legacy_id and legacy_id != wanted):
            return legacy

    for obj in getattr(doc, "Objects", []) or []:
        has_property = "QetCabinetInstanceId" in getattr(obj, "PropertiesList", [])
        if has_property and getattr(obj, "QetCabinetInstanceId", "").strip() == wanted:
            return obj
    return None
def _ensure_cabinet_model_group(doc, root_group, cabinet=None, project_uuid=""):
    """Find or create the group holding the cabinet model and tag it.

    The group is attached to *root_group*, relabelled from *cabinet* and
    stamped with the cabinet instance id, the ``resolved_scene_path`` taken
    verbatim from *cabinet* (``""`` without a cabinet dict) and
    *project_uuid*.
    """
    instance_id = _cabinet_instance_id(cabinet)

    cabinet_group = _find_cabinet_group(doc, instance_id)
    if cabinet_group is None:
        new_name = CABINET_GROUP_PREFIX + _safe_token(instance_id or "default")
        cabinet_group = doc.addObject("App::DocumentObjectGroup", new_name)
    cabinet_group.Label = _cabinet_group_label(cabinet)
    if cabinet_group not in getattr(root_group, "Group", []):
        root_group.addObject(cabinet_group)

    scene_path = cabinet.get("resolved_scene_path", "") if isinstance(cabinet, dict) else ""
    stamped_properties = (
        ("QetCabinetInstanceId", "Cabinet instance id from QET exchange", instance_id),
        (
            "QetCabinetResolvedScenePath",
            "Resolved local cabinet scene path from QET exchange",
            scene_path,
        ),
        ("QetProjectUuid", "Project UUID from QET exchange", project_uuid),
    )
    for prop_name, description, prop_value in stamped_properties:
        _ensure_string_property(cabinet_group, prop_name, "QET Exchange", description, prop_value)
    return cabinet_group
def _find_device_group(doc, element_uuid):
    """Return the device container for *element_uuid*, or ``None``.

    The object named ``DEVICE_GROUP_PREFIX`` + sanitized UUID is preferred.
    Otherwise the document is scanned for an object whose
    ``QetElementUuid`` property matches. A blank UUID never matches.

    Terminal/wire child groups are stamped with their parent's
    ``QetElementUuid`` as well, so the scan skips them. Returning one would
    make ``_ensure_device_group`` treat it as a stale non-``App::Part``
    device and delete it together with its contents.
    """
    target_uuid = (element_uuid or "").strip()
    if not target_uuid:
        return None

    named = doc.getObject(DEVICE_GROUP_PREFIX + _safe_token(target_uuid))
    if named is not None:
        return named

    for candidate in doc.Objects:
        if "QetElementUuid" not in getattr(candidate, "PropertiesList", []):
            continue
        if getattr(candidate, "QetElementUuid", "").strip() != target_uuid:
            continue
        if _is_exchange_sidecar_group(candidate):
            # Same UUID, but this is a terminals/wires child group.
            continue
        return candidate
    return None
def _device_label_text(display_tag, instance_id, element_uuid):
    """Return the label for a device.

    The first non-blank of display tag, instance id and element UUID is
    used, else ``"QET Device"``.
    """
    for candidate in (display_tag, instance_id, element_uuid):
        text = (candidate or "").strip()
        if text:
            return text
    return "QET Device"
def _device_warning_subject(display_tag, element_uuid):
    """Return the phrase that names a device in warning messages.

    Includes the display tag and/or the element UUID, whichever are
    non-blank.
    """
    tag = (display_tag or "").strip()
    uuid_text = (element_uuid or "").strip()
    if tag and uuid_text:
        return "设备 {0} ({1})".format(tag, uuid_text)
    identifier = tag or uuid_text
    if identifier:
        return "设备 {0}".format(identifier)
    return "设备"
def _ensure_device_group(doc, root_group, element_uuid, instance_id, model_path, display_tag, layout_index):
    """Find or create the ``App::Part`` container of a device.

    An existing container that is not an ``App::Part`` is removed and
    recreated. The container is attached to *root_group*, relabelled,
    stamped with the exchange properties and given its terminal and wire
    child groups. ``QetAutoPlaced`` is set to whether the container was
    created by this call, and a new container gets a default placement.
    *layout_index* is accepted but not used here.

    Returns:
        ``(device_group, created_now)``.
    """
    device = _find_device_group(doc, element_uuid)
    if device is not None and getattr(device, "TypeId", "") != "App::Part":
        _remove_object_tree(doc, device)
        device = None

    created_now = device is None
    if created_now:
        device = doc.addObject("App::Part", DEVICE_GROUP_PREFIX + _safe_token(element_uuid))
    if device not in getattr(root_group, "Group", []):
        root_group.addObject(device)
    device.Label = _device_label_text(display_tag, instance_id, element_uuid)

    stamped_properties = (
        ("QetElementUuid", "2D element UUID from QET", element_uuid),
        ("QetInstanceId", "3D instance id from QET/FreeCAD exchange", instance_id),
        ("QetResolvedModelPath", "Resolved local model path from QET exchange", model_path),
        ("QetDisplayTag", "2D display tag from QET exchange", display_tag),
    )
    for prop_name, description, prop_value in stamped_properties:
        _ensure_string_property(device, prop_name, "QET Exchange", description, prop_value)

    _ensure_bool_property(
        device,
        "QetAutoPlaced",
        "QET Exchange",
        "Whether the device has been placed by the QET auto layout.",
        created_now,
    )
    if created_now:
        device.Placement = App.Placement()

    root_project_uuid = getattr(root_group, "QetProjectUuid", "").strip()
    _ensure_string_property(
        device,
        "QetProjectUuid",
        "QET Exchange",
        "Project UUID from QET exchange",
        root_project_uuid,
    )

    child_groups = (
        (TERMINAL_GROUP_PREFIX, "QET Terminals", GROUP_KIND_TERMINALS),
        (WIRE_GROUP_PREFIX, "QET Wires", GROUP_KIND_WIRES),
    )
    for name_prefix, child_label, group_kind in child_groups:
        _ensure_child_group(
            doc,
            device,
            element_uuid,
            instance_id,
            name_prefix,
            child_label,
            group_kind,
            project_uuid=root_project_uuid,
        )
    return device, created_now
def _remove_object_tree(doc, obj):
    """Remove *obj* and everything in its ``Group`` from *doc*.

    Children are removed first (recursively). The object is then detached
    from its parent groups and deleted if the document still knows it. An
    object the document no longer knows is only detached.
    """
    if obj is None:
        return
    name = _object_name(obj)
    still_in_doc = bool(name) and doc.getObject(name) is not None
    if still_in_doc:
        for child in list(getattr(obj, "Group", []) or []):
            _remove_object_tree(doc, child)
    _detach_from_parent_groups(obj)
    # Re-check: removing the children may already have taken obj with it.
    if still_in_doc and doc.getObject(name) is not None:
        doc.removeObject(name)
def _object_name(obj):
    """Return ``obj.Name``, or ``""`` when it is missing or reading it raises."""
    try:
        name = getattr(obj, "Name", "")
    except Exception:
        name = ""
    return name
def _object_exists(doc, obj):
    """Return ``True`` when *obj* has a name that *doc* still resolves."""
    name = _object_name(obj)
    if not name:
        return False
    return doc.getObject(name) is not None
def _detach_from_parent_groups(obj):
    """Remove *obj* from the ``Group`` of every parent in its ``InList``.

    Best effort: an unreadable ``InList`` or ``Group`` is skipped.
    ``parent.removeObject`` is preferred. If it is missing or raises, the
    group is rewritten without *obj*.

    The rewrite assigns a filtered list back to ``parent.Group`` rather
    than mutating the list that ``parent.Group`` returns. When the
    attribute hands out a fresh list on every read, an in-place
    ``remove()`` has no effect and a ``while obj in parent.Group`` loop
    around it never terminates.
    """
    try:
        parents = list(getattr(obj, "InList", []) or [])
    except Exception:
        return

    for parent in parents:
        try:
            members = list(getattr(parent, "Group", []) or [])
        except Exception:
            continue
        if obj not in members:
            continue

        remover = getattr(parent, "removeObject", None)
        if callable(remover):
            try:
                remover(obj)
                continue
            except Exception:
                pass

        try:
            # Re-read: a failed removeObject may have changed the group.
            current = list(getattr(parent, "Group", []) or [])
            remaining = [
                member
                for member in current
                if not (member is obj or member == obj)
            ]
        except Exception:
            continue
        try:
            parent.Group = remaining
        except Exception:
            # Read-only attribute backed by a real list: edit it in place.
            try:
                parent.Group[:] = remaining
            except Exception:
                pass
def _linked_document_objects(value):
    """Flatten *value* into a list of document-object-like items.

    Anything exposing both ``Name`` and ``TypeId`` counts as a document
    object. Dict values and list/tuple/set members are searched
    recursively; every other value contributes nothing.
    """
    if value is None:
        return []
    if hasattr(value, "Name") and hasattr(value, "TypeId"):
        return [value]

    if isinstance(value, dict):
        items = value.values()
    elif isinstance(value, (list, tuple, set)):
        items = value
    else:
        return []

    found = []
    for item in items:
        found.extend(_linked_document_objects(item))
    return found
def _remove_template_terminal_hint_object(doc, obj):
    """Remove *obj* and the objects its ``OriginFeatures`` refers to.

    The linked objects are collected before *obj* itself is removed.
    """
    origin_links = _linked_document_objects(getattr(obj, "OriginFeatures", None))
    _remove_object_tree(doc, obj)
    for linked in origin_links:
        if linked is obj:
            continue
        _remove_object_tree(doc, linked)
def _remove_template_terminal_hints(doc, container):
    """Remove all template terminal objects below *container*.

    Members recognised by ``TerminalObjects.is_template_terminal_object``
    are removed; other members that expose a ``Group`` are searched
    recursively.

    Returns:
        The number of template terminal objects removed.
    """
    if container is None:
        return 0
    count = 0
    for member in list(getattr(container, "Group", []) or []):
        if TerminalObjects.is_template_terminal_object(member):
            _remove_template_terminal_hint_object(doc, member)
            count += 1
        elif hasattr(member, "Group"):
            count += _remove_template_terminal_hints(doc, member)
    return count
def _existing_group_objects(doc, group):
    """Return the members of ``group.Group`` that still exist in *doc*."""
    members = list(getattr(group, "Group", []) or [])
    return [member for member in members if _object_exists(doc, member)]
def _is_exchange_sidecar_group(obj):
    """Return ``True`` when *obj* is a terminals or wires child group.

    Recognised by its object-name prefix or by its ``QetGroupKind`` value.
    """
    name = _object_name(obj)
    if name.startswith((TERMINAL_GROUP_PREFIX, WIRE_GROUP_PREFIX)):
        return True
    kind = getattr(obj, "QetGroupKind", "").strip()
    return kind in (GROUP_KIND_TERMINALS, GROUP_KIND_WIRES)
def _existing_model_objects(doc, group):
    """Return the live members of *group* that are not terminal/wire child groups."""
    models = []
    for member in _existing_group_objects(doc, group):
        if _is_exchange_sidecar_group(member):
            continue
        models.append(member)
    return models
def _remove_model_objects(doc, objects):
    """Remove every object in *objects* (and its subtree) from *doc*.

    ``None`` or an empty collection is a no-op.
    """
    pending = list(objects or [])
    for obj in pending:
        _remove_object_tree(doc, obj)
def _keep_only_direct_model_children(device_group, direct_model_objects):
    """Restrict ``device_group.Group`` to the given model objects plus sidecars.

    Members are kept when they are (by identity) one of
    *direct_model_objects* or a terminals/wires child group. The filtered
    list is assigned back to ``Group``, falling back to slice assignment;
    failures of both are ignored.
    """
    keep_ids = {id(obj) for obj in direct_model_objects if obj is not None}
    survivors = [
        child
        for child in list(getattr(device_group, "Group", []) or [])
        if id(child) in keep_ids or _is_exchange_sidecar_group(child)
    ]
    try:
        device_group.Group = survivors
    except Exception:
        try:
            device_group.Group[:] = survivors
        except Exception:
            pass
def _document_object_links(obj):
    """Return the document objects referenced by ``obj.Links`` and ``obj.OutList``.

    ``Links`` entries come first; duplicates are not removed.
    """
    links = list(_linked_document_objects(getattr(obj, "Links", None)))
    links.extend(_linked_document_objects(getattr(obj, "OutList", None)))
    return links
def _shape_copy(shape):
    """Return ``shape.copy()`` when available and working, else *shape* itself."""
    make_copy = getattr(shape, "copy", None)
    if not callable(make_copy):
        return shape
    try:
        return make_copy()
    except Exception:
        return shape
def _can_materialize_shape(obj):
    """Return ``True`` when *obj* has a shape and is not already a ``Part::Feature``.

    Objects whose ``Shape`` is missing, ``None`` or raises on access are
    reported as not materializable.
    """
    if getattr(obj, "TypeId", "") == "Part::Feature":
        return False
    try:
        return getattr(obj, "Shape", None) is not None
    except Exception:
        return False
def _materialize_shape_object(doc, source_obj):
    """Create a ``Part::Feature`` in *doc* holding a copy of *source_obj*'s shape.

    Label and shape are copied unconditionally. Placement, visibility and
    shape colour are copied best effort (failures are ignored).

    Returns:
        The new ``Part::Feature`` object.
    """
    source_name = _object_name(source_obj) or "Model"
    feature_name = _unique_object_name(doc, source_name + "_Shape")
    feature = doc.addObject("Part::Feature", feature_name)
    feature.Label = getattr(source_obj, "Label", source_name)
    feature.Shape = _shape_copy(getattr(source_obj, "Shape", None))

    try:
        feature.Placement = source_obj.Placement
    except Exception:
        pass

    try:
        source_view = source_obj.ViewObject
        feature.ViewObject.Visibility = getattr(source_view, "Visibility", True)
    except Exception:
        pass

    try:
        feature.ViewObject.ShapeColor = source_obj.ViewObject.ShapeColor
    except Exception:
        pass

    return feature
def _materialize_direct_model_objects(doc, device_group, model_objects):
    """Replace shape-bearing model objects by static ``Part::Feature`` copies.

    Objects that no longer exist are skipped. Objects that cannot be
    materialized (see ``_can_materialize_shape``) are kept as they are. For
    every other object a static copy is created and added to
    *device_group*; the source and everything it links to are removed
    afterwards, unless they are among the kept objects.

    Returns:
        The kept originals and the new static copies, in input order.
    """
    kept = []
    stale_objects = []
    pending_labels = []

    for source in model_objects:
        if not _object_exists(doc, source):
            continue
        if _can_materialize_shape(source):
            snapshot = _materialize_shape_object(doc, source)
            if snapshot not in getattr(device_group, "Group", []):
                device_group.addObject(snapshot)
            kept.append(snapshot)
            pending_labels.append((snapshot, getattr(source, "Label", _object_name(source))))
            stale_objects.append(source)
            stale_objects.extend(_document_object_links(source))
        else:
            kept.append(source)

    kept_names = {_object_name(obj) for obj in kept}
    removed_names = set()
    for stale in stale_objects:
        stale_name = _object_name(stale)
        if stale_name and stale_name not in kept_names and stale_name not in removed_names:
            _remove_object_tree(doc, stale)
            removed_names.add(stale_name)

    # Labels are re-applied only after the sources are gone.
    # NOTE(review): presumably because the label could not be taken over
    # while the source still held it - confirm.
    for snapshot, label in pending_labels:
        try:
            snapshot.Label = label
        except Exception:
            pass
    return kept
def _generate_instance_id(project_uuid, element_uuid):
    """Return a deterministic instance id for a project/element pair.

    The id is the UUIDv5 (URL namespace) of ``"QET:<project>:<element>"``,
    built from the stripped arguments.
    """
    project_part = (project_uuid or "").strip()
    element_part = (element_uuid or "").strip()
    seed = "QET:" + project_part + ":" + element_part
    return str(uuid.uuid5(uuid.NAMESPACE_URL, seed))
def _supported_for_import(model_path):
    """Return ``True`` when the extension of *model_path* is importable.

    Accepted (case-insensitively): STEP, IGES, BREP and FCStd files.
    """
    extension = Path(model_path).suffix.lower()
    return extension in (
        ".step",
        ".stp",
        ".iges",
        ".igs",
        ".brep",
        ".brp",
        ".fcstd",
    )
def _import_model_into_group(doc, device_group, model_path, merge=False, use_link_group=True):
    """Import the model file *model_path* into *device_group*.

    ``.fcstd`` files are delegated to ``_import_fcstd_into_group``. Every
    other format goes through ``ImportGui.insert``. The top-level objects
    it created are added to *device_group*, after which stored template
    slot hints are cleared and template terminal hints are hidden via the
    ``TemplateSemantics`` / ``TerminalObjects`` helpers. *doc* is
    re-activated afterwards, whether or not the import succeeded.

    Args:
        merge: forwarded to ``ImportGui.insert`` as ``merge``.
        use_link_group: forwarded to ``ImportGui.insert`` as ``useLinkGroup``.

    Returns:
        The top-level objects that were imported.

    Raises:
        DeviceImportError: ``ImportGui`` is not available in this session.
        Exception: whatever ``ImportGui.insert`` raises; objects created by
            the failed import are removed before re-raising.
    """
    if Path(model_path).suffix.lower() == ".fcstd":
        return _import_fcstd_into_group(doc, device_group, model_path)

    before_names = _existing_object_names(doc)
    try:
        # ImportGui is an optional import at module level; fail with a
        # readable message instead of an AttributeError on None.
        if ImportGui is None:
            raise DeviceImportError(
                "ImportGui module is not available; cannot import {0}".format(model_path)
            )
        try:
            ImportGui.insert(
                name=model_path,
                docName=doc.Name,
                merge=bool(merge),
                useLinkGroup=bool(use_link_group),
            )
        except Exception:
            # Roll back whatever the failed import left behind.
            for obj in _new_objects_since(doc, before_names):
                _remove_object_tree(doc, obj)
            raise

        imported_objects = _new_objects_since(doc, before_names)
        top_level_objects = _top_level_imported_objects(imported_objects)
        for obj in top_level_objects:
            if obj not in getattr(device_group, "Group", []):
                device_group.addObject(obj)
        TemplateSemantics.clear_stored_template_slot_hints(device_group)
        TerminalObjects.hide_template_terminal_hints(device_group)
        return top_level_objects
    finally:
        _activate_document(doc)
def _open_fcstd_source_document(model_path):
    """Return ``(document, opened_here)`` for the FCStd file *model_path*.

    A document that is already open for the same (case-normalized) file is
    reused and reported with ``opened_here=False``. Otherwise the file is
    opened hidden and temporary, and ``opened_here`` is ``True`` so the
    caller knows to close it.
    """
    wanted_key = os.path.normcase(os.path.normpath(model_path))
    for open_doc in App.listDocuments().values():
        open_path = getattr(open_doc, "FileName", "") or ""
        if not open_path:
            continue
        if os.path.normcase(os.path.normpath(open_path)) == wanted_key:
            return open_doc, False
    opened = App.openDocument(model_path, hidden=True, temporary=True)
    return opened, True
def _import_fcstd_into_group(doc, device_group, model_path):
    """Copy the contents of an FCStd file into *device_group*.

    The source document's top-level objects are copied (with dependencies)
    into *doc* and added to *device_group*. Live terminal hints are then
    collected and stored through ``TemplateSemantics``, the template
    terminal objects are removed, shape-bearing copies are materialized
    into static features, and the group is reduced to those objects plus
    its terminal/wire child groups.

    A source document opened by this call is closed again and *doc* is
    re-activated, whether or not the import succeeded. On failure, every
    object that appeared in *doc* since the import started is removed
    before the exception propagates. This matches the rollback of the
    ImportGui path and avoids leaving a half-copied model next to the
    previous one.

    Returns:
        The model objects that ended up directly in *device_group*.

    Raises:
        DeviceImportError: the source document could not be opened.
    """
    source_doc = None
    should_close = False
    try:
        before_names = _existing_object_names(doc)
        try:
            source_doc, should_close = _open_fcstd_source_document(model_path)
            if source_doc is None:
                raise DeviceImportError("Cannot open FCStd file")
            TemplateSemantics.clear_stored_template_slot_hints(device_group)

            copied_objects = []
            for source_obj in _top_level_document_objects(source_doc):
                copied_obj = doc.copyObject(source_obj, True)
                if copied_obj not in getattr(device_group, "Group", []):
                    device_group.addObject(copied_obj)
                copied_objects.append(copied_obj)

            # Hints must be read while the template terminals still exist.
            template_slots = TemplateSemantics.collect_live_terminal_hints(device_group)
            TemplateSemantics.store_template_slot_hints(device_group, template_slots)
            _remove_template_terminal_hints(doc, device_group)

            copied_model_objects = [
                obj for obj in copied_objects if _object_exists(doc, obj)
            ]
            direct_model_objects = _materialize_direct_model_objects(
                doc,
                device_group,
                copied_model_objects,
            )
            _keep_only_direct_model_children(device_group, direct_model_objects)
            return direct_model_objects
        except Exception:
            # Roll back partial copies so callers keep a consistent document.
            for obj in _new_objects_since(doc, before_names):
                _remove_object_tree(doc, obj)
            raise
    finally:
        if should_close and source_doc is not None:
            try:
                App.closeDocument(source_doc.Name)
            except Exception:
                pass
        _activate_document(doc)
def _model_index(payload):
    """Index ``payload["device_models"]`` by stripped ``element_uuid``.

    The first entry for a UUID wins. Entries without a usable UUID are
    skipped. A missing or null ``device_models`` list, non-dict entries and
    null ``element_uuid`` values are tolerated instead of raising.

    Returns:
        Dict mapping element UUID to its model entry.
    """
    index = {}
    for item in payload.get("device_models") or []:
        if not isinstance(item, dict):
            continue
        element_uuid = (item.get("element_uuid") or "").strip()
        if element_uuid:
            index.setdefault(element_uuid, item)
    return index
def _import_cabinet_model(doc, root_group, cabinet, report):
    """Import (or reuse) the cabinet's 3D model and record the outcome in *report*.

    Nothing happens when *cabinet* is not a dict. A missing path, a missing
    file or an unsupported format is counted (and, for the latter two,
    warned about) and skipped. An existing cabinet group that already holds
    model objects from the same source file is reused. Otherwise the file
    is imported first and the previous model objects are removed afterwards.
    On failure the previously stored scene path is restored if there was an
    earlier model, and the error is added to the warnings.
    """
    if not isinstance(cabinet, dict):
        return

    scene_file = _native_path(cabinet.get("resolved_scene_path", ""))
    _append_debug_log(
        "DeviceImport cabinet resolved_scene_path={0}".format(scene_file)
    )

    if not scene_file:
        report["cabinet_skipped_missing_model"] += 1
        return
    if not os.path.isfile(scene_file):
        report["cabinet_skipped_missing_file"] += 1
        report["warnings"].append("机柜 3D 文件不存在:{0}".format(scene_file))
        return
    if not _supported_for_import(scene_file):
        report["cabinet_skipped_unsupported_format"] += 1
        report["warnings"].append("机柜 3D 文件格式暂不支持:{0}".format(scene_file))
        return

    project_uuid = getattr(root_group, "QetProjectUuid", "").strip()

    # Read the stored path before the group's properties are rewritten.
    prior_group = _find_cabinet_group(doc, _cabinet_instance_id(cabinet))
    previous_path = ""
    if prior_group is not None:
        previous_path = getattr(prior_group, "QetCabinetResolvedScenePath", "").strip()

    cabinet_group = _ensure_cabinet_model_group(doc, root_group, cabinet, project_uuid)
    old_models = _existing_model_objects(doc, cabinet_group)
    same_source = _normalized_path_key(previous_path) == _normalized_path_key(scene_file)

    if old_models and same_source:
        report["cabinet_reused"] = report.get("cabinet_reused", 0) + 1
        _append_debug_log(
            "DeviceImport cabinet import skipped: reused existing cabinet group for instance_id={0}".format(
                getattr(cabinet_group, "QetCabinetInstanceId", "").strip()
            )
        )
        return

    def remember_scene_path(path):
        _ensure_string_property(
            cabinet_group,
            "QetCabinetResolvedScenePath",
            "QET Exchange",
            "Resolved local cabinet scene path from QET exchange",
            path,
        )

    had_existing_model = bool(old_models)
    remember_scene_path(scene_file)
    try:
        _append_debug_log(
            "DeviceImport importing cabinet model: {0}".format(scene_file)
        )
        _import_model_into_group(
            doc,
            cabinet_group,
            scene_file,
            merge=False,
            use_link_group=True,
        )
        # The old model is dropped only once the new one is in place.
        _remove_model_objects(doc, old_models)
        report["cabinet_imported"] += 1
        outcome_key = "cabinet_reimported" if had_existing_model else "cabinet_added"
        report[outcome_key] = report.get(outcome_key, 0) + 1
        _append_debug_log("DeviceImport cabinet import succeeded")
    except Exception as exc:
        if had_existing_model:
            remember_scene_path(previous_path)
        report["cabinet_skipped_import_error"] += 1
        report["warnings"].append("机柜 3D 导入失败:{0}".format(exc))
        _append_debug_log(
            "DeviceImport cabinet import failed: {0}".format(exc)
        )
def import_devices_from_payload(payload, scene_path=""):
    """Import the cabinet and all devices described by *payload*.

    The target document is resolved from *scene_path* (see
    ``_ensure_document``) and the root group is refreshed. The cabinet
    model is imported first, then every entry of ``payload["devices"]``.
    A device is skipped with a warning when it has no resolved model path,
    the file is missing, the format is unsupported or the import raises.
    Otherwise the new model is imported into the device container and the
    previous model objects are removed. The document is recomputed and the
    active view fitted at the end.

    Null values for ``devices`` and for a device's ``element_uuid`` are
    tolerated, like the other optional string fields.

    Args:
        payload: dict with optional ``cabinet``, ``project_uuid``,
            ``device_models`` and ``devices`` entries.
        scene_path: optional path of the FreeCAD scene file to import into.

    Returns:
        The report dict with counters and a ``warnings`` list.

    Raises:
        DeviceImportError: the existing scene file could not be opened.
    """
    _append_debug_log("DeviceImport.import_devices_from_payload entered")
    doc = _ensure_document(scene_path)
    cabinet = payload.get("cabinet")
    project_uuid = (payload.get("project_uuid") or "").strip()
    root_group = _ensure_root_group(doc, cabinet, project_uuid)
    models_by_element = _model_index(payload)
    report = {
        "document_name": doc.Name,
        "scene_path": scene_path or "",
        "total_devices": 0,
        "imported_devices": 0,
        "updated_devices": 0,
        "imported_without_instance_id": 0,
        "skipped_missing_model": 0,
        "skipped_missing_file": 0,
        "skipped_unsupported_format": 0,
        "skipped_import_error": 0,
        "cabinet_imported": 0,
        "cabinet_added": 0,
        "cabinet_reimported": 0,
        "cabinet_reused": 0,
        "cabinet_skipped_missing_model": 0,
        "cabinet_skipped_missing_file": 0,
        "cabinet_skipped_unsupported_format": 0,
        "cabinet_skipped_import_error": 0,
        "warnings": [],
    }
    _import_cabinet_model(doc, root_group, cabinet, report)

    for index, device in enumerate(payload.get("devices") or []):
        report["total_devices"] += 1
        # None-safe like the neighbouring fields: a null UUID must not
        # abort the whole import with an AttributeError.
        element_uuid = (device.get("element_uuid") or "").strip()
        instance_id = (device.get("instance_id") or "").strip()
        display_tag = (device.get("display_tag") or "").strip()
        model_info = models_by_element.get(element_uuid, {})
        resolved_model_path = _native_path(model_info.get("resolved_model_path", ""))
        subject = _device_warning_subject(display_tag, element_uuid)
        _append_debug_log(
            "DeviceImport device element_uuid={0}, instance_id={1}, display_tag={2}, resolved_model_path={3}".format(
                element_uuid, instance_id, display_tag, resolved_model_path
            )
        )

        if not resolved_model_path:
            report["skipped_missing_model"] += 1
            report["warnings"].append(
                "{0} 缺少 resolved_model_path已跳过。".format(subject)
            )
            continue
        if not os.path.isfile(resolved_model_path):
            report["skipped_missing_file"] += 1
            report["warnings"].append(
                "{0} 的模型文件不存在:{1}".format(subject, resolved_model_path)
            )
            continue
        if not _supported_for_import(resolved_model_path):
            report["skipped_unsupported_format"] += 1
            report["warnings"].append(
                "{0} 的模型格式暂不支持:{1}".format(subject, resolved_model_path)
            )
            continue

        existing_group = _find_device_group(doc, element_uuid)
        if not instance_id:
            # Reuse the id stored on an existing group, else derive a
            # deterministic one from project and element UUID.
            existing_instance_id = ""
            if existing_group is not None:
                existing_instance_id = getattr(existing_group, "QetInstanceId", "").strip()
            instance_id = existing_instance_id or _generate_instance_id(project_uuid, element_uuid)
            report.setdefault("generated_instance_ids", 0)
            report["generated_instance_ids"] += 1

        # Remember the stored path before _ensure_device_group overwrites it.
        previous_model_path = ""
        if existing_group is not None:
            previous_model_path = getattr(existing_group, "QetResolvedModelPath", "").strip()

        device_group, created_now = _ensure_device_group(
            doc,
            root_group,
            element_uuid,
            instance_id,
            resolved_model_path,
            display_tag,
            index,
        )
        existing_model_objects = _existing_model_objects(doc, device_group)
        try:
            _append_debug_log(
                "DeviceImport importing model for element_uuid={0}: {1}".format(
                    element_uuid, resolved_model_path
                )
            )
            _import_model_into_group(doc, device_group, resolved_model_path)
            _append_debug_log(
                "DeviceImport import succeeded for element_uuid={0}".format(element_uuid)
            )
            # The old model is dropped only once the new one is in place.
            _remove_model_objects(doc, existing_model_objects)
        except Exception as exc:
            if existing_model_objects:
                # The old model is still there: restore its recorded source.
                _ensure_string_property(
                    device_group,
                    "QetResolvedModelPath",
                    "QET Exchange",
                    "Resolved local model path from QET exchange",
                    previous_model_path,
                )
            report["skipped_import_error"] += 1
            report["warnings"].append("{0} 导入失败:{1}".format(subject, exc))
            _append_debug_log(
                "DeviceImport import failed for element_uuid={0}: {1}".format(
                    element_uuid, exc
                )
            )
            continue

        if created_now or existing_group is None:
            report["imported_devices"] += 1
        else:
            report["updated_devices"] += 1
        # NOTE(review): instance_id is always non-empty here (one is
        # generated above when the payload has none), so this counter is
        # never incremented. Kept unchanged; confirm the intended meaning.
        if not instance_id:
            report["imported_without_instance_id"] += 1

    doc.recompute()
    try:
        Gui.SendMsgToActiveView("ViewFit")
    except Exception:
        # No GUI (Gui is None) or no active view: fitting is optional.
        pass
    _append_debug_log(
        "DeviceImport finished: cabinet_imported={0}, imported={1}, updated={2}, skipped_missing_model={3}, skipped_missing_file={4}, skipped_import_error={5}".format(
            report["cabinet_imported"],
            report["imported_devices"],
            report["updated_devices"],
            report["skipped_missing_model"],
            report["skipped_missing_file"],
            report["skipped_import_error"],
        )
    )
    return report