diff --git a/jambo/parser/_type_parser.py b/jambo/parser/_type_parser.py index 20a11d4..165d9d9 100644 --- a/jambo/parser/_type_parser.py +++ b/jambo/parser/_type_parser.py @@ -2,41 +2,61 @@ from pydantic import Field, TypeAdapter from typing_extensions import Annotated, Self from abc import ABC, abstractmethod -from typing import Generic, Type, TypeVar +from typing import Any, Generic, TypeVar T = TypeVar("T") class GenericTypeParser(ABC, Generic[T]): - mapped_type: Type[T] = None - json_schema_type: str = None + type_mappings: dict[str, str] = None + default_mappings = { "default": "default", "description": "description", } - type_mappings: dict[str, str] = None + @classmethod + def type_from_properties( + cls, name: str, properties: dict[str, Any], **kwargs + ) -> tuple[type, dict]: + parser = cls._get_impl(properties) + + return parser().from_properties(name=name, properties=properties, **kwargs) @classmethod - def get_impl(cls, type_name: str) -> Self: + def _get_impl(cls, properties: dict[str, Any]) -> type[Self]: for subcls in cls.__subclasses__(): - if subcls.json_schema_type is None: - raise RuntimeError(f"Unknown type: {type_name}") + schema_type, schema_value = subcls._get_schema_type() - if subcls.json_schema_type == type_name: - return subcls() + if schema_type not in properties: + continue - raise ValueError(f"Unknown type: {type_name}") + if schema_value is None or schema_value == properties[schema_type]: + return subcls + + raise ValueError("Unknown type") + + @classmethod + def _get_schema_type(cls) -> tuple[str, str | None]: + if cls.json_schema_type is None: + raise RuntimeError("TypeParser: json_schema_type not defined") + + schema_definition = cls.json_schema_type.split(":") + + if len(schema_definition) == 1: + return schema_definition[0], None + + return schema_definition[0], schema_definition[1] @abstractmethod def from_properties( - self, name: str, properties: dict[str, any], required: bool = False + self, name: str, properties: dict[str, 
Any], required: bool = False ) -> tuple[T, dict]: ... - def mappings_properties_builder(self, properties, required=False) -> dict[str, any]: + def mappings_properties_builder(self, properties, required=False) -> dict[str, Any]: if self.type_mappings is None: raise NotImplementedError("Type mappings not defined") diff --git a/jambo/parser/allof_type_parser.py b/jambo/parser/allof_type_parser.py index f0b1839..0e56cd5 100644 --- a/jambo/parser/allof_type_parser.py +++ b/jambo/parser/allof_type_parser.py @@ -1,5 +1,7 @@ from jambo.parser._type_parser import GenericTypeParser +from typing import Any + class AllOfTypeParser(GenericTypeParser): mapped_type = any @@ -7,35 +9,42 @@ class AllOfTypeParser(GenericTypeParser): json_schema_type = "allOf" def from_properties(self, name, properties, required=False): - subProperties = properties.get("allOf") - if not subProperties: - raise ValueError("Invalid JSON Schema: 'allOf' is not specified.") + sub_properties = properties.get("allOf", []) - _mapped_type = properties.get("type") - if _mapped_type is None: - _mapped_type = subProperties[0].get("type") + root_type = properties.get("type") + if root_type is not None: + for sub_property in sub_properties: + sub_property["type"] = root_type - if _mapped_type is None: - raise ValueError("Invalid JSON Schema: 'type' is not specified.") + parser = self._get_type_parser(sub_properties) - if any( - [prop.get("type", _mapped_type) != _mapped_type for prop in subProperties] - ): - raise ValueError("Invalid JSON Schema: allOf types do not match.") - - for subProperty in subProperties: - # If a sub-property has not defined a type, we need to set it to the top-level type - subProperty["type"] = _mapped_type - - combined_properties = self._rebuild_properties_from_subproperties(subProperties) - - return GenericTypeParser.get_impl(_mapped_type).from_properties( - name, combined_properties + combined_properties = self._rebuild_properties_from_subproperties( + sub_properties ) - def 
_rebuild_properties_from_subproperties(self, subProperties): + return parser().from_properties(name, combined_properties) + + @staticmethod + def _get_type_parser( + sub_properties: list[dict[str, Any]], + ) -> type[GenericTypeParser]: + if not sub_properties: + raise ValueError("Invalid JSON Schema: 'allOf' is empty.") + + parsers = set( + GenericTypeParser._get_impl(sub_property) for sub_property in sub_properties + ) + if len(parsers) != 1: + raise ValueError("Invalid JSON Schema: allOf types do not match.") + + return parsers.pop() + + @staticmethod + def _rebuild_properties_from_subproperties( + sub_properties: list[dict[str, Any]], + ) -> dict[str, Any]: properties = {} - for subProperty in subProperties: + for subProperty in sub_properties: for name, prop in subProperty.items(): if name not in properties: properties[name] = prop diff --git a/jambo/parser/anyof_type_parser.py b/jambo/parser/anyof_type_parser.py index b70591e..ca65e8d 100644 --- a/jambo/parser/anyof_type_parser.py +++ b/jambo/parser/anyof_type_parser.py @@ -23,9 +23,7 @@ class AnyOfTypeParser(GenericTypeParser): subProperties = properties["anyOf"] sub_types = [ - GenericTypeParser.get_impl(subProperty["type"]).from_properties( - name, subProperty - ) + GenericTypeParser.type_from_properties(name, subProperty) for subProperty in subProperties ] diff --git a/jambo/parser/array_type_parser.py b/jambo/parser/array_type_parser.py index 5ec162c..bdda4ce 100644 --- a/jambo/parser/array_type_parser.py +++ b/jambo/parser/array_type_parser.py @@ -10,7 +10,7 @@ V = TypeVar("V") class ArrayTypeParser(GenericTypeParser): mapped_type = list - json_schema_type = "array" + json_schema_type = "type:array" default_mappings = {"description": "description"} @@ -20,9 +20,9 @@ class ArrayTypeParser(GenericTypeParser): } def from_properties(self, name, properties, required=False): - _item_type, _item_args = GenericTypeParser.get_impl( - properties["items"]["type"] - ).from_properties(name, properties["items"], 
required=True) + _item_type, _item_args = GenericTypeParser.type_from_properties( + name, properties["items"], required=True + ) wrapper_type = set if properties.get("uniqueItems", False) else list field_type = wrapper_type[_item_type] diff --git a/jambo/parser/boolean_type_parser.py b/jambo/parser/boolean_type_parser.py index 384da9d..f2ae257 100644 --- a/jambo/parser/boolean_type_parser.py +++ b/jambo/parser/boolean_type_parser.py @@ -4,7 +4,7 @@ from jambo.parser._type_parser import GenericTypeParser class BooleanTypeParser(GenericTypeParser): mapped_type = bool - json_schema_type = "boolean" + json_schema_type = "type:boolean" type_mappings = { "default": "default", diff --git a/jambo/parser/float_type_parser.py b/jambo/parser/float_type_parser.py index 565f69e..f5ab7e3 100644 --- a/jambo/parser/float_type_parser.py +++ b/jambo/parser/float_type_parser.py @@ -4,7 +4,7 @@ from jambo.parser._type_parser import GenericTypeParser class FloatTypeParser(GenericTypeParser): mapped_type = float - json_schema_type = "number" + json_schema_type = "type:number" type_mappings = { "minimum": "ge", diff --git a/jambo/parser/int_type_parser.py b/jambo/parser/int_type_parser.py index 2a352bb..7041047 100644 --- a/jambo/parser/int_type_parser.py +++ b/jambo/parser/int_type_parser.py @@ -4,7 +4,7 @@ from jambo.parser._type_parser import GenericTypeParser class IntTypeParser(GenericTypeParser): mapped_type = int - json_schema_type = "integer" + json_schema_type = "type:integer" type_mappings = { "minimum": "ge", diff --git a/jambo/parser/object_type_parser.py b/jambo/parser/object_type_parser.py index f7c9f6a..827afa7 100644 --- a/jambo/parser/object_type_parser.py +++ b/jambo/parser/object_type_parser.py @@ -1,16 +1,22 @@ from jambo.parser._type_parser import GenericTypeParser +from pydantic import Field, create_model +from pydantic.main import ModelT + +from typing import Any + class ObjectTypeParser(GenericTypeParser): mapped_type = object - json_schema_type = "object" + 
json_schema_type = "type:object" - @staticmethod - def from_properties(name, properties, required=False): - from jambo.schema_converter import SchemaConverter - - type_parsing = SchemaConverter.build_object(name, properties) + def from_properties( + self, name: str, properties: dict[str, Any], required: bool = False + ): + type_parsing = self.to_model( + name, properties.get("properties", {}), properties.get("required", []) + ) type_properties = {} if "default" in properties: @@ -19,3 +25,32 @@ ) return type_parsing, type_properties + + def to_model( + self, name: str, schema: dict[str, Any], required_keys: list[str], **kwargs + ) -> type[ModelT]: + """ + Converts JSON Schema object properties to a Pydantic model. + :param name: The name of the model. + :param schema: The properties of the JSON Schema object. + :param required_keys: List of required keys in the schema. + :return: A Pydantic model class. + """ + fields = self._parse_properties(schema, required_keys, **kwargs) + return create_model(name, **fields) + + @staticmethod + def _parse_properties( + properties: dict[str, Any], required_keys: list[str], **kwargs + ) -> dict[str, tuple[type, Field]]: + required_keys = required_keys or [] + + fields = {} + for name, prop in properties.items(): + is_required = name in required_keys + parsed_type, parsed_properties = GenericTypeParser.type_from_properties( + name, prop, required=is_required, **kwargs + ) + fields[name] = (parsed_type, Field(**parsed_properties)) + + return fields diff --git a/jambo/parser/string_type_parser.py b/jambo/parser/string_type_parser.py index cf4c41c..e0aa02b 100644 --- a/jambo/parser/string_type_parser.py +++ b/jambo/parser/string_type_parser.py @@ -8,7 +8,7 @@ from datetime import date, datetime, time class StringTypeParser(GenericTypeParser): mapped_type = str - json_schema_type = "string" + json_schema_type = "type:string" type_mappings = { "maxLength": "max_length", diff --git 
a/jambo/schema_converter.py b/jambo/schema_converter.py index 5254760..0926358 100644 --- a/jambo/schema_converter.py +++ b/jambo/schema_converter.py @@ -1,10 +1,8 @@ -from jambo.parser import GenericTypeParser +from jambo.parser import ObjectTypeParser from jambo.types.json_schema_type import JSONSchema from jsonschema.exceptions import SchemaError from jsonschema.validators import validator_for -from pydantic import create_model -from pydantic.fields import Field from pydantic.main import ModelT @@ -24,22 +22,6 @@ class SchemaConverter: :param schema: The JSON Schema to convert. :return: A Pydantic model class. """ - if "title" not in schema: - raise ValueError("JSON Schema must have a title.") - - return SchemaConverter.build_object(schema["title"], schema) - - @staticmethod - def build_object( - name: str, - schema: JSONSchema, - ) -> type[ModelT]: - """ - Converts a JSON Schema object to a Pydantic model given a name. - :param name: - :param schema: - :return: - """ try: validator = validator_for(schema) @@ -47,50 +29,14 @@ class SchemaConverter: except SchemaError as e: raise ValueError(f"Invalid JSON Schema: {e}") + if "title" not in schema: + raise ValueError("JSON Schema must have a title.") + if schema["type"] != "object": raise TypeError( f"Invalid JSON Schema: {schema['type']}. Only 'object' can be converted to Pydantic models." 
) - return SchemaConverter._build_model_from_properties( - name, schema["properties"], schema.get("required", []) + return ObjectTypeParser().to_model( + schema["title"], schema.get("properties"), schema.get("required") ) - - @staticmethod - def _build_model_from_properties( - model_name: str, model_properties: dict, required_keys: list[str] - ) -> type[ModelT]: - properties = SchemaConverter._parse_properties(model_properties, required_keys) - - return create_model(model_name, **properties) - - @staticmethod - def _parse_properties( - properties: dict, required_keys=None - ) -> dict[str, tuple[type, Field]]: - required_keys = required_keys or [] - - fields = {} - for name, prop in properties.items(): - is_required = name in required_keys - fields[name] = SchemaConverter._build_field(name, prop, is_required) - - return fields - - @staticmethod - def _build_field(name, properties: dict, required=False) -> tuple[type, Field]: - match properties: - case {"anyOf": _}: - _field_type = "anyOf" - case {"allOf": _}: - _field_type = "allOf" - case {"type": _}: - _field_type = properties["type"] - case _: - raise ValueError(f"Invalid JSON Schema: {properties}") - - _field_type, _field_args = GenericTypeParser.get_impl( - _field_type - ).from_properties(name, properties, required) - - return _field_type, Field(**_field_args) diff --git a/tests/parser/test_type_parser.py b/tests/parser/test_type_parser.py index 38bb6a1..794f4a4 100644 --- a/tests/parser/test_type_parser.py +++ b/tests/parser/test_type_parser.py @@ -3,29 +3,34 @@ from jambo.parser._type_parser import GenericTypeParser from unittest import TestCase -class InvalidGenericTypeParser(GenericTypeParser): - mapped_type = str - json_schema_type = "invalid" - - def from_properties( - self, name: str, properties: dict[str, any], required: bool = False - ): ... 
- - class TestGenericTypeParser(TestCase): + def setUp(self): + class InvalidGenericTypeParser(GenericTypeParser): + mapped_type = str + json_schema_type = "type:invalid" + + def from_properties( + self, name: str, properties: dict[str, any], required: bool = False + ): ... + + self.InvalidGenericTypeParser = InvalidGenericTypeParser + + def tearDown(self): + del self.InvalidGenericTypeParser + def test_invalid_get_impl(self): # Assuming GenericTypeParser is imported from the module with self.assertRaises(ValueError): - GenericTypeParser.get_impl("another_invalid_type") + GenericTypeParser._get_impl({"type": "another_invalid_type"}) def test_invalid_json_schema_type(self): - InvalidGenericTypeParser.json_schema_type = None + self.InvalidGenericTypeParser.json_schema_type = None # This is more for the developer's sanity check with self.assertRaises(RuntimeError): - GenericTypeParser.get_impl("another_invalid_type") + GenericTypeParser._get_impl({"type": "another_invalid_type"}) def test_invalid_mappings_properties_builder(self): - parser = InvalidGenericTypeParser() + parser = self.InvalidGenericTypeParser() with self.assertRaises(NotImplementedError): parser.mappings_properties_builder({}, required=False) diff --git a/tests/test_schema_converter.py b/tests/test_schema_converter.py index c2f5395..57894ba 100644 --- a/tests/test_schema_converter.py +++ b/tests/test_schema_converter.py @@ -24,20 +24,6 @@ class TestSchemaConverter(TestCase): with self.assertRaises(ValueError): SchemaConverter.build(schema) - def test_build_expects_valid_schema(self): - invalid_schema = { - "type": "object", - "properties": { - "name": { - "type": "strng" - } # typo: "strng" is not a valid JSON Schema type - }, - "required": ["name"], - } - - with self.assertRaises(ValueError): - SchemaConverter.build_object("placeholder", invalid_schema) - def test_build_expects_object(self): schema = { "title": "Person", @@ -61,8 +47,9 @@ class TestSchemaConverter(TestCase): # 'required': ['name', 
'age', 'is_active', 'friends', 'address'], } - with self.assertRaises(ValueError): + with self.assertRaises(ValueError) as context: SchemaConverter.build(schema) + self.assertIn("Unknown type", str(context.exception)) def test_jsonschema_to_pydantic(self): schema = {