Better Object Internal Structure and Type Selection
This commit is contained in:
@@ -2,41 +2,61 @@ from pydantic import Field, TypeAdapter
|
||||
from typing_extensions import Annotated, Self
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Generic, Type, TypeVar
|
||||
from typing import Any, Generic, TypeVar
|
||||
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
class GenericTypeParser(ABC, Generic[T]):
|
||||
mapped_type: Type[T] = None
|
||||
|
||||
json_schema_type: str = None
|
||||
|
||||
type_mappings: dict[str, str] = None
|
||||
|
||||
default_mappings = {
|
||||
"default": "default",
|
||||
"description": "description",
|
||||
}
|
||||
|
||||
type_mappings: dict[str, str] = None
|
||||
@classmethod
def type_from_properties(
    cls, name: str, properties: dict[str, Any], **kwargs
) -> tuple[type, dict]:
    """Pick the parser matching a schema fragment and delegate to it.

    :param name: Name forwarded to the selected parser.
    :param properties: JSON Schema fragment; ``_get_impl`` uses it to select
        the parser class.
    :param kwargs: Extra keyword arguments forwarded unchanged to the
        parser's ``from_properties``.
    :return: Whatever the selected parser's ``from_properties`` returns.
    """
    parser_cls = cls._get_impl(properties)
    parser_instance = parser_cls()
    return parser_instance.from_properties(
        name=name, properties=properties, **kwargs
    )
|
||||
|
||||
@classmethod
|
||||
def get_impl(cls, type_name: str) -> Self:
|
||||
def _get_impl(cls, properties: dict[str, Any]) -> type[Self]:
|
||||
for subcls in cls.__subclasses__():
|
||||
if subcls.json_schema_type is None:
|
||||
raise RuntimeError(f"Unknown type: {type_name}")
|
||||
schema_type, schema_value = subcls._get_schema_type()
|
||||
|
||||
if subcls.json_schema_type == type_name:
|
||||
return subcls()
|
||||
if schema_type not in properties:
|
||||
continue
|
||||
|
||||
raise ValueError(f"Unknown type: {type_name}")
|
||||
if schema_value is None or schema_value == properties[schema_type]:
|
||||
return subcls
|
||||
|
||||
raise ValueError("Unknown type")
|
||||
|
||||
@classmethod
|
||||
def _get_schema_type(cls) -> tuple[str, str | None]:
|
||||
if cls.json_schema_type is None:
|
||||
raise RuntimeError("TypeParser: json_schema_type not defined")
|
||||
|
||||
schema_definition = cls.json_schema_type.split(":")
|
||||
|
||||
if len(schema_definition) == 1:
|
||||
return schema_definition[0], None
|
||||
|
||||
return schema_definition[0], schema_definition[1]
|
||||
|
||||
@abstractmethod
|
||||
def from_properties(
|
||||
self, name: str, properties: dict[str, any], required: bool = False
|
||||
self, name: str, properties: dict[str, Any], required: bool = False
|
||||
) -> tuple[T, dict]: ...
|
||||
|
||||
def mappings_properties_builder(self, properties, required=False) -> dict[str, any]:
|
||||
def mappings_properties_builder(self, properties, required=False) -> dict[str, Any]:
|
||||
if self.type_mappings is None:
|
||||
raise NotImplementedError("Type mappings not defined")
|
||||
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
from jambo.parser._type_parser import GenericTypeParser
|
||||
|
||||
from typing import Any
|
||||
|
||||
|
||||
class AllOfTypeParser(GenericTypeParser):
|
||||
mapped_type = any
|
||||
@@ -7,35 +9,42 @@ class AllOfTypeParser(GenericTypeParser):
|
||||
json_schema_type = "allOf"
|
||||
|
||||
def from_properties(self, name, properties, required=False):
|
||||
subProperties = properties.get("allOf")
|
||||
if not subProperties:
|
||||
raise ValueError("Invalid JSON Schema: 'allOf' is not specified.")
|
||||
sub_properties = properties.get("allOf", [])
|
||||
|
||||
_mapped_type = properties.get("type")
|
||||
if _mapped_type is None:
|
||||
_mapped_type = subProperties[0].get("type")
|
||||
root_type = properties.get("type")
|
||||
if root_type is not None:
|
||||
for sub_property in sub_properties:
|
||||
sub_property["type"] = root_type
|
||||
|
||||
if _mapped_type is None:
|
||||
raise ValueError("Invalid JSON Schema: 'type' is not specified.")
|
||||
parser = self._get_type_parser(sub_properties)
|
||||
|
||||
if any(
|
||||
[prop.get("type", _mapped_type) != _mapped_type for prop in subProperties]
|
||||
):
|
||||
raise ValueError("Invalid JSON Schema: allOf types do not match.")
|
||||
|
||||
for subProperty in subProperties:
|
||||
# If a sub-property has not defined a type, we need to set it to the top-level type
|
||||
subProperty["type"] = _mapped_type
|
||||
|
||||
combined_properties = self._rebuild_properties_from_subproperties(subProperties)
|
||||
|
||||
return GenericTypeParser.get_impl(_mapped_type).from_properties(
|
||||
name, combined_properties
|
||||
combined_properties = self._rebuild_properties_from_subproperties(
|
||||
sub_properties
|
||||
)
|
||||
|
||||
def _rebuild_properties_from_subproperties(self, subProperties):
|
||||
return parser().from_properties(name, combined_properties)
|
||||
|
||||
@staticmethod
def _get_type_parser(
    sub_properties: list[dict[str, Any]],
) -> type[GenericTypeParser]:
    """Return the single parser class shared by every ``allOf`` sub-schema.

    :param sub_properties: The sub-schemas listed under ``allOf``.
    :return: The parser class ``GenericTypeParser._get_impl`` selects for
        all of the sub-schemas.
    :raises ValueError: If the list is empty, or if the sub-schemas resolve
        to more than one distinct parser class.
    """
    if not sub_properties:
        raise ValueError("Invalid JSON Schema: 'allOf' is empty.")

    # Collapsing into a set leaves exactly one entry when all sub-schemas agree.
    candidates = {GenericTypeParser._get_impl(entry) for entry in sub_properties}
    if len(candidates) == 1:
        (only_parser,) = candidates
        return only_parser

    raise ValueError("Invalid JSON Schema: allOf types do not match.")
|
||||
|
||||
@staticmethod
|
||||
def _rebuild_properties_from_subproperties(
|
||||
sub_properties: list[dict[str, Any]],
|
||||
) -> dict[str, Any]:
|
||||
properties = {}
|
||||
for subProperty in subProperties:
|
||||
for subProperty in sub_properties:
|
||||
for name, prop in subProperty.items():
|
||||
if name not in properties:
|
||||
properties[name] = prop
|
||||
|
||||
@@ -23,9 +23,7 @@ class AnyOfTypeParser(GenericTypeParser):
|
||||
subProperties = properties["anyOf"]
|
||||
|
||||
sub_types = [
|
||||
GenericTypeParser.get_impl(subProperty["type"]).from_properties(
|
||||
name, subProperty
|
||||
)
|
||||
GenericTypeParser.type_from_properties(name, subProperty)
|
||||
for subProperty in subProperties
|
||||
]
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@ V = TypeVar("V")
|
||||
class ArrayTypeParser(GenericTypeParser):
|
||||
mapped_type = list
|
||||
|
||||
json_schema_type = "array"
|
||||
json_schema_type = "type:array"
|
||||
|
||||
default_mappings = {"description": "description"}
|
||||
|
||||
@@ -20,9 +20,9 @@ class ArrayTypeParser(GenericTypeParser):
|
||||
}
|
||||
|
||||
def from_properties(self, name, properties, required=False):
|
||||
_item_type, _item_args = GenericTypeParser.get_impl(
|
||||
properties["items"]["type"]
|
||||
).from_properties(name, properties["items"], required=True)
|
||||
_item_type, _item_args = GenericTypeParser.type_from_properties(
|
||||
name, properties["items"], required=True
|
||||
)
|
||||
|
||||
wrapper_type = set if properties.get("uniqueItems", False) else list
|
||||
field_type = wrapper_type[_item_type]
|
||||
|
||||
@@ -4,7 +4,7 @@ from jambo.parser._type_parser import GenericTypeParser
|
||||
class BooleanTypeParser(GenericTypeParser):
|
||||
mapped_type = bool
|
||||
|
||||
json_schema_type = "boolean"
|
||||
json_schema_type = "type:boolean"
|
||||
|
||||
type_mappings = {
|
||||
"default": "default",
|
||||
|
||||
@@ -4,7 +4,7 @@ from jambo.parser._type_parser import GenericTypeParser
|
||||
class FloatTypeParser(GenericTypeParser):
|
||||
mapped_type = float
|
||||
|
||||
json_schema_type = "number"
|
||||
json_schema_type = "type:number"
|
||||
|
||||
type_mappings = {
|
||||
"minimum": "ge",
|
||||
|
||||
@@ -4,7 +4,7 @@ from jambo.parser._type_parser import GenericTypeParser
|
||||
class IntTypeParser(GenericTypeParser):
|
||||
mapped_type = int
|
||||
|
||||
json_schema_type = "integer"
|
||||
json_schema_type = "type:integer"
|
||||
|
||||
type_mappings = {
|
||||
"minimum": "ge",
|
||||
|
||||
@@ -1,16 +1,22 @@
|
||||
from jambo.parser._type_parser import GenericTypeParser
|
||||
|
||||
from pydantic import Field, create_model
|
||||
from pydantic.main import ModelT
|
||||
|
||||
from typing import Any
|
||||
|
||||
|
||||
class ObjectTypeParser(GenericTypeParser):
|
||||
mapped_type = object
|
||||
|
||||
json_schema_type = "object"
|
||||
json_schema_type = "type:object"
|
||||
|
||||
@staticmethod
|
||||
def from_properties(name, properties, required=False):
|
||||
from jambo.schema_converter import SchemaConverter
|
||||
|
||||
type_parsing = SchemaConverter.build_object(name, properties)
|
||||
def from_properties(
|
||||
self, name: str, properties: dict[str, Any], required: bool = False
|
||||
):
|
||||
type_parsing = self.to_model(
|
||||
name, properties.get("properties", {}), properties.get("required", [])
|
||||
)
|
||||
type_properties = {}
|
||||
|
||||
if "default" in properties:
|
||||
@@ -19,3 +25,32 @@ class ObjectTypeParser(GenericTypeParser):
|
||||
)
|
||||
|
||||
return type_parsing, type_properties
|
||||
|
||||
def to_model(
    self, name: str, schema: dict[str, Any], required_keys: list[str], **kwargs
) -> type[ModelT]:
    """
    Converts JSON Schema object properties to a Pydantic model.

    :param name: The name of the model.
    :param schema: Mapping of property name to that property's JSON Schema
        definition. ``None`` (or an empty mapping) yields a model with no fields.
    :param required_keys: List of required keys in the schema.
    :param kwargs: Extra keyword arguments forwarded to ``_parse_properties``.
    :return: A Pydantic model class.
    """
    # Callers that look the mapping up with ``.get("properties")`` pass None
    # when the key is absent; treat that as "no properties" instead of
    # failing on ``None.items()`` further down.
    fields = self._parse_properties(schema or {}, required_keys, **kwargs)
    return create_model(name, **fields)
|
||||
|
||||
@staticmethod
def _parse_properties(
    properties: dict[str, Any], required_keys: list[str], **kwargs
) -> dict[str, tuple[type, Field]]:
    """Build the field definitions for every property of an object schema.

    :param properties: Mapping of property name to its JSON Schema definition.
    :param required_keys: Names of the required properties; a falsy value is
        treated as an empty list.
    :param kwargs: Extra keyword arguments forwarded to
        ``GenericTypeParser.type_from_properties``.
    :return: Mapping of property name to a ``(type, Field)`` tuple.
    """
    required_names = required_keys if required_keys else []

    def build_field(field_name, field_schema):
        # Resolve the Python type and the Field keyword arguments for one property.
        field_type, field_args = GenericTypeParser.type_from_properties(
            field_name,
            field_schema,
            required=field_name in required_names,
            **kwargs,
        )
        return field_type, Field(**field_args)

    return {
        key: build_field(key, sub_schema) for key, sub_schema in properties.items()
    }
|
||||
|
||||
@@ -8,7 +8,7 @@ from datetime import date, datetime, time
|
||||
class StringTypeParser(GenericTypeParser):
|
||||
mapped_type = str
|
||||
|
||||
json_schema_type = "string"
|
||||
json_schema_type = "type:string"
|
||||
|
||||
type_mappings = {
|
||||
"maxLength": "max_length",
|
||||
|
||||
@@ -1,10 +1,8 @@
|
||||
from jambo.parser import GenericTypeParser
|
||||
from jambo.parser import ObjectTypeParser
|
||||
from jambo.types.json_schema_type import JSONSchema
|
||||
|
||||
from jsonschema.exceptions import SchemaError
|
||||
from jsonschema.validators import validator_for
|
||||
from pydantic import create_model
|
||||
from pydantic.fields import Field
|
||||
from pydantic.main import ModelT
|
||||
|
||||
|
||||
@@ -24,22 +22,6 @@ class SchemaConverter:
|
||||
:param schema: The JSON Schema to convert.
|
||||
:return: A Pydantic model class.
|
||||
"""
|
||||
if "title" not in schema:
|
||||
raise ValueError("JSON Schema must have a title.")
|
||||
|
||||
return SchemaConverter.build_object(schema["title"], schema)
|
||||
|
||||
@staticmethod
|
||||
def build_object(
|
||||
name: str,
|
||||
schema: JSONSchema,
|
||||
) -> type[ModelT]:
|
||||
"""
|
||||
Converts a JSON Schema object to a Pydantic model given a name.
|
||||
:param name:
|
||||
:param schema:
|
||||
:return:
|
||||
"""
|
||||
|
||||
try:
|
||||
validator = validator_for(schema)
|
||||
@@ -47,50 +29,14 @@ class SchemaConverter:
|
||||
except SchemaError as e:
|
||||
raise ValueError(f"Invalid JSON Schema: {e}")
|
||||
|
||||
if "title" not in schema:
|
||||
raise ValueError("JSON Schema must have a title.")
|
||||
|
||||
if schema["type"] != "object":
|
||||
raise TypeError(
|
||||
f"Invalid JSON Schema: {schema['type']}. Only 'object' can be converted to Pydantic models."
|
||||
)
|
||||
|
||||
return SchemaConverter._build_model_from_properties(
|
||||
name, schema["properties"], schema.get("required", [])
|
||||
return ObjectTypeParser().to_model(
|
||||
schema["title"], schema.get("properties"), schema.get("required")
|
||||
)
|
||||
|
||||
@staticmethod
def _build_model_from_properties(
    model_name: str, model_properties: dict, required_keys: list[str]
) -> type[ModelT]:
    """Create a Pydantic model from a mapping of property schemas.

    :param model_name: Name given to the created model.
    :param model_properties: Mapping of property name to its schema definition.
    :param required_keys: Names of the required properties.
    :return: The model class produced by ``create_model``.
    """
    return create_model(
        model_name,
        **SchemaConverter._parse_properties(model_properties, required_keys),
    )
|
||||
|
||||
@staticmethod
def _parse_properties(
    properties: dict, required_keys=None
) -> dict[str, tuple[type, Field]]:
    """Build a field definition for every property in ``properties``.

    :param properties: Mapping of property name to its schema definition.
    :param required_keys: Names of the required properties; a falsy value is
        treated as an empty list.
    :return: Mapping of property name to the result of ``_build_field``.
    """
    required_names = required_keys or []
    return {
        key: SchemaConverter._build_field(key, sub_schema, key in required_names)
        for key, sub_schema in properties.items()
    }
|
||||
|
||||
@staticmethod
|
||||
def _build_field(name, properties: dict, required=False) -> tuple[type, Field]:
|
||||
match properties:
|
||||
case {"anyOf": _}:
|
||||
_field_type = "anyOf"
|
||||
case {"allOf": _}:
|
||||
_field_type = "allOf"
|
||||
case {"type": _}:
|
||||
_field_type = properties["type"]
|
||||
case _:
|
||||
raise ValueError(f"Invalid JSON Schema: {properties}")
|
||||
|
||||
_field_type, _field_args = GenericTypeParser.get_impl(
|
||||
_field_type
|
||||
).from_properties(name, properties, required)
|
||||
|
||||
return _field_type, Field(**_field_args)
|
||||
|
||||
@@ -3,29 +3,34 @@ from jambo.parser._type_parser import GenericTypeParser
|
||||
from unittest import TestCase
|
||||
|
||||
|
||||
class InvalidGenericTypeParser(GenericTypeParser):
|
||||
mapped_type = str
|
||||
json_schema_type = "invalid"
|
||||
|
||||
def from_properties(
|
||||
self, name: str, properties: dict[str, any], required: bool = False
|
||||
): ...
|
||||
|
||||
|
||||
class TestGenericTypeParser(TestCase):
|
||||
def setUp(self):
|
||||
class InvalidGenericTypeParser(GenericTypeParser):
|
||||
mapped_type = str
|
||||
json_schema_type = "type:invalid"
|
||||
|
||||
def from_properties(
|
||||
self, name: str, properties: dict[str, any], required: bool = False
|
||||
): ...
|
||||
|
||||
self.InvalidGenericTypeParser = InvalidGenericTypeParser
|
||||
|
||||
def tearDown(self):
|
||||
del self.InvalidGenericTypeParser
|
||||
|
||||
def test_invalid_get_impl(self):
|
||||
# Assuming GenericTypeParser is imported from the module
|
||||
with self.assertRaises(ValueError):
|
||||
GenericTypeParser.get_impl("another_invalid_type")
|
||||
GenericTypeParser._get_impl({"type": "another_invalid_type"})
|
||||
|
||||
def test_invalid_json_schema_type(self):
|
||||
InvalidGenericTypeParser.json_schema_type = None
|
||||
self.InvalidGenericTypeParser.json_schema_type = None
|
||||
|
||||
# This is more for the developer's sanity check
|
||||
with self.assertRaises(RuntimeError):
|
||||
GenericTypeParser.get_impl("another_invalid_type")
|
||||
GenericTypeParser._get_impl({"type": "another_invalid_type"})
|
||||
|
||||
def test_invalid_mappings_properties_builder(self):
|
||||
parser = InvalidGenericTypeParser()
|
||||
parser = self.InvalidGenericTypeParser()
|
||||
with self.assertRaises(NotImplementedError):
|
||||
parser.mappings_properties_builder({}, required=False)
|
||||
|
||||
@@ -24,20 +24,6 @@ class TestSchemaConverter(TestCase):
|
||||
with self.assertRaises(ValueError):
|
||||
SchemaConverter.build(schema)
|
||||
|
||||
def test_build_expects_valid_schema(self):
|
||||
invalid_schema = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "strng"
|
||||
} # typo: "strng" is not a valid JSON Schema type
|
||||
},
|
||||
"required": ["name"],
|
||||
}
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
SchemaConverter.build_object("placeholder", invalid_schema)
|
||||
|
||||
def test_build_expects_object(self):
|
||||
schema = {
|
||||
"title": "Person",
|
||||
@@ -61,8 +47,9 @@ class TestSchemaConverter(TestCase):
|
||||
# 'required': ['name', 'age', 'is_active', 'friends', 'address'],
|
||||
}
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
with self.assertRaises(ValueError) as context:
|
||||
SchemaConverter.build(schema)
|
||||
self.assertTrue("Unknown type" in str(context.exception))
|
||||
|
||||
def test_jsonschema_to_pydantic(self):
|
||||
schema = {
|
||||
|
||||
Reference in New Issue
Block a user