# Rapyer Roadmap

This is the current roadmap for the rapyer package. Note that this is just a basic proof of concept (POC); you are welcome to add suggestions and contribute to our development.
## Bulk Operations Support

Goal: Enable efficient bulk insert, update, and delete operations for multiple models

### Tasks
- [ ] Bulk Insert: Create multiple models in a single Redis transaction
- [ ] Bulk Update: Update multiple models efficiently
- [ ] Bulk Delete: Delete multiple models in one operation
- [ ] Batch Loading: Load multiple models by keys efficiently
### Example Usage

```python
# Bulk insert
users = [User(name=f"user{i}") for i in range(100)]
await User.bulk_insert(users)

# Bulk delete
await User.bulk_delete(["user:1", "user:2", "user:3"])

# Batch load
users = await User.bulk_get(["user:1", "user:2", "user:3"])
```
Benefits: Better performance for large datasets, reduced Redis round trips
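The round-trip saving that bulk operations aim for can be sketched without a live server. Everything below (`FakeRedisPipeline`, `bulk_insert`, `bulk_get`) is hypothetical scaffolding, not rapyer's API; a real implementation would queue the same commands on a redis-py pipeline.

```python
class FakeRedisPipeline:
    """Queues commands and executes them in one batch (one round trip)."""

    def __init__(self, store):
        self.store = store          # dict standing in for the Redis keyspace
        self.commands = []
        self.round_trips = 0

    def set(self, key, value):
        self.commands.append(("set", key, value))

    def get(self, key):
        self.commands.append(("get", key))

    def execute(self):
        self.round_trips += 1       # the whole batch costs a single round trip
        results = []
        for op, *args in self.commands:
            if op == "set":
                key, value = args
                self.store[key] = value
                results.append(True)
            else:
                results.append(self.store.get(args[0]))
        self.commands = []
        return results


def bulk_insert(pipe, models):
    """Queue one SET per model, then flush the whole batch at once."""
    for i, model in enumerate(models):
        pipe.set(f"user:{i}", model)
    return pipe.execute()


def bulk_get(pipe, keys):
    for key in keys:
        pipe.get(key)
    return pipe.execute()


store = {}
pipe = FakeRedisPipeline(store)
bulk_insert(pipe, [{"name": f"user{i}"} for i in range(100)])
loaded = bulk_get(pipe, ["user:0", "user:1", "user:2"])
# 103 commands executed in only 2 round trips
```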
## Complex Selection and Querying

Goal: Enable complex field-based queries using Python comparison operators

### Tasks
- [ ] Field Comparisons: Support `>`, `<`, `>=`, `<=`, `==`, `!=` operators on model fields
- [ ] Logical Operators: Support `&` (and), `|` (or), `~` (not) for complex queries
- [ ] Query Builder: Build Redis search queries from Python expressions
- [ ] Index Management: Automatic field indexing for query performance
### Example Usage

```python
# Simple field comparisons
adults = await User.find(User.age > 18).to_list()
active_users = await User.find(User.status == "active").to_list()

# Complex queries with logical operators
target_users = await User.find(
    (User.age > 25) & (User.score >= 100) & (User.active == True)
).to_list()

# Single result queries
admin = await User.find_one(User.role == "admin")
```
Benefits: Pythonic query syntax, improved developer experience, efficient Redis search
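The operator-based syntax could be prototyped with plain operator overloading: comparisons return predicate objects, and `&`, `|`, `~` combine them into a tree. The `Field`/`Predicate` classes below are an illustrative sketch that evaluates predicates in Python; the real query builder would instead compile the tree into a Redis search query.

```python
import operator

class Field:
    """A named field whose comparisons build predicates instead of booleans."""

    def __init__(self, name):
        self.name = name

    def _cmp(self, op, other):
        return Predicate(lambda obj: op(obj[self.name], other))

    def __gt__(self, other): return self._cmp(operator.gt, other)
    def __ge__(self, other): return self._cmp(operator.ge, other)
    def __lt__(self, other): return self._cmp(operator.lt, other)
    def __le__(self, other): return self._cmp(operator.le, other)
    def __eq__(self, other): return self._cmp(operator.eq, other)
    def __ne__(self, other): return self._cmp(operator.ne, other)

class Predicate:
    """A composable boolean test; & | ~ build larger predicates."""

    def __init__(self, fn):
        self.fn = fn

    def __call__(self, obj):
        return self.fn(obj)

    def __and__(self, other):
        return Predicate(lambda obj: self(obj) and other(obj))

    def __or__(self, other):
        return Predicate(lambda obj: self(obj) or other(obj))

    def __invert__(self):
        return Predicate(lambda obj: not self(obj))

age, status = Field("age"), Field("status")
users = [
    {"age": 30, "status": "active"},
    {"age": 17, "status": "active"},
    {"age": 40, "status": "inactive"},
]
query = (age > 18) & (status == "active")
matches = [u for u in users if query(u)]
```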
## Conditional Pipeline Actions

Goal: Enable complex conditional logic within pipeline operations

### Tasks
- [ ] Conditional Execution: Support `if_true()` and `else_()` methods on field comparisons (`else` itself is a reserved word in Python)
- [ ] Loop Operations: Support iteration over collections in pipelines
- [ ] Nested Conditions: Allow chaining of conditional statements
- [ ] Pipeline Context: Maintain pipeline context through conditional blocks
### Example Usage

```python
async with user.pipeline() as p:
    # Simple conditional
    (p.age > 17).if_true(
        lambda: p.status.set("adult")
    ).else_(
        lambda: p.status.set("minor")
    )

    # Complex conditional with multiple actions
    (p.score >= 100).if_true([
        lambda: p.level.set("premium"),
        lambda: p.badges.append("high_scorer"),
        lambda: p.notifications.update({"achievement": "unlocked"})
    ])

    # Loop operations
    for item in p.inventory:
        (item.expired).if_true(
            lambda: p.inventory.remove(item)
        )
```
Benefits: Advanced pipeline logic, reduced round trips, atomic conditional operations
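The deferred-conditional idea can be illustrated in pure Python: a condition object queues its branches, and nothing runs until the pipeline flushes (in Redis that flush would be a single atomic step, e.g. a Lua script). `Condition`, `FakePipeline`, and `when()` are hypothetical names; `when(callable)` stands in for a field comparison like `(p.age > 17)`.

```python
class Condition:
    """A deferred branch: actions are queued, not executed, until flush()."""

    def __init__(self, queue, check):
        self.queue = queue
        self.queue.append((check, [], []))

    def if_true(self, actions):
        self.queue[-1][1].extend(actions if isinstance(actions, list) else [actions])
        return self

    def else_(self, actions):   # "else" is reserved, hence the underscore
        self.queue[-1][2].extend(actions if isinstance(actions, list) else [actions])
        return self

class FakePipeline:
    def __init__(self, state):
        self.state = state
        self.queue = []

    def when(self, check):
        return Condition(self.queue, check)

    def flush(self):
        # conceptually one atomic round trip
        for check, true_actions, false_actions in self.queue:
            branch = true_actions if check(self.state) else false_actions
            for action in branch:
                action(self.state)
        self.queue = []

state = {"age": 25, "status": None, "badges": []}
p = FakePipeline(state)
p.when(lambda s: s["age"] > 17).if_true(
    lambda s: s.__setitem__("status", "adult")
).else_(
    lambda s: s.__setitem__("status", "minor")
)
p.when(lambda s: s["age"] >= 100).if_true(
    lambda s: s["badges"].append("centenarian")
)
assert state["status"] is None   # nothing ran yet: actions are deferred
p.flush()
```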
## Field Access and Extraction Support

Goal: Enable extraction of specific fields from nested structures using dot notation

### Tasks
- [ ] Nested Field Access: Support `model.field_lst[0].field_int.get(redis_id)` syntax
- [ ] Dynamic Field Extraction: Allow runtime field path resolution
- [ ] Type Safety: Maintain type hints through nested access
- [ ] Performance Optimization: Efficient Redis operations for nested field access
### Example Usage

```python
# Extract specific nested field
value = await model.field_lst[0].field_int.get(redis_id)

# Multiple nested field extractions
values = await model.extract_fields([
    "field_lst[0].field_int",
    "field_lst[1].field_str",
    "nested_dict.sub_field"
], redis_id)
```
Benefits: Simplified nested data access, improved developer experience, efficient field-specific operations
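Runtime path resolution for strings like `"field_lst[0].field_int"` can be sketched with a small tokenizer. The `resolve`/`extract_fields` helpers below are illustrative only and walk plain dicts and lists rather than Redis structures.

```python
import re

# Matches either a field name or a [index] segment of a path
TOKEN = re.compile(r"([^.\[\]]+)|\[(\d+)\]")

def resolve(data, path):
    """Walk dicts by key and lists by [index] following a dotted path."""
    current = data
    for name, index in TOKEN.findall(path):
        current = current[name] if name else current[int(index)]
    return current

def extract_fields(data, paths):
    return {path: resolve(data, path) for path in paths}

doc = {
    "field_lst": [
        {"field_int": 42, "field_str": "a"},
        {"field_int": 7, "field_str": "b"},
    ],
    "nested_dict": {"sub_field": "hello"},
}
values = extract_fields(doc, [
    "field_lst[0].field_int",
    "field_lst[1].field_str",
    "nested_dict.sub_field",
])
```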
## TTL Postponement on Model Usage

Goal: Allow postponing TTL expiration every time a model is accessed or used, preventing deletion while the model remains active

### Tasks
- [ ] Usage-Based TTL: Update TTL automatically when model is accessed or modified
- [ ] Configurable Behavior: Option to enable/disable TTL postponement per model class
- [ ] TTL Refresh Strategy: Define when and how TTL gets refreshed (read, write, or both)
- [ ] Performance Optimization: Efficient TTL updates without impacting model operations
- [ ] Configuration Options: Allow setting TTL refresh interval and grace periods
### Example Usage

```python
class User(AtomicRedisModel):
    name: str
    email: str

    class Config:
        ttl_postpone_on_access = True  # Refresh TTL on any access (reads and writes)
        ttl_postpone_on_write = True   # Refresh TTL on writes

# TTL gets postponed automatically
user = await User.get("user:123")  # TTL extended for another 3600 seconds
user.name = "Updated Name"
await user.save()                  # TTL refreshed again by the write
```
Benefits: Prevents premature deletion of active models, better cache behavior for frequently accessed data, configurable per-model basis
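Refresh-on-access TTL is easy to model with an injected clock, which also shows why it matters: the second read below would miss without the refresh. `TTLStore` is a hypothetical stand-in for Redis `GET` plus `EXPIRE`, not rapyer code.

```python
class TTLStore:
    """Key-value store whose reads push expiry forward (refresh-on-access)."""

    def __init__(self, ttl, clock):
        self.ttl = ttl
        self.clock = clock          # injected so the example can control time
        self.data = {}              # key -> (value, expires_at)

    def set(self, key, value):
        self.data[key] = (value, self.clock() + self.ttl)

    def get(self, key, postpone=True):
        entry = self.data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self.clock() >= expires_at:
            del self.data[key]      # expired: behaves like a missing key
            return None
        if postpone:                # the postponement: push expiry forward
            self.data[key] = (value, self.clock() + self.ttl)
        return value

now = [0]                           # fake clock, advanced by hand
store = TTLStore(ttl=10, clock=lambda: now[0])
store.set("user:123", {"name": "A"})

now[0] = 8
alive_after_8 = store.get("user:123")    # read refreshes expiry to t=18
now[0] = 15
alive_after_15 = store.get("user:123")   # alive only thanks to the refresh
now[0] = 40
gone = store.get("user:123")             # far past any refresh: expired
```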
## Unique Data Structure Support

Goal: Add support for specialized data structures like priority queues, counters, bloom filters, and other advanced Redis-backed collections

### Tasks
- [ ] Priority Queue: Implement Redis-backed priority queue using sorted sets with customizable priority logic
- [ ] Counter: Thread-safe counter with atomic increment/decrement operations
- [ ] Bloom Filter: Probabilistic data structure for membership testing
- [ ] Rate Limiter: Token bucket or sliding window rate limiting
- [ ] Circular Buffer: Fixed-size buffer with automatic overwrite behavior
- [ ] Type Registry: System to register and discover custom data structure types
- [ ] Serialization Strategy: Pluggable serialization for complex priority/value types
### Example Usage

```python
class TaskQueue(AtomicRedisModel):
    pending_tasks: RedisPriorityQueue[Task, int]  # Task objects with int priority
    request_counter: RedisCounter
    user_filter: RedisBloomFilter[str]  # String membership testing

    class Config:
        priority_queue_order = "min"  # or "max" for max-heap behavior

# Priority queue operations
await queue.pending_tasks.push(task, priority=5)
high_priority_task = await queue.pending_tasks.pop()  # Returns highest-priority task

# Counter operations
await queue.request_counter.increment(5)
current_count = await queue.request_counter.get()

# Bloom filter operations
await queue.user_filter.add("user123")
exists = await queue.user_filter.contains("user123")  # True may be a false positive; False is definite
```
Benefits: Support for advanced use cases, better performance for specialized operations, extensible type system for custom data structures
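Of the structures listed, the priority queue maps most directly onto existing Python machinery. The sketch below uses `heapq` with a sign flip for min/max order and a counter for FIFO tie-breaking; a Redis-backed version would use `ZADD`/`ZPOPMIN` on a sorted set instead. Class and parameter names are illustrative, not rapyer's API.

```python
import heapq
import itertools

class PriorityQueue:
    """push/pop with configurable min- or max-priority ordering."""

    def __init__(self, order="min"):
        self.heap = []
        self.sign = 1 if order == "min" else -1   # negate priorities for max order
        self.counter = itertools.count()          # tie-breaker keeps FIFO order

    def push(self, item, priority):
        heapq.heappush(self.heap, (self.sign * priority, next(self.counter), item))

    def pop(self):
        _, _, item = heapq.heappop(self.heap)
        return item

queue = PriorityQueue(order="max")
queue.push("low", priority=1)
queue.push("urgent", priority=9)
queue.push("normal", priority=5)
first = queue.pop()
second = queue.pop()
```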
## Lazy and Safe Field Loading

Goal: Implement lazy-loading fields that gracefully handle corrupted or unpicklable data without breaking the entire model

### Tasks
- [ ] Lazy Field Implementation: Create field types that defer loading until accessed
- [ ] Safe Loading Mechanism: Handle corrupted/unpicklable data gracefully without raising errors during model initialization
- [ ] Error Deferral: Store loading errors and raise them only when the specific field is accessed
- [ ] Fallback Strategies: Configurable fallback values or behaviors for failed field loading
- [ ] Type Safety: Maintain type hints and IDE support for lazy fields
- [ ] Performance Optimization: Minimize overhead for successfully loaded fields
### Example Usage

```python
from pickle import PickleError

class User(AtomicRedisModel):
    name: str
    email: str
    profile_data: LazyField[dict]    # Will not break the model if corrupted
    cached_results: SafeField[list]  # default_factory=list: falls back to an empty list

    class Config:
        lazy_fields_on_error = "defer"  # or "ignore", "log"

# Model loads successfully even if profile_data is corrupted
user = await User.get("user:123")  # Works even with corrupted pickle data

# Error is raised only when accessing the corrupted field
try:
    profile = user.profile_data  # PickleError raised here, not during model load
except PickleError:
    # Handle corrupted data gracefully
    user.profile_data = {}  # Reset to a valid state
```
Benefits: Resilient model loading, graceful degradation with corrupted data, partial model functionality when some fields fail to load
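The error-deferral mechanism reduces to "catch at load time, re-raise at access time". The `LazyField` below sketches just that mechanism as a plain wrapper; it is not the annotated field type proposed above, and the loader functions are hypothetical.

```python
class LazyField:
    """Holds either a loaded value or the exception raised while loading it."""

    def __init__(self, loader):
        try:
            self.value, self.error = loader(), None
        except Exception as exc:    # defer: don't break model construction
            self.value, self.error = None, exc

    def get(self):
        if self.error is not None:
            raise self.error        # surfaced only on access
        return self.value

def corrupted_loader():
    # Stands in for unpickling a corrupted payload
    raise ValueError("unpicklable payload")

name_field = LazyField(lambda: "Alice")      # loads fine
profile_field = LazyField(corrupted_loader)  # construction still succeeds

loaded_name = name_field.get()
try:
    profile_field.get()
    deferred_error = None
except ValueError as exc:
    deferred_error = str(exc)
```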
## Sync Redis Client Support

Goal: Enable synchronous Redis client support alongside the existing async implementation, providing sync equivalents for all operations

### Tasks
- [ ] Sync Client Implementation: Add synchronous Redis client wrapper and connection management
- [ ] Sync Model Operations: Implement sync versions of core model operations (sync_get, sync_save, sync_delete, etc.)
- [ ] Sync Pipeline Support: Enable pipeline operations with synchronous client (sync_pipeline context manager)
- [ ] Sync Field Operations: Add synchronous versions for all Redis field types (RedisStr, RedisInt, RedisList, etc.)
- [ ] Configuration Management: Support both sync and async clients in model configuration
- [ ] Testing: Comprehensive test coverage for all sync operations
- [ ] Documentation: Update docs with sync usage patterns and migration guides
### Example Usage

```python
# Sync model operations
user = User.sync_get("user:123")
user.name = "Updated Name"
user.sync_save()

# Sync pipeline operations
with user.sync_pipeline() as p:
    p.score += 10
    p.tags.append("new_tag")
# All changes applied atomically when the block exits

# Sync field operations
user.score.sync_update(100)
tags = user.tags.sync_get_all()
```
Benefits: Support for synchronous codebases, easier integration with existing sync applications, consistent API patterns between async and sync operations
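One low-cost way to keep the sync and async APIs consistent is to generate each `sync_*` method from its async counterpart. The `syncify` helper below is a sketch of that approach under stated assumptions; a production sync client would more likely use a blocking redis-py connection to avoid per-call event-loop overhead.

```python
import asyncio
import functools

def syncify(async_fn):
    """Wrap a coroutine function so synchronous code can call it directly."""
    @functools.wraps(async_fn)
    def wrapper(*args, **kwargs):
        return asyncio.run(async_fn(*args, **kwargs))
    return wrapper

class User:
    def __init__(self, name):
        self.name = name

    async def save(self):
        await asyncio.sleep(0)      # stands in for the async Redis round trip
        return f"saved {self.name}"

    sync_save = syncify(save)       # sync twin generated from the async method

user = User("Alice")
result = user.sync_save()           # no event loop needed at the call site
```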