# -*- coding: utf-8 -*-
"""
This is where the magic happens.
The most important components of redpipe are here.
The Pipeline and NestedPipeline classes and the pipeline function enable
us to pass pipeline functions into each other and attach redis calls to them.
The main function exposed here is the `pipeline` function.
You will use it everywhere, so get used to this syntax:
.. code-block:: python
def incr(name, pipe=None):
with redpipe.autoexec(pipe=pipe) as pipe:
return pipe.incr(name)
with redpipe.autoexec() as pipe:
a = incr('a', pipe=pipe)
b = incr('b', pipe=pipe)
print([a, b])
Look at the `incr` function.
The call to `redpipe.autoexec` (a thin wrapper around `redpipe.pipeline`)
will return a `Pipeline` object if None
is passed in. And if a Pipeline object is passed in, it will return a
`NestedPipeline` object. Those two objects present the same interface but
behave very differently.
`Pipeline` objects execute your pipelined calls.
`NestedPipeline` objects pass their commands up the chain to the parent
pipeline they wrap. This could be another `NestedPipeline` object, or
a Pipeline() object.
"""
from .futures import Future
from .connections import ConnectionManager
from .tasks import TaskManager
from .exceptions import InvalidPipeline
__all__ = [
'pipeline',
'autoexec',
]
class Pipeline(object):
    """
    Wrapper for the redis-py pipeline object.

    Each command called on this object returns a `Future` reference that
    contains a result once the pipeline executes.
    This allows us to be able to pipeline
    lots of calls within nested functions
    and not have to wait for the execute call.

    Don't instantiate this class directly.
    Instead, use the redpipe.pipeline(pipe) function which
    will set up this object correctly.
    """
    # __slots__ keeps instances small. Because there is no instance
    # __dict__, any attribute NOT listed here falls through to
    # __getattr__ below and is treated as a redis command name.
    __slots__ = ['connection_name', 'autoexec', '_stack', '_callbacks',
                 '_pipelines']

    def __init__(self, name, autoexec=False):
        """
        Instantiate a new base pipeline object.

        This pipeline will be responsible for executing all the others that
        potentially get attached to it, including other named pipelines
        and any commands from nested pipelines.

        :param name: str (or None), the name of the connection
        :param autoexec: bool, whether or not to implicitly execute the
            pipe when the `with` context exits without an exception.
        """
        self.connection_name = name
        # queued commands as (command_name, args, kwargs, Future) tuples
        self._stack = []
        # callables to invoke after the queued commands execute
        self._callbacks = []
        self.autoexec = autoexec
        # child Pipeline objects for other named connections,
        # built lazily by _pipeline() and keyed by connection name
        self._pipelines = {}

    def __getattr__(self, item):
        """
        when you call a command like `pipeline().incr('foo')` it winds up here.
        the item would be 'incr', because python can't find that attribute.
        We build a custom function for it on the fly.

        :param item: str, the name of the function we are wrapping.
        :return: callable
        """
        def command(*args, **kwargs):
            """
            track all the arguments passed to this function along with the
            function name (item). That way when pipe.execute() happens, we'll
            be able to run it.

            Return a Future object that will eventually contain the result
            of a redis call.

            :param args: array
            :param kwargs: dict
            :return: Future
            """
            future = Future()
            self._stack.append((item, args, kwargs, future))
            return future
        return command

    @staticmethod
    def supports_redpipe_pipeline():
        """
        Marker used by the `redpipe.pipeline()` function to determine that
        this object can be wrapped by a NestedPipeline. Do not call directly.

        :return: bool
        """
        return True

    def _pipeline(self, name):
        """
        Don't call this function directly.
        Used by the NestedPipeline class when it executes.

        Returns self when `name` matches this pipeline's connection name;
        otherwise returns (lazily creating) a child Pipeline for that name.

        :param name: str, connection name
        :return: Pipeline
        """
        if name == self.connection_name:
            return self
        try:
            return self._pipelines[name]
        except KeyError:
            pipe = Pipeline(name=name, autoexec=True)
            self._pipelines[name] = pipe
            return pipe

    def execute(self):
        """
        Invoke the redispy pipeline.execute() method and take all the values
        returned in sequential order of commands and map them to the
        Future objects we returned when each command was queued inside
        the pipeline.

        Also invoke all the callback functions queued up.

        :return: None
        """
        stack = self._stack
        callbacks = self._callbacks
        promises = []
        if stack:
            def process():
                """
                take all the commands and pass them to redis.
                this closure has the context of the stack

                :return: None
                """
                # get the connection to redis
                pipe = ConnectionManager.get(self.connection_name)
                # keep track of all the commands
                call_stack = []
                # build a corresponding list of the futures
                futures = []
                # we need to do this because we need to make sure
                # all of these are callable.
                # there shouldn't be any non-callables; if one does sneak
                # in, it is silently skipped and its Future never resolves.
                for item, args, kwargs, future in stack:
                    f = getattr(pipe, item)
                    if callable(f):
                        futures.append(future)
                        call_stack.append((f, args, kwargs))
                # here's where we actually pass the commands to the
                # underlying redis-py pipeline() object.
                for f, args, kwargs in call_stack:
                    f(*args, **kwargs)
                # execute the redis-py pipeline.
                # map all of the results into the futures.
                for i, v in enumerate(pipe.execute()):
                    futures[i].set(v)
            promises.append(process)
        # collect all the other pipelines for other named connections attached.
        promises += [p.execute for p in self._pipelines.values()]
        # a single promise runs inline; multiple promises are handed to
        # the TaskManager (presumably to overlap them — see tasks module).
        if len(promises) == 1:
            promises[0]()
        else:
            # if there are no promises, this is basically a no-op.
            TaskManager.wait(*[TaskManager.promise(p) for p in promises])
        # fire callbacks only after every command has executed.
        for cb in callbacks:
            cb()

    def __enter__(self):
        """
        magic method to allow us to use in context like this:

        .. code-block:: python

            with Pipeline(redis.StrictRedis().pipeline()) as pipe:
                ref = pipe.set('foo', 'bar')
                pipe.execute()

        we are overriding the behavior in redispy.

        :return: Pipeline instance
        """
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        context manager cleanup method.

        Executes the pipe when autoexec is set and no exception occurred;
        always resets state afterward.

        :param exc_type: exception class, or None
        :param exc_val: exception instance, or None
        :param exc_tb: traceback, or None
        :return: None
        """
        try:
            if exc_type is None and self.autoexec:
                self.execute()
        finally:
            self.reset()

    def reset(self):
        """
        cleanup method. get rid of the stack and callbacks,
        and recursively reset any child pipelines for other connections.

        :return: None
        """
        self._stack = []
        self._callbacks = []
        pipes = self._pipelines
        self._pipelines = {}
        for pipe in pipes.values():
            pipe.reset()

    def on_execute(self, callback):
        """
        attach a callback to be called when the pipe finally executes.

        :param callback: callable taking no arguments
        :return: None
        """
        self._callbacks.append(callback)

    def _inject_callbacks(self, callbacks):
        # prepend: injected (upstream) callbacks must run before any
        # callbacks registered directly on this pipeline.
        self._callbacks[0:0] = callbacks
class NestedPipeline(object):
    """
    A pipeline that defers to a parent.

    Tracks a parent pipeline object (either a `Pipeline` or another
    `NestedPipeline`), queues commands locally, and hands everything —
    commands and callbacks alike — up the chain when `execute` is called.

    Don't instantiate this class directly.
    Instead, use the redpipe.pipeline(pipe) function which
    will set up this object correctly.
    """
    # No instance __dict__: unknown attributes fall through to
    # __getattr__ and become queued redis commands.
    __slots__ = ['connection_name', 'parent', 'autoexec', '_stack',
                 '_callbacks']

    def __init__(self, parent, name=None, autoexec=False):
        """
        Same interface as the Pipeline object, plus the parent pipeline
        this one wraps.

        :param parent: Pipeline() or NestedPipeline()
        :param name: str, the name of the connection
        :param autoexec: bool, implicitly call execute?
        """
        self.connection_name = name
        self.parent = parent
        self.autoexec = autoexec
        self._stack = []
        self._callbacks = []

    @staticmethod
    def supports_redpipe_pipeline():
        """
        Marker used by the `redpipe.pipeline()` function to determine that
        this object can be nested inside other pipeline objects.
        Do not call directly.

        :return: bool
        """
        return True

    def __getattr__(self, attr):
        """
        Any attribute python can't find — e.g. `pipeline(pipe).incr('foo')`
        — lands here, and we fabricate a queuing wrapper for it on the fly.

        :param attr: str, the name of the redis command being wrapped.
        :return: callable
        """
        def queue_command(*args, **kwargs):
            """
            Record the command name and its arguments so they can be
            replayed upstream when pipe.execute() happens.

            Hands back a Future that will eventually hold the result of
            the redis call.

            :param args: array
            :param kwargs: dict
            :return: Future
            """
            pending = Future()
            self._stack.append((attr, args, kwargs, pending))
            return pending

        return queue_command

    def _pipeline(self, name):
        """
        Resolve a named pipeline by delegating to the parent.
        Used by other NestedPipeline objects; don't call directly.

        :param name: str
        :return: Pipeline
        """
        return self.parent._pipeline(name)

    def execute(self):
        """
        Pass every queued command and callback upstream to the parent.
        The local state of this pipeline is cleared in the process.

        :return: None
        """
        queued, registered = self._stack, self._callbacks
        self._stack = []
        self._callbacks = []
        target = self._pipeline(self.connection_name)
        # replay each queued command on the resolved pipeline and build a
        # relay callback that copies its result into our original Future.
        relays = [
            self._nested_future(getattr(target, cmd)(*args, **kwargs), ref)
            for cmd, args, kwargs, ref in queued
        ]
        self.parent._inject_callbacks(relays + registered)

    def on_execute(self, callback):
        """
        Same purpose as the Pipeline().on_execute() method.
        The callback is queued here and handed to the parent when this
        nested pipeline executes.

        :param callback: callable
        :return: None
        """
        self._callbacks.append(callback)

    def _inject_callbacks(self, callbacks):
        # prepend in place so injected callbacks fire before local ones
        self._callbacks[:0] = callbacks

    @staticmethod
    def _nested_future(r, future):
        """
        A utility function to map one future result into
        another future via callback.

        :param r: upstream reference exposing `.result`
        :param future: the Future originally handed to the caller
        :return: callable
        """
        def relay():
            future.set(r.result)

        return relay

    def reset(self):
        """Throw away any queued commands and callbacks."""
        self._stack = []
        self._callbacks = []

    def __enter__(self):
        """Context-manager entry; returns self."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        On a clean exit, execute if autoexec is set; always reset state.
        """
        try:
            if self.autoexec and exc_type is None:
                self.execute()
        finally:
            self.reset()
def pipeline(pipe=None, name=None, autoexec=False):
    """
    This is the foundational function for all of redpipe.
    Everything goes through here.

    Create pipelines, nest pipelines, get pipelines for a specific name.
    It all happens here.

    Here's a simple example:

    .. code:: python

        with pipeline() as pipe:
            pipe.set('foo', 'bar')
            foo = pipe.get('foo')
            pipe.execute()
        print(foo)

        > bar

    Now let's look at how we can nest a pipeline.

    .. code:: python

        def process(key, pipe=None):
            with pipeline(pipe, autoexec=True) as pipe:
                return pipe.incr(key)

        with pipeline() as pipe:
            key1 = process('key1', pipe)
            key2 = process('key2', pipe)
            pipe.execute()

        print([key1, key2])

        > [1, 1]

    :param pipe: a Pipeline() or NestedPipeline() object, or None
    :param name: str, optional. the name of the connection to use.
    :param autoexec: bool, if true, implicitly execute the pipe
    :return: Pipeline or NestedPipeline
    :raises InvalidPipeline: if `pipe` is given but doesn't support
        redpipe nesting.
    """
    # no parent given: start a fresh top-level pipeline.
    if pipe is None:
        return Pipeline(name=name, autoexec=autoexec)

    # duck-type the parent: anything exposing a truthy
    # supports_redpipe_pipeline() can be wrapped. AttributeError means
    # it isn't a redpipe-aware object at all.
    try:
        if pipe.supports_redpipe_pipeline():
            return NestedPipeline(
                parent=pipe,
                name=name,
                autoexec=autoexec)
    except AttributeError:
        pass

    raise InvalidPipeline('check your configuration')
def autoexec(pipe=None, name=None):
    """
    create a pipeline with a context that will automatically execute the
    pipeline upon leaving the context if no exception was raised.

    Equivalent to `pipeline(pipe, name, autoexec=True)`.

    :param pipe: a Pipeline() or NestedPipeline() object, or None
    :param name: str, optional. the name of the connection to use.
    :return: Pipeline or NestedPipeline
    """
    return pipeline(pipe=pipe, name=name, autoexec=True)