From 127dcc90f56d95dcc3dafd8e4bad646f72fb9a9f Mon Sep 17 00:00:00 2001 From: Correl Roush Date: Fri, 5 Apr 2019 15:48:10 -0400 Subject: [PATCH] WIP: Monad Transformers --- monads/maybet.py | 98 ++++++++++++++++++++++++++++ monads/resultt.py | 110 ++++++++++++++++++++++++++++++++ monads/transformers/__init__.py | 0 monads/transformers/maybe.py | 92 -------------------------- tests/test_resultt.py | 33 ++++++++++ 5 files changed, 241 insertions(+), 92 deletions(-) create mode 100644 monads/maybet.py create mode 100644 monads/resultt.py delete mode 100644 monads/transformers/__init__.py delete mode 100644 monads/transformers/maybe.py create mode 100644 tests/test_resultt.py diff --git a/monads/maybet.py b/monads/maybet.py new file mode 100644 index 0000000..1fd7eab --- /dev/null +++ b/monads/maybet.py @@ -0,0 +1,98 @@ +from typing import Any, Awaitable, Callable, Generic, Type, TypeVar + +from .monad import Monad +from . import maybe, future, result + +T = TypeVar("T") +S = TypeVar("S") +E = TypeVar("E") + + +class MaybeT(Monad[T]): + ... + + +class Maybe(MaybeT[T]): + def __init__(self, value: maybe.Maybe[maybe.Maybe[T]]) -> None: + self.value = value + + @classmethod + def pure(cls, value: T) -> Maybe[T]: + return Maybe(maybe.Maybe.pure(maybe.Maybe.pure(value))) + + def map(self, function: Callable[[T], S]) -> Maybe[S]: + return Maybe(self.value.map(lambda inner: inner.map(function))) + + def bind(self, function: Callable[[T], Maybe[S]]) -> Maybe[S]: + def bind_inner(inner: maybe.Maybe[T]) -> maybe.Maybe[maybe.Maybe[S]]: + if isinstance(inner, maybe.Just): + return function(inner.value).value + else: + empty: maybe.Maybe[S] = maybe.Nothing() + return maybe.Maybe.pure(empty) + + return Maybe(self.value.bind(bind_inner)) + + def __repr__(self): + return f" {self.value}>" + + +class Result(MaybeT[T], Generic[T, E]): + def __init__(self, value: result.Result[maybe.Maybe[T], E]) -> None: + self.value = value + + @classmethod + def pure(cls, value: T) -> Result[T, E]: + return Result(result.Result.pure(maybe.Maybe.pure(value))) + + def map(self, function: Callable[[T], S]) -> Result[S, E]: + return Result(self.value.map(lambda inner: inner.map(function))) + + def bind(self, function: Callable[[T], Result[S, E]]) -> Result[S, E]: + def bind_inner(inner: maybe.Maybe[T]) -> result.Result[maybe.Maybe[S], E]: + if isinstance(inner, maybe.Just): + return function(inner.value).value + else: + empty: maybe.Maybe[S] = maybe.Nothing() + return result.Result.pure(empty) + + return Result(self.value.bind(bind_inner)) + + def __repr__(self): + return f" {self.value}>" + + __rshift__ = bind + __and__ = lambda other, self: Result.apply(self, other) + __mul__ = __rmul__ = map + + +class Future(MaybeT[T]): + def __init__(self, value: Awaitable[maybe.Maybe[T]]) -> None: + self.value = future.Future(value) + + @classmethod + def pure(cls, value: T) -> Future[T]: + return Future(future.Future.pure(maybe.Maybe.pure(value))) + + def map(self, function: Callable[[T], S]) -> Future[S]: + return Future(self.value.map(lambda inner: inner.map(function))) + + def bind(self, function: Callable[[T], Future[S]]) -> Future[S]: + def bind_inner(inner: maybe.Maybe[T]) -> future.Future[maybe.Maybe[S]]: + if isinstance(inner, maybe.Just): + return function(inner.value).value + else: + empty: maybe.Maybe[S] = maybe.Nothing() + return future.Future.pure(empty) + + return Future(self.value.bind(bind_inner)) + + def __repr__(self): + return f" {self.value}>" + + def __await__(self) -> Maybe[T]: + return self.value.__await__() + + __rshift__ = bind + __and__ = lambda other, self: Future.apply(self, other) + __mul__ = __rmul__ = map diff --git a/monads/resultt.py b/monads/resultt.py new file mode 100644 index 0000000..47f34b1 --- /dev/null +++ b/monads/resultt.py @@ -0,0 +1,110 @@ +from __future__ import annotations +from typing import Any, Awaitable, Callable, Generic, Type, TypeVar + +from .monad import Monad +from . import maybe, future, result + +T = TypeVar("T") +S = TypeVar("S") +E = TypeVar("E") + + +class ResultT(Monad[T], Generic[T, E]): + ... + + +class Maybe(ResultT[T, E]): + def __init__(self, value: maybe.Maybe[result.Result[T, E]]) -> None: + self.value = value + + @classmethod + def pure(cls, value: T) -> Maybe[T, E]: + return Maybe(maybe.Maybe.pure(result.Result.pure(value))) + + def map(self, function: Callable[[T], S]) -> Maybe[S, E]: + return Maybe(self.value.map(lambda inner: inner.map(function))) + + def bind(self, function: Callable[[T], Maybe[S, E]]) -> Maybe[S, E]: + def bind_inner(inner: result.Result[T, E]) -> maybe.Maybe[result.Result[S, E]]: + if isinstance(inner, result.Ok): + return function(inner.value).value + elif isinstance(inner, result.Err): + return maybe.Maybe.pure(inner.err) + else: + raise TypeError + + return Maybe(self.value.bind(bind_inner)) + + def __repr__(self): + return f" {self.value}>" + + __rshift__ = bind + __and__ = lambda other, self: Maybe.apply(self, other) + __mul__ = __rmul__ = map + + +class Result(ResultT[T, E]): + def __init__(self, value: result.Result[result.Result[T, E], E]) -> None: + self.value = value + + @classmethod + def pure(cls, value: T) -> Result[T, E]: + return Result(result.Result.pure(result.Result.pure(value))) + + def map(self, function: Callable[[T], S]) -> Result[S, E]: + return Result(self.value.map(lambda inner: inner.map(function))) + + def bind(self, function: Callable[[T], Result[S, E]]) -> Result[S, E]: + def bind_inner( + inner: result.Result[T, E] + ) -> result.Result[result.Result[S, E], E]: + if isinstance(inner, result.Ok): + return function(inner.value).value + elif isinstance(inner, result.Err): + return result.Result.pure(inner.err) + else: + raise TypeError + + return Result(self.value.bind(bind_inner)) + + def __repr__(self): + return f" {self.value}>" + + __rshift__ = bind + __and__ = lambda other, self: Result.apply(self, other) + __mul__ = __rmul__ = map + + +class Future(ResultT[T, E]): + def __init__(self, value: Awaitable[result.Result[T, E]]) -> None: + self.value = future.Future(value) + + @classmethod + def pure(cls, value: T) -> Future[T, E]: + return Future(future.Future.pure(result.Result.pure(value))) + + def map(self, function: Callable[[T], S]) -> Future[S, E]: + return Future(self.value.map(lambda inner: inner.map(function))) + + def bind(self, function: Callable[[T], Future[S, E]]) -> Future[S, E]: + def bind_inner( + inner: result.Result[T, E] + ) -> future.Future[result.Result[S, E]]: + if isinstance(inner, result.Ok): + return function(inner.value).value + elif isinstance(inner, result.Err): + return future.Future.pure(inner.err) + else: + raise TypeError + + return Future(self.value.bind(bind_inner)) + + def __repr__(self): + return f" {self.value}>" + + def __await__(self): + return self.value.__await__() + + __rshift__ = bind + __and__ = lambda other, self: Future.apply(self, other) + __mul__ = __rmul__ = map diff --git a/monads/transformers/__init__.py b/monads/transformers/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/monads/transformers/maybe.py b/monads/transformers/maybe.py deleted file mode 100644 index b0c9e3e..0000000 --- a/monads/transformers/maybe.py +++ /dev/null @@ -1,92 +0,0 @@ -from typing import Any, Awaitable, Callable, Generic, Type, TypeVar - -from ..monad import Monad -from ..maybe import Maybe, Just, Nothing, transformer -from .. import future, list - -T = TypeVar("T") -S = TypeVar("S") -M = TypeVar("M", bound=Monad[Maybe]) -MM = TypeVar("MM", bound=Monad[Monad[Any]]) - - -class MonadTransformer(Monad[T]): - def __init__(self, value: Monad) -> None: - """Lift a wrapped Monad into the transformer.""" - self.value = value - - -class MaybeT(MonadTransformer[T], Generic[M, T]): - outer: Type[Monad[Maybe[T]]] = Monad - - def __init__(self, value: Monad[Maybe[T]]) -> None: - """Lift a wrapped Maybe into the transformer.""" - self.value = value - - @classmethod - def pure(cls, value: T) -> MaybeT[M, T]: - return MaybeT(cls.outer.pure(Maybe.pure(value))) - - def map(self, function: Callable[[T], S]) -> MaybeT[M, S]: - return MaybeT(self.value.map(lambda inner: inner.map(function))) - - def bind(self, function: Callable[[T], MaybeT[M, S]]) -> MaybeT[M, S]: - def bind_inner(inner: Maybe[T]) -> Monad[Maybe[S]]: - if isinstance(inner, Just): - return function(inner.value).value - else: - empty: Maybe[S] = Nothing() - return self.outer.pure(empty) - - return MaybeT(self.value.bind(bind_inner)) - - def __repr__(self): - return f"" - - -class MaybeMaybe(Monad[T]): - def __init__(self, value: Maybe[Maybe[T]]) -> None: - self.value = value - - @classmethod - def pure(cls, value: T) -> MaybeMaybe[T]: - return MaybeMaybe(Maybe.pure(Maybe.pure(value))) - - def map(self, function: Callable[[T], S]) -> MaybeMaybe[S]: - return MaybeMaybe(self.value.map(lambda inner: inner.map(function))) - - def bind(self, function: Callable[[T], MaybeMaybe[S]]) -> MaybeMaybe[S]: - def bind_inner(inner: Maybe[T]) -> Maybe[Maybe[S]]: - if isinstance(inner, Just): - return function(inner.value).value - else: - empty: Maybe[S] = Nothing() - return Maybe.pure(empty) - - return MaybeMaybe(self.value.bind(bind_inner)) - - def __repr__(self): - return f"" - - -class FutureMaybe(MaybeT[future.Future, T]): - outer = future.Future - - @classmethod - def pure(cls, value: T) -> MaybeT[future.Future, T]: - return FutureMaybe(future.Future.pure(Maybe.pure(value))) - - def __await__(self) -> Maybe[T]: - if isinstance(self.value, Awaitable): - return self.value.__await__() - else: - raise TypeError("Not awaitable") - - -ListMaybe = transformer(list.List) - -reveal_type(MaybeMaybe.pure(5)) -reveal_type(MaybeMaybe.pure(5).value) -reveal_type(FutureMaybe.pure(5)) -reveal_type(FutureMaybe.pure(5).value) -reveal_type(ListMaybe.pure(5)) diff --git a/tests/test_resultt.py b/tests/test_resultt.py new file mode 100644 index 0000000..f7fb92c --- /dev/null +++ b/tests/test_resultt.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +import pytest # type: ignore + +from monads import resultt +from monads.future import Future +from monads.result import Result, Ok, Err + + +@pytest.mark.asyncio +async def test_bind_ok_future() -> None: + def get_thing() -> resultt.Future[int, Exception]: + return resultt.Future.pure(3) + + def do_thing(thing: int) -> resultt.Future[int, Exception]: + return resultt.Future.pure(thing + 3) + + expected: Result[int, Exception] = Ok(6) + assert expected == await (get_thing() >> do_thing) + + +@pytest.mark.asyncio +async def test_bind_ok_async() -> None: + async def get_thing() -> Result[int, Exception]: + return Result.pure(3) + + async def do_thing(thing: int) -> Result[int, Exception]: + return Result.pure(thing + 3) + + expected: Result[int, Exception] = Ok(6) + assert expected == await resultt.Future(get_thing()).bind( + lambda thing: resultt.Future(do_thing(thing)) + )