Merge pull request #157 from p1c2u/feature/validation-refactor

OAS validation with JSONSchema
This commit is contained in:
A 2019-09-13 10:13:18 +01:00 committed by GitHub
commit 4690f77c09
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 743 additions and 525 deletions

View file

@ -32,22 +32,25 @@ class MediaType(object):
deserializer = self.get_dererializer()
return deserializer(value)
def unmarshal(self, value, custom_formatters=None):
def cast(self, value):
if not self.schema:
return value
try:
deserialized = self.deserialize(value)
return self.deserialize(value)
except ValueError as exc:
raise InvalidMediaTypeValue(exc)
def unmarshal(self, value, custom_formatters=None, resolver=None):
if not self.schema:
return value
try:
unmarshalled = self.schema.unmarshal(deserialized, custom_formatters=custom_formatters)
self.schema.validate(value, resolver=resolver)
except OpenAPISchemaError as exc:
raise InvalidMediaTypeValue(exc)
try:
return self.schema.validate(
unmarshalled, custom_formatters=custom_formatters)
return self.schema.unmarshal(value, custom_formatters=custom_formatters)
except OpenAPISchemaError as exc:
raise InvalidMediaTypeValue(exc)

View file

@ -72,7 +72,7 @@ class Parameter(object):
deserializer = self.get_dererializer()
return deserializer(value)
def get_value(self, request):
def get_raw_value(self, request):
location = request.parameters[self.location.value]
if self.name not in location:
@ -89,7 +89,7 @@ class Parameter(object):
return location[self.name]
def unmarshal(self, value, custom_formatters=None):
def cast(self, value):
if self.deprecated:
warnings.warn(
"{0} parameter is deprecated".format(self.name),
@ -109,21 +109,24 @@ class Parameter(object):
raise InvalidParameterValue(self.name, exc)
try:
casted = self.schema.cast(deserialized)
return self.schema.cast(deserialized)
except OpenAPISchemaError as exc:
raise InvalidParameterValue(self.name, exc)
def unmarshal(self, value, custom_formatters=None, resolver=None):
if not self.schema:
return value
try:
self.schema.validate(value, resolver=resolver)
except OpenAPISchemaError as exc:
raise InvalidParameterValue(self.name, exc)
try:
unmarshalled = self.schema.unmarshal(
casted,
return self.schema.unmarshal(
value,
custom_formatters=custom_formatters,
strict=True,
)
except OpenAPISchemaError as exc:
raise InvalidParameterValue(self.name, exc)
try:
return self.schema.validate(
unmarshalled, custom_formatters=custom_formatters)
except OpenAPISchemaError as exc:
raise InvalidParameterValue(self.name, exc)

View file

@ -0,0 +1,115 @@
from base64 import b64encode, b64decode
import binascii
from datetime import datetime
from uuid import UUID
from jsonschema._format import FormatChecker
from jsonschema.exceptions import FormatError
from six import binary_type, text_type, integer_types
DATETIME_HAS_STRICT_RFC3339 = False
DATETIME_HAS_ISODATE = False
DATETIME_RAISES = ()
try:
import isodate
except ImportError:
pass
else:
DATETIME_HAS_ISODATE = True
DATETIME_RAISES += (ValueError, isodate.ISO8601Error)
try:
import strict_rfc3339
except ImportError:
pass
else:
DATETIME_HAS_STRICT_RFC3339 = True
DATETIME_RAISES += (ValueError, TypeError)
class StrictFormatChecker(FormatChecker):
def check(self, instance, format):
if format not in self.checkers:
raise FormatError(
"Format checker for %r format not found" % (format, ))
return super(StrictFormatChecker, self).check(
instance, format)
oas30_format_checker = StrictFormatChecker()
@oas30_format_checker.checks('int32')
def is_int32(instance):
return isinstance(instance, integer_types)
@oas30_format_checker.checks('int64')
def is_int64(instance):
return isinstance(instance, integer_types)
@oas30_format_checker.checks('float')
def is_float(instance):
return isinstance(instance, float)
@oas30_format_checker.checks('double')
def is_double(instance):
# float has double precision in Python
# It's double in CPython and Jython
return isinstance(instance, float)
@oas30_format_checker.checks('binary')
def is_binary(instance):
return isinstance(instance, binary_type)
@oas30_format_checker.checks('byte', raises=(binascii.Error, TypeError))
def is_byte(instance):
if isinstance(instance, text_type):
instance = instance.encode()
return b64encode(b64decode(instance)) == instance
@oas30_format_checker.checks("date-time", raises=DATETIME_RAISES)
def is_datetime(instance):
if isinstance(instance, binary_type):
return False
if not isinstance(instance, text_type):
return True
if DATETIME_HAS_STRICT_RFC3339:
return strict_rfc3339.validate_rfc3339(instance)
if DATETIME_HAS_ISODATE:
return isodate.parse_datetime(instance)
return True
@oas30_format_checker.checks("date", raises=ValueError)
def is_date(instance):
if isinstance(instance, binary_type):
return False
if not isinstance(instance, text_type):
return True
return datetime.strptime(instance, "%Y-%m-%d")
@oas30_format_checker.checks("uuid", raises=AttributeError)
def is_uuid(instance):
if isinstance(instance, binary_type):
return False
if not isinstance(instance, text_type):
return True
try:
uuid_obj = UUID(instance)
except ValueError:
return False
return text_type(uuid_obj) == instance

View file

@ -0,0 +1,21 @@
from jsonschema._types import (
TypeChecker, is_any, is_array, is_bool, is_integer,
is_object, is_number,
)
from six import text_type, binary_type
def is_string(checker, instance):
return isinstance(instance, (text_type, binary_type))
oas30_type_checker = TypeChecker(
{
u"string": is_string,
u"number": is_number,
u"integer": is_integer,
u"boolean": is_bool,
u"array": is_array,
u"object": is_object,
},
)

View file

@ -0,0 +1,58 @@
from jsonschema._utils import find_additional_properties, extras_msg
from jsonschema.exceptions import ValidationError, FormatError
def type(validator, data_type, instance, schema):
if instance is None:
return
if not validator.is_type(instance, data_type):
yield ValidationError("%r is not of type %s" % (instance, data_type))
def format(validator, format, instance, schema):
if instance is None:
return
if validator.format_checker is not None:
try:
validator.format_checker.check(instance, format)
except FormatError as error:
yield ValidationError(error.message, cause=error.cause)
def items(validator, items, instance, schema):
if not validator.is_type(instance, "array"):
return
for index, item in enumerate(instance):
for error in validator.descend(item, items, path=index):
yield error
def nullable(validator, is_nullable, instance, schema):
if instance is None and not is_nullable:
yield ValidationError("None for not nullable")
def additionalProperties(validator, aP, instance, schema):
if not validator.is_type(instance, "object"):
return
extras = set(find_additional_properties(instance, schema))
if not extras:
return
if validator.is_type(aP, "object"):
for extra in extras:
for error in validator.descend(instance[extra], aP, path=extra):
yield error
elif validator.is_type(aP, "boolean"):
if not aP:
error = "Additional properties are not allowed (%s %s unexpected)"
yield ValidationError(error % extras_msg(extras))
def not_implemented(validator, value, instance, schema):
pass

View file

@ -66,22 +66,6 @@ class MissingSchemaProperty(OpenAPISchemaError):
return "Missing schema property: {0}".format(self.property_name)
@attr.s(hash=True)
class NoOneOfSchema(OpenAPISchemaError):
type = attr.ib()
def __str__(self):
return "Exactly one valid schema type {0} should be valid, None found.".format(self.type)
@attr.s(hash=True)
class MultipleOneOfSchema(OpenAPISchemaError):
type = attr.ib()
def __str__(self):
return "Exactly one schema type {0} should be valid, more than one found".format(self.type)
class UnmarshallerError(OpenAPIMappingError):
pass

View file

@ -1,9 +1,12 @@
"""OpenAPI core schemas factories module"""
import logging
from six import iteritems
from openapi_core.compat import lru_cache
from openapi_core.schema.properties.generators import PropertiesGenerator
from openapi_core.schema.schemas.models import Schema
from openapi_core.schema.schemas.types import Contribution
log = logging.getLogger(__name__)
@ -50,11 +53,11 @@ class SchemaFactory(object):
all_of = []
if all_of_spec:
all_of = map(self.create, all_of_spec)
all_of = list(map(self.create, all_of_spec))
one_of = []
if one_of_spec:
one_of = map(self.create, one_of_spec)
one_of = list(map(self.create, one_of_spec))
items = None
if items_spec:
@ -76,6 +79,7 @@ class SchemaFactory(object):
exclusive_maximum=exclusive_maximum,
exclusive_minimum=exclusive_minimum,
min_properties=min_properties, max_properties=max_properties,
_source=schema_deref,
)
@property
@ -85,3 +89,59 @@ class SchemaFactory(object):
def _create_items(self, items_spec):
return self.create(items_spec)
class SchemaDictFactory(object):
contributions = (
Contribution('type', src_prop_attr='value'),
Contribution('format'),
Contribution('properties', is_dict=True, dest_default={}),
Contribution('required', dest_default=[]),
Contribution('default'),
Contribution('nullable', dest_default=False),
Contribution('all_of', dest_prop_name='allOf', is_list=True, dest_default=[]),
Contribution('one_of', dest_prop_name='oneOf', is_list=True, dest_default=[]),
Contribution('additional_properties', dest_prop_name='additionalProperties', dest_default=True),
Contribution('min_items', dest_prop_name='minItems'),
Contribution('max_items', dest_prop_name='maxItems'),
Contribution('min_length', dest_prop_name='minLength'),
Contribution('max_length', dest_prop_name='maxLength'),
Contribution('pattern', src_prop_attr='pattern'),
Contribution('unique_items', dest_prop_name='uniqueItems', dest_default=False),
Contribution('minimum'),
Contribution('maximum'),
Contribution('multiple_of', dest_prop_name='multipleOf'),
Contribution('exclusive_minimum', dest_prop_name='exclusiveMinimum', dest_default=False),
Contribution('exclusive_maximum', dest_prop_name='exclusiveMaximum', dest_default=False),
Contribution('min_properties', dest_prop_name='minProperties'),
Contribution('max_properties', dest_prop_name='maxProperties'),
)
def create(self, schema):
schema_dict = {}
for contrib in self.contributions:
self._contribute(schema, schema_dict, contrib)
return schema_dict
def _contribute(self, schema, schema_dict, contrib):
def src_map(x):
return getattr(x, '__dict__')
src_val = getattr(schema, contrib.src_prop_name)
if src_val and contrib.src_prop_attr:
src_val = getattr(src_val, contrib.src_prop_attr)
if contrib.is_list:
src_val = list(map(src_map, src_val))
if contrib.is_dict:
src_val = dict(
(k, src_map(v))
for k, v in iteritems(src_val)
)
if src_val == contrib.dest_default:
return
dest_prop_name = contrib.dest_prop_name or contrib.src_prop_name
schema_dict[dest_prop_name] = src_val

View file

@ -9,12 +9,14 @@ import re
import warnings
from six import iteritems, integer_types, binary_type, text_type
from jsonschema.exceptions import ValidationError
from openapi_core.extensions.models.factories import ModelFactory
from openapi_core.schema.schemas._format import oas30_format_checker
from openapi_core.schema.schemas.enums import SchemaFormat, SchemaType
from openapi_core.schema.schemas.exceptions import (
InvalidSchemaValue, UndefinedSchemaProperty, MissingSchemaProperty,
OpenAPISchemaError, NoOneOfSchema, MultipleOneOfSchema, NoValidSchema,
OpenAPISchemaError, NoValidSchema,
UndefinedItemsSchema, InvalidCustomFormatSchemaValue, InvalidSchemaProperty,
UnmarshallerStrictTypeError,
)
@ -22,9 +24,7 @@ from openapi_core.schema.schemas.util import (
forcebool, format_date, format_datetime, format_byte, format_uuid,
format_number,
)
from openapi_core.schema.schemas.validators import (
TypeValidator, AttributeValidator,
)
from openapi_core.schema.schemas.validators import OAS30Validator
log = logging.getLogger(__name__)
@ -47,36 +47,6 @@ class Schema(object):
DEFAULT_UNMARSHAL_CALLABLE_GETTER = {
}
STRING_FORMAT_CALLABLE_GETTER = {
SchemaFormat.NONE: Format(text_type, TypeValidator(text_type)),
SchemaFormat.PASSWORD: Format(text_type, TypeValidator(text_type)),
SchemaFormat.DATE: Format(
format_date, TypeValidator(date, exclude=datetime)),
SchemaFormat.DATETIME: Format(format_datetime, TypeValidator(datetime)),
SchemaFormat.BINARY: Format(binary_type, TypeValidator(binary_type)),
SchemaFormat.UUID: Format(format_uuid, TypeValidator(UUID)),
SchemaFormat.BYTE: Format(format_byte, TypeValidator(text_type)),
}
NUMBER_FORMAT_CALLABLE_GETTER = {
SchemaFormat.NONE: Format(format_number, TypeValidator(
integer_types + (float, ), exclude=bool)),
SchemaFormat.FLOAT: Format(float, TypeValidator(float)),
SchemaFormat.DOUBLE: Format(float, TypeValidator(float)),
}
TYPE_VALIDATOR_CALLABLE_GETTER = {
SchemaType.ANY: lambda x: True,
SchemaType.BOOLEAN: TypeValidator(bool),
SchemaType.INTEGER: TypeValidator(integer_types, exclude=bool),
SchemaType.NUMBER: TypeValidator(
integer_types + (float, ), exclude=bool),
SchemaType.STRING: TypeValidator(
text_type, date, datetime, binary_type, UUID),
SchemaType.ARRAY: TypeValidator(list, tuple),
SchemaType.OBJECT: AttributeValidator('__dict__'),
}
def __init__(
self, schema_type=None, model=None, properties=None, items=None,
schema_format=None, required=None, default=None, nullable=False,
@ -85,7 +55,7 @@ class Schema(object):
min_length=None, max_length=None, pattern=None, unique_items=False,
minimum=None, maximum=None, multiple_of=None,
exclusive_minimum=False, exclusive_maximum=False,
min_properties=None, max_properties=None):
min_properties=None, max_properties=None, _source=None):
self.type = SchemaType(schema_type)
self.model = model
self.properties = properties and dict(properties) or {}
@ -119,6 +89,16 @@ class Schema(object):
self._all_required_properties_cache = None
self._all_optional_properties_cache = None
self._source = _source
@property
def __dict__(self):
return self._source or self.to_dict()
def to_dict(self):
from openapi_core.schema.schemas.factories import SchemaDictFactory
return SchemaDictFactory().create(self)
def __getitem__(self, name):
return self.properties[name]
@ -214,6 +194,18 @@ class Schema(object):
return defaultdict(lambda: lambda x: x, mapping)
def get_validator(self, resolver=None):
return OAS30Validator(
self.__dict__, resolver=resolver, format_checker=oas30_format_checker)
def validate(self, value, resolver=None):
validator = self.get_validator(resolver=resolver)
try:
return validator.validate(value)
except ValidationError:
# TODO: pass validation errors
raise InvalidSchemaValue("Value not valid for schema", value, self.type)
def unmarshal(self, value, custom_formatters=None, strict=True):
"""Unmarshal parameter from the value."""
if self.deprecated:
@ -241,10 +233,7 @@ class Schema(object):
"Value {value} is not of type {type}", value, self.type)
except ValueError:
raise InvalidSchemaValue(
"Failed to cast value {value} to type {type}", value, self.type)
if unmarshalled is None and not self.required:
return None
"Failed to unmarshal value {value} to type {type}", value, self.type)
return unmarshalled
@ -283,11 +272,12 @@ class Schema(object):
continue
else:
if result is not None:
raise MultipleOneOfSchema(self.type)
log.warning("multiple valid oneOf schemas found")
continue
result = unmarshalled
if result is None:
raise NoOneOfSchema(self.type)
log.warning("valid oneOf schema not found")
return result
else:
@ -297,19 +287,16 @@ class Schema(object):
return unmarshal_callable(value)
except UnmarshallerStrictTypeError:
continue
# @todo: remove ValueError when validation separated
except (OpenAPISchemaError, TypeError, ValueError):
except (OpenAPISchemaError, TypeError):
continue
raise NoValidSchema(value)
log.warning("failed to unmarshal any type")
return value
def _unmarshal_collection(self, value, custom_formatters=None, strict=True):
if not isinstance(value, (list, tuple)):
raise InvalidSchemaValue("Value {value} is not of type {type}", value, self.type)
if self.items is None:
raise UndefinedItemsSchema(self.type)
f = functools.partial(
self.items.unmarshal,
custom_formatters=custom_formatters, strict=strict,
@ -327,17 +314,18 @@ class Schema(object):
properties = None
for one_of_schema in self.one_of:
try:
found_props = self._unmarshal_properties(
unmarshalled = self._unmarshal_properties(
value, one_of_schema, custom_formatters=custom_formatters)
except OpenAPISchemaError:
pass
else:
if properties is not None:
raise MultipleOneOfSchema(self.type)
properties = found_props
log.warning("multiple valid oneOf schemas found")
continue
properties = unmarshalled
if properties is None:
raise NoOneOfSchema(self.type)
log.warning("valid oneOf schema not found")
else:
properties = self._unmarshal_properties(
@ -381,242 +369,8 @@ class Schema(object):
if not prop.nullable and not prop.default:
continue
prop_value = prop.default
try:
properties[prop_name] = prop.unmarshal(
prop_value, custom_formatters=custom_formatters)
except OpenAPISchemaError as exc:
raise InvalidSchemaProperty(prop_name, exc)
self._validate_properties(properties, one_of_schema=one_of_schema,
custom_formatters=custom_formatters)
properties[prop_name] = prop.unmarshal(
prop_value, custom_formatters=custom_formatters)
return properties
def get_validator_mapping(self):
mapping = {
SchemaType.ARRAY: self._validate_collection,
SchemaType.STRING: self._validate_string,
SchemaType.OBJECT: self._validate_object,
SchemaType.INTEGER: self._validate_number,
SchemaType.NUMBER: self._validate_number,
}
def default(x, **kw):
return x
return defaultdict(lambda: default, mapping)
def validate(self, value, custom_formatters=None):
if value is None:
if not self.nullable:
raise InvalidSchemaValue("Null value for non-nullable schema of type {type}", value, self.type)
return
# type validation
type_validator_callable = self.TYPE_VALIDATOR_CALLABLE_GETTER[
self.type]
if not type_validator_callable(value):
raise InvalidSchemaValue(
"Value {value} not valid type {type}", value, self.type.value)
# structure validation
validator_mapping = self.get_validator_mapping()
validator_callable = validator_mapping[self.type]
validator_callable(value, custom_formatters=custom_formatters)
return value
def _validate_collection(self, value, custom_formatters=None):
if self.items is None:
raise UndefinedItemsSchema(self.type)
if self.min_items is not None:
if self.min_items < 0:
raise OpenAPISchemaError(
"Schema for collection invalid:"
" minItems must be non-negative"
)
if len(value) < self.min_items:
raise InvalidSchemaValue(
"Value must contain at least {type} item(s),"
" {value} found", len(value), self.min_items)
if self.max_items is not None:
if self.max_items < 0:
raise OpenAPISchemaError(
"Schema for collection invalid:"
" maxItems must be non-negative"
)
if len(value) > self.max_items:
raise InvalidSchemaValue(
"Value must contain at most {value} item(s),"
" {type} found", len(value), self.max_items)
if self.unique_items and len(set(value)) != len(value):
raise OpenAPISchemaError("Value may not contain duplicate items")
f = functools.partial(self.items.validate,
custom_formatters=custom_formatters)
return list(map(f, value))
def _validate_number(self, value, custom_formatters=None):
if self.minimum is not None:
if self.exclusive_minimum and value <= self.minimum:
raise InvalidSchemaValue(
"Value {value} is not less than or equal to {type}", value, self.minimum)
elif value < self.minimum:
raise InvalidSchemaValue(
"Value {value} is not less than {type}", value, self.minimum)
if self.maximum is not None:
if self.exclusive_maximum and value >= self.maximum:
raise InvalidSchemaValue(
"Value {value} is not greater than or equal to {type}", value, self.maximum)
elif value > self.maximum:
raise InvalidSchemaValue(
"Value {value} is not greater than {type}", value, self.maximum)
if self.multiple_of is not None and value % self.multiple_of:
raise InvalidSchemaValue(
"Value {value} is not a multiple of {type}",
value, self.multiple_of)
def _validate_string(self, value, custom_formatters=None):
try:
schema_format = SchemaFormat(self.format)
except ValueError:
msg = "Unsupported {0} format validation".format(self.format)
if custom_formatters is not None:
formatstring = custom_formatters.get(self.format)
if formatstring is None:
raise OpenAPISchemaError(msg)
else:
raise OpenAPISchemaError(msg)
else:
formatstring =\
self.STRING_FORMAT_CALLABLE_GETTER[schema_format]
if not formatstring.validate(value):
raise InvalidSchemaValue(
"Value {value} not valid format {type}", value, self.format)
if self.min_length is not None:
if self.min_length < 0:
raise OpenAPISchemaError(
"Schema for string invalid:"
" minLength must be non-negative"
)
if len(value) < self.min_length:
raise InvalidSchemaValue(
"Value is shorter ({value}) than the minimum length of {type}",
len(value), self.min_length
)
if self.max_length is not None:
if self.max_length < 0:
raise OpenAPISchemaError(
"Schema for string invalid:"
" maxLength must be non-negative"
)
if len(value) > self.max_length:
raise InvalidSchemaValue(
"Value is longer ({value}) than the maximum length of {type}",
len(value), self.max_length
)
if self.pattern is not None and not self.pattern.search(value):
raise InvalidSchemaValue(
"Value {value} does not match the pattern {type}",
value, self.pattern.pattern
)
return True
def _validate_object(self, value, custom_formatters=None):
properties = value.__dict__
if self.one_of:
valid_one_of_schema = None
for one_of_schema in self.one_of:
try:
self._validate_properties(
properties, one_of_schema,
custom_formatters=custom_formatters)
except OpenAPISchemaError:
pass
else:
if valid_one_of_schema is not None:
raise MultipleOneOfSchema(self.type)
valid_one_of_schema = True
if valid_one_of_schema is None:
raise NoOneOfSchema(self.type)
else:
self._validate_properties(properties,
custom_formatters=custom_formatters)
if self.min_properties is not None:
if self.min_properties < 0:
raise OpenAPISchemaError(
"Schema for object invalid:"
" minProperties must be non-negative"
)
if len(properties) < self.min_properties:
raise InvalidSchemaValue(
"Value must contain at least {type} properties,"
" {value} found", len(properties), self.min_properties
)
if self.max_properties is not None:
if self.max_properties < 0:
raise OpenAPISchemaError(
"Schema for object invalid:"
" maxProperties must be non-negative"
)
if len(properties) > self.max_properties:
raise InvalidSchemaValue(
"Value must contain at most {type} properties,"
" {value} found", len(properties), self.max_properties
)
return True
def _validate_properties(self, value, one_of_schema=None,
custom_formatters=None):
all_props = self.get_all_properties()
all_props_names = self.get_all_properties_names()
all_req_props_names = self.get_all_required_properties_names()
if one_of_schema is not None:
all_props.update(one_of_schema.get_all_properties())
all_props_names |= one_of_schema.\
get_all_properties_names()
all_req_props_names |= one_of_schema.\
get_all_required_properties_names()
value_props_names = value.keys()
extra_props = set(value_props_names) - set(all_props_names)
extra_props_allowed = self.are_additional_properties_allowed(
one_of_schema)
if extra_props and not extra_props_allowed:
raise UndefinedSchemaProperty(extra_props)
if self.additional_properties is not True:
for prop_name in extra_props:
prop_value = value[prop_name]
self.additional_properties.validate(
prop_value, custom_formatters=custom_formatters)
for prop_name, prop in iteritems(all_props):
try:
prop_value = value[prop_name]
except KeyError:
if prop_name in all_req_props_names:
raise MissingSchemaProperty(prop_name)
if not prop.nullable and not prop.default:
continue
prop_value = prop.default
try:
prop.validate(prop_value, custom_formatters=custom_formatters)
except OpenAPISchemaError as exc:
raise InvalidSchemaProperty(prop_name, original_exception=exc)
return True

View file

@ -0,0 +1,11 @@
import attr
@attr.s(hash=True)
class Contribution(object):
src_prop_name = attr.ib()
src_prop_attr = attr.ib(default=None)
dest_prop_name = attr.ib(default=None)
is_list = attr.ib(default=False)
is_dict = attr.ib(default=False)
dest_default = attr.ib(default=None)

View file

@ -3,7 +3,7 @@ from six import text_type, binary_type, integer_types
from openapi_core.schema.schemas.enums import SchemaFormat, SchemaType
from openapi_core.schema.schemas.exceptions import (
InvalidSchemaValue, InvalidCustomFormatSchemaValue,
OpenAPISchemaError, MultipleOneOfSchema, NoOneOfSchema,
OpenAPISchemaError,
InvalidSchemaProperty,
UnmarshallerStrictTypeError,
)

View file

@ -1,26 +1,67 @@
class TypeValidator(object):
from jsonschema import _legacy_validators, _format, _types, _utils, _validators
from jsonschema.validators import create
def __init__(self, *types, **options):
self.types = types
self.exclude = options.get('exclude')
def __call__(self, value):
if self.exclude is not None and isinstance(value, self.exclude):
return False
if not isinstance(value, self.types):
return False
return True
from openapi_core.schema.schemas import _types as oas_types
from openapi_core.schema.schemas import _validators as oas_validators
class AttributeValidator(object):
BaseOAS30Validator = create(
meta_schema=_utils.load_schema("draft4"),
validators={
u"multipleOf": _validators.multipleOf,
# exclusiveMaximum supported inside maximum_draft3_draft4
u"maximum": _legacy_validators.maximum_draft3_draft4,
# exclusiveMinimum supported inside minimum_draft3_draft4
u"minimum": _legacy_validators.minimum_draft3_draft4,
u"maxLength": _validators.maxLength,
u"minLength": _validators.minLength,
u"pattern": _validators.pattern,
u"maxItems": _validators.maxItems,
u"minItems": _validators.minItems,
u"uniqueItems": _validators.uniqueItems,
u"maxProperties": _validators.maxProperties,
u"minProperties": _validators.minProperties,
u"required": _validators.required,
u"enum": _validators.enum,
# adjusted to OAS
u"type": oas_validators.type,
u"allOf": _validators.allOf,
u"oneOf": _validators.oneOf,
u"anyOf": _validators.anyOf,
u"not": _validators.not_,
u"items": oas_validators.items,
u"properties": _validators.properties,
u"additionalProperties": oas_validators.additionalProperties,
# TODO: adjust description
u"format": oas_validators.format,
# TODO: adjust default
u"$ref": _validators.ref,
# fixed OAS fields
u"nullable": oas_validators.nullable,
u"discriminator": oas_validators.not_implemented,
u"readOnly": oas_validators.not_implemented,
u"writeOnly": oas_validators.not_implemented,
u"xml": oas_validators.not_implemented,
u"externalDocs": oas_validators.not_implemented,
u"example": oas_validators.not_implemented,
u"deprecated": oas_validators.not_implemented,
},
type_checker=oas_types.oas30_type_checker,
version="oas30",
id_of=lambda schema: schema.get(u"id", ""),
)
def __init__(self, attribute):
self.attribute = attribute
def __call__(self, value):
if not hasattr(value, self.attribute):
return False
class OAS30Validator(BaseOAS30Validator):
return True
def iter_errors(self, instance, _schema=None):
if _schema is None:
_schema = self.schema
# append defaults to trigger validator (i.e. nullable)
if 'nullable' not in _schema:
_schema.update({
'nullable': False,
})
return super(OAS30Validator, self).iter_errors(instance, _schema)

View file

@ -2,6 +2,7 @@
"""OpenAPI core specs factories module"""
from openapi_spec_validator import openapi_v3_spec_validator
from openapi_spec_validator.validators import Dereferencer
from openapi_core.compat import lru_cache
from openapi_core.schema.components.factories import ComponentsFactory
@ -14,8 +15,8 @@ from openapi_core.schema.specs.models import Spec
class SpecFactory(object):
def __init__(self, dereferencer, config=None):
self.dereferencer = dereferencer
def __init__(self, spec_resolver, config=None):
self.spec_resolver = spec_resolver
self.config = config or {}
def create(self, spec_dict, spec_url=''):
@ -34,9 +35,16 @@ class SpecFactory(object):
paths = self.paths_generator.generate(paths)
components = self.components_factory.create(components_spec)
spec = Spec(
info, list(paths), servers=list(servers), components=components)
info, list(paths), servers=list(servers), components=components,
_resolver=self.spec_resolver,
)
return spec
@property
@lru_cache()
def dereferencer(self):
return Dereferencer(self.spec_resolver)
@property
@lru_cache()
def schemas_registry(self):

View file

@ -14,12 +14,14 @@ log = logging.getLogger(__name__)
class Spec(object):
"""Represents an OpenAPI Specification for a service."""
def __init__(self, info, paths, servers=None, components=None):
def __init__(self, info, paths, servers=None, components=None, _resolver=None):
self.info = info
self.paths = paths and dict(paths)
self.servers = servers or []
self.components = components
self._resolver = _resolver
def __getitem__(self, path_pattern):
return self.get_path(path_pattern)

View file

@ -1,6 +1,5 @@
"""OpenAPI core shortcuts module"""
from jsonschema.validators import RefResolver
from openapi_spec_validator.validators import Dereferencer
from openapi_spec_validator import default_handlers
from openapi_core.schema.media_types.exceptions import OpenAPIMediaTypeError
@ -17,8 +16,7 @@ from openapi_core.validation.response.validators import ResponseValidator
def create_spec(spec_dict, spec_url=''):
spec_resolver = RefResolver(
spec_url, spec_dict, handlers=default_handlers)
dereferencer = Dereferencer(spec_resolver)
spec_factory = SpecFactory(dereferencer)
spec_factory = SpecFactory(spec_resolver)
return spec_factory.create(spec_dict, spec_url=spec_url)

View file

@ -58,7 +58,7 @@ class RequestValidator(object):
continue
seen.add((param_name, param.location.value))
try:
raw_value = param.get_value(request)
raw_value = param.get_raw_value(request)
except MissingParameter:
continue
except OpenAPIMappingError as exc:
@ -66,11 +66,20 @@ class RequestValidator(object):
continue
try:
value = param.unmarshal(raw_value, self.custom_formatters)
casted = param.cast(raw_value)
except OpenAPIMappingError as exc:
errors.append(exc)
continue
try:
unmarshalled = param.unmarshal(
casted, self.custom_formatters,
resolver=self.spec._resolver,
)
except OpenAPIMappingError as exc:
errors.append(exc)
else:
parameters[param.location.value][param_name] = value
parameters[param.location.value][param_name] = unmarshalled
return parameters, errors
@ -92,8 +101,16 @@ class RequestValidator(object):
errors.append(exc)
else:
try:
body = media_type.unmarshal(raw_body, self.custom_formatters)
casted = media_type.cast(raw_body)
except OpenAPIMappingError as exc:
errors.append(exc)
else:
try:
body = media_type.unmarshal(
casted, self.custom_formatters,
resolver=self.spec._resolver,
)
except OpenAPIMappingError as exc:
errors.append(exc)
return body, errors

View file

@ -61,9 +61,17 @@ class ResponseValidator(object):
errors.append(exc)
else:
try:
data = media_type.unmarshal(raw_data, self.custom_formatters)
casted = media_type.cast(raw_data)
except OpenAPIMappingError as exc:
errors.append(exc)
else:
try:
data = media_type.unmarshal(
casted, self.custom_formatters,
resolver=self.spec._resolver,
)
except OpenAPIMappingError as exc:
errors.append(exc)
return data, errors

View file

@ -2,4 +2,5 @@ openapi-spec-validator
six
lazy-object-proxy
strict_rfc3339
isodate
attrs

View file

@ -317,7 +317,10 @@ components:
suberror:
$ref: "#/components/schemas/ExtendedError"
additionalProperties:
type: string
oneOf:
- type: string
- type: integer
format: int32
responses:
ErrorResponse:
description: unexpected error

View file

@ -19,9 +19,7 @@ from openapi_core.schema.paths.models import Path
from openapi_core.schema.request_bodies.models import RequestBody
from openapi_core.schema.responses.models import Response
from openapi_core.schema.schemas.enums import SchemaType
from openapi_core.schema.schemas.exceptions import (
InvalidSchemaProperty, InvalidSchemaValue,
)
from openapi_core.schema.schemas.exceptions import InvalidSchemaValue
from openapi_core.schema.schemas.models import Schema
from openapi_core.schema.servers.exceptions import InvalidServer
from openapi_core.schema.servers.models import Server, ServerVariable
@ -41,13 +39,17 @@ class TestPetstore(object):
api_key_bytes_enc = b64encode(api_key_bytes)
return text_type(api_key_bytes_enc, 'utf8')
@pytest.fixture
def spec_uri(self):
return "file://tests/integration/data/v3.0/petstore.yaml"
@pytest.fixture
def spec_dict(self, factory):
return factory.spec_from_file("data/v3.0/petstore.yaml")
@pytest.fixture
def spec(self, spec_dict):
return create_spec(spec_dict)
def spec(self, spec_dict, spec_uri):
return create_spec(spec_dict, spec_uri)
@pytest.fixture
def request_validator(self, spec):
@ -267,6 +269,9 @@ class TestPetstore(object):
{
'id': 1,
'name': 'Cat',
'ears': {
'healthy': True,
},
}
],
}
@ -322,16 +327,10 @@ class TestPetstore(object):
assert response_result.errors == [
InvalidMediaTypeValue(
original_exception=InvalidSchemaProperty(
property_name='data',
original_exception=InvalidSchemaProperty(
property_name='name',
original_exception=InvalidSchemaValue(
msg="Value {value} is not of type {type}",
type=SchemaType.STRING,
value={'first_name': 'Cat'},
),
),
original_exception=InvalidSchemaValue(
msg='Value not valid for schema',
type=SchemaType.OBJECT,
value=data_json,
),
),
]
@ -932,6 +931,9 @@ class TestPetstore(object):
'data': {
'id': data_id,
'name': data_name,
'ears': {
'healthy': True,
},
},
}
data = json.dumps(data_json)
@ -1239,7 +1241,6 @@ class TestPetstore(object):
assert response_result.data.rootCause == rootCause
assert response_result.data.additionalinfo == additionalinfo
@pytest.mark.xfail(reason='OneOf for string not supported atm')
def test_post_tags_created_invalid_type(
self, spec, response_validator):
host_url = 'http://petstore.swagger.io/v1'

View file

@ -421,6 +421,17 @@ class TestResponseValidator(object):
assert result.data is None
assert result.headers == {}
def test_invalid_media_type(self, validator):
request = MockRequest(self.host_url, 'get', '/v1/pets')
response = MockResponse("abcde")
result = validator.validate(request, response)
assert len(result.errors) == 1
assert type(result.errors[0]) == InvalidMediaTypeValue
assert result.data is None
assert result.headers == {}
def test_invalid_media_type_value(self, validator):
request = MockRequest(self.host_url, 'get', '/v1/pets')
response = MockResponse("{}")
@ -458,7 +469,10 @@ class TestResponseValidator(object):
'data': [
{
'id': 1,
'name': 'Sparky'
'name': 'Sparky',
'ears': {
'healthy': True,
},
},
],
}

View file

@ -0,0 +1,53 @@
import pytest
from openapi_core.schema.media_types.exceptions import InvalidMediaTypeValue
from openapi_core.schema.media_types.models import MediaType
from openapi_core.schema.schemas.models import Schema
class TestMediaTypeCast(object):
def test_empty(self):
media_type = MediaType('application/json')
value = ''
result = media_type.cast(value)
assert result == value
class TestParameterUnmarshal(object):
def test_empty(self):
media_type = MediaType('application/json')
value = ''
result = media_type.unmarshal(value)
assert result == value
def test_schema_type_invalid(self):
schema = Schema('integer', _source={'type': 'integer'})
media_type = MediaType('application/json', schema=schema)
value = 'test'
with pytest.raises(InvalidMediaTypeValue):
media_type.unmarshal(value)
def test_schema_custom_format_invalid(self):
def custom_formatter(value):
raise ValueError
schema = Schema(
'string',
schema_format='custom',
_source={'type': 'string', 'format': 'custom'},
)
custom_formatters = {
'custom': custom_formatter,
}
media_type = MediaType('application/json', schema=schema)
value = 'test'
with pytest.raises(InvalidMediaTypeValue):
media_type.unmarshal(
value, custom_formatters=custom_formatters)

View file

@ -1,8 +1,11 @@
import pytest
from openapi_core.schema.parameters.exceptions import EmptyParameterValue
from openapi_core.schema.parameters.exceptions import (
EmptyParameterValue, InvalidParameterValue,
)
from openapi_core.schema.parameters.enums import ParameterStyle
from openapi_core.schema.parameters.models import Parameter
from openapi_core.schema.schemas.models import Schema
class TestParameterInit(object):
@ -36,17 +39,35 @@ class TestParameterInit(object):
assert param.explode is True
class TestParameterUnmarshal(object):
class TestParameterCast(object):
def test_deprecated(self):
param = Parameter('param', 'query', deprecated=True)
value = 'test'
with pytest.warns(DeprecationWarning):
result = param.unmarshal(value)
result = param.cast(value)
assert result == value
def test_query_empty(self):
param = Parameter('param', 'query')
value = ''
with pytest.raises(EmptyParameterValue):
param.cast(value)
def test_query_valid(self):
param = Parameter('param', 'query')
value = 'test'
result = param.cast(value)
assert result == value
class TestParameterUnmarshal(object):
def test_query_valid(self):
param = Parameter('param', 'query')
value = 'test'
@ -55,13 +76,6 @@ class TestParameterUnmarshal(object):
assert result == value
def test_query_empty(self):
param = Parameter('param', 'query')
value = ''
with pytest.raises(EmptyParameterValue):
param.unmarshal(value)
def test_query_allow_empty_value(self):
param = Parameter('param', 'query', allow_empty_value=True)
value = ''
@ -69,3 +83,28 @@ class TestParameterUnmarshal(object):
result = param.unmarshal(value)
assert result == value
def test_query_schema_type_invalid(self):
schema = Schema('integer', _source={'type': 'integer'})
param = Parameter('param', 'query', schema=schema)
value = 'test'
with pytest.raises(InvalidParameterValue):
param.unmarshal(value)
def test_query_schema_custom_format_invalid(self):
def custom_formatter(value):
raise ValueError
schema = Schema(
'string',
schema_format='custom',
_source={'type': 'string', 'format': 'custom'},
)
custom_formatters = {
'custom': custom_formatter,
}
param = Parameter('param', 'query', schema=schema)
value = 'test'
with pytest.raises(InvalidParameterValue):
param.unmarshal(value, custom_formatters=custom_formatters)

View file

@ -7,8 +7,7 @@ import pytest
from openapi_core.extensions.models.models import Model
from openapi_core.schema.schemas.enums import SchemaFormat, SchemaType
from openapi_core.schema.schemas.exceptions import (
InvalidSchemaValue, MultipleOneOfSchema, NoOneOfSchema, OpenAPISchemaError,
UndefinedSchemaProperty
InvalidSchemaValue, OpenAPISchemaError,
)
from openapi_core.schema.schemas.models import Schema
@ -42,6 +41,17 @@ class TestSchemaUnmarshal(object):
assert result == value
@pytest.mark.parametrize('schema_type', [
'boolean', 'array', 'integer', 'number',
])
def test_non_string_empty_value(self, schema_type):
schema = Schema(schema_type)
value = ''
result = schema.unmarshal(value)
assert result is None
def test_string_valid(self):
schema = Schema('string')
value = 'test'
@ -121,19 +131,28 @@ class TestSchemaUnmarshal(object):
assert result == datetime.datetime(2018, 1, 2, 0, 0)
@pytest.mark.xfail(reason="No custom formats support atm")
def test_string_format_custom(self):
def custom_formatter(value):
return 'x-custom'
custom_format = 'custom'
schema = Schema('string', schema_format=custom_format)
value = 'x'
with mock.patch.dict(
Schema.STRING_FORMAT_CAST_CALLABLE_GETTER,
{custom_format: lambda x: x + '-custom'},
):
result = schema.unmarshal(value)
result = schema.unmarshal(
value, custom_formatters={custom_format: custom_formatter})
assert result == 'x-custom'
assert result == custom_formatter(value)
def test_string_format_custom_value_error(self):
def custom_formatter(value):
raise ValueError
custom_format = 'custom'
schema = Schema('string', schema_format=custom_format)
value = 'x'
with pytest.raises(InvalidSchemaValue):
schema.unmarshal(
value, custom_formatters={custom_format: custom_formatter})
def test_string_format_unknown(self):
unknown_format = 'unknown'
@ -143,16 +162,12 @@ class TestSchemaUnmarshal(object):
with pytest.raises(OpenAPISchemaError):
schema.unmarshal(value)
@pytest.mark.xfail(reason="No custom formats support atm")
def test_string_format_invalid_value(self):
custom_format = 'custom'
schema = Schema('string', schema_format=custom_format)
value = 'x'
with mock.patch.dict(
Schema.STRING_FORMAT_CALLABLE_GETTER,
{custom_format: mock.Mock(side_effect=ValueError())},
), pytest.raises(
with pytest.raises(
InvalidSchemaValue, message='Failed to format value'
):
schema.unmarshal(value)
@ -307,22 +322,6 @@ class TestSchemaUnmarshal(object):
])
assert schema.unmarshal(['hello']) == ['hello']
def test_schema_any_one_of_mutiple(self):
schema = Schema(one_of=[
Schema('array', items=Schema('string')),
Schema('array', items=Schema('number')),
])
with pytest.raises(MultipleOneOfSchema):
schema.unmarshal([])
def test_schema_any_one_of_no_valid(self):
schema = Schema(one_of=[
Schema('array', items=Schema('string')),
Schema('array', items=Schema('number')),
])
with pytest.raises(NoOneOfSchema):
schema.unmarshal({})
def test_schema_any(self):
schema = Schema()
assert schema.unmarshal('string') == 'string'
@ -351,13 +350,23 @@ class TestSchemaValidate(object):
assert result is None
@pytest.mark.xfail(
reason="validation does not care about custom formats atm")
def test_string_format_custom_missing(self):
custom_format = 'custom'
schema = Schema('string', schema_format=custom_format)
value = 'x'
with pytest.raises(OpenAPISchemaError):
schema.validate(value)
@pytest.mark.parametrize('value', [False, True])
def test_boolean(self, value):
schema = Schema('boolean')
result = schema.validate(value)
assert result == value
assert result is None
@pytest.mark.parametrize('value', [1, 3.14, u('true'), [True, False]])
def test_boolean_invalid(self, value):
@ -366,22 +375,22 @@ class TestSchemaValidate(object):
with pytest.raises(InvalidSchemaValue):
schema.validate(value)
@pytest.mark.parametrize('value', [[1, 2], (3, 4)])
@pytest.mark.parametrize('value', [(1, 2)])
def test_array_no_schema(self, value):
schema = Schema('array')
with pytest.raises(OpenAPISchemaError):
schema.validate(value)
@pytest.mark.parametrize('value', [[1, 2], (3, 4)])
@pytest.mark.parametrize('value', [[1, 2]])
def test_array(self, value):
schema = Schema('array', items=Schema('integer'))
result = schema.validate(value)
assert result == value
assert result is None
@pytest.mark.parametrize('value', [False, 1, 3.14, u('true')])
@pytest.mark.parametrize('value', [False, 1, 3.14, u('true'), (3, 4)])
def test_array_invalid(self, value):
schema = Schema('array')
@ -394,7 +403,7 @@ class TestSchemaValidate(object):
result = schema.validate(value)
assert result == value
assert result is None
@pytest.mark.parametrize('value', [False, 3.14, u('true'), [1, 2]])
def test_integer_invalid(self, value):
@ -416,7 +425,7 @@ class TestSchemaValidate(object):
result = schema.validate(value)
assert result == value
assert result is None
@pytest.mark.parametrize('value', [4, 5, 6])
def test_integer_maximum_invalid(self, value):
@ -431,7 +440,7 @@ class TestSchemaValidate(object):
result = schema.validate(value)
assert result == value
assert result is None
@pytest.mark.parametrize('value', [1, 2, 4])
def test_integer_multiple_of_invalid(self, value):
@ -446,7 +455,7 @@ class TestSchemaValidate(object):
result = schema.validate(value)
assert result == value
assert result is None
@pytest.mark.parametrize('value', [1, 3.14])
def test_number(self, value):
@ -454,7 +463,7 @@ class TestSchemaValidate(object):
result = schema.validate(value)
assert result == value
assert result is None
@pytest.mark.parametrize('value', [False, 'true', [1, 3]])
def test_number_invalid(self, value):
@ -476,7 +485,7 @@ class TestSchemaValidate(object):
result = schema.validate(value)
assert result == value
assert result is None
@pytest.mark.parametrize('value', [1, 2, 3])
def test_number_exclusive_minimum_invalid(self, value):
@ -491,7 +500,7 @@ class TestSchemaValidate(object):
result = schema.validate(value)
assert result == value
assert result is None
@pytest.mark.parametrize('value', [4, 5, 6])
def test_number_maximum_invalid(self, value):
@ -506,7 +515,7 @@ class TestSchemaValidate(object):
result = schema.validate(value)
assert result == value
assert result is None
@pytest.mark.parametrize('value', [3, 4, 5])
def test_number_exclusive_maximum_invalid(self, value):
@ -521,7 +530,7 @@ class TestSchemaValidate(object):
result = schema.validate(value)
assert result == value
assert result is None
@pytest.mark.parametrize('value', [1, 2, 4])
def test_number_multiple_of_invalid(self, value):
@ -536,17 +545,17 @@ class TestSchemaValidate(object):
result = schema.validate(value)
assert result == value
assert result is None
@pytest.mark.parametrize('value', [u('true'), ])
@pytest.mark.parametrize('value', [u('true'), b('test')])
def test_string(self, value):
schema = Schema('string')
result = schema.validate(value)
assert result == value
assert result is None
@pytest.mark.parametrize('value', [b('test'), False, 1, 3.14, [1, 3]])
@pytest.mark.parametrize('value', [False, 1, 3.14, [1, 3]])
def test_string_invalid(self, value):
schema = Schema('string')
@ -564,24 +573,24 @@ class TestSchemaValidate(object):
schema.validate(value)
@pytest.mark.parametrize('value', [
datetime.date(1989, 1, 2), datetime.date(2018, 1, 2),
u('1989-01-02'), u('2018-01-02'),
])
def test_string_format_date(self, value):
schema = Schema('string', schema_format='date')
result = schema.validate(value)
assert result == value
assert result is None
@pytest.mark.parametrize('value', [
uuid.UUID('{12345678-1234-5678-1234-567812345678}'),
u('12345678-1234-5678-1234-567812345678'),
])
def test_string_format_uuid(self, value):
schema = Schema('string', schema_format='uuid')
result = schema.validate(value)
assert result == value
assert result is None
@pytest.mark.parametrize('value', [
b('true'), u('true'), False, 1, 3.14, [1, 3],
@ -595,7 +604,7 @@ class TestSchemaValidate(object):
@pytest.mark.parametrize('value', [
b('true'), u('true'), False, 1, 3.14, [1, 3],
datetime.date(1989, 1, 2),
u('1989-01-02'),
])
def test_string_format_datetime_invalid(self, value):
schema = Schema('string', schema_format='date-time')
@ -604,19 +613,46 @@ class TestSchemaValidate(object):
schema.validate(value)
@pytest.mark.parametrize('value', [
datetime.datetime(1989, 1, 2, 0, 0, 0),
datetime.datetime(2018, 1, 2, 23, 59, 59),
u('1989-01-02T00:00:00Z'),
u('2018-01-02T23:59:59Z'),
])
def test_string_format_datetime(self, value):
@mock.patch(
'openapi_core.schema.schemas._format.'
'DATETIME_HAS_STRICT_RFC3339', True
)
@mock.patch(
'openapi_core.schema.schemas._format.'
'DATETIME_HAS_ISODATE', False
)
def test_string_format_datetime_strict_rfc3339(self, value):
schema = Schema('string', schema_format='date-time')
result = schema.validate(value)
assert result == value
assert result is None
@pytest.mark.parametrize('value', [
u('true'), False, 1, 3.14, [1, 3], datetime.date(1989, 1, 2),
datetime.datetime(1989, 1, 2, 0, 0, 0),
u('1989-01-02T00:00:00Z'),
u('2018-01-02T23:59:59Z'),
])
@mock.patch(
'openapi_core.schema.schemas._format.'
'DATETIME_HAS_STRICT_RFC3339', False
)
@mock.patch(
'openapi_core.schema.schemas._format.'
'DATETIME_HAS_ISODATE', True
)
def test_string_format_datetime_isodate(self, value):
schema = Schema('string', schema_format='date-time')
result = schema.validate(value)
assert result is None
@pytest.mark.parametrize('value', [
u('true'), False, 1, 3.14, [1, 3], u('1989-01-02'),
u('1989-01-02T00:00:00Z'),
])
def test_string_format_binary_invalid(self, value):
schema = Schema('string', schema_format='binary')
@ -632,10 +668,20 @@ class TestSchemaValidate(object):
result = schema.validate(value)
assert result == value
assert result is None
@pytest.mark.parametrize('value', [
b('tsssst'), b('dGVzdA=='),
b('dGVzdA=='), u('dGVzdA=='),
])
def test_string_format_byte(self, value):
schema = Schema('string', schema_format='byte')
result = schema.validate(value)
assert result is None
@pytest.mark.parametrize('value', [
u('tsssst'), b('tsssst'), b('tesddddsdsdst'),
])
def test_string_format_byte_invalid(self, value):
schema = Schema('string', schema_format='byte')
@ -643,16 +689,6 @@ class TestSchemaValidate(object):
with pytest.raises(OpenAPISchemaError):
schema.validate(value)
@pytest.mark.parametrize('value', [
u('tsssst'), u('dGVzdA=='),
])
def test_string_format_byte(self, value):
schema = Schema('string', schema_format='byte')
result = schema.validate(value)
assert result == value
@pytest.mark.parametrize('value', [
u('test'), b('stream'), datetime.date(1989, 1, 2),
datetime.datetime(1989, 1, 2, 0, 0, 0),
@ -664,13 +700,6 @@ class TestSchemaValidate(object):
with pytest.raises(OpenAPISchemaError):
schema.validate(value)
@pytest.mark.parametrize('value', [u(""), ])
def test_string_min_length_invalid_schema(self, value):
schema = Schema('string', min_length=-1)
with pytest.raises(OpenAPISchemaError):
schema.validate(value)
@pytest.mark.parametrize('value', [u(""), u("a"), u("ab")])
def test_string_min_length_invalid(self, value):
schema = Schema('string', min_length=3)
@ -684,7 +713,7 @@ class TestSchemaValidate(object):
result = schema.validate(value)
assert result == value
assert result is None
@pytest.mark.parametrize('value', [u(""), ])
def test_string_max_length_invalid_schema(self, value):
@ -706,7 +735,7 @@ class TestSchemaValidate(object):
result = schema.validate(value)
assert result == value
assert result is None
@pytest.mark.parametrize('value', [u("foo"), u("bar")])
def test_string_pattern_invalid(self, value):
@ -721,7 +750,7 @@ class TestSchemaValidate(object):
result = schema.validate(value)
assert result == value
assert result is None
@pytest.mark.parametrize('value', ['true', False, 1, 3.14, [1, 3]])
def test_object_not_an_object(self, value):
@ -737,20 +766,20 @@ class TestSchemaValidate(object):
]
schema = Schema('object', one_of=one_of)
with pytest.raises(MultipleOneOfSchema):
with pytest.raises(InvalidSchemaValue):
schema.validate(value)
@pytest.mark.parametrize('value', [Model(), ])
@pytest.mark.parametrize('value', [{}, ])
def test_object_defferent_type_one_of(self, value):
one_of = [
Schema('integer'), Schema('string'),
]
schema = Schema('object', one_of=one_of)
with pytest.raises(MultipleOneOfSchema):
with pytest.raises(InvalidSchemaValue):
schema.validate(value)
@pytest.mark.parametrize('value', [Model(), ])
@pytest.mark.parametrize('value', [{}, ])
def test_object_no_one_of(self, value):
one_of = [
Schema(
@ -766,17 +795,17 @@ class TestSchemaValidate(object):
]
schema = Schema('object', one_of=one_of)
with pytest.raises(NoOneOfSchema):
with pytest.raises(InvalidSchemaValue):
schema.validate(value)
@pytest.mark.parametrize('value', [
Model({
{
'foo': u("FOO"),
}),
Model({
},
{
'foo': u("FOO"),
'bar': u("BAR"),
}),
},
])
def test_unambiguous_one_of(self, value):
one_of = [
@ -800,15 +829,17 @@ class TestSchemaValidate(object):
]
schema = Schema('object', one_of=one_of)
schema.validate(value)
result = schema.validate(value)
@pytest.mark.parametrize('value', [Model(), ])
assert result is None
@pytest.mark.parametrize('value', [{}, ])
def test_object_default_property(self, value):
schema = Schema('object', default='value1')
result = schema.validate(value)
assert result == value
assert result is None
@pytest.mark.parametrize('value', [Model(), ])
def test_object_min_properties_invalid_schema(self, value):
@ -818,9 +849,10 @@ class TestSchemaValidate(object):
schema.validate(value)
@pytest.mark.parametrize('value', [
Model({'a': 1}),
Model({'a': 1, 'b': 2}),
Model({'a': 1, 'b': 2, 'c': 3})])
{'a': 1},
{'a': 1, 'b': 2},
{'a': 1, 'b': 2, 'c': 3},
])
def test_object_min_properties_invalid(self, value):
schema = Schema(
'object',
@ -833,9 +865,10 @@ class TestSchemaValidate(object):
schema.validate(value)
@pytest.mark.parametrize('value', [
Model({'a': 1}),
Model({'a': 1, 'b': 2}),
Model({'a': 1, 'b': 2, 'c': 3})])
{'a': 1},
{'a': 1, 'b': 2},
{'a': 1, 'b': 2, 'c': 3},
])
def test_object_min_properties(self, value):
schema = Schema(
'object',
@ -846,7 +879,7 @@ class TestSchemaValidate(object):
result = schema.validate(value)
assert result == value
assert result is None
@pytest.mark.parametrize('value', [Model(), ])
def test_object_max_properties_invalid_schema(self, value):
@ -856,9 +889,10 @@ class TestSchemaValidate(object):
schema.validate(value)
@pytest.mark.parametrize('value', [
Model({'a': 1}),
Model({'a': 1, 'b': 2}),
Model({'a': 1, 'b': 2, 'c': 3})])
{'a': 1},
{'a': 1, 'b': 2},
{'a': 1, 'b': 2, 'c': 3},
])
def test_object_max_properties_invalid(self, value):
schema = Schema(
'object',
@ -871,9 +905,10 @@ class TestSchemaValidate(object):
schema.validate(value)
@pytest.mark.parametrize('value', [
Model({'a': 1}),
Model({'a': 1, 'b': 2}),
Model({'a': 1, 'b': 2, 'c': 3})])
{'a': 1},
{'a': 1, 'b': 2},
{'a': 1, 'b': 2, 'c': 3},
])
def test_object_max_properties(self, value):
schema = Schema(
'object',
@ -884,38 +919,31 @@ class TestSchemaValidate(object):
result = schema.validate(value)
assert result == value
assert result is None
@pytest.mark.parametrize('value', [Model({'additional': 1}), ])
@pytest.mark.parametrize('value', [{'additional': 1}, ])
def test_object_additional_propetries(self, value):
schema = Schema('object')
schema.validate(value)
result = schema.validate(value)
@pytest.mark.parametrize('value', [Model({'additional': 1}), ])
assert result is None
@pytest.mark.parametrize('value', [{'additional': 1}, ])
def test_object_additional_propetries_false(self, value):
schema = Schema('object', additional_properties=False)
with pytest.raises(UndefinedSchemaProperty):
with pytest.raises(InvalidSchemaValue):
schema.validate(value)
@pytest.mark.parametrize('value', [Model({'additional': 1}), ])
@pytest.mark.parametrize('value', [{'additional': 1}, ])
def test_object_additional_propetries_object(self, value):
additional_properties = Schema('integer')
schema = Schema('object', additional_properties=additional_properties)
schema.validate(value)
result = schema.validate(value)
@pytest.mark.parametrize('value', [[], ])
def test_list_min_items_invalid_schema(self, value):
schema = Schema(
'array',
items=Schema('number'),
min_items=-1,
)
with pytest.raises(OpenAPISchemaError):
schema.validate(value)
assert result is None
@pytest.mark.parametrize('value', [[], [1], [1, 2]])
def test_list_min_items_invalid(self, value):
@ -938,7 +966,7 @@ class TestSchemaValidate(object):
result = schema.validate(value)
assert result == value
assert result is None
@pytest.mark.parametrize('value', [[], ])
def test_list_max_items_invalid_schema(self, value):
@ -974,16 +1002,16 @@ class TestSchemaValidate(object):
schema.validate(value)
@pytest.mark.parametrize('value', [
Model({
{
'someint': 123,
}),
Model({
},
{
'somestr': u('content'),
}),
Model({
},
{
'somestr': u('content'),
'someint': 123,
}),
},
])
def test_object_with_properties(self, value):
schema = Schema(
@ -996,32 +1024,28 @@ class TestSchemaValidate(object):
result = schema.validate(value)
assert result == value
assert result is None
@pytest.mark.parametrize('value', [
Model({
'somestr': Model(),
'someint': 123,
}),
Model({
{
'somestr': {},
'someint': 123,
}),
Model({
},
{
'somestr': [
'content1', 'content2'
],
'someint': 123,
}),
Model({
},
{
'somestr': 123,
'someint': 123,
}),
Model({
},
{
'somestr': 'content',
'someint': 123,
'not_in_scheme_prop': 123,
}),
},
])
def test_object_with_invalid_properties(self, value):
schema = Schema(