Security validation with Api Key support

This commit is contained in:
p1c2u 2020-02-04 01:21:46 +00:00 committed by Artur Maciag
parent 75643da1af
commit d915f23414
18 changed files with 219 additions and 37 deletions

View file

@ -1,6 +1,9 @@
from openapi_core.compat import lru_cache
from openapi_core.schema.components.models import Components
from openapi_core.schema.schemas.generators import SchemasGenerator
from openapi_core.schema.security_schemes.generators import (
SecuritySchemesGenerator,
)
class ComponentsFactory(object):
@ -15,15 +18,18 @@ class ComponentsFactory(object):
schemas_spec = components_deref.get('schemas', {})
responses_spec = components_deref.get('responses', {})
parameters_spec = components_deref.get('parameters', {})
request_bodies_spec = components_deref.get('request_bodies', {})
request_bodies_spec = components_deref.get('requestBodies', {})
security_schemes_spec = components_deref.get('securitySchemes', {})
schemas = self.schemas_generator.generate(schemas_spec)
responses = self._generate_response(responses_spec)
parameters = self._generate_parameters(parameters_spec)
request_bodies = self._generate_request_bodies(request_bodies_spec)
security_schemes = self._generate_security_schemes(
security_schemes_spec)
return Components(
schemas=list(schemas), responses=responses, parameters=parameters,
request_bodies=request_bodies,
request_bodies=request_bodies, security_schemes=security_schemes,
)
@property
@ -39,3 +45,7 @@ class ComponentsFactory(object):
def _generate_request_bodies(self, request_bodies_spec):
return request_bodies_spec
def _generate_security_schemes(self, security_schemes_spec):
return SecuritySchemesGenerator(self.dereferencer).generate(
security_schemes_spec)

View file

@ -3,8 +3,11 @@ class Components(object):
def __init__(
self, schemas=None, responses=None, parameters=None,
request_bodies=None):
request_bodies=None, security_schemes=None):
self.schemas = schemas and dict(schemas) or {}
self.responses = responses and dict(responses) or {}
self.parameters = parameters and dict(parameters) or {}
self.request_bodies = request_bodies and dict(request_bodies) or {}
self.security_schemes = (
security_schemes and dict(security_schemes) or {}
)

View file

@ -11,7 +11,9 @@ from openapi_core.schema.operations.models import Operation
from openapi_core.schema.parameters.generators import ParametersGenerator
from openapi_core.schema.request_bodies.factories import RequestBodyFactory
from openapi_core.schema.responses.generators import ResponsesGenerator
from openapi_core.schema.security.factories import SecurityRequirementFactory
from openapi_core.schema.security_requirements.generators import (
SecurityRequirementsGenerator,
)
from openapi_core.schema.servers.generators import ServersGenerator
@ -39,16 +41,12 @@ class OperationsGenerator(object):
tags_list = operation_deref.get('tags', [])
summary = operation_deref.get('summary')
description = operation_deref.get('description')
security_requirements_list = operation_deref.get('security', [])
security_spec = operation_deref.get('security', [])
servers_spec = operation_deref.get('servers', [])
servers = self.servers_generator.generate(servers_spec)
security = None
if security_requirements_list:
security = list(map(
self.security_requirement_factory.create,
security_requirements_list))
security = self.security_requirements_generator.generate(
security_spec)
external_docs = None
if 'externalDocs' in operation_deref:
@ -67,10 +65,10 @@ class OperationsGenerator(object):
Operation(
http_method, path_name, responses, list(parameters),
summary=summary, description=description,
external_docs=external_docs, security=security,
external_docs=external_docs, security=list(security),
request_body=request_body, deprecated=deprecated,
operation_id=operation_id, tags=list(tags_list),
servers=servers,
servers=list(servers),
),
)
@ -96,8 +94,8 @@ class OperationsGenerator(object):
@property
@lru_cache()
def security_requirement_factory(self):
return SecurityRequirementFactory(self.dereferencer)
def security_requirements_generator(self):
return SecurityRequirementsGenerator(self.dereferencer)
@property
@lru_cache()

View file

@ -1,14 +0,0 @@
"""OpenAPI core security factories module"""
from openapi_core.schema.security.models import SecurityRequirement
class SecurityRequirementFactory(object):
def __init__(self, dereferencer):
self.dereferencer = dereferencer
def create(self, security_requirement_spec):
name = next(iter(security_requirement_spec))
scope_names = security_requirement_spec[name]
return SecurityRequirement(name, scope_names=scope_names)

View file

@ -0,0 +1,18 @@
"""OpenAPI core security requirements generators module"""
from openapi_core.schema.security_requirements.models import (
SecurityRequirement,
)
class SecurityRequirementsGenerator(object):
def __init__(self, dereferencer):
self.dereferencer = dereferencer
def generate(self, security_spec):
security_deref = self.dereferencer.dereference(security_spec)
for security_requirement_spec in security_deref:
name = next(iter(security_requirement_spec))
scope_names = security_requirement_spec[name]
yield SecurityRequirement(name, scope_names=scope_names)

View file

@ -1,4 +1,4 @@
"""OpenAPI core security models module"""
"""OpenAPI core security requirements models module"""
class SecurityRequirement(object):

View file

@ -0,0 +1,21 @@
"""OpenAPI core security schemes enums module"""
from enum import Enum
class SecuritySchemeType(Enum):
API_KEY = 'apiKey'
HTTP = 'http'
OAUTH2 = 'oauth2'
OPEN_ID_CONNECT = 'openIdConnect'
class ApiKeyLocation(Enum):
QUERY = 'query'
HEADER = 'header'
COOKIE = 'cookie'
@classmethod
def has_value(cls, value):
return (any(value == item.value for item in cls))

View file

@ -0,0 +1,37 @@
"""OpenAPI core security schemes generators module"""
import logging
from six import iteritems
from openapi_core.schema.security_schemes.models import SecurityScheme
log = logging.getLogger(__name__)
class SecuritySchemesGenerator(object):
def __init__(self, dereferencer):
self.dereferencer = dereferencer
def generate(self, security_schemes_spec):
security_schemes_deref = self.dereferencer.dereference(
security_schemes_spec)
for scheme_name, scheme_spec in iteritems(security_schemes_deref):
scheme_deref = self.dereferencer.dereference(scheme_spec)
scheme_type = scheme_deref['type']
description = scheme_deref.get('description')
name = scheme_deref.get('name')
apikey_in = scheme_deref.get('in')
scheme = scheme_deref.get('scheme')
bearer_format = scheme_deref.get('bearerFormat')
flows = scheme_deref.get('flows')
open_id_connect_url = scheme_deref.get('openIdConnectUrl')
scheme = SecurityScheme(
scheme_type, description=description, name=name,
apikey_in=apikey_in, scheme=scheme,
bearer_format=bearer_format, flows=flows,
open_id_connect_url=open_id_connect_url,
)
yield scheme_name, scheme

View file

@ -0,0 +1,22 @@
"""OpenAPI core security schemes models module"""
from openapi_core.schema.security_schemes.enums import (
SecuritySchemeType, ApiKeyLocation,
)
class SecurityScheme(object):
"""Represents an OpenAPI Security Scheme."""
def __init__(
self, scheme_type, description=None, name=None, apikey_in=None,
scheme=None, bearer_format=None, flows=None,
open_id_connect_url=None,
):
self.type = SecuritySchemeType(scheme_type)
self.description = description
self.name = name
self.apikey_in = apikey_in and ApiKeyLocation(apikey_in)
self.scheme = scheme
self.bearer_format = bearer_format
self.flows = flows
self.open_id_connect_url = open_id_connect_url

View file

@ -9,6 +9,9 @@ from openapi_core.schema.components.factories import ComponentsFactory
from openapi_core.schema.infos.factories import InfoFactory
from openapi_core.schema.paths.generators import PathsGenerator
from openapi_core.schema.schemas.registries import SchemaRegistry
from openapi_core.schema.security_requirements.generators import (
SecurityRequirementsGenerator,
)
from openapi_core.schema.servers.generators import ServersGenerator
from openapi_core.schema.specs.models import Spec
@ -29,6 +32,7 @@ class SpecFactory(object):
servers_spec = spec_dict_deref.get('servers', [])
paths = spec_dict_deref.get('paths', {})
components_spec = spec_dict_deref.get('components', {})
security_spec = spec_dict_deref.get('security', [])
if not servers_spec:
servers_spec = [
@ -39,8 +43,13 @@ class SpecFactory(object):
servers = self.servers_generator.generate(servers_spec)
paths = self.paths_generator.generate(paths)
components = self.components_factory.create(components_spec)
security = self.security_requirements_generator.generate(
security_spec)
spec = Spec(
info, list(paths), servers=list(servers), components=components,
security=list(security),
_resolver=self.spec_resolver,
)
return spec
@ -74,3 +83,8 @@ class SpecFactory(object):
@lru_cache()
def components_factory(self):
return ComponentsFactory(self.dereferencer, self.schemas_registry)
@property
@lru_cache()
def security_requirements_generator(self):
return SecurityRequirementsGenerator(self.dereferencer)

View file

@ -15,11 +15,13 @@ class Spec(object):
"""Represents an OpenAPI Specification for a service."""
def __init__(
self, info, paths, servers=None, components=None, _resolver=None):
self, info, paths, servers=None, components=None,
security=None, _resolver=None):
self.info = info
self.paths = paths and dict(paths)
self.servers = servers or []
self.components = components
self.security = security
self._resolver = _resolver

View file

@ -71,3 +71,4 @@ class OpenAPIRequest(object):
class RequestValidationResult(BaseValidationResult):
body = attr.ib(default=None)
parameters = attr.ib(factory=RequestParameters)
security = attr.ib(default=None)

View file

@ -1,6 +1,7 @@
"""OpenAPI core validation request validators module"""
from itertools import chain
from six import iteritems
import warnings
from openapi_core.casting.schemas.exceptions import CastError
from openapi_core.deserializing.exceptions import DeserializeError
@ -11,6 +12,7 @@ from openapi_core.schema.parameters.exceptions import (
)
from openapi_core.schema.paths.exceptions import InvalidPath
from openapi_core.schema.request_bodies.exceptions import MissingRequestBody
from openapi_core.schema.security_schemes.enums import SecuritySchemeType
from openapi_core.schema.servers.exceptions import InvalidServer
from openapi_core.unmarshalling.schemas.exceptions import (
UnmarshalError, ValidateError,
@ -37,7 +39,13 @@ class RequestValidator(object):
operation = self._get_operation(request)
# don't process if operation errors
except (InvalidServer, InvalidPath, InvalidOperation) as exc:
return RequestValidationResult([exc, ], None, None)
return RequestValidationResult([exc, ], None, None, None)
try:
security = self._get_security(request, operation)
# TODO narrow exceptions
except Exception as exc:
return RequestValidationResult([exc, ], None, None, None)
params, params_errors = self._get_parameters(
request, chain(
@ -49,7 +57,7 @@ class RequestValidator(object):
body, body_errors = self._get_body(request, operation)
errors = params_errors + body_errors
return RequestValidationResult(errors, body, params)
return RequestValidationResult(errors, body, params, security)
def _validate_parameters(self, request):
try:
@ -64,7 +72,7 @@ class RequestValidator(object):
iteritems(path.parameters)
)
)
return RequestValidationResult(params_errors, None, params)
return RequestValidationResult(params_errors, None, params, None)
def _validate_body(self, request):
try:
@ -73,7 +81,7 @@ class RequestValidator(object):
return RequestValidationResult([exc, ], None, None)
body, body_errors = self._get_body(request, operation)
return RequestValidationResult(body_errors, body, None)
return RequestValidationResult(body_errors, body, None, None)
def _get_operation_pattern(self, request):
server = self.spec.get_server(request.full_url_pattern)
@ -92,6 +100,19 @@ class RequestValidator(object):
return self.spec.get_operation(operation_pattern, request.method)
def _get_security(self, request, operation):
security = operation.security or self.spec.security
if not security:
return
for security_requirement in security:
data = {
security_requirement.name: self._get_security_value(
security_requirement.name, request)
}
if all(value for value in data.values()):
return data
def _get_parameters(self, request, params):
errors = []
seen = set()
@ -166,6 +187,17 @@ class RequestValidator(object):
return body, []
def _get_security_value(self, scheme_name, request):
scheme = self.spec.components.security_schemes.get(scheme_name)
if not scheme:
return
if scheme.type == SecuritySchemeType.API_KEY:
source = getattr(request.parameters, scheme.apikey_in.value)
return source.get(scheme.name)
warnings.warn("Only api key security scheme type supported")
def _get_parameter_value(self, param, request):
location = request.parameters[param.location.value]

View file

@ -11,6 +11,8 @@ info:
license:
name: MIT
url: https://opensource.org/licenses/MIT
security:
- api_key: []
servers:
- url: http://petstore.swagger.io/{version}
variables:
@ -363,3 +365,12 @@ components:
application/json:
schema:
$ref: "#/components/schemas/PetsData"
securitySchemes:
api_key:
type: apiKey
name: api_key
in: query
petstore_auth:
type: apiKey
name: api_key
in: header

View file

@ -9,7 +9,9 @@ from openapi_core.schema.paths.models import Path
from openapi_core.schema.request_bodies.models import RequestBody
from openapi_core.schema.responses.models import Response
from openapi_core.schema.schemas.models import Schema
from openapi_core.schema.security.models import SecurityRequirement
from openapi_core.schema.security_requirements.models import (
SecurityRequirement,
)
from openapi_core.schema.servers.models import Server, ServerVariable
from openapi_core.shortcuts import create_spec
from openapi_core.validation.request.validators import RequestValidator
@ -64,6 +66,15 @@ class TestPetstore(object):
assert spec.info.license.name == license_spec['name']
assert spec.info.license.url == license_spec['url']
security_spec = spec_dict.get('security', [])
for idx, security_req in enumerate(spec.security):
assert type(security_req) == SecurityRequirement
security_req_spec = security_spec[idx]
name = next(iter(security_req_spec))
assert security_req.name == name
assert security_req.scope_names == security_req_spec[name]
assert spec.get_server_url() == url
for idx, server in enumerate(spec.servers):
@ -130,6 +141,15 @@ class TestPetstore(object):
assert variable.default == variable_spec['default']
assert variable.enum == variable_spec.get('enum')
security_spec = operation_spec.get('security', [])
for idx, security_req in enumerate(operation.security):
assert type(security_req) == SecurityRequirement
security_req_spec = security_spec[idx]
name = next(iter(security_req_spec))
assert security_req.name == name
assert security_req.scope_names == security_req_spec[name]
responses_spec = operation_spec.get('responses')
for http_status, response in iteritems(operation.responses):

View file

@ -94,9 +94,10 @@ class TestRequestValidator(object):
)
def test_get_pets(self, validator):
args = {'limit': '10', 'ids': ['1', '2'], 'api_key': self.api_key}
request = MockRequest(
self.host_url, 'get', '/v1/pets',
path_pattern='/v1/pets', args={'limit': '10', 'ids': ['1', '2']},
path_pattern='/v1/pets', args=args,
)
result = validator.validate(request)
@ -111,6 +112,9 @@ class TestRequestValidator(object):
'ids': [1, 2],
},
)
assert result.security == {
'api_key': self.api_key,
}
def test_get_pets_webob(self, validator):
from webob.multidict import GetDict
@ -231,6 +235,9 @@ class TestRequestValidator(object):
'user': 123,
},
)
assert result.security == {
'petstore_auth': self.api_key_encoded,
}
schemas = spec_dict['components']['schemas']
pet_model = schemas['PetCreate']['x-model']