Better Internal Static Typing

This commit is contained in:
2025-09-12 23:58:33 -03:00
parent 5c30e752e3
commit 5eb086bafd
10 changed files with 152 additions and 132 deletions

View File

@@ -1,16 +1,16 @@
from jambo.types.type_parser_options import TypeParserOptions
from jambo.types.type_parser_options import JSONSchema, TypeParserOptions
from pydantic import Field, TypeAdapter
from typing_extensions import Annotated, Any, Generic, Self, TypeVar, Unpack
from typing_extensions import Annotated, Any, ClassVar, Generic, Self, TypeVar, Unpack
from abc import ABC, abstractmethod
T = TypeVar("T")
T = TypeVar("T", bound=type)
class GenericTypeParser(ABC, Generic[T]):
json_schema_type: str = None
json_schema_type: ClassVar[str]
type_mappings: dict[str, str] = {}
@@ -21,7 +21,7 @@ class GenericTypeParser(ABC, Generic[T]):
@abstractmethod
def from_properties_impl(
self, name: str, properties: dict[str, Any], **kwargs: Unpack[TypeParserOptions]
self, name: str, properties: JSONSchema, **kwargs: Unpack[TypeParserOptions]
) -> tuple[T, dict]:
"""
Abstract method to convert properties to a type and its fields properties.
@@ -32,7 +32,7 @@ class GenericTypeParser(ABC, Generic[T]):
"""
def from_properties(
self, name: str, properties: dict[str, Any], **kwargs: Unpack[TypeParserOptions]
self, name: str, properties: JSONSchema, **kwargs: Unpack[TypeParserOptions]
) -> tuple[T, dict]:
"""
Converts properties to a type and its fields properties.
@@ -54,7 +54,7 @@ class GenericTypeParser(ABC, Generic[T]):
@classmethod
def type_from_properties(
cls, name: str, properties: dict[str, Any], **kwargs: Unpack[TypeParserOptions]
cls, name: str, properties: JSONSchema, **kwargs: Unpack[TypeParserOptions]
) -> tuple[type, dict]:
"""
Factory method to fetch the appropriate type parser based on properties
@@ -69,14 +69,14 @@ class GenericTypeParser(ABC, Generic[T]):
return parser().from_properties(name=name, properties=properties, **kwargs)
@classmethod
def _get_impl(cls, properties: dict[str, Any]) -> type[Self]:
def _get_impl(cls, properties: JSONSchema) -> type[Self]:
for subcls in cls.__subclasses__():
schema_type, schema_value = subcls._get_schema_type()
if schema_type not in properties:
continue
if schema_value is None or schema_value == properties[schema_type]:
if schema_value is None or schema_value == properties[schema_type]: # type: ignore
return subcls
raise ValueError("Unknown type")
@@ -108,7 +108,7 @@ class GenericTypeParser(ABC, Generic[T]):
}
@staticmethod
def _validate_default(field_type: type, field_prop: dict) -> bool:
def _validate_default(field_type: T, field_prop: dict) -> bool:
value = field_prop.get("default")
if value is None and field_prop.get("default_factory") is not None:
@@ -118,7 +118,7 @@ class GenericTypeParser(ABC, Generic[T]):
return True
try:
field = Annotated[field_type, Field(**field_prop)]
field = Annotated[field_type, Field(**field_prop)] # type: ignore
TypeAdapter(field).validate_python(value)
except Exception as _:
return False

View File

@@ -1,7 +1,8 @@
from jambo.parser._type_parser import GenericTypeParser
from jambo.types.json_schema_type import JSONSchema
from jambo.types.type_parser_options import TypeParserOptions
from typing_extensions import Any, Unpack
from typing_extensions import Unpack
class AllOfTypeParser(GenericTypeParser):
@@ -10,7 +11,7 @@ class AllOfTypeParser(GenericTypeParser):
json_schema_type = "allOf"
def from_properties_impl(
self, name, properties, **kwargs: Unpack[TypeParserOptions]
self, name: str, properties: JSONSchema, **kwargs: Unpack[TypeParserOptions]
):
sub_properties = properties.get("allOf", [])
@@ -29,12 +30,12 @@ class AllOfTypeParser(GenericTypeParser):
@staticmethod
def _get_type_parser(
sub_properties: list[dict[str, Any]],
sub_properties: list[JSONSchema],
) -> type[GenericTypeParser]:
if not sub_properties:
raise ValueError("Invalid JSON Schema: 'allOf' is empty.")
parsers = set(
parsers: set[type[GenericTypeParser]] = set(
GenericTypeParser._get_impl(sub_property) for sub_property in sub_properties
)
if len(parsers) != 1:
@@ -44,17 +45,19 @@ class AllOfTypeParser(GenericTypeParser):
@staticmethod
def _rebuild_properties_from_subproperties(
sub_properties: list[dict[str, Any]],
) -> dict[str, Any]:
properties = {}
sub_properties: list[JSONSchema],
) -> JSONSchema:
properties: JSONSchema = {}
for subProperty in sub_properties:
for name, prop in subProperty.items():
if name not in properties:
properties[name] = prop
properties[name] = prop # type: ignore
else:
# Merge properties if they exist in both sub-properties
properties[name] = AllOfTypeParser._validate_prop(
name, properties[name], prop
properties[name] = AllOfTypeParser._validate_prop( # type: ignore
name,
properties[name], # type: ignore
prop,
)
return properties

View File

@@ -1,6 +1,6 @@
from jambo.parser._type_parser import GenericTypeParser
from jambo.types.json_schema_type import JSONSchemaNativeTypes
from jambo.types.type_parser_options import TypeParserOptions
from jambo.types.type_parser_options import JSONSchema, TypeParserOptions
from typing_extensions import Unpack
@@ -11,7 +11,7 @@ class EnumTypeParser(GenericTypeParser):
json_schema_type = "enum"
def from_properties_impl(
self, name, properties, **kwargs: Unpack[TypeParserOptions]
self, name: str, properties: JSONSchema, **kwargs: Unpack[TypeParserOptions]
):
if "enum" not in properties:
raise ValueError(f"Enum type {name} must have 'enum' property defined.")
@@ -27,7 +27,7 @@ class EnumTypeParser(GenericTypeParser):
)
# Create a new Enum type dynamically
enum_type = Enum(name, {str(value).upper(): value for value in enum_values})
enum_type = Enum(name, {str(value).upper(): value for value in enum_values}) # type: ignore
parsed_properties = self.mappings_properties_builder(properties, **kwargs)
if "default" in parsed_properties and parsed_properties["default"] is not None:

View File

@@ -1,8 +1,10 @@
from jambo.parser._type_parser import GenericTypeParser
from jambo.types.json_schema_type import JSONSchema
from jambo.types.type_parser_options import TypeParserOptions
from pydantic import BaseModel, ConfigDict, Field, create_model
from typing_extensions import Any, Unpack
from pydantic.fields import FieldInfo
from typing_extensions import Unpack
class ObjectTypeParser(GenericTypeParser):
@@ -11,7 +13,7 @@ class ObjectTypeParser(GenericTypeParser):
json_schema_type = "type:object"
def from_properties_impl(
self, name: str, properties: dict[str, Any], **kwargs: Unpack[TypeParserOptions]
self, name: str, properties: JSONSchema, **kwargs: Unpack[TypeParserOptions]
) -> tuple[type[BaseModel], dict]:
type_parsing = self.to_model(
name,
@@ -32,29 +34,29 @@ class ObjectTypeParser(GenericTypeParser):
def to_model(
cls,
name: str,
schema: dict[str, Any],
properties: dict[str, JSONSchema],
required_keys: list[str],
**kwargs: Unpack[TypeParserOptions],
) -> type[BaseModel]:
"""
Converts JSON Schema object properties to a Pydantic model.
:param name: The name of the model.
:param schema: The properties of the JSON Schema object.
:param properties: The properties of the JSON Schema object.
:param required_keys: List of required keys in the schema.
:return: A Pydantic model class.
"""
model_config = ConfigDict(validate_assignment=True)
fields = cls._parse_properties(schema, required_keys, **kwargs)
fields = cls._parse_properties(properties, required_keys, **kwargs)
return create_model(name, __config__=model_config, **fields)
return create_model(name, __config__=model_config, **fields) # type: ignore
@classmethod
def _parse_properties(
cls,
properties: dict[str, Any],
properties: dict[str, JSONSchema],
required_keys: list[str],
**kwargs: Unpack[TypeParserOptions],
) -> dict[str, tuple[type, Field]]:
) -> dict[str, tuple[type, FieldInfo]]:
required_keys = required_keys or []
fields = {}
@@ -63,7 +65,9 @@ class ObjectTypeParser(GenericTypeParser):
sub_property["required"] = name in required_keys
parsed_type, parsed_properties = GenericTypeParser.type_from_properties(
name, prop, **sub_property
name,
prop,
**sub_property, # type: ignore
)
fields[name] = (parsed_type, Field(**parsed_properties))

View File

@@ -5,6 +5,9 @@ from pydantic import BaseModel, BeforeValidator, Field, TypeAdapter, ValidationE
from typing_extensions import Annotated, Any, Union, Unpack, get_args
Annotation = Annotated[Any, ...]
class OneOfTypeParser(GenericTypeParser):
mapped_type = Union
@@ -49,8 +52,8 @@ class OneOfTypeParser(GenericTypeParser):
@staticmethod
def _build_type_one_of_with_discriminator(
subfield_types: list[Annotated], discriminator_prop: dict
) -> Annotated:
subfield_types: list[Annotation], discriminator_prop: dict
) -> Annotation:
"""
Build a type with a discriminator.
"""
@@ -74,7 +77,7 @@ class OneOfTypeParser(GenericTypeParser):
return Annotated[Union[(*subfield_types,)], Field(discriminator=property_name)]
@staticmethod
def _build_type_one_of_with_func(subfield_types: list[Annotated]) -> Annotated:
def _build_type_one_of_with_func(subfield_types: list[Annotation]) -> Annotation:
"""
Build a type with a validation function for the oneOf constraint.
"""

View File

@@ -1,10 +1,11 @@
from jambo.parser import GenericTypeParser
from jambo.types.json_schema_type import JSONSchema
from jambo.types.type_parser_options import TypeParserOptions
from typing_extensions import Any, ForwardRef, Literal, TypeVar, Union, Unpack
from typing_extensions import ForwardRef, Literal, Union, Unpack
RefType = TypeVar("RefType", bound=Union[type, ForwardRef])
RefType = Union[type, ForwardRef]
RefStrategy = Literal["forward_ref", "def_ref"]
@@ -13,7 +14,7 @@ class RefTypeParser(GenericTypeParser):
json_schema_type = "$ref"
def from_properties_impl(
self, name: str, properties: dict[str, Any], **kwargs: Unpack[TypeParserOptions]
self, name: str, properties: JSONSchema, **kwargs: Unpack[TypeParserOptions]
) -> tuple[RefType, dict]:
if "$ref" not in properties:
raise ValueError(f"RefTypeParser: Missing $ref in properties for {name}")
@@ -41,19 +42,19 @@ class RefTypeParser(GenericTypeParser):
# If the reference is either processing or already cached
return ref_state, mapped_properties
ref_cache[ref_name] = self._parse_from_strategy(
ref_strategy, ref_name, ref_property, **kwargs
)
ref = self._parse_from_strategy(ref_strategy, ref_name, ref_property, **kwargs)
ref_cache[ref_name] = ref
return ref_cache[ref_name], mapped_properties
return ref, mapped_properties
def _parse_from_strategy(
self,
ref_strategy: RefStrategy,
ref_name: str,
ref_property: dict[str, Any],
ref_property: JSONSchema,
**kwargs: Unpack[TypeParserOptions],
):
) -> RefType:
mapped_type: RefType
match ref_strategy:
case "forward_ref":
mapped_type = ForwardRef(ref_name)
@@ -69,7 +70,7 @@ class RefTypeParser(GenericTypeParser):
return mapped_type
def _get_ref_from_cache(
self, ref_name: str, ref_cache: dict[str, type]
self, ref_name: str, ref_cache: dict[str, ForwardRef | type | None]
) -> RefType | type | None:
try:
ref_state = ref_cache[ref_name]
@@ -84,10 +85,12 @@ class RefTypeParser(GenericTypeParser):
# If the reference is not in the cache, we will set it to None
ref_cache[ref_name] = None
return None
def _examine_ref_strategy(
self, name: str, properties: dict[str, Any], **kwargs: Unpack[TypeParserOptions]
) -> tuple[RefStrategy, str, dict] | None:
if properties["$ref"] == "#":
self, name: str, properties: JSONSchema, **kwargs: Unpack[TypeParserOptions]
) -> tuple[RefStrategy, str, JSONSchema]:
if properties.get("$ref") == "#":
ref_name = kwargs["context"].get("title")
if ref_name is None:
raise ValueError(
@@ -95,7 +98,7 @@ class RefTypeParser(GenericTypeParser):
)
return "forward_ref", ref_name, {}
if properties["$ref"].startswith("#/$defs/"):
if properties.get("$ref", "").startswith("#/$defs/"):
target_name, target_property = self._extract_target_ref(
name, properties, **kwargs
)
@@ -106,8 +109,8 @@ class RefTypeParser(GenericTypeParser):
)
def _extract_target_ref(
self, name: str, properties: dict[str, Any], **kwargs: Unpack[TypeParserOptions]
) -> tuple[str, dict]:
self, name: str, properties: JSONSchema, **kwargs: Unpack[TypeParserOptions]
) -> tuple[str, JSONSchema]:
target_name = None
target_property = kwargs["context"]
for prop_name in properties["$ref"].split("/")[1:]:
@@ -117,9 +120,9 @@ class RefTypeParser(GenericTypeParser):
" properties for $ref {properties['$ref']}"
)
target_name = prop_name
target_property = target_property[prop_name]
target_property = target_property[prop_name] # type: ignore
if target_name is None or target_property is None:
if not isinstance(target_name, str) or target_property is None:
raise ValueError(f"RefTypeParser: Invalid $ref {properties['$ref']}")
return target_name, target_property

View File

@@ -25,7 +25,7 @@ class SchemaConverter:
try:
validator = validator_for(schema)
validator.check_schema(schema)
validator.check_schema(schema) # type: ignore
except SchemaError as e:
raise ValueError(f"Invalid JSON Schema: {e}")
@@ -42,6 +42,7 @@ class SchemaConverter:
schema.get("required", []),
context=schema,
ref_cache=dict(),
required=True,
)
case "$ref":
@@ -50,6 +51,7 @@ class SchemaConverter:
schema,
context=schema,
ref_cache=dict(),
required=True,
)
return parsed_model
case _:
@@ -65,4 +67,8 @@ class SchemaConverter:
if "$ref" in schema:
return "$ref"
return schema.get("type", "undefined")
schema_type = schema.get("type")
if isinstance(schema_type, str):
return schema_type
raise ValueError("Schema must have a valid 'type' or '$ref' field.")

View File

@@ -1,93 +1,80 @@
from typing_extensions import Dict, List, Literal, TypedDict, Union
from __future__ import annotations
from typing_extensions import (
Dict,
List,
Literal,
TypedDict,
Union,
)
from types import NoneType
# Primitive JSON types
JSONSchemaType = Literal[
"string", "number", "integer", "boolean", "object", "array", "null"
]
JSONSchemaNativeTypes: tuple[type, ...] = (
str,
int,
float,
int,
bool,
list,
set,
NoneType,
)
JSONType = Union[str, int, float, bool, None, Dict[str, "JSONType"], List["JSONType"]]
class JSONSchema(TypedDict, total=False):
# Basic metadata
title: str
description: str
default: JSONType
examples: List[JSONType]
# Type definitions
type: Union[JSONSchemaType, List[JSONSchemaType]]
# Object-specific keywords
properties: Dict[str, "JSONSchema"]
required: List[str]
additionalProperties: Union[bool, "JSONSchema"]
minProperties: int
maxProperties: int
patternProperties: Dict[str, "JSONSchema"]
dependencies: Dict[str, Union[List[str], "JSONSchema"]]
# Array-specific keywords
items: Union["JSONSchema", List["JSONSchema"]]
additionalItems: Union[bool, "JSONSchema"]
minItems: int
maxItems: int
uniqueItems: bool
# String-specific keywords
minLength: int
maxLength: int
pattern: str
format: str
# Number-specific keywords
minimum: float
maximum: float
exclusiveMinimum: float
exclusiveMaximum: float
multipleOf: float
# Enum and const
enum: List[JSONType]
const: JSONType
# Conditionals
if_: "JSONSchema" # 'if' is a reserved word in Python
then: "JSONSchema"
else_: "JSONSchema" # 'else' is also a reserved word
# Combination keywords
allOf: List["JSONSchema"]
anyOf: List["JSONSchema"]
oneOf: List["JSONSchema"]
not_: "JSONSchema" # 'not' is a reserved word
# Fix forward references
JSONSchema.__annotations__["properties"] = Dict[str, JSONSchema]
JSONSchema.__annotations__["items"] = Union[JSONSchema, List[JSONSchema]]
JSONSchema.__annotations__["additionalItems"] = Union[bool, JSONSchema]
JSONSchema.__annotations__["additionalProperties"] = Union[bool, JSONSchema]
JSONSchema.__annotations__["patternProperties"] = Dict[str, JSONSchema]
JSONSchema.__annotations__["dependencies"] = Dict[str, Union[List[str], JSONSchema]]
JSONSchema.__annotations__["if_"] = JSONSchema
JSONSchema.__annotations__["then"] = JSONSchema
JSONSchema.__annotations__["else_"] = JSONSchema
JSONSchema.__annotations__["allOf"] = List[JSONSchema]
JSONSchema.__annotations__["anyOf"] = List[JSONSchema]
JSONSchema.__annotations__["oneOf"] = List[JSONSchema]
JSONSchema.__annotations__["not_"] = JSONSchema
# Dynamically define TypedDict with JSON Schema keywords
JSONSchema = TypedDict(
"JSONSchema",
{
"$id": str,
"$schema": str,
"$ref": str,
"$anchor": str,
"$comment": str,
"$defs": Dict[str, "JSONSchema"],
"title": str,
"description": str,
"default": JSONType,
"examples": List[JSONType],
"type": Union[JSONSchemaType, List[JSONSchemaType]],
"enum": List[JSONType],
"const": JSONType,
"properties": Dict[str, "JSONSchema"],
"patternProperties": Dict[str, "JSONSchema"],
"additionalProperties": Union[bool, "JSONSchema"],
"required": List[str],
"minProperties": int,
"maxProperties": int,
"dependencies": Dict[str, Union[List[str], "JSONSchema"]],
"items": Union["JSONSchema", List["JSONSchema"]],
"prefixItems": List["JSONSchema"],
"additionalItems": Union[bool, "JSONSchema"],
"contains": "JSONSchema",
"minItems": int,
"maxItems": int,
"uniqueItems": bool,
"minLength": int,
"maxLength": int,
"pattern": str,
"format": str,
"minimum": float,
"maximum": float,
"exclusiveMinimum": Union[bool, float],
"exclusiveMaximum": Union[bool, float],
"multipleOf": float,
"if": "JSONSchema",
"then": "JSONSchema",
"else": "JSONSchema",
"allOf": List["JSONSchema"],
"anyOf": List["JSONSchema"],
"oneOf": List["JSONSchema"],
"not": "JSONSchema",
},
total=False, # all fields optional
)

View File

@@ -1,9 +1,9 @@
from jambo.types.json_schema_type import JSONSchema
from typing_extensions import TypedDict
from typing_extensions import ForwardRef, TypedDict
class TypeParserOptions(TypedDict):
required: bool
context: JSONSchema
ref_cache: dict[str, type]
ref_cache: dict[str, ForwardRef | type | None]

View File

@@ -26,6 +26,20 @@ class TestSchemaConverter(TestCase):
with self.assertRaises(ValueError):
SchemaConverter.build(schema)
def test_invalid_schema_type(self):
schema = {
"title": 1,
"description": "A person",
"type": 1,
"properties": {
"name": {"type": "string"},
"age": {"type": "integer"},
},
}
with self.assertRaises(ValueError):
SchemaConverter.build(schema)
def test_build_expects_title(self):
schema = {
"description": "A person",