openapi-core/openapi_core/schema/schemas/models.py

543 lines
21 KiB
Python
Raw Normal View History

2018-04-17 12:18:40 +00:00
"""OpenAPI core schemas models module"""
2018-08-24 14:57:41 +00:00
import attr
import functools
2017-09-21 11:51:37 +00:00
import logging
2019-02-26 16:49:25 +00:00
from base64 import b64decode, b64encode
2017-09-21 11:51:37 +00:00
from collections import defaultdict
2018-08-22 10:51:06 +00:00
from datetime import date, datetime
2019-02-26 16:49:25 +00:00
from uuid import UUID
import re
2017-10-17 13:23:26 +00:00
import warnings
2018-08-17 14:54:01 +00:00
from six import iteritems, integer_types, binary_type, text_type
2017-09-21 11:51:37 +00:00
2018-04-17 12:18:40 +00:00
from openapi_core.extensions.models.factories import ModelFactory
from openapi_core.schema.schemas.enums import SchemaFormat, SchemaType
2018-04-18 10:39:03 +00:00
from openapi_core.schema.schemas.exceptions import (
InvalidSchemaValue, UndefinedSchemaProperty, MissingSchemaProperty,
2018-08-05 12:40:34 +00:00
OpenAPISchemaError, NoOneOfSchema, MultipleOneOfSchema, NoValidSchema,
2018-09-12 16:43:31 +00:00
UndefinedItemsSchema, InvalidCustomFormatSchemaValue, InvalidSchemaProperty,
2018-04-18 10:39:03 +00:00
)
2018-08-22 10:51:06 +00:00
from openapi_core.schema.schemas.util import (
forcebool, format_date, format_datetime,
)
2018-08-17 14:54:01 +00:00
from openapi_core.schema.schemas.validators import (
TypeValidator, AttributeValidator,
)
2017-09-21 11:51:37 +00:00
log = logging.getLogger(__name__)
2018-09-05 11:39:10 +00:00
2018-08-24 14:57:41 +00:00
@attr.attrs
class Format(object):
    """Bundle the two callables that implement one string format."""

    # Converter invoked as ``unmarshal(value)`` by
    # ``Schema._unmarshal_string``.
    unmarshal = attr.attrib()
    # Predicate invoked as ``validate(value)`` by
    # ``Schema._validate_string``; a falsy result marks the value invalid.
    validate = attr.attrib()
2018-02-28 12:01:05 +00:00
2017-09-21 11:51:37 +00:00
class Schema(object):
    """Represents an OpenAPI Schema."""

    # Casting callables for the primitive, non-string schema types.
    DEFAULT_CAST_CALLABLE_GETTER = {
        SchemaType.INTEGER: int,
        SchemaType.NUMBER: float,
        SchemaType.BOOLEAN: forcebool,
    }

    # Built-in string formats: how to unmarshal a raw value and how to
    # check an already unmarshalled one.
    STRING_FORMAT_CALLABLE_GETTER = {
        SchemaFormat.NONE: Format(
            unmarshal=text_type,
            validate=TypeValidator(text_type),
        ),
        SchemaFormat.DATE: Format(
            unmarshal=format_date,
            validate=TypeValidator(date, exclude=datetime),
        ),
        SchemaFormat.DATETIME: Format(
            unmarshal=format_datetime,
            validate=TypeValidator(datetime),
        ),
        SchemaFormat.BINARY: Format(
            unmarshal=binary_type,
            validate=TypeValidator(binary_type),
        ),
        SchemaFormat.UUID: Format(
            unmarshal=UUID,
            validate=TypeValidator(UUID),
        ),
        SchemaFormat.BYTE: Format(
            unmarshal=b64decode,
            validate=TypeValidator(binary_type),
        ),
    }

    # Per-type predicates used by ``validate`` for its type check.
    TYPE_VALIDATOR_CALLABLE_GETTER = {
        # Any value is acceptable for an untyped schema.
        SchemaType.ANY: lambda _value: True,
        SchemaType.BOOLEAN: TypeValidator(bool),
        SchemaType.INTEGER: TypeValidator(integer_types, exclude=bool),
        SchemaType.NUMBER: TypeValidator(
            integer_types, float, exclude=bool),
        SchemaType.STRING: TypeValidator(
            text_type, date, datetime, binary_type, UUID),
        SchemaType.ARRAY: TypeValidator(list, tuple),
        SchemaType.OBJECT: AttributeValidator('__dict__'),
    }
2017-09-21 11:51:37 +00:00
def __init__(
        self, schema_type=None, model=None, properties=None, items=None,
        schema_format=None, required=None, default=None, nullable=False,
        enum=None, deprecated=False, all_of=None, one_of=None,
        additional_properties=True, min_items=None, max_items=None,
        min_length=None, max_length=None, pattern=None, unique_items=False,
        minimum=None, maximum=None, multiple_of=None,
        exclusive_minimum=False, exclusive_maximum=False,
        min_properties=None, max_properties=None):
    """Store the schema keywords on the instance.

    Containers are copied (``properties`` into a dict, ``all_of`` and
    ``one_of`` into lists), ``pattern`` is compiled with ``re.compile``
    and the size/length keywords are converted with ``int``.

    ``minimum``, ``maximum`` and ``multiple_of`` keep a fractional part
    when they have one; integral inputs are still stored as ``int``.
    They used to be truncated with ``int()``, which turned a bound such
    as ``0.5`` into ``0``.

    :raises ValueError: if a numeric keyword cannot be parsed, or if
        ``schema_type`` is rejected by ``SchemaType``.
    """
    def optional_int(raw):
        # ``None`` means "keyword not given" and must stay ``None``.
        return None if raw is None else int(raw)

    def optional_number(raw):
        # Same as ``optional_int`` but without dropping fractions.
        if raw is None:
            return None
        if isinstance(raw, float):
            return int(raw) if raw.is_integer() else raw
        try:
            return int(raw)
        except ValueError:
            # e.g. the string "0.5": not an int literal, but a number.
            return float(raw)

    self.type = SchemaType(schema_type)
    self.model = model
    self.properties = dict(properties) if properties else {}
    self.items = items
    self.format = schema_format
    self.required = required or []
    self.default = default
    self.nullable = nullable
    self.enum = enum
    self.deprecated = deprecated
    self.all_of = list(all_of) if all_of else []
    self.one_of = list(one_of) if one_of else []
    self.additional_properties = additional_properties
    self.min_items = optional_int(min_items)
    self.max_items = optional_int(max_items)
    self.min_length = optional_int(min_length)
    self.max_length = optional_int(max_length)
    self.pattern = re.compile(pattern) if pattern else None
    self.unique_items = unique_items
    self.minimum = optional_number(minimum)
    self.maximum = optional_number(maximum)
    self.multiple_of = optional_number(multiple_of)
    self.exclusive_minimum = exclusive_minimum
    self.exclusive_maximum = exclusive_maximum
    self.min_properties = optional_int(min_properties)
    self.max_properties = optional_int(max_properties)

    # Filled lazily by ``get_all_required_properties``.
    self._all_required_properties_cache = None
    self._all_optional_properties_cache = None
2017-09-21 11:51:37 +00:00
def __getitem__(self, name):
return self.properties[name]
2017-11-06 16:50:00 +00:00
def get_all_properties(self):
    """Return own properties merged with those of every ``all_of`` schema.

    A new dict is returned; ``self.properties`` is not modified. When a
    name occurs more than once, the entry from the later ``all_of``
    subschema wins.
    """
    merged = dict(self.properties)
    for subschema in self.all_of:
        merged.update(subschema.get_all_properties())
    return merged
2018-05-25 15:32:09 +00:00
def get_all_properties_names(self):
    """Return the names from ``get_all_properties`` as a new set."""
    return set(self.get_all_properties())
def get_all_required_properties(self):
    """Return the required properties, computing them on first use.

    The result of ``_get_all_required_properties`` is stored in
    ``_all_required_properties_cache`` and reused by later calls.
    """
    cached = self._all_required_properties_cache
    if cached is None:
        cached = self._get_all_required_properties()
        self._all_required_properties_cache = cached
    return cached
def _get_all_required_properties(self):
    """Return the subset of all properties whose name is required.

    Names come from ``get_all_required_properties_names``; a required
    name without a matching property is not part of the result.
    """
    candidates = self.get_all_properties()
    required_names = self.get_all_required_properties_names()
    return {
        name: schema
        for name, schema in iteritems(candidates)
        if name in required_names
    }
def get_all_required_properties_names(self):
    """Return the required names of this schema and its ``all_of`` parts.

    Own names come from ``self.required``; each subschema contributes
    the keys of its ``get_all_required_properties()`` mapping.
    ``self.required`` is left untouched.
    """
    names = set(self.required)
    for subschema in self.all_of:
        # Iterating the mapping yields the property names.
        names.update(subschema.get_all_required_properties())
    return names
2018-09-05 11:39:10 +00:00
def get_cast_mapping(self, custom_formatters=None):
    """Return a mapping from schema type to its casting callable.

    The class-level defaults are extended with this instance's
    unmarshalling methods, each pre-bound to ``custom_formatters``.
    Looking up any other type yields a callable that returns its
    argument unchanged.
    """
    def with_formatters(unmarshaller):
        return functools.partial(
            unmarshaller, custom_formatters=custom_formatters)

    casts = dict(self.DEFAULT_CAST_CALLABLE_GETTER)
    casts[SchemaType.STRING] = with_formatters(self._unmarshal_string)
    casts[SchemaType.ANY] = with_formatters(self._unmarshal_any)
    casts[SchemaType.ARRAY] = with_formatters(self._unmarshal_collection)
    casts[SchemaType.OBJECT] = with_formatters(self._unmarshal_object)

    return defaultdict(lambda: lambda x: x, casts)
2018-09-05 11:39:10 +00:00
def cast(self, value, custom_formatters=None):
    """Cast value to schema type.

    ``None`` yields ``self.default`` for a nullable schema and is
    rejected otherwise. An empty string yields ``None`` for every type
    except string. Any other value goes through the callable that
    ``get_cast_mapping`` registers for ``self.type``.

    :raises InvalidSchemaValue: for ``None`` on a non-nullable schema,
        or when the casting callable raises ``ValueError`` or
        ``TypeError``.
    """
    if value is None:
        if not self.nullable:
            raise InvalidSchemaValue(
                "Null value for non-nullable schema", value, self.type)
        return self.default

    cast_mapping = self.get_cast_mapping(
        custom_formatters=custom_formatters)

    if self.type is not SchemaType.STRING and value == '':
        return None

    cast_callable = cast_mapping[self.type]
    try:
        return cast_callable(value)
    # TypeError covers inputs of the wrong shape, e.g. ``int([1])`` or
    # iterating a non-iterable for an array schema; those are invalid
    # values too and must not escape as raw TypeErrors.
    except (ValueError, TypeError):
        raise InvalidSchemaValue(
            "Failed to cast value {value} to type {type}", value, self.type)
2017-09-21 11:51:37 +00:00
2018-08-24 14:57:41 +00:00
def unmarshal(self, value, custom_formatters=None):
    """Unmarshal parameter from the value.

    Emits a ``DeprecationWarning`` for a deprecated schema, casts the
    value with ``cast`` and checks the result against ``enum`` when one
    is set. A ``None`` result is returned as is when ``self.required``
    is empty.

    :raises InvalidSchemaValue: if the cast value is not an enum choice.
    """
    if self.deprecated:
        warnings.warn("The schema is deprecated", DeprecationWarning)

    result = self.cast(value, custom_formatters=custom_formatters)

    if result is None and not self.required:
        return None

    outside_enum = bool(self.enum) and result not in self.enum
    if outside_enum:
        raise InvalidSchemaValue(
            "Value {value} not in enum choices: {type}", value, self.enum)

    return result
2018-09-05 11:39:10 +00:00
def _unmarshal_string(self, value, custom_formatters=None):
    """Unmarshal ``value`` according to ``self.format``.

    Built-in formats are looked up in ``STRING_FORMAT_CALLABLE_GETTER``;
    any other format name must be a key of ``custom_formatters``.

    :raises InvalidSchemaValue: if no formatter is known for the format.
    :raises InvalidCustomFormatSchemaValue: if the formatter's
        ``unmarshal`` raises ``ValueError``.
    """
    try:
        known_format = SchemaFormat(self.format)
    except ValueError:
        # Not a built-in format: fall back to the caller's formatters.
        formatter = None
        if custom_formatters is not None:
            formatter = custom_formatters.get(self.format)
        if formatter is None:
            raise InvalidSchemaValue(
                "Unsupported format {type} unmarshalling for value {value}",
                value, self.format)
    else:
        formatter = self.STRING_FORMAT_CALLABLE_GETTER[known_format]

    try:
        return formatter.unmarshal(value)
    except ValueError as exc:
        raise InvalidCustomFormatSchemaValue(
            "Failed to format value {value} to format {type}: {exception}",
            value, self.format, exc)
2018-09-05 11:39:10 +00:00
def _unmarshal_any(self, value, custom_formatters=None):
    """Unmarshal ``value`` for a schema without a concrete type.

    The casting callables are tried in a fixed order (object, array,
    boolean, integer, number, string) and the first result that does
    not raise is returned. ``custom_formatters`` is forwarded to the
    cast mapping so that the nested unmarshallers receive it too;
    it used to be dropped here.

    :raises NoValidSchema: if every candidate type rejects the value.
    """
    types_resolve_order = [
        SchemaType.OBJECT, SchemaType.ARRAY, SchemaType.BOOLEAN,
        SchemaType.INTEGER, SchemaType.NUMBER, SchemaType.STRING,
    ]
    cast_mapping = self.get_cast_mapping(
        custom_formatters=custom_formatters)
    for schema_type in types_resolve_order:
        cast_callable = cast_mapping[schema_type]
        try:
            return cast_callable(value)
        # @todo: remove ValueError when validation separated
        except (OpenAPISchemaError, TypeError, ValueError):
            continue

    raise NoValidSchema(value)
2018-08-05 12:40:34 +00:00
2018-09-05 11:39:10 +00:00
def _unmarshal_collection(self, value, custom_formatters=None):
2018-08-05 12:40:34 +00:00
if self.items is None:
raise UndefinedItemsSchema(self.type)
2018-08-05 12:40:34 +00:00
2018-09-05 11:39:10 +00:00
f = functools.partial(self.items.unmarshal,
custom_formatters=custom_formatters)
2018-08-24 14:57:41 +00:00
return list(map(f, value))
2018-09-05 11:39:10 +00:00
def _unmarshal_object(self, value, model_factory=None,
                      custom_formatters=None):
    """Unmarshal a dict into a model built by ``model_factory``.

    With ``one_of`` schemas, exactly one of them must accept the value;
    otherwise the properties are unmarshalled against this schema only.
    A fresh ``ModelFactory`` is used when no factory is passed.

    :raises InvalidSchemaValue: if ``value`` is not a dict.
    :raises MultipleOneOfSchema: if more than one ``one_of`` schema
        matches.
    :raises NoOneOfSchema: if no ``one_of`` schema matches.
    """
    if not isinstance(value, dict):
        raise InvalidSchemaValue(
            "Value {value} is not of type {type}", value, self.type)

    factory = model_factory or ModelFactory()

    if not self.one_of:
        properties = self._unmarshal_properties(
            value, custom_formatters=custom_formatters)
    else:
        properties = None
        for candidate in self.one_of:
            try:
                candidate_props = self._unmarshal_properties(
                    value, candidate,
                    custom_formatters=custom_formatters)
            except OpenAPISchemaError:
                # This alternative does not fit; try the next one.
                continue
            if properties is not None:
                raise MultipleOneOfSchema(self.type)
            properties = candidate_props

        if properties is None:
            raise NoOneOfSchema(self.type)

    return factory.create(properties, name=self.model)
2017-09-25 14:15:00 +00:00
2018-09-05 11:39:10 +00:00
def _unmarshal_properties(self, value, one_of_schema=None,
                          custom_formatters=None):
    """Unmarshal the properties of the mapping ``value``.

    The known properties are those of this schema (including ``all_of``)
    plus, when given, those of ``one_of_schema``. Properties of
    ``value`` that are not known are rejected when
    ``additional_properties`` is ``False`` and unmarshalled with the
    ``additional_properties`` schema when it is one. The result is
    checked with ``_validate_properties`` before it is returned.

    A known property missing from ``value`` is filled from the property
    schema's ``default`` when that is not ``None`` or when the property
    is nullable. Falsy defaults such as ``0``, ``False`` or ``""`` are
    therefore applied; they used to be skipped.

    :returns: dict mapping property names to unmarshalled values.
    :raises UndefinedSchemaProperty: for disallowed extra properties.
    :raises MissingSchemaProperty: if a required property is absent.
    :raises InvalidSchemaProperty: if a property fails to unmarshal.
    """
    all_props = self.get_all_properties()
    all_props_names = self.get_all_properties_names()
    all_req_props_names = self.get_all_required_properties_names()

    if one_of_schema is not None:
        all_props.update(one_of_schema.get_all_properties())
        all_props_names |= one_of_schema.get_all_properties_names()
        all_req_props_names |= one_of_schema.\
            get_all_required_properties_names()

    extra_props = set(value.keys()) - set(all_props_names)
    if extra_props and self.additional_properties is False:
        raise UndefinedSchemaProperty(extra_props)

    properties = {}
    # When this point is reached with ``additional_properties`` set to
    # ``False`` there are no extra properties, so the loop is a no-op.
    if self.additional_properties is not True:
        for prop_name in extra_props:
            prop_value = value[prop_name]
            properties[prop_name] = self.additional_properties.unmarshal(
                prop_value, custom_formatters=custom_formatters)

    for prop_name, prop in iteritems(all_props):
        try:
            prop_value = value[prop_name]
        except KeyError:
            if prop_name in all_req_props_names:
                raise MissingSchemaProperty(prop_name)
            # Compare against None rather than truthiness so that
            # defaults like 0 or False are not mistaken for "no default".
            if not prop.nullable and prop.default is None:
                continue
            prop_value = prop.default
        try:
            properties[prop_name] = prop.unmarshal(
                prop_value, custom_formatters=custom_formatters)
        except OpenAPISchemaError as exc:
            raise InvalidSchemaProperty(prop_name, exc)

    self._validate_properties(properties, one_of_schema=one_of_schema,
                              custom_formatters=custom_formatters)

    return properties
2018-08-17 14:54:01 +00:00
2018-08-21 17:33:24 +00:00
def get_validator_mapping(self):
    """Return a mapping from schema type to its structure validator.

    Array, string, object, integer and number map to the matching
    ``_validate_*`` method. Any other type maps to a function that
    returns its first argument and ignores keyword arguments.
    """
    def passthrough(x, **kw):
        return x

    validators = defaultdict(lambda: passthrough)
    validators[SchemaType.ARRAY] = self._validate_collection
    validators[SchemaType.STRING] = self._validate_string
    validators[SchemaType.OBJECT] = self._validate_object
    validators[SchemaType.INTEGER] = self._validate_number
    validators[SchemaType.NUMBER] = self._validate_number
    return validators
2018-08-21 17:33:24 +00:00
2018-09-05 11:39:10 +00:00
def validate(self, value, custom_formatters=None):
    """Validate an unmarshalled ``value`` against this schema.

    First the value's type is checked with the predicate registered for
    ``self.type``; then the structure validator from
    ``get_validator_mapping`` runs.

    :returns: ``value`` on success, ``None`` for a ``None`` value on a
        nullable schema.
    :raises InvalidSchemaValue: for ``None`` on a non-nullable schema or
        when the type check fails.
    """
    if value is None:
        if self.nullable:
            return
        raise InvalidSchemaValue(
            "Null value for non-nullable schema of type {type}",
            value, self.type)

    # type validation
    has_expected_type = self.TYPE_VALIDATOR_CALLABLE_GETTER[self.type]
    if not has_expected_type(value):
        raise InvalidSchemaValue(
            "Value {value} not valid type {type}", value, self.type.value)

    # structure validation
    check_structure = self.get_validator_mapping()[self.type]
    check_structure(value, custom_formatters=custom_formatters)

    return value
2018-08-21 17:33:24 +00:00
2018-09-05 11:39:10 +00:00
def _validate_collection(self, value, custom_formatters=None):
2018-08-22 09:29:39 +00:00
if self.items is None:
raise UndefinedItemsSchema(self.type)
2018-08-22 09:29:39 +00:00
if self.min_items is not None:
if self.min_items < 0:
raise OpenAPISchemaError(
"Schema for collection invalid:"
" minItems must be non-negative"
)
if len(value) < self.min_items:
raise InvalidSchemaValue(
"Value must contain at least {type} item(s),"
" {value} found", len(value), self.min_items)
if self.max_items is not None:
if self.max_items < 0:
raise OpenAPISchemaError(
"Schema for collection invalid:"
" maxItems must be non-negative"
)
if len(value) > self.max_items:
raise InvalidSchemaValue(
"Value must contain at most {value} item(s),"
" {type} found", len(value), self.max_items)
if self.unique_items and len(set(value)) != len(value):
raise OpenAPISchemaError("Value may not contain duplicate items")
2018-09-05 11:39:10 +00:00
f = functools.partial(self.items.validate,
custom_formatters=custom_formatters)
return list(map(f, value))
2018-08-22 09:29:39 +00:00
def _validate_number(self, value, custom_formatters=None):
if self.minimum is not None:
if self.exclusive_minimum and value <= self.minimum:
raise InvalidSchemaValue(
"Value {value} is not less than or equal to {type}", value, self.minimum)
elif value < self.minimum:
raise InvalidSchemaValue(
"Value {value} is not less than {type}", value, self.minimum)
if self.maximum is not None:
if self.exclusive_maximum and value >= self.maximum:
raise InvalidSchemaValue(
"Value {value} is not greater than or equal to {type}", value, self.maximum)
elif value > self.maximum:
raise InvalidSchemaValue(
"Value {value} is not greater than {type}", value, self.maximum)
if self.multiple_of is not None and value % self.multiple_of:
raise InvalidSchemaValue(
"Value {value} is not a multiple of {type}",
value, self.multiple_of)
2018-09-05 11:39:10 +00:00
def _validate_string(self, value, custom_formatters=None):
    """Validate a string value: format, length bounds and pattern.

    Built-in formats are looked up in ``STRING_FORMAT_CALLABLE_GETTER``;
    any other format name must be a key of ``custom_formatters``.

    :returns: ``True`` when the value is valid.
    :raises OpenAPISchemaError: if the format is unsupported, or if
        ``min_length``/``max_length`` is negative.
    :raises InvalidSchemaValue: if the value fails the format check,
        a length bound or the pattern.
    """
    try:
        known_format = SchemaFormat(self.format)
    except ValueError:
        msg = "Unsupported {0} format validation".format(self.format)
        formatter = None
        if custom_formatters is not None:
            formatter = custom_formatters.get(self.format)
        if formatter is None:
            raise OpenAPISchemaError(msg)
    else:
        formatter = self.STRING_FORMAT_CALLABLE_GETTER[known_format]

    if not formatter.validate(value):
        raise InvalidSchemaValue(
            "Value {value} not valid format {type}", value, self.format)

    shortest = self.min_length
    if shortest is not None:
        if shortest < 0:
            raise OpenAPISchemaError(
                "Schema for string invalid:"
                " minLength must be non-negative"
            )
        if len(value) < shortest:
            raise InvalidSchemaValue(
                "Value is shorter ({value}) than the minimum length of {type}",
                len(value), shortest
            )

    longest = self.max_length
    if longest is not None:
        if longest < 0:
            raise OpenAPISchemaError(
                "Schema for string invalid:"
                " maxLength must be non-negative"
            )
        if len(value) > longest:
            raise InvalidSchemaValue(
                "Value is longer ({value}) than the maximum length of {type}",
                len(value), longest
            )

    regex = self.pattern
    if regex is not None and not regex.search(value):
        raise InvalidSchemaValue(
            "Value {value} does not match the pattern {type}",
            value, regex.pattern
        )

    return True
2018-09-05 11:39:10 +00:00
def _validate_object(self, value, custom_formatters=None):
2018-08-21 17:33:24 +00:00
properties = value.__dict__
if self.one_of:
valid_one_of_schema = None
for one_of_schema in self.one_of:
try:
2018-09-05 11:39:10 +00:00
self._validate_properties(
properties, one_of_schema,
custom_formatters=custom_formatters)
2018-08-21 17:33:24 +00:00
except OpenAPISchemaError:
pass
else:
if valid_one_of_schema is not None:
raise MultipleOneOfSchema(self.type)
2018-08-21 17:33:24 +00:00
valid_one_of_schema = True
if valid_one_of_schema is None:
raise NoOneOfSchema(self.type)
2018-08-21 17:33:24 +00:00
else:
2018-09-05 11:39:10 +00:00
self._validate_properties(properties,
custom_formatters=custom_formatters)
2018-08-21 17:33:24 +00:00
if self.min_properties is not None:
if self.min_properties < 0:
raise OpenAPISchemaError(
"Schema for object invalid:"
" minProperties must be non-negative"
)
if len(properties) < self.min_properties:
raise InvalidSchemaValue(
"Value must contain at least {type} properties,"
" {value} found", len(properties), self.min_properties
)
if self.max_properties is not None:
if self.max_properties < 0:
raise OpenAPISchemaError(
"Schema for object invalid:"
" maxProperties must be non-negative"
)
if len(properties) > self.max_properties:
raise InvalidSchemaValue(
"Value must contain at most {type} properties,"
" {value} found", len(properties), self.max_properties
)
2018-08-21 17:33:24 +00:00
return True
2018-09-05 11:39:10 +00:00
def _validate_properties(self, value, one_of_schema=None,
                         custom_formatters=None):
    """Validate the mapping ``value`` of already unmarshalled properties.

    The known properties are those of this schema (including ``all_of``)
    plus, when given, those of ``one_of_schema``. Properties of
    ``value`` that are not known are rejected when
    ``additional_properties`` is ``False`` and validated with the
    ``additional_properties`` schema when it is one.

    A known property missing from ``value`` is validated using the
    property schema's ``default`` when that is not ``None`` or when the
    property is nullable. Falsy defaults such as ``0``, ``False`` or
    ``""`` are therefore checked as well; they used to be skipped.

    :returns: ``True`` when every property is valid.
    :raises UndefinedSchemaProperty: for disallowed extra properties.
    :raises MissingSchemaProperty: if a required property is absent.
    :raises InvalidSchemaProperty: if a property fails validation.
    """
    all_props = self.get_all_properties()
    all_props_names = self.get_all_properties_names()
    all_req_props_names = self.get_all_required_properties_names()

    if one_of_schema is not None:
        all_props.update(one_of_schema.get_all_properties())
        all_props_names |= one_of_schema.get_all_properties_names()
        all_req_props_names |= one_of_schema.\
            get_all_required_properties_names()

    extra_props = set(value.keys()) - set(all_props_names)
    if extra_props and self.additional_properties is False:
        raise UndefinedSchemaProperty(extra_props)

    # When this point is reached with ``additional_properties`` set to
    # ``False`` there are no extra properties, so the loop is a no-op.
    if self.additional_properties is not True:
        for prop_name in extra_props:
            prop_value = value[prop_name]
            self.additional_properties.validate(
                prop_value, custom_formatters=custom_formatters)

    for prop_name, prop in iteritems(all_props):
        try:
            prop_value = value[prop_name]
        except KeyError:
            if prop_name in all_req_props_names:
                raise MissingSchemaProperty(prop_name)
            # Compare against None rather than truthiness so that
            # defaults like 0 or False are not mistaken for "no default".
            if not prop.nullable and prop.default is None:
                continue
            prop_value = prop.default
        try:
            prop.validate(prop_value, custom_formatters=custom_formatters)
        except OpenAPISchemaError as exc:
            raise InvalidSchemaProperty(prop_name, original_exception=exc)

    return True