From 5eb086bafdfa9a814287e1759e7bd1de48cd972f Mon Sep 17 00:00:00 2001 From: Vitor Hideyoshi Date: Fri, 12 Sep 2025 23:58:33 -0300 Subject: [PATCH] Better Internal Static Typing --- jambo/parser/_type_parser.py | 22 ++--- jambo/parser/allof_type_parser.py | 23 ++--- jambo/parser/enum_type_parser.py | 6 +- jambo/parser/object_type_parser.py | 22 +++-- jambo/parser/oneof_type_parser.py | 9 +- jambo/parser/ref_type_parser.py | 39 +++++---- jambo/schema_converter.py | 10 ++- jambo/types/json_schema_type.py | 135 +++++++++++++---------------- jambo/types/type_parser_options.py | 4 +- tests/test_schema_converter.py | 14 +++ 10 files changed, 152 insertions(+), 132 deletions(-) diff --git a/jambo/parser/_type_parser.py b/jambo/parser/_type_parser.py index 080965c..cce8042 100644 --- a/jambo/parser/_type_parser.py +++ b/jambo/parser/_type_parser.py @@ -1,16 +1,16 @@ -from jambo.types.type_parser_options import TypeParserOptions +from jambo.types.type_parser_options import JSONSchema, TypeParserOptions from pydantic import Field, TypeAdapter -from typing_extensions import Annotated, Any, Generic, Self, TypeVar, Unpack +from typing_extensions import Annotated, Any, ClassVar, Generic, Self, TypeVar, Unpack from abc import ABC, abstractmethod -T = TypeVar("T") +T = TypeVar("T", bound=type) class GenericTypeParser(ABC, Generic[T]): - json_schema_type: str = None + json_schema_type: ClassVar[str] type_mappings: dict[str, str] = {} @@ -21,7 +21,7 @@ class GenericTypeParser(ABC, Generic[T]): @abstractmethod def from_properties_impl( - self, name: str, properties: dict[str, Any], **kwargs: Unpack[TypeParserOptions] + self, name: str, properties: JSONSchema, **kwargs: Unpack[TypeParserOptions] ) -> tuple[T, dict]: """ Abstract method to convert properties to a type and its fields properties. 
@@ -32,7 +32,7 @@ class GenericTypeParser(ABC, Generic[T]): """ def from_properties( - self, name: str, properties: dict[str, Any], **kwargs: Unpack[TypeParserOptions] + self, name: str, properties: JSONSchema, **kwargs: Unpack[TypeParserOptions] ) -> tuple[T, dict]: """ Converts properties to a type and its fields properties. @@ -54,7 +54,7 @@ class GenericTypeParser(ABC, Generic[T]): @classmethod def type_from_properties( - cls, name: str, properties: dict[str, Any], **kwargs: Unpack[TypeParserOptions] + cls, name: str, properties: JSONSchema, **kwargs: Unpack[TypeParserOptions] ) -> tuple[type, dict]: """ Factory method to fetch the appropriate type parser based on properties @@ -69,14 +69,14 @@ class GenericTypeParser(ABC, Generic[T]): return parser().from_properties(name=name, properties=properties, **kwargs) @classmethod - def _get_impl(cls, properties: dict[str, Any]) -> type[Self]: + def _get_impl(cls, properties: JSONSchema) -> type[Self]: for subcls in cls.__subclasses__(): schema_type, schema_value = subcls._get_schema_type() if schema_type not in properties: continue - if schema_value is None or schema_value == properties[schema_type]: + if schema_value is None or schema_value == properties[schema_type]: # type: ignore return subcls raise ValueError("Unknown type") @@ -108,7 +108,7 @@ class GenericTypeParser(ABC, Generic[T]): } @staticmethod - def _validate_default(field_type: type, field_prop: dict) -> bool: + def _validate_default(field_type: T, field_prop: dict) -> bool: value = field_prop.get("default") if value is None and field_prop.get("default_factory") is not None: @@ -118,7 +118,7 @@ class GenericTypeParser(ABC, Generic[T]): return True try: - field = Annotated[field_type, Field(**field_prop)] + field = Annotated[field_type, Field(**field_prop)] # type: ignore TypeAdapter(field).validate_python(value) except Exception as _: return False diff --git a/jambo/parser/allof_type_parser.py b/jambo/parser/allof_type_parser.py index 3180ae3..2fb62f9 
100644 --- a/jambo/parser/allof_type_parser.py +++ b/jambo/parser/allof_type_parser.py @@ -1,7 +1,8 @@ from jambo.parser._type_parser import GenericTypeParser +from jambo.types.json_schema_type import JSONSchema from jambo.types.type_parser_options import TypeParserOptions -from typing_extensions import Any, Unpack +from typing_extensions import Unpack class AllOfTypeParser(GenericTypeParser): @@ -10,7 +11,7 @@ class AllOfTypeParser(GenericTypeParser): json_schema_type = "allOf" def from_properties_impl( - self, name, properties, **kwargs: Unpack[TypeParserOptions] + self, name: str, properties: JSONSchema, **kwargs: Unpack[TypeParserOptions] ): sub_properties = properties.get("allOf", []) @@ -29,12 +30,12 @@ class AllOfTypeParser(GenericTypeParser): @staticmethod def _get_type_parser( - sub_properties: list[dict[str, Any]], + sub_properties: list[JSONSchema], ) -> type[GenericTypeParser]: if not sub_properties: raise ValueError("Invalid JSON Schema: 'allOf' is empty.") - parsers = set( + parsers: set[type[GenericTypeParser]] = set( GenericTypeParser._get_impl(sub_property) for sub_property in sub_properties ) if len(parsers) != 1: @@ -44,17 +45,19 @@ class AllOfTypeParser(GenericTypeParser): @staticmethod def _rebuild_properties_from_subproperties( - sub_properties: list[dict[str, Any]], - ) -> dict[str, Any]: - properties = {} + sub_properties: list[JSONSchema], + ) -> JSONSchema: + properties: JSONSchema = {} for subProperty in sub_properties: for name, prop in subProperty.items(): if name not in properties: - properties[name] = prop + properties[name] = prop # type: ignore else: # Merge properties if they exist in both sub-properties - properties[name] = AllOfTypeParser._validate_prop( - name, properties[name], prop + properties[name] = AllOfTypeParser._validate_prop( # type: ignore + name, + properties[name], # type: ignore + prop, ) return properties diff --git a/jambo/parser/enum_type_parser.py b/jambo/parser/enum_type_parser.py index 42f4d6f..c59a725 100644 
--- a/jambo/parser/enum_type_parser.py +++ b/jambo/parser/enum_type_parser.py @@ -1,6 +1,6 @@ from jambo.parser._type_parser import GenericTypeParser from jambo.types.json_schema_type import JSONSchemaNativeTypes -from jambo.types.type_parser_options import TypeParserOptions +from jambo.types.type_parser_options import JSONSchema, TypeParserOptions from typing_extensions import Unpack @@ -11,7 +11,7 @@ class EnumTypeParser(GenericTypeParser): json_schema_type = "enum" def from_properties_impl( - self, name, properties, **kwargs: Unpack[TypeParserOptions] + self, name: str, properties: JSONSchema, **kwargs: Unpack[TypeParserOptions] ): if "enum" not in properties: raise ValueError(f"Enum type {name} must have 'enum' property defined.") @@ -27,7 +27,7 @@ class EnumTypeParser(GenericTypeParser): ) # Create a new Enum type dynamically - enum_type = Enum(name, {str(value).upper(): value for value in enum_values}) + enum_type = Enum(name, {str(value).upper(): value for value in enum_values}) # type: ignore parsed_properties = self.mappings_properties_builder(properties, **kwargs) if "default" in parsed_properties and parsed_properties["default"] is not None: diff --git a/jambo/parser/object_type_parser.py b/jambo/parser/object_type_parser.py index 0f0ab7e..6cb60e7 100644 --- a/jambo/parser/object_type_parser.py +++ b/jambo/parser/object_type_parser.py @@ -1,8 +1,10 @@ from jambo.parser._type_parser import GenericTypeParser +from jambo.types.json_schema_type import JSONSchema from jambo.types.type_parser_options import TypeParserOptions from pydantic import BaseModel, ConfigDict, Field, create_model -from typing_extensions import Any, Unpack +from pydantic.fields import FieldInfo +from typing_extensions import Unpack class ObjectTypeParser(GenericTypeParser): @@ -11,7 +13,7 @@ class ObjectTypeParser(GenericTypeParser): json_schema_type = "type:object" def from_properties_impl( - self, name: str, properties: dict[str, Any], **kwargs: Unpack[TypeParserOptions] + self, name: 
str, properties: JSONSchema, **kwargs: Unpack[TypeParserOptions] ) -> tuple[type[BaseModel], dict]: type_parsing = self.to_model( name, @@ -32,29 +34,29 @@ class ObjectTypeParser(GenericTypeParser): def to_model( cls, name: str, - schema: dict[str, Any], + properties: dict[str, JSONSchema], required_keys: list[str], **kwargs: Unpack[TypeParserOptions], ) -> type[BaseModel]: """ Converts JSON Schema object properties to a Pydantic model. :param name: The name of the model. - :param schema: The properties of the JSON Schema object. + :param properties: The properties of the JSON Schema object. :param required_keys: List of required keys in the schema. :return: A Pydantic model class. """ model_config = ConfigDict(validate_assignment=True) - fields = cls._parse_properties(schema, required_keys, **kwargs) + fields = cls._parse_properties(properties, required_keys, **kwargs) - return create_model(name, __config__=model_config, **fields) + return create_model(name, __config__=model_config, **fields) # type: ignore @classmethod def _parse_properties( cls, - properties: dict[str, Any], + properties: dict[str, JSONSchema], required_keys: list[str], **kwargs: Unpack[TypeParserOptions], - ) -> dict[str, tuple[type, Field]]: + ) -> dict[str, tuple[type, FieldInfo]]: required_keys = required_keys or [] fields = {} @@ -63,7 +65,9 @@ class ObjectTypeParser(GenericTypeParser): sub_property["required"] = name in required_keys parsed_type, parsed_properties = GenericTypeParser.type_from_properties( - name, prop, **sub_property + name, + prop, + **sub_property, # type: ignore ) fields[name] = (parsed_type, Field(**parsed_properties)) diff --git a/jambo/parser/oneof_type_parser.py b/jambo/parser/oneof_type_parser.py index 707d277..317ce61 100644 --- a/jambo/parser/oneof_type_parser.py +++ b/jambo/parser/oneof_type_parser.py @@ -5,6 +5,9 @@ from pydantic import BaseModel, BeforeValidator, Field, TypeAdapter, ValidationE from typing_extensions import Annotated, Any, Union, Unpack, 
get_args +Annotation = Annotated[Any, ...] + + class OneOfTypeParser(GenericTypeParser): mapped_type = Union @@ -49,8 +52,8 @@ class OneOfTypeParser(GenericTypeParser): @staticmethod def _build_type_one_of_with_discriminator( - subfield_types: list[Annotated], discriminator_prop: dict - ) -> Annotated: + subfield_types: list[Annotation], discriminator_prop: dict + ) -> Annotation: """ Build a type with a discriminator. """ @@ -74,7 +77,7 @@ class OneOfTypeParser(GenericTypeParser): return Annotated[Union[(*subfield_types,)], Field(discriminator=property_name)] @staticmethod - def _build_type_one_of_with_func(subfield_types: list[Annotated]) -> Annotated: + def _build_type_one_of_with_func(subfield_types: list[Annotation]) -> Annotation: """ Build a type with a validation function for the oneOf constraint. """ diff --git a/jambo/parser/ref_type_parser.py b/jambo/parser/ref_type_parser.py index 57abeac..7aa435d 100644 --- a/jambo/parser/ref_type_parser.py +++ b/jambo/parser/ref_type_parser.py @@ -1,10 +1,11 @@ from jambo.parser import GenericTypeParser +from jambo.types.json_schema_type import JSONSchema from jambo.types.type_parser_options import TypeParserOptions -from typing_extensions import Any, ForwardRef, Literal, TypeVar, Union, Unpack +from typing_extensions import ForwardRef, Literal, Union, Unpack -RefType = TypeVar("RefType", bound=Union[type, ForwardRef]) +RefType = Union[type, ForwardRef] RefStrategy = Literal["forward_ref", "def_ref"] @@ -13,7 +14,7 @@ class RefTypeParser(GenericTypeParser): json_schema_type = "$ref" def from_properties_impl( - self, name: str, properties: dict[str, Any], **kwargs: Unpack[TypeParserOptions] + self, name: str, properties: JSONSchema, **kwargs: Unpack[TypeParserOptions] ) -> tuple[RefType, dict]: if "$ref" not in properties: raise ValueError(f"RefTypeParser: Missing $ref in properties for {name}") @@ -41,19 +42,19 @@ class RefTypeParser(GenericTypeParser): # If the reference is either processing or already cached return 
ref_state, mapped_properties - ref_cache[ref_name] = self._parse_from_strategy( - ref_strategy, ref_name, ref_property, **kwargs - ) + ref = self._parse_from_strategy(ref_strategy, ref_name, ref_property, **kwargs) + ref_cache[ref_name] = ref - return ref_cache[ref_name], mapped_properties + return ref, mapped_properties def _parse_from_strategy( self, ref_strategy: RefStrategy, ref_name: str, - ref_property: dict[str, Any], + ref_property: JSONSchema, **kwargs: Unpack[TypeParserOptions], - ): + ) -> RefType: + mapped_type: RefType match ref_strategy: case "forward_ref": mapped_type = ForwardRef(ref_name) @@ -69,7 +70,7 @@ class RefTypeParser(GenericTypeParser): return mapped_type def _get_ref_from_cache( - self, ref_name: str, ref_cache: dict[str, type] + self, ref_name: str, ref_cache: dict[str, ForwardRef | type | None] ) -> RefType | type | None: try: ref_state = ref_cache[ref_name] @@ -84,10 +85,12 @@ class RefTypeParser(GenericTypeParser): # If the reference is not in the cache, we will set it to None ref_cache[ref_name] = None + return None + def _examine_ref_strategy( - self, name: str, properties: dict[str, Any], **kwargs: Unpack[TypeParserOptions] - ) -> tuple[RefStrategy, str, dict] | None: - if properties["$ref"] == "#": + self, name: str, properties: JSONSchema, **kwargs: Unpack[TypeParserOptions] + ) -> tuple[RefStrategy, str, JSONSchema]: + if properties.get("$ref") == "#": ref_name = kwargs["context"].get("title") if ref_name is None: raise ValueError( @@ -95,7 +98,7 @@ class RefTypeParser(GenericTypeParser): ) return "forward_ref", ref_name, {} - if properties["$ref"].startswith("#/$defs/"): + if properties.get("$ref", "").startswith("#/$defs/"): target_name, target_property = self._extract_target_ref( name, properties, **kwargs ) @@ -106,8 +109,8 @@ class RefTypeParser(GenericTypeParser): ) def _extract_target_ref( - self, name: str, properties: dict[str, Any], **kwargs: Unpack[TypeParserOptions] - ) -> tuple[str, dict]: + self, name: str, 
properties: JSONSchema, **kwargs: Unpack[TypeParserOptions] + ) -> tuple[str, JSONSchema]: target_name = None target_property = kwargs["context"] for prop_name in properties["$ref"].split("/")[1:]: @@ -117,9 +120,9 @@ class RefTypeParser(GenericTypeParser): " properties for $ref {properties['$ref']}" ) target_name = prop_name - target_property = target_property[prop_name] + target_property = target_property[prop_name] # type: ignore - if target_name is None or target_property is None: + if not isinstance(target_name, str) or target_property is None: raise ValueError(f"RefTypeParser: Invalid $ref {properties['$ref']}") return target_name, target_property diff --git a/jambo/schema_converter.py b/jambo/schema_converter.py index 6f9020e..da89d21 100644 --- a/jambo/schema_converter.py +++ b/jambo/schema_converter.py @@ -25,7 +25,7 @@ class SchemaConverter: try: validator = validator_for(schema) - validator.check_schema(schema) + validator.check_schema(schema) # type: ignore except SchemaError as e: raise ValueError(f"Invalid JSON Schema: {e}") @@ -42,6 +42,7 @@ class SchemaConverter: schema.get("required", []), context=schema, ref_cache=dict(), + required=True, ) case "$ref": @@ -50,6 +51,7 @@ class SchemaConverter: schema, context=schema, ref_cache=dict(), + required=True, ) return parsed_model case _: @@ -65,4 +67,8 @@ class SchemaConverter: if "$ref" in schema: return "$ref" - return schema.get("type", "undefined") + schema_type = schema.get("type") + if isinstance(schema_type, str): + return schema_type + + raise ValueError("Schema must have a valid 'type' or '$ref' field.") diff --git a/jambo/types/json_schema_type.py b/jambo/types/json_schema_type.py index d99d791..dcb0951 100644 --- a/jambo/types/json_schema_type.py +++ b/jambo/types/json_schema_type.py @@ -1,93 +1,80 @@ -from typing_extensions import Dict, List, Literal, TypedDict, Union +from __future__ import annotations + +from typing_extensions import ( + Dict, + List, + Literal, + TypedDict, + Union, +) 
from types import NoneType +# Primitive JSON types JSONSchemaType = Literal[ "string", "number", "integer", "boolean", "object", "array", "null" ] - JSONSchemaNativeTypes: tuple[type, ...] = ( str, - int, float, + int, bool, list, set, NoneType, ) - JSONType = Union[str, int, float, bool, None, Dict[str, "JSONType"], List["JSONType"]] - -class JSONSchema(TypedDict, total=False): - # Basic metadata - title: str - description: str - default: JSONType - examples: List[JSONType] - - # Type definitions - type: Union[JSONSchemaType, List[JSONSchemaType]] - - # Object-specific keywords - properties: Dict[str, "JSONSchema"] - required: List[str] - additionalProperties: Union[bool, "JSONSchema"] - minProperties: int - maxProperties: int - patternProperties: Dict[str, "JSONSchema"] - dependencies: Dict[str, Union[List[str], "JSONSchema"]] - - # Array-specific keywords - items: Union["JSONSchema", List["JSONSchema"]] - additionalItems: Union[bool, "JSONSchema"] - minItems: int - maxItems: int - uniqueItems: bool - - # String-specific keywords - minLength: int - maxLength: int - pattern: str - format: str - - # Number-specific keywords - minimum: float - maximum: float - exclusiveMinimum: float - exclusiveMaximum: float - multipleOf: float - - # Enum and const - enum: List[JSONType] - const: JSONType - - # Conditionals - if_: "JSONSchema" # 'if' is a reserved word in Python - then: "JSONSchema" - else_: "JSONSchema" # 'else' is also a reserved word - - # Combination keywords - allOf: List["JSONSchema"] - anyOf: List["JSONSchema"] - oneOf: List["JSONSchema"] - not_: "JSONSchema" # 'not' is a reserved word - - -# Fix forward references -JSONSchema.__annotations__["properties"] = Dict[str, JSONSchema] -JSONSchema.__annotations__["items"] = Union[JSONSchema, List[JSONSchema]] -JSONSchema.__annotations__["additionalItems"] = Union[bool, JSONSchema] -JSONSchema.__annotations__["additionalProperties"] = Union[bool, JSONSchema] -JSONSchema.__annotations__["patternProperties"] = 
Dict[str, JSONSchema] -JSONSchema.__annotations__["dependencies"] = Dict[str, Union[List[str], JSONSchema]] -JSONSchema.__annotations__["if_"] = JSONSchema -JSONSchema.__annotations__["then"] = JSONSchema -JSONSchema.__annotations__["else_"] = JSONSchema -JSONSchema.__annotations__["allOf"] = List[JSONSchema] -JSONSchema.__annotations__["anyOf"] = List[JSONSchema] -JSONSchema.__annotations__["oneOf"] = List[JSONSchema] -JSONSchema.__annotations__["not_"] = JSONSchema +# Dynamically define TypedDict with JSON Schema keywords +JSONSchema = TypedDict( + "JSONSchema", + { + "$id": str, + "$schema": str, + "$ref": str, + "$anchor": str, + "$comment": str, + "$defs": Dict[str, "JSONSchema"], + "title": str, + "description": str, + "default": JSONType, + "examples": List[JSONType], + "type": Union[JSONSchemaType, List[JSONSchemaType]], + "enum": List[JSONType], + "const": JSONType, + "properties": Dict[str, "JSONSchema"], + "patternProperties": Dict[str, "JSONSchema"], + "additionalProperties": Union[bool, "JSONSchema"], + "required": List[str], + "minProperties": int, + "maxProperties": int, + "dependencies": Dict[str, Union[List[str], "JSONSchema"]], + "items": Union["JSONSchema", List["JSONSchema"]], + "prefixItems": List["JSONSchema"], + "additionalItems": Union[bool, "JSONSchema"], + "contains": "JSONSchema", + "minItems": int, + "maxItems": int, + "uniqueItems": bool, + "minLength": int, + "maxLength": int, + "pattern": str, + "format": str, + "minimum": float, + "maximum": float, + "exclusiveMinimum": Union[bool, float], + "exclusiveMaximum": Union[bool, float], + "multipleOf": float, + "if": "JSONSchema", + "then": "JSONSchema", + "else": "JSONSchema", + "allOf": List["JSONSchema"], + "anyOf": List["JSONSchema"], + "oneOf": List["JSONSchema"], + "not": "JSONSchema", + }, + total=False, # all fields optional +) diff --git a/jambo/types/type_parser_options.py b/jambo/types/type_parser_options.py index 4f7d8e0..baf518b 100644 --- a/jambo/types/type_parser_options.py 
+++ b/jambo/types/type_parser_options.py @@ -1,9 +1,9 @@ from jambo.types.json_schema_type import JSONSchema -from typing_extensions import TypedDict +from typing_extensions import ForwardRef, TypedDict class TypeParserOptions(TypedDict): required: bool context: JSONSchema - ref_cache: dict[str, type] + ref_cache: dict[str, ForwardRef | type | None] diff --git a/tests/test_schema_converter.py b/tests/test_schema_converter.py index cdd294d..6f831f2 100644 --- a/tests/test_schema_converter.py +++ b/tests/test_schema_converter.py @@ -26,6 +26,20 @@ class TestSchemaConverter(TestCase): with self.assertRaises(ValueError): SchemaConverter.build(schema) + def test_invalid_schema_type(self): + schema = { + "title": 1, + "description": "A person", + "type": 1, + "properties": { + "name": {"type": "string"}, + "age": {"type": "integer"}, + }, + } + + with self.assertRaises(ValueError): + SchemaConverter.build(schema) + def test_build_expects_title(self): schema = { "description": "A person",