Commit f88925a1 authored by Sean Bleier's avatar Sean Bleier

Move master-slave setup to default cache backend and refactor a bit.

parent a8b809a8
This diff is collapsed.
...@@ -11,98 +11,38 @@ class ShardedRedisCache(BaseRedisCache): ...@@ -11,98 +11,38 @@ class ShardedRedisCache(BaseRedisCache):
def __init__(self, server, params): def __init__(self, server, params):
super(ShardedRedisCache, self).__init__(server, params) super(ShardedRedisCache, self).__init__(server, params)
self._params = params
self._server = server
self._pickle_version = None
self.__master_client = None
self.clients = {}
self.sharder = HashRing() self.sharder = HashRing()
if not isinstance(server, (list, tuple)): for server in self.servers:
servers = [server]
else:
servers = server
for server in servers:
client = self.create_client(server) client = self.create_client(server)
self.clients[client.connection_pool.connection_identifier] = client self.clients[client.connection_pool.connection_identifier] = client
self.sharder.add(client.connection_pool.connection_identifier) self.sharder.add(client.connection_pool.connection_identifier)
@property self.client_list = self.clients.values()
def master_client(self):
"""
Get the write server:port of the master cache def get_client(self, key, write=False):
"""
if not hasattr(self, '_master_client') and self.__master_client is None:
cache = self.options.get('MASTER_CACHE', None)
if cache is None:
self._master_client = None
else:
self._master_client = self.create_client(cache)
return self._master_client
def get_client(self, key, for_write=False):
if for_write and self.master_client is not None:
return self.master_client
node = self.sharder.get_node(unicode(key)) node = self.sharder.get_node(unicode(key))
return self.clients[node] return self.clients[node]
def shard(self, keys, for_write=False, version=None): def shard(self, keys, write=False, version=None):
""" """
Returns a dict of keys that belong to a cache's keyspace. Returns a dict of keys that belong to a cache's keyspace.
""" """
clients = defaultdict(list) clients = defaultdict(list)
for key in keys: for key in keys:
clients[self.get_client(key, for_write)].append(self.make_key(key, version)) clients[self.get_client(key, write)].append(self.make_key(key, version))
return clients return clients
#################### ####################
# Django cache api # # Django cache api #
#################### ####################
def add(self, key, value, timeout=None, version=None):
"""
Add a value to the cache, failing if the key already exists.
Returns ``True`` if the object was added, ``False`` if not.
"""
client = self.get_client(key)
key = self.make_key(key, version=version)
return self._add(client, key, value, timeout)
def get(self, key, default=None, version=None):
"""
Retrieve a value from the cache.
Returns unpickled value if key is found, the default if not.
"""
client = self.get_client(key)
key = self.make_key(key, version=version)
return self._get(client, key, default)
def set(self, key, value, timeout=None, version=None, client=None):
"""
Persist a value to the cache, and set an optional expiration time.
"""
if client is None:
client = self.get_client(key, for_write=True)
key = self.make_key(key, version=version)
return self._set(key, value, timeout, client=client)
def delete(self, key, version=None):
"""
Remove a key from the cache.
"""
client = self.get_client(key, for_write=True)
key = self.make_key(key, version=version)
return self._delete(client, key)
def delete_many(self, keys, version=None): def delete_many(self, keys, version=None):
""" """
Remove multiple keys at once. Remove multiple keys at once.
""" """
clients = self.shard(keys, for_write=True, version=version) clients = self.shard(keys, write=True, version=version)
for client, keys in clients.items(): for client, keys in clients.items():
self._delete_many(client, keys) self._delete_many(client, keys)
...@@ -114,11 +54,8 @@ class ShardedRedisCache(BaseRedisCache): ...@@ -114,11 +54,8 @@ class ShardedRedisCache(BaseRedisCache):
namespace will be deleted. Otherwise, all keys will be deleted. namespace will be deleted. Otherwise, all keys will be deleted.
""" """
if version is None: if version is None:
if self.master_client is None: for client in self.clients.itervalues():
for client in self.clients.itervalues(): self._clear(client)
self._clear(client)
else:
self._clear(self.master_client)
else: else:
self.delete_pattern('*', version=version) self.delete_pattern('*', version=version)
...@@ -138,31 +75,23 @@ class ShardedRedisCache(BaseRedisCache): ...@@ -138,31 +75,23 @@ class ShardedRedisCache(BaseRedisCache):
If timeout is given, that timeout will be used for the key; otherwise If timeout is given, that timeout will be used for the key; otherwise
the default cache timeout will be used. the default cache timeout will be used.
""" """
clients = self.shard(data.keys(), for_write=True, version=version) clients = self.shard(data.keys(), write=True, version=version)
if timeout is None: if timeout is None:
for client, keys in clients.items(): for client, keys in clients.items():
subset = {} subset = {}
for key in keys: for key in keys:
subset[key] = data[key._original_key] subset[key] = self.prep_value(data[key._original_key])
self._set_many(client, subset) self._set_many(client, subset)
return return
for client, keys in clients.items(): for client, keys in clients.items():
pipeline = client.pipeline() pipeline = client.pipeline()
for key in keys: for key in keys:
self._set(key, data[key._original_key], timeout, client=pipeline) value = self.prep_value(data[key._original_key])
self._set(pipeline, key, value, timeout)
pipeline.execute() pipeline.execute()
def incr(self, key, delta=1, version=None):
"""
Add delta to value in the cache. If the key does not exist, raise a
ValueError exception.
"""
client = self.get_client(key, for_write=True)
key = self.make_key(key, version=version)
return self._incr(client, key, delta=delta)
def incr_version(self, key, delta=1, version=None): def incr_version(self, key, delta=1, version=None):
""" """
Adds delta to the cache version for the supplied key. Returns the Adds delta to the cache version for the supplied key. Returns the
...@@ -172,7 +101,7 @@ class ShardedRedisCache(BaseRedisCache): ...@@ -172,7 +101,7 @@ class ShardedRedisCache(BaseRedisCache):
if version is None: if version is None:
version = self.version version = self.version
client = self.get_client(key, for_write=True) client = self.get_client(key, write=True)
old = self.make_key(key, version=version) old = self.make_key(key, version=version)
new = self.make_key(key, version=version + delta) new = self.make_key(key, version=version + delta)
...@@ -182,27 +111,10 @@ class ShardedRedisCache(BaseRedisCache): ...@@ -182,27 +111,10 @@ class ShardedRedisCache(BaseRedisCache):
# Extra api methods # # Extra api methods #
##################### #####################
def has_key(self, key, version=None):
client = self.get_client(key, for_write=False)
return self._has_key(client, key, version)
def ttl(self, key, version=None):
client = self.get_client(key, for_write=False)
key = self.make_key(key, version=version)
return self._ttl(client, key)
def delete_pattern(self, pattern, version=None): def delete_pattern(self, pattern, version=None):
pattern = self.make_key(pattern, version=version) pattern = self.make_key(pattern, version=version)
if self.master_client is None: for client in self.clients.itervalues():
for client in self.clients.itervalues(): self._delete_pattern(client, pattern)
self._delete_pattern(client, pattern)
else:
self._delete_pattern(self.master_client, pattern)
def get_or_set(self, key, func, timeout=None, version=None):
client = self.get_client(key, for_write=True)
key = self.make_key(key, version=version)
return self._get_or_set(client, key, func, timeout)
def reinsert_keys(self): def reinsert_keys(self):
""" """
...@@ -210,14 +122,3 @@ class ShardedRedisCache(BaseRedisCache): ...@@ -210,14 +122,3 @@ class ShardedRedisCache(BaseRedisCache):
""" """
for client in self.clients.itervalues(): for client in self.clients.itervalues():
self._reinsert_keys(client) self._reinsert_keys(client)
print
def persist(self, key, version=None):
client = self.get_client(key, for_write=True)
key = self.make_key(key, version=version)
self._persist(client, key, version)
def expire(self, key, timeout, version=None):
client = self.get_client(key, for_write=True)
key = self.make_key(key, version=version)
self._expire(client, key, timeout, version)
...@@ -2,6 +2,7 @@ try: ...@@ -2,6 +2,7 @@ try:
import cPickle as pickle import cPickle as pickle
except ImportError: except ImportError:
import pickle import pickle
import random
from redis_cache.backends.base import BaseRedisCache from redis_cache.backends.base import BaseRedisCache
from redis_cache.compat import bytes_type, DEFAULT_TIMEOUT from redis_cache.compat import bytes_type, DEFAULT_TIMEOUT
...@@ -15,75 +16,41 @@ class RedisCache(BaseRedisCache): ...@@ -15,75 +16,41 @@ class RedisCache(BaseRedisCache):
""" """
super(RedisCache, self).__init__(server, params) super(RedisCache, self).__init__(server, params)
if not isinstance(server, bytes_type): for server in self.servers:
self._server, = server client = self.create_client(server)
self.clients[client.connection_pool.connection_identifier] = client
self.client = self.create_client(server) self.client_list = self.clients.values()
self.clients = { self.master_client = self.get_master_client()
self.client.connection_pool.connection_identifier: self.client
}
def get_client(self, *args): def get_client(self, key, write=False):
return self.client if write and self.master_client is not None:
return self.master_client
return random.choice(self.client_list)
#################### ####################
# Django cache api # # Django cache api #
#################### ####################
def add(self, key, value, timeout=None, version=None):
"""
Add a value to the cache, failing if the key already exists.
Returns ``True`` if the object was added, ``False`` if not.
"""
key = self.make_key(key, version=version)
return self._add(self.client, key, value, timeout)
def get(self, key, default=None, version=None):
"""
Retrieve a value from the cache.
Returns unpickled value if key is found, the default if not.
"""
key = self.make_key(key, version=version)
return self._get(self.client, key, default)
def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None, client=None):
"""
Persist a value to the cache, and set an optional expiration time.
"""
key = self.make_key(key, version=version)
return self._set(key, value, timeout, client=self.client)
def delete(self, key, version=None):
"""
Remove a key from the cache.
"""
key = self.make_key(key, version=version)
return self._delete(self.client, key)
def delete_many(self, keys, version=None): def delete_many(self, keys, version=None):
""" """Remove multiple keys at once."""
Remove multiple keys at once.
"""
versioned_keys = self.make_keys(keys, version=version) versioned_keys = self.make_keys(keys, version=version)
self._delete_many(self.client, versioned_keys) self._delete_many(self.master_client, versioned_keys)
def clear(self, version=None): def clear(self, version=None):
""" """Flush cache keys.
Flush cache keys.
If version is specified, all keys belonging the version's key If version is specified, all keys belonging the version's key
namespace will be deleted. Otherwise, all keys will be deleted. namespace will be deleted. Otherwise, all keys will be deleted.
""" """
if version is None: if version is None:
self._clear(self.client) self._clear(self.master_client)
else: else:
self.delete_pattern('*', version=version) self.delete_pattern('*', version=version)
def get_many(self, keys, version=None): def get_many(self, keys, version=None):
versioned_keys = self.make_keys(keys, version=version) versioned_keys = self.make_keys(keys, version=version)
return self._get_many(self.client, keys, versioned_keys=versioned_keys) return self._get_many(self.master_client, keys, versioned_keys=versioned_keys)
def set_many(self, data, timeout=None, version=None): def set_many(self, data, timeout=None, version=None):
""" """
...@@ -97,22 +64,15 @@ class RedisCache(BaseRedisCache): ...@@ -97,22 +64,15 @@ class RedisCache(BaseRedisCache):
if timeout is None: if timeout is None:
new_data = {} new_data = {}
for key in versioned_keys: for key in versioned_keys:
new_data[key] = data[key._original_key] new_data[key] = self.prep_value(data[key._original_key])
return self._set_many(self.client, new_data) return self._set_many(self.master_client, new_data)
pipeline = self.client.pipeline() pipeline = self.master_client.pipeline()
for key in versioned_keys: for key in versioned_keys:
self._set(key, data[key._original_key], timeout, client=pipeline) value = self.prep_value(data[key._original_key])
self._set(pipeline, key, value, timeout)
pipeline.execute() pipeline.execute()
def incr(self, key, delta=1, version=None):
"""
Add delta to value in the cache. If the key does not exist, raise a
ValueError exception.
"""
key = self.make_key(key, version=version)
return self._incr(self.client, key, delta=delta)
def incr_version(self, key, delta=1, version=None): def incr_version(self, key, delta=1, version=None):
""" """
Adds delta to the cache version for the supplied key. Returns the Adds delta to the cache version for the supplied key. Returns the
...@@ -125,37 +85,18 @@ class RedisCache(BaseRedisCache): ...@@ -125,37 +85,18 @@ class RedisCache(BaseRedisCache):
old = self.make_key(key, version) old = self.make_key(key, version)
new = self.make_key(key, version=version + delta) new = self.make_key(key, version=version + delta)
return self._incr_version(self.client, old, new, delta, version) return self._incr_version(self.master_client, old, new, delta, version)
##################### #####################
# Extra api methods # # Extra api methods #
##################### #####################
def has_key(self, key, version=None):
return self._has_key(self.client, key, version)
def ttl(self, key, version=None):
key = self.make_key(key, version=version)
return self._ttl(self.client, key)
def delete_pattern(self, pattern, version=None): def delete_pattern(self, pattern, version=None):
pattern = self.make_key(pattern, version=version) pattern = self.make_key(pattern, version=version)
self._delete_pattern(self.client, pattern) self._delete_pattern(self.master_client, pattern)
def get_or_set(self, key, func, timeout=None, version=None):
key = self.make_key(key, version=version)
return self._get_or_set(self.client, key, func, timeout)
def reinsert_keys(self): def reinsert_keys(self):
""" """
Reinsert cache entries using the current pickle protocol version. Reinsert cache entries using the current pickle protocol version.
""" """
self._reinsert_keys(self.client) self._reinsert_keys(self.master_client)
def persist(self, key, version=None):
key = self.make_key(key, version=version)
self._persist(self.client, key, version)
def expire(self, key, timeout, version=None):
key = self.make_key(key, version=version)
self._expire(self.client, key, timeout, version)
...@@ -21,7 +21,7 @@ LOCATIONS = [ ...@@ -21,7 +21,7 @@ LOCATIONS = [
@override_settings(CACHES={ @override_settings(CACHES={
'default': { 'default': {
'BACKEND': 'redis_cache.ShardedRedisCache', 'BACKEND': 'redis_cache.RedisCache',
'LOCATION': LOCATIONS, 'LOCATION': LOCATIONS,
'OPTIONS': { 'OPTIONS': {
'DB': 1, 'DB': 1,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment