openapi-core/openapi_core/validators.py

191 lines
5.9 KiB
Python
Raw Normal View History

2017-11-02 16:05:25 +00:00
"""OpenAPI core validators module"""
from six import iteritems
2017-11-14 11:28:45 +00:00
from openapi_core.enums import ParameterLocation
2017-11-02 16:05:25 +00:00
from openapi_core.exceptions import (
2017-11-06 13:32:31 +00:00
OpenAPIMappingError, MissingParameter, MissingBody, InvalidResponse,
2017-11-02 16:05:25 +00:00
)
class RequestParameters(dict):
def __getitem__(self, location):
self.validate_location(location)
return self.setdefault(location, {})
def __setitem__(self, location, value):
raise NotImplementedError
@classmethod
def validate_location(cls, location):
2017-11-14 11:28:45 +00:00
if not ParameterLocation.has_value(location):
2017-11-02 16:05:25 +00:00
raise OpenAPIMappingError(
"Unknown parameter location: {0}".format(str(location)))
class BaseValidationResult(object):
def __init__(self, errors):
self.errors = errors
2017-11-03 14:55:21 +00:00
def raise_for_errors(self):
2017-11-02 16:05:25 +00:00
for error in self.errors:
raise error
class RequestValidationResult(BaseValidationResult):
def __init__(self, errors, body=None, parameters=None):
super(RequestValidationResult, self).__init__(errors)
self.body = body
self.parameters = parameters or RequestParameters()
2017-11-06 13:32:31 +00:00
class ResponseValidationResult(BaseValidationResult):
2017-11-06 14:05:06 +00:00
def __init__(self, errors, data=None, headers=None):
2017-11-06 13:32:31 +00:00
super(ResponseValidationResult, self).__init__(errors)
2017-11-06 14:05:06 +00:00
self.data = data
2017-11-06 13:32:31 +00:00
self.headers = headers
2017-11-02 16:05:25 +00:00
class RequestValidator(object):
def __init__(self, spec):
self.spec = spec
def validate(self, request):
errors = []
body = None
parameters = RequestParameters()
try:
server = self.spec.get_server(request.full_url_pattern)
# don't process if server errors
except OpenAPIMappingError as exc:
errors.append(exc)
return RequestValidationResult(errors, body, parameters)
operation_pattern = request.full_url_pattern.replace(
server.default_url, '')
try:
2017-11-03 11:18:48 +00:00
operation = self.spec.get_operation(
operation_pattern, request.method)
2017-11-02 16:05:25 +00:00
# don't process if operation errors
except OpenAPIMappingError as exc:
errors.append(exc)
return RequestValidationResult(errors, body, parameters)
for param_name, param in iteritems(operation.parameters):
try:
raw_value = self._get_raw_value(request, param)
2017-11-03 11:18:48 +00:00
except MissingParameter as exc:
2017-11-02 16:05:25 +00:00
if param.required:
errors.append(exc)
if not param.schema or param.schema.default is None:
continue
raw_value = param.schema.default
2017-11-03 11:18:48 +00:00
try:
value = param.unmarshal(raw_value)
except OpenAPIMappingError as exc:
errors.append(exc)
else:
2017-11-14 11:28:45 +00:00
parameters[param.location.value][param_name] = value
2017-11-02 16:05:25 +00:00
if operation.request_body is not None:
try:
media_type = operation.request_body[request.mimetype]
except OpenAPIMappingError as exc:
errors.append(exc)
else:
try:
raw_body = self._get_raw_body(request)
2017-11-03 11:18:48 +00:00
except MissingBody as exc:
2017-11-02 16:05:25 +00:00
if operation.request_body.required:
errors.append(exc)
else:
2017-11-03 11:18:48 +00:00
try:
body = media_type.unmarshal(raw_body)
except OpenAPIMappingError as exc:
errors.append(exc)
2017-11-02 16:05:25 +00:00
return RequestValidationResult(errors, body, parameters)
def _get_raw_value(self, request, param):
try:
2017-11-14 11:28:45 +00:00
return request.parameters[param.location.value][param.name]
2017-11-02 16:05:25 +00:00
except KeyError:
2017-11-03 11:18:48 +00:00
raise MissingParameter(
2017-11-02 16:05:25 +00:00
"Missing required `{0}` parameter".format(param.name))
def _get_raw_body(self, request):
2017-11-03 11:18:48 +00:00
if not request.body:
raise MissingBody("Missing required request body")
2017-11-02 16:05:25 +00:00
2017-11-03 11:18:48 +00:00
return request.body
2017-11-06 13:32:31 +00:00
class ResponseValidator(object):
def __init__(self, spec):
self.spec = spec
def validate(self, request, response):
errors = []
2017-11-06 14:05:06 +00:00
data = None
2017-11-06 13:32:31 +00:00
headers = {}
try:
server = self.spec.get_server(request.full_url_pattern)
# don't process if server errors
except OpenAPIMappingError as exc:
errors.append(exc)
2017-11-06 14:05:06 +00:00
return ResponseValidationResult(errors, data, headers)
2017-11-06 13:32:31 +00:00
operation_pattern = request.full_url_pattern.replace(
server.default_url, '')
try:
operation = self.spec.get_operation(
operation_pattern, request.method)
# don't process if operation errors
except OpenAPIMappingError as exc:
errors.append(exc)
2017-11-06 14:05:06 +00:00
return ResponseValidationResult(errors, data, headers)
2017-11-06 13:32:31 +00:00
try:
2017-11-06 15:08:21 +00:00
operation_response = operation.get_response(
str(response.status_code))
2017-11-06 13:32:31 +00:00
# don't process if invalid response status code
except InvalidResponse as exc:
errors.append(exc)
2017-11-06 14:05:06 +00:00
return ResponseValidationResult(errors, data, headers)
2017-11-06 13:32:31 +00:00
if operation_response.content:
try:
2017-11-06 13:53:49 +00:00
media_type = operation_response[response.mimetype]
2017-11-06 13:32:31 +00:00
except OpenAPIMappingError as exc:
errors.append(exc)
else:
try:
2017-11-06 14:05:06 +00:00
raw_data = self._get_raw_data(response)
2017-11-06 13:32:31 +00:00
except MissingBody as exc:
errors.append(exc)
else:
try:
2017-11-06 14:05:06 +00:00
data = media_type.unmarshal(raw_data)
2017-11-06 13:32:31 +00:00
except OpenAPIMappingError as exc:
errors.append(exc)
2017-11-06 14:05:06 +00:00
return ResponseValidationResult(errors, data, headers)
2017-11-06 13:32:31 +00:00
2017-11-06 14:05:06 +00:00
def _get_raw_data(self, response):
if not response.data:
raise MissingBody("Missing required response data")
2017-11-06 13:32:31 +00:00
2017-11-06 14:05:06 +00:00
return response.data