openapi-core/openapi_core/schema/schemas/models.py

541 lines
20 KiB
Python
Raw Permalink Normal View History

2018-04-17 12:18:40 +00:00
"""OpenAPI core schemas models module"""
2017-09-21 11:51:37 +00:00
import logging
from collections import defaultdict
2018-08-22 10:51:06 +00:00
from datetime import date, datetime
import re
2017-10-17 13:23:26 +00:00
import warnings
2018-08-17 14:54:01 +00:00
from six import iteritems, integer_types, binary_type, text_type
2017-09-21 11:51:37 +00:00
2018-04-17 12:18:40 +00:00
from openapi_core.extensions.models.factories import ModelFactory
from openapi_core.schema.schemas.enums import SchemaFormat, SchemaType
2018-04-18 10:39:03 +00:00
from openapi_core.schema.schemas.exceptions import (
InvalidSchemaValue, UndefinedSchemaProperty, MissingSchemaProperty,
2018-08-05 12:40:34 +00:00
OpenAPISchemaError, NoOneOfSchema, MultipleOneOfSchema, NoValidSchema,
UndefinedItemsSchema,
2018-04-18 10:39:03 +00:00
)
2018-08-22 10:51:06 +00:00
from openapi_core.schema.schemas.util import (
forcebool, format_date, format_datetime,
)
2018-08-17 14:54:01 +00:00
from openapi_core.schema.schemas.validators import (
TypeValidator, AttributeValidator,
)
2017-09-21 11:51:37 +00:00
log = logging.getLogger(__name__)
2018-02-28 12:01:05 +00:00
2017-09-21 11:51:37 +00:00
class Schema(object):
"""Represents an OpenAPI Schema."""
2018-04-17 12:18:40 +00:00
DEFAULT_CAST_CALLABLE_GETTER = {
SchemaType.INTEGER: int,
SchemaType.NUMBER: float,
SchemaType.BOOLEAN: forcebool,
}
2018-08-22 10:51:06 +00:00
STRING_FORMAT_CAST_CALLABLE_GETTER = {
SchemaFormat.NONE: text_type,
SchemaFormat.DATE: format_date,
SchemaFormat.DATETIME: format_datetime,
SchemaFormat.BINARY: binary_type,
}
STRING_FORMAT_VALIDATOR_CALLABLE_GETTER = {
SchemaFormat.NONE: TypeValidator(text_type),
SchemaFormat.DATE: TypeValidator(date, exclude=datetime),
SchemaFormat.DATETIME: TypeValidator(datetime),
SchemaFormat.BINARY: TypeValidator(binary_type),
}
2018-08-22 09:05:15 +00:00
TYPE_VALIDATOR_CALLABLE_GETTER = {
2018-08-05 12:40:34 +00:00
SchemaType.ANY: lambda x: x,
2018-08-17 14:54:01 +00:00
SchemaType.BOOLEAN: TypeValidator(bool),
SchemaType.INTEGER: TypeValidator(integer_types, exclude=bool),
SchemaType.NUMBER: TypeValidator(integer_types, float, exclude=bool),
2018-08-22 10:51:06 +00:00
SchemaType.STRING: TypeValidator(
text_type, date, datetime, binary_type),
2018-08-17 14:54:01 +00:00
SchemaType.ARRAY: TypeValidator(list, tuple),
2018-08-22 09:05:15 +00:00
SchemaType.OBJECT: AttributeValidator('__dict__'),
2018-08-17 14:54:01 +00:00
}
2017-09-21 11:51:37 +00:00
def __init__(
2018-04-04 10:26:21 +00:00
self, schema_type=None, model=None, properties=None, items=None,
2017-11-14 13:36:05 +00:00
schema_format=None, required=None, default=None, nullable=False,
2018-05-30 10:15:17 +00:00
enum=None, deprecated=False, all_of=None, one_of=None,
additional_properties=None, min_items=None, max_items=None,
min_length=None, max_length=None, pattern=None, unique_items=False,
minimum=None, maximum=None, multiple_of=None,
exclusive_minimum=False, exclusive_maximum=False,
min_properties=None, max_properties=None):
2018-08-02 18:30:51 +00:00
self.type = SchemaType(schema_type)
self.model = model
2017-09-21 11:51:37 +00:00
self.properties = properties and dict(properties) or {}
self.items = items
2018-05-30 08:41:34 +00:00
self.format = schema_format
2017-11-06 16:50:00 +00:00
self.required = required or []
2017-09-25 14:15:00 +00:00
self.default = default
2017-10-17 13:02:21 +00:00
self.nullable = nullable
2017-10-17 13:23:26 +00:00
self.enum = enum
2017-10-17 13:33:46 +00:00
self.deprecated = deprecated
2017-11-06 16:50:00 +00:00
self.all_of = all_of and list(all_of) or []
2018-05-25 15:32:09 +00:00
self.one_of = one_of and list(one_of) or []
2018-05-30 10:15:17 +00:00
self.additional_properties = additional_properties
self.min_items = int(min_items) if min_items is not None else None
self.max_items = int(max_items) if max_items is not None else None
self.min_length = int(min_length) if min_length is not None else None
self.max_length = int(max_length) if max_length is not None else None
self.pattern = pattern and re.compile(pattern) or None
self.unique_items = unique_items
self.minimum = int(minimum) if minimum is not None else None
self.maximum = int(maximum) if maximum is not None else None
self.multiple_of = int(multiple_of)\
if multiple_of is not None else None
self.exclusive_minimum = exclusive_minimum
self.exclusive_maximum = exclusive_maximum
self.min_properties = int(min_properties)\
if min_properties is not None else None
self.max_properties = int(max_properties)\
if max_properties is not None else None
2018-05-25 15:32:09 +00:00
self._all_required_properties_cache = None
self._all_optional_properties_cache = None
2017-09-21 11:51:37 +00:00
def __getitem__(self, name):
return self.properties[name]
2017-11-06 16:50:00 +00:00
def get_all_properties(self):
properties = self.properties.copy()
for subschema in self.all_of:
subschema_props = subschema.get_all_properties()
properties.update(subschema_props)
return properties
2018-05-25 15:32:09 +00:00
def get_all_properties_names(self):
all_properties = self.get_all_properties()
return set(all_properties.keys())
def get_all_required_properties(self):
2018-05-25 15:32:09 +00:00
if self._all_required_properties_cache is None:
self._all_required_properties_cache =\
self._get_all_required_properties()
return self._all_required_properties_cache
def _get_all_required_properties(self):
all_properties = self.get_all_properties()
required = self.get_all_required_properties_names()
return dict(
(prop_name, val)
2018-08-05 12:40:34 +00:00
for prop_name, val in iteritems(all_properties)
2018-05-25 15:32:09 +00:00
if prop_name in required
)
def get_all_required_properties_names(self):
2018-07-15 21:22:44 +00:00
required = self.required[:]
for subschema in self.all_of:
subschema_req = subschema.get_all_required_properties()
required += subschema_req
2018-05-25 15:32:09 +00:00
return set(required)
2017-09-21 11:51:37 +00:00
def get_cast_mapping(self):
2018-04-17 12:18:40 +00:00
mapping = self.DEFAULT_CAST_CALLABLE_GETTER.copy()
mapping.update({
SchemaType.STRING: self._unmarshal_string,
2018-08-05 12:40:34 +00:00
SchemaType.ANY: self._unmarshal_any,
2017-11-14 13:36:05 +00:00
SchemaType.ARRAY: self._unmarshal_collection,
SchemaType.OBJECT: self._unmarshal_object,
})
2017-09-21 11:51:37 +00:00
return defaultdict(lambda: lambda x: x, mapping)
def cast(self, value):
"""Cast value to schema type"""
if value is None:
2017-10-17 13:02:21 +00:00
if not self.nullable:
2018-04-18 10:39:03 +00:00
raise InvalidSchemaValue("Null value for non-nullable schema")
2017-10-17 13:02:21 +00:00
return self.default
2017-09-21 11:51:37 +00:00
cast_mapping = self.get_cast_mapping()
if self.type is not SchemaType.STRING and value == '':
2017-09-21 11:51:37 +00:00
return None
cast_callable = cast_mapping[self.type]
try:
return cast_callable(value)
except ValueError:
2018-04-18 10:39:03 +00:00
raise InvalidSchemaValue(
2017-11-03 11:18:48 +00:00
"Failed to cast value of {0} to {1}".format(value, self.type)
2017-09-21 11:51:37 +00:00
)
def unmarshal(self, value):
"""Unmarshal parameter from the value."""
2017-10-17 13:33:46 +00:00
if self.deprecated:
2018-08-05 12:40:34 +00:00
warnings.warn("The schema is deprecated", DeprecationWarning)
2017-09-21 11:51:37 +00:00
casted = self.cast(value)
if casted is None and not self.required:
return None
2017-10-17 13:23:26 +00:00
if self.enum and casted not in self.enum:
2018-04-18 10:39:03 +00:00
raise InvalidSchemaValue(
2017-11-03 11:18:48 +00:00
"Value of {0} not in enum choices: {1}".format(
value, self.enum)
2017-10-17 13:23:26 +00:00
)
2017-09-21 11:51:37 +00:00
return casted
def _unmarshal_string(self, value):
2018-08-22 10:51:06 +00:00
try:
schema_format = SchemaFormat(self.format)
except ValueError:
# @todo: implement custom format unmarshalling support
raise OpenAPISchemaError(
"Unsupported {0} format unmarshalling".format(self.format)
)
else:
formatter = self.STRING_FORMAT_CAST_CALLABLE_GETTER[schema_format]
try:
return formatter(value)
except ValueError:
raise InvalidSchemaValue(
"Failed to format value of {0} to {1}".format(
value, self.format)
)
2018-08-05 12:40:34 +00:00
def _unmarshal_any(self, value):
types_resolve_order = [
SchemaType.OBJECT, SchemaType.ARRAY, SchemaType.BOOLEAN,
SchemaType.INTEGER, SchemaType.NUMBER, SchemaType.STRING,
]
cast_mapping = self.get_cast_mapping()
for schema_type in types_resolve_order:
cast_callable = cast_mapping[schema_type]
try:
return cast_callable(value)
# @todo: remove ValueError when validation separated
except (OpenAPISchemaError, TypeError, ValueError):
continue
raise NoValidSchema(
"No valid schema found for value {0}".format(value))
def _unmarshal_collection(self, value):
2018-08-05 12:40:34 +00:00
if self.items is None:
raise UndefinedItemsSchema("Undefined items' schema")
return list(map(self.items.unmarshal, value))
2018-08-21 17:33:24 +00:00
def _unmarshal_object(self, value, model_factory=None):
2018-04-23 18:50:29 +00:00
if not isinstance(value, (dict, )):
2018-04-18 10:39:03 +00:00
raise InvalidSchemaValue(
2018-08-21 17:33:24 +00:00
"Value of {0} not a dict".format(value))
model_factory = model_factory or ModelFactory()
2018-05-25 15:32:09 +00:00
if self.one_of:
properties = None
for one_of_schema in self.one_of:
try:
found_props = self._unmarshal_properties(
value, one_of_schema)
except OpenAPISchemaError:
pass
else:
if properties is not None:
raise MultipleOneOfSchema(
"Exactly one schema should be valid,"
"multiple found")
properties = found_props
if properties is None:
raise NoOneOfSchema(
"Exactly one valid schema should be valid, None found.")
else:
properties = self._unmarshal_properties(value)
2018-08-21 17:33:24 +00:00
return model_factory.create(properties, name=self.model)
2017-09-25 14:15:00 +00:00
2018-05-25 15:32:09 +00:00
def _unmarshal_properties(self, value, one_of_schema=None):
all_props = self.get_all_properties()
all_props_names = self.get_all_properties_names()
all_req_props_names = self.get_all_required_properties_names()
2017-09-25 14:15:00 +00:00
2018-05-25 15:32:09 +00:00
if one_of_schema is not None:
all_props.update(one_of_schema.get_all_properties())
all_props_names |= one_of_schema.\
get_all_properties_names()
all_req_props_names |= one_of_schema.\
get_all_required_properties_names()
value_props_names = value.keys()
extra_props = set(value_props_names) - set(all_props_names)
2018-05-30 10:15:17 +00:00
if extra_props and self.additional_properties is None:
2017-09-25 14:15:00 +00:00
raise UndefinedSchemaProperty(
"Undefined properties in schema: {0}".format(extra_props))
properties = {}
2018-05-30 10:15:17 +00:00
for prop_name in extra_props:
prop_value = value[prop_name]
properties[prop_name] = self.additional_properties.unmarshal(
prop_value)
2018-05-25 15:32:09 +00:00
for prop_name, prop in iteritems(all_props):
2017-09-25 14:15:00 +00:00
try:
prop_value = value[prop_name]
except KeyError:
2018-05-25 15:32:09 +00:00
if prop_name in all_req_props_names:
2018-04-18 10:39:03 +00:00
raise MissingSchemaProperty(
2017-09-25 14:15:00 +00:00
"Missing schema property {0}".format(prop_name))
2017-10-17 13:02:21 +00:00
if not prop.nullable and not prop.default:
continue
2017-09-25 14:15:00 +00:00
prop_value = prop.default
properties[prop_name] = prop.unmarshal(prop_value)
2018-08-21 17:33:24 +00:00
self._validate_properties(properties, one_of_schema=one_of_schema)
2018-05-25 15:32:09 +00:00
return properties
2018-08-17 14:54:01 +00:00
2018-08-21 17:33:24 +00:00
def get_validator_mapping(self):
2018-08-22 09:05:15 +00:00
mapping = {
2018-08-22 09:29:39 +00:00
SchemaType.ARRAY: self._validate_collection,
2018-08-22 10:51:06 +00:00
SchemaType.STRING: self._validate_string,
2018-08-21 17:33:24 +00:00
SchemaType.OBJECT: self._validate_object,
SchemaType.INTEGER: self._validate_number,
SchemaType.NUMBER: self._validate_number,
2018-08-22 09:05:15 +00:00
}
2018-08-21 17:33:24 +00:00
return defaultdict(lambda: lambda x: x, mapping)
2018-08-17 14:54:01 +00:00
def validate(self, value):
if value is None:
if not self.nullable:
raise InvalidSchemaValue("Null value for non-nullable schema")
2018-08-22 09:05:15 +00:00
return
2018-08-17 14:54:01 +00:00
2018-08-22 09:05:15 +00:00
# type validation
type_validator_callable = self.TYPE_VALIDATOR_CALLABLE_GETTER[
self.type]
if not type_validator_callable(value):
2018-08-17 14:54:01 +00:00
raise InvalidSchemaValue(
"Value of {0} not valid type of {1}".format(
value, self.type.value)
)
2018-08-22 09:05:15 +00:00
# structure validation
validator_mapping = self.get_validator_mapping()
validator_callable = validator_mapping[self.type]
validator_callable(value)
2018-08-17 14:54:01 +00:00
return value
2018-08-21 17:33:24 +00:00
2018-08-22 09:29:39 +00:00
def _validate_collection(self, value):
if self.items is None:
raise OpenAPISchemaError("Schema for collection not defined")
if self.min_items is not None:
if self.min_items < 0:
raise OpenAPISchemaError(
"Schema for collection invalid:"
" minItems must be non-negative"
)
if len(value) < self.min_items:
raise InvalidSchemaValue(
"Value must contain at least {0} item(s),"
" {1} found".format(
self.min_items, len(value))
)
if self.max_items is not None:
if self.max_items < 0:
raise OpenAPISchemaError(
"Schema for collection invalid:"
" maxItems must be non-negative"
)
if len(value) > self.max_items:
raise InvalidSchemaValue(
"Value must contain at most {0} item(s),"
" {1} found".format(
self.max_items, len(value))
)
if self.unique_items and len(set(value)) != len(value):
raise InvalidSchemaValue("Value may not contain duplicate items")
2018-08-22 09:29:39 +00:00
return list(map(self.items.validate, value))
def _validate_number(self, value):
if self.minimum is not None:
if self.exclusive_minimum and value <= self.minimum:
raise InvalidSchemaValue(
"Value {0} is not less than or equal to {1}".format(
value, self.minimum)
)
elif value < self.minimum:
raise InvalidSchemaValue(
"Value {0} is not less than {1}".format(
value, self.minimum)
)
if self.maximum is not None:
if self.exclusive_maximum and value >= self.maximum:
raise InvalidSchemaValue(
"Value {0} is not greater than or equal to {1}".format(
value, self.maximum)
)
elif value > self.maximum:
raise InvalidSchemaValue(
"Value {0} is not greater than {1}".format(
value, self.maximum)
)
if self.multiple_of is not None and value % self.multiple_of:
raise InvalidSchemaValue(
"Value {0} is not a multiple of {1}".format(
value, self.multiple_of)
)
2018-08-22 10:51:06 +00:00
def _validate_string(self, value):
try:
schema_format = SchemaFormat(self.format)
except ValueError:
# @todo: implement custom format validation support
raise OpenAPISchemaError(
"Unsupported {0} format validation".format(self.format)
)
else:
format_validator_callable =\
self.STRING_FORMAT_VALIDATOR_CALLABLE_GETTER[schema_format]
if not format_validator_callable(value):
raise InvalidSchemaValue(
"Value of {0} not valid format of {1}".format(
value, self.format)
)
if self.min_length is not None:
if self.min_length < 0:
raise OpenAPISchemaError(
"Schema for string invalid:"
" minLength must be non-negative"
)
if len(value) < self.min_length:
raise InvalidSchemaValue(
"Value is shorter than the minimum length of {0}".format(
self.min_length)
)
if self.max_length is not None:
if self.max_length < 0:
raise OpenAPISchemaError(
"Schema for string invalid:"
" maxLength must be non-negative"
)
if len(value) > self.max_length:
raise InvalidSchemaValue(
"Value is longer than the maximum length of {0}".format(
self.max_length)
)
if self.pattern is not None and not self.pattern.search(value):
raise InvalidSchemaValue(
"Value {0} does not match the pattern {1}".format(
value, self.pattern.pattern)
)
2018-08-22 10:51:06 +00:00
return True
2018-08-21 17:33:24 +00:00
def _validate_object(self, value):
properties = value.__dict__
if self.one_of:
valid_one_of_schema = None
for one_of_schema in self.one_of:
try:
self._validate_properties(properties, one_of_schema)
except OpenAPISchemaError:
pass
else:
if valid_one_of_schema is not None:
raise MultipleOneOfSchema(
"Exactly one schema should be valid,"
"multiple found")
valid_one_of_schema = True
if valid_one_of_schema is None:
raise NoOneOfSchema(
"Exactly one valid schema should be valid, None found.")
else:
self._validate_properties(properties)
if self.min_properties is not None:
if self.min_properties < 0:
raise OpenAPISchemaError(
"Schema for object invalid:"
" minProperties must be non-negative"
)
if len(properties) < self.min_properties:
raise InvalidSchemaValue(
"Value must contain at least {0} properties,"
" {1} found".format(
self.min_properties, len(properties))
)
if self.max_properties is not None:
if self.max_properties < 0:
raise OpenAPISchemaError(
"Schema for object invalid:"
" maxProperties must be non-negative"
)
if len(properties) > self.max_properties:
raise InvalidSchemaValue(
"Value must contain at most {0} properties,"
" {1} found".format(
self.max_properties, len(properties))
)
2018-08-21 17:33:24 +00:00
return True
def _validate_properties(self, value, one_of_schema=None):
all_props = self.get_all_properties()
all_props_names = self.get_all_properties_names()
all_req_props_names = self.get_all_required_properties_names()
if one_of_schema is not None:
all_props.update(one_of_schema.get_all_properties())
all_props_names |= one_of_schema.\
get_all_properties_names()
all_req_props_names |= one_of_schema.\
get_all_required_properties_names()
value_props_names = value.keys()
extra_props = set(value_props_names) - set(all_props_names)
if extra_props and self.additional_properties is None:
raise UndefinedSchemaProperty(
"Undefined properties in schema: {0}".format(extra_props))
for prop_name in extra_props:
prop_value = value[prop_name]
self.additional_properties.validate(
prop_value)
for prop_name, prop in iteritems(all_props):
try:
prop_value = value[prop_name]
except KeyError:
if prop_name in all_req_props_names:
raise MissingSchemaProperty(
"Missing schema property {0}".format(prop_name))
if not prop.nullable and not prop.default:
continue
prop_value = prop.default
prop.validate(prop_value)
return True