Quick Recap: What Is Read Through?
In a Read Through setup, your application never talks to the database directly when it wants to read data. Instead, it always asks a middle layer. That middle layer checks the cache (Redis) first. If the data is there, it returns it immediately. If the data is not there (a cache miss), the middle layer goes to the database, gets the data, stores it in Redis for next time, and then returns it to the application.
The important part: your application code does not know or care whether the data came from the cache or the database. It just calls a function and gets data back. All the caching logic is hidden behind that function.
- Your app only ever calls the cache layer
- The cache layer handles everything: check, fetch, store, return
- First request for any data is always slow (cache miss)
- Cache layer needs to know how to talk to your DB
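The flow above can be sketched in a few lines, with a plain dict standing in for Redis so the control flow is easy to see (the names `read_through_get` and `fake_db_lookup` are just for illustration):

```python
_cache = {}  # stands in for Redis in this sketch

def read_through_get(key, fetch_from_db):
    if key in _cache:            # 1. check the cache first
        return _cache[key]       # 2. cache hit: return immediately
    value = fetch_from_db()      # 3. cache miss: ask the database
    _cache[key] = value          # 4. store it for next time
    return value                 # 5. return to the caller

calls = []
def fake_db_lookup():
    calls.append(1)              # track how often the "DB" is hit
    return {"id": 42, "name": "Ada"}

first = read_through_get("user:42", fake_db_lookup)   # miss: hits the DB
second = read_through_get("user:42", fake_db_lookup)  # hit: served from cache
```

The caller never sees whether the value came from the cache or the fetch function, which is the whole point of the pattern.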
Why Redis?
Redis is an in-memory data store. That means it keeps everything in RAM, which makes it extremely fast. A typical database query might take 5 to 50 milliseconds depending on complexity. A Redis lookup takes less than 1 millisecond in most cases.
Redis also supports setting expiration times on keys (TTL), which is exactly what you need for caching. You store something, tell Redis "forget this after 5 minutes," and Redis handles the cleanup automatically.
Other reasons Redis fits well here:
- It supports many data types (strings, hashes, lists, sets) so you can store simple values or complex objects.
- It is widely supported, with client libraries in every major language.
- It can handle hundreds of thousands of operations per second on a single instance.
- It is battle-tested: companies like Twitter, GitHub, and Instagram use it heavily.
Setting Up: What You Need
Before we write any code, you need two things installed:
1. Redis server
- On a Mac: brew install redis, then start it with redis-server
- Using Docker: docker run -d -p 6379:6379 redis:alpine
- On Ubuntu: sudo apt install redis-server
2. Python Redis client
pip install redis
That is it. Redis listens on port 6379 by default, and the Python client connects there unless you configure it otherwise.
Connecting to Redis from Python
Let us start with the basics. Here is how you connect to Redis and do a simple read/write:
import redis
# Connect to Redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# Store a value
r.set('greeting', 'hello world')
# Read it back
value = r.get('greeting')
print(value) # prints: hello world
# Store with expiration (60 seconds)
r.set('temporary', 'this will disappear', ex=60)
The decode_responses=True part is important. Without it, Redis returns bytes (like b'hello world') instead of regular strings. Setting this flag makes it return normal Python strings, which is almost always what you want.
The db=0 is the database number. Redis has 16 databases by default (0 through 15). Think of them as separate namespaces. Most people just use 0.
The Naive Implementation (Do Not Do This)
Before showing you the right way, let me show you what most people do when they first try to add caching. This is the "Cache Aside" pattern disguised as application code:
import redis
import psycopg2
import json
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
def get_user_bad_way(user_id):
    """This works but it is messy. Every function that
    needs data has to repeat this same pattern."""
    # Check cache
    cached = r.get(f'user:{user_id}')
    if cached:
        return json.loads(cached)
    # Cache miss, query database
    conn = psycopg2.connect('dbname=myapp')
    cur = conn.cursor()
    cur.execute('SELECT id, name, email FROM users WHERE id = %s', (user_id,))
    row = cur.fetchone()
    cur.close()
    conn.close()
    if row is None:
        return None
    user = {'id': row[0], 'name': row[1], 'email': row[2]}
    # Store in cache
    r.set(f'user:{user_id}', json.dumps(user), ex=300)
    return user
This works. But the problem is clear: every single function that reads data has to contain the same caching boilerplate. Check Redis, handle miss, query DB, serialize, store in Redis, set TTL. If you have 20 different data-fetching functions, you are copying and pasting this pattern 20 times.
That is not Read Through. That is Cache Aside with extra steps. Let us do it properly.
The Proper Read Through Implementation
The idea is to build a cache layer that sits between your application and the database. Your application calls the cache layer. The cache layer handles everything.
Here is a clean, reusable implementation:
import redis
import json
from typing import Callable, Optional, Any
class ReadThroughCache:
    """A Read Through cache layer using Redis.

    Your application calls this instead of the database.
    It checks Redis first, and on miss, calls the provided
    database function, caches the result, and returns it.
    """

    def __init__(self, redis_client: redis.Redis, default_ttl: int = 300):
        self.redis = redis_client
        self.default_ttl = default_ttl

    def get(
        self,
        key: str,
        db_fetch_fn: Callable,
        ttl: Optional[int] = None
    ) -> Any:
        """Get data by key. Returns from cache if available,
        otherwise calls db_fetch_fn, caches the result, and returns it.

        Args:
            key: The Redis key to look up
            db_fetch_fn: A function that fetches data from the DB.
                Called only on cache miss. Takes no arguments.
            ttl: Time to live in seconds. Uses default_ttl if not provided.
        """
        ttl = ttl if ttl is not None else self.default_ttl

        # Step 1: Check Redis
        cached = self.redis.get(key)
        if cached is not None:
            # Cache hit
            return json.loads(cached)

        # Step 2: Cache miss. Call the database function.
        data = db_fetch_fn()
        if data is None:
            # Nothing in DB either. Optionally cache the miss
            # to prevent repeated DB lookups for non-existent data.
            # We cache None as a short-lived entry.
            self.redis.set(key, json.dumps(None), ex=60)
            return None

        # Step 3: Store in Redis
        self.redis.set(key, json.dumps(data), ex=ttl)

        # Step 4: Return the data
        return data

    def invalidate(self, key: str):
        """Remove a key from the cache.
        Use this when the underlying data changes."""
        self.redis.delete(key)

    def invalidate_pattern(self, pattern: str):
        """Remove all keys matching a pattern.
        Example: invalidate_pattern('user:*') removes all user caches."""
        keys = self.redis.keys(pattern)
        if keys:
            self.redis.delete(*keys)
Now here is how your application uses it:
import redis
import psycopg2
# Setup
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
cache = ReadThroughCache(redis_client=r, default_ttl=300)
# Your database access functions (plain, no caching logic)
def fetch_user_from_db(user_id):
    conn = psycopg2.connect('dbname=myapp')
    cur = conn.cursor()
    cur.execute('SELECT id, name, email FROM users WHERE id = %s', (user_id,))
    row = cur.fetchone()
    cur.close()
    conn.close()
    if row is None:
        return None
    return {'id': row[0], 'name': row[1], 'email': row[2]}

def fetch_product_from_db(product_id):
    conn = psycopg2.connect('dbname=myapp')
    cur = conn.cursor()
    cur.execute('SELECT id, name, price FROM products WHERE id = %s', (product_id,))
    row = cur.fetchone()
    cur.close()
    conn.close()
    if row is None:
        return None
    return {'id': row[0], 'name': row[1], 'price': float(row[2])}
# Usage: clean, simple, no caching boilerplate
user = cache.get(
    key='user:42',
    db_fetch_fn=lambda: fetch_user_from_db(42),
    ttl=600  # cache users for 10 minutes
)

product = cache.get(
    key='product:99',
    db_fetch_fn=lambda: fetch_product_from_db(99),
    ttl=120  # cache products for 2 minutes
)
Notice what happened. Your application code is clean. It just calls cache.get() with a key and a function that knows how to fetch the data. The cache layer does the rest. If you add 50 more data types, you never repeat the caching logic.
Serialization: Storing Complex Objects
Redis stores everything as strings. That means if you want to cache a Python dictionary, a list, or any complex object, you need to convert it to a string and back. This is called serialization.
The implementation above uses json.dumps() and json.loads(). This works great for dictionaries, lists, strings, numbers, and booleans. But it does NOT work for:
- Python datetime objects
- Decimal types
- Custom class instances
- Sets
- Bytes
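To see the limitation concretely: plain json.dumps raises a TypeError the moment it meets a datetime.

```python
import json
from datetime import datetime

# Plain json.dumps has no idea how to serialize a datetime,
# so it raises TypeError instead of producing a string.
try:
    json.dumps({"created": datetime(2024, 1, 15)})
    serialization_failed = False
except TypeError:
    serialization_failed = True
```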
If your data contains any of these, you need a custom JSON encoder:
import json
from datetime import datetime, date
from decimal import Decimal
class CustomEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, (datetime, date)):
            return obj.isoformat()
        if isinstance(obj, Decimal):
            return str(obj)
        if isinstance(obj, set):
            return list(obj)
        return super().default(obj)

def custom_serialize(data):
    return json.dumps(data, cls=CustomEncoder)

def custom_deserialize(data_str):
    return json.loads(data_str)
Then update the cache class to use these:
class ReadThroughCache:
    def __init__(self, redis_client, default_ttl=300,
                 serializer=None, deserializer=None):
        self.redis = redis_client
        self.default_ttl = default_ttl
        self.serialize = serializer or json.dumps
        self.deserialize = deserializer or json.loads

    def get(self, key, db_fetch_fn, ttl=None):
        ttl = ttl if ttl is not None else self.default_ttl
        cached = self.redis.get(key)
        if cached is not None:
            return self.deserialize(cached)
        data = db_fetch_fn()
        if data is None:
            self.redis.set(key, self.serialize(None), ex=60)
            return None
        self.redis.set(key, self.serialize(data), ex=ttl)
        return data
Now you can plug in any serialization format: JSON for most cases, the custom encoder for datetime/Decimal. You could even use pickle if you fully trust everything that ends up in the cache, but pickle can execute arbitrary code when loading untrusted data, and it produces bytes, which clashes with decode_responses=True. Stick with JSON when possible.
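One thing worth knowing about the custom-encoder approach: the round trip is lossy. Here is a quick, Redis-free check of what actually comes back after encode and decode (a sketch; the sample values are arbitrary):

```python
import json
from datetime import datetime
from decimal import Decimal

class CustomEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        if isinstance(obj, Decimal):
            return str(obj)
        if isinstance(obj, set):
            return sorted(obj)  # sorted for a deterministic result
        return super().default(obj)

record = {
    "created": datetime(2024, 1, 15, 9, 30),
    "price": Decimal("19.99"),
    "tags": {"a", "b"},
}
encoded = json.dumps(record, cls=CustomEncoder)
decoded = json.loads(encoded)
# After decoding: the datetime and Decimal come back as plain
# strings, and the set comes back as a list. If the caller needs
# the original types, it has to convert them back itself.
```

If that loss matters for your data, convert on read (e.g. `datetime.fromisoformat`) or use a serializer that preserves types.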
TTL Strategies: How Long Should Data Stay Cached?
TTL stands for Time To Live. It is the number of seconds before Redis automatically deletes a cached entry. Choosing the right TTL is one of the most important decisions in caching.
Too short: You get too many cache misses. The cache barely helps. You are still hammering your database.
Too long: Users see stale (outdated) data. Someone updates their profile but the old version keeps showing for hours.
There is no single right answer. It depends on the data. Here are three strategies:
Strategy 1: Fixed TTL
The simplest approach. Every cached item gets the same expiration time.
# Everything expires after 5 minutes
cache = ReadThroughCache(redis_client=r, default_ttl=300)
Good for getting started. Bad for production, because not all data is the same. User profiles change rarely. Stock prices change every second. They should not have the same TTL.
Strategy 2: Per-Type TTL
Different data types get different TTLs based on how often they change:
# User profiles rarely change: cache for 30 minutes
user = cache.get(
    key=f'user:{user_id}',
    db_fetch_fn=lambda: fetch_user_from_db(user_id),
    ttl=1800
)

# Product prices change more often: cache for 2 minutes
product = cache.get(
    key=f'product:{product_id}',
    db_fetch_fn=lambda: fetch_product_from_db(product_id),
    ttl=120
)

# Configuration settings almost never change: cache for 1 hour
config = cache.get(
    key='app:config',
    db_fetch_fn=lambda: fetch_config_from_db(),
    ttl=3600
)

# Search results are volatile: cache for 30 seconds
results = cache.get(
    key=f'search:{query_hash}',
    db_fetch_fn=lambda: search_db(query),
    ttl=30
)
This is the most common approach in production. You think about each data type and assign a TTL that makes sense for its rate of change.
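One way to avoid scattering these numbers through the codebase is to keep them in a single lookup keyed by prefix. A sketch (the `TTL_BY_TYPE` name and the values are illustrative, not prescriptive):

```python
# Per-type TTLs in one place instead of magic numbers everywhere.
TTL_BY_TYPE = {
    "user": 1800,      # profiles change rarely
    "product": 120,    # prices change often
    "config": 3600,    # settings almost never change
    "search": 30,      # results are volatile
}

def ttl_for(key):
    """Derive the TTL from the key prefix, e.g. 'user:42' -> 1800.
    Unknown prefixes fall back to a 5-minute default."""
    prefix = key.split(":", 1)[0]
    return TTL_BY_TYPE.get(prefix, 300)
```

Your call sites then become `cache.get(key, fn, ttl=ttl_for(key))`, and changing a policy means editing one dictionary.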
Strategy 3: Sliding TTL
The expiration timer resets every time the data is accessed. Popular data stays cached longer. Unpopular data expires naturally.
def get_with_sliding_ttl(self, key, db_fetch_fn, ttl=None):
    ttl = ttl if ttl is not None else self.default_ttl
    cached = self.redis.get(key)
    if cached is not None:
        # Reset the TTL on every access
        self.redis.expire(key, ttl)
        return self.deserialize(cached)
    data = db_fetch_fn()
    if data is None:
        self.redis.set(key, self.serialize(None), ex=60)
        return None
    self.redis.set(key, self.serialize(data), ex=ttl)
    return data
The downside: if data changes in the database but is popular in the cache, it could stay stale for a very long time because the TTL keeps getting extended. Use this only for data that rarely changes or where staleness is acceptable.
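If you do want sliding behavior without unbounded staleness, one option is to cap the entry's total lifetime: slide the window on each access, but never past a hard maximum age. A Redis-free sketch of the calculation (the function name and parameters are illustrative):

```python
import time

def sliding_ttl_with_cap(created_at, ttl, max_lifetime, now=None):
    """Return the TTL to apply on this access: extend by up to `ttl`
    seconds, but never let the entry live past created_at + max_lifetime.
    A result of 0 means the entry has hit its cap and should expire."""
    now = time.time() if now is None else now
    remaining_lifetime = (created_at + max_lifetime) - now
    return max(0, min(ttl, int(remaining_lifetime)))
```

To use this with Redis you would store the creation timestamp alongside the value (for example in the cached JSON) and pass the computed value to `expire` instead of the raw `ttl`.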
Cache Invalidation: The Hard Part
There is a famous quote in computer science: "There are only two hard things in computer science: cache invalidation and naming things."
Cache invalidation means: when the data in your database changes, how do you make sure the cache does not keep serving the old version?
There are three main approaches:
Approach 1: Time-Based (TTL Expiration)
The simplest. You do nothing. You let the TTL handle it. When the cached data expires, the next request will fetch fresh data from the database.
# Data cached for 5 minutes
# If someone updates the database, the cache will have stale data
# for at most 5 minutes before it expires and gets refreshed
cache.get(key='user:42', db_fetch_fn=lambda: fetch_user(42), ttl=300)
This is acceptable when:
- A few minutes of staleness is not a problem.
- The data does not change very often.
- You want simplicity over accuracy.
This is NOT acceptable when:
- A user changes their password and the old one keeps working for 5 more minutes.
- Inventory counts need to be accurate in real time.
- Financial data is involved.
Approach 2: Event-Based Invalidation
Whenever data changes in the database, you explicitly delete the cache entry. The next read will trigger a fresh fetch.
def update_user(user_id, new_data):
    """Update user in database AND invalidate cache."""
    # Update the database
    conn = psycopg2.connect('dbname=myapp')
    cur = conn.cursor()
    cur.execute(
        'UPDATE users SET name = %s, email = %s WHERE id = %s',
        (new_data['name'], new_data['email'], user_id)
    )
    conn.commit()
    cur.close()
    conn.close()
    # Invalidate the cache
    cache.invalidate(f'user:{user_id}')

# Next time someone reads user:42, it will be a cache miss
# and fresh data will be fetched from DB
This is better because there is zero staleness. The moment data changes, the old cache entry is gone.
The downside: you have to remember to invalidate everywhere data changes. If you have 10 different places in your code that update users, every single one needs the invalidation call. Miss one and you have a stale cache bug that is very hard to find.
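One way to reduce the "forgot to invalidate" risk is to funnel every write through a single helper that pairs the database update with the cache delete, so no code path can do one without the other. A minimal sketch (`write_and_invalidate` and the stand-in callables are hypothetical names, not part of any library):

```python
def write_and_invalidate(db_update_fn, cache_delete_fn, key):
    """Run the DB update, then drop the now-stale cache entry.
    Every code path that modifies data goes through here."""
    db_update_fn()          # 1. commit the change to the database
    cache_delete_fn(key)    # 2. then invalidate the cached copy

# Stand-ins so the flow can be exercised without a real DB or Redis:
updates, invalidated = [], []
write_and_invalidate(
    db_update_fn=lambda: updates.append("user 42 updated"),
    cache_delete_fn=invalidated.append,  # plays the role of cache.invalidate
    key="user:42",
)
```

In a real application `db_update_fn` would be your SQL update and `cache_delete_fn` would be `cache.invalidate`; the point is that the pairing lives in exactly one place.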
Approach 3: Pattern-Based Invalidation
Sometimes when one thing changes, you need to invalidate multiple related cache entries. For example, if a product price changes, you might need to invalidate the product cache, the category listing cache, the search results cache, and the homepage "featured products" cache.
def update_product_price(product_id, new_price):
    # Update database
    update_price_in_db(product_id, new_price)
    # Invalidate everything related
    cache.invalidate(f'product:{product_id}')
    cache.invalidate_pattern('category:*')
    cache.invalidate_pattern('search:*')
    cache.invalidate('homepage:featured')
Warning about invalidate_pattern: The Redis KEYS command that powers pattern matching scans every single key in the database. On a Redis instance with millions of keys, this can block the server for seconds. In production, use SCAN instead:
def invalidate_pattern_safe(self, pattern: str):
    """Production-safe pattern invalidation using SCAN
    instead of KEYS to avoid blocking Redis."""
    cursor = 0
    while True:
        cursor, keys = self.redis.scan(cursor=cursor, match=pattern, count=100)
        if keys:
            self.redis.delete(*keys)
        if cursor == 0:
            break
Error Handling: What Happens When Things Break
In production, things break. Redis goes down. The database goes down. Network connections time out. Your cache layer needs to handle all of this gracefully.
Scenario 1: Redis Is Down
If Redis is unreachable, your entire application should NOT crash. It should fall back to reading directly from the database. Slower, but functional.
def get(self, key, db_fetch_fn, ttl=None):
    ttl = ttl if ttl is not None else self.default_ttl

    # Try Redis first
    try:
        cached = self.redis.get(key)
        if cached is not None:
            return self.deserialize(cached)
    except redis.ConnectionError:
        # Redis is down. Log it, but do not crash.
        print(f'WARNING: Redis unavailable. Falling back to DB for key: {key}')
    except redis.TimeoutError:
        # Redis is too slow. Treat as a miss.
        print(f'WARNING: Redis timeout for key: {key}')

    # Cache miss (or Redis failure). Query the database.
    data = db_fetch_fn()

    # Try to cache the result, but do not crash if Redis is still down
    if data is not None:
        try:
            self.redis.set(key, self.serialize(data), ex=ttl)
        except (redis.ConnectionError, redis.TimeoutError):
            pass  # Could not cache. That is fine. DB still works.

    return data
The key principle: the cache is an optimization, not a requirement. If the cache is gone, the application should still work. It will just be slower.
Scenario 2: The Database Is Down
This is worse. If the database is down and the data is in Redis, you can still serve it. If the data is NOT in Redis, you are stuck.
def get(self, key, db_fetch_fn, ttl=None):
    ttl = ttl if ttl is not None else self.default_ttl

    # Try Redis
    try:
        cached = self.redis.get(key)
        if cached is not None:
            return self.deserialize(cached)
    except (redis.ConnectionError, redis.TimeoutError):
        pass

    # Try database
    try:
        data = db_fetch_fn()
    except Exception as e:
        print(f'ERROR: Database query failed: {e}')
        # Database is down AND data not in cache.
        # You have two options:
        # 1. Return None and let the caller handle it
        # 2. Raise the exception and let it bubble up
        return None

    if data is not None:
        try:
            self.redis.set(key, self.serialize(data), ex=ttl)
        except (redis.ConnectionError, redis.TimeoutError):
            pass

    return data
When the database is down, cached data becomes your lifeline. This is one reason to set longer TTLs for critical data. Even if the data is slightly stale, serving something is better than serving an error page.
Scenario 3: Timeout Handling
Network calls can hang. You do not want your application waiting 30 seconds for a Redis response. Set timeouts:
# Set connection and read timeouts (in seconds)
r = redis.Redis(
    host='localhost',
    port=6379,
    db=0,
    decode_responses=True,
    socket_timeout=1,           # Read/write timeout: 1 second
    socket_connect_timeout=1,   # Connection timeout: 1 second
    retry_on_timeout=False      # Do not retry, just fail fast
)
One second is generous for Redis. Most operations complete in under a millisecond. If Redis takes more than a second to respond, something is seriously wrong and you should fall back to the database rather than wait.
Production Considerations
Connection Pooling
Creating a new Redis connection for every request is expensive. Use a connection pool:
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    decode_responses=True,
    max_connections=20,
    socket_timeout=1,
    socket_connect_timeout=1
)
r = redis.Redis(connection_pool=pool)
The pool reuses connections. When your code calls r.get(), it grabs a connection from the pool, uses it, and returns it. No need to open and close connections repeatedly.
max_connections=20 means at most 20 simultaneous connections to Redis. Adjust based on your application's concurrency. For a web app handling 100 simultaneous requests, 20 to 50 connections is usually enough because each Redis operation is so fast.
Key Naming Conventions
As your application grows, you will have hundreds or thousands of different cache keys. A good naming convention is critical for debugging and maintenance.
Use colons to separate parts of the key. This is the standard Redis convention:
# Good: clear, predictable, debuggable
user:42
user:42:profile
user:42:settings
product:99
product:99:reviews
order:12345
search:results:electronics:page1
# Bad: ambiguous, hard to find, no structure
u42
userprofile42
prod_99
srch_elec_p1
When something goes wrong, you will be connecting to Redis directly and running commands like KEYS user:* to see what is cached. Good key names make this possible. Bad key names make it a nightmare.
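A small helper keeps key construction consistent across the codebase, so nobody improvises a format. (`make_key` is an illustrative name, not a Redis or redis-py convention.)

```python
def make_key(*parts):
    """Join key parts with colons, following the standard Redis
    convention. Non-string parts (IDs, page numbers) are stringified."""
    return ":".join(str(part) for part in parts)

# Produces keys matching the convention above, e.g.:
#   make_key("user", 42, "profile") -> "user:42:profile"
```

Centralizing this also gives you one place to add a global prefix (for example an app name or cache version) later.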
Monitoring Cache Hit/Miss Rates
If you do not measure your cache performance, you are flying blind. The most important metric is the hit rate: what percentage of requests are served from cache versus going to the database.
class ReadThroughCache:
    def __init__(self, redis_client, default_ttl=300):
        self.redis = redis_client
        self.default_ttl = default_ttl
        self.hits = 0
        self.misses = 0

    def get(self, key, db_fetch_fn, ttl=None):
        ttl = ttl if ttl is not None else self.default_ttl
        try:
            cached = self.redis.get(key)
            if cached is not None:
                self.hits += 1
                return json.loads(cached)
        except (redis.ConnectionError, redis.TimeoutError):
            pass
        self.misses += 1
        data = db_fetch_fn()
        if data is not None:
            try:
                self.redis.set(key, json.dumps(data), ex=ttl)
            except (redis.ConnectionError, redis.TimeoutError):
                pass
        return data

    @property
    def hit_rate(self):
        total = self.hits + self.misses
        if total == 0:
            return 0.0
        return (self.hits / total) * 100

    def stats(self):
        return {
            'hits': self.hits,
            'misses': self.misses,
            'hit_rate': f'{self.hit_rate:.1f}%'
        }
In production, you would send these metrics to a monitoring system (Prometheus, Datadog, etc.) rather than storing them in memory. But the principle is the same.
What is a good hit rate?
- Below 50%: your cache is barely helping. Check your TTLs or your access patterns.
- 50% to 80%: decent. Room for improvement, but the cache is doing useful work.
- 80% to 95%: good. Most reads are served from cache.
- Above 95%: excellent. Your database barely sees read traffic.
Putting It All Together: Production-Ready Implementation
Here is the complete, production-ready ReadThroughCache class with everything we covered:
import redis
import json
import logging
from typing import Callable, Optional, Any
from datetime import datetime, date
from decimal import Decimal

logger = logging.getLogger(__name__)

class CustomEncoder(json.JSONEncoder):
    """Handles datetime, date, Decimal, and set serialization."""
    def default(self, obj):
        if isinstance(obj, (datetime, date)):
            return obj.isoformat()
        if isinstance(obj, Decimal):
            return str(obj)
        if isinstance(obj, set):
            return list(obj)
        return super().default(obj)

class ReadThroughCache:
    """Production-ready Read Through cache using Redis.

    Usage:
        pool = redis.ConnectionPool(host='localhost', port=6379,
                                    db=0, decode_responses=True,
                                    max_connections=20,
                                    socket_timeout=1,
                                    socket_connect_timeout=1)
        r = redis.Redis(connection_pool=pool)
        cache = ReadThroughCache(r, default_ttl=300)

        user = cache.get(
            key='user:42',
            db_fetch_fn=lambda: fetch_user_from_db(42),
            ttl=600
        )
    """

    def __init__(self, redis_client: redis.Redis, default_ttl: int = 300):
        self.redis = redis_client
        self.default_ttl = default_ttl
        self.hits = 0
        self.misses = 0
        self.errors = 0

    def get(
        self,
        key: str,
        db_fetch_fn: Callable,
        ttl: Optional[int] = None
    ) -> Any:
        """Get data through the cache layer.

        Checks Redis first. On miss, calls db_fetch_fn,
        caches the result, and returns it. Handles Redis
        failures gracefully by falling back to the database.
        """
        ttl = ttl if ttl is not None else self.default_ttl

        # Step 1: Try Redis
        try:
            cached = self.redis.get(key)
            if cached is not None:
                self.hits += 1
                return json.loads(cached)
        except (redis.ConnectionError, redis.TimeoutError) as e:
            self.errors += 1
            logger.warning(f'Redis unavailable for key {key}: {e}')
        except json.JSONDecodeError as e:
            # Corrupted cache entry. Delete it and fetch fresh.
            logger.warning(f'Corrupted cache entry for key {key}: {e}')
            try:
                self.redis.delete(key)
            except (redis.ConnectionError, redis.TimeoutError):
                pass

        # Step 2: Cache miss or Redis failure. Query database.
        self.misses += 1
        try:
            data = db_fetch_fn()
        except Exception as e:
            logger.error(f'Database fetch failed for key {key}: {e}')
            return None

        # Step 3: Cache the result
        try:
            if data is None:
                # Cache the miss for a short time to prevent
                # repeated DB lookups for non-existent data
                self.redis.set(key, json.dumps(None), ex=60)
            else:
                serialized = json.dumps(data, cls=CustomEncoder)
                self.redis.set(key, serialized, ex=ttl)
        except (redis.ConnectionError, redis.TimeoutError) as e:
            logger.warning(f'Could not cache key {key}: {e}')

        return data

    def invalidate(self, key: str) -> bool:
        """Remove a single key from cache."""
        try:
            return bool(self.redis.delete(key))
        except (redis.ConnectionError, redis.TimeoutError) as e:
            logger.warning(f'Could not invalidate key {key}: {e}')
            return False

    def invalidate_pattern(self, pattern: str) -> int:
        """Remove all keys matching a pattern.
        Uses SCAN instead of KEYS to avoid blocking Redis."""
        count = 0
        try:
            cursor = 0
            while True:
                cursor, keys = self.redis.scan(
                    cursor=cursor, match=pattern, count=100
                )
                if keys:
                    self.redis.delete(*keys)
                    count += len(keys)
                if cursor == 0:
                    break
        except (redis.ConnectionError, redis.TimeoutError) as e:
            logger.warning(f'Could not invalidate pattern {pattern}: {e}')
        return count

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        if total == 0:
            return 0.0
        return (self.hits / total) * 100

    def stats(self) -> dict:
        return {
            'hits': self.hits,
            'misses': self.misses,
            'errors': self.errors,
            'hit_rate': f'{self.hit_rate:.1f}%'
        }

    def reset_stats(self):
        self.hits = 0
        self.misses = 0
        self.errors = 0
When NOT to Use Read Through
Read Through is not always the right choice. Skip it when:
Your data changes on every read. If the data is different every time (like a random recommendation), caching it is pointless. You will always get a miss or stale data.
Your reads are already fast enough. If your database query takes 2 milliseconds and that is acceptable, adding a cache layer just adds complexity for no real benefit.
Data consistency is critical and real-time. If even 1 second of stale data is unacceptable (like a live trading system), caching introduces risk. Write Through or no cache at all might be better.
Your data set is tiny. If your entire database fits in memory and queries are fast, Redis is not going to help much. The overhead of serialization and network calls might even make things slower.
You have very low traffic. Caching shines under load. If you get 10 requests per minute, the cache will mostly expire before anyone asks for the same data twice.