openapi-core/openapi_core/schemas.py

229 lines
7.1 KiB
Python
Raw Normal View History

2017-09-21 11:51:37 +00:00
"""OpenAPI core schemas module"""
import logging
from collections import defaultdict
2017-10-17 13:23:26 +00:00
import warnings
2017-09-22 08:54:37 +00:00
from distutils.util import strtobool
from functools import lru_cache
2017-09-21 11:51:37 +00:00
from json import loads
from six import iteritems
2017-11-14 13:36:05 +00:00
from openapi_core.enums import SchemaType, SchemaFormat
2017-09-25 14:15:00 +00:00
from openapi_core.exceptions import (
2017-11-03 11:18:48 +00:00
InvalidValueType, UndefinedSchemaProperty, MissingProperty, InvalidValue,
2017-09-25 14:15:00 +00:00
)
from openapi_core.models import ModelFactory
2017-09-21 11:51:37 +00:00
log = logging.getLogger(__name__)


def _forcebool(value):
    """Cast ``value`` to ``bool``, accepting real booleans as well as strings.

    ``strtobool`` is only defined for string input.  Values that are
    already ``bool`` (for example ones coming out of a decoded JSON
    document) are returned unchanged instead of being handed to it.

    Raises ``ValueError`` (from ``strtobool``) for an unrecognised string.
    """
    if isinstance(value, bool):
        return value
    return bool(strtobool(value))


# Maps a primitive schema type to the callable used to cast a raw value.
DEFAULT_CAST_CALLABLE_GETTER = {
    SchemaType.INTEGER: int,
    SchemaType.NUMBER: float,
    SchemaType.BOOLEAN: _forcebool,
}
class Schema(object):
    """Represents an OpenAPI Schema.

    Holds the schema attributes passed to the constructor and knows how to
    cast and unmarshal raw values according to them.
    """

    def __init__(
            self, schema_type, model=None, properties=None, items=None,
            schema_format=None, required=None, default=None, nullable=False,
            enum=None, deprecated=False, all_of=None):
        self.type = SchemaType(schema_type)
        self.model = model
        # ``properties`` may be any iterable accepted by ``dict`` (mapping
        # or (name, schema) pairs); it is copied so the caller's object is
        # never shared.
        self.properties = dict(properties) if properties else {}
        self.items = items
        self.format = SchemaFormat(schema_format)
        self.required = required or []
        self.default = default
        self.nullable = nullable
        self.enum = enum
        self.deprecated = deprecated
        # Materialised into a list so a lazy iterable can be walked
        # more than once.
        self.all_of = list(all_of) if all_of else []

    def __getitem__(self, name):
        """Return the property schema registered under ``name``."""
        return self.properties[name]

    def get_all_properties(self):
        """Return own properties merged with those of all ``all_of`` schemas.

        The result is a new dict; entries coming from ``all_of`` subschemas
        (collected recursively) override own entries with the same name.
        """
        properties = self.properties.copy()

        for subschema in self.all_of:
            properties.update(subschema.get_all_properties())

        return properties

    def get_cast_mapping(self):
        """Return the mapping of schema type to cast callable.

        Types without an entry fall back to an identity callable.
        """
        mapping = DEFAULT_CAST_CALLABLE_GETTER.copy()
        mapping.update({
            SchemaType.ARRAY: self._unmarshal_collection,
            SchemaType.OBJECT: self._unmarshal_object,
        })

        return defaultdict(lambda: lambda x: x, mapping)

    def cast(self, value):
        """Cast value to schema type

        ``None`` yields the schema default when the schema is nullable and
        raises ``InvalidValueType`` otherwise.  An empty string yields
        ``None`` for every type that has an explicit cast callable.

        Raises ``InvalidValueType`` when the cast callable rejects the
        value.
        """
        if value is None:
            if not self.nullable:
                raise InvalidValueType(
                    "Failed to cast value of {0} to {1}".format(
                        value, self.type)
                )
            return self.default

        cast_mapping = self.get_cast_mapping()

        # ``in`` only sees explicitly registered types; the defaultdict
        # fallback does not count as membership.
        if self.type in cast_mapping and value == '':
            return None

        cast_callable = cast_mapping[self.type]

        try:
            return cast_callable(value)
        # TypeError covers values of the wrong kind altogether, e.g.
        # ``int([1])`` or iterating a non-iterable for an array schema.
        except (ValueError, TypeError):
            raise InvalidValueType(
                "Failed to cast value of {0} to {1}".format(value, self.type)
            )

    def unmarshal(self, value):
        """Unmarshal parameter from the value.

        Emits a ``DeprecationWarning`` for deprecated schemas, casts the
        value and checks it against ``enum`` when one is defined.

        Raises ``InvalidValue`` when the casted value is not an enum choice.
        """
        if self.deprecated:
            warnings.warn(
                "The schema is deprecated", DeprecationWarning)

        casted = self.cast(value)

        if casted is None and not self.required:
            return None

        if self.enum and casted not in self.enum:
            raise InvalidValue(
                "Value of {0} not in enum choices: {1}".format(
                    value, self.enum)
            )

        return casted

    def _unmarshal_collection(self, value):
        """Unmarshal every element of ``value`` with the ``items`` schema."""
        return [self.items.unmarshal(item) for item in value]

    def _unmarshal_object(self, value):
        """Unmarshal a mapping (or its JSON text) into a model instance.

        Raises ``UndefinedSchemaProperty`` when ``value`` carries keys the
        schema does not define and ``MissingProperty`` when a required
        property is absent.
        """
        if isinstance(value, (str, bytes)):
            value = loads(value)

        all_properties = self.get_all_properties()
        all_properties_keys = all_properties.keys()
        value_keys = value.keys()

        extra_props = set(value_keys) - set(all_properties_keys)

        if extra_props:
            raise UndefinedSchemaProperty(
                "Undefined properties in schema: {0}".format(extra_props))

        properties = {}
        for prop_name, prop in iteritems(all_properties):
            try:
                prop_value = value[prop_name]
            except KeyError:
                if prop_name in self.required:
                    raise MissingProperty(
                        "Missing schema property {0}".format(prop_name))
                # Only an absent default (None) means there is nothing to
                # fill in; falsy defaults such as 0, False or '' are
                # legitimate values and must not be dropped.
                if not prop.nullable and prop.default is None:
                    continue
                prop_value = prop.default

            properties[prop_name] = prop.unmarshal(prop_value)

        return ModelFactory().create(properties, name=self.model)
2017-09-21 11:51:37 +00:00
class PropertiesGenerator(object):
    """Turns a mapping of property specs into (name, schema) pairs."""

    def __init__(self, dereferencer):
        self.dereferencer = dereferencer

    def generate(self, properties):
        """Lazily yield ``(property_name, schema)`` for each property spec."""
        for entry in iteritems(properties):
            name, spec = entry
            yield name, self._create_schema(spec)

    def _create_schema(self, schema_spec):
        """Build a schema for ``schema_spec`` with a fresh SchemaFactory."""
        factory = SchemaFactory(self.dereferencer)
        return factory.create(schema_spec)
class SchemaFactory(object):
    """Creates ``Schema`` objects out of (possibly referenced) schema specs."""

    def __init__(self, dereferencer):
        self.dereferencer = dereferencer

    def create(self, schema_spec):
        """Dereference ``schema_spec`` and build a ``Schema`` from it.

        Nested ``properties``, ``items`` and ``allOf`` specs are turned
        into schemas as well.  Raises ``KeyError`` when the dereferenced
        spec has no ``type`` entry.
        """
        schema_deref = self.dereferencer.dereference(schema_spec)

        schema_type = schema_deref['type']
        schema_format = schema_deref.get('format')
        model = schema_deref.get('x-model', None)
        required = schema_deref.get('required', False)
        default = schema_deref.get('default', None)
        properties_spec = schema_deref.get('properties', None)
        items_spec = schema_deref.get('items', None)
        nullable = schema_deref.get('nullable', False)
        enum = schema_deref.get('enum', None)
        deprecated = schema_deref.get('deprecated', False)
        all_of_spec = schema_deref.get('allOf', None)

        properties = None
        if properties_spec:
            properties = self.properties_generator.generate(properties_spec)

        all_of = []
        if all_of_spec:
            # Built eagerly so the result is a real list rather than a
            # one-shot iterator.
            all_of = [self.create(spec) for spec in all_of_spec]

        items = None
        if items_spec:
            items = self._create_items(items_spec)

        return Schema(
            schema_type, model=model, properties=properties, items=items,
            schema_format=schema_format, required=required, default=default,
            nullable=nullable, enum=enum, deprecated=deprecated, all_of=all_of,
        )

    @property
    def properties_generator(self):
        """Return this factory's ``PropertiesGenerator``, created on first use.

        The generator is cached on the instance itself, so it lives exactly
        as long as the factory and does not require the factory to be
        hashable.
        """
        try:
            return self._properties_generator
        except AttributeError:
            generator = PropertiesGenerator(self.dereferencer)
            self._properties_generator = generator
            return generator

    def _create_items(self, items_spec):
        """Build the schema describing the elements of an array schema."""
        return self.create(items_spec)
class SchemaRegistry(SchemaFactory):
    """Schema factory that remembers the schemas it built, by model name."""

    def __init__(self, dereferencer):
        super(SchemaRegistry, self).__init__(dereferencer)
        # Maps the ``x-model`` name of a schema spec to its created schema.
        self._schemas = {}

    def get_or_create(self, schema_spec):
        """Return ``(schema, created)`` for ``schema_spec``.

        A spec carrying an ``x-model`` name that was seen before yields the
        previously created schema with ``created`` set to ``False``.
        Otherwise a new schema is created, registered under its model name
        (when it has one) and returned with ``created`` set to ``True``.
        """
        schema_deref = self.dereferencer.dereference(schema_spec)
        model = schema_deref.get('x-model', None)

        if model and model in self._schemas:
            return self._schemas[model], False

        schema = self.create(schema_deref)
        # Without storing the result the lookup above could never succeed.
        if model:
            self._schemas[model] = schema
        return schema, True
class SchemasGenerator(object):
    """Yields named schemas for a mapping of schema specs."""

    def __init__(self, dereferencer, schemas_registry):
        self.schemas_registry = schemas_registry
        self.dereferencer = dereferencer

    def generate(self, schemas_spec):
        """Lazily yield ``(schema_name, schema)`` for each dereferenced spec.

        Schemas are obtained through the registry's ``get_or_create``; its
        "created" flag is ignored.
        """
        resolved_specs = self.dereferencer.dereference(schemas_spec)
        for name, spec in iteritems(resolved_specs):
            schema, _created = self.schemas_registry.get_or_create(spec)
            yield name, schema