diff --git a/api/core/llm_generator/output_parser/structured_output.py b/api/core/llm_generator/output_parser/structured_output.py new file mode 100644 index 0000000000..21e056b9b8 --- /dev/null +++ b/api/core/llm_generator/output_parser/structured_output.py @@ -0,0 +1,326 @@ +import json +from collections.abc import Generator, Mapping, Sequence +from copy import deepcopy +from typing import Any, Literal, Optional, overload + +import json_repair +from pydantic import TypeAdapter, ValidationError + +from core.llm_generator.output_parser.errors import OutputParserError +from core.model_manager import ModelInstance +from core.model_runtime.callbacks.base_callback import Callback +from core.model_runtime.entities.llm_entities import ( + LLMResult, + LLMResultChunk, + LLMResultWithStructuredOutput, + LLMStructuredOutput, +) +from core.model_runtime.entities.message_entities import PromptMessage, PromptMessageTool, SystemPromptMessage +from core.model_runtime.entities.model_entities import AIModelEntity, ParameterRule +from core.workflow.utils.structured_output.entities import ResponseFormat, SpecialModelType +from core.workflow.utils.structured_output.prompt import STRUCTURED_OUTPUT_PROMPT + + +@overload +def invoke_llm_with_structured_output( + provider: str, + model_schema: AIModelEntity, + model_instance: ModelInstance, + prompt_messages: Sequence[PromptMessage], + json_schema: Mapping[str, Any], + model_parameters: Optional[Mapping] = None, + tools: Sequence[PromptMessageTool] | None = None, + stop: Optional[list[str]] = None, + stream: Literal[True] = True, + user: Optional[str] = None, + callbacks: Optional[list[Callback]] = None, +) -> Generator[LLMResultChunk | LLMStructuredOutput, None, None]: ... + + +@overload +def invoke_llm_with_structured_output( + provider: str, + model_schema: AIModelEntity, + model_instance: ModelInstance, + prompt_messages: Sequence[PromptMessage], + json_schema: Mapping[str, Any], + model_parameters: Optional[Mapping] = None, + tools: Sequence[PromptMessageTool] | None = None, + stop: Optional[list[str]] = None, + stream: Literal[False] = False, + user: Optional[str] = None, + callbacks: Optional[list[Callback]] = None, +) -> LLMResultWithStructuredOutput: ... + + +@overload +def invoke_llm_with_structured_output( + provider: str, + model_schema: AIModelEntity, + model_instance: ModelInstance, + prompt_messages: Sequence[PromptMessage], + json_schema: Mapping[str, Any], + model_parameters: Optional[Mapping] = None, + tools: Sequence[PromptMessageTool] | None = None, + stop: Optional[list[str]] = None, + stream: bool = True, + user: Optional[str] = None, + callbacks: Optional[list[Callback]] = None, +) -> LLMResultWithStructuredOutput | Generator[LLMResultChunk | LLMStructuredOutput, None, None]: ... + + +def invoke_llm_with_structured_output( + provider: str, + model_schema: AIModelEntity, + model_instance: ModelInstance, + prompt_messages: Sequence[PromptMessage], + json_schema: Mapping[str, Any], + model_parameters: Optional[Mapping] = None, + tools: Sequence[PromptMessageTool] | None = None, + stop: Optional[list[str]] = None, + stream: bool = True, + user: Optional[str] = None, + callbacks: Optional[list[Callback]] = None, +) -> LLMResultWithStructuredOutput | Generator[LLMResultChunk | LLMStructuredOutput, None, None]: + """ + Invoke large language model with structured output + 1. This method invokes model_instance.invoke_llm with json_schema + 2. Try to parse the result as structured output + + :param prompt_messages: prompt messages + :param json_schema: json schema + :param model_parameters: model parameters + :param tools: tools for tool calling + :param stop: stop words + :param stream: is stream response + :param user: unique user id + :param callbacks: callbacks + :return: full response or stream response chunk generator result + """ + + # handle native json schema + model_parameters_with_json_schema: dict[str, Any] = { + **(model_parameters or {}), + } + + if model_schema.support_structure_output: + model_parameters = _handle_native_json_schema( + provider, model_schema, json_schema, model_parameters_with_json_schema, model_schema.parameter_rules + ) + else: + # Set appropriate response format based on model capabilities + _set_response_format(model_parameters_with_json_schema, model_schema.parameter_rules) + + # handle prompt based schema + prompt_messages = _handle_prompt_based_schema( + prompt_messages=prompt_messages, + structured_output_schema=json_schema, + ) + + llm_result = model_instance.invoke_llm( + prompt_messages=list(prompt_messages), + model_parameters=model_parameters_with_json_schema, + tools=tools, + stop=stop, + stream=stream, + user=user, + callbacks=callbacks, + ) + + if isinstance(llm_result, LLMResult): + if not isinstance(llm_result.message.content, str): + raise OutputParserError( + f"Failed to parse structured output, LLM result is not a string: {llm_result.message.content}" + ) + + return LLMResultWithStructuredOutput( + structured_output=_parse_structured_output(llm_result.message.content), + model=llm_result.model, + message=llm_result.message, + usage=llm_result.usage, + system_fingerprint=llm_result.system_fingerprint, + prompt_messages=llm_result.prompt_messages, + ) + else: + result_text = "" + for event in llm_result: + if isinstance(event, LLMResultChunk): + if isinstance(event.delta.message.content, str): + result_text += event.delta.message.content + + yield LLMStructuredOutput(structured_output=_parse_structured_output(result_text)) + + +def _handle_native_json_schema( + provider: str, + model_schema: AIModelEntity, + structured_output_schema: Mapping, + model_parameters: dict, + rules: list[ParameterRule], +) -> dict: + """ + Handle structured output for models with native JSON schema support. + + :param model_parameters: Model parameters to update + :param rules: Model parameter rules + :return: Updated model parameters with JSON schema configuration + """ + # Process schema according to model requirements + schema_json = _prepare_schema_for_model(provider, model_schema, structured_output_schema) + + # Set JSON schema in parameters + model_parameters["json_schema"] = json.dumps(schema_json, ensure_ascii=False) + + # Set appropriate response format if required by the model + for rule in rules: + if rule.name == "response_format" and ResponseFormat.JSON_SCHEMA.value in rule.options: + model_parameters["response_format"] = ResponseFormat.JSON_SCHEMA.value + + return model_parameters + + +def _set_response_format(model_parameters: dict, rules: list) -> None: + """ + Set the appropriate response format parameter based on model rules. + + :param model_parameters: Model parameters to update + :param rules: Model parameter rules + """ + for rule in rules: + if rule.name == "response_format": + if ResponseFormat.JSON.value in rule.options: + model_parameters["response_format"] = ResponseFormat.JSON.value + elif ResponseFormat.JSON_OBJECT.value in rule.options: + model_parameters["response_format"] = ResponseFormat.JSON_OBJECT.value + + +def _handle_prompt_based_schema( + prompt_messages: Sequence[PromptMessage], structured_output_schema: Mapping +) -> list[PromptMessage]: + """ + Handle structured output for models without native JSON schema support. + This function modifies the prompt messages to include schema-based output requirements. + + Args: + prompt_messages: Original sequence of prompt messages + + Returns: + list[PromptMessage]: Updated prompt messages with structured output requirements + """ + # Convert schema to string format + schema_str = json.dumps(structured_output_schema, ensure_ascii=False) + + # Find existing system prompt with schema placeholder + system_prompt = next( + (prompt for prompt in prompt_messages if isinstance(prompt, SystemPromptMessage)), + None, + ) + structured_output_prompt = STRUCTURED_OUTPUT_PROMPT.replace("{{schema}}", schema_str) + # Prepare system prompt content + system_prompt_content = ( + structured_output_prompt + "\n\n" + system_prompt.content + if system_prompt and isinstance(system_prompt.content, str) + else structured_output_prompt + ) + system_prompt = SystemPromptMessage(content=system_prompt_content) + + # Extract content from the last user message + + filtered_prompts = [prompt for prompt in prompt_messages if not isinstance(prompt, SystemPromptMessage)] + updated_prompt = [system_prompt] + filtered_prompts + + return updated_prompt + + +def _parse_structured_output(result_text: str) -> Mapping[str, Any]: + structured_output: dict[str, Any] = {} + try: + parsed = TypeAdapter(Mapping).validate_json(result_text) + if not isinstance(parsed, dict): + raise OutputParserError(f"Failed to parse structured output: {result_text}") + structured_output = parsed + except ValidationError as e: + # if the result_text is not a valid json, try to repair it + parsed = json_repair.loads(result_text) + if not isinstance(parsed, dict): + # handle reasoning model like deepseek-r1 got '\n\n\n' prefix + if isinstance(parsed, list): + parsed = next((item for item in parsed if isinstance(item, dict)), {}) + else: + raise OutputParserError(f"Failed to parse structured output: {result_text}") + structured_output = parsed + return structured_output + + +def _prepare_schema_for_model(provider: str, model_schema: AIModelEntity, schema: Mapping) -> dict: + """ + Prepare JSON schema based on model requirements. + + Different models have different requirements for JSON schema formatting. + This function handles these differences. + + :param schema: The original JSON schema + :return: Processed schema compatible with the current model + """ + + # Deep copy to avoid modifying the original schema + processed_schema = dict(deepcopy(schema)) + + # Convert boolean types to string types (common requirement) + convert_boolean_to_string(processed_schema) + + # Apply model-specific transformations + if SpecialModelType.GEMINI in model_schema.model: + remove_additional_properties(processed_schema) + return processed_schema + elif SpecialModelType.OLLAMA in provider: + return processed_schema + else: + # Default format with name field + return {"schema": processed_schema, "name": "llm_response"} + + +def remove_additional_properties(schema: dict) -> None: + """ + Remove additionalProperties fields from JSON schema. + Used for models like Gemini that don't support this property. + + :param schema: JSON schema to modify in-place + """ + if not isinstance(schema, dict): + return + + # Remove additionalProperties at current level + schema.pop("additionalProperties", None) + + # Process nested structures recursively + for value in schema.values(): + if isinstance(value, dict): + remove_additional_properties(value) + elif isinstance(value, list): + for item in value: + if isinstance(item, dict): + remove_additional_properties(item) + + +def convert_boolean_to_string(schema: dict) -> None: + """ + Convert boolean type specifications to string in JSON schema. + + :param schema: JSON schema to modify in-place + """ + if not isinstance(schema, dict): + return + + # Check for boolean type at current level + if schema.get("type") == "boolean": + schema["type"] = "string" + + # Process nested dictionaries and lists recursively + for value in schema.values(): + if isinstance(value, dict): + convert_boolean_to_string(value) + elif isinstance(value, list): + for item in value: + if isinstance(item, dict): + convert_boolean_to_string(item) diff --git a/api/core/model_runtime/entities/llm_entities.py b/api/core/model_runtime/entities/llm_entities.py index de5a748d4f..94c256fb7d 100644 --- a/api/core/model_runtime/entities/llm_entities.py +++ b/api/core/model_runtime/entities/llm_entities.py @@ -1,7 +1,7 @@ -from collections.abc import Sequence +from collections.abc import Mapping, Sequence from decimal import Decimal from enum import StrEnum -from typing import Optional +from typing import Any, Optional from pydantic import BaseModel, Field @@ -101,6 +101,20 @@ class LLMResult(BaseModel): system_fingerprint: Optional[str] = None +class LLMStructuredOutput(BaseModel): + """ + Model class for llm structured output. + """ + + structured_output: Mapping[str, Any] + + +class LLMResultWithStructuredOutput(LLMResult, LLMStructuredOutput): + """ + Model class for llm result with structured output. + """ + + class LLMResultChunkDelta(BaseModel): """ Model class for llm result chunk delta. diff --git a/api/core/plugin/entities/parameters.py b/api/core/plugin/entities/parameters.py index 895dd0d0fc..0f309bc666 100644 --- a/api/core/plugin/entities/parameters.py +++ b/api/core/plugin/entities/parameters.py @@ -10,6 +10,9 @@ from core.tools.entities.common_entities import I18nObject class PluginParameterOption(BaseModel): value: str = Field(..., description="The value of the option") label: I18nObject = Field(..., description="The label of the option") + icon: Optional[str] = Field( + default=None, description="The icon of the option, can be a url or a base64 encoded image" + ) @field_validator("value", mode="before") @classmethod diff --git a/api/core/workflow/nodes/llm/node.py b/api/core/workflow/nodes/llm/node.py index 124ae6d75d..7ef681addc 100644 --- a/api/core/workflow/nodes/llm/node.py +++ b/api/core/workflow/nodes/llm/node.py @@ -5,11 +5,10 @@ import logging from collections.abc import Generator, Mapping, Sequence from typing import TYPE_CHECKING, Any, Optional, cast -import json_repair - from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity from core.file import FileType, file_manager from core.helper.code_executor import CodeExecutor, CodeLanguage +from core.llm_generator.output_parser.structured_output import invoke_llm_with_structured_output from core.memory.token_buffer_memory import TokenBufferMemory from core.model_manager import ModelInstance, ModelManager from core.model_runtime.entities import ( @@ -18,7 +17,7 @@ from core.model_runtime.entities import ( PromptMessageContentType, TextPromptMessageContent, ) -from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk, LLMUsage +from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk, LLMStructuredOutput, LLMUsage from core.model_runtime.entities.message_entities import ( AssistantPromptMessage, PromptMessageContentUnionTypes, @@ -31,7 +30,6 @@ from core.model_runtime.entities.model_entities import ( ModelFeature, ModelPropertyKey, ModelType, - ParameterRule, ) from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel from core.model_runtime.utils.encoders import jsonable_encoder @@ -62,11 +60,6 @@ from core.workflow.nodes.event import ( RunRetrieverResourceEvent, RunStreamChunkEvent, ) -from core.workflow.utils.structured_output.entities import ( - ResponseFormat, - SpecialModelType, -) -from core.workflow.utils.structured_output.prompt import STRUCTURED_OUTPUT_PROMPT from core.workflow.utils.variable_template_parser import VariableTemplateParser from . import llm_utils @@ -143,12 +136,6 @@ class LLMNode(BaseNode[LLMNodeData]): return "1" def _run(self) -> Generator[NodeEvent | InNodeEvent, None, None]: - def process_structured_output(text: str) -> Optional[dict[str, Any]]: - """Process structured output if enabled""" - if not self.node_data.structured_output_enabled or not self.node_data.structured_output: - return None - return self._parse_structured_output(text) - node_inputs: Optional[dict[str, Any]] = None process_data = None result_text = "" @@ -244,6 +231,8 @@ class LLMNode(BaseNode[LLMNodeData]): stop=stop, ) + structured_output: LLMStructuredOutput | None = None + for event in generator: if isinstance(event, RunStreamChunkEvent): yield event @@ -254,10 +243,12 @@ class LLMNode(BaseNode[LLMNodeData]): # deduct quota llm_utils.deduct_llm_quota(tenant_id=self.tenant_id, model_instance=model_instance, usage=usage) break + elif isinstance(event, LLMStructuredOutput): + structured_output = event + outputs = {"text": result_text, "usage": jsonable_encoder(usage), "finish_reason": finish_reason} - structured_output = process_structured_output(result_text) if structured_output: - outputs["structured_output"] = structured_output + outputs["structured_output"] = structured_output.structured_output if self._file_outputs is not None: outputs["files"] = ArrayFileSegment(value=self._file_outputs) @@ -302,20 +293,40 @@ class LLMNode(BaseNode[LLMNodeData]): model_instance: ModelInstance, prompt_messages: Sequence[PromptMessage], stop: Optional[Sequence[str]] = None, - ) -> Generator[NodeEvent, None, None]: - invoke_result = model_instance.invoke_llm( - prompt_messages=list(prompt_messages), - model_parameters=node_data_model.completion_params, - stop=list(stop or []), - stream=True, - user=self.user_id, + ) -> Generator[NodeEvent | LLMStructuredOutput, None, None]: + model_schema = model_instance.model_type_instance.get_model_schema( + node_data_model.name, model_instance.credentials ) + if not model_schema: + raise ValueError(f"Model schema not found for {node_data_model.name}") + + if self.node_data.structured_output_enabled: + output_schema = self._fetch_structured_output_schema() + invoke_result = invoke_llm_with_structured_output( + provider=model_instance.provider, + model_schema=model_schema, + model_instance=model_instance, + prompt_messages=prompt_messages, + json_schema=output_schema, + model_parameters=node_data_model.completion_params, + stop=list(stop or []), + stream=True, + user=self.user_id, + ) + else: + invoke_result = model_instance.invoke_llm( + prompt_messages=list(prompt_messages), + model_parameters=node_data_model.completion_params, + stop=list(stop or []), + stream=True, + user=self.user_id, + ) return self._handle_invoke_result(invoke_result=invoke_result) def _handle_invoke_result( - self, invoke_result: LLMResult | Generator[LLMResultChunk, None, None] - ) -> Generator[NodeEvent, None, None]: + self, invoke_result: LLMResult | Generator[LLMResultChunk | LLMStructuredOutput, None, None] + ) -> Generator[NodeEvent | LLMStructuredOutput, None, None]: # For blocking mode if isinstance(invoke_result, LLMResult): event = self._handle_blocking_result(invoke_result=invoke_result) @@ -330,22 +341,25 @@ class LLMNode(BaseNode[LLMNodeData]): finish_reason = None full_text_buffer = io.StringIO() for result in invoke_result: - contents = result.delta.message.content - for text_part in self._save_multimodal_output_and_convert_result_to_markdown(contents): - full_text_buffer.write(text_part) - yield RunStreamChunkEvent(chunk_content=text_part, from_variable_selector=[self.node_id, "text"]) - - # Update the whole metadata - if not model and result.model: - model = result.model - if len(prompt_messages) == 0: - # TODO(QuantumGhost): it seems that this update has no visable effect. - # What's the purpose of the line below? - prompt_messages = list(result.prompt_messages) - if usage.prompt_tokens == 0 and result.delta.usage: - usage = result.delta.usage - if finish_reason is None and result.delta.finish_reason: - finish_reason = result.delta.finish_reason + if isinstance(result, LLMResultChunk): + contents = result.delta.message.content + for text_part in self._save_multimodal_output_and_convert_result_to_markdown(contents): + full_text_buffer.write(text_part) + yield RunStreamChunkEvent(chunk_content=text_part, from_variable_selector=[self.node_id, "text"]) + + # Update the whole metadata + if not model and result.model: + model = result.model + if len(prompt_messages) == 0: + # TODO(QuantumGhost): it seems that this update has no visable effect. + # What's the purpose of the line below? + prompt_messages = list(result.prompt_messages) + if usage.prompt_tokens == 0 and result.delta.usage: + usage = result.delta.usage + if finish_reason is None and result.delta.finish_reason: + finish_reason = result.delta.finish_reason + elif isinstance(result, LLMStructuredOutput): + yield result yield ModelInvokeCompletedEvent(text=full_text_buffer.getvalue(), usage=usage, finish_reason=finish_reason) @@ -522,12 +536,6 @@ class LLMNode(BaseNode[LLMNodeData]): if not model_schema: raise ModelNotExistError(f"Model {node_data_model.name} not exist.") - if self.node_data.structured_output_enabled: - if model_schema.support_structure_output: - completion_params = self._handle_native_json_schema(completion_params, model_schema.parameter_rules) - else: - # Set appropriate response format based on model capabilities - self._set_response_format(completion_params, model_schema.parameter_rules) model_config_with_cred.parameters = completion_params # NOTE(-LAN-): This line modify the `self.node_data.model`, which is used in `_invoke_llm()`. node_data_model.completion_params = completion_params @@ -719,32 +727,8 @@ class LLMNode(BaseNode[LLMNodeData]): ) if not model_schema: raise ModelNotExistError(f"Model {model_config.model} not exist.") - if self.node_data.structured_output_enabled: - if not model_schema.support_structure_output: - filtered_prompt_messages = self._handle_prompt_based_schema( - prompt_messages=filtered_prompt_messages, - ) return filtered_prompt_messages, model_config.stop - def _parse_structured_output(self, result_text: str) -> dict[str, Any]: - structured_output: dict[str, Any] = {} - try: - parsed = json.loads(result_text) - if not isinstance(parsed, dict): - raise LLMNodeError(f"Failed to parse structured output: {result_text}") - structured_output = parsed - except json.JSONDecodeError as e: - # if the result_text is not a valid json, try to repair it - parsed = json_repair.loads(result_text) - if not isinstance(parsed, dict): - # handle reasoning model like deepseek-r1 got '\n\n\n' prefix - if isinstance(parsed, list): - parsed = next((item for item in parsed if isinstance(item, dict)), {}) - else: - raise LLMNodeError(f"Failed to parse structured output: {result_text}") - structured_output = parsed - return structured_output - @classmethod def _extract_variable_selector_to_variable_mapping( cls, @@ -934,104 +918,6 @@ class LLMNode(BaseNode[LLMNodeData]): self._file_outputs.append(saved_file) return saved_file - def _handle_native_json_schema(self, model_parameters: dict, rules: list[ParameterRule]) -> dict: - """ - Handle structured output for models with native JSON schema support. - - :param model_parameters: Model parameters to update - :param rules: Model parameter rules - :return: Updated model parameters with JSON schema configuration - """ - # Process schema according to model requirements - schema = self._fetch_structured_output_schema() - schema_json = self._prepare_schema_for_model(schema) - - # Set JSON schema in parameters - model_parameters["json_schema"] = json.dumps(schema_json, ensure_ascii=False) - - # Set appropriate response format if required by the model - for rule in rules: - if rule.name == "response_format" and ResponseFormat.JSON_SCHEMA.value in rule.options: - model_parameters["response_format"] = ResponseFormat.JSON_SCHEMA.value - - return model_parameters - - def _handle_prompt_based_schema(self, prompt_messages: Sequence[PromptMessage]) -> list[PromptMessage]: - """ - Handle structured output for models without native JSON schema support. - This function modifies the prompt messages to include schema-based output requirements. - - Args: - prompt_messages: Original sequence of prompt messages - - Returns: - list[PromptMessage]: Updated prompt messages with structured output requirements - """ - # Convert schema to string format - schema_str = json.dumps(self._fetch_structured_output_schema(), ensure_ascii=False) - - # Find existing system prompt with schema placeholder - system_prompt = next( - (prompt for prompt in prompt_messages if isinstance(prompt, SystemPromptMessage)), - None, - ) - structured_output_prompt = STRUCTURED_OUTPUT_PROMPT.replace("{{schema}}", schema_str) - # Prepare system prompt content - system_prompt_content = ( - structured_output_prompt + "\n\n" + system_prompt.content - if system_prompt and isinstance(system_prompt.content, str) - else structured_output_prompt - ) - system_prompt = SystemPromptMessage(content=system_prompt_content) - - # Extract content from the last user message - - filtered_prompts = [prompt for prompt in prompt_messages if not isinstance(prompt, SystemPromptMessage)] - updated_prompt = [system_prompt] + filtered_prompts - - return updated_prompt - - def _set_response_format(self, model_parameters: dict, rules: list) -> None: - """ - Set the appropriate response format parameter based on model rules. - - :param model_parameters: Model parameters to update - :param rules: Model parameter rules - """ - for rule in rules: - if rule.name == "response_format": - if ResponseFormat.JSON.value in rule.options: - model_parameters["response_format"] = ResponseFormat.JSON.value - elif ResponseFormat.JSON_OBJECT.value in rule.options: - model_parameters["response_format"] = ResponseFormat.JSON_OBJECT.value - - def _prepare_schema_for_model(self, schema: dict) -> dict: - """ - Prepare JSON schema based on model requirements. - - Different models have different requirements for JSON schema formatting. - This function handles these differences. - - :param schema: The original JSON schema - :return: Processed schema compatible with the current model - """ - - # Deep copy to avoid modifying the original schema - processed_schema = schema.copy() - - # Convert boolean types to string types (common requirement) - convert_boolean_to_string(processed_schema) - - # Apply model-specific transformations - if SpecialModelType.GEMINI in self.node_data.model.name: - remove_additional_properties(processed_schema) - return processed_schema - elif SpecialModelType.OLLAMA in self.node_data.model.provider: - return processed_schema - else: - # Default format with name field - return {"schema": processed_schema, "name": "llm_response"} - def _fetch_model_schema(self, provider: str) -> AIModelEntity | None: """ Fetch model schema @@ -1243,49 +1129,3 @@ def _handle_completion_template( ) prompt_messages.append(prompt_message) return prompt_messages - - -def remove_additional_properties(schema: dict) -> None: - """ - Remove additionalProperties fields from JSON schema. - Used for models like Gemini that don't support this property. - - :param schema: JSON schema to modify in-place - """ - if not isinstance(schema, dict): - return - - # Remove additionalProperties at current level - schema.pop("additionalProperties", None) - - # Process nested structures recursively - for value in schema.values(): - if isinstance(value, dict): - remove_additional_properties(value) - elif isinstance(value, list): - for item in value: - if isinstance(item, dict): - remove_additional_properties(item) - - -def convert_boolean_to_string(schema: dict) -> None: - """ - Convert boolean type specifications to string in JSON schema. - - :param schema: JSON schema to modify in-place - """ - if not isinstance(schema, dict): - return - - # Check for boolean type at current level - if schema.get("type") == "boolean": - schema["type"] = "string" - - # Process nested dictionaries and lists recursively - for value in schema.values(): - if isinstance(value, dict): - convert_boolean_to_string(value) - elif isinstance(value, list): - for item in value: - if isinstance(item, dict): - convert_boolean_to_string(item)