[FEATURE] Implementation of $ref JSON Schema Keyword #20

Merged
HideyoshiNakazone merged 6 commits from feature/ref-type-parser into main 2025-06-20 01:09:11 +00:00
7 changed files with 234 additions and 63 deletions
Showing only changes of commit f4effac41c - Show all commits

View File

@@ -33,7 +33,7 @@ class GenericTypeParser(ABC, Generic[T]):
def from_properties( def from_properties(
self, name: str, properties: dict[str, Any], **kwargs: Unpack[TypeParserOptions] self, name: str, properties: dict[str, Any], **kwargs: Unpack[TypeParserOptions]
) -> tuple[type, dict]: ) -> tuple[T, dict]:
""" """
Converts properties to a type and its fields properties. Converts properties to a type and its fields properties.
:param name: The name of the type. :param name: The name of the type.

View File

@@ -1,8 +1,7 @@
from jambo.parser._type_parser import GenericTypeParser from jambo.parser._type_parser import GenericTypeParser
from jambo.types.type_parser_options import TypeParserOptions from jambo.types.type_parser_options import TypeParserOptions
from pydantic import Field, create_model from pydantic import BaseModel, Field, create_model
from pydantic.main import ModelT
from typing_extensions import Any, Unpack from typing_extensions import Any, Unpack
@@ -13,7 +12,7 @@ class ObjectTypeParser(GenericTypeParser):
def from_properties_impl( def from_properties_impl(
self, name: str, properties: dict[str, Any], **kwargs: Unpack[TypeParserOptions] self, name: str, properties: dict[str, Any], **kwargs: Unpack[TypeParserOptions]
): ) -> tuple[type[BaseModel], dict]:
type_parsing = self.to_model( type_parsing = self.to_model(
name, name,
properties.get("properties", {}), properties.get("properties", {}),
@@ -29,13 +28,14 @@ class ObjectTypeParser(GenericTypeParser):
return type_parsing, type_properties return type_parsing, type_properties
@classmethod
def to_model( def to_model(
self, cls,
name: str, name: str,
schema: dict[str, Any], schema: dict[str, Any],
required_keys: list[str], required_keys: list[str],
**kwargs: Unpack[TypeParserOptions], **kwargs: Unpack[TypeParserOptions],
) -> type[ModelT]: ) -> type[BaseModel]:
""" """
Converts JSON Schema object properties to a Pydantic model. Converts JSON Schema object properties to a Pydantic model.
:param name: The name of the model. :param name: The name of the model.
@@ -43,11 +43,12 @@ class ObjectTypeParser(GenericTypeParser):
:param required_keys: List of required keys in the schema. :param required_keys: List of required keys in the schema.
:return: A Pydantic model class. :return: A Pydantic model class.
""" """
fields = self._parse_properties(schema, required_keys, **kwargs) fields = cls._parse_properties(schema, required_keys, **kwargs)
return create_model(name, **fields) return create_model(name, **fields)
@staticmethod @classmethod
def _parse_properties( def _parse_properties(
cls,
properties: dict[str, Any], properties: dict[str, Any],
required_keys: list[str], required_keys: list[str],
**kwargs: Unpack[TypeParserOptions], **kwargs: Unpack[TypeParserOptions],

View File

@@ -1,7 +1,7 @@
from jambo.parser import GenericTypeParser from jambo.parser import GenericTypeParser
from jambo.types.type_parser_options import TypeParserOptions from jambo.types.type_parser_options import TypeParserOptions
from typing_extensions import Any, ForwardRef, TypeVar, Union, Unpack from typing_extensions import Any, ForwardRef, Literal, TypeVar, Union, Unpack
RefType = TypeVar("RefType", bound=Union[type, ForwardRef]) RefType = TypeVar("RefType", bound=Union[type, ForwardRef])
@@ -17,7 +17,10 @@ class RefTypeParser(GenericTypeParser):
raise ValueError(f"RefTypeParser: Missing $ref in properties for {name}") raise ValueError(f"RefTypeParser: Missing $ref in properties for {name}")
context = kwargs["context"] context = kwargs["context"]
required = kwargs.get("required", False) ref_cache = kwargs["ref_cache"]
mapped_type = None
mapped_properties = self.mappings_properties_builder(properties, **kwargs)
if context is None: if context is None:
raise RuntimeError( raise RuntimeError(
@@ -30,31 +33,72 @@ class RefTypeParser(GenericTypeParser):
"Look into $defs and # for recursive references." "Look into $defs and # for recursive references."
) )
if properties["$ref"] == "#": ref_strategy, ref_name, ref_property = self._examine_ref_strategy(
if "title" not in context: name, properties, **kwargs
)
# Ellipsis is used as a sentinel in the cache to mark a reference that is still being processed;
# if the reference has already been resolved, the cached type is returned instead.
ref_state = ref_cache.setdefault(ref_name)
if ref_state is Ellipsis:
return ForwardRef(ref_name), mapped_properties
elif ref_state is not None:
return ref_state, mapped_properties
else:
ref_cache[ref_name] = Ellipsis
match ref_strategy:
case "forward_ref":
mapped_type = ForwardRef(ref_name)
case "def_ref":
mapped_type, _ = GenericTypeParser.type_from_properties(
ref_name, ref_property, **kwargs
)
case _:
raise ValueError( raise ValueError(
"RefTypeParser: Missing title in properties for $ref #" f"RefTypeParser: Unsupported $ref {properties['$ref']}"
) )
return ForwardRef(context["title"]), {} # Sets cached reference to the mapped type
ref_cache[ref_name] = mapped_type
elif properties["$ref"].startswith("#/$defs/"): return mapped_type, mapped_properties
target_name = None
target_property = context
for prop_name in properties["$ref"].split("/")[1:]:
if prop_name not in target_property:
raise ValueError(
f"RefTypeParser: Missing {prop_name} in"
" properties for $ref {properties['$ref']}"
)
target_name = prop_name
target_property = target_property[prop_name]
if target_name is None or target_property is None: def _examine_ref_strategy(
raise ValueError(f"RefTypeParser: Invalid $ref {properties['$ref']}") self, name: str, properties: dict[str, Any], **kwargs: Unpack[TypeParserOptions]
) -> tuple[Literal["forward_ref", "def_ref"], str, dict]:
if properties["$ref"] == "#":
ref_name = kwargs["context"].get("title")
if ref_name is None:
raise ValueError(
f"RefTypeParser: Missing title in properties for $ref {properties['$ref']}"
)
return "forward_ref", ref_name, {}
return GenericTypeParser.type_from_properties( if properties["$ref"].startswith("#/$defs/"):
target_name, target_property, **kwargs target_name, target_property = self._extract_target_ref(
name, properties, **kwargs
) )
return "def_ref", target_name, target_property
raise ValueError(f"RefTypeParser: Unsupported $ref {properties['$ref']}") raise ValueError(f"RefTypeParser: Unsupported $ref {properties['$ref']}")
def _extract_target_ref(
self, name: str, properties: dict[str, Any], **kwargs: Unpack[TypeParserOptions]
) -> tuple[str, dict]:
target_name = None
target_property = kwargs["context"]
for prop_name in properties["$ref"].split("/")[1:]:
if prop_name not in target_property:
raise ValueError(
f"RefTypeParser: Missing {prop_name} in"
" properties for $ref {properties['$ref']}"
)
target_name = prop_name
target_property = target_property[prop_name]
if target_name is None or target_property is None:
raise ValueError(f"RefTypeParser: Invalid $ref {properties['$ref']}")
return target_name, target_property

View File

@@ -3,7 +3,7 @@ from jambo.types.json_schema_type import JSONSchema
from jsonschema.exceptions import SchemaError from jsonschema.exceptions import SchemaError
from jsonschema.validators import validator_for from jsonschema.validators import validator_for
from pydantic.main import ModelT from pydantic import BaseModel
class SchemaConverter: class SchemaConverter:
@@ -16,7 +16,7 @@ class SchemaConverter:
""" """
@staticmethod @staticmethod
def build(schema: JSONSchema) -> type[ModelT]: def build(schema: JSONSchema) -> type[BaseModel]:
""" """
Converts a JSON Schema to a Pydantic model. Converts a JSON Schema to a Pydantic model.
:param schema: The JSON Schema to convert. :param schema: The JSON Schema to convert.
@@ -32,11 +32,22 @@ class SchemaConverter:
if "title" not in schema: if "title" not in schema:
raise ValueError("JSON Schema must have a title.") raise ValueError("JSON Schema must have a title.")
if schema["type"] != "object": if (schema_type := schema.get("type", "undefined")) != "object":
raise TypeError( raise TypeError(
f"Invalid JSON Schema: {schema['type']}. Only 'object' can be converted to Pydantic models." f"Invalid JSON Schema: {schema_type}. Only 'object' can be converted to Pydantic models."
) )
return ObjectTypeParser().to_model( parsed_model = ObjectTypeParser.to_model(
schema["title"], schema.get("properties"), schema.get("required") schema["title"],
schema.get("properties"),
schema.get("required"),
context=schema,
ref_cache=dict(),
) )
if not issubclass(parsed_model, BaseModel):
raise TypeError(
f"Parsed model {parsed_model.__name__} is not a subclass of BaseModel."
)
return parsed_model

View File

@@ -1,9 +1,9 @@
from jambo.types.json_schema_type import JSONSchema from jambo.types.json_schema_type import JSONSchema
from typing_extensions import NotRequired, TypedDict from typing_extensions import TypedDict
class TypeParserOptions(TypedDict): class TypeParserOptions(TypedDict):
required: bool required: bool
context: JSONSchema context: JSONSchema
ref_cache: NotRequired[dict[str, type]] ref_cache: dict[str, type]

View File

@@ -1,7 +1,5 @@
from jambo.parser import ObjectTypeParser, RefTypeParser from jambo.parser import ObjectTypeParser, RefTypeParser
from typing_extensions import ForwardRef, get_type_hints
from unittest import TestCase from unittest import TestCase
@@ -25,6 +23,7 @@ class TestRefTypeParser(TestCase):
"person", "person",
properties, properties,
context=properties, context=properties,
ref_cache={},
required=True, required=True,
) )
@@ -46,38 +45,101 @@ class TestRefTypeParser(TestCase):
"$ref": "#", "$ref": "#",
}, },
}, },
"required": ["name", "age"],
} }
type_parsing, type_validator = ObjectTypeParser().from_properties( model, type_validator = ObjectTypeParser().from_properties(
"person", "person",
properties, properties,
context=properties, context=properties,
ref_cache={},
required=True, required=True,
) )
type_parsing.update_forward_refs(person=type_parsing)
self.assertIsInstance(type_parsing, type) obj = model(
name="John",
age=30,
emergency_contact=model(
name="Jane",
age=28,
),
)
type_hints = get_type_hints(type_parsing, globals(), locals()) self.assertEqual(obj.name, "John")
self.assertEqual(obj.age, 30)
self.assertIsInstance(obj.emergency_contact, model)
self.assertEqual(obj.emergency_contact.name, "Jane")
self.assertEqual(obj.emergency_contact.age, 28)
self.assertIsInstance(type_hints["emergency_contact"], ForwardRef) def test_ref_type_parser_forward_ref_can_checks_validation(self):
properties = {
"title": "person",
"type": "object",
"properties": {
"name": {"type": "string"},
"age": {"type": "integer"},
"emergency_contact": {
"$ref": "#",
},
},
"required": ["name", "age"],
}
""" model, type_validator = ObjectTypeParser().from_properties(
This is a example of how to resolve ForwardRef in a dynamic model: "person",
```python properties,
from typing import get_type_hints context=properties,
ref_cache={},
required=True,
)
# Make sure your dynamic model has a name # Checks that a model created via ForwardRef is still validated correctly.
model = type_parsing with self.assertRaises(ValueError):
model.update_forward_refs(person=model) # 👈 resolve the ForwardRef("person") model(
name="John",
age=30,
emergency_contact=model(
name="Jane",
),
)
# Inject into globals manually def test_ref_type_parser_with_ciclic_def(self):
globalns = globals().copy() properties = {
globalns['person'] = model "title": "person",
"$ref": "#/$defs/person",
"$defs": {
"person": {
"type": "object",
"properties": {
"name": {"type": "string"},
"age": {"type": "integer"},
"emergency_contact": {
"$ref": "#/$defs/person",
},
},
}
},
}
# Now you can get the resolved hints model, type_validator = RefTypeParser().from_properties(
type_hints = get_type_hints(model, globalns=globalns) "person",
``` properties,
Use `TypeParserOptions.ref_cache` option to cache and resolve ForwardRefs context=properties,
inside the ObjectTypeParser.to_model method. ref_cache={},
""" required=True,
)
obj = model(
name="John",
age=30,
emergency_contact=model(
name="Jane",
age=28,
),
)
self.assertEqual(obj.name, "John")
self.assertEqual(obj.age, 30)
self.assertIsInstance(obj.emergency_contact, model)
self.assertEqual(obj.emergency_contact.name, "Jane")
self.assertEqual(obj.emergency_contact.age, 28)

View File

@@ -496,3 +496,56 @@ class TestSchemaConverter(TestCase):
} }
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
SchemaConverter.build(schema) SchemaConverter.build(schema)
def test_ref_with_root_ref(self):
    # A property that points at "#" refers back to the root schema, so the
    # nested value must be an instance of the very model being built.
    contact_ref = {"$ref": "#"}
    person_schema = {
        "title": "Person",
        "type": "object",
        "properties": {
            "name": {"type": "string"},
            "age": {"type": "integer"},
            "emergency_contact": contact_ref,
        },
        "required": ["name", "age"],
    }

    person_model = SchemaConverter.build(person_schema)

    jane = person_model(name="Jane", age=28)
    john = person_model(name="John", age=30, emergency_contact=jane)

    # Top-level fields.
    self.assertEqual(john.name, "John")
    self.assertEqual(john.age, 30)

    # Nested self-reference.
    contact = john.emergency_contact
    self.assertIsInstance(contact, person_model)
    self.assertEqual(contact.name, "Jane")
    self.assertEqual(contact.age, 28)
def test_ref_with_def(self):
    # Definition that refers to itself through "#/$defs/person".
    person_definition = {
        "type": "object",
        "properties": {
            "name": {"type": "string"},
            "age": {"type": "integer"},
            "emergency_contact": {
                "$ref": "#/$defs/person",
            },
        },
    }
    root_schema = {
        "title": "person",
        "$ref": "#/$defs/person",
        "$defs": {"person": person_definition},
    }

    # This should raise TypeError because the root schema is not an object
    self.assertRaises(TypeError, SchemaConverter.build, root_schema)