mirror of
https://github.com/correl/openapi-core.git
synced 2024-11-25 03:00:11 +00:00
240 lines
7 KiB
Python
240 lines
7 KiB
Python
import logging
|
|
|
|
from six import text_type, binary_type, integer_types
|
|
from six import iteritems
|
|
|
|
from openapi_core.extensions.models.factories import ModelFactory
|
|
from openapi_core.schema.schemas.enums import SchemaFormat, SchemaType
|
|
from openapi_core.schema.schemas.exceptions import (
|
|
ValidateError,
|
|
)
|
|
from openapi_core.schema.schemas.types import NoValue
|
|
from openapi_core.unmarshalling.schemas.exceptions import (
|
|
UnmarshalError,
|
|
InvalidCustomFormatSchemaValue,
|
|
UnmarshallerStrictTypeError,
|
|
)
|
|
from openapi_core.unmarshalling.schemas.util import (
|
|
forcebool, format_date, format_datetime, format_byte, format_uuid,
|
|
format_number,
|
|
)
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class StrictUnmarshaller(object):
|
|
|
|
STRICT_TYPES = ()
|
|
|
|
def __call__(self, value, strict=True):
|
|
if strict and not self._is_strict(value):
|
|
raise UnmarshallerStrictTypeError(value, self.STRICT_TYPES)
|
|
|
|
return value
|
|
|
|
def _is_strict(self, value):
|
|
if not self.STRICT_TYPES:
|
|
return True
|
|
|
|
return isinstance(value, self.STRICT_TYPES)
|
|
|
|
|
|
class PrimitiveTypeUnmarshaller(StrictUnmarshaller):
|
|
|
|
FORMATTERS = {}
|
|
|
|
def __init__(self, formatter, schema):
|
|
self.formatter = formatter
|
|
self.schema = schema
|
|
|
|
def __call__(self, value=NoValue, strict=True):
|
|
if value is NoValue:
|
|
value = self.schema.default
|
|
if value is None:
|
|
return
|
|
value = super(PrimitiveTypeUnmarshaller, self).__call__(
|
|
value, strict=strict)
|
|
|
|
return self.format(value)
|
|
|
|
def format(self, value):
|
|
try:
|
|
return self.formatter(value)
|
|
except ValueError as exc:
|
|
raise InvalidCustomFormatSchemaValue(
|
|
value, self.schema.format, exc)
|
|
|
|
|
|
class StringUnmarshaller(PrimitiveTypeUnmarshaller):
|
|
|
|
STRICT_TYPES = (text_type, binary_type)
|
|
FORMATTERS = {
|
|
SchemaFormat.NONE: text_type,
|
|
SchemaFormat.PASSWORD: text_type,
|
|
SchemaFormat.DATE: format_date,
|
|
SchemaFormat.DATETIME: format_datetime,
|
|
SchemaFormat.BINARY: binary_type,
|
|
SchemaFormat.UUID: format_uuid,
|
|
SchemaFormat.BYTE: format_byte,
|
|
}
|
|
|
|
|
|
class IntegerUnmarshaller(PrimitiveTypeUnmarshaller):
|
|
|
|
STRICT_TYPES = integer_types
|
|
FORMATTERS = {
|
|
SchemaFormat.NONE: int,
|
|
SchemaFormat.INT32: int,
|
|
SchemaFormat.INT64: int,
|
|
}
|
|
|
|
|
|
class NumberUnmarshaller(PrimitiveTypeUnmarshaller):
|
|
|
|
STRICT_TYPES = (float, ) + integer_types
|
|
FORMATTERS = {
|
|
SchemaFormat.NONE: format_number,
|
|
SchemaFormat.FLOAT: float,
|
|
SchemaFormat.DOUBLE: float,
|
|
}
|
|
|
|
|
|
class BooleanUnmarshaller(PrimitiveTypeUnmarshaller):
|
|
|
|
STRICT_TYPES = (bool, )
|
|
FORMATTERS = {
|
|
SchemaFormat.NONE: forcebool,
|
|
}
|
|
|
|
|
|
class ComplexUnmarshaller(PrimitiveTypeUnmarshaller):
|
|
|
|
def __init__(self, formatter, schema, unmarshallers_factory):
|
|
super(ComplexUnmarshaller, self).__init__(formatter, schema)
|
|
self.unmarshallers_factory = unmarshallers_factory
|
|
|
|
|
|
class ArrayUnmarshaller(ComplexUnmarshaller):
|
|
|
|
STRICT_TYPES = (list, tuple)
|
|
FORMATTERS = {}
|
|
|
|
@property
|
|
def items_unmarshaller(self):
|
|
return self.unmarshallers_factory.create(self.schema.items)
|
|
|
|
def __call__(self, value=NoValue, strict=True):
|
|
value = super(ArrayUnmarshaller, self).__call__(value, strict=strict)
|
|
|
|
self.unmarshallers_factory.create(self.schema.items)
|
|
|
|
return list(map(self.items_unmarshaller, value))
|
|
|
|
|
|
class ObjectUnmarshaller(ComplexUnmarshaller):
|
|
|
|
STRICT_TYPES = (dict, )
|
|
FORMATTERS = {}
|
|
|
|
@property
|
|
def model_factory(self):
|
|
return ModelFactory()
|
|
|
|
def __call__(self, value=NoValue, strict=True):
|
|
value = super(ObjectUnmarshaller, self).__call__(value, strict=strict)
|
|
|
|
if self.schema.one_of:
|
|
properties = None
|
|
for one_of_schema in self.schema.one_of:
|
|
try:
|
|
unmarshalled = self._unmarshal_properties(
|
|
value, one_of_schema, strict=strict)
|
|
except (UnmarshalError, ValueError):
|
|
pass
|
|
else:
|
|
if properties is not None:
|
|
log.warning("multiple valid oneOf schemas found")
|
|
continue
|
|
properties = unmarshalled
|
|
|
|
if properties is None:
|
|
log.warning("valid oneOf schema not found")
|
|
|
|
else:
|
|
properties = self._unmarshal_properties(value)
|
|
|
|
if 'x-model' in self.schema.extensions:
|
|
extension = self.schema.extensions['x-model']
|
|
return self.model_factory.create(properties, name=extension.value)
|
|
|
|
return properties
|
|
|
|
def _unmarshal_properties(
|
|
self, value=NoValue, one_of_schema=None, strict=True):
|
|
all_props = self.schema.get_all_properties()
|
|
all_props_names = self.schema.get_all_properties_names()
|
|
|
|
if one_of_schema is not None:
|
|
all_props.update(one_of_schema.get_all_properties())
|
|
all_props_names |= one_of_schema.\
|
|
get_all_properties_names()
|
|
|
|
value_props_names = value.keys()
|
|
extra_props = set(value_props_names) - set(all_props_names)
|
|
|
|
properties = {}
|
|
if self.schema.additional_properties is not True:
|
|
for prop_name in extra_props:
|
|
prop_value = value[prop_name]
|
|
properties[prop_name] = self.unmarshallers_factory.create(
|
|
self.schema.additional_properties)(
|
|
prop_value, strict=strict)
|
|
|
|
for prop_name, prop in iteritems(all_props):
|
|
try:
|
|
prop_value = value[prop_name]
|
|
except KeyError:
|
|
if prop.default is NoValue:
|
|
continue
|
|
prop_value = prop.default
|
|
|
|
properties[prop_name] = self.unmarshallers_factory.create(
|
|
prop)(prop_value, strict=strict)
|
|
|
|
return properties
|
|
|
|
|
|
class AnyUnmarshaller(ComplexUnmarshaller):
|
|
|
|
SCHEMA_TYPES_ORDER = [
|
|
SchemaType.OBJECT, SchemaType.ARRAY, SchemaType.BOOLEAN,
|
|
SchemaType.INTEGER, SchemaType.NUMBER, SchemaType.STRING,
|
|
]
|
|
|
|
def __call__(self, value=NoValue, strict=True):
|
|
one_of_schema = self._get_one_of_schema(value)
|
|
if one_of_schema:
|
|
return self.unmarshallers_factory.create(one_of_schema)(
|
|
value, strict=strict)
|
|
|
|
for schema_type in self.SCHEMA_TYPES_ORDER:
|
|
try:
|
|
unmarshaller = self.unmarshallers_factory.create(
|
|
self.schema, type_override=schema_type)
|
|
return unmarshaller(value, strict=strict)
|
|
except (UnmarshalError, ValueError):
|
|
continue
|
|
|
|
log.warning("failed to unmarshal any type")
|
|
return value
|
|
|
|
def _get_one_of_schema(self, value):
|
|
if not self.schema.one_of:
|
|
return
|
|
for subschema in self.schema.one_of:
|
|
try:
|
|
subschema.validate(value)
|
|
except ValidateError:
|
|
continue
|
|
else:
|
|
return subschema
|