From 713f4b62aec6981dd5d466280288f750761e6463 Mon Sep 17 00:00:00 2001 From: Correl Roush Date: Tue, 22 Jan 2019 23:44:21 -0500 Subject: [PATCH] WIP: MaybeT --- monads/maybe.py | 64 ++++++++++++++++++++++- monads/monad.py | 3 ++ monads/transformers/__init__.py | 0 monads/transformers/maybe.py | 92 +++++++++++++++++++++++++++++++++ 4 files changed, 158 insertions(+), 1 deletion(-) create mode 100644 monads/transformers/__init__.py create mode 100644 monads/transformers/maybe.py diff --git a/monads/maybe.py b/monads/maybe.py index 64b80a1..6eef6ab 100644 --- a/monads/maybe.py +++ b/monads/maybe.py @@ -1,6 +1,16 @@ from __future__ import annotations import functools -from typing import Any, Callable, Generic, Iterable, List, Optional, TypeVar +from typing import ( + Any, + Awaitable, + Callable, + Generic, + Iterable, + List, + Optional, + Type, + TypeVar, +) from . import result from .monad import Monad from .monoid import Monoid @@ -99,6 +109,58 @@ class Nothing(Maybe[T]): return "" +M = TypeVar("M", bound=Monad[Maybe]) + + +class MaybeT(Monad[T], Generic[M, T]): + def __init__(self, value: Monad[Maybe[T]]) -> None: + raise NotImplementedError + + +def transformer(outer: Type[M]) -> Type[MaybeT[M, T]]: + M = TypeVar("M", bound=Monad[Maybe]) + T = TypeVar("T") + + class _MaybeT(MaybeT[M, T]): + def __init__(self, value: Monad[Maybe[T]]) -> None: + self.value = value + + @classmethod + def pure(cls, value: T) -> MaybeT[M, T]: + return _MaybeT(outer.pure(Maybe.pure(value))) + + def map(self, function: Callable[[T], S]) -> _MaybeT[M, S]: + return _MaybeT(self.value.map(lambda inner: inner.map(function))) + + def bind(self, function: Callable[[T], _MaybeT[M, S]]) -> _MaybeT[M, S]: + def bind_inner(inner: Maybe[T]) -> Monad[Maybe[S]]: + if isinstance(inner, Just): + return function(inner.value).value + else: + empty: Maybe[S] = Nothing() + return outer.pure(empty) + + return _MaybeT(self.value.bind(bind_inner)) + + def __repr__(self): + return f"" + + if hasattr(outer, "__await__"): + + def __await__(self) -> Maybe[T]: + if isinstance(self.value, Awaitable): + return self.value.__await__() + else: + raise TypeError("Not awaitable") + + return _MaybeT + + +def transform(outer: Type[M], instance: Monad[Maybe[T]]) -> MaybeT[M, T]: + Transformer: Type[MaybeT[M, T]] = transformer(outer) + return Transformer(instance) + + def maybe(value: T, predicate: Optional[Callable[[T], bool]] = None) -> Maybe[T]: predicate = predicate or (lambda x: x is not None) if predicate(value): diff --git a/monads/monad.py b/monads/monad.py index 0c75171..8854f05 100644 --- a/monads/monad.py +++ b/monads/monad.py @@ -30,4 +30,7 @@ class Monad(Applicative[T]): raise NotImplementedError + def map(self, function: Callable[[T], S]) -> Monad[S]: # pragma: no cover + raise NotImplementedError + __rshift__ = bind diff --git a/monads/transformers/__init__.py b/monads/transformers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/monads/transformers/maybe.py b/monads/transformers/maybe.py new file mode 100644 index 0000000..b0c9e3e --- /dev/null +++ b/monads/transformers/maybe.py @@ -0,0 +1,92 @@ +from typing import Any, Awaitable, Callable, Generic, Type, TypeVar + +from ..monad import Monad +from ..maybe import Maybe, Just, Nothing, transformer +from .. import future, list + +T = TypeVar("T") +S = TypeVar("S") +M = TypeVar("M", bound=Monad[Maybe]) +MM = TypeVar("MM", bound=Monad[Monad[Any]]) + + +class MonadTransformer(Monad[T]): + def __init__(self, value: Monad) -> None: + """Lift a wrapped Monad into the transformer.""" + self.value = value + + +class MaybeT(MonadTransformer[T], Generic[M, T]): + outer: Type[Monad[Maybe[T]]] = Monad + + def __init__(self, value: Monad[Maybe[T]]) -> None: + """Lift a wrapped Maybe into the transformer.""" + self.value = value + + @classmethod + def pure(cls, value: T) -> MaybeT[M, T]: + return MaybeT(cls.outer.pure(Maybe.pure(value))) + + def map(self, function: Callable[[T], S]) -> MaybeT[M, S]: + return MaybeT(self.value.map(lambda inner: inner.map(function))) + + def bind(self, function: Callable[[T], MaybeT[M, S]]) -> MaybeT[M, S]: + def bind_inner(inner: Maybe[T]) -> Monad[Maybe[S]]: + if isinstance(inner, Just): + return function(inner.value).value + else: + empty: Maybe[S] = Nothing() + return self.outer.pure(empty) + + return MaybeT(self.value.bind(bind_inner)) + + def __repr__(self): + return f"" + + +class MaybeMaybe(Monad[T]): + def __init__(self, value: Maybe[Maybe[T]]) -> None: + self.value = value + + @classmethod + def pure(cls, value: T) -> MaybeMaybe[T]: + return MaybeMaybe(Maybe.pure(Maybe.pure(value))) + + def map(self, function: Callable[[T], S]) -> MaybeMaybe[S]: + return MaybeMaybe(self.value.map(lambda inner: inner.map(function))) + + def bind(self, function: Callable[[T], MaybeMaybe[S]]) -> MaybeMaybe[S]: + def bind_inner(inner: Maybe[T]) -> Maybe[Maybe[S]]: + if isinstance(inner, Just): + return function(inner.value).value + else: + empty: Maybe[S] = Nothing() + return Maybe.pure(empty) + + return MaybeMaybe(self.value.bind(bind_inner)) + + def __repr__(self): + return f"" + + +class FutureMaybe(MaybeT[future.Future, T]): + outer = future.Future + + @classmethod + def pure(cls, value: T) -> MaybeT[future.Future, T]: + return FutureMaybe(future.Future.pure(Maybe.pure(value))) + + def __await__(self) -> Maybe[T]: + if isinstance(self.value, Awaitable): + return self.value.__await__() + else: + raise TypeError("Not awaitable") + + +ListMaybe = transformer(list.List) + +reveal_type(MaybeMaybe.pure(5)) +reveal_type(MaybeMaybe.pure(5).value) +reveal_type(FutureMaybe.pure(5)) +reveal_type(FutureMaybe.pure(5).value) +reveal_type(ListMaybe.pure(5))