# Source code for redpipe.tasks

# -*- coding: utf-8 -*-
"""
When sending commands to multiple redis backends in one redpipe.pipeline,
this module gives us an api to allow threaded async communication to those
different backends, improving parallelism.

The AsynchronousTask is well tested and should work well.
But if you see any issues, you can easily disable this in your application.

.. code-block:: python

    redpipe.disable_threads()

Please report any `issues <https://github.com/72squared/redpipe/issues>`_.
"""

import threading

# Names exported by ``from redpipe.tasks import *``: only the two functions
# that switch threaded execution on and off.
__all__ = ['enable_threads', 'disable_threads']


def reraise(tp, value, tb=None):
    """
    Re-raise an exception, optionally attaching a specific traceback.

    :param tp: exception class; called with no arguments to build the
        exception when ``value`` is None.
    :param value: exception instance to raise, or None.
    :param tb: traceback to attach. When the instance's ``__traceback__`` is
        not this same object, it is replaced via ``with_traceback(tb)``.
    :raises: always raises the exception; this function never returns.
    """
    try:
        if value is None:
            value = tp()
        if value.__traceback__ is not tb:
            raise value.with_traceback(tb)
        raise value
    finally:
        # The raised exception's traceback references this frame, and this
        # frame's locals reference the exception (and tb). Drop both names so
        # the exception is not kept alive by a reference cycle.
        value = None
        tb = None


class SynchronousTask(object):
    """
    Run the target inline, in the calling thread.

    Thread-free alternative that exposes ``start``, ``run`` and ``result``,
    so each backend is processed one after another.
    """

    def __init__(self, target, args=None, kwargs=None):
        """
        :param target: callable invoked when the task runs
        :param args: positional arguments for ``target``; ``()`` when None
        :param kwargs: keyword arguments for ``target``; ``{}`` when None
        """
        self._target = target
        self._args = () if args is None else args
        self._kwargs = {} if kwargs is None else kwargs
        self._result = None
        self._exception = None

    def run(self):
        """
        Call the target and keep either its return value or the
        ``Exception`` it raised; nothing is raised from here for a failing
        target.
        """
        # noinspection PyBroadException
        try:
            outcome = self._target(*self._args, **self._kwargs)
        except Exception as err:
            self._exception = err
        else:
            self._result = outcome
        finally:
            # Drop the callable and its arguments once used, so an argument
            # that points back at this task cannot form a refcycle.
            del self._target
            del self._args
            del self._kwargs

    # ``start`` executes the target immediately; it is the same function
    # object as ``run``.
    start = run

    @property
    def result(self):
        """
        Return value of the target, or re-raise the exception it raised.

        :return: whatever the target returned (None if it has not run)
        """
        err = self._exception
        if err is None:
            return self._result
        raise err


class AsynchronousTask(threading.Thread):
    """
    Thread-backed task for talking to several redis backends simultaneously.

    Intended to cut latency when a single `redpipe.pipeline` sends commands
    to more than one redis backend.
    """

    def __init__(self, target, args=None, kwargs=None):
        """
        :param target: callable invoked inside the thread
        :param args: positional arguments for ``target``; ``()`` when None
        :param kwargs: keyword arguments for ``target``; ``{}`` when None
        """
        # Initialise the Thread machinery first, then store our own values.
        super(AsynchronousTask, self).__init__()
        self._target = target
        self._args = () if args is None else args
        self._kwargs = {} if kwargs is None else kwargs
        self._result = None
        self._exception = None

    def run(self):
        """
        Thread body: call the target and keep either its return value or
        the ``Exception`` it raised for later retrieval through ``result``.
        """
        # noinspection PyBroadException
        try:
            outcome = self._target(*self._args, **self._kwargs)
        except Exception as err:
            self._exception = err
        else:
            self._result = outcome
        finally:
            # Drop the callable and its arguments once used, so an argument
            # that points back at this thread cannot form a refcycle.
            del self._target
            del self._args
            del self._kwargs

    @property
    def result(self):
        """
        Block until the thread is no longer alive, then return the target's
        return value or re-raise the exception it raised.

        :return: whatever the target returned (None if it never ran)
        """
        if self.is_alive():
            self.join()
        err = self._exception
        if err is None:
            return self._result
        raise err


class TaskManager(object):
    """
    Single entry point for launching tasks and collecting their results,
    whichever task class (threaded or sequential) is currently selected.
    """
    # Task class instantiated by promise(); swapped through set_task_type().
    task = AsynchronousTask

    @classmethod
    def set_task_type(cls, task):
        """
        Select the class used to build tasks from now on.

        :param task: task class, e.g. SynchronousTask or AsynchronousTask
        :return: None
        """
        setattr(cls, 'task', task)

    @classmethod
    def promise(cls, fn, *args, **kwargs):
        """
        Wrap a callable and its arguments in a task of the currently
        selected type, start it, and hand the task back.

        :param fn: callable
        :param args: tuple
        :param kwargs: dict
        :return: SynchronousTask or AsynchronousTask
        """
        task_cls = cls.task
        started = task_cls(target=fn, args=args, kwargs=kwargs)
        started.start()
        return started

    @classmethod
    def wait(cls, *tasks):
        """
        Collect the result of every task, in the order given.

        Reading ``result`` re-raises the exception of a failed task.

        :param tasks: tuple of tasks, AsynchronousTask or SynchronousTask.
        :return: list of the results from each task.
        """
        results = []
        for pending in tasks:
            results.append(pending.result)
        return results


def enable_threads():
    """
    Enable threaded behavior when talking to multiple redis backends
    in one pipeline execute call.
    Otherwise we don't need it.

    Sets the task type used by TaskManager to AsynchronousTask.

    :return: None
    """
    TaskManager.set_task_type(AsynchronousTask)


def disable_threads():
    """
    Disable threaded behavior when talking to multiple redis backends
    in one pipeline execute call.
    Use this option if you are really concerned about python threaded
    behavior in your application.
    Doesn't apply if you are only ever talking to one redis backend at a
    time.

    Sets the task type used by TaskManager to SynchronousTask.

    :return: None
    """
    TaskManager.set_task_type(SynchronousTask)