API Reference
This document consolidates all public APIs for RxDjango in one place.
ContextChannel (Backend)
Base class for creating real-time channels.
- class ContextChannel
Base class for defining a real-time data channel. Subclass this in a file named channels.py inside a Django app.
- Meta.state: ModelSerializer
The serializer defining the state structure. Can be a single serializer or serializer(many=True) for list states.
- Meta.auto_update: bool = False
For many=True states, automatically add instances when they start matching the queryset and remove them when they stop matching.
- Meta.optimize_anchors: bool = False
Add _rx_* boolean fields to models for efficient filtering. When enabled, the metaclass adds a BooleanField (with db_index=True) to the anchor model, allowing optimized queries for active channels.
- Meta.cache_ttl: int | None = None
Per-channel TTL override in seconds. When set, this channel’s cache expires after the specified duration instead of using the global RX_CACHE_TTL setting. Set to None (default) to use the global setting.
- has_permission(user, **kwargs) bool
Check if a user can access this channel.
- Parameters:
user – The authenticated Django user
kwargs – URL parameters from the WebSocket route
- Returns:
True if access is allowed
- async is_visible(instance_id) bool
For many=True channels, check if an instance should be visible to the connected user.
- Parameters:
instance_id – The ID of the instance to check
- async on_connect(tstamp)
Called after authentication succeeds. Use for initialization.
- Parameters:
tstamp – Last known timestamp if reconnecting, or None
- async on_disconnect()
Called when the WebSocket disconnects. Use for cleanup.
- get_instance_id(**kwargs) int | str
Return the anchor instance ID from URL parameters. Override to customize how the anchor ID is extracted.
- Parameters:
kwargs – URL parameters from the WebSocket route
- async list_instances(**kwargs) QuerySet
For many=True channels, return the queryset of anchor instances. Must be implemented by subclasses using many=True.
- async add_instance(instance_id, at_beginning=False)
Add an instance to a many=True channel’s list.
- Parameters:
instance_id – ID of the instance to add
at_beginning – If True, prepend to the list
- async remove_instance(instance_id)
Remove an instance from a many=True channel’s list.
- Parameters:
instance_id – ID of the instance to remove
- async set_runtime_var(var, value)
Set a runtime variable and push it to the connected client.
- Parameters:
var – Variable name
value – Variable value (must be JSON-serializable)
- async send(*args, **kwargs)
Proxy to the underlying AsyncWebsocketConsumer.send(). Use this to send arbitrary data to the connected client.
- async group_add(group)
Add this consumer to a Django Channels group. Use this in on_connect() to subscribe to group events handled by @consumer-decorated methods.
- Parameters:
group – The group name to join
- async serialize_instance(instance, tstamp=0) dict
Serialize a model instance using the channel’s state model. Returns a flat dictionary suitable for broadcasting.
- Parameters:
instance – The Django model instance to serialize
tstamp – Timestamp to attach to the serialized data
- classmethod broadcast_instance(anchor_id, instance, operation='update')
Manually broadcast an instance update to all connected clients.
- Parameters:
anchor_id – The anchor (root object) ID for the channel
instance – The model instance to broadcast
operation – 'create', 'update', or 'delete'
- classmethod broadcast_notification(anchor_id, notification, user_id=None)
Send a notification to connected clients.
- Parameters:
anchor_id – The anchor ID to target
notification – Dict with notification data
user_id – If provided, only send to this user
- classmethod get_cache_ttl() int
Get the cache TTL in seconds for this channel.
Resolution order:
1. Meta.cache_ttl on the channel class (per-channel override)
2. RX_CACHE_TTL Django setting (global override)
3. Default: 604800 (1 week)
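The resolution order above can be sketched as a small standalone function. This is only an illustration of the documented precedence, not RxDjango's implementation; the function name resolve_cache_ttl and its two parameters are hypothetical stand-ins for the channel's Meta.cache_ttl and the RX_CACHE_TTL setting.

```python
from typing import Optional

DEFAULT_CACHE_TTL = 604800  # one week in seconds, the documented default


def resolve_cache_ttl(channel_ttl: Optional[int], settings_ttl: Optional[int]) -> int:
    """Return the effective TTL following the documented precedence (hypothetical helper)."""
    if channel_ttl is not None:   # 1. Meta.cache_ttl on the channel class
        return channel_ttl
    if settings_ttl is not None:  # 2. RX_CACHE_TTL Django setting
        return settings_ttl
    return DEFAULT_CACHE_TTL      # 3. built-in default


print(resolve_cache_ttl(3600, 86400))  # per-channel override wins
print(resolve_cache_ttl(None, 86400))  # falls back to the global setting
print(resolve_cache_ttl(None, None))   # falls back to the default
```

Comparing against None rather than testing truthiness keeps a TTL of 0 distinguishable from "not set" in this sketch.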
- classmethod get_registered_channels() set
Return all registered ContextChannel subclasses. Useful for introspection and cache management tooling.
- can_save(instance, data) bool
Check if the current user may update an existing instance. Called after Meta.writable and anchor context checks pass. This is a synchronous method.
- Parameters:
instance – The database instance (pre-update state)
data – Partial field dict being applied
- Returns:
True to allow, False to deny (rolls back optimistic update)
- Default:
False
Note
The type must be declared with SAVE in Meta.writable and the instance must belong to the channel’s anchor context before this method is called.
- can_create(model_class, parent, data) bool
Check if the current user may create a new child instance. Called after Meta.writable and anchor context checks pass. This is a synchronous method.
- Parameters:
model_class – The child model being instantiated
parent – The parent instance that owns the relation
data – Field dict from the frontend
- Returns:
True to allow, False to deny (removes temporary instance)
- Default:
False
Note
The type must be declared with CREATE in Meta.writable and the parent must belong to the channel’s anchor context before this method is called.
- can_delete(instance) bool
Check if the current user may delete an existing instance. Called after Meta.writable and anchor context checks pass. This is a synchronous method.
- Parameters:
instance – The database instance to delete
- Returns:
True to allow, False to deny (restores instance in UI)
- Default:
False
Note
The type must be declared with DELETE in Meta.writable and the instance must belong to the channel’s anchor context before this method is called.
Decorators
@action
- @action
Expose a ContextChannel method as a frontend-callable RPC action.
The method must be async. It will be available on the generated TypeScript channel class with automatic parameter serialization. Type hints are inspected to auto-convert parameters (e.g. datetime strings are converted to datetime objects).
Example:
from channels.db import database_sync_to_async

@action
async def update_status(self, status: str) -> dict:
    self.instance.status = status
    # Model.save() is synchronous, so run it outside the event loop
    await database_sync_to_async(self.instance.save)()
    return {"success": True}
Frontend usage:
const result = await channel.updateStatus("active");
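The example pairs the backend method update_status with the frontend call updateStatus, which suggests a snake_case to camelCase mapping. A minimal sketch of such a conversion follows; the helper name snake_to_camel is hypothetical, and the exact rule used by the code generator is an assumption inferred from that single example.

```python
def snake_to_camel(name: str) -> str:
    """Convert a snake_case action name to camelCase (hypothetical helper)."""
    head, *tail = name.split("_")
    # Upper-case only the first letter of each later part; leave the rest untouched.
    return head + "".join(part[:1].upper() + part[1:] for part in tail)


print(snake_to_camel("update_status"))  # updateStatus
```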
@consumer
- @consumer(event_type)
Subscribe a ContextChannel method to Django Channels group events.
The decorated method will be called when an event of the specified type is received on any group the consumer is subscribed to.
- Parameters:
event_type – The event type string to listen for
Example:
import json

async def on_connect(self, tstamp):
    # Join the group whose events this consumer should receive
    await self.group_add('chat_room_123')

@consumer('chat.message')
async def handle_chat(self, event):
    await self.send(text_data=json.dumps(event['data']))
Serializer Meta Options
RxDjango extends the standard DRF serializer Meta class with
additional options for real-time behavior.
- Meta.user_key: str
Field name that identifies the owning user. Instances are only sent to the user matching this field value.
Example:
class TaskSerializer(serializers.ModelSerializer):
    class Meta:
        model = Task
        fields = ['id', 'title', 'owner']
        user_key = 'owner'
- Meta.optimistic: bool = False
Enable optimistic updates. The frontend can immediately reflect changes before server confirmation.
- Meta.optimistic_timeout: int = 3
Seconds before server state overrides optimistic updates.
Exceptions
All exceptions are defined in rxdjango.exceptions.
- exception UnknownProperty
Raised when a serializer references a model property that does not exist. Typically indicates a missing @related_property decorator on a custom model property.
- exception AnchorDoesNotExist
Raised when the requested anchor object cannot be found during WebSocket connection. Occurs when the anchor ID from the URL route does not match any database record.
- exception UnauthorizedError
Raised when authentication fails due to a missing or invalid token. Results in a 401 status code sent to the client.
- exception ForbiddenError
Raised when an authenticated user lacks permission to access a channel. Occurs when has_permission() returns False, resulting in a 403 status code.
- exception RxDjangoBug
Raised when an internal invariant is violated. Indicates a bug in RxDjango itself. If encountered, please report it as an issue.
- exception ActionNotAsync
Raised during channel initialization when an @action-decorated method is not defined with async def. All action methods must be async.
- exception WriteError
Raised when a write operation (save, create, delete) fails due to invalid data, missing instances, or other request errors. Results in a 400 error code sent to the client.
WebSocket Protocol
The WebSocket message format used between the Django backend and React frontend.
Incoming Messages (Client → Server)
Authentication (must be the first message after connecting):
{"token": "<rest_framework_auth_token>", "last_update": <timestamp|null>}
Action call (RPC):
{"callId": <unique_id>, "action": "methodName", "params": [...]}
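As an illustration of the two message shapes above, the following sketch builds the authentication and action-call payloads with the standard json module. The helper names auth_message and action_message are hypothetical; only the JSON keys come from the protocol description.

```python
import json
from typing import Any, List, Optional


def auth_message(token: str, last_update: Optional[float] = None) -> str:
    """First message after connecting; last_update is None (JSON null) on a fresh connection."""
    return json.dumps({"token": token, "last_update": last_update})


def action_message(call_id: int, action: str, params: List[Any]) -> str:
    """RPC call; call_id lets the client match the response to this request."""
    return json.dumps({"callId": call_id, "action": action, "params": params})


print(auth_message("my-token"))
print(action_message(1, "update_status", ["active"]))
```

A client would send the authentication payload first and only then issue action calls, since the protocol requires authentication to be the first message.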
Write operations (optimistic updates):
# Save an existing instance
{"type": "write", "writeId": <id>, "operation": "save",
"instanceType": "app.Serializer", "instanceId": <id>, "data": {...}}
# Create a new child instance
{"type": "write", "writeId": <id>, "operation": "create",
"instanceType": "app.ChildSerializer", "parentType": "app.ParentSerializer",
"parentId": <id>, "relationName": "children", "data": {...}}
# Delete an instance
{"type": "write", "writeId": <id>, "operation": "delete",
"instanceType": "app.Serializer", "instanceId": <id>}
Outgoing Messages (Server → Client)
Connection status:
{"status_code": 200}
{"status_code": 401, "error": "error/unauthorized"}
{"status_code": 403, "error": "error/forbidden"}
{"status_code": 404, "error": "error/not-found"}
Initial anchor list (sent after authentication):
{"initialAnchors": [1, 2, 3]}
State instances (array of flat instances):
[{"id": 1, "_instance_type": "app.Serializer", "_tstamp": ..., ...}, ...]
End of initial state marker:
[{"_instance_type": "", "_tstamp": ..., "_operation": "end_initial_state", "id": 0}]
Action response:
{"callId": <id>, "result": ...}
{"callId": <id>, "error": "Error"}
Write response:
{"type": "writeResponse", "writeId": <id>, "success": true}
{"type": "writeResponse", "writeId": <id>, "success": false,
"error": {"code": 403, "message": "Permission denied"}}
Write error codes: 400 (instance not found, not in context, bad data),
403 (type/operation not in Meta.writable, or can_* denied), 500 (server error).
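The check order implied by these error codes and by the can_* hooks (Meta.writable first, then anchor context, then the hook) can be sketched as below. This is an assumption-laden illustration: the shape of writable (a dict mapping instance type to a set of operation names), the function names, and the message text other than "Permission denied" are all hypothetical.

```python
from typing import Callable, Dict, Optional, Set

ErrorDict = Dict[str, object]


def check_write(
    writable: Dict[str, Set[str]],
    instance_type: str,
    operation: str,
    in_context: bool,
    hook: Callable[[], bool],
) -> Optional[ErrorDict]:
    """Return None if the write may proceed, otherwise an error dict."""
    if operation not in writable.get(instance_type, set()):
        return {"code": 403, "message": "Permission denied"}  # not declared writable
    if not in_context:
        return {"code": 400, "message": "Instance not in context"}
    if not hook():  # stands in for can_save / can_create / can_delete
        return {"code": 403, "message": "Permission denied"}
    return None


def write_response(write_id: int, error: Optional[ErrorDict]) -> Dict[str, object]:
    """Build the writeResponse payload for a finished check."""
    if error is None:
        return {"type": "writeResponse", "writeId": write_id, "success": True}
    return {"type": "writeResponse", "writeId": write_id, "success": False, "error": error}


writable = {"app.TaskSerializer": {"save"}}
print(write_response(1, check_write(writable, "app.TaskSerializer", "save", True, lambda: True)))
print(write_response(2, check_write(writable, "app.TaskSerializer", "delete", True, lambda: True)))
```

Because the can_* hooks default to False, a hook that is never overridden takes the third branch and the write is denied with 403.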
Runtime state change:
{"runtimeVar": "varName", "value": ...}
Prepend anchor (for many=True channels):
{"prependAnchor": <anchor_id>}
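The outgoing message shapes listed in this section can be told apart by their top-level structure. The sketch below shows one way a client might route an already-parsed message; the function name classify and the returned labels are hypothetical, and the real frontend may dispatch differently.

```python
from typing import Any, Dict, List, Union

Message = Union[Dict[str, Any], List[Dict[str, Any]]]


def classify(message: Message) -> str:
    """Return a label for a parsed server message based on its shape."""
    if isinstance(message, list):
        # Arrays carry flat instances; the end marker is a one-element array.
        first = message[0] if message else {}
        if first.get("_operation") == "end_initial_state":
            return "end_initial_state"
        return "instances"
    if "status_code" in message:
        return "status"
    if "initialAnchors" in message:
        return "initial_anchors"
    if message.get("type") == "writeResponse":
        return "write_response"
    if "callId" in message:
        return "action_response"
    if "runtimeVar" in message:
        return "runtime_var"
    if "prependAnchor" in message:
        return "prepend_anchor"
    return "unknown"


print(classify({"status_code": 200}))
print(classify([{"id": 1, "_instance_type": "app.Serializer", "_tstamp": 1.0}]))
```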
Frontend API (TypeScript)
ContextChannel
- class ContextChannel<T, Y>()
Base class for generated TypeScript channel classes. Manages WebSocket connection, authentication, state rebuilding, and RPC.
- Arguments:
T – Type of the root/anchor state
Y – Type of the runtime state (optional)
- ContextChannel<T, Y>.constructor(token)
- Arguments:
token – Django REST Framework auth token
- ContextChannel<T, Y>.init()
Initialize the WebSocket and StateBuilder. Called automatically on first subscribe().
- ContextChannel<T, Y>.disconnect()
Gracefully close the WebSocket connection.
- ContextChannel<T, Y>.subscribe(listener, noConnectionListener?)
Subscribe to state changes. Auto-connects on first subscriber.
- Arguments:
listener – Callback invoked with new state on every update
noConnectionListener – Optional callback for connection status
- Returns:
Unsubscribe function
- ContextChannel<T, Y>.subscribeInstance(listener, instance_id, instance_type)
Subscribe to changes on a specific instance.
- Arguments:
listener – Callback invoked when instance changes
instance_id – The instance ID
instance_type – The _instance_type string
- Returns:
Unsubscribe function
- ContextChannel<T, Y>.getInstance(instance_type, instance_id)
Get a specific instance from state.
- Arguments:
instance_type – The _instance_type string
instance_id – The instance ID
- Returns:
The instance, or null if not found
- ContextChannel<T, Y>.callAction(action, params)
Call a backend @action method via WebSocket RPC. Used internally by generated subclass methods.
- Arguments:
action – The snake_case action name
params – Array of parameters
- Returns:
Promise resolving with the action’s return value
- ContextChannel<T, Y>.saveInstance(instanceType, instanceId, data)
Save changes to an existing instance with optimistic update. The UI updates immediately, then reconciles when the server confirms or rolls back on error.
- Arguments:
instanceType – The _instance_type string
instanceId – The instance ID
data – Partial field dict to apply
- Returns:
Promise that resolves on success, rejects with error
- ContextChannel<T, Y>.createInstance(instanceType, parentType, parentId, relationName, data)
Create a new child instance with optimistic update. A temporary negative ID is assigned until the server responds.
- Arguments:
instanceType – The _instance_type for the new instance
parentType – The parent’s _instance_type
parentId – The parent instance ID
relationName – The relation field name on the parent
data – Field dict for the new instance
- Returns:
Promise resolving with the temporary ID
- ContextChannel<T, Y>.deleteInstance(instanceType, instanceId)
Delete an instance with optimistic removal. The instance is immediately removed from UI state and restored on error.
- Arguments:
instanceType – The _instance_type string
instanceId – The instance ID
- Returns:
Promise that resolves on success, rejects with error
useChannelState
- useChannelState(channel)
React hook for subscribing to channel state.
- Arguments:
channel – A ContextChannel instance, or undefined
- Returns:
Object with state, connected, no_connection_since, runtimeState, empty, and error fields
useChannelInstance
- useChannelInstance(channel, instance_type, instance_id)
React hook for subscribing to a specific instance.
- Arguments:
channel – A ContextChannel instance, or undefined
instance_type – The _instance_type string
instance_id – The instance ID, or undefined
- Returns:
The instance, or null if not available
StateBuilder
- class StateBuilder<T>()
Reconstructs nested state from flat server instances.
Maintains an instance registry indexed by _instance_type:id and rebuilds the nested structure by replacing foreign key IDs with object references. Reference changes propagate recursively to trigger React re-renders.
- StateBuilder<T>.update(instances)
Process a batch of instance updates from the server.
- Arguments:
instances – Array of flat instances with _instance_type
- StateBuilder<T>.state
The current rebuilt nested state. Returns a new reference on each access to trigger React re-renders.
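The registry-and-rebuild idea can be sketched in a few lines of Python. This is a conceptual illustration rather than the TypeScript StateBuilder: the relations mapping (which fields hold IDs and which instance type they point to) is a hypothetical input, since how the real implementation discovers relations is not described here, and the sketch does not guard against cyclic references.

```python
from typing import Any, Dict, List, Optional

Flat = Dict[str, Any]


def registry_key(instance_type: str, instance_id: Any) -> str:
    """Registry keys follow the documented _instance_type:id form."""
    return f"{instance_type}:{instance_id}"


def apply_updates(registry: Dict[str, Flat], instances: List[Flat]) -> None:
    """Merge a batch of flat instances into the registry."""
    for flat in instances:
        key = registry_key(flat["_instance_type"], flat["id"])
        registry.setdefault(key, {}).update(flat)


def build(registry: Dict[str, Flat], relations: Dict[str, Dict[str, str]],
          instance_type: str, instance_id: Any) -> Optional[Flat]:
    """Return a nested copy with related IDs replaced by the referenced objects."""
    flat = registry.get(registry_key(instance_type, instance_id))
    if flat is None:
        return None  # referenced instance has not arrived yet
    nested = dict(flat)
    for field, target in relations.get(instance_type, {}).items():
        value = flat.get(field)
        if isinstance(value, list):
            children = (build(registry, relations, target, i) for i in value)
            nested[field] = [c for c in children if c is not None]
        elif value is not None:
            nested[field] = build(registry, relations, target, value)
    return nested


registry: Dict[str, Flat] = {}
relations = {"app.ProjectSerializer": {"tasks": "app.TaskSerializer"}}  # hypothetical
apply_updates(registry, [
    {"id": 1, "_instance_type": "app.ProjectSerializer", "tasks": [10, 11]},
    {"id": 10, "_instance_type": "app.TaskSerializer", "title": "Write docs"},
])
print(build(registry, relations, "app.ProjectSerializer", 1))
```

In the usage above, task 11 is referenced but not yet received, so it is simply left out of the rebuilt list until a later batch delivers it.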
PersistentWebSocket
- class PersistentWebSocket()
WebSocket wrapper with automatic reconnection and message routing.
Features exponential backoff retry on connection drops, and routes incoming messages to type-specific handlers.
- PersistentWebSocket.connect()
Initiate WebSocket connection and send auth token.
- PersistentWebSocket.send(data)
Send a string message through the WebSocket.
- PersistentWebSocket.disconnect(reason?)
Close the connection. If reason is provided, prevents reconnection.
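Exponential backoff, as mentioned for PersistentWebSocket, means each retry waits longer than the previous one, usually up to a cap. The sketch below computes such a delay schedule; the base delay, growth factor, and cap are assumed values for illustration, not the ones PersistentWebSocket uses.

```python
from typing import List


def backoff_delays(attempts: int, base: float = 1.0, factor: float = 2.0,
                   cap: float = 30.0) -> List[float]:
    """Return the wait (in seconds) before each of the first `attempts` retries."""
    return [min(cap, base * factor ** n) for n in range(attempts)]


print(backoff_delays(6))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
```

The cap keeps a long outage from pushing the retry interval to impractical lengths.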
Cache System
RxDjango uses a two-layer cache for performance:
MongoDB (Persistent State)
Stores flattened instance data with optimistic locking.
One collection per channel (named <channel_name>)
Documents contain serialized instance data plus metadata
_tstamp field for change tracking and incremental loading
Optimistic locking prevents concurrent write conflicts
Documents exceeding MongoDB’s 16 MB limit are transparently stored in GridFS and referenced via a _grid_ref field
Redis (Coordination)
Handles transient state and coordination:
Cache state management (COLD / HEATING / HOT / COOLING)
Active WebSocket session tracking per anchor
last_disconnect timestamps for TTL-based expiry
Instance lists during HEATING and COOLING transitions
Pub/sub triggers for coordinating concurrent state loads
Timestamp generation for consistent ordering
Management Commands
expire_rxdjango_cache
Expire stale caches that have exceeded their configured TTL.
# Expire all stale caches
python manage.py expire_rxdjango_cache
# Preview what would be expired without making changes
python manage.py expire_rxdjango_cache --dry-run
This command is idempotent and safe to run concurrently (atomic Lua scripts prevent double transitions). Schedule it via cron or Celery beat:
# cron - run every 5 minutes
*/5 * * * * cd /path/to/project && python manage.py expire_rxdjango_cache
# Celery beat
CELERY_BEAT_SCHEDULE = {
'expire-rx-caches': {
'task': 'myapp.tasks.expire_rx_caches',
'schedule': 300,
},
}
broadcast_system_message
Broadcast a message to all connected WebSocket clients via the system channel.
python manage.py broadcast_system_message <source> <message>
The source parameter identifies the type of message (e.g. maintenance).
The message parameter is the human-readable message text.
makefrontend
Generate TypeScript interfaces and channel classes from Django serializers and ContextChannel subclasses.
python manage.py makefrontend # Generate all TS files
python manage.py makefrontend --dry-run # Preview changes
python manage.py makefrontend --force # Force rebuild all files
Configuration
Required Django Settings
- REDIS_URL: str
Redis connection URL.
Example:
redis://127.0.0.1:6379/0
- MONGO_URL: str
MongoDB connection URL.
Example:
mongodb://localhost:27017/
- MONGO_STATE_DB: str
MongoDB database name for state cache.
Example:
hot_state
- RX_FRONTEND_DIR: str
Path where makefrontend writes generated TypeScript files.
- RX_WEBSOCKET_URL: str
WebSocket URL for frontend connections.
Example:
http://localhost:8000/ws
Optional Settings
- RX_CACHE_TTL: int = 604800
Global cache TTL in seconds. When no active WebSocket sessions are connected to an anchor, the TTL countdown begins from the last disconnect time. After the TTL elapses, the expire_rxdjango_cache command will transition the cache from HOT to COLD.
Defaults to 604800 (1 week). Can be overridden per-channel via Meta.cache_ttl on the ContextChannel subclass.
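The expiry rule described for RX_CACHE_TTL can be expressed as a simple predicate. This is a sketch of the documented conditions only; the function name is_expired and its parameters are hypothetical, and the real command performs the check atomically in Redis rather than in Python.

```python
from typing import Optional


def is_expired(state: str, active_sessions: int, last_disconnect: Optional[float],
               ttl: int, now: float) -> bool:
    """Return True when a HOT cache with no sessions has outlived its TTL."""
    if state != "HOT":
        return False  # only HOT caches are candidates for expiry
    if active_sessions > 0 or last_disconnect is None:
        return False  # countdown starts only after the last session disconnects
    return now - last_disconnect >= ttl


WEEK = 604800
print(is_expired("HOT", 0, last_disconnect=0.0, ttl=WEEK, now=WEEK + 1.0))  # True
print(is_expired("HOT", 2, last_disconnect=0.0, ttl=WEEK, now=WEEK + 1.0))  # False
```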
- RXDJANGO_SYSTEM_CHANNEL: str = '_rxdjango_system'
Name of the Django Channels group used for system-wide broadcasts (e.g. maintenance messages via broadcast_system_message).
- TESTING: bool = False
When True (along with DEBUG), the state loader annotates initial state instances with a _cache_state field indicating whether the data came from COLD, HEATING, or HOT cache. Useful for debugging cache behavior during development.
Required Django Apps
INSTALLED_APPS = [
'rxdjango', # Must come before daphne
'daphne', # Must come before staticfiles
'django.contrib.staticfiles',
'channels',
]
ASGI_APPLICATION = 'your_project.asgi.application'