Validators restructure

This commit is contained in:
Artur Maciag 2018-04-17 13:38:23 +01:00
parent 734a4673f8
commit 7111a91cef
14 changed files with 236 additions and 215 deletions

View file

@ -5,7 +5,8 @@ from openapi_spec_validator import default_handlers
from openapi_core.exceptions import OpenAPIParameterError, OpenAPIBodyError
from openapi_core.schema.specs.factories import SpecFactory
from openapi_core.validators import RequestValidator, ResponseValidator
from openapi_core.validation.request.validators import RequestValidator
from openapi_core.validation.response.validators import ResponseValidator
def create_spec(spec_dict, spec_url=''):

View file

View file

@ -0,0 +1,11 @@
"""OpenAPI core validation models module"""
class BaseValidationResult(object):
def __init__(self, errors):
self.errors = errors
def raise_for_errors(self):
for error in self.errors:
raise error

View file

@ -0,0 +1,31 @@
"""OpenAPI core validation request models module"""
from openapi_core.exceptions import OpenAPIMappingError
from openapi_core.validation.models import BaseValidationResult
class RequestParameters(dict):
valid_locations = ['path', 'query', 'headers', 'cookies']
def __getitem__(self, location):
self.validate_location(location)
return self.setdefault(location, {})
def __setitem__(self, location, value):
raise NotImplementedError
@classmethod
def validate_location(cls, location):
if location not in cls.valid_locations:
raise OpenAPIMappingError(
"Unknown parameter location: {0}".format(str(location)))
class RequestValidationResult(BaseValidationResult):
def __init__(self, errors, body=None, parameters=None):
super(RequestValidationResult, self).__init__(errors)
self.body = body
self.parameters = parameters or RequestParameters()

View file

@ -0,0 +1,97 @@
"""OpenAPI core validation request validators module"""
from six import iteritems
from openapi_core.exceptions import (
OpenAPIMappingError, MissingParameter, MissingBody,
)
from openapi_core.validation.request.models import (
RequestParameters, RequestValidationResult,
)
from openapi_core.validation.util import get_operation_pattern
class RequestValidator(object):
def __init__(self, spec):
self.spec = spec
def validate(self, request):
errors = []
body = None
parameters = RequestParameters()
try:
server = self.spec.get_server(request.full_url_pattern)
# don't process if server errors
except OpenAPIMappingError as exc:
errors.append(exc)
return RequestValidationResult(errors, body, parameters)
operation_pattern = get_operation_pattern(
server.default_url, request.full_url_pattern
)
try:
operation = self.spec.get_operation(
operation_pattern, request.method)
# don't process if operation errors
except OpenAPIMappingError as exc:
errors.append(exc)
return RequestValidationResult(errors, body, parameters)
for param_name, param in iteritems(operation.parameters):
try:
raw_value = self._get_raw_value(request, param)
except MissingParameter as exc:
if param.required:
errors.append(exc)
if not param.schema or param.schema.default is None:
continue
raw_value = param.schema.default
try:
value = param.unmarshal(raw_value)
except OpenAPIMappingError as exc:
errors.append(exc)
else:
parameters[param.location.value][param_name] = value
if operation.request_body is not None:
try:
media_type = operation.request_body[request.mimetype]
except OpenAPIMappingError as exc:
errors.append(exc)
else:
try:
raw_body = self._get_raw_body(request)
except MissingBody as exc:
if operation.request_body.required:
errors.append(exc)
else:
try:
body = media_type.unmarshal(raw_body)
except OpenAPIMappingError as exc:
errors.append(exc)
return RequestValidationResult(errors, body, parameters)
def _get_raw_value(self, request, param):
location = request.parameters[param.location.value]
try:
raw = location[param.name]
except KeyError:
raise MissingParameter(
"Missing required `{0}` parameter".format(param.name))
if param.aslist and param.explode:
return location.getlist(param.name)
return raw
def _get_raw_body(self, request):
if not request.body:
raise MissingBody("Missing required request body")
return request.body

View file

@ -0,0 +1,10 @@
"""OpenAPI core validation response models module"""
from openapi_core.validation.models import BaseValidationResult
class ResponseValidationResult(BaseValidationResult):
def __init__(self, errors, data=None, headers=None):
super(ResponseValidationResult, self).__init__(errors)
self.data = data
self.headers = headers

View file

@ -0,0 +1,68 @@
"""OpenAPI core validation response validators module"""
from openapi_core.exceptions import (
OpenAPIMappingError, MissingBody, InvalidResponse,
)
from openapi_core.validation.response.models import ResponseValidationResult
from openapi_core.validation.util import get_operation_pattern
class ResponseValidator(object):
def __init__(self, spec):
self.spec = spec
def validate(self, request, response):
errors = []
data = None
headers = {}
try:
server = self.spec.get_server(request.full_url_pattern)
# don't process if server errors
except OpenAPIMappingError as exc:
errors.append(exc)
return ResponseValidationResult(errors, data, headers)
operation_pattern = get_operation_pattern(
server.default_url, request.full_url_pattern
)
try:
operation = self.spec.get_operation(
operation_pattern, request.method)
# don't process if operation errors
except OpenAPIMappingError as exc:
errors.append(exc)
return ResponseValidationResult(errors, data, headers)
try:
operation_response = operation.get_response(
str(response.status_code))
# don't process if invalid response status code
except InvalidResponse as exc:
errors.append(exc)
return ResponseValidationResult(errors, data, headers)
if operation_response.content:
try:
media_type = operation_response[response.mimetype]
except OpenAPIMappingError as exc:
errors.append(exc)
else:
try:
raw_data = self._get_raw_data(response)
except MissingBody as exc:
errors.append(exc)
else:
try:
data = media_type.unmarshal(raw_data)
except OpenAPIMappingError as exc:
errors.append(exc)
return ResponseValidationResult(errors, data, headers)
def _get_raw_data(self, response):
if not response.data:
raise MissingBody("Missing required response data")
return response.data

View file

@ -0,0 +1,12 @@
"""OpenAPI core validation util module"""
from yarl import URL
def get_operation_pattern(server_url, request_url_pattern):
"""Return an updated request URL pattern with the server URL removed."""
if server_url[-1] == "/":
# operations have to start with a slash, so do not remove it
server_url = server_url[:-1]
if URL(server_url).is_absolute():
return request_url_pattern.replace(server_url, "", 1)
return URL(request_url_pattern).path_qs.replace(server_url, "", 1)

View file

@ -1,211 +0,0 @@
"""OpenAPI core validators module"""
from six import iteritems
from yarl import URL
from openapi_core.exceptions import (
OpenAPIMappingError, MissingParameter, MissingBody, InvalidResponse,
)
class RequestParameters(dict):
valid_locations = ['path', 'query', 'headers', 'cookies']
def __getitem__(self, location):
self.validate_location(location)
return self.setdefault(location, {})
def __setitem__(self, location, value):
raise NotImplementedError
@classmethod
def validate_location(cls, location):
if location not in cls.valid_locations:
raise OpenAPIMappingError(
"Unknown parameter location: {0}".format(str(location)))
class BaseValidationResult(object):
def __init__(self, errors):
self.errors = errors
def raise_for_errors(self):
for error in self.errors:
raise error
class RequestValidationResult(BaseValidationResult):
def __init__(self, errors, body=None, parameters=None):
super(RequestValidationResult, self).__init__(errors)
self.body = body
self.parameters = parameters or RequestParameters()
class ResponseValidationResult(BaseValidationResult):
def __init__(self, errors, data=None, headers=None):
super(ResponseValidationResult, self).__init__(errors)
self.data = data
self.headers = headers
def get_operation_pattern(server_url, request_url_pattern):
"""Return an updated request URL pattern with the server URL removed."""
if server_url[-1] == "/":
# operations have to start with a slash, so do not remove it
server_url = server_url[:-1]
if URL(server_url).is_absolute():
return request_url_pattern.replace(server_url, "", 1)
return URL(request_url_pattern).path_qs.replace(server_url, "", 1)
class RequestValidator(object):
def __init__(self, spec):
self.spec = spec
def validate(self, request):
errors = []
body = None
parameters = RequestParameters()
try:
server = self.spec.get_server(request.full_url_pattern)
# don't process if server errors
except OpenAPIMappingError as exc:
errors.append(exc)
return RequestValidationResult(errors, body, parameters)
operation_pattern = get_operation_pattern(
server.default_url, request.full_url_pattern
)
try:
operation = self.spec.get_operation(
operation_pattern, request.method)
# don't process if operation errors
except OpenAPIMappingError as exc:
errors.append(exc)
return RequestValidationResult(errors, body, parameters)
for param_name, param in iteritems(operation.parameters):
try:
raw_value = self._get_raw_value(request, param)
except MissingParameter as exc:
if param.required:
errors.append(exc)
if not param.schema or param.schema.default is None:
continue
raw_value = param.schema.default
try:
value = param.unmarshal(raw_value)
except OpenAPIMappingError as exc:
errors.append(exc)
else:
parameters[param.location.value][param_name] = value
if operation.request_body is not None:
try:
media_type = operation.request_body[request.mimetype]
except OpenAPIMappingError as exc:
errors.append(exc)
else:
try:
raw_body = self._get_raw_body(request)
except MissingBody as exc:
if operation.request_body.required:
errors.append(exc)
else:
try:
body = media_type.unmarshal(raw_body)
except OpenAPIMappingError as exc:
errors.append(exc)
return RequestValidationResult(errors, body, parameters)
def _get_raw_value(self, request, param):
location = request.parameters[param.location.value]
try:
raw = location[param.name]
except KeyError:
raise MissingParameter(
"Missing required `{0}` parameter".format(param.name))
if param.aslist and param.explode:
return location.getlist(param.name)
return raw
def _get_raw_body(self, request):
if not request.body:
raise MissingBody("Missing required request body")
return request.body
class ResponseValidator(object):
def __init__(self, spec):
self.spec = spec
def validate(self, request, response):
errors = []
data = None
headers = {}
try:
server = self.spec.get_server(request.full_url_pattern)
# don't process if server errors
except OpenAPIMappingError as exc:
errors.append(exc)
return ResponseValidationResult(errors, data, headers)
operation_pattern = get_operation_pattern(
server.default_url, request.full_url_pattern
)
try:
operation = self.spec.get_operation(
operation_pattern, request.method)
# don't process if operation errors
except OpenAPIMappingError as exc:
errors.append(exc)
return ResponseValidationResult(errors, data, headers)
try:
operation_response = operation.get_response(
str(response.status_code))
# don't process if invalid response status code
except InvalidResponse as exc:
errors.append(exc)
return ResponseValidationResult(errors, data, headers)
if operation_response.content:
try:
media_type = operation_response[response.mimetype]
except OpenAPIMappingError as exc:
errors.append(exc)
else:
try:
raw_data = self._get_raw_data(response)
except MissingBody as exc:
errors.append(exc)
else:
try:
data = media_type.unmarshal(raw_data)
except OpenAPIMappingError as exc:
errors.append(exc)
return ResponseValidationResult(errors, data, headers)
def _get_raw_data(self, response):
if not response.data:
raise MissingBody("Missing required response data")
return response.data

View file

@ -2,7 +2,7 @@ import pytest
from openapi_core.exceptions import InvalidOperation
from openapi_core.shortcuts import create_spec
from openapi_core.validators import RequestValidator
from openapi_core.validation.request.validators import RequestValidator
from openapi_core.wrappers import MockRequest

View file

@ -16,7 +16,8 @@ from openapi_core.schema.responses.models import Response
from openapi_core.schema.schemas.models import Schema
from openapi_core.schema.servers.models import Server, ServerVariable
from openapi_core.shortcuts import create_spec
from openapi_core.validators import RequestValidator, ResponseValidator
from openapi_core.validation.request.validators import RequestValidator
from openapi_core.validation.response.validators import ResponseValidator
from openapi_core.wrappers import MockRequest, MockResponse

View file

@ -7,7 +7,8 @@ from openapi_core.exceptions import (
InvalidValue,
)
from openapi_core.shortcuts import create_spec
from openapi_core.validators import RequestValidator, ResponseValidator
from openapi_core.validation.request.validators import RequestValidator
from openapi_core.validation.response.validators import ResponseValidator
from openapi_core.wrappers import MockRequest, MockResponse