Source code for redpipe.structs

# -*- coding: utf-8 -*-
"""
The Struct is a convenient way to access data in a hash.
Makes it possible to load data from redis as an object and access the fields.
Then store changes back into redis.
"""
from six import add_metaclass
from json.encoder import JSONEncoder
from functools import wraps
from .pipelines import autoexec
from .keyspaces import Hash
from .fields import TextField
from .exceptions import InvalidOperation
from .futures import Future, IS

__all__ = ['Struct']


class StructMeta(type):
    """
    Data binding of a redpipe.Hash to the core of the Struct object.
    Creates it dynamically on class construction.
    uses the keyspace and connection fields
    Meta Classes are strange beasts.
    """

    def __new__(mcs, name, bases, d):
        if name in ['Struct']:
            return type.__new__(mcs, name, bases, d)

        class StructHash(Hash):
            keyspace = d.get('keyspace', name)
            connection = d.get('connection', None)
            fields = d.get('fields', {})
            keyparse = d.get('keyparse', TextField)
            valueparse = d.get('valueparse', TextField)
            memberparse = d.get('memberparse', TextField)

        d['core'] = StructHash

        return type.__new__(mcs, name, bases, d)


@add_metaclass(StructMeta)
[docs]class Struct(object): """ load and store structured data in redis using OOP patterns. If you pass in a dictionary-like object, redpipe will write all the values you pass in to redis to the key you specify. By default, the primary key name is `_key`. But you should override this in your Struct with the `key_name` property. .. code-block:: python class Beer(redpipe.Struct): fields = {'name': redpipe.StringField} key_name = 'beer_id' beer = Beer({'beer_id': '1', 'name': 'Schlitz'}) This will store the data you pass into redis. It will also load any additional fields to hydrate the object. **RedPipe** does this in the same pipelined call. If you need a stub record that neither loads or saves data, do: .. code-block:: python beer = Beer({'beer_id': '1'}, no_op=True) You can later load the fields you want using, load. If you pass in a string we assume it is the key of the record. redpipe loads the data from redis: .. code-block:: python beer = Beer('1') assert(beer['beer_id'] == '1') assert(beer['name'] == 'Schlitz') If you need to load a record but only specific fields, you can say so. .. code-block:: python beer = Beer('1', fields=['name']) This will exclude all other fields. **RedPipe** cares about pipelining and efficiency, so if you need to bundle a bunch of reads or writes together, by all means do so! .. code-block:: python beer_ids = ['1', '2', '3'] with redpipe.pipeline() as pipe: beers = [Beer(i, pipe=pipe) for i in beer_ids] print(beers) This will pipeline all 3 together and load them in a single pass from redis. The following methods all accept a pipe: * __init__ * update * incr * decr * pop * remove * clear * delete You can pass a pipeline into them to make sure that the network i/o is combined with another pipeline operation. The other methods on the object are about accessing the data already loaded. So you shouldn't need to pipeline them. """ __slots__ = ['key', '_data'] keyspace = None connection = None key_name = '_key' fields = {} default_fields = 'all' # set as 'defined', 'all', or ['a', b', 'c'] def __init__(self, _key_or_data=None, pipe=None, fields=None, no_op=False): """ class constructor :param _key_or_data: :param pipe: :param fields: """ keyname = self.key_name self._data = {} with self._pipe(pipe=pipe) as pipe: try: coerced = dict(_key_or_data) self.key = coerced[keyname] del coerced[keyname] if no_op: self._data = coerced return self.update(coerced, pipe=pipe) except KeyError: raise InvalidOperation( 'must specify primary key when cloning a struct') except (ValueError, TypeError): self.key = _key_or_data if not no_op: self.load(fields=fields, pipe=pipe)
[docs] def load(self, fields=None, pipe=None): """ Load data from redis. restrict to just the fields specified. :param fields: 'all', 'defined', or array of field names :param pipe: Pipeline(), NestedPipeline() or None :return: None """ if fields is None: fields = self.default_fields if fields == 'all': return self._load_all(pipe=pipe) if fields == 'defined': fields = [k for k in self.fields.keys()] if not fields: return with self._pipe(pipe) as pipe: ref = self.core(pipe=pipe).hmget(self.key, fields) def cb(): for i, v in enumerate(ref.result): k = fields[i] if k != self.key_name: self._data[k] = v pipe.on_execute(cb)
def _load_all(self, pipe=None): with self._pipe(pipe) as pipe: ref = self.core(pipe=pipe).hgetall(self.key) def cb(): if not ref.result: return for k, v in ref.result.items(): if k != self.key_name: self._data[k] = v pipe.on_execute(cb)
[docs] def incr(self, field, amount=1, pipe=None): """ Increment a field by a given amount. Return the future Also update the field. :param field: :param amount: :param pipe: :return: """ with self._pipe(pipe) as pipe: core = self.core(pipe=pipe) new_amount = core.hincrby(self.key, field, amount) ref = core.hget(self.key, field) def cb(): self._data[field] = ref.result pipe.on_execute(cb) return new_amount
[docs] def decr(self, field, amount=1, pipe=None): """ Inverse of incr function. :param field: :param amount: :param pipe: :return: Pipeline, NestedPipeline, or None """ return self.incr(field, amount * -1, pipe=pipe)
[docs] def update(self, changes, pipe=None): """ update the data in the Struct. This will update the values in the underlying redis hash. After the pipeline executes, the changes will be reflected here in the local struct. If any values in the changes dict are None, those fields will be removed from redis and the instance. The changes should be a dictionary representing the fields to change and the values to change them to. :param changes: dict :param pipe: Pipeline, NestedPipeline, or None :return: None """ if not changes: return if self.key_name in changes: raise InvalidOperation('cannot update the redis key') deletes = {k for k, v in changes.items() if IS(v, None)} updates = {k: v for k, v in changes.items() if k not in deletes} with self._pipe(pipe) as pipe: core = self.core(pipe=pipe) def build(k, v): core.hset(self.key, k, v) def cb(): self._data[k] = v pipe.on_execute(cb) for k, v in updates.items(): build(k, v) self.remove(deletes, pipe=pipe)
[docs] def remove(self, fields, pipe=None): """ remove some fields from the struct. This will remove data from the underlying redis hash object. After the pipe executes successfully, it will also remove it from the current instance of Struct. :param fields: list or iterable, names of the fields to remove. :param pipe: Pipeline, NestedPipeline, or None :return: None """ if not fields: return if self.key_name in fields: raise InvalidOperation('cannot remove the redis key') with self._pipe(pipe) as pipe: core = self.core(pipe=pipe) core.hdel(self.key, *fields) def cb(): for k in fields: try: del self._data[k] except KeyError: pass pipe.on_execute(cb)
[docs] def copy(self): """ like the dictionary copy method. :return: """ return self.__class__(dict(self))
@property def persisted(self): """ Not certain I want to keep this around. Is it useful? :return: """ return True if self._data else False
[docs] def clear(self, pipe=None): """ delete the current redis key. :param pipe: :return: """ with self._pipe(pipe) as pipe: self.core(pipe=pipe).delete(self.key) def cb(): self._data = {} pipe.on_execute(cb)
[docs] def get(self, item, default=None): """ works like the dict get method. :param item: :param default: :return: """ return self._data.get(item, default)
[docs] def pop(self, name, default=None, pipe=None): """ works like the dictionary pop method. IMPORTANT! This method removes the key from redis. If this is not the behavior you want, first convert your Struct data to a dict. :param name: :param default: :param pipe: :return: """ f = Future() with self._pipe(pipe) as pipe: c = self.core(pipe) ref = c.hget(self.key, name) c.hdel(self.key, name) def cb(): f.set(default if ref.result is None else ref.result) self._data.pop(name) pipe.on_execute(cb) return f
@classmethod
[docs] def delete(cls, keys, pipe=None): """ Delete one or more keys from the Struct namespace. This is a class method and unlike the `clear` method, can be invoked without instantiating a Struct. :param keys: the names of the keys to remove from the keyspace :param pipe: Pipeline, NestedPipeline, or None :return: None """ with cls._pipe(pipe) as pipe: core = cls.core(pipe) core.delete(*keys)
@classmethod def _pipe(cls, pipe=None): return autoexec(pipe, name=cls.connection) def __getitem__(self, item): if item == self.key_name: return self.key return self._data[item] def __delitem__(self, key): tpl = 'cannot delete %s from %s indirectly. Use the delete method.' raise InvalidOperation(tpl % (key, self)) def __setitem__(self, key, value): tpl = 'cannot set %s key on %s indirectly. Use the set method.' raise InvalidOperation(tpl % (key, self)) def __iter__(self): for k in self.keys(): yield k def __len__(self): return len(dict(self)) def __contains__(self, item): if item == self.key_name: return True return item in self._data
[docs] def iteritems(self): yield self.key_name, self.key for k, v in self._data.items(): yield k, v
[docs] def items(self): return [row for row in self.iteritems()]
def __eq__(self, other): if self is other: return True try: if dict(self) == dict(other): return True except (TypeError, ValueError): pass return False
[docs] def keys(self): return [row[0] for row in self.items()]
def __str__(self): return "<%s:%s>" % (self.__class__.__name__, self.key) def __repr__(self): return repr(dict(self)) def __getstate__(self): return self.key, self._data, def __setstate__(self, state): self.key = state[0] self._data = state[1] @property def _redpipe_struct_as_dict(self): return dict(self)
def _json_default_encoder(func): """ Monkey-Patch the core json encoder library. This isn't as bad as it sounds. We override the default method so that if an object falls through and can't be encoded normally, we see if it is a Future object and return the result to be encoded. I set a special attribute on the Struct object so I can tell that's what it is. If that doesn't work, I fall back to the earlier behavior. The nice thing about patching the library this way is that it won't inerfere with existing code and it can itself be wrapped by other methods. So it's very extensible. :param func: the JSONEncoder.default method. :return: an object that can be json serialized. """ @wraps(func) def inner(self, o): try: return o._redpipe_struct_as_dict # noqa except AttributeError: pass return func(self, o) return inner JSONEncoder.default = _json_default_encoder(JSONEncoder.default)