diff --git a/docs/api.rst b/docs/api.rst index b1b0f87..c324972 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -54,6 +54,12 @@ Bundled Transcoders .. autoclass:: MsgPackTranscoder :members: +.. autoclass:: FormUrlEncodedTranscoder + :members: + +.. autoclass:: FormUrlEncodingOptions + :members: + .. _type-info: Python Type Information diff --git a/docs/history.rst b/docs/history.rst index 818ea42..4dea9bc 100644 --- a/docs/history.rst +++ b/docs/history.rst @@ -3,11 +3,16 @@ Version History :compare:`Next <3.0.4...master>` -------------------------------- +- Add a transcoder for `application/x-www-formurlencoded`_ - Add type annotations (see :ref:`type-info`) - Return a "406 Not Acceptable" if the :http:header:`Accept` header values cannot be matched and there is no default content type configured - Deprecate not having a default content type configured - Fail gracefully when a transcoder does not exist for the default content type +- Fail gracefully when a transcoder raises a :exc:`TypeError` or :exc:`ValueError` when encoding + the response + +.. 
_application/x-www-formurlencoded: https://url.spec.whatwg.org/#application/x-www-form-urlencoded :compare:`3.0.4 <3.0.3...3.0.4>` (2 Nov 2020) --------------------------------------------- diff --git a/setup.cfg b/setup.cfg index 7086654..bdd866b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -87,3 +87,6 @@ exclude = build,env,.eggs [mypy] mypy_path = typestubs strict = True + +[yapf] +allow_split_before_dict_value = False diff --git a/sprockets/mixins/mediatype/content.py b/sprockets/mixins/mediatype/content.py index 2ba01a1..2a27d3f 100644 --- a/sprockets/mixins/mediatype/content.py +++ b/sprockets/mixins/mediatype/content.py @@ -423,8 +423,15 @@ class ContentMixin(web.RequestHandler): settings.default_content_type) raise web.HTTPError(500) else: - content_type, data_bytes = handler.to_bytes(body) - if set_content_type: - self.set_header('Content-Type', content_type) - self.add_header('Vary', 'Accept') - self.write(data_bytes) + try: + content_type, data_bytes = handler.to_bytes(body) + except (TypeError, ValueError) as e: + self._logger.error( + 'selected transcoder (%s) failed to encode response ' + 'body: %s', handler.__class__.__name__, e) + raise web.HTTPError(500, reason='Response Encoding Failure') + else: + if set_content_type: + self.set_header('Content-Type', content_type) + self.add_header('Vary', 'Accept') + self.write(data_bytes) diff --git a/sprockets/mixins/mediatype/transcoders.py b/sprockets/mixins/mediatype/transcoders.py index 1f8d2aa..879b705 100644 --- a/sprockets/mixins/mediatype/transcoders.py +++ b/sprockets/mixins/mediatype/transcoders.py @@ -3,13 +3,17 @@ Bundled media type transcoders. 
- :class:`.JSONTranscoder` implements JSON encoding/decoding - :class:`.MsgPackTranscoder` implements msgpack encoding/decoding +- :class:`.FormUrlEncodedTranscoder` implements the venerable form encoding """ from __future__ import annotations import base64 +import dataclasses import json +import string import typing +import urllib.parse import uuid import collections.abc @@ -21,6 +25,14 @@ except ImportError: # pragma: no cover from sprockets.mixins.mediatype import handlers, type_info +_FORM_URLENCODING = {c: '%{:02X}'.format(c) for c in range(256)} +_FORM_URLENCODING.update({ord(c): c for c in string.ascii_letters}) +_FORM_URLENCODING.update({ord(c): c for c in string.digits}) +_FORM_URLENCODING.update({ord(c): c for c in '*-_.'}) + +_FORM_URLENCODING_PLUS = _FORM_URLENCODING.copy() +_FORM_URLENCODING_PLUS[ord(' ')] = '+' + class JSONTranscoder(handlers.TextContentHandler): """ @@ -238,3 +250,226 @@ class MsgPackTranscoder(handlers.BinaryContentHandler): raise TypeError('{} is not msgpackable'.format( datum.__class__.__name__)) + + +@dataclasses.dataclass +class FormUrlEncodingOptions: + """Configuration knobs for :class:`.FormUrlEncodedTranscoder`""" + encoding: str = 'utf-8' + """Encoding used when generating the byte stream from character data.""" + + encode_sequences: bool = False + """Encode sequence values as multiple name=value instances.""" + + literal_mapping: dict[typing.Literal[None, True, False], + str] = dataclasses.field(default_factory=lambda: { + None: '', + True: 'true', + False: 'false' + }) + """Mapping from supported literal values to strings.""" + + space_as_plus: bool = False + """Quote spaces as ``+`` when true, otherwise as ``%20``.""" + + +class FormUrlEncodedTranscoder: + """Opinionated transcoder for the venerable x-www-formurlencoded. + + :param encoding_options: keyword parameters are used to initialize + :class:`FormUrlEncodingOptions` + + This transcoder implements transcoding according to the current + W3C documentation.
The encoding interface takes mappings or + sequences of pairs and encodes both the name and value. The + following table describes how each supported type is encoded. + + +----------------------------+---------------------------------------+ + | Value / Type | Encoding | + +============================+=======================================+ + | character strings | UTF-8 codepoints before percent- | + | | encoding the resulting bytes | + +----------------------------+---------------------------------------+ + | space character | ``%20`` or ``+`` | + +----------------------------+---------------------------------------+ + | :data:`False` | ``false`` | + +----------------------------+---------------------------------------+ + | :data:`True` | ``true`` | + +----------------------------+---------------------------------------+ + | :data:`None` | the empty string | + +----------------------------+---------------------------------------+ + | numbers | ``str(n)`` | + +----------------------------+---------------------------------------+ + | byte sequences | percent-encoded bytes | + +----------------------------+---------------------------------------+ + | :class:`uuid.UUID` | ``str(u)`` | + +----------------------------+---------------------------------------+ + | :class:`datetime.datetime` | result of calling | + | | :meth:`~datetime.datetime.isoformat` | + +----------------------------+---------------------------------------+ + + https://url.spec.whatwg.org/#application/x-www-form-urlencoded + + .. warning:: + + Types that are not explicitly mentioned above will result in + :meth:`to_bytes` simply calling ``str(value)`` and encoding + the result. This causes nested sequences to be encoded as + their ``repr``. For example, encoding ``{'a': [1, 2]}`` will + result in ``a=%5B1%2C%202%5D``. This matches what + :func:`urllib.parse.urlencode` does by default. 
+ + Better support for sequence values can be enabled by setting + the :attr:`~FormUrlEncodingOptions.encode_sequences` attribute + of :attr:`.options`. This mimics the ``doseq`` parameter of + :func:`urllib.parse.urlencode`. + + .. attribute:: options + :type: FormUrlEncodingOptions + + Controls the behavior of the transcoder + + """ + content_type = 'application/x-www-formurlencoded' + + def __init__(self, **encoding_options: typing.Any) -> None: + self.options = FormUrlEncodingOptions(**encoding_options) + + def to_bytes( + self, + inst_data: type_info.Serializable, + encoding: typing.Optional[str] = None) -> typing.Tuple[str, bytes]: + """Serialize `inst_data` into a byte stream and content type spec. + + :param inst_data: the data to serialize + :param encoding: optional encoding override + + Serialization is implemented as described in the W3C + `urlencoded serialization`_ algorithm. The :attr:`.options` + attribute controls the configurable details of the encoding + process. + + The character encoding can be further overridden by specifying the + `encoding` parameter. + + :returns: tuple of the content type and the resulting bytes + :raises: :exc:`TypeError` if a supplied value cannot be serialized + + .. _urlencoded serialization: https://url.spec.whatwg.org/ + #urlencoded-serializing + + """ + # Select the appropriate encoding table and use the default + # character encoding if necessary. Binding these to local + # names removes branches from the inner loop. + chr_map: typing.Mapping[int, str] + chr_map = (_FORM_URLENCODING_PLUS + if self.options.space_as_plus else _FORM_URLENCODING) + if encoding is None: + encoding = self.options.encoding + + # Generate a sequence of name+value tuples to encode or + # directly encode primitives + try: + tuples = self._convert_to_tuple_sequence(inst_data) + except TypeError: + # hopefully this is a primitive ...
if not then the + # call to _encode will fail below + tuples = [(inst_data, None)] + + prefix = '' # another micro-optimization + buf = [] + for name, value in tuples: + buf.append(prefix) + buf.extend(self._encode(name, chr_map, encoding)) + if value is not None: + buf.append('=') + buf.extend(self._encode(value, chr_map, encoding)) + prefix = '&' + encoded = ''.join(buf) + + return self.content_type, encoded.encode('ascii') + + def from_bytes( + self, + data_bytes: bytes, + encoding: typing.Optional[str] = None) -> type_info.Deserialized: + """Deserialize `bytes` into a Python object instance. + + :param data_bytes: byte string to deserialize + :param encoding: optional encoding override + + Deserialization is implemented according to the W3C + `urlencoded deserialization`_ algorithm. The :attr:`.options` + attribute controls the configurable details of the encoding + process. + + :returns: the decoded Python object + + .. _urlencoded deserialization: https://url.spec.whatwg.org/ + #urlencoded-parsing + + """ + dequote = (urllib.parse.unquote_plus + if self.options.space_as_plus else urllib.parse.unquote) + if encoding is None: + encoding = self.options.encoding + + output = [] + for part in data_bytes.decode('ascii').split('&'): + if not part: + continue + name, eq_present, value = part.partition('=') + name = dequote(name, encoding=encoding) + if eq_present: + output.append((name, dequote(value, encoding=encoding))) + else: + output.append((name, '')) + + return dict(output) + + def _encode(self, datum: typing.Union[bool, None, float, int, str, + type_info.DefinesIsoFormat], + char_map: typing.Mapping[int, str], encoding: str) -> str: + if isinstance(datum, str): + pass # optimization: skip additional checks for strings + elif (isinstance(datum, (float, int, str, uuid.UUID)) + and not isinstance(datum, bool)): + datum = str(datum) + elif (isinstance(datum, collections.abc.Hashable) + and datum in self.options.literal_mapping): + # the isinstance Hashable check 
confuses mypy + datum = self.options.literal_mapping[datum] # type: ignore + elif isinstance(datum, (bytearray, bytes, memoryview)): + return ''.join(char_map[c] for c in datum) + elif isinstance(datum, type_info.DefinesIsoFormat): + datum = datum.isoformat() + else: + datum = str(datum) + + return ''.join(char_map[c] for c in datum.encode(encoding)) + + def _convert_to_tuple_sequence( + self, value: type_info.Serializable + ) -> typing.Iterable[typing.Tuple[typing.Any, typing.Any]]: + tuples: typing.Iterable[typing.Tuple[typing.Any, typing.Any]] + if isinstance(value, collections.abc.Mapping): + tuples = value.items() + else: + try: + tuples = [(a, b) for a, b in value] # type: ignore + except (TypeError, ValueError): + raise TypeError('Cannot convert value to sequence of tuples') + + if self.options.encode_sequences: + out_tuples = [] + for a, b in tuples: + if (not isinstance(b, (bytes, bytearray, memoryview, str)) + and isinstance(b, collections.abc.Iterable)): + for value in b: + out_tuples.append((a, value)) + else: + out_tuples.append((a, b)) + tuples = out_tuples + + return tuples diff --git a/sprockets/mixins/mediatype/type_info.py b/sprockets/mixins/mediatype/type_info.py index 06e0e72..53e6f22 100644 --- a/sprockets/mixins/mediatype/type_info.py +++ b/sprockets/mixins/mediatype/type_info.py @@ -4,13 +4,14 @@ import typing import uuid try: - from typing import Protocol + from typing import Protocol, runtime_checkable except ImportError: # "ignore" is required to avoid an incompatible import # error due to different bindings of _SpecialForm - from typing_extensions import Protocol # type: ignore + from typing_extensions import Protocol, runtime_checkable # type: ignore +@runtime_checkable class DefinesIsoFormat(Protocol): """An object that has an isoformat method.""" def isoformat(self) -> str: @@ -24,6 +25,10 @@ class HasSettings(Protocol): """Application settings.""" +SerializablePrimitives = (type(None), bool, bytearray, bytes, float, int, + memoryview, 
str, uuid.UUID) +"""Use this with isinstance to identify simple values.""" + Serializable = typing.Union[DefinesIsoFormat, None, bool, bytearray, bytes, float, int, memoryview, str, typing.Mapping, typing.Sequence, typing.Set, uuid.UUID] diff --git a/tests.py b/tests.py index f2ea89a..550bb2d 100644 --- a/tests.py +++ b/tests.py @@ -1,6 +1,7 @@ import base64 import datetime import json +import math import os import pickle import struct @@ -12,7 +13,8 @@ from ietfparse import algorithms from tornado import httputil, testing, web import umsgpack -from sprockets.mixins.mediatype import content, handlers, transcoders +from sprockets.mixins.mediatype import (content, handlers, transcoders, + type_info) import examples @@ -175,6 +177,35 @@ class SendResponseTests(testing.AsyncHTTPTestCase): self.assertEqual('application/foo+json', response.headers.get('Content-Type')) + def test_that_transcoder_failures_result_in_500(self): + class FailingTranscoder: + content_type = 'application/vnd.com.example.bad' + + def __init__(self): + self.exc_class = TypeError + + def to_bytes(self, inst_data, encoding=None): + raise self.exc_class('I always fail at this') + + def from_bytes(self, data_bytes, encoding=None): + return {} + + transcoder = FailingTranscoder() + content.add_transcoder(self.application, transcoder) + for _ in range(2): + response = self.fetch( + '/', + method='POST', + body=b'{}', + headers={ + 'Accept': 'application/vnd.com.example.bad', + 'Content-Type': 'application/json', + }, + ) + self.assertEqual(500, response.code) + self.assertEqual('Response Encoding Failure', response.reason) + transcoder.exc_class = ValueError + class GetRequestBodyTests(testing.AsyncHTTPTestCase): def setUp(self): @@ -520,3 +551,148 @@ class MsgPackTranscoderTests(unittest.TestCase): new_callable=lambda: None): with self.assertRaises(RuntimeError): transcoders.MsgPackTranscoder() + + +class FormUrlEncodingTranscoderTests(unittest.TestCase): + transcoder: type_info.Transcoder + + def 
setUp(self): + super().setUp() + self.transcoder = transcoders.FormUrlEncodedTranscoder() + + def test_simple_deserialization(self): + body = self.transcoder.from_bytes( + b'number=12&boolean=true&null=null&string=anything%20really&empty=' + ) + self.assertEqual(body['number'], '12') + self.assertEqual(body['boolean'], 'true') + self.assertEqual(body['empty'], '') + self.assertEqual(body['null'], 'null') + self.assertEqual(body['string'], 'anything really') + + def test_deserialization_edge_cases(self): + body = self.transcoder.from_bytes(b'') + self.assertEqual({}, body) + + body = self.transcoder.from_bytes(b'&') + self.assertEqual({}, body) + + body = self.transcoder.from_bytes(b'empty&&=no-name&no-value=') + self.assertEqual({'empty': '', '': 'no-name', 'no-value': ''}, body) + + body = self.transcoder.from_bytes(b'repeated=1&repeated=2') + self.assertEqual({'repeated': '2'}, body) + + def test_that_deserialization_encoding_can_be_overridden(self): + body = self.transcoder.from_bytes(b'kolor=%bf%F3%b3ty', + encoding='iso-8859-2') + self.assertEqual({'kolor': 'żółty'}, body) + + def test_simple_serialization(self): + now = datetime.datetime.now() + id_val = uuid.uuid4() + content_type, result = self.transcoder.to_bytes({ + 'integer': 12, + 'float': math.pi, + 'string': 'percent quoted', + 'datetime': now, + 'id': id_val, + }) + self.assertEqual(content_type, 'application/x-www-formurlencoded') + self.assertEqual( + result.decode(), '&'.join([ + 'integer=12', + f'float={math.pi}', + 'string=percent%20quoted', + 'datetime=' + now.isoformat().replace(':', '%3A'), + f'id={id_val}', + ])) + + def test_that_serialization_encoding_can_be_overridden(self): + _, result = self.transcoder.to_bytes([('kolor', 'żółty')], + encoding='iso-8859-2') + self.assertEqual(b'kolor=%bf%f3%b3ty', result.lower()) + + def test_serialization_edge_cases(self): + _, result = self.transcoder.to_bytes([ + ('', ''), + ('', True), + ('', False), + ('', None), + ('name', None), + ]) + 
self.assertEqual(b'=&=true&=false&&name', result) + + def test_serialization_using_plusses(self): + self.transcoder: transcoders.FormUrlEncodedTranscoder + + self.transcoder.options.space_as_plus = True + _, result = self.transcoder.to_bytes({'value': 'with space'}) + self.assertEqual(b'value=with+space', result) + + self.transcoder.options.space_as_plus = False + _, result = self.transcoder.to_bytes({'value': 'with space'}) + self.assertEqual(b'value=with%20space', result) + + def test_that_serializing_unsupported_types_stringifies(self): + obj = object() + # quick & dirty URL encoding + expected = str(obj).translate({0x20: '%20', 0x3C: '%3C', 0x3E: '%3E'}) + + _, result = self.transcoder.to_bytes({'unsupported': obj}) + self.assertEqual(f'unsupported={expected}'.encode(), result) + + def test_that_required_octets_are_encoded(self): + # build the set of all characters required to be encoded by + # https://url.spec.whatwg.org/#percent-encoded-bytes + pct_chrs = typing.cast(typing.Set[str], set()) + pct_chrs.update({c for c in ' "#<>'}) # query set + pct_chrs.update({c for c in '?`{}'}) # path set + pct_chrs.update({c for c in '/:;=@[^|'}) # userinfo set + pct_chrs.update({c for c in '$%&+,'}) # component set + pct_chrs.update({c for c in "!'()~"}) # formurlencoding set + + test_string = ''.join(pct_chrs) + expected = ''.join('%{:02X}'.format(ord(c)) for c in test_string) + expected = f'test_string={expected}'.encode() + _, result = self.transcoder.to_bytes({'test_string': test_string}) + self.assertEqual(expected, result) + + def test_serialization_of_primitives(self): + id_val = uuid.uuid4() + expectations = { + None: b'', + 'a string': b'a%20string', + 10: b'10', + 2.3: str(2.3).encode(), + True: b'true', + False: b'false', + b'\xfe\xed\xfa\xce': b'%FE%ED%FA%CE', + memoryview(b'\xfe\xed\xfa\xce'): b'%FE%ED%FA%CE', + id_val: str(id_val).encode(), + } + for value, expected in expectations.items(): + _, result = self.transcoder.to_bytes(value) + 
self.assertEqual(expected, result) + + def test_serialization_with_empty_literal_map(self): + self.transcoder: transcoders.FormUrlEncodedTranscoder + self.transcoder.options.literal_mapping.clear() + for value in {None, True, False}: + _, result = self.transcoder.to_bytes(value) + self.assertEqual(str(value).encode(), result) + + def test_serialization_of_sequences(self): + self.transcoder: transcoders.FormUrlEncodedTranscoder + + value = {'list': [1, 2], 'tuple': (1, 2), 'set': {1, 2}, 'str': 'val'} + + self.transcoder.options.encode_sequences = False + _, result = self.transcoder.to_bytes(value) + self.assertEqual((b'list=%5B1%2C%202%5D&tuple=%281%2C%202%29' + b'&set=%7B1%2C%202%7D&str=val'), result) + + self.transcoder.options.encode_sequences = True + _, result = self.transcoder.to_bytes(value) + self.assertEqual(b'list=1&list=2&tuple=1&tuple=2&set=1&set=2&str=val', + result)