Atomic Actions¶
Rapyer provides powerful atomic operations that ensure data consistency and prevent race conditions in concurrent environments. The most user-friendly approach is using the pipeline context manager, which allows you to perform multiple operations atomically while working with your models in a natural, Pythonic way.
Pipeline Context Manager¶
The pipeline is a context manager that batches all Redis operations performed on a model into a single atomic transaction. When you enter the pipeline context, the model is automatically loaded to its current state, and all changes are applied atomically when the context exits.
Basic Usage¶
from rapyer import AtomicRedisModel
from typing import List, Dict
class User(AtomicRedisModel):
name: str
score: int = 0
achievements: List[str] = []
metadata: Dict[str, str] = {}
async def update_user_progress(user: User, points: int, achievement: str):
# All operations inside this context are atomic
async with user.pipeline() as pipeline_user:
# Model is automatically loaded to current state
user.score += points
user.achievements.append(achievement)
user.metadata["last_update"] = "2024-01-15"
# All changes are saved atomically when context exits
Key Benefits¶
- Simplicity: No need to call async methods - just modify attributes directly
- Atomicity: All operations succeed or fail together
- Auto-loading: Model state is refreshed when entering the context
- Performance: Multiple operations batched into single Redis transaction
Supported Operations¶
The pipeline context manager supports atomic operations for all Redis types:
- List operations:
append(),extend(),insert(),pop(),remove(),clear() - Dictionary operations:
update(), item assignment (dict[key] = value),pop(),clear() - String operations: Direct assignment and modification
- Integer operations: Direct assignment, arithmetic operations
- Bytes operations: Direct assignment and modification
Real-World Examples¶
User Score and Achievement System¶
class GameUser(AtomicRedisModel):
username: str
total_score: int = 0
level: int = 1
achievements: List[str] = []
stats: Dict[str, int] = {}
async def complete_level(user: GameUser, level_score: int, level_name: str):
async with user.pipeline():
# Update score and level
user.total_score += level_score
if user.total_score > user.level * 1000:
user.level += 1
user.achievements.append(f"Reached Level {user.level}")
# Add level-specific achievement
user.achievements.append(f"Completed {level_name}")
# Update stats
user.stats["levels_completed"] = user.stats.get("levels_completed", 0) + 1
user.stats["last_level_score"] = level_score
Shopping Cart Operations¶
class ShoppingCart(AtomicRedisModel):
user_id: str
items: List[str] = []
quantities: Dict[str, int] = {}
total_price: int = 0
async def add_items_to_cart(cart: ShoppingCart, items_with_prices: List[tuple]):
async with cart.pipeline():
for item_id, quantity, price_per_item in items_with_prices:
# Add item to cart
cart.items.append(item_id)
# Set quantity
cart.quantities[item_id] = cart.quantities.get(item_id, 0) + quantity
# Update total price
cart.total_price += price_per_item * quantity
User Profile Updates¶
class UserProfile(AtomicRedisModel):
username: str
email: str
preferences: Dict[str, str] = {}
activity_log: List[str] = []
last_login: str = ""
async def update_user_settings(profile: UserProfile, new_email: str, theme: str):
async with profile.pipeline():
# Update email
old_email = profile.email
profile.email = new_email
# Update preferences
profile.preferences["theme"] = theme
profile.preferences["email_notifications"] = "enabled"
# Log the activity
profile.activity_log.append(f"Email changed from {old_email} to {new_email}")
profile.activity_log.append(f"Theme changed to {theme}")
# Update last login
from datetime import datetime
profile.last_login = datetime.now().isoformat()
Error Handling¶
Pipeline operations are atomic - if any operation fails, all changes are rolled back:
async def safe_user_update(user: User):
try:
async with user.pipeline():
user.score += 100
user.achievements.append("New Achievement")
user.metadata["invalid_key"] = "some_value" # This might fail
except Exception as e:
print(f"Update failed: {e}")
# All changes are automatically rolled back
# User state remains unchanged in Redis
Advanced Usage with Error Recovery¶
async def robust_cart_update(cart: ShoppingCart, items: List[dict]):
max_retries = 3
for attempt in range(max_retries):
try:
async with cart.pipeline():
for item in items:
cart.items.append(item["id"])
cart.quantities[item["id"]] = item["quantity"]
cart.total_price += item["price"] * item["quantity"]
print("Cart updated successfully")
break
except Exception as e:
if attempt < max_retries - 1:
print(f"Attempt {attempt + 1} failed, retrying...")
await asyncio.sleep(0.1) # Brief delay before retry
else:
print(f"All attempts failed: {e}")
raise
When to Use Pipeline Context Manager¶
Perfect for: - Multiple related changes that should succeed or fail together - Batch updates for performance - Natural Python-like operations on lists and dictionaries - User-facing operations where atomicity prevents data corruption
Consider alternatives for:
- Single field updates (use field.asave())
- Complex conditional logic requiring current values (use lock context manager)
- Operations requiring complex Python logic or conditional branching
The pipeline context manager provides the most intuitive way to ensure your Redis operations are atomic while keeping your code clean and readable.
Lock Context Manager¶
When you need atomic operations but require more complex Python logic or conditional operations, the lock context manager provides exclusive access to a model. The lock ensures that only one process can modify the model at a time, and it automatically updates the local model state to reflect the current values in Redis.
How Lock Works¶
The lock context manager:
- Acquires an exclusive lock on the model in Redis
- Refreshes the local model with the latest data from Redis
- Allows any Python operations on the model
- Automatically saves changes when the context exits
- Releases the lock after saving
Basic Lock Usage¶
from rapyer import AtomicRedisModel
class BankAccount(AtomicRedisModel):
balance: int = 0
transaction_count: int = 0
account_holder: str = ""
async def transfer_money(from_account_key: str, to_account_key: str, amount: int):
# Lock both accounts to prevent race conditions
async with BankAccount.lock_from_key(from_account_key, "transfer") as from_account:
async with BankAccount.lock_from_key(to_account_key, "transfer") as to_account:
# Models are automatically refreshed with latest Redis state
if from_account.balance >= amount:
# Pure Python operations - no async needed
from_account.balance -= amount
from_account.transaction_count += 1
to_account.balance += amount
to_account.transaction_count += 1
return f"Transferred ${amount} successfully"
else:
return "Insufficient funds"
# All changes are automatically saved when contexts exit
Lock with Operation Names¶
You can specify operation names to allow multiple types of operations on the same model instance to run concurrently. The lock is specific to both the model instance and the operation name - different operation names can execute simultaneously, while the same operation name will be serialized.
Important: The lock is scoped to the specific model instance (by its key). Different model instances have completely separate locks and won't wait for each other, even if using the same operation name.
class User(AtomicRedisModel):
name: str
email: str
profile_views: int = 0
last_login: str = ""
settings: Dict[str, str] = {}
# These can run concurrently (different lock actions)
async def update_profile(user_key: str, new_name: str, new_email: str):
async with User.lock_from_key(user_key, "profile_update") as user:
# Model state refreshed from Redis
user.name = new_name
user.email = new_email
async def track_page_view(user_key: str):
async with User.lock_from_key(user_key, "view_tracking") as user:
# Independent operation with separate lock
user.profile_views += 1
async def update_login_time(user_key: str):
async with User.lock_from_key(user_key, "login_update") as user:
from datetime import datetime
user.last_login = datetime.now().isoformat()
# This would be serialized with other "profile_update" locks on the SAME user
async def another_profile_update(user_key: str):
async with User.lock_from_key(user_key, "profile_update") as user:
# Must wait for other "profile_update" operations on this specific user to complete
user.settings["theme"] = "dark"
Model Instance Lock Isolation¶
Each model instance has its own independent locks. Operations on different instances never interfere with each other:
# These operations can ALL run concurrently, even with the same operation name
async def concurrent_example():
import asyncio
# All of these will run simultaneously - different model instances
await asyncio.gather(
User.lock_from_key("user:123", "profile_update"), # User 123
User.lock_from_key("user:456", "profile_update"), # User 456
User.lock_from_key("user:789", "profile_update"), # User 789
)
# But these would be serialized - same instance, same operation
# Only one can run at a time for user:123 with "profile_update"
async with User.lock_from_key("user:123", "profile_update") as user:
user.name = "First Update"
async with User.lock_from_key("user:123", "profile_update") as user:
user.name = "Second Update" # Waits for first to complete
# Different operation names on same instance - these CAN run concurrently
async def concurrent_operations_same_user():
import asyncio
user_key = "user:123"
# These run simultaneously - same instance, different operations
await asyncio.gather(
update_profile(user_key, "New Name", "new@email.com"), # "profile_update"
track_page_view(user_key), # "view_tracking"
update_login_time(user_key) # "login_update"
)
Locking Existing Model Instances¶
You can also lock an existing model instance:
async def update_user_safely(user: User):
async with user.lock("settings_update") as locked_user:
# locked_user has refreshed state from Redis
locked_user.settings["last_updated"] = "2024-01-15"
locked_user.settings["updated_by"] = "admin"
# Changes saved automatically when context exits
Complex Business Logic Example¶
The lock context manager is perfect for operations requiring conditional logic:
class GameCharacter(AtomicRedisModel):
name: str
level: int = 1
experience: int = 0
health: int = 100
inventory: List[str] = []
skills: Dict[str, int] = {}
async def level_up_character(character_key: str, exp_gained: int):
async with GameCharacter.lock_from_key(character_key, "level_up") as character:
# Character state is refreshed from Redis
character.experience += exp_gained
# Complex conditional logic
if character.experience >= character.level * 1000:
old_level = character.level
character.level += 1
character.health = 100 # Full heal on level up
# Give level-based rewards
if character.level % 5 == 0: # Every 5 levels
character.inventory.append(f"legendary_item_level_{character.level}")
# Update skills based on new level
for skill in character.skills:
if character.level > 10:
character.skills[skill] += 2
else:
character.skills[skill] += 1
return f"Leveled up from {old_level} to {character.level}!"
else:
needed_exp = (character.level * 1000) - character.experience
return f"Need {needed_exp} more experience to level up"
Error Handling with Locks¶
If an error occurs within the lock context, changes are not saved:
async def safe_account_operation(account_key: str, amount: int):
try:
async with BankAccount.lock_from_key(account_key, "withdraw") as account:
if account.balance < amount:
raise ValueError("Insufficient funds")
if amount > 10000:
raise ValueError("Daily limit exceeded")
account.balance -= amount
account.transaction_count += 1
except ValueError as e:
print(f"Operation failed: {e}")
# No changes saved to Redis - account state unchanged
return False
return True
When to Use Pipeline vs Lock¶
| Feature | Pipeline | Lock |
|---|---|---|
| Best for | Batch operations, list/dict changes | Complex logic, conditional operations |
| Python operations | Limited to supported types | Any Python code |
| Concurrency | High performance batching | Exclusive access with action-based concurrency |
| State refresh | Automatic on entry | Automatic on entry |
| Use when | Multiple related changes | Need current values for decisions |
Pipeline Example (Good for batch operations)¶
async with user.pipeline():
user.score += 100
user.achievements.append("New Achievement")
user.stats["games_played"] = user.stats.get("games_played", 0) + 1
Lock Example (Good for complex logic)¶
async with user.lock("score_update") as locked_user:
if locked_user.score >= 1000:
locked_user.level += 1
locked_user.achievements.append(f"Reached Level {locked_user.level}")
if locked_user.level % 10 == 0:
locked_user.inventory.append("special_reward")
locked_user.score += 100
The lock context manager provides the flexibility to perform any Python operations while ensuring data consistency and preventing race conditions through exclusive access control.