redpipe package

Redpipe makes redis pipelines easier to use in python.

Usage:

import redpipe
import redis

redpipe.connect_redis(redis.Redis())
with redpipe.pipeline() as pipe:
    foo = pipe.incr('foo')
    bar = pipe.incr('bar')
    pipe.execute()
print([foo, bar])
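
The pattern in the usage example can be pictured with a small stand-in that needs no redis server. This is a minimal sketch, not redpipe's implementation: Box, FakePipeline, and the in-memory store are hypothetical names used only for illustration. Commands are queued when called, and their results are filled in when execute() runs.

```python
class Box:
    """Hypothetical placeholder that receives its value later."""

    def __init__(self):
        self.value = None
        self.ready = False


class FakePipeline:
    """Toy pipeline: queues INCR commands against an in-memory dict."""

    def __init__(self, store):
        self.store = store
        self.queue = []

    def incr(self, name):
        box = Box()
        # Nothing runs yet; remember the command and where its result goes.
        self.queue.append((name, box))
        return box

    def execute(self):
        # Run every queued command in one batch and fill in the placeholders.
        for name, box in self.queue:
            self.store[name] = self.store.get(name, 0) + 1
            box.value = self.store[name]
            box.ready = True
        self.queue = []


store = {}
pipe = FakePipeline(store)
foo = pipe.incr('foo')
bar = pipe.incr('bar')
ready_before_execute = foo.ready  # False: nothing has run yet
pipe.execute()
print([foo.value, bar.value])
```

The real library sends the queued commands to redis in a single round trip; the dict here only stands in for the server.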

Module Structure

This is the structure of the top level of the package, grouped by category.

Connections

  • connect_redis
  • disconnect
  • reset
  • pipeline
  • autoexec

Fields

  • IntegerField
  • FloatField
  • TextField
  • AsciiField
  • BinaryField
  • BooleanField
  • ListField
  • DictField
  • StringListField

Keyspaces

  • String
  • Set
  • List
  • SortedSet
  • Hash
  • HyperLogLog

Exceptions

  • Error
  • ResultNotReady
  • InvalidOperation
  • InvalidValue
  • AlreadyConnected
  • InvalidPipeline

Misc

  • Future
  • Struct
  • enable_threads
  • disable_threads

You shouldn’t need to import the submodules directly.

Submodules

redpipe.connections module

Bind instances of the redis-py client to redpipe. Assign named connections to be able to talk to multiple redis servers in your project.

The ConnectionManager is a singleton class.

These functions are all you will need to call from your code:

  • connect_redis
  • disconnect
  • reset

Everything else is for internal use.

redpipe.connections.connect_redis(redis_client, name=None, transaction=False) → None[source]

Connect your redis-py instance to redpipe.

Example:

redpipe.connect_redis(redis.Redis(), name='users')

Do this during your application bootstrapping.

You can also pass a redis cluster instance to this method.

redpipe.connect_redis(redis.RedisCluster(), name='users')

You are allowed to pass in any redis-py client instance.

redpipe.connect_redis(redis.Redis(), name='a')
redpipe.connect_redis(redis.RedisCluster(...), name='b')
Parameters:
  • redis_client
  • name – nickname you want to give to your connection.
  • transaction
Returns:

redpipe.connections.disconnect(name: Optional[str] = None) → None[source]

Remove a connection by name. If no name is passed in, the default connection is removed.

redpipe.disconnect('users')
redpipe.disconnect()

Useful for testing.

Parameters:name
Returns:None
redpipe.connections.reset() → None[source]

remove all connections.

redpipe.reset()

Useful for testing scenarios.

Not sure when you’d want to call this explicitly unless you need an explicit teardown of your application. In most cases, python garbage collection will do the right thing on shutdown and close all the redis connections.

Returns:None

redpipe.exceptions module

This module contains the set of all of redpipe exceptions.

exception redpipe.exceptions.Error[source]

Bases: Exception

Base class for all redpipe errors

exception redpipe.exceptions.ResultNotReady[source]

Bases: redpipe.exceptions.Error

Raised when you access data from a Future before it is assigned.

exception redpipe.exceptions.InvalidOperation[source]

Bases: redpipe.exceptions.Error

Raised when trying to perform an operation disallowed by the redpipe api.

exception redpipe.exceptions.InvalidValue[source]

Bases: redpipe.exceptions.Error

Raised when data assigned to a field is the wrong type

exception redpipe.exceptions.AlreadyConnected[source]

Bases: redpipe.exceptions.Error

Raised when you try to connect and change the ORM connection without explicitly disconnecting first.

exception redpipe.exceptions.InvalidPipeline[source]

Bases: redpipe.exceptions.Error

Raised when you try to use a pipeline that isn’t configured correctly.

redpipe.fields module

A module for marshalling data in and out of redis and back into the python data type we expect.

Used extensively in the redpipe.keyspaces module for type-casting keys and values.
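
As a rough picture of what a field does, here is a minimal sketch of an integer field with the same encode/decode classmethod shape that the field classes in this module share. SketchIntegerField is a hypothetical stand-in, and the decimal-text storage format is an assumption, not a statement about the real IntegerField.

```python
from typing import Optional


class SketchIntegerField:
    """Hypothetical stand-in with encode/decode classmethods."""

    @classmethod
    def encode(cls, value: int) -> bytes:
        # Assumption: integers are stored as their decimal text form.
        return str(int(value)).encode('ascii')

    @classmethod
    def decode(cls, value: Optional[bytes]) -> Optional[int]:
        # A missing key comes back from redis as None; pass that through.
        return None if value is None else int(value)


raw = SketchIntegerField.encode(42)
print(raw, SketchIntegerField.decode(raw), SketchIntegerField.decode(None))
```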

class redpipe.fields.IntegerField[source]

Bases: object

Used for integer numeric fields.

classmethod decode(value: Optional[bytes]) → Optional[int][source]

read bytes from redis and turn it back into an integer.

Parameters:value – bytes
Returns:int
classmethod encode(value: int) → bytes[source]

take an integer and turn it into a string representation to write into redis.

Parameters:value – int
Returns:bytes
class redpipe.fields.FloatField[source]

Bases: object

Numeric field that supports integers and floats (values are turned into floats on load from persistence).

classmethod decode(value: Optional[bytes]) → Optional[float][source]

decode the bytes from redis back into a float

Parameters:value – bytes
Returns:float
classmethod encode(value: float) → bytes[source]

encode a floating point number to bytes in redis

Parameters:value – float
Returns:bytes
class redpipe.fields.TextField[source]

Bases: object

A unicode string field.

Encoded via utf-8 before writing to persistence.

classmethod decode(value: Optional[bytes]) → Optional[str][source]

take bytes from redis and turn them into a unicode string

Parameters:value
Returns:
classmethod encode(value: str) → bytes[source]

take a valid unicode string and turn it into utf-8 bytes

Parameters:value – unicode, str
Returns:bytes
class redpipe.fields.AsciiField[source]

Bases: redpipe.fields.TextField

Used for ascii-only text

PATTERN = re.compile('^([ -~]+)?$')
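
To see what this pattern accepts, the sketch below applies the same regular expression to a few strings. The helper name is_printable_ascii is hypothetical, and how the field reacts to a mismatch is not shown here.

```python
import re

# Same expression as PATTERN: zero or more characters from space to tilde.
PATTERN = re.compile('^([ -~]+)?$')


def is_printable_ascii(value: str) -> bool:
    """Return True when the pattern matches the whole value."""
    return PATTERN.match(value) is not None


print(is_printable_ascii('hello world'))
print(is_printable_ascii(''))
print(is_printable_ascii('caf\u00e9'))
print(is_printable_ascii('tab\there'))
```

The range from space to tilde covers the printable ASCII characters, so accented letters and control characters such as tab are rejected, while the empty string is accepted.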
classmethod encode(value: str) → bytes[source]

take an ascii string and turn it into a utf-8 byte-string

Parameters:value
Returns:
class redpipe.fields.BinaryField[source]

Bases: object

A bytes field. Not encoded.

classmethod decode(value: Optional[bytes]) → Optional[bytes][source]

read binary data from redis and pass it on through.

Parameters:value – bytes
Returns:bytes
classmethod encode(value: bytes) → bytes[source]

write binary data into redis without encoding it.

Parameters:value – bytes
Returns:bytes
class redpipe.fields.BooleanField[source]

Bases: object

Used for boolean fields.

classmethod decode(value: Optional[bytes]) → Optional[bool][source]

convert from redis bytes into a boolean value

Parameters:value – bytes
Returns:bool
classmethod encode(value: bool) → bytes[source]

convert a boolean value into something we can persist to redis. An empty string is the representation for False.

Parameters:value – bool
Returns:bytes
classmethod is_true(val)[source]
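
A minimal sketch of the boolean round trip described above. The only detail taken from this page is that an empty string represents False; using b'1' for True and passing None through on decode are assumptions, and encode_bool/decode_bool are hypothetical names.

```python
from typing import Optional


def encode_bool(value: bool) -> bytes:
    # The empty string stands for False.
    # Assumption: b'1' is used for True; the real field may differ.
    return b'1' if value else b''


def decode_bool(value: Optional[bytes]) -> Optional[bool]:
    # None (missing key) stays None so callers can tell it apart from False.
    if value is None:
        return None
    return value != b''


print(encode_bool(False), decode_bool(encode_bool(True)), decode_bool(None))
```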
class redpipe.fields.ListField[source]

Bases: object

A list field. Marshalled in and out of redis via json. Values of the list can be any arbitrary data.

classmethod decode(value: Union[bytes, None, list]) → Optional[list][source]

take a utf-8 encoded byte-string from redis and turn it back into a list

Parameters:value – bytes
Returns:list
classmethod encode(value: list) → bytes[source]

take a list and turn it into a utf-8 encoded byte-string for redis.

Parameters:value – list
Returns:bytes
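
The JSON marshalling can be sketched with the standard library alone. This is an illustration under the assumption that plain json.dumps and json.loads are enough; encode_list and decode_list are hypothetical names, not the library's code.

```python
import json
from typing import Optional


def encode_list(value: list) -> bytes:
    # Serialize to JSON text, then to utf-8 bytes for storage.
    return json.dumps(value).encode('utf-8')


def decode_list(value: Optional[bytes]) -> Optional[list]:
    if value is None:
        return None
    return json.loads(value.decode('utf-8'))


original = [1, 'two', {'three': 3.0}, None]
round_trip = decode_list(encode_list(original))
print(round_trip == original)
```

Values that JSON cannot represent natively will not round-trip unchanged: a tuple comes back as a list, and a set cannot be serialized by json.dumps at all.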
class redpipe.fields.DictField[source]

Bases: object

classmethod decode(value: Union[bytes, None, dict]) → Optional[dict][source]

decode the data from a json string in redis back into a dict object.

Parameters:value – bytes
Returns:dict
classmethod encode(value: dict) → bytes[source]

encode the dict as a json string to be written into redis.

Parameters:value – dict
Returns:bytes
class redpipe.fields.StringListField[source]

Bases: object

Used for storing a list of strings, serialized as a comma-separated list.

classmethod decode(value: Union[bytes, None, List[str]]) → Optional[List[str]][source]

decode the data from redis.

Parameters:value – bytes
Returns:list

classmethod encode(value: List[str]) → bytes[source]

encode the list so it can be stored in redis.

Parameters:value – list
Returns:bytes
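
A comma-separated encoding is simple but has one consequence worth seeing: an item that itself contains a comma does not survive the round trip. The sketch below is illustrative only; the function names are hypothetical and the handling of an empty stored value is an assumption.

```python
from typing import List, Optional


def encode_string_list(value: List[str]) -> bytes:
    # Join the items with commas and store the result as utf-8 bytes.
    return ','.join(value).encode('utf-8')


def decode_string_list(value: Optional[bytes]) -> Optional[List[str]]:
    if value is None:
        return None
    text = value.decode('utf-8')
    # Assumption: an empty stored value means an empty list.
    return text.split(',') if text else []


print(decode_string_list(encode_string_list(['a', 'b', 'c'])))
print(decode_string_list(encode_string_list(['x,y'])))
```

The second call returns two items instead of one, which is why this kind of field suits identifiers and tags rather than free text.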
class redpipe.fields.Field(*args, **kwargs)[source]

Bases: typing_extensions.Protocol, typing.Generic

classmethod decode(value: Optional[bytes]) → Optional[T][source]
classmethod encode(value: T) → bytes[source]

redpipe.futures module

The Future() object in RedPipe gives us the ability to make the pipeline interface of redis-py look like the non-pipelined interface. You call a command and get a response back, but the response is not the actual data. It is an empty container called a Future, with a callback attached to it. When the pipeline is executed, the pipeline injects the response into the container.

This Future container is a very special kind of python object. It can imitate anything it contains. If there is an integer inside, it behaves like an integer. If it holds a dictionary, it behaves like a dictionary. If it holds a list, it behaves like a list. Your application should be able to use it interchangeably.
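
One way to picture this imitation is a wrapper that forwards Python's special methods to whatever it holds. The sketch below is a hypothetical, heavily reduced illustration (ProxySketch is not the library's Future and forwards only a handful of operations).

```python
class ProxySketch:
    """Hypothetical wrapper that forwards common operations to its content."""

    def set(self, data):
        self._result = data

    def __eq__(self, other):
        return self._result == other

    def __int__(self):
        return int(self._result)

    def __add__(self, other):
        return self._result + other

    def __iter__(self):
        return iter(self._result)

    def __getitem__(self, key):
        return self._result[key]

    def __repr__(self):
        return repr(self._result)


number = ProxySketch()
number.set(1)
print(number == 1, number + 1, int(number))

items = ProxySketch()
items.set(['a', 'b'])
print(list(items), items[0])

# The wrapper is still its own object, so isinstance and identity
# checks see the wrapper rather than the value inside it.
one = 1
print(isinstance(number, int), number is one)
```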

There are a few gotchas to watch out for:

  • isinstance() checks
  • identity checks like: future is None
  • trying to mutate the object like this: future += 1

You can always type cast the object into the type you expect if you need this behavior.

f = Future()
f.set(1)

# f is 1 fails
assert(int(f) is 1)

This doesn’t work so well for is None checks. You can use equality checks though, or our handy IS method, or you can access the underlying result.

f = Future()
f.set(None)

assert(f == None)
assert(f.IS(None))
assert(f.result is None)

Hope that helps.

Other than those few caveats, you should be able to access a future object just like the underlying result.

Here are some examples if your result is numeric.

future = Future()
future.set(1)
assert(future == 1)
assert(future != 2)
assert(bool(future))
assert(float(future) == 1.0)
assert(future + 1 == 2)
assert(future * 2 == 2)
assert(future ^ 1 == 0)
assert(repr(future) == '1')

And here is an example if your future is a list:

future = Future()
future.set([1])
assert(future == [1])
assert([v for v in future] == [1])
assert(future + [2] == [1, 2])

And here is a dictionary:

future = Future()
future.set({'a': 1})
assert(future == {'a': 1})
assert(dict(future) == {'a': 1})
assert({k: v for k, v in future.items()} == {'a': 1})

There are many more operations supported but these are the most common. Let me know if you need more examples or explanation.

class redpipe.futures.Future[source]

Bases: typing.Generic

An object returned from all our Pipeline calls.

IS(other) → bool[source]

Allows you to do identity comparisons on the underlying object.

Parameters:other – Mixed
Returns:bool
id()[source]

Get the object id of the underlying result.

isinstance(other) → bool[source]

allows you to check the instance type of the underlying result.

Parameters:other
Returns:
result

Get the underlying result. Usually one of the data types returned by redis-py.

Returns:None, str, int, list, set, dict
set(data: T)[source]

Write the data into the object. Note that I intentionally did not declare result in the constructor. I want an error to happen if you try to access it before it is set.

Parameters:data – any python object
Returns:None
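
The design choice described for set() can be sketched as follows: because the attribute is created only by set(), reading it early fails instead of quietly returning None. LateResult and NotReady are hypothetical names; NotReady merely stands in for the library's ResultNotReady.

```python
class NotReady(Exception):
    """Hypothetical error, standing in for ResultNotReady."""


class LateResult:
    """Hypothetical container whose result exists only after set()."""

    def set(self, data):
        self._result = data

    @property
    def result(self):
        try:
            return self._result
        except AttributeError:
            # _result was never assigned, so fail loudly instead of
            # silently returning None.
            raise NotReady('result accessed before it was set') from None


late = LateResult()
try:
    late.result
    raised = False
except NotReady:
    raised = True

late.set(None)
print(raised, late.result)
```

This keeps a legitimate None result (for example, a missing key) distinguishable from a result that has not arrived yet.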
redpipe.futures.IS(instance, other)[source]

Support the future is other use-case. Can’t override the language so we built a function. Will work on non-future objects too.

Parameters:
  • instance – future or any python object
  • other – object to compare.
Returns:

redpipe.futures.ISINSTANCE(instance, A_tuple)[source]

Allows you to do isinstance checks on futures. Really, I discourage this because duck-typing is usually better. But this can provide you with a way to use isinstance with futures. Works with other objects too.
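
Both helpers follow the same idea: look through the wrapper if there is one, otherwise use the object as given. A minimal sketch under that assumption, with hypothetical names (Wrapper, unwrap, is_, isinstance_) rather than the library's own code:

```python
class Wrapper:
    """Hypothetical future-like object exposing its content as .result."""

    def __init__(self, result):
        self.result = result


def unwrap(obj):
    # Use the wrapped content when present, otherwise the object itself.
    return obj.result if isinstance(obj, Wrapper) else obj


def is_(instance, other):
    """Identity check that looks through the wrapper."""
    return unwrap(instance) is other


def isinstance_(instance, a_tuple):
    """isinstance check that looks through the wrapper."""
    return isinstance(unwrap(instance), a_tuple)


print(is_(Wrapper(None), None), is_(None, None))
print(isinstance_(Wrapper(1), int), isinstance_('x', (int, float)))
```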

Parameters:
  • instance
  • A_tuple
Returns:

redpipe.keyspaces module

This module provides a way to access keys grouped under a certain keyspace. A keyspace is a convention used often in redis where many keys are grouped logically together. In the SQL world, you could think of this as a table. But in redis each key is independent whereas a record in a table is controlled by the schema.

Examples of a group of keys in a keyspace:

  • user{A}
  • user{B}
  • user{C}

It is inconvenient to refer to keys this way. The identifiers for our user records are A, B, C. In addition, we usually know that a user record is always a redis hash. And we know that it has certain fields that have different data types.

These keyspace classes in this module allow you to easily manipulate these keys.

redpipe.connect_redis(redis.Redis(
        # connection params go here.
    ), name='user_redis_db')

class User(redpipe.Hash):
    keyspace = 'user'
    fields = {
        'name': redpipe.TextField,
        'created_at': redpipe.TextField,
    }
    connection = 'user_redis_db'


user_a = User().hgetall('A')

This Keyspace object exposes all the hash-related redis commands as normal. Internally, it rewrites the key name to be ‘user{A}’ for you automatically. You can pass in a pipeline to the constructor. No matter what pipeline you pass in, it routes your commands to the user_redis_db that you set up.
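
The key rewriting can be pictured as a one-line formatting step. This sketch only reproduces the 'user{A}' shape described above; make_key is a hypothetical helper, not a function of the library.

```python
def make_key(keyspace: str, identifier: str) -> str:
    """Build the full redis key for an identifier inside a keyspace."""
    # Braces around the identifier give the 'user{A}' form.
    return '%s{%s}' % (keyspace, identifier)


print([make_key('user', i) for i in ('A', 'B', 'C')])
```

In Redis Cluster, the text between braces is the hash tag used for slot assignment, so keys that share an identifier land on the same slot.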

There’s also support for character encoding and complex data types.

class redpipe.keyspaces.String(pipe: Optional[redpipe.pipelines.PipelineInterface] = None)[source]

Bases: redpipe.keyspaces.Keyspace

Manipulate a String key in Redis.

append(name: str, value: str) → redpipe.futures.Future[source]

Appends the string value to the value at key. If key doesn’t already exist, create it with a value of value. Returns the new length of the value at key.

Parameters:
  • name – str the name of the redis key
  • value – str
Returns:

Future()

bitcount(name, start=None, end=None) → redpipe.futures.Future[source]

Returns the count of set bits in the value of key. Optional start and end parameters indicate which bytes to consider.

Parameters:
  • name – str the name of the redis key
  • start – int
  • end – int
Returns:

Future()

get(name: str) → redpipe.futures.Future[str][str][source]

Return the value of the key or None if the key doesn’t exist

Parameters:name – str the name of the redis key
Returns:Future()
getbit(name: str, offset: int) → redpipe.futures.Future[source]

Returns a boolean indicating the value of offset in key

Parameters:
  • name – str the name of the redis key
  • offset – int
Returns:

Future()

incr(name: str, amount: int = 1) → redpipe.futures.Future[source]

increment the value for key by amount (1 by default)

Parameters:
  • name – str the name of the redis key
  • amount – int
Returns:

Future()

incrby(name: str, amount: int = 1) → redpipe.futures.Future[source]

increment the value for key by amount, an int

Parameters:
  • name – str the name of the redis key
  • amount – int
Returns:

Future()

incrbyfloat(name: str, amount: float = 1.0) → redpipe.futures.Future[source]

increment the value for key by amount, a float

Parameters:
  • name – str the name of the redis key
  • amount – float
Returns:

Future()

mget(keys: Union[str, List[str]], *args) → redpipe.futures.Future[source]

Returns a list of values ordered identically to keys

psetex(name: str, value: str, time_ms: int) → redpipe.futures.Future[source]

Set the value of key name to value that expires in time_ms milliseconds. time_ms can be represented by an integer or a Python timedelta object

set(name: str, value: str, ex: Optional[int] = None, px: Optional[int] = None, nx: bool = False, xx: bool = False) → redpipe.futures.Future[source]

Set the value at key name to value

ex sets an expire flag on key name for ex seconds.

px sets an expire flag on key name for px milliseconds.

nx if set to True, set the value at key name to value if it does not already exist.

xx if set to True, set the value at key name to value if it already exists.

Returns:Future()
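
The nx and xx flags are easiest to compare side by side. The sketch below emulates them on a plain dict; set_value is a hypothetical helper and the boolean return value is a simplification for illustration, not the reply redis sends.

```python
from typing import Dict


def set_value(store: Dict[str, str], name: str, value: str,
              nx: bool = False, xx: bool = False) -> bool:
    """Emulate the nx and xx flags of SET on a plain dict."""
    exists = name in store
    if nx and exists:
        return False  # nx: write only when the key is absent
    if xx and not exists:
        return False  # xx: write only when the key is present
    store[name] = value
    return True


store: Dict[str, str] = {}
print(set_value(store, 'k', 'first', xx=True))
print(set_value(store, 'k', 'first', nx=True))
print(set_value(store, 'k', 'second', nx=True))
print(set_value(store, 'k', 'third', xx=True))
print(store)
```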
setbit(name: str, offset: int, value: str) → redpipe.futures.Future[source]

Flag the offset in the key as value. Returns a boolean indicating the previous value of offset.

Parameters:
  • name – str the name of the redis key
  • offset – int
  • value
Returns:

Future()

setex(name: str, value: str, time: int) → redpipe.futures.Future[source]

Set the value of key to value that expires in time seconds. time can be represented by an integer or a Python timedelta object.

Parameters:
  • name – str the name of the redis key
  • value – str
  • time – secs
Returns:

Future()

setnx(name: str, value: str) → int[source]

Set the value as a string in the key only if the key doesn’t exist.

Parameters:
  • name – str the name of the redis key
  • value
Returns:

Future()

setrange(name: str, offset: int, value: str) → redpipe.futures.Future[source]

Overwrite bytes in the value of name starting at offset with value. If offset plus the length of value exceeds the length of the original value, the new value will be larger than before. If offset exceeds the length of the original value, null bytes will be used to pad between the end of the previous value and the start of what’s being injected.

Returns the length of the new string.

Parameters:
  • name – str the name of the redis key
  • offset – int
  • value – str
Returns:

Future()
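
The overwrite and padding rules for setrange can be emulated on an in-memory byte string. This is an illustrative sketch of the behavior described above, not code from redis or redpipe, and the function here is a local stand-in that returns the new value rather than its length.

```python
def setrange(current: bytes, offset: int, value: bytes) -> bytes:
    """Emulate SETRANGE on an in-memory byte string."""
    # Pad with null bytes when the offset is past the end of the value.
    padded = current.ljust(offset, b'\x00')
    # Keep whatever follows the overwritten region, if anything.
    return padded[:offset] + value + padded[offset + len(value):]


print(setrange(b'Hello World', 6, b'Redis'))
print(setrange(b'Hello', 3, b'p me'))
print(setrange(b'', 3, b'x'))
```

The three calls cover an in-place overwrite, an overwrite that grows the value, and an offset beyond the end that forces null-byte padding.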

strlen(name: str) → redpipe.futures.Future[source]

Return the number of bytes stored in the value of the key

Parameters:name – str the name of the redis key
Returns:Future()
substr(name: str, start: int, end: int = -1) → redpipe.futures.Future[str][str][source]

Return a substring of the string at key name. start and end are 0-based integers specifying the portion of the string to return.

Parameters:
  • name – str the name of the redis key
  • start – int
  • end – int
Returns:

Future()

class redpipe.keyspaces.HashedString(pipe: Optional[redpipe.pipelines.PipelineInterface] = None)[source]

Bases: object

classmethod core(pipe: Optional[redpipe.pipelines.PipelineInterface] = None)[source]
delete(key: str, *args) → redpipe.futures.Future[int][int][source]
get(key: str) → redpipe.futures.Future[source]

Return the value of the key or None if the key doesn’t exist

Parameters:key – str the name of the redis key
Returns:Future()
incr(name: str, amount: int = 1) → redpipe.futures.Future[source]

increment the value for key by amount (1 by default)

Parameters:
  • name – str the name of the redis key
  • amount – int
Returns:

Future()

incrby(name: str, amount: int = 1) → redpipe.futures.Future[source]

increment the value for key by amount, an int

Parameters:
  • name – str the name of the redis key
  • amount – int
Returns:

Future()

incrbyfloat(name: str, amount: float = 1.0) → redpipe.futures.Future[source]

increment the value for key by amount, a float

Parameters:
  • name – str the name of the redis key
  • amount – float
Returns:

Future()

mget(keys: Union[str, List[str]], *args) → redpipe.futures.Future[source]

Returns a list of values ordered identically to keys

pipe

Get a fresh pipeline() to be used in a with block.

Returns:Pipeline or NestedPipeline with autoexec set to true.
scan_iter(match=None, count=None) → Iterable[T_co][source]

Make an iterator using the hscan command so that the client doesn’t need to remember the cursor position.

match allows for filtering the keys by pattern

count is a hint for the minimum number of items returned per call

set(name: str, value: str, nx: bool = False) → redpipe.futures.Future[source]

Set the value at key name to value

nx if set to True, set the value at key name to value if it does not already exist.

Returns:Future()
setnx(name, value) → redpipe.futures.Future[source]

Set the value as a string in the key only if the key doesn’t exist.

Parameters:
  • name – str the name of the redis key
  • value
Returns:

Future()

classmethod shard(key: str)[source]
shard_count = 64
strlen(name: str) → redpipe.futures.Future[source]

Return the number of bytes stored in the value of the key

Parameters:name – str the name of the redis key
Returns:Future()
class redpipe.keyspaces.Set(pipe: Optional[redpipe.pipelines.PipelineInterface] = None)[source]

Bases: redpipe.keyspaces.Keyspace

Manipulate a Set key in redis.

sadd(name: str, values: Union[str, List[str]], *args) → redpipe.futures.Future[source]

Add the specified members to the Set.

Parameters:
  • name – str the name of the redis key
  • values – a list of values or a simple value.
Returns:

Future()

scard(name: str) → redpipe.futures.Future[source]

How many items in the set?

Parameters:name – str the name of the redis key
Returns:Future()
sdiff(keys: Union[str, List[str]], *args) → redpipe.futures.Future[typing.List[typing.Any]][List[Any]][source]

Return the difference of sets specified by keys

Parameters:
  • keys – list
  • args – tuple
Returns:

Future()

sdiffstore(dest: str, *keys) → redpipe.futures.Future[source]

Store the difference of sets specified by keys into a new set named dest. Returns the number of members in the new set.

sinter(keys, *args) → redpipe.futures.Future[typing.List[str]][List[str]][source]

Return the intersection of sets specified by keys

Parameters:
  • keys – list or str
  • args – tuple
Returns:

Future

sinterstore(dest: str, keys: Union[str, List[str]], *args) → redpipe.futures.Future[source]

Store the intersection of sets specified by keys into a new set named dest. Returns the number of members in the new set.

sismember(name: str, value: str) → redpipe.futures.Future[source]

Is the provided value in the Set?

Parameters:
  • name – str the name of the redis key
  • value – str
Returns:

Future()

smembers(name: str) → redpipe.futures.Future[typing.List[typing.Any]][List[Any]][source]

get the set of all members for key

Parameters:name – str the name of the redis key
Returns:
spop(name: str) → redpipe.futures.Future[typing.List[typing.Any]][List[Any]][source]

Remove and return (pop) a random element from the Set.

Parameters:name – str the name of the redis key
Returns:Future()
srandmember(name: str, number: Optional[int] = None) → redpipe.futures.Future[typing.Any][Any][source]

Return a random member of the set.

Parameters:
  • name – str the name of the redis key
  • number – optional int
Returns:

Future()

srem(name: str, *values) → redpipe.futures.Future[source]

Remove the values from the Set if they are present.

Parameters:
  • name – str the name of the redis key
  • values – a list of values or a simple value.
Returns:

Future()

sscan(name: str, cursor: int = 0, match: Optional[str] = None, count: Optional[int] = None) → redpipe.futures.Future[typing.Tuple[int, typing.List[typing.Any]]][Tuple[int, List[Any]]][source]

Incrementally return lists of elements in a set. Also return a cursor indicating the scan position.

match allows for filtering the keys by pattern

count is a hint for the minimum number of items returned per call

Parameters:
  • name – str the name of the redis key
  • cursor – int
  • match – str
  • count – int
sscan_iter(name: str, match: Optional[str] = None, count: Optional[int] = None) → Iterable[T_co][source]

Make an iterator using the SSCAN command so that the client doesn’t need to remember the cursor position.

match allows for filtering the keys by pattern

count is a hint for the minimum number of items returned per call

Parameters:
  • name – str the name of the redis key
  • match – str
  • count – int
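
The cursor bookkeeping that such an iterator hides looks roughly like the loop below. This is a minimal sketch: scan_all and fake_sscan are hypothetical, and fake_sscan pages through a local list instead of talking to redis.

```python
from typing import Callable, Iterator, List, Tuple


def scan_all(scan: Callable[[int], Tuple[int, List[str]]]) -> Iterator[str]:
    """Yield every element from a cursor-based scan function."""
    cursor = 0
    while True:
        cursor, batch = scan(cursor)
        yield from batch
        # A returned cursor of 0 signals that the scan is complete.
        if cursor == 0:
            break


# Hypothetical stand-in for SSCAN: pages through a list two items at a time.
MEMBERS = ['a', 'b', 'c', 'd', 'e']


def fake_sscan(cursor: int) -> Tuple[int, List[str]]:
    batch = MEMBERS[cursor:cursor + 2]
    next_cursor = cursor + 2
    return (0 if next_cursor >= len(MEMBERS) else next_cursor), batch


print(list(scan_all(fake_sscan)))
```

Real SSCAN cursors are opaque values chosen by the server, and an element may be returned more than once during a full scan, so callers should not rely on the simple offsets used here.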
sunion(keys: Union[str, List[str]], *args) → redpipe.futures.Future[typing.List[str]][List[str]][source]

Return the union of sets specified by keys

Parameters:
  • keys – list or str
  • args – tuple
Returns:

Future()

sunionstore(dest: str, keys: Union[str, List[str]], *args) → redpipe.futures.Future[source]

Store the union of sets specified by keys into a new set named dest. Returns the number of members in the new set.

class redpipe.keyspaces.List(pipe: Optional[redpipe.pipelines.PipelineInterface] = None)[source]

Bases: redpipe.keyspaces.Keyspace

Manipulate a List key in redis

blpop(keys: Union[str, List[str]], timeout: int = 0) → redpipe.futures.Future[typing.Union[typing.Tuple[str, typing.Any], NoneType]][Optional[Tuple[str, Any]]][source]

LPOP a value off of the first non-empty list named in the keys list.

If none of the lists in keys has a value to LPOP, then block for timeout seconds, or until a value gets pushed on to one of the lists.

If timeout is 0, then block indefinitely.

brpop(keys: Union[str, List[str]], timeout: int = 0) → redpipe.futures.Future[typing.Union[typing.Tuple[str, typing.Any], NoneType]][Optional[Tuple[str, Any]]][source]

RPOP a value off of the first non-empty list named in the keys list.

If none of the lists in keys has a value to RPOP, then block for timeout seconds, or until a value gets pushed on to one of the lists.

If timeout is 0, then block indefinitely.

brpoplpush(src: str, dst: str, timeout: int = 0) → redpipe.futures.Future[typing.Union[typing.Tuple[str, typing.Any], NoneType]][Optional[Tuple[str, Any]]][source]

Pop a value off the tail of src, push it on the head of dst and then return it.

This command blocks until a value is in src or until timeout seconds elapse, whichever is first. A timeout value of 0 blocks forever.

lindex(name: str, index: int) → redpipe.futures.Future[typing.Any][Any][source]

Return the value at position index in the list

Parameters:
  • name – str the name of the redis key
  • index – the index to fetch the value.
Returns:

Future()

llen(name: str) → redpipe.futures.Future[int][int][source]

Returns the length of the list.

Parameters:name – str the name of the redis key
Returns:Future()
lpop(name: str) → redpipe.futures.Future[typing.Any][Any][source]

Pop the first object from the left.

Parameters:name – str the name of the redis key
Returns:Future()
lpush(name: str, *values) → redpipe.futures.Future[int][int][source]

Push the value into the list from the left side

Parameters:
  • name – str the name of the redis key
  • values – a list of values or single value to push
Returns:

Future()

lrange(name: str, start: int, stop: int) → redpipe.futures.Future[typing.List[typing.Any]][List[Any]][source]

Returns a range of items.

Parameters:
  • name – str the name of the redis key
  • start – integer representing the start index of the range
  • stop – integer representing the end index of the range (inclusive).
Returns:

Future()

lrem(name: str, value: str, num: int = 1) → redpipe.futures.Future[int][int][source]

Remove first occurrence of value.

Can’t use redis-py interface. It’s inconsistent between redis.Redis and redis.Redis in terms of the kwargs. Better to use the underlying execute_command instead.

Parameters:
  • name – str the name of the redis key
  • num
  • value
Returns:

Future()

lset(name: str, index: int, value: str) → redpipe.futures.Future[source]

Set the value in the list at position index

Parameters:
  • name – str the name of the redis key
  • index
  • value
Returns:

Future()

ltrim(name: str, start: int, end: int) → redpipe.futures.Future[source]

Trim the list from start to end.

Parameters:
  • name – str the name of the redis key
  • start
  • end
Returns:

Future()

rpop(name: str) → redpipe.futures.Future[typing.Any][Any][source]

Pop the first object from the right.

Parameters:name – str the name of the redis key
Returns:the popped value.
rpoplpush(src: str, dst: str) → redpipe.futures.Future[typing.Any][Any][source]

RPOP a value off of the src list and atomically LPUSH it on to the dst list. Returns the value.

rpush(name: str, *values) → redpipe.futures.Future[source]

Push the value into the list from the right side

Parameters:
  • name – str the name of the redis key
  • values – a list of values or single value to push
Returns:

Future()

class redpipe.keyspaces.SortedSet(pipe: Optional[redpipe.pipelines.PipelineInterface] = None)[source]

Bases: redpipe.keyspaces.Keyspace

Manipulate a SortedSet key in redis.

zadd(name: str, members: Union[str, List[str]], score: float = 1.0, nx: bool = False, xx: bool = False, ch: bool = False, incr: bool = False) → redpipe.futures.Future[source]

Add members in the set and assign them the score.

Parameters:
  • name – str the name of the redis key
  • members – a list of items or a single item
  • score – the score to assign to the item(s)
  • nx
  • xx
  • ch
  • incr
Returns:

Future()

zcard(name: str) → redpipe.futures.Future[int][int][source]

Returns the cardinality of the SortedSet.

Parameters:name – str the name of the redis key
Returns:Future()
zcount(name: str, min: float, max: float) → redpipe.futures.Future[int][int][source]

Returns the number of elements in the sorted set at key name with a score between min and max.

Parameters:
  • name – str
  • min – float
  • max – float
Returns:

Future()

zincrby(name: str, value: Any, amount: float = 1.0) → redpipe.futures.Future[source]

Increment the score of the item value by amount

Parameters:
  • name – str the name of the redis key
  • value
  • amount
Returns:

zlexcount(name: str, min: float, max: float) → redpipe.futures.Future[int][int][source]

Return the number of items in the sorted set between the lexicographical range min and max.

Parameters:
  • name – str the name of the redis key
  • min – int or ‘-inf’
  • max – int or ‘+inf’
Returns:

Future()

zrange(name: str, start: int, end: int, desc: bool = False, withscores: bool = False, score_cast_func: Callable = <class 'float'>) → redpipe.futures.Future[typing.List[typing.Any]][List[Any]][source]

Returns all the elements between start and end, both included.

Parameters:
  • name – str the name of the redis key
  • start
  • end
  • desc
  • withscores
  • score_cast_func
Returns:

zrangebylex(name: str, min: float, max: float, start: Optional[int] = None, num: Optional[int] = None) → redpipe.futures.Future[typing.List[typing.Any]][List[Any]][source]

Return the lexicographical range of values from sorted set name between min and max.

If start and num are specified, then return a slice of the range.

Parameters:
  • name – str the name of the redis key
  • min – int or ‘-inf’
  • max – int or ‘+inf’
  • start – int
  • num – int
Returns:

Future()

zrangebyscore(name: str, min: float, max: float, start: Optional[int] = None, num: Optional[int] = None, withscores: bool = False, score_cast_func: Callable = <class 'float'>) → redpipe.futures.Future[typing.List[typing.Any]][List[Any]][source]

Returns the range of elements included between the scores (min and max)

Parameters:
  • name – str the name of the redis key
  • min
  • max
  • start
  • num
  • withscores
  • score_cast_func
Returns:

Future()

zrank(name: str, value: str) → redpipe.futures.Future[int][int][source]

Returns the rank of the element.

Parameters:
  • name – str the name of the redis key
  • value – the element in the sorted set
zrem(name: str, *values) → redpipe.futures.Future[source]

Remove the values from the SortedSet

Parameters:
  • name – str the name of the redis key
  • values
Returns:

True if at least one value is successfully removed, False otherwise

zremrangebylex(name: str, min: float, max: float) → redpipe.futures.Future[int][int][source]

Remove all elements in the sorted set between the lexicographical range specified by min and max.

Returns the number of elements removed.

Parameters:
  • name – str the name of the redis key
  • min – int or -inf
  • max – int or +inf
Returns:

Future()

zremrangebyrank(name: str, min: float, max: float) → redpipe.futures.Future[int][int][source]

Remove the range of elements between ranks min and max, both included.

Parameters:
  • name – str the name of the redis key
  • min
  • max
Returns:

Future()

zremrangebyscore(name: str, min: int, max: int) → redpipe.futures.Future[typing.List[typing.Any]][List[Any]][source]

Remove the range of elements with scores between min and max, both included.

Parameters:
  • name – str the name of the redis key
  • min
  • max
Returns:

Future()

zrevrange(name: str, start: int, end: int, withscores: bool = False, score_cast_func: Callable = <class 'float'>) → redpipe.futures.Future[typing.List[typing.Any]][List[Any]][source]

Returns the range of items included between start and end in reverse order (from high to low)

Parameters:
  • name – str the name of the redis key
  • start
  • end
  • withscores
  • score_cast_func
Returns:

zrevrangebylex(name: str, max: float, min: float, start: Optional[int] = None, num: Optional[int] = None) → redpipe.futures.Future[typing.List[typing.Any]][List[Any]][source]

Return the reversed lexicographical range of values from the sorted set between max and min.

If start and num are specified, then return a slice of the range.

Parameters:
  • name – str the name of the redis key
  • max – int or ‘+inf’
  • min – int or ‘-inf’
  • start – int
  • num – int
Returns:

Future()

zrevrangebyscore(name: str, max: float, min: float, start: Optional[int] = None, num: Optional[int] = None, withscores: bool = False, score_cast_func: Callable = <class 'float'>) → redpipe.futures.Future[typing.List[typing.Any]][List[Any]][source]

Returns the range of elements with scores between max and min, ordered from the highest score to the lowest.

If start and num are specified, then return a slice of the range.

withscores indicates that the scores should be returned along with the values. In that case the return type is a list of (value, score) pairs.

score_cast_func is a callable used to cast the score return value.

Parameters:
  • name – str the name of the redis key
  • max – int
  • min – int
  • start – int
  • num – int
  • withscores – bool
  • score_cast_func
Returns:

Future()
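
The ordering and slicing rules described above can be modeled in plain Python. This is only an in-memory sketch, not redpipe or redis code: the helper name model_zrevrangebyscore and the dict-based sorted set are assumptions made for illustration.

```python
from typing import Callable, Dict, List, Optional


def model_zrevrangebyscore(
    zset: Dict[str, float],
    max: float,
    min: float,
    start: Optional[int] = None,
    num: Optional[int] = None,
    withscores: bool = False,
    score_cast_func: Callable = float,
) -> List:
    """Hypothetical in-memory model of a reverse range-by-score query."""
    # Keep members whose score lies in the inclusive range [min, max].
    matches = [(m, s) for m, s in zset.items() if min <= s <= max]
    # Highest score first; ties are broken in reverse lexicographical order.
    matches.sort(key=lambda pair: (pair[1], pair[0]), reverse=True)
    # start and num select a slice of the ordered range.
    if start is not None and num is not None:
        matches = matches[start:start + num]
    if withscores:
        return [(m, score_cast_func(s)) for m, s in matches]
    return [m for m, _ in matches]


scores = {'a': 1.0, 'b': 2.0, 'c': 3.0, 'd': 3.0}
top_two = model_zrevrangebyscore(scores, max=3, min=1, start=0, num=2)
with_scores = model_zrevrangebyscore(
    scores, max=3, min=2, withscores=True, score_cast_func=int)
```

Note that max comes before min in the argument list, mirroring the signature above.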

zrevrank(name: str, value: str)[source]

Returns the ranking in reverse order for the member

Parameters:
  • name – str the name of the redis key
  • value – str
zscan(name: str, cursor: int = 0, match: Optional[str] = None, count: Optional[int] = None, score_cast_func: Callable = <class 'float'>) → redpipe.futures.Future[typing.Tuple[int, typing.List[typing.Tuple[str, typing.Any]]]][Tuple[int, List[Tuple[str, Any]]]][source]

Incrementally return lists of elements in a sorted set. Also return a cursor indicating the scan position.

match allows for filtering the members by pattern.

count is a hint for the minimum number of elements returned per call.

score_cast_func is a callable used to cast the score return value.

zscan_iter(name: str, match: Optional[str] = None, count: Optional[int] = None, score_cast_func: Callable = <class 'float'>) → Iterable[Tuple[str, Any]][source]

Make an iterator using the ZSCAN command so that the client doesn’t need to remember the cursor position.

match allows for filtering the members by pattern.

count is a hint for the minimum number of elements returned per call.

score_cast_func is a callable used to cast the score return value.
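
The relationship between zscan and zscan_iter is a cursor loop: call the scan repeatedly, feed back the returned cursor, and stop when the cursor comes back as 0. Here is a minimal sketch of that loop. It uses a hypothetical in-memory fake_zscan rather than a redis connection, and it treats the cursor as a list offset, which a real server does not promise.

```python
from typing import Iterator, List, Tuple

# Hypothetical stand-in data for a sorted set: (member, score) pairs.
DATA: List[Tuple[str, float]] = [
    ('a', 1.0), ('b', 2.0), ('c', 3.0), ('d', 4.0), ('e', 5.0)]


def fake_zscan(cursor: int = 0, count: int = 2) -> Tuple[int, List[Tuple[str, float]]]:
    """Return (next_cursor, page). A next_cursor of 0 means the scan is done."""
    page = DATA[cursor:cursor + count]
    next_cursor = cursor + count
    if next_cursor >= len(DATA):
        next_cursor = 0
    return next_cursor, page


def scan_iter(count: int = 2) -> Iterator[Tuple[str, float]]:
    """Hide the cursor bookkeeping from the caller, as a *_iter helper does."""
    cursor = 0
    while True:
        cursor, page = fake_zscan(cursor, count)
        yield from page
        if cursor == 0:
            break


members = [member for member, _ in scan_iter()]
```

The caller of scan_iter never sees a cursor, which is the whole point of the iterator form.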

zscore(name: str, value: Any) → redpipe.futures.Future[float][float][source]

Return the score of an element

Parameters:
  • name – str the name of the redis key
  • value – the element in the sorted set key
Returns:

Future()

zunionstore(dest: str, keys: List[str], aggregate: Optional[str] = None) → redpipe.futures.Future[int][int][source]

Union multiple sorted sets specified by keys into a new sorted set, dest. Scores in the destination are aggregated according to aggregate (MIN, MAX, or SUM). SUM is used if none is provided.
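
To make the aggregation concrete, here is a small in-memory sketch of a union with SUM, MIN, and MAX. It is a model of the behavior only; the function model_zunionstore and the dict-of-dicts storage are assumptions for illustration, not part of redpipe.

```python
from typing import Dict, List, Optional


def model_zunionstore(
    sets: Dict[str, Dict[str, float]],
    dest: str,
    keys: List[str],
    aggregate: Optional[str] = None,
) -> int:
    """Hypothetical model: union the sets named by keys and store in dest."""
    combiners = {'SUM': lambda a, b: a + b, 'MIN': min, 'MAX': max}
    # SUM is the default when no aggregate is given.
    combine = combiners[(aggregate or 'SUM').upper()]
    result: Dict[str, float] = {}
    for key in keys:
        for member, score in sets.get(key, {}).items():
            if member in result:
                result[member] = combine(result[member], score)
            else:
                result[member] = score
    sets[dest] = result
    # The return value is the number of elements in the destination.
    return len(result)


sets = {'x': {'a': 1, 'b': 2}, 'y': {'b': 5, 'c': 3}}
count = model_zunionstore(sets, 'out', ['x', 'y'])
model_zunionstore(sets, 'out_max', ['x', 'y'], aggregate='MAX')
model_zunionstore(sets, 'out_min', ['x', 'y'], aggregate='MIN')
```

Only member b appears in both inputs, so it is the only score that differs between the three destinations.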

class redpipe.keyspaces.Hash(pipe: Optional[redpipe.pipelines.PipelineInterface] = None)[source]

Bases: redpipe.keyspaces.Keyspace

Manipulate a Hash key in Redis.

fields = {}
hdel(name: str, *keys) → redpipe.futures.Future[int][int][source]

Delete one or more hash fields.

Parameters:
  • name – str the name of the redis key
  • keys – one or more members to remove from the key.
Returns:

Future()

hexists(name: str, key: str) → redpipe.futures.Future[bool][bool][source]

Returns True if the field exists, False otherwise.

Parameters:
  • name – str the name of the redis key
  • key – the member of the hash
Returns:

Future()

hget(name: str, key: str) → redpipe.futures.Future[typing.Any][Any][source]

Returns the value stored in the field, None if the field doesn’t exist.

Parameters:
  • name – str the name of the redis key
  • key – the member of the hash
Returns:

Future()

hgetall(name: str) → redpipe.futures.Future[typing.Dict[str, typing.Any]][Dict[str, Any]][source]

Returns all the fields and values in the Hash.

Parameters:name – str the name of the redis key
Returns:Future()
hincrby(name: str, key: str, amount: int = 1) → redpipe.futures.Future[int][int][source]

Increment the value of the field.

Parameters:
  • name – str the name of the redis key
  • key – str
  • amount – int
Returns:

Future()

hincrbyfloat(name: str, key: str, amount: float = 1.0) → redpipe.futures.Future[float][float][source]

Increment the value of the field.

Parameters:
  • name – str the name of the redis key
  • key – the name of the element in the hash
  • amount – float
Returns:

Future()

hkeys(name: str) → redpipe.futures.Future[typing.List[str]][List[str]][source]

Returns all field names in the Hash.

Parameters:name – str the name of the redis key
Returns:Future()
hlen(name: str) → redpipe.futures.Future[int][int][source]

Returns the number of elements in the Hash.

Parameters:name – str the name of the redis key
Returns:Future()
hmget(name: str, keys: List[str], *args) → redpipe.futures.Future[typing.List[typing.Any]][List[Any]][source]

Returns the values stored in the fields.

Parameters:
  • name – str the name of the redis key
  • keys
Returns:

Future()

hmset(name: str, mapping: Dict[str, Any]) → redpipe.futures.Future[NoneType][None][source]

Sets or updates the fields with their corresponding values.

Parameters:
  • name – str the name of the redis key
  • mapping – a dict with keys and values
Returns:

Future()

hscan(name: str, cursor: int = 0, match: Optional[str] = None, count: Optional[int] = None) → redpipe.futures.Future[typing.Tuple[int, typing.Dict[str, typing.Any]]][Tuple[int, Dict[str, Any]]][source]

Incrementally return key/value slices in a hash. Also return a cursor indicating the scan position.

match allows for filtering the keys by pattern

count is a hint for the minimum number of elements returned per call.

hscan_iter(name: str, match: Optional[str] = None, count: Optional[int] = None) → Iterable[Tuple[str, Any]][source]

Make an iterator using the HSCAN command so that the client doesn’t need to remember the cursor position.

match allows for filtering the keys by pattern

count is a hint for the minimum number of elements returned per call.

hset(name: str, key: str, value: Any) → redpipe.futures.Future[int][int][source]

Set the member in the Hash to value.

Parameters:
  • name – str the name of the redis key
  • value
  • key – the member of the hash key
Returns:

Future()

hsetnx(name: str, key: str, value: Any) → redpipe.futures.Future[int][int][source]

Set the member in the Hash to value, only if the member does not already exist.

Parameters:
  • name – str the name of the redis key
  • value
  • key
Returns:

Future()
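
The difference between hset and hsetnx is easiest to see side by side. The sketch below models both against a plain dict, following the integer replies of the Redis HSET and HSETNX commands. The helper names model_hset and model_hsetnx are hypothetical and nothing here talks to redis.

```python
from typing import Any, Dict


def model_hset(hash_: Dict[str, Any], key: str, value: Any) -> int:
    """Always write. Return 1 if the field is new, 0 if it was overwritten."""
    created = 0 if key in hash_ else 1
    hash_[key] = value
    return created


def model_hsetnx(hash_: Dict[str, Any], key: str, value: Any) -> int:
    """Write only if the field is missing. Return 1 if written, 0 otherwise."""
    if key in hash_:
        return 0
    hash_[key] = value
    return 1


user: Dict[str, Any] = {}
first = model_hsetnx(user, 'name', 'alice')   # field is missing, so it is set
second = model_hsetnx(user, 'name', 'bob')    # field exists, so it is left alone
third = model_hset(user, 'name', 'carol')     # hset overwrites unconditionally
```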

hstrlen(name: str, key: str) → redpipe.futures.Future[source]

Return the number of bytes stored in the value of key within hash name

hvals(name: str) → redpipe.futures.Future[typing.List[typing.Any]][List[Any]][source]

Returns all the values in the Hash. Unfortunately we can’t type cast these fields. It is a useless call anyway, imho.

Parameters:name – str the name of the redis key
Returns:Future()
memberparse

alias of redpipe.fields.TextField

class redpipe.keyspaces.HyperLogLog(pipe: Optional[redpipe.pipelines.PipelineInterface] = None)[source]

Bases: redpipe.keyspaces.Keyspace

Manipulate a HyperLogLog key in redis.

pfadd(name: str, *values) → redpipe.futures.Future[int][int][source]

Adds the specified elements to the specified HyperLogLog.

Parameters:
  • name – str the name of the redis key
  • values – list of str
pfcount(*sources) → redpipe.futures.Future[int][int][source]

Return the approximated cardinality of the set observed by the HyperLogLog at key(s).

This uses execute_command because redis-py disabled pfcount unnecessarily for cluster. In that case you can only send one key at a time, or only keys that map to the same keyslot. Use at your own risk.

Parameters:sources – [str] the names of the redis keys
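
Whether several keys "map to the same keyslot" can be checked ahead of time. The sketch below assumes the standard Redis Cluster key hashing (CRC16 of the key modulo 16384, hashing only the hash tag between braces when one is present). The helpers key_slot and same_slot are hypothetical names, not redpipe functions.

```python
import binascii

HASH_SLOTS = 16384  # number of slots in a Redis Cluster


def key_slot(key: str) -> int:
    """Compute the cluster slot for a key, honoring {hash tags}."""
    data = key.encode('utf-8')
    start = data.find(b'{')
    if start != -1:
        end = data.find(b'}', start + 1)
        # Only a non-empty tag between the first '{' and the next '}' counts.
        if end != -1 and end != start + 1:
            data = data[start + 1:end]
    # crc_hqx with an initial value of 0 is the CRC16 variant assumed here.
    return binascii.crc_hqx(data, 0) % HASH_SLOTS


def same_slot(*keys: str) -> bool:
    """True if every key hashes to one slot, so one multi-key call could serve them."""
    return len({key_slot(k) for k in keys}) == 1


tagged_ok = same_slot('{visits}:mon', '{visits}:tue', '{visits}:wed')
```

Giving related HyperLogLog keys a shared hash tag, as in the usage line, is one way to keep a multi-key pfcount within a single slot.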
pfmerge(dest: str, *sources) → redpipe.futures.Future[NoneType][None][source]

Merge N different HyperLogLogs into a single one.

Parameters:
  • dest
  • sources
Returns:

redpipe.luascripts module

Some utility lua scripts used to extend some functionality in redis. It also lets me exercise the eval code path a bit.

redpipe.pipelines module

This is where the magic happens. The most important components of redpipe are here. The Pipeline and NestedPipeline classes and the pipeline function enable us to pass pipeline functions into each other and attach redis calls to them.

The main function exposed here is the pipeline function. You will use it everywhere, so get used to this syntax:

def incr(name, pipe=None):
    with redpipe.autoexec(pipe=pipe) as pipe:
        return pipe.incr(name)

with redpipe.autoexec() as pipe:
    a = incr('a', pipe=pipe)
    b = incr('b', pipe=pipe)

print([a, b])

Look at the incr function. The call to redpipe.autoexec goes through redpipe.pipeline, which will return a Pipeline object if None is passed in. And if a Pipeline object is passed in, it will return a NestedPipeline object. Those two objects present the same interface but behave very differently.

Pipeline objects execute your pipelined calls. NestedPipeline objects pass their commands up the chain to the parent pipeline they wrap. This could be another NestedPipeline object, or a Pipeline() object.
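
The division of labor between the two objects can be shown with a toy model. This is a sketch of the idea only and not how redpipe is implemented: ToyFuture, ToyPipeline, ToyNestedPipeline, and toy_pipeline are hypothetical names, and a dict stands in for redis.

```python
from typing import Dict, List, Optional, Tuple


class ToyFuture:
    """Placeholder for a result that exists only after the root executes."""

    def __init__(self) -> None:
        self.ready = False
        self.value: Optional[int] = None

    def set(self, value: int) -> None:
        self.value = value
        self.ready = True


class ToyPipeline:
    """Root pipeline: the only object that actually runs commands."""

    def __init__(self, store: Dict[str, int]) -> None:
        self.store = store
        self.queue: List[Tuple[str, ToyFuture]] = []

    def incr(self, key: str) -> ToyFuture:
        future = ToyFuture()
        self.queue.append((key, future))
        return future

    def execute(self) -> None:
        queue, self.queue = self.queue, []
        for key, future in queue:
            self.store[key] = self.store.get(key, 0) + 1
            future.set(self.store[key])


class ToyNestedPipeline(ToyPipeline):
    """Same interface, but execute() hands queued commands to the parent."""

    def __init__(self, parent: ToyPipeline) -> None:
        super().__init__(parent.store)
        self.parent = parent

    def execute(self) -> None:
        self.parent.queue.extend(self.queue)
        self.queue = []


def toy_pipeline(pipe: Optional[ToyPipeline] = None,
                 store: Optional[Dict[str, int]] = None) -> ToyPipeline:
    """Return a root pipeline when pipe is None, otherwise a nested one."""
    if pipe is None:
        return ToyPipeline(store if store is not None else {})
    return ToyNestedPipeline(pipe)


def incr(name: str, pipe: Optional[ToyPipeline] = None) -> ToyFuture:
    nested = toy_pipeline(pipe)
    future = nested.incr(name)
    nested.execute()  # for a nested pipe this only forwards the command upward
    return future


store: Dict[str, int] = {}
root = toy_pipeline(store=store)
a = incr('a', pipe=root)
b = incr('b', pipe=root)
pending = (a.ready, b.ready)  # nothing has run yet
root.execute()
results = [a.value, b.value]
```

Calling execute on the nested object never touches the store. Both increments run together when the root executes.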

redpipe.pipelines.pipeline(pipe: Optional[redpipe.pipelines.PipelineInterface] = None, name: Optional[str] = None, autoexec: bool = False, exit_handler: Optional[Callable] = None) → Union[redpipe.pipelines.Pipeline, redpipe.pipelines.NestedPipeline][source]

This is the foundational function for all of redpipe. Everything goes through here: create pipelines, nest pipelines, get pipelines for a specific name. It all happens here.

Here’s a simple example:

with pipeline() as pipe:
    pipe.set('foo', 'bar')
    foo = pipe.get('foo')
    pipe.execute()
print(foo)
> bar

Now let’s look at how we can nest a pipeline.

def process(key, pipe=None):
    with pipeline(pipe, autoexec=True) as pipe:
        return pipe.incr(key)

with pipeline() as pipe:
    key1 = process('key1', pipe)
    key2 = process('key2', pipe)
    pipe.execute()

print([key1, key2])

> [1, 1]
Parameters:
  • pipe – a Pipeline() or NestedPipeline() object, or None
  • name – str, optional. the name of the connection to use.
  • autoexec – bool, if true, implicitly execute the pipe
  • exit_handler – Callable
Returns:

Pipeline or NestedPipeline

redpipe.pipelines.autoexec(pipe: Optional[redpipe.pipelines.PipelineInterface] = None, name: Optional[str] = None, exit_handler: Optional[Callable] = None) → Union[redpipe.pipelines.Pipeline, redpipe.pipelines.NestedPipeline][source]

Create a pipeline with a context that will automatically execute the pipeline upon leaving the context if no exception was raised.

Parameters:
  • pipe
  • name
  • exit_handler
Returns:

class redpipe.pipelines.PipelineInterface(*args, **kwargs)[source]

Bases: typing_extensions.Protocol

execute() → None[source]
on_execute(callback: Callable) → None[source]
reset() → None[source]

redpipe.structs module

The Struct is a convenient way to access data in a hash. It makes it possible to load data from redis as an object, access the fields, and then store changes back into redis.

class redpipe.structs.Struct(_key_or_data: Union[str, Dict[KT, VT]], pipe: Optional[redpipe.pipelines.PipelineInterface] = None, fields: Union[str, List[str], None] = None, no_op: bool = False, nx: bool = False)[source]

Bases: object

Load and store structured data in redis using OOP patterns.

If you pass in a dictionary-like object, redpipe will write all the values you pass in to redis to the key you specify.

By default, the primary key name is _key. But you should override this in your Struct with the key_name property.

class Beer(redpipe.Struct):
    fields = {'name': redpipe.TextField}
    key_name = 'beer_id'

beer = Beer({'beer_id': '1', 'name': 'Schlitz'})

This will store the data you pass into redis. It will also load any additional fields to hydrate the object. RedPipe does this in the same pipelined call.

If you need a stub record that neither loads nor saves data, do:

beer = Beer({'beer_id': '1'}, no_op=True)

You can later load the fields you want using load.

If you pass in a string we assume it is the key of the record. redpipe loads the data from redis:

beer = Beer('1')
assert(beer['beer_id'] == '1')
assert(beer['name'] == 'Schlitz')

If you need to load a record but only specific fields, you can say so.

beer = Beer('1', fields=['name'])

This will exclude all other fields.

RedPipe cares about pipelining and efficiency, so if you need to bundle a bunch of reads or writes together, by all means do so!

beer_ids = ['1', '2', '3']
with redpipe.pipeline() as pipe:
    beers = [Beer(i, pipe=pipe) for i in beer_ids]
    pipe.execute()
print(beers)

This will pipeline all 3 together and load them in a single pass from redis.

The following methods all accept a pipe:

  • __init__
  • update
  • incr
  • decr
  • pop
  • remove
  • clear
  • delete

You can pass a pipeline into them to make sure that the network i/o is combined with another pipeline operation. The other methods on the object are about accessing the data already loaded. So you shouldn’t need to pipeline them.

One more thing … suppose you are storing temporary data and you want it to expire after a few days. You can easily make that happen just by changing the object definition:

class Beer(redpipe.Struct):
    fields = {'name': redpipe.TextField}
    key_name = 'beer_id'
    ttl = 24 * 60 * 60 * 3

This makes sure that any set operations on the Struct will set the expiry at the same time. If the object isn’t modified for more than the seconds specified in the ttl (stands for time-to-live), then the object will be expired from redis. This is useful for temporary objects.
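
The "expiry is reset on every write" behavior can be sketched without redis. The model below is an assumption-laden illustration, not redpipe code: ToyExpiringStore is a hypothetical class, and a fake clock is injected so the example does not have to wait three days.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple

TTL_SECONDS = 24 * 60 * 60 * 3  # three days, as in the Beer example


class ToyExpiringStore:
    """Hypothetical store where every write pushes the expiry forward."""

    def __init__(self, clock: Callable[[], float] = time.time) -> None:
        self.clock = clock
        self.records: Dict[str, Tuple[Dict[str, Any], float]] = {}

    def read(self, key: str) -> Optional[Dict[str, Any]]:
        record = self.records.get(key)
        if record is None:
            return None
        fields, expires_at = record
        if self.clock() >= expires_at:
            # The record outlived its ttl without being modified.
            del self.records[key]
            return None
        return fields

    def write(self, key: str, changes: Dict[str, Any], ttl: int) -> None:
        fields = dict(self.read(key) or {})
        fields.update(changes)
        # Setting data and setting the expiry happen together.
        self.records[key] = (fields, self.clock() + ttl)


now = [0.0]
beers = ToyExpiringStore(clock=lambda: now[0])
beers.write('1', {'name': 'Schlitz'}, TTL_SECONDS)
now[0] = TTL_SECONDS - 1
beers.write('1', {'style': 'lager'}, TTL_SECONDS)  # refreshes the expiry
now[0] = TTL_SECONDS + 10
still_there = beers.read('1')
now[0] = 2 * TTL_SECONDS + 10
expired = beers.read('1')
```

The second write happens just before the first deadline, so the record survives past it and only disappears once a full ttl passes with no modification.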

clear(pipe=None)[source]

Delete the current redis key.

Parameters:pipe
Returns:
connection = None
copy()[source]

Like the dictionary copy method.

Returns:
decr(field: str, amount: int = 1, pipe: Optional[redpipe.pipelines.PipelineInterface] = None) → redpipe.futures.Future[source]

Inverse of incr function.

Parameters:
  • field
  • amount
  • pipe
Returns:

Future()

default_fields = 'all'
classmethod delete(keys, pipe=None)[source]

Delete one or more keys from the Struct namespace.

This is a class method and unlike the clear method, can be invoked without instantiating a Struct.

Parameters:
  • keys – the names of the keys to remove from the keyspace
  • pipe – Pipeline, NestedPipeline, or None
Returns:

None

field_attr_on = False
fields = {}
get(item, default=None)[source]

Works like the dict get method.

Parameters:
  • item
  • default
Returns:

incr(field: str, amount: int = 1, pipe: Optional[redpipe.pipelines.PipelineInterface] = None) → redpipe.futures.Future[source]

Increment a field by a given amount. Returns the future.

Also updates the field.

Parameters:
  • field
  • amount
  • pipe
Returns:

items() → List[Tuple[str, Any]][source]

We return the list of key/value pair tuples. Similar to iteritems but in list form instead of as a generator. The reason we do this is because python2 code probably expects this to be a list. Not sure if I could care, but just covering my bases.

Example:

u = User('1')
data = {k: v for k, v in u.items()}

Or:

u = User('1')
for k, v in u.items():
    print("%s: %s" % (k, v))
Returns:list, containing key/value pair tuples.
iteritems() → Iterable[Tuple[str, Any]][source]

Support for the python 2 iterator of key/value pairs. This includes the primary key name and value.

Example:

u = User('1')
data = {k: v for k, v in u.iteritems()}

Or:

u = User('1')
for k, v in u.iteritems():
    print("%s: %s" % (k, v))
Returns:generator, yielding key/value pair tuples
key
key_name = '_key'
keys() → List[str][source]

Get a list of all the keys in the Struct. This includes the primary key name, and all the elements that are set into redis.

Note: even if you define fields on the Struct, those keys won’t be returned unless the fields are actually written into the redis hash.

u = User('1')
assert(u.keys() == ['_key', 'name'])
Returns:list
keyspace = None
load(fields: Union[str, List[str], None] = None, pipe: Optional[redpipe.pipelines.PipelineInterface] = None) → None[source]

Load data from redis. Allows you to specify what fields to load. This method is also called implicitly from the constructor.

Parameters:
  • fields – ‘all’, ‘defined’, or array of field names
  • pipe – Pipeline(), NestedPipeline() or None
Returns:

None

persisted

Not certain I want to keep this around. Is it useful?

Returns:
pop(name, default=None, pipe=None)[source]

Works like the dictionary pop method.

IMPORTANT!

This method removes the key from redis. If this is not the behavior you want, first convert your Struct data to a dict.

Parameters:
  • name
  • default
  • pipe
Returns:

remove(fields, pipe=None)[source]

Remove some fields from the struct. This will remove data from the underlying redis hash object. After the pipe executes successfully, it will also remove it from the current instance of Struct.

Parameters:
  • fields – list or iterable, names of the fields to remove.
  • pipe – Pipeline, NestedPipeline, or None
Returns:

None

required = {}
ttl = None
update(changes: Dict[str, Any], pipe: Optional[redpipe.pipelines.PipelineInterface] = None, nx: bool = False)[source]

Update the data in the Struct.

This will update the values in the underlying redis hash. After the pipeline executes, the changes will be reflected here in the local struct. If any values in the changes dict are None, those fields will be removed from redis and the instance. The changes should be a dictionary representing the fields to change and the values to change them to. If you pass the nx flag, only sets the fields if they don’t exist yet.

Parameters:
  • changes – dict
  • pipe – Pipeline, NestedPipeline, or None
  • nx – bool
Returns:

None
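
The rules for the changes dict can be summarized with a dict-only model. This sketch does not touch redis and is not redpipe's implementation. The function model_update is hypothetical, and treating None as a removal even when nx is set is an assumption of the sketch that the text above does not confirm.

```python
from typing import Any, Dict


def model_update(data: Dict[str, Any], changes: Dict[str, Any],
                 nx: bool = False) -> Dict[str, Any]:
    """Hypothetical model of update semantics; returns the new field dict."""
    result = dict(data)
    for field, value in changes.items():
        if value is None:
            # None means "remove this field" (applied regardless of nx here,
            # which is an assumption of this sketch).
            result.pop(field, None)
        elif nx and field in result:
            # With nx, fields that already exist are left untouched.
            continue
        else:
            result[field] = value
    return result


beer = {'name': 'Schlitz', 'style': 'lager'}
changed = model_update(beer, {'name': 'Pabst', 'style': None, 'abv': 4.6})
only_new = model_update(beer, {'name': 'Pabst', 'abv': 4.6}, nx=True)
```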

redpipe.tasks module

When sending commands to multiple redis backends in one redpipe.pipeline, this module gives us an api to allow threaded async communication to those different backends, improving parallelism.

The AsynchronousTask is well tested and should work well. But if you see any issues, you can easily disable this in your application.

redpipe.disable_threads()

Please report any issues.
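
To picture what the threaded mode buys you, here is a generic sketch of fanning work out to several backends and collecting the results. It uses the standard library ThreadPoolExecutor, not redpipe's AsynchronousTask, and run_backends plus the lambda "backends" are hypothetical stand-ins.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List


def run_backends(jobs: Dict[str, Callable[[], List[int]]],
                 threads: bool = True) -> Dict[str, List[int]]:
    """Run one job per backend, in parallel threads or one after another."""
    if not threads or len(jobs) < 2:
        # With a single backend (or threads disabled) there is nothing to overlap.
        return {name: job() for name, job in jobs.items()}
    with ThreadPoolExecutor(max_workers=len(jobs)) as pool:
        submitted = {name: pool.submit(job) for name, job in jobs.items()}
        # result() blocks until that backend's job has finished.
        return {name: future.result() for name, future in submitted.items()}


jobs = {'users': lambda: [1, 2], 'events': lambda: [3]}
threaded = run_backends(jobs, threads=True)
sequential = run_backends(jobs, threads=False)
```

Both modes return the same results. The threaded path only changes how the waiting on each backend overlaps.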

redpipe.tasks.enable_threads()[source]

Used to enable threaded behavior when talking to multiple redis backends in one pipeline execute call. Otherwise we don’t need it.

Returns:None

redpipe.tasks.disable_threads()[source]

Used to disable threaded behavior when talking to multiple redis backends in one pipeline execute call. Use this option if you are really concerned about python threaded behavior in your application. Doesn’t apply if you are only ever talking to one redis backend at a time.

Returns:None

redpipe.version module

Utility for grabbing the redpipe version string from the VERSION file.