Source code for redpipe.tasks
# -*- coding: utf-8 -*-
"""
When sending commands to multiple redis backends in one redpipe.pipeline,
this module gives us an api to allow threaded async communication to those
different backends, improving parallelism.
Out of an abundance of caution, the SynchronousTask is the default.
I've used these patterns in my own applications before.
The threads were not true threads because I was using gevent.
But the AsynchronousTask is well tested and should work well.
"""
import sys
from six import reraise
import threading
__all__ = ['enable_threads', 'disable_threads']
class SynchronousTask(object):
"""
This is the default for now.
Just iterate through each backend sequentially.
Slow but reliable.
I'll make this a fallback once I feel confident in threaded behavior.
"""
def __init__(self, target, args=(), kwargs=None):
self._target = target
self._args = args
self._kwargs = kwargs
self._exc_info = None
self._result = None
def run(self):
# noinspection PyBroadException
try:
self._result = self._target(*self._args, **self._kwargs)
except Exception:
self._exc_info = sys.exc_info()
finally:
# Avoid a refcycle if the thread is running a function with
# an argument that has a member that points to the thread.
del self._target, self._args, self._kwargs
start = run
@property
def result(self):
if self._exc_info is not None:
reraise(*self._exc_info)
return self._result
class AsynchronousTask(threading.Thread):
"""
use threads to talk to multiple redis backends simulaneously.
Should decrease latency for the case when sending commands to multiple
redis backends in one `redpipe.pipeline`.
"""
def __init__(self, target, args=None, kwargs=None):
super(AsynchronousTask, self).__init__()
if args is None:
args = ()
if kwargs is None:
kwargs = {}
self._target = target
self._args = args
self._kwargs = kwargs
self._exc_info = None
self._result = None
def run(self):
# noinspection PyBroadException
try:
self._result = self._target(*self._args, **self._kwargs)
except Exception:
self._exc_info = sys.exc_info()
finally:
# Avoid a refcycle if the thread is running a function with
# an argument that has a member that points to the thread.
del self._target, self._args, self._kwargs
@property
def result(self):
if self.is_alive():
self.join()
if self._exc_info is not None:
reraise(*self._exc_info)
return self._result
class TaskManager(object):
"""
standardized interface for processing async vs synchronous tasks.
"""
task = SynchronousTask
@classmethod
def set_task_type(cls, task):
cls.task = task
@classmethod
def promise(cls, fn, *args, **kwargs):
task = cls.task(target=fn, args=args, kwargs=kwargs)
task.start()
return task
@classmethod
def wait(cls, *futures):
return [f.result for f in futures]
[docs]def enable_threads():
"""
used to enable threaded behavior when talking to multiple redis backends
in one pipeline execute call.
Otherwise we don't need it.
:return: None
"""
TaskManager.set_task_type(SynchronousTask)
[docs]def disable_threads():
"""
used to disable threaded behavior when talking to multiple redis backends
in one pipeline execute call.
Use this option if you are really concerned about python threaded behavior
in your application.
Doesn't apply if you are only ever talking to one redis backend at a time.
:return: None
"""
TaskManager.set_task_type(AsynchronousTask)