API Reference

This document consolidates all public APIs for RxDjango in one place.

ContextChannel (Backend)

Base class for creating real-time channels.

class ContextChannel

Base class for defining a real-time data channel. Subclass this in a file named channels.py inside a Django app.

Meta.state: ModelSerializer

The serializer that defines the state structure: either a single serializer, or serializer(many=True) for a list state.

Meta.auto_update: bool = False

For many=True states, automatically add instances when they start matching the queryset and remove them when they stop matching.

Meta.optimize_anchors: bool = False

Add _rx_* boolean fields to models for efficient filtering. When enabled, the metaclass adds a BooleanField (with db_index=True) to the anchor model, allowing optimized queries for active channels.

Meta.cache_ttl: int | None = None

Per-channel TTL override in seconds. When set, this channel’s cache will expire after the specified duration instead of using the global RX_CACHE_TTL setting. Set to None (default) to use the global setting.

has_permission(user, **kwargs) → bool

Check if a user can access this channel.

Parameters:
  • user – The authenticated Django user

  • kwargs – URL parameters from the WebSocket route

Returns:

True if access is allowed

async is_visible(instance_id) → bool

For many=True channels, check if an instance should be visible to the connected user.

Parameters:

instance_id – The ID of the instance to check

async on_connect(tstamp)

Called after authentication succeeds. Use for initialization.

Parameters:

tstamp – Last known timestamp if reconnecting, or None

async on_disconnect()

Called when the WebSocket disconnects. Use for cleanup.

get_instance_id(**kwargs) → int | str

Return the anchor instance ID from URL parameters. Override to customize how the anchor ID is extracted.

Parameters:

kwargs – URL parameters from the WebSocket route

async list_instances(**kwargs) → QuerySet

For many=True channels, return the queryset of anchor instances. Must be implemented by subclasses using many=True.

async add_instance(instance_id, at_beginning=False)

Add an instance to a many=True channel’s list.

Parameters:
  • instance_id – ID of the instance to add

  • at_beginning – If True, prepend to the list

async remove_instance(instance_id)

Remove an instance from a many=True channel’s list.

Parameters:

instance_id – ID of the instance to remove

async set_runtime_var(var, value)

Set a runtime variable and push it to the connected client.

Parameters:
  • var – Variable name

  • value – Variable value (must be JSON-serializable)

async send(*args, **kwargs)

Proxy to the underlying AsyncWebsocketConsumer.send(). Use this to send arbitrary data to the connected client.

async group_add(group)

Add this consumer to a Django Channels group. Use this in on_connect() to subscribe to group events handled by @consumer decorated methods.

Parameters:

group – The group name to join

async serialize_instance(instance, tstamp=0) → dict

Serialize a model instance using the channel’s state model. Returns a flat dictionary suitable for broadcasting.

Parameters:
  • instance – The Django model instance to serialize

  • tstamp – Timestamp to attach to the serialized data

classmethod broadcast_instance(anchor_id, instance, operation='update')

Manually broadcast an instance update to all connected clients.

Parameters:
  • anchor_id – The anchor (root object) ID for the channel

  • instance – The model instance to broadcast

  • operation – 'create', 'update', or 'delete'

classmethod broadcast_notification(anchor_id, notification, user_id=None)

Send a notification to connected clients.

Parameters:
  • anchor_id – The anchor ID to target

  • notification – Dict with notification data

  • user_id – If provided, only send to this user

classmethod get_cache_ttl() → int

Get the cache TTL in seconds for this channel.

Resolution order:

  1. Meta.cache_ttl on the channel class (per-channel override)

  2. RX_CACHE_TTL Django setting (global override)

  3. Default: 604800 (1 week)
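
The resolution order above can be sketched in a few lines of plain Python. This is only an illustration of the documented precedence, not RxDjango's own code: resolve_cache_ttl and its two arguments are hypothetical names standing in for Meta.cache_ttl and the RX_CACHE_TTL setting.

```python
DEFAULT_CACHE_TTL = 604800  # one week, the documented default


def resolve_cache_ttl(meta_ttl, settings_ttl):
    """Return the effective TTL in seconds (hypothetical helper)."""
    if meta_ttl is not None:
        # 1. Per-channel override (Meta.cache_ttl)
        return meta_ttl
    if settings_ttl is not None:
        # 2. Global override (RX_CACHE_TTL)
        return settings_ttl
    # 3. Built-in default
    return DEFAULT_CACHE_TTL


per_channel = resolve_cache_ttl(60, 3600)
global_only = resolve_cache_ttl(None, 3600)
fallback = resolve_cache_ttl(None, None)
```

Note that the sketch treats only None as "not set", so a TTL of 0 would still count as an explicit override; whether RxDjango does the same is not stated here.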

classmethod get_registered_channels() → set

Return all registered ContextChannel subclasses. Useful for introspection and cache management tooling.

can_save(instance, data) → bool

Check if the current user may update an existing instance. Called after Meta.writable and anchor context checks pass. This is a synchronous method.

Parameters:
  • instance – The database instance (pre-update state)

  • data – Partial field dict being applied

Returns:

True to allow, False to deny (rolls back optimistic update)

Default:

False

Note

The type must be declared with SAVE in Meta.writable and the instance must belong to the channel’s anchor context before this method is called.

can_create(model_class, parent, data) → bool

Check if the current user may create a new child instance. Called after Meta.writable and anchor context checks pass. This is a synchronous method.

Parameters:
  • model_class – The child model being instantiated

  • parent – The parent instance that owns the relation

  • data – Field dict from the frontend

Returns:

True to allow, False to deny (removes temporary instance)

Default:

False

Note

The type must be declared with CREATE in Meta.writable and the parent must belong to the channel’s anchor context before this method is called.

can_delete(instance) → bool

Check if the current user may delete an existing instance. Called after Meta.writable and anchor context checks pass. This is a synchronous method.

Parameters:

instance – The database instance to delete

Returns:

True to allow, False to deny (restores instance in UI)

Default:

False

Note

The type must be declared with DELETE in Meta.writable and the instance must belong to the channel’s anchor context before this method is called.

Decorators

@action

@action

Expose a ContextChannel method as a frontend-callable RPC action.

The method must be async. It will be available on the generated TypeScript channel class with automatic parameter serialization. Type hints are inspected to auto-convert parameters (e.g. datetime strings are converted to datetime objects).

Example:

@action
async def update_status(self, status: str) -> dict:
    self.instance.status = status
    # Use the async ORM API (Django 4.2+); a sync save() is not allowed in async code
    await self.instance.asave()
    return {"success": True}

Frontend usage:

const result = await channel.updateStatus("active");
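
To illustrate what type-hint-driven parameter conversion can look like, here is a minimal standalone sketch. It is not RxDjango's implementation: convert_params and the schedule method are hypothetical, and only the datetime case mentioned above is handled.

```python
import inspect
from datetime import datetime


def convert_params(func, raw_params):
    """Convert positional JSON values according to func's type hints (toy version)."""
    signature = inspect.signature(func)
    # Skip "self", which the channel instance supplies.
    names = [name for name in signature.parameters if name != "self"]
    converted = []
    for name, value in zip(names, raw_params):
        hint = signature.parameters[name].annotation
        if hint is datetime and isinstance(value, str):
            # ISO 8601 strings become datetime objects
            value = datetime.fromisoformat(value)
        converted.append(value)
    return converted


# Hypothetical action signature, used only for inspection here
async def schedule(self, title: str, due: datetime) -> dict:
    return {"title": title, "due": due}


args = convert_params(schedule, ["Review", "2024-05-01T09:30:00"])
```

Parameters without a datetime hint pass through unchanged, so plain strings and numbers keep their JSON types.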

@consumer

@consumer(event_type)

Subscribe a ContextChannel method to Django Channels group events.

The decorated method will be called when an event of the specified type is received on any group the consumer is subscribed to.

Parameters:

event_type – The event type string to listen for

Example:

import json

async def on_connect(self, tstamp):
    await self.group_add('chat_room_123')

@consumer('chat.message')
async def handle_chat(self, event):
    await self.send(text_data=json.dumps(event['data']))
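
The dispatch-by-event-type idea can be modelled without Django Channels. The sketch below is a toy stand-in, assuming events are dicts with a "type" key as in the example above; toy_consumer, ToyChannel, and dispatch are hypothetical names and do not reflect how RxDjango registers handlers internally.

```python
import asyncio


def toy_consumer(event_type):
    """Toy decorator: tag a method with the event type it handles."""
    def decorator(method):
        method._event_type = event_type
        return method
    return decorator


class ToyChannel:
    def __init__(self):
        self.received = []

    @toy_consumer("chat.message")
    async def handle_chat(self, event):
        self.received.append(event["data"])

    async def dispatch(self, event):
        """Call every method tagged with the incoming event's type."""
        for name in dir(self):
            method = getattr(self, name)
            if getattr(method, "_event_type", None) == event["type"]:
                await method(event)


channel = ToyChannel()
asyncio.run(channel.dispatch({"type": "chat.message", "data": "hi"}))
asyncio.run(channel.dispatch({"type": "other.event", "data": "ignored"}))
```

Only the first event reaches handle_chat; the second has no matching handler and is dropped silently in this toy model.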

Serializer Meta Options

RxDjango extends the standard DRF serializer Meta class with additional options for real-time behavior.

Meta.user_key: str

Field name that identifies the owning user. Instances are only sent to the user matching this field value.

Example:

from rest_framework import serializers

class TaskSerializer(serializers.ModelSerializer):
    class Meta:
        model = Task
        fields = ['id', 'title', 'owner']
        user_key = 'owner'

Meta.optimistic: bool = False

Enable optimistic updates. The frontend can immediately reflect changes before server confirmation.

Meta.optimistic_timeout: int = 3

Seconds before server state overrides optimistic updates.

Exceptions

All exceptions are defined in rxdjango.exceptions.

exception UnknownProperty

Raised when a serializer references a model property that does not exist. Typically indicates a missing @related_property decorator on a custom model property.

exception AnchorDoesNotExist

Raised when the requested anchor object cannot be found during WebSocket connection. Occurs when the anchor ID from the URL route does not match any database record.

exception UnauthorizedError

Raised when authentication fails due to a missing or invalid token. Results in a 401 status code sent to the client.

exception ForbiddenError

Raised when an authenticated user lacks permission to access a channel. Occurs when has_permission() returns False, resulting in a 403 status code.

exception RxDjangoBug

Raised when an internal invariant is violated. Indicates a bug in RxDjango itself. If encountered, please report it as an issue.

exception ActionNotAsync

Raised during channel initialization when an @action-decorated method is not defined with async def. All action methods must be async.

exception WriteError

Raised when a write operation (save, create, delete) fails due to invalid data, missing instances, or other request errors. Results in a 400 error code sent to the client.

WebSocket Protocol

The WebSocket message format used between the Django backend and React frontend.

Incoming Messages (Client → Server)

Authentication (must be the first message after connecting):

{"token": "<rest_framework_auth_token>", "last_update": <timestamp|null>}

Action call (RPC):

{"callId": <unique_id>, "action": "methodName", "params": [...]}
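
As a rough illustration, the authentication and action-call payloads shown so far could be built as follows in a test client or script. The helper names auth_message and action_call are hypothetical; only the JSON keys come from the formats above.

```python
import json


def auth_message(token, last_update=None):
    """First message after connecting: token plus last known timestamp (or null)."""
    return json.dumps({"token": token, "last_update": last_update})


def action_call(call_id, action, params):
    """RPC request; call_id must be unique so the response can be matched."""
    return json.dumps({"callId": call_id, "action": action, "params": list(params)})


first = auth_message("example-token")
call = action_call(1, "methodName", ["active"])
```

A client would send the auth message first and wait for a connection status reply before issuing action calls.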

Write operations (optimistic updates):

# Save an existing instance
{"type": "write", "writeId": <id>, "operation": "save",
 "instanceType": "app.Serializer", "instanceId": <id>, "data": {...}}

# Create a new child instance
{"type": "write", "writeId": <id>, "operation": "create",
 "instanceType": "app.ChildSerializer", "parentType": "app.ParentSerializer",
 "parentId": <id>, "relationName": "children", "data": {...}}

# Delete an instance
{"type": "write", "writeId": <id>, "operation": "delete",
 "instanceType": "app.Serializer", "instanceId": <id>}

Outgoing Messages (Server → Client)

Connection status:

{"status_code": 200}
{"status_code": 401, "error": "error/unauthorized"}
{"status_code": 403, "error": "error/forbidden"}
{"status_code": 404, "error": "error/not-found"}

Initial anchor list (sent after authentication):

{"initialAnchors": [1, 2, 3]}

State instances (array of flat instances):

[{"id": 1, "_instance_type": "app.Serializer", "_tstamp": ..., ...}, ...]

End of initial state marker:

[{"_instance_type": "", "_tstamp": ..., "_operation": "end_initial_state", "id": 0}]

Action response:

{"callId": <id>, "result": ...}
{"callId": <id>, "error": "Error"}

Write response:

{"type": "writeResponse", "writeId": <id>, "success": true}
{"type": "writeResponse", "writeId": <id>, "success": false,
 "error": {"code": 403, "message": "Permission denied"}}

Write error codes: 400 (instance not found, not in context, bad data), 403 (type/operation not in Meta.writable, or can_* denied), 500 (server error).
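
The mapping from checks to error codes can be sketched as below. This is a simplified model, not the library's code: it assumes Meta.writable can be represented as a dict from instance type to a set of operation names (the real declaration format is not shown in this document), and check_write, in_context, and can_hook are hypothetical names. The check order follows the can_* descriptions: Meta.writable first, then anchor context, then the can_* hook.

```python
def check_write(operation, instance_type, writable, in_context, can_hook):
    """Return None if the write is allowed, else an error dict (toy model)."""
    allowed_ops = writable.get(instance_type, set())
    if operation not in allowed_ops:
        # Type/operation not declared in Meta.writable
        return {"code": 403, "message": "Permission denied"}
    if not in_context:
        # Instance (or parent) is outside the channel's anchor context
        return {"code": 400, "message": "Instance not in context"}
    if not can_hook():
        # can_save / can_create / can_delete returned False
        return {"code": 403, "message": "Permission denied"}
    return None


# Assumed shape for the writable declaration, for illustration only
writable = {"app.TaskSerializer": {"save"}}
allowed = check_write("save", "app.TaskSerializer", writable, True, lambda: True)
denied = check_write("delete", "app.TaskSerializer", writable, True, lambda: True)
```

Because the can_* methods default to False, a channel that declares a type in Meta.writable but does not override the matching hook would still end in the 403 branch.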

Runtime state change:

{"runtimeVar": "varName", "value": ...}

Prepend anchor (for many=True channels):

{"prependAnchor": <anchor_id>}
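
Since outgoing messages are distinguished by their keys rather than by a single type field, a client needs a small classifier. The following sketch shows one way to do it over already-parsed JSON; classify and the returned labels are hypothetical, and the real frontend may route messages differently.

```python
def classify(message):
    """Label a parsed server message by the documented key that identifies it."""
    if isinstance(message, list):
        # Instance batches are arrays; the end marker is a batch with a special operation
        if any(item.get("_operation") == "end_initial_state" for item in message):
            return "end_initial_state"
        return "instances"
    if message.get("type") == "writeResponse":
        return "write_response"
    key_to_kind = (
        ("status_code", "status"),
        ("initialAnchors", "initial_anchors"),
        ("callId", "action_response"),
        ("runtimeVar", "runtime_var"),
        ("prependAnchor", "prepend_anchor"),
    )
    for key, kind in key_to_kind:
        if key in message:
            return kind
    return "unknown"


kinds = [
    classify({"status_code": 200}),
    classify([{"id": 1, "_instance_type": "app.Serializer", "_tstamp": 1.0}]),
    classify({"callId": 7, "result": None}),
]
```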

Frontend API (TypeScript)

ContextChannel

class ContextChannel<T, Y>()

Base class for generated TypeScript channel classes. Manages WebSocket connection, authentication, state rebuilding, and RPC.

Arguments:
  • T – Type of the root/anchor state

  • Y – Type of the runtime state (optional)

ContextChannel<T, Y>.constructor(token)

Arguments:
  • token – Django REST Framework auth token

ContextChannel<T, Y>.init()

Initialize the WebSocket and StateBuilder. Called automatically on first subscribe().

ContextChannel<T, Y>.disconnect()

Gracefully close the WebSocket connection.

ContextChannel<T, Y>.subscribe(listener, noConnectionListener?)

Subscribe to state changes. Auto-connects on first subscriber.

Arguments:
  • listener – Callback invoked with new state on every update

  • noConnectionListener – Optional callback for connection status

Returns:

Unsubscribe function

ContextChannel<T, Y>.subscribeInstance(listener, instance_id, instance_type)

Subscribe to changes on a specific instance.

Arguments:
  • listener – Callback invoked when instance changes

  • instance_id – The instance ID

  • instance_type – The _instance_type string

Returns:

Unsubscribe function

ContextChannel<T, Y>.getInstance(instance_type, instance_id)

Get a specific instance from state.

Arguments:
  • instance_type – The _instance_type string

  • instance_id – The instance ID

Returns:

The instance, or null if not found

ContextChannel<T, Y>.callAction(action, params)

Call a backend @action method via WebSocket RPC. Used internally by generated subclass methods.

Arguments:
  • action – The snake_case action name

  • params – Array of parameters

Returns:

Promise resolving with the action’s return value

ContextChannel<T, Y>.saveInstance(instanceType, instanceId, data)

Save changes to an existing instance with optimistic update. The UI updates immediately, then reconciles when the server confirms or rolls back on error.

Arguments:
  • instanceType – The _instance_type string

  • instanceId – The instance ID

  • data – Partial field dict to apply

Returns:

Promise that resolves on success, rejects with error

ContextChannel<T, Y>.createInstance(instanceType, parentType, parentId, relationName, data)

Create a new child instance with optimistic update. A temporary negative ID is assigned until the server responds.

Arguments:
  • instanceType – The _instance_type for the new instance

  • parentType – The parent’s _instance_type

  • parentId – The parent instance ID

  • relationName – The relation field name on the parent

  • data – Field dict for the new instance

Returns:

Promise resolving with the temporary ID

ContextChannel<T, Y>.deleteInstance(instanceType, instanceId)

Delete an instance with optimistic removal. The instance is immediately removed from UI state and restored on error.

Arguments:
  • instanceType – The _instance_type string

  • instanceId – The instance ID

Returns:

Promise that resolves on success, rejects with error

useChannelState

useChannelState(channel)

React hook for subscribing to channel state.

Arguments:
  • channel – A ContextChannel instance, or undefined

Returns:

Object with state, connected, no_connection_since, runtimeState, empty, and error fields

useChannelInstance

useChannelInstance(channel, instance_type, instance_id)

React hook for subscribing to a specific instance.

Arguments:
  • channel – A ContextChannel instance, or undefined

  • instance_type – The _instance_type string

  • instance_id – The instance ID, or undefined

Returns:

The instance, or null if not available

StateBuilder

class StateBuilder<T>()

Reconstructs nested state from flat server instances.

Maintains an instance registry indexed by _instance_type:id and rebuilds the nested structure by replacing foreign key IDs with object references. Reference changes propagate recursively to trigger React re-renders.
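
The rebuilding step can be illustrated with a small Python model (the real StateBuilder is TypeScript). This sketch rests on assumptions: it takes an explicit relations map saying which fields hold foreign keys and what type they point to, which is a hypothetical stand-in for whatever metadata the generated code provides, and it assumes the relation graph has no cycles. build_registry and rebuild are illustrative names.

```python
def build_registry(instances):
    """Index flat instances by the "_instance_type:id" key."""
    return {f'{inst["_instance_type"]}:{inst["id"]}': dict(inst) for inst in instances}


def rebuild(registry, root_key, relations):
    """Return a nested copy of the root, replacing foreign-key IDs with objects."""
    flat = registry[root_key]
    nested = dict(flat)
    for field, target_type in relations.get(flat["_instance_type"], {}).items():
        value = flat.get(field)
        if isinstance(value, list):
            # To-many relation: a list of IDs becomes a list of nested objects
            nested[field] = [rebuild(registry, f"{target_type}:{pk}", relations) for pk in value]
        elif value is not None:
            # To-one relation: a single ID becomes a nested object
            nested[field] = rebuild(registry, f"{target_type}:{value}", relations)
    return nested


instances = [
    {"id": 1, "_instance_type": "app.ProjectSerializer", "name": "Demo", "tasks": [10, 11]},
    {"id": 10, "_instance_type": "app.TaskSerializer", "title": "A"},
    {"id": 11, "_instance_type": "app.TaskSerializer", "title": "B"},
]
relations = {"app.ProjectSerializer": {"tasks": "app.TaskSerializer"}}
state = rebuild(build_registry(instances), "app.ProjectSerializer:1", relations)
```

Each call to rebuild produces fresh dictionaries, which loosely mirrors the idea that changed references are what signal React to re-render.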

StateBuilder<T>.update(instances)

Process a batch of instance updates from the server.

Arguments:
  • instances – Array of flat instances with _instance_type

StateBuilder<T>.state

The current rebuilt nested state. Returns a new reference on each access to trigger React re-renders.

PersistentWebSocket

class PersistentWebSocket()

WebSocket wrapper with automatic reconnection and message routing.

Features exponential backoff retry on connection drops, and routes incoming messages to type-specific handlers.
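
For readers unfamiliar with exponential backoff, the retry schedule looks roughly like this. The base delay, growth factor, and cap below are illustrative assumptions; the actual values used by PersistentWebSocket are not documented here.

```python
def backoff_delays(attempts, base=1.0, factor=2.0, cap=30.0):
    """Return the wait time in seconds before each reconnection attempt."""
    return [min(cap, base * factor ** attempt) for attempt in range(attempts)]


delays = backoff_delays(6)
```

With these assumed defaults the delay doubles on every failed attempt until it reaches the cap, which keeps a long outage from producing a flood of reconnects.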

PersistentWebSocket.connect()

Initiate WebSocket connection and send auth token.

PersistentWebSocket.send(data)

Send a string message through the WebSocket.

PersistentWebSocket.disconnect(reason?)

Close the connection. If reason is provided, prevents reconnection.

Cache System

RxDjango uses a two-layer cache for performance:

MongoDB (Persistent State)

Stores flattened instance data with optimistic locking.

  • One collection per channel (named <channel_name>)

  • Documents contain serialized instance data plus metadata

  • _tstamp field for change tracking and incremental loading

  • Optimistic locking prevents concurrent write conflicts

  • Documents exceeding MongoDB’s 16MB limit are transparently stored in GridFS and referenced via a _grid_ref field

Redis (Coordination)

Handles transient state and coordination:

  • Cache state management (COLD / HEATING / HOT / COOLING)

  • Active WebSocket session tracking per anchor

  • last_disconnect timestamps for TTL-based expiry

  • Instance lists during HEATING and COOLING transitions

  • Pub/sub triggers for coordinating concurrent state loads

  • Timestamp generation for consistent ordering
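
The four cache states can be pictured as a small state machine. The transition table below is an assumption inferred from the state names and from the HEATING and COOLING transitions mentioned above (COLD to HEATING to HOT to COOLING and back to COLD); RxDjango's real transitions run as atomic scripts in Redis and may allow other paths.

```python
from enum import Enum


class CacheState(Enum):
    COLD = "COLD"
    HEATING = "HEATING"
    HOT = "HOT"
    COOLING = "COOLING"


# Assumed cycle, for illustration only
ALLOWED = {
    CacheState.COLD: {CacheState.HEATING},
    CacheState.HEATING: {CacheState.HOT},
    CacheState.HOT: {CacheState.COOLING},
    CacheState.COOLING: {CacheState.COLD},
}


def transition(current, target):
    """Return the new state, or raise if the move is not in the assumed table."""
    if target not in ALLOWED[current]:
        raise ValueError(f"cannot go from {current.value} to {target.value}")
    return target


state = transition(CacheState.COLD, CacheState.HEATING)
state = transition(state, CacheState.HOT)
```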

Management Commands

expire_rxdjango_cache

Expire stale caches that have exceeded their configured TTL.

# Expire all stale caches
python manage.py expire_rxdjango_cache

# Preview what would be expired without making changes
python manage.py expire_rxdjango_cache --dry-run

This command is idempotent and safe to run concurrently (atomic Lua scripts prevent double transitions). Schedule it via cron or Celery beat:

# cron - run every 5 minutes
*/5 * * * * cd /path/to/project && python manage.py expire_rxdjango_cache

# Celery beat
CELERY_BEAT_SCHEDULE = {
    'expire-rx-caches': {
        'task': 'myapp.tasks.expire_rx_caches',
        'schedule': 300,
    },
}

broadcast_system_message

Broadcast a message to all connected WebSocket clients via the system channel.

python manage.py broadcast_system_message <source> <message>

The source parameter identifies the type of message (e.g. maintenance). The message parameter is the human-readable message text.

makefrontend

Generate TypeScript interfaces and channel classes from Django serializers and ContextChannel subclasses.

python manage.py makefrontend              # Generate all TS files
python manage.py makefrontend --dry-run    # Preview changes
python manage.py makefrontend --force      # Force rebuild all files

Configuration

Required Django Settings

REDIS_URL: str

Redis connection URL.

Example: redis://127.0.0.1:6379/0

MONGO_URL: str

MongoDB connection URL.

Example: mongodb://localhost:27017/

MONGO_STATE_DB: str

MongoDB database name for state cache.

Example: hot_state

RX_FRONTEND_DIR: str

Path where makefrontend writes generated TypeScript files.

RX_WEBSOCKET_URL: str

WebSocket URL for frontend connections.

Example: http://localhost:8000/ws

Optional Settings

RX_CACHE_TTL: int = 604800

Global cache TTL in seconds. When no active WebSocket sessions are connected to an anchor, the TTL countdown begins from the last disconnect time. After the TTL elapses, the expire_rxdjango_cache command will transition the cache from HOT to COLD.

Defaults to 604800 (1 week). Can be overridden per-channel via Meta.cache_ttl on the ContextChannel subclass.
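
The expiry rule described above reduces to a simple predicate. The sketch below is a plain-Python approximation, assuming timestamps are seconds since the epoch; cache_is_expired and its parameters are hypothetical names, and the real check is performed by the expire_rxdjango_cache command against Redis.

```python
WEEK = 604800  # documented default TTL in seconds


def cache_is_expired(active_sessions, last_disconnect, ttl, now):
    """Return True when an anchor's cache is eligible for expiry (toy model)."""
    if active_sessions > 0:
        # The countdown only runs while nobody is connected
        return False
    if last_disconnect is None:
        # No recorded disconnect, so there is nothing to count from
        return False
    return now - last_disconnect >= ttl


expired = cache_is_expired(0, 1000, WEEK, 1000 + WEEK)
still_in_use = cache_is_expired(1, 1000, WEEK, 1000 + 2 * WEEK)
```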

RXDJANGO_SYSTEM_CHANNEL: str = '_rxdjango_system'

Name of the Django Channels group used for system-wide broadcasts (e.g. maintenance messages via broadcast_system_message).

TESTING: bool = False

When True (along with DEBUG), the state loader annotates initial state instances with a _cache_state field indicating whether the data came from COLD, HEATING, or HOT cache. Useful for debugging cache behavior during development.

Required Django Apps

INSTALLED_APPS = [
    'rxdjango',        # Must come before daphne
    'daphne',          # Must come before staticfiles
    'django.contrib.staticfiles',
    'channels',
]

ASGI_APPLICATION = 'your_project.asgi.application'