Database Architecture
Overview
Building a realtime document database presents unique challenges that traditional storage architectures struggle to address. When multiple users collaborate on documents simultaneously, the system must handle rapid changes while maintaining consistency and performance. This architecture emerges from the intersection of three critical requirements that often conflict in traditional systems.
First, we chose delta-based mutations over full document replacement. In collaborative environments, users make small, incremental changes - fixing a typo here, adding a paragraph there. Transmitting entire documents for each change wastes bandwidth and creates merge conflicts. Deltas capture the essence of each change, allowing the system to apply modifications surgically while preserving the work of other users.
Second, zero-copy operations became essential for performance. Traditional databases serialise data for storage and network transmission, creating overhead that compounds with each operation. By designing data structures that can be used directly from memory, we eliminate this serialisation tax. The same bytes that store a document can be transmitted over the network or accessed by the application without transformation.
Third, thread-safe concurrent access underlies everything. Modern servers have many CPU cores, and realtime collaboration demands we use them all. The architecture allows multiple threads to read documents simultaneously while writers make changes, all without the global locks that cripple traditional systems under load.
These principles work together to create something greater than their parts. Delta-based mutations naturally align with zero-copy operations because small changes can be stored and transmitted efficiently. Zero-copy operations enable safe concurrent access because immutable data can be shared between threads without synchronisation. And concurrent access makes delta processing practical because different documents can be updated in parallel.
The architecture specifically targets realtime collaboration scenarios where read operations vastly outnumber writes, network bandwidth is precious, and latency directly impacts user experience. Every design decision flows from these constraints, creating a system optimised for the unique demands of collaborative document editing.
This document details the Rust implementation of these concepts, showing how the language’s ownership model and type system enforce the safety guarantees that make zero-copy operations practical in a multithreaded environment.
Core Concepts
Zero-Copy Architecture
Zero-copy architecture represents a fundamental shift in how we think about data access. Rather than copying data between different representations for storage, processing, and network transmission, the architecture maintains data in a single format that serves all purposes. This concept becomes particularly powerful in Rust, where the type system can enforce the safety guarantees that make zero-copy practical.
Memory Safety Requirements
The challenge with zero-copy lies in maintaining memory safety whilst allowing multiple threads to access data concurrently. Traditional systems solve this through copying - each thread gets its own copy of the data, eliminating any possibility of interference. Our architecture takes a different approach, keeping data at fixed memory addresses and using Rust’s ownership system to ensure references remain valid.
This requires three key guarantees. First, data must remain at fixed memory addresses throughout its lifetime. Second, references must remain valid across thread boundaries, which Rust enforces through its Send and Sync traits. Third, no reallocation can occur whilst references exist, a guarantee we achieve through careful data structure design and pre-allocation strategies.
Multi-threading Challenges
The most insidious problem in zero-copy architectures occurs when mutable data types reallocate their internal storage. Consider this scenario: Thread A obtains a pointer to string data within a document. Meanwhile, Thread B appends to that string, causing it to exceed its capacity. The string reallocates to a larger buffer, moving the data to a new memory location. Thread A’s pointer now points to freed memory, leading to undefined behaviour or crashes.
This challenge extends beyond simple strings. Vectors grow when elements are added. HashMaps rehash when they reach capacity thresholds. Even seemingly innocent operations can trigger reallocations that invalidate pointers held by other threads. Traditional solutions either prohibit mutation entirely or require expensive synchronisation that defeats the performance benefits of zero-copy.
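Safe Rust's borrow checker rules out holding a reference across such a reallocation, but the underlying buffer movement is easy to demonstrate. A contrived, self-contained sketch (not from the codebase):
fn reallocation_hazard_demo() {
    let mut s = String::with_capacity(4);
    s.push_str("abcd");      // buffer exactly full
    let before = s.as_ptr(); // address of the current buffer
    s.push('e');             // exceeds capacity - the String reallocates
    // The old address is now stale; dereferencing it would be use-after-free
    // in a language without Rust's ownership rules.
    println!("buffer moved: {}", before != s.as_ptr());
}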
Performance Objectives
Our zero-copy architecture achieves its performance goals through three principles. Direct memory access eliminates the overhead of serialisation and deserialisation. By maintaining data in its final format, we can hand pointers directly to network layers or processing threads without transformation.
We minimise atomic operations in hot paths because even atomic increments can become bottlenecks under high concurrency. The architecture uses atomics judiciously, preferring immutable data structures that can be shared without synchronisation.
Finally, cache-friendly data layouts ensure the CPU can efficiently process our structures. By keeping related data together and aligning structures to cache line boundaries, we reduce cache misses that would otherwise dominate performance profiles.
Data Immutability
The system achieves safety through controlled immutability, a design philosophy that turns the challenge of concurrent access into an advantage. Rather than fighting to synchronise mutable state, we embrace immutability where it matters most, creating a system that is both safe and performant.
Append-Only Design
At the heart of our approach lies the append-only principle. Deltas, once created, never change. Streams only grow by adding new entries. Document modifications create new versions rather than destroying old ones. This isn’t just a safety measure - it’s a performance optimisation.
When data never changes, it can be shared freely between threads without synchronisation. A delta created by one thread can be immediately visible to all others without locks or barriers. The network layer can transmit deltas whilst the storage layer processes them, all operating on the same immutable bytes in memory.
This design particularly shines for audit trails and collaboration features. Every change is preserved, creating a natural history that applications can traverse. Conflict resolution becomes possible because we never lose information - competing changes exist as separate deltas that can be reconciled.
Version Management
Version management transforms the challenge of mutation into a structured process. When a document property changes, we don’t modify the existing data. Instead, we create a new version and update a pointer. Readers continue using their existing references to old versions whilst new readers see the latest data.
This approach requires careful consideration of retention policies. Keeping every version forever would exhaust memory, but aggressive cleanup could invalidate active references. The architecture supports configurable retention, allowing applications to balance memory usage against their specific needs. Some might keep all versions for complete history, whilst others retain only recent versions for active collaboration.
The versioning system also enables powerful features like snapshots and rollback. Applications can capture a document’s state at a point in time simply by recording version numbers. Rolling back becomes a matter of updating pointers rather than complex undo operations.
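A hypothetical sketch of that idea, using the DocumentHeader fields introduced in the Document Structure section below (the Snapshot type and capture helper are illustrative, not part of the described API):
use std::sync::atomic::Ordering;

struct Snapshot {
    doc_id: DocumentId,
    version: u64, // value of the document's version counter at capture time
}

fn capture_snapshot(doc: &Document) -> Snapshot {
    // Capturing a snapshot is just recording a number - no data is copied
    Snapshot {
        doc_id: doc.header.id,
        version: doc.header.version.load(Ordering::Acquire),
    }
}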
Memory Stability Guarantees
Achieving stable memory addresses requires fighting against the natural tendencies of dynamic data structures. Our architecture employs three strategies to guarantee stability.
Fixed-size allocations form the foundation. By allocating memory in large chunks that never resize, we ensure addresses remain stable. A 64,000-slot delta chunk might seem wasteful when only partially filled, but this waste purchases the stability that makes zero-copy possible.
Pre-allocated chunks extend this principle. Rather than growing storage on demand, we allocate chunks ahead of time. This front-loads the allocation cost but eliminates allocation from hot paths. When a thread needs to store a delta, it finds a pre-allocated slot rather than triggering allocation.
Reference counting through Rust’s Arc (Atomically Reference Counted) type prevents premature deallocation. Even if the storage system decides to clean up old versions, the memory remains valid as long as any thread holds a reference. This automatic memory management eliminates entire classes of use-after-free bugs that plague similar systems in other languages.
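A minimal standalone illustration of this property (standard library only):
use std::sync::Arc;
use std::thread;

fn arc_keeps_memory_alive() {
    let original: Arc<[u8]> = Arc::from(vec![1u8, 2, 3].into_boxed_slice());
    let shared = Arc::clone(&original);               // refcount 2, no data copy
    let worker = thread::spawn(move || shared.len()); // `shared` moves to the thread
    drop(original); // storage may drop its handle; memory stays valid for the worker
    assert_eq!(worker.join().unwrap(), 3);
}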
Storage Components
Document Storage
The storage layer centres on the DocumentStorage trait, establishing a clear contract that any storage implementation must fulfil. This trait-based design allows for future storage backends whilst maintaining a consistent interface.
trait DocumentStorage {
fn get_document(&self, user_id: UserId, doc_id: DocumentId) -> Option<Document>;
fn apply_delta(&self, user_id: UserId, doc_id: DocumentId, delta: Delta) -> Result<()>;
// Additional methods...
}
MemStore Implementation
MemStore serves as our primary in-memory implementation, designed for maximum concurrency. The architecture separates users at the storage level, ensuring complete isolation between different users’ data. This isn’t just a security measure - it eliminates an entire class of concurrency bottlenecks by ensuring operations for different users never contend for the same locks.
struct MemStore {
// User isolation: each user has their own document space
user_stores: DashMap<UserId, UserDocumentSpace>,
}
struct UserDocumentSpace {
// Root documents owned by this user
documents: DashMap<DocumentId, Document>,
// Immutable delta storage
delta_pool: DeltaPool,
}
The choice of DashMap over standard HashMap proves crucial. DashMap provides lock-free reads and fine-grained locking for writes, allowing multiple threads to access different users’ data simultaneously without contention. Within each user’s space, documents and deltas are stored separately, reflecting their different access patterns and immutability guarantees.
User Isolation
User isolation goes beyond simple security. By separating data at the storage level, we achieve natural sharding that scales with user count. Each user’s operations remain independent, preventing one user’s heavy workload from impacting others. This design also simplifies security auditing - cross-user references become impossible by construction rather than policy.
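A minimal sketch of how a per-user space might be created on demand (the with_user_space helper and UserDocumentSpace::new constructor are assumptions, not part of the shown API). DashMap's entry API locks only the shard owning the key, so other users' operations proceed unimpeded:
impl MemStore {
    fn with_user_space<R>(
        &self,
        user_id: UserId,
        f: impl FnOnce(&UserDocumentSpace) -> R,
    ) -> R {
        // entry() takes a single DashMap shard lock, not the whole map
        let space = self
            .user_stores
            .entry(user_id)
            .or_insert_with(UserDocumentSpace::new);
        f(space.value())
    }
}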
Document Structure
Documents combine a fixed-size header with variable-size data, a separation that optimises both access patterns and memory layout. The header contains frequently accessed metadata that fits in a single cache line, whilst the variable data holds the actual document content.
struct Document {
header: DocumentHeader,
data: Value, // Root value, typically Object
}
struct DocumentHeader {
id: DocumentId,
version: AtomicU64,
created_at: u64,
modified_at: AtomicU64,
doc_type: DocumentType,
}
The atomic version counter enables lock-free version checks, crucial for our optimistic concurrency control. By making versions atomic, readers can check if a document has changed without acquiring locks, maintaining our zero-copy read path.
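A hedged sketch of the pattern this enables (the optimistic_read helper is illustrative): sample the version, perform the read, and retry if a writer bumped the counter mid-read:
use std::sync::atomic::Ordering;

fn optimistic_read<T>(doc: &Document, read: impl Fn(&Document) -> T) -> T {
    loop {
        let before = doc.header.version.load(Ordering::Acquire);
        let value = read(doc);
        if doc.header.version.load(Ordering::Acquire) == before {
            return value; // no concurrent write observed - value is consistent
        }
        // A writer bumped the version whilst we were reading - retry
    }
}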
Value Type System
The Value enum represents our solution to the versioning challenge. Each type receives treatment optimised for its mutation patterns, balancing performance with memory efficiency.
use im::{HashMap as ImHashMap, HashSet as ImHashSet};
enum Value {
// Primitives - no versioning needed
Null,
Bool(bool),
Int(i64),
Float(f64),
// Versioned types - replaced entirely on update
String(Versioned<String>),
Binary(Versioned<Vec<u8>>),
Array(Versioned<Vec<Value>>),
// Structural sharing - efficient incremental updates
Object(Versioned<ImHashMap<String, Value>>),
Set(Versioned<ImHashSet<Value>>),
// References
Reference(DocumentId),
// Streams - owned append-only data (no versioning needed)
BinaryStream(Box<StreamEntryPool>),
TextStream(Box<StreamEntryPool>),
DocumentStream(Box<StreamEntryPool>),
}
Primitives need no versioning because they’re immutable Copy types. Strings, binaries, and arrays use simple versioning because they’re typically replaced entirely. Objects and sets use persistent data structures from the im crate for structural sharing. When adding a property to an object with 1000 existing properties, we share 999 of them rather than copying everything. This O(log n) update cost makes frequent property updates practical.
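A small self-contained demonstration of that sharing using the im crate directly (the numbers are illustrative):
use im::HashMap as ImHashMap;

fn structural_sharing_demo() {
    let mut base: ImHashMap<String, i64> = ImHashMap::new();
    for i in 0..1_000 {
        base.insert(format!("key{}", i), i);
    }
    // update() returns a new map in O(log n), sharing almost every node
    let updated = base.update("key1000".to_string(), 1_000);
    assert_eq!(base.len(), 1_000);    // the old version is untouched
    assert_eq!(updated.len(), 1_001); // the new version sees the addition
}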
Streams are unique - they’re owned directly by the document but require no versioning due to their append-only nature. The Box wrapper keeps the Value enum small (the stream variants carry just a single 8-byte pointer) whilst the StreamEntryPool can grow unbounded. This design keeps streams close to their owning document for better cache locality whilst maintaining all zero-copy properties.
Document Audit Trail
Every document maintains a complete audit trail through a dedicated DocumentStream. This stream captures references to every delta applied to the document, providing a complete history for debugging, compliance, and collaboration features.
impl Document {
fn new(doc_type: DocumentType) -> Self {
let mut doc = Document {
header: DocumentHeader::new(doc_type),
data: Value::Object(Versioned::new(ImHashMap::new())),
};
// Every document has a delta history stream
doc.create_stream("__delta_history__".to_string(), StreamType::Document);
doc
}
fn record_delta(&self, delta_ref: DeltaRef) {
if let Some(Value::DocumentStream(ref pool)) = self.get_property("__delta_history__") {
// Store just the coordinate reference (8 bytes)
let ref_bytes = delta_ref.to_bytes();
pool.append_entry(ref_bytes);
}
}
}
This design provides several benefits. The audit trail consumes minimal space - just 8 bytes per delta reference plus the stream entry overhead. The trail is automatically ordered by timestamp through the stream’s natural ordering. Applications can replay a document’s complete history by fetching deltas from the pool using these references. And because it’s just another stream, it inherits all the zero-copy and concurrency properties of the stream system.
Delta Storage
Delta storage exemplifies our two-stage architecture: hot path processing through SPSC queues, followed by long-term storage in the delta pool. This separation optimises for the different access patterns of active processing versus historical lookup.
const SLOTS_PER_CHUNK: usize = 64_000;
struct DeltaStorage {
// Stage 1: Active processing queues (hot path)
processing_queues: DashMap<DocumentId, Arc<SpscQueue<Arc<[u8]>>>>,
// Stage 2: Long-term storage with embedded state (cold path)
delta_pool: DeltaPool,
}
struct DeltaPool {
// Pre-allocated chunks - no locking needed for access
chunks: Vec<Box<DeltaChunk>>,
}
struct DeltaChunk {
// Each slot stores complete delta with mutable state
slots: Vec<UnsafeCell<Option<Arc<StoredDelta>>>>,
used: AtomicUsize,
}
// Cache-line aligned to prevent false sharing
#[repr(C, align(64))]
struct StoredDelta {
// Immutable wire format data: [ServerExtension] + [Client Packet]
wire_data: Arc<[u8]>, // 16 bytes (fat pointer: data + length)
// Mutable state - atomics for lock-free updates
status: AtomicU8, // 1 byte
validation_result: AtomicU8, // 1 byte
retry_count: AtomicU8, // 1 byte
_pad1: [u8; 5], // 5 bytes padding
applied_at: AtomicU64, // 8 bytes
processing_time_us: AtomicU32, // 4 bytes
_pad2: [u8; 28], // Pad to 64 bytes total
}
// Server extension prepended to client packets (first 16 bytes of wire_data)
#[repr(C)]
struct ServerExtension {
delta_id: ID8, // 8 bytes - unique identifier
sequence: u64, // 8 bytes - global ordering
}
struct DeltaRef {
chunk_index: u32,
slot_index: u16, // 16 bits sufficient for 64k slots
}
The StoredDelta structure combines immutable wire data with mutable processing state in a cache-friendly layout. The wire_data field contains both the server-generated extension and the complete original client packet:
// Memory layout of wire_data Arc<[u8]>:
// [ServerExtension(16 bytes)] [ClientHeader(32)] [Security(32)] [Operations(...)]
// ^-- Original client packet starts here
By storing these together, a single memory fetch retrieves both the delta content and its current state. The 64-byte alignment ensures each StoredDelta occupies exactly one cache line, preventing false sharing when different threads update different deltas’ states.
When a delta needs to be propagated to clients, we can send the wire_data directly without any serialisation. The server extension provides delta_id and sequence for deduplication and ordering, while preserving the client’s original data unchanged.
Delta Wire Format
The binary layout of deltas differs between client-to-server and server-to-network transmission, optimised for their respective requirements:
CLIENT → SERVER
┌─────────────────────────────────────────────────┐
│ BATCH HEADER 8 bytes │
│ - batch_count: u32 │
│ - total_size: u32 │
├─────────────────────────────────────────────────┤
│ DELTA 1 │
│ ┌───────────────────────────────────────┐ │
│ │ CLIENT HEADER 32 bytes │ │
│ │ - target_doc_id: ID16 (16) │ │
│ │ - timestamp: u64 (8) │ │
│ │ - delta_size: u32 (4) │ │
│ │ - op_count: u16 (2) │ │
│ │ - _padding: [u8; 2] │ │
│ ├───────────────────────────────────────┤ │
│ │ SECURITY LAYER 32 bytes │ │
│ │ - _reserved: [u8; 32] │ │
│ ├───────────────────────────────────────┤ │
│ │ OPERATIONS Variable │ │
│ │ ┌─────────────────────────────────┐ │ │
│ │ │ Operation 1 │ │ │
│ │ │ - op_type: u8 (1) │ │ │
│ │ │ - payload_len: u32 (4)│ │ │
│ │ │ - payload: [u8; N]│ │ │
│ │ ├─────────────────────────────────┤ │ │
│ │ │ Operation 2 │ │ │
│ │ │ - op_type: u8 (1) │ │ │
│ │ │ - payload_len: u32 (4)│ │ │
│ │ │ - payload: [u8; N]│ │ │
│ │ └─────────────────────────────────┘ │ │
│ └───────────────────────────────────────┘ │
└─────────────────────────────────────────────────┘
SERVER → NETWORK
┌─────────────────────────────────────────────────┐
│ BATCH HEADER 8 bytes │
│ - batch_count: u32 │
│ - total_size: u32 │
├─────────────────────────────────────────────────┤
│ DELTA 1 │
│ ┌───────────────────────────────────────┐ │
│ │ SERVER HEADER 48 bytes │ │
│ │ - delta_id: ID8 (8) │ │
│ │ - target_doc_id: ID16 (16) │ │
│ │ - timestamp: u64 (8) │ │
│ │ - sequence: u64 (8) │ │
│ │ - delta_size: u32 (4) │ │
│ │ - op_count: u16 (2) │ │
│ │ - _padding: [u8; 2] │ │
│ ├───────────────────────────────────────┤ │
│ │ SECURITY LAYER 32 bytes │ │
│ ├───────────────────────────────────────┤ │
│ │ OPERATIONS Variable │ │
│ │ ┌─────────────────────────────────┐ │ │
│ │ │ Operation 1 │ │ │
│ │ │ - op_type: u8 (1) │ │ │
│ │ │ - payload_len: u32 (4) │ │ │
│ │ │ - payload: [u8; N] │ │ │
│ │ └─────────────────────────────────┘ │ │
│ └───────────────────────────────────────┘ │
└─────────────────────────────────────────────────┘
Fixed Overhead Per Delta:
- Client → Server: 64 bytes + 5 bytes per operation
- Server → Network: 80 bytes + 5 bytes per operation
The server header adds delta_id and sequence fields for distributed ordering and deduplication. Both formats maintain alignment for efficient parsing whilst keeping overhead minimal. The batch header enables efficient transmission of multiple deltas, amortising network overhead across operations.
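As a sketch of how a consumer might walk the operations region (the for_each_operation helper is illustrative, and little-endian payload lengths are an assumption the diagrams do not state):
fn for_each_operation(
    mut ops: &[u8],
    mut visit: impl FnMut(u8, &[u8]),
) -> Result<(), &'static str> {
    // Each record: [op_type: u8][payload_len: u32][payload: N bytes]
    while !ops.is_empty() {
        if ops.len() < 5 {
            return Err("truncated operation header");
        }
        let op_type = ops[0];
        let len = u32::from_le_bytes([ops[1], ops[2], ops[3], ops[4]]) as usize;
        if ops.len() < 5 + len {
            return Err("truncated operation payload");
        }
        visit(op_type, &ops[5..5 + len]);
        ops = &ops[5 + len..];
    }
    Ok(())
}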
Delta Storage Layout
When storing deltas in memory, the server prepends its extension to the client packet, preserving the original client data unchanged:
DELTA STORAGE LAYOUT (In Memory)
┌─────────────────────────────────────────────────┐
│ SERVER EXTENSION 16 bytes │
│ - delta_id: ID8 (8) │
│ - sequence: u64 (8) │
├─────────────────────────────────────────────────┤
│ CLIENT HEADER 32 bytes │ ← Original client
│ - target_doc_id: ID16 (16) │ packet starts
│ - timestamp: u64 (8) │ here (unchanged)
│ - delta_size: u32 (4) │
│ - op_count: u16 (2) │
│ - _padding: [u8; 2] │
├─────────────────────────────────────────────────┤
│ SECURITY LAYER 32 bytes │
│ - _reserved: [u8; 32] │
├─────────────────────────────────────────────────┤
│ OPERATIONS Variable │
│ ┌─────────────────────────────────┐│
│ │ Operation 1 ││
│ │ - op_type: u8 (1) ││
│ │ - payload_len: u32 (4)││
│ │ - payload: [u8; N]││
│ ├─────────────────────────────────┤│
│ │ Operation 2... ││
│ └─────────────────────────────────┘│
└─────────────────────────────────────────────────┘
MUTABLE STATE (Adjacent in StoredDelta)
┌─────────────────────────────────────────────────┐
│ PROCESSING STATE 8 bytes │
│ - status: AtomicU8 (1) │
│ - validation_result: AtomicU8 (1) │
│ - retry_count: AtomicU8 (1) │
│ - _padding: [u8; 5] │
└─────────────────────────────────────────────────┘
Total Fixed Overhead: 88 bytes (80 for headers + 8 for state)
This layout achieves several goals:
- Server extension is prepended without modifying client data
- Original client packet remains byte-for-byte identical
- Mutable processing state stored adjacent for cache efficiency
- Single contiguous memory allocation optimal for zero-copy operations
Delta Processing Flow
The complete delta flow demonstrates how the two-stage architecture maintains both performance and durability:
impl DeltaProcessor {
// Stage 1: Network thread receives delta
fn receive_delta(&self, doc_id: DocumentId, wire_bytes: Vec<u8>) {
// Convert to Arc immediately - single allocation
let arc_bytes = Arc::from(wire_bytes.into_boxed_slice());
// Push to document's processing queue (lock-free)
if let Some(queue) = self.processing_queues.get(&doc_id) {
queue.push(arc_bytes);
}
}
// Stage 1.5: Prepare delta with server extension
fn prepare_stored_delta(&self, client_packet: Arc<[u8]>) -> Arc<[u8]> {
// Server extension structure
#[repr(C)]
struct ServerExtension {
delta_id: ID8,
sequence: u64,
}
let extension = ServerExtension {
delta_id: self.generate_delta_id(),
sequence: self.next_sequence(),
};
// Allocate buffer for server extension + client packet
let total_size = size_of::<ServerExtension>() + client_packet.len();
let mut buffer = Vec::with_capacity(total_size);
// Write server extension
unsafe {
let ext_bytes = slice::from_raw_parts(
&extension as *const _ as *const u8,
size_of::<ServerExtension>()
);
buffer.extend_from_slice(ext_bytes);
}
// Append entire client packet (unchanged)
buffer.extend_from_slice(&client_packet);
Arc::from(buffer.into_boxed_slice())
}
// Stage 2: Worker thread processes delta
fn process_delta(&self, doc_id: DocumentId) -> Option<DeltaRef> {
let queue = self.processing_queues.get(&doc_id)?;
let client_packet = queue.pop()?;
// Create wire_data with server extension prepended
let wire_data = self.prepare_stored_delta(client_packet);
// Apply operations (parse from offset 16 to skip server extension)
let client_data = &wire_data[16..];
let result = self.apply_operations(&doc_id, client_data);
// Create StoredDelta with mutable state
let stored_delta = Arc::new(StoredDelta {
wire_data: wire_data.clone(),
status: AtomicU8::new(DeltaStatus::Applied as u8),
validation_result: AtomicU8::new(result.to_u8()),
retry_count: AtomicU8::new(0),
_pad1: [0; 5],
applied_at: AtomicU64::new(current_timestamp()),
processing_time_us: AtomicU32::new(0),
_pad2: [0; 28],
});
let delta_ref = self.delta_pool.store(stored_delta);
// Record in document's audit trail
if let Some(doc) = self.get_document(&doc_id) {
doc.record_delta(delta_ref);
}
// Propagate to subscribers (zero-copy with server headers)
self.propagate_delta(&doc_id, &wire_data);
Some(delta_ref)
}
}
// Helpers to access headers from wire_data (a separate impl block, outside DeltaProcessor)
impl StoredDelta {
fn server_extension(&self) -> &ServerExtension {
unsafe {
&*(self.wire_data.as_ptr() as *const ServerExtension)
}
}
fn client_header(&self) -> &ClientHeader {
unsafe {
&*(self.wire_data.as_ptr().add(16) as *const ClientHeader)
}
}
fn operations(&self) -> &[u8] {
&self.wire_data[80..] // Skip ServerExt(16) + ClientHeader(32) + Security(32)
}
}
impl DeltaPool {
fn store(&self, delta: Arc<StoredDelta>) -> DeltaRef {
// Find chunk with space
for (chunk_idx, chunk) in self.chunks.iter().enumerate() {
let slot_idx = chunk.used.fetch_add(1, Ordering::AcqRel);
if slot_idx < SLOTS_PER_CHUNK {
// Store delta
unsafe {
*chunk.slots[slot_idx].get() = Some(delta);
}
return DeltaRef {
chunk_index: chunk_idx as u32,
slot_index: slot_idx as u16,
};
}
}
// All chunks full - would add new chunk here
panic!("Delta pool exhausted");
}
// Historical lookup - O(1) once you have the reference
fn get(&self, reference: &DeltaRef) -> Option<Arc<StoredDelta>> {
let slot = self.chunks
.get(reference.chunk_index as usize)?
.slots
.get(reference.slot_index as usize)?;
// Safe: written slots are never overwritten, so cloning the Arc
// cannot race with a conflicting write
unsafe { (*slot.get()).clone() }
}
}
This flow achieves several critical properties:
- The Arc<[u8]> created from network bytes flows through the entire system without copying
- Processing happens in the hot path (SPSC queues) without touching the storage pool
- Historical storage happens after processing, when latency isn’t critical
- The document’s audit trail automatically captures every applied delta
- Propagation to clients uses the same Arc<[u8]> - true zero-copy from receipt to transmission
Stream Storage
Streams provide append-only, immutable sequences of timestamped data. Unlike strings or arrays that are replaced entirely on update, streams grow incrementally through appends, making them ideal for audit logs, event sequences, and time-series data.
Streams are owned directly by documents as Values, eliminating the need for separate storage and ID-based lookups. This design provides better cache locality and simpler lifecycle management whilst maintaining all zero-copy properties.
const STREAM_SLOTS_PER_CHUNK: usize = 16_000;
enum StreamType {
Document,
Text,
Binary,
}
struct StreamEntryPool {
// Vector of fixed-size chunks
chunks: Mutex<Vec<Box<StreamChunk>>>, // Mutex only for chunk creation
stream_type: StreamType,
}
struct StreamChunk {
// Each slot wrapped individually for minimal unsafe surface
slots: Vec<UnsafeCell<Option<StreamEntry>>>,
// Track number of used slots
used: AtomicUsize,
}
impl StreamChunk {
fn new() -> Self {
let mut slots = Vec::with_capacity(STREAM_SLOTS_PER_CHUNK);
slots.resize_with(STREAM_SLOTS_PER_CHUNK, || UnsafeCell::new(None));
Self {
slots,
used: AtomicUsize::new(0),
}
}
fn is_full(&self) -> bool {
self.used.load(Ordering::Acquire) >= STREAM_SLOTS_PER_CHUNK
}
}
struct StreamEntry {
timestamp: u64,
data: Box<[u8]>, // Contains DocumentId, text, or binary data
}
struct StreamEntryRef {
chunk_index: u32,
slot_index: u16, // 16 bits sufficient for 16k slots
}
Documents own streams directly through the Value enum:
impl Document {
fn create_stream(&mut self, property: String, stream_type: StreamType) {
let pool = Box::new(StreamEntryPool::new(stream_type));
let value = match stream_type {
StreamType::Binary => Value::BinaryStream(pool),
StreamType::Text => Value::TextStream(pool),
StreamType::Document => Value::DocumentStream(pool),
};
self.set_property(property, value);
}
fn append_to_stream(&self, property: &str, data: Vec<u8>) -> Option<StreamEntryRef> {
match self.get_property(property)? {
Value::BinaryStream(pool) |
Value::TextStream(pool) |
Value::DocumentStream(pool) => {
Some(pool.append_entry(data))
}
_ => None,
}
}
}
The architecture maintains the same chunk-based design as deltas but with key differences. Each stream owns its chunks, ensuring entries remain sequential in memory. The smaller chunk size (16,000 vs 64,000 for deltas) reflects typical stream usage patterns - many streams with moderate entry counts rather than few streams with massive entries.
With 16k slots per chunk, each chunk pre-allocates approximately 384KB (16k × 24 bytes per Vec<UnsafeCell<Option<StreamEntry>>> slot).
Stream operations achieve near lock-free appends through atomic operations and careful use of interior mutability:
use std::cell::UnsafeCell;
impl StreamEntryPool {
fn new(stream_type: StreamType) -> Self {
Self {
chunks: Mutex::new(Vec::new()),
stream_type,
}
}
fn append_entry(&self, data: Vec<u8>) -> StreamEntryRef {
let timestamp = current_timestamp();
let entry = StreamEntry {
timestamp,
data: data.into_boxed_slice(),
};
// Find or create chunk - the only synchronisation point
let (chunk_index, chunk) = self.get_or_create_chunk();
// Lock-free slot allocation using atomic fetch_add
let slot_index = chunk.used.fetch_add(1, Ordering::AcqRel);
if slot_index < STREAM_SLOTS_PER_CHUNK {
// Safe: each thread gets unique slot index
chunk.set_slot(slot_index, entry);
StreamEntryRef {
chunk_index: chunk_index as u32,
slot_index: slot_index as u16,
}
} else {
// Chunk filled whilst we were appending - retry with a new chunk,
// recovering the payload from `entry` (the original `data` was moved above)
self.append_entry(entry.data.into_vec())
}
}
fn get_or_create_chunk(&self) -> (usize, &StreamChunk) {
let mut chunks = self.chunks.lock().unwrap();
// Find non-full chunk
for (i, chunk) in chunks.iter().enumerate() {
if !chunk.is_full() {
// Extend lifetime beyond lock
let chunk_ptr = chunk.as_ref() as *const StreamChunk;
drop(chunks);
return (i, unsafe { &*chunk_ptr });
}
}
// Create new chunk
chunks.push(Box::new(StreamChunk::new()));
let index = chunks.len() - 1;
let chunk_ptr = chunks[index].as_ref() as *const StreamChunk;
drop(chunks);
(index, unsafe { &*chunk_ptr })
}
}
impl StreamChunk {
fn set_slot(&self, index: usize, entry: StreamEntry) {
unsafe {
// Safe because:
// 1. Each thread gets unique index from atomic fetch_add
// 2. No thread accesses another thread's slot during write
// 3. Once written, slots are immutable
*self.slots[index].get() = Some(entry);
}
}
fn get_slot(&self, index: usize) -> Option<&StreamEntry> {
unsafe {
// Safe: slots are immutable after write
(*self.slots[index].get()).as_ref()
}
}
}
The design achieves concurrency without traditional locking through three mechanisms:
- Interior Mutability: The Vec<UnsafeCell<Option<StreamEntry>>> pattern precisely expresses our invariant - the Vec structure never changes (no reallocation), but individual slots have interior mutability for their one-time write. Using UnsafeCell per slot rather than for the entire Vec minimises the unsafe surface area.
- Atomic Slot Allocation: The atomic used counter ensures each thread gets a unique slot. No thread ever waits for another during append operations. The atomic fetch_add operation provides the synchronisation guarantee that makes the unsafe slot access sound.
- Reference Stability: Once written, entries never move. The coordinate system (chunk_index, slot_index) provides permanent addresses. When the chunks vector grows, existing Box pointers remain valid.
The only synchronisation required is a brief mutex acquisition when creating new chunks - a rare operation compared to appends. This occurs when all 16,000 slots in the current chunk are exhausted. For typical workloads, this means lock acquisition once per tens of thousands of appends, achieving effectively lock-free performance for the common case.
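Reads follow the same invariants. A sketch of a traversal helper consistent with the write-once slot rule (the for_each name and shape are assumptions; the usage examples later call a similar iter()):
use std::sync::atomic::Ordering;

impl StreamEntryPool {
    fn for_each(&self, mut visit: impl FnMut(&StreamEntry)) {
        // Snapshot the chunk pointers under the mutex; Box targets never
        // move, so written slots can be read safely outside the lock
        let chunk_ptrs: Vec<*const StreamChunk> = {
            let chunks = self.chunks.lock().unwrap();
            chunks.iter().map(|c| c.as_ref() as *const StreamChunk).collect()
        };
        for ptr in chunk_ptrs {
            let chunk = unsafe { &*ptr };
            let used = chunk.used.load(Ordering::Acquire).min(STREAM_SLOTS_PER_CHUNK);
            for slot in 0..used {
                // A slot reserved but not yet written simply reads as None
                if let Some(entry) = chunk.get_slot(slot) {
                    visit(entry);
                }
            }
        }
    }
}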
Concurrency Model
Threading Strategy
Our threading strategy maximises concurrency whilst maintaining safety. Rather than using coarse-grained locks that serialise access, we employ fine-grained locking that allows operations on different data to proceed in parallel. The key insight: readers should never wait for other readers, and writers should only block what they’re modifying.
Read Concurrency
The architecture achieves near-perfect read scaling through RwLock’s shared read mode. When a thread needs to read a value, it acquires a read lock, clones the Arc pointer (incrementing its reference count), and immediately releases the lock. This operation takes nanoseconds, allowing thousands of concurrent readers without contention.
// Reading nested property - no blocking
fn read_property(&self, path: &[&str]) -> Option<Arc<dyn Any>> {
let mut current = &self.root;
for key in path {
match current {
Value::Object(versioned) => {
let map = versioned.read(); // Brief lock
current = map.get(*key)?;
}
_ => return None,
}
}
Some(current.as_arc())
}
The beauty lies in what happens after the lock is released. The thread holds an Arc to immutable data, which it can process at leisure without blocking other operations. This design transforms the traditional tension between consistency and concurrency - by making data immutable, we get both.
Write Serialisation
Writers face different constraints. Only one thread can modify a value at a time, but our granular locking ensures this limitation applies only to the specific value being changed. A thread updating document A’s title doesn’t block another thread updating document B’s content, or even document A’s metadata. This property-level granularity extracts maximum parallelism from write workloads.
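A hedged sketch of what this granularity looks like in practice, using the Versioned::write method defined in the next subsection (the property names and helper functions are illustrative):
// Two writers touching different properties of the same document
// acquire different Versioned locks and never contend.
fn rename(doc: &Document, new_title: String) {
    if let Some(Value::String(versioned)) = doc.get_property("title") {
        versioned.write(new_title); // brief write lock on this value only
    }
}

fn replace_metadata(doc: &Document, metadata: ImHashMap<String, Value>) {
    if let Some(Value::Object(versioned)) = doc.get_property("metadata") {
        versioned.write(metadata); // independent lock - never blocks rename()
    }
}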
Versioning System
The versioning system forms the heart of our concurrency solution. By treating updates as new versions rather than modifications, we sidestep the fundamental conflict between readers and writers. The Versioned<T> wrapper encapsulates this pattern:
struct Versioned<T> {
inner: Arc<RwLock<Vec<Arc<T>>>>,
}
impl<T> Versioned<T> {
fn new(initial: T) -> Self {
Self {
inner: Arc::new(RwLock::new(vec![Arc::new(initial)])),
}
}
fn read(&self) -> Arc<T> {
let versions = self.inner.read().unwrap();
versions.last().unwrap().clone() // Just Arc increment
}
fn write(&self, value: T) {
let mut versions = self.inner.write().unwrap();
versions.push(Arc::new(value));
}
}
This elegantly simple structure enables powerful guarantees. Readers always see a consistent version, even if writes occur during their operation. Writers never corrupt active reads because they create new versions rather than modifying existing ones. The Arc wrapping ensures memory remains valid as long as any thread holds a reference.
Optimisation Strategies
Different data types benefit from different versioning strategies. Strings and binary data typically change wholesale - when you update a string, you replace it entirely. For these types, simple versioning suffices. Each update creates a new version containing the complete new value.
Objects and sets present a different challenge. Adding one property to an object shouldn’t require copying hundreds of existing properties. Here, we employ persistent data structures from the im crate. These structures use tree-based representations that share unchanged portions between versions. Adding a property to a 1000-property object shares 999 properties, copying only the modified path through the tree. This structural sharing transforms O(n) operations into O(log n), making frequent small updates practical.
Delta Processing
Delta processing showcases sophisticated concurrency through work-stealing algorithms combined with lock-free data structures. The challenge: maintain strict ordering for each document’s deltas whilst maximising CPU utilisation across multiple documents. Our solution elegantly combines work-stealing for load balancing with SPSC queues for lock-free processing.
Processing Architecture
The system maintains two levels of queuing. At the document level, work-stealing ensures all CPU cores stay busy. At the delta level, lock-free SPSC queues eliminate contention:
pub struct DeltaProcessor<S: DocumentStorage> {
// Document-level work queue for stealing
work_queue: Arc<Injector<ID16>>,
// Per-document lock-free queues
delta_queues: Arc<DashMap<ID16, Arc<SpscQueue<DeltaPacket>>>>,
// Worker thread pool
workers: Vec<thread::JoinHandle<()>>,
}
struct DeltaPacket {
delta_id: ID8,
wire_data: Arc<[u8]>, // Zero-copy from network
received_at: u64,
}The architecture achieves zero-copy through careful data flow. Network threads receive deltas as bytes, immediately convert them to Arc references, then push these into per-document queues. Worker threads pop from these queues and process deltas without any copying or locking.
Lock-Free Delta Flow
The journey of a delta from network to storage demonstrates the lock-free design:
Network Thread Document Queue Worker Thread
│ │ │
Receive bytes │ │
│ │ │
Arc::from(bytes) ─── push() ───>│ │
│<─── pop() ────────────────┤
│ │
│ Process delta
│ │
│ Store in pool
Each arrow represents a lock-free operation. The SPSC queue’s design ensures the producer (network thread) and consumer (worker thread) never contend for the same cache line, achieving true parallel operation.
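The document does not show SpscQueue itself; a minimal bounded ring buffer in its spirit might look like this (single producer, single consumer, power-of-two capacity - all assumptions):
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};

pub struct SpscQueue<T> {
    buf: Box<[UnsafeCell<Option<T>>]>,
    head: AtomicUsize, // consumer position (monotonically increasing)
    tail: AtomicUsize, // producer position (monotonically increasing)
}

// Sound only under the single-producer/single-consumer contract
unsafe impl<T: Send> Sync for SpscQueue<T> {}

impl<T> SpscQueue<T> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity.is_power_of_two());
        let mut buf = Vec::with_capacity(capacity);
        buf.resize_with(capacity, || UnsafeCell::new(None));
        Self {
            buf: buf.into_boxed_slice(),
            head: AtomicUsize::new(0),
            tail: AtomicUsize::new(0),
        }
    }

    // Called only by the producer thread
    pub fn push(&self, value: T) -> Result<(), T> {
        let tail = self.tail.load(Ordering::Relaxed);
        let head = self.head.load(Ordering::Acquire);
        if tail - head == self.buf.len() {
            return Err(value); // full - natural back-pressure
        }
        unsafe { *self.buf[tail & (self.buf.len() - 1)].get() = Some(value) };
        self.tail.store(tail + 1, Ordering::Release); // publish to consumer
        Ok(())
    }

    // Called only by the consumer thread
    pub fn pop(&self) -> Option<T> {
        let head = self.head.load(Ordering::Relaxed);
        let tail = self.tail.load(Ordering::Acquire);
        if head == tail {
            return None; // empty
        }
        let value = unsafe { (*self.buf[head & (self.buf.len() - 1)].get()).take() };
        self.head.store(head + 1, Ordering::Release); // free the slot
        value
    }
}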
Work-Stealing Algorithm
Work-stealing occurs at the document level, never at the delta level. This crucial distinction maintains our ordering guarantee whilst enabling load balancing:
fn worker_loop(&self) {
loop {
// Try to steal a document with pending deltas
if let Some(doc_id) = steal_document(&self.work_queue) {
if let Some(queue) = self.delta_queues.get(&doc_id) {
// Process ALL deltas for this document sequentially
while let Some(packet) = queue.pop() {
self.apply_delta(doc_id, packet);
}
// Re-queue if more deltas arrived during processing
if !queue.is_empty() {
self.work_queue.push(doc_id);
}
}
}
}
}This design achieves near-optimal CPU utilisation. Threads never idle whilst work exists, yet document integrity is never compromised. The work-stealing algorithm adapts to varying workloads automatically - if one document receives many deltas whilst others receive few, multiple threads naturally converge on the busy document once they complete their other work.
Processing Guarantees
The processing flow maintains our critical guarantees without locks:
- Ordering: Each document’s deltas apply in sequence because only one thread processes that document
- Atomicity: Delta operations apply completely or not at all
- Visibility: Applied deltas become visible to readers immediately through versioning
- Performance: Zero contention between different documents’ processing
The SPSC queues provide back-pressure naturally. If a document receives deltas faster than they can be processed, the queue fills up, providing flow control without explicit coordination. This design enables real-time collaboration where users see changes as they happen without experiencing pauses or inconsistencies.
Implementation Details
Memory Management
Memory management in a zero-copy architecture requires careful planning. Traditional systems can afford to be casual about allocation because they copy data anyway. Our system must be deliberate, pre-allocating memory to ensure stability whilst managing growth efficiently.
Pre-allocation Strategy
Pre-allocation transforms unpredictable allocation costs into predictable startup costs. The system employs different strategies for different data types based on their access patterns:
Delta chunks allocate 64,000 slots upfront, using Vec<UnsafeCell<Option<Arc<StoredDelta>>>> with atomic slot allocation. Each chunk stores Arc references to StoredDelta structures, which combine the wire format data with mutable processing state. The chunk never reallocates, ensuring pointer stability.
Stream chunks use smaller 16,000 slot allocations with UnsafeCell wrapping for interior mutability. SPSC queues pre-allocate their ring buffers - typically 10,000 entries per document queue. This seems excessive for lightly-used documents but prevents allocation during burst traffic, maintaining predictable latency.
Arc-based Memory Sharing
The shift to Arc-based memory management fundamentally changes the system’s memory profile. When a delta arrives from the network, it’s converted once to Arc<[u8]>. This single allocation then flows through the system:
// Network receives 1KB delta
Vec<u8> → Arc<[u8]> // One allocation
// Arc flows through system (no copying)
Network thread → SPSC queue → Worker thread → Delta pool → Network propagation
Reference counting ensures memory lives exactly as long as needed. When the last network connection finishes sending a delta, the Arc count drops to zero and memory is freed. This happens deterministically without garbage collection pauses.
Memory Usage Patterns
The system exhibits predictable memory usage patterns that aid capacity planning. Each user’s base overhead includes one DashMap entry plus pre-allocated structures. Per-document overhead includes the document header, property storage, and an SPSC queue if deltas are active. The queue memory can be reclaimed when documents become inactive.
Delta memory follows a sawtooth pattern - growing as deltas arrive, dropping when old versions are garbage collected. The Arc sharing means multiple connections sending the same delta share the memory rather than duplicating it. This becomes significant during broadcast scenarios where one delta might be sent to thousands of clients.
Wire Format and Serialisation
The architecture maintains two representations of documents: an in-memory format optimised for concurrent access and a wire format optimised for network transmission. This dual representation allows each format to excel at its purpose without compromise.
The Serialisation Challenge
The in-memory format scatters data across memory for optimal concurrent access:
- Versioned wrappers with Arc<RwLock<>> for thread safety
- Persistent data structures (ImHashMap) with nodes across the heap
- Stream entries distributed across multiple chunks
Network transmission requires contiguous data. We cannot simply send pointers to scattered memory - we must serialise into a flat, self-describing format.
Wire Format Structure
impl Document {
/// Serialise document to wire format - flattens all versioned data
fn to_wire_format(&self) -> Arc<Vec<u8>> {
// Check cache first
{
let cache = self.wire_cache.read().unwrap();
if let Some(ref cached) = *cache {
return Arc::clone(cached);
}
}
// Cache miss - serialise
let mut buffer = Vec::new();
// Write header (fixed size)
buffer.extend_from_slice(&self.header.to_bytes());
// Recursively flatten value tree
self.data.write_to_buffer(&mut buffer);
let wire_data = Arc::new(buffer);
// Update cache
{
let mut cache = self.wire_cache.write().unwrap();
*cache = Some(Arc::clone(&wire_data));
}
wire_data
}
fn invalidate_cache(&self) {
let mut cache = self.wire_cache.write().unwrap();
*cache = None;
}
}
impl Value {
fn write_to_buffer(&self, buffer: &mut Vec<u8>) {
match self {
Value::String(versioned) => {
buffer.push(ValueType::String as u8);
let current = versioned.read();
buffer.extend_from_slice(&(current.len() as u32).to_le_bytes());
buffer.extend_from_slice(current.as_bytes());
}
Value::Object(versioned) => {
buffer.push(ValueType::Object as u8);
let current = versioned.read();
buffer.extend_from_slice(&(current.len() as u32).to_le_bytes());
// Flatten the persistent data structure
for (key, value) in current.iter() {
buffer.extend_from_slice(&(key.len() as u32).to_le_bytes());
buffer.extend_from_slice(key.as_bytes());
value.write_to_buffer(buffer);
}
}
Value::BinaryStream(stream_id) => {
// Streams are sent by reference only
buffer.push(ValueType::BinaryStream as u8);
buffer.extend_from_slice(stream_id.as_bytes());
}
// ... other types
}
}
}
Wire Format Cache
Documents cache their wire format to avoid repeated serialisation costs. This cache leverages the read-heavy nature of collaborative systems:
struct Document {
header: DocumentHeader,
data: Value,
// Cache survives until next write
wire_cache: RwLock<Option<Arc<Vec<u8>>>>,
}
The cache invalidates on any write operation, ensuring consistency. Since writes already acquire locks for versioning, cache invalidation adds no additional synchronisation cost. The Arc wrapping allows multiple network sends to share the same wire format bytes without copying.
Zero-Copy Reception
Whilst serialisation is unavoidable when sending, reception achieves zero-copy through reference types:
/// Zero-copy view into wire format document
pub struct DocumentRef<'a> {
header: DocumentHeader, // Header is small, so we copy it
data: &'a [u8], // Zero-copy reference to value data
}
/// Zero-copy reference to value in wire format
pub enum ValueRef<'a> {
Null,
Bool(bool),
Int(i64),
String(&'a str), // Points directly into buffer
Binary(&'a [u8]), // No copying
Object(ObjectRef<'a>), // Lazy parsing
BinaryStream(ID16), // Just the reference
// ... other types
}
impl<'a> DocumentRef<'a> {
/// Parse document from wire format without copying
pub fn from_bytes(bytes: &'a [u8]) -> Result<Self, ParseError> {
let header = DocumentHeader::from_bytes(&bytes[..HEADER_SIZE])?;
Ok(DocumentRef {
header,
data: &bytes[HEADER_SIZE..],
})
}
/// Access properties without full deserialisation
pub fn get_property(&self, path: &[&str]) -> Option<ValueRef<'a>> {
// Navigate through wire format without copying
let mut current = ValueRef::from_bytes(self.data).ok()?.0;
for key in path {
match current {
ValueRef::Object(obj_ref) => {
current = obj_ref.get_property(key)?;
}
_ => return None,
}
}
Some(current)
}
}
This design ensures network operations never block document access. Serialisation happens once per write, cached for subsequent reads. Reception requires no deserialisation for common operations like property access. The wire format becomes an efficient intermediate representation that bridges the gap between our optimised in-memory structures and network requirements.
Garbage Collection (Planned)
Whilst reference counting handles immediate cleanup, long-running systems need strategies for managing version history. The planned garbage collection system will support multiple policies. Time-based collection might retain versions for a fixed duration. Count-based collection might keep the last N versions. Application-specific policies could preserve versions based on business rules.
The key insight: garbage collection never invalidates active references. Even if the collector removes a version from the version history, any thread holding an Arc to that version can continue using it. The memory frees only when the last reference drops, maintaining our safety guarantees.
Performance Characteristics
Understanding performance characteristics helps developers make informed decisions. Our architecture trades memory for speed and complexity for predictability, choices that pay dividends in realtime collaborative scenarios.
Time Complexity
Operation costs reflect our design priorities. Document lookups achieve O(1) average case through DashMap’s concurrent hash table. Property reads cost O(1) after first access because we cache property locations. Delta appends amortise to O(1) by pre-allocating chunks. The surprising winner: object property updates at O(log n) thanks to structural sharing. Traditional systems copy entire objects (O(n)), but our persistent data structures share unchanged portions.
String and binary updates create new versions in O(1) time. We don’t copy the old version or modify it - we simply add a new version to the history. This constant-time operation enables rapid updates without degrading as documents grow.
Space Complexity
Space usage reflects our pre-allocation strategy. Each delta chunk consumes approximately 512KB for the slots array itself (64k slots × 8 bytes each, since Option<Arc<StoredDelta>> is pointer-sized thanks to the niche optimisation). Stream chunks use approximately 384KB. These fixed costs seem high for sparse usage but vanish in typical workloads where chunks fill reasonably.
Version storage adds approximately 24 bytes per version for the Arc and vector entry. Structural sharing dramatically reduces overhead for objects - adding one property to a 1000-property object might use only 1KB for the new tree nodes, not megabytes for a full copy.
The StoredDelta structure adds 64 bytes per delta for the struct itself, plus the Arc<[u8]> allocation which includes:
- 16 bytes for server extension (delta_id, sequence)
- Original client packet size (typically 64+ bytes)
- Total per-delta overhead: ~144 bytes minimum (64 for StoredDelta + 80 for headers)
This co-location improves cache efficiency during delta processing, as accessing the mutable state often requires checking the wire data headers.
Trade-offs
Every architecture makes trade-offs. Ours optimises for read performance and predictability at the cost of memory efficiency. Pre-allocation wastes memory but eliminates allocation from hot paths. Version history consumes memory but enables lock-free reads. Structural sharing adds complexity but makes frequent updates practical.
These trade-offs align with our use case. Collaborative editing generates many reads per write. Users expect instant response times. Network distribution demands zero-copy efficiency. By accepting higher memory usage, we achieve the performance characteristics that make realtime collaboration feel effortless.
Memory Layout Optimisations
Cache-line awareness drives our memory layout decisions. Modern CPUs fetch memory in 64-byte cache lines. When data structures span multiple cache lines, every access potentially triggers multiple memory fetches. Our structures align to cache boundaries, ensuring single-fetch access patterns.
Cache-Line Alignment
The StoredDelta structure exemplifies our cache-conscious design:
#[repr(C, align(64))]
struct StoredDelta {
// First cache line - frequently accessed together
wire_data: Arc<[u8]>, // 16 bytes (fat pointer) - server extension + client packet
status: AtomicU8, // 1 byte - processing state
validation_result: AtomicU8, // 1 byte - processing result
retry_count: AtomicU8, // 1 byte - failure handling
_pad1: [u8; 5], // 5 bytes - alignment padding
applied_at: AtomicU64, // 8 bytes - when processed
processing_time_us: AtomicU32, // 4 bytes - performance metrics
_pad2: [u8; 28], // 28 bytes - pad to exactly 64 bytes
}
// Server extension layout (stored at start of wire_data)
#[repr(C)]
struct ServerExtension {
delta_id: ID8, // 8 bytes - unique identifier
sequence: u64, // 8 bytes - global ordering
}
// Client header layout (follows server extension in wire_data)
#[repr(C)]
struct ClientHeader {
target_doc_id: ID16,// 16 bytes
timestamp: u64, // 8 bytes
delta_size: u32, // 4 bytes
op_count: u16, // 2 bytes
_padding: [u8; 2], // 2 bytes
}
This layout ensures a single cache-line fetch retrieves all delta metadata. Processing threads checking status don’t trigger additional memory fetches for sequence numbers or timestamps. The padding seems wasteful but prevents false sharing - a critical optimisation for multi-threaded access.
False Sharing Prevention
False sharing occurs when different threads modify variables in the same cache line. Even though they access different variables, the CPU must synchronise the entire cache line between cores. Our design eliminates false sharing through:
- Alignment: Each StoredDelta starts on a cache-line boundary
- Padding: Unused bytes ensure each delta occupies exactly one cache line
- Grouping: Related fields that change together share cache lines
This attention to memory layout can improve performance by 10x under high concurrency. The cost - some wasted memory - is negligible compared to the performance gains.
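The same technique applies anywhere independent threads write adjacent data. A minimal illustration (the type names are illustrative):
use std::sync::atomic::AtomicU64;

// align(64) pads each counter to a full cache line, so increments from
// different threads never invalidate each other's lines
#[repr(C, align(64))]
struct PaddedCounter {
    value: AtomicU64,
}

struct PerThreadStats {
    counters: [PaddedCounter; 8], // one cache line per worker thread
}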
Garbage Collection
Garbage collection in a lock-free architecture requires careful design. Traditional stop-the-world collectors would destroy our realtime guarantees. Instead, we implement an epoch-based collector that runs continuously without blocking active operations.
Collection Architecture
The garbage collector uses a producer-consumer pattern where writers produce cleanup tasks and a background thread consumes them. This separation ensures write operations never wait for cleanup, maintaining our performance guarantees.
const MAX_VERSION_LENGTH: usize = 5;
struct EpochCollector {
// Fixed-size queue prevents unbounded growth
cleanup_queue: Arc<ArrayQueue<VersionedRef>>,
// Thread handle for cleanup worker
worker: Option<thread::JoinHandle<()>>,
}
struct VersionedRef<T> {
// Weak reference prevents reference cycles; matches Versioned<T>'s inner type
versioned: Weak<RwLock<Vec<Arc<T>>>>,
}
impl<T> Versioned<T> {
fn write(&self, value: T) {
let mut versions = self.inner.write().unwrap();
versions.push(Arc::new(value));
// Queue cleanup only when exceeding threshold
if versions.len() > MAX_VERSION_LENGTH {
if let Some(ref collector) = self.collector {
collector.queue_cleanup(Arc::downgrade(&self.inner));
}
}
}
}
The fixed-size ArrayQueue prevents memory exhaustion if cleanup falls behind. Using weak references ensures the collector doesn’t keep values alive artificially - if a document is deleted, its cleanup tasks vanish automatically.
Collection Strategy
The collector employs several strategies to minimise impact on active operations. First, it batches queue operations to reduce contention. Rather than processing items one at a time, it drains batches from the queue, amortising synchronisation costs.
fn background_cleanup_thread(collector: Arc<EpochCollector>) {
let mut batch = Vec::with_capacity(100);
loop {
// Batch drain from queue - single synchronisation point
for _ in 0..100 {
match collector.cleanup_queue.pop() {
Some(vref) => batch.push(vref),
None => break,
}
}
// Process batch without touching queue
for vref in batch.drain(..) {
if let Some(strong) = vref.versioned.upgrade() {
// Ultra-short timeout prevents blocking writers
// (try_write_for is parking_lot's timed RwLock API; it returns Option)
if let Some(mut versions) = strong.try_write_for(Duration::from_micros(10)) {
cleanup_old_versions(&mut versions);
}
// Don't re-queue failures - writers will queue again if needed
}
}
// Adaptive sleep based on workload
let sleep_ms = if collector.cleanup_queue.is_empty() { 100 } else { 10 };
thread::sleep(Duration::from_millis(sleep_ms));
}
}
The 10-microsecond timeout seems aggressive but reflects our lock hold times. Writers typically hold locks for hundreds of nanoseconds, so a 10-microsecond wait almost always succeeds without blocking anyone.
Cleanup Algorithm
The cleanup algorithm removes only truly unreferenced versions whilst maintaining the invariant that readers always find valid data. It checks reference counts only for versions it might remove, avoiding unnecessary atomic operations.
fn cleanup_old_versions<T>(versions: &mut Vec<Arc<T>>) {
let excess = versions.len().saturating_sub(MAX_VERSION_LENGTH);
if excess == 0 { return; }
// Check only versions we're considering for removal
let mut remove_count = 0;
for i in 0..excess {
if Arc::strong_count(&versions[i]) == 1 {
// Only this vector holds a reference
remove_count += 1;
} else {
// Active references exist - can't remove this or later versions
break;
}
}
if remove_count > 0 {
versions.drain(0..remove_count);
}
}
This algorithm ensures safety through two mechanisms. First, it only removes versions with no external references. Second, it preserves order - if version N has active references, versions N+1 onwards are preserved even if unreferenced, maintaining our sequential guarantee.
Performance Optimisations
Several optimisations reduce collection overhead. Thread-local batching reduces queue contention by collecting multiple cleanup requests before touching the shared queue:
thread_local! {
static CLEANUP_BATCH: RefCell<Vec<WeakRef>> = RefCell::new(Vec::with_capacity(32));
}
impl<T> Versioned<T> {
fn queue_cleanup(&self, weak_ref: WeakRef) {
CLEANUP_BATCH.with(|batch| {
let mut b = batch.borrow_mut();
b.push(weak_ref);
if b.len() >= 32 {
// Push entire batch atomically
if let Some(ref collector) = self.collector {
for item in b.drain(..) {
let _ = collector.cleanup_queue.push(item);
}
}
}
});
}
}
The collector also uses memory ordering optimisations. ArrayQueue with bounded size allows more relaxed memory ordering than SegQueue, reducing synchronisation overhead on ARM processors where sequential consistency is expensive.
Configuration and Tuning
The MAX_VERSION_LENGTH constant provides the primary tuning parameter. Setting it to 5 balances memory usage against reference retention. Applications can adjust this based on their access patterns - higher values support more concurrent long-running operations but use more memory.
The system also supports disabling collection entirely for specific values:
impl<T> Versioned<T> {
fn new_with_collection(initial: T, collect: bool) -> Self {
Self {
inner: Arc::new(RwLock::new(vec![Arc::new(initial)])),
collector: if collect { Some(GLOBAL_COLLECTOR.clone()) } else { None },
}
}
}
This flexibility allows applications to disable collection for frequently accessed values whilst enabling it for the long tail of occasionally accessed data.
Usage Examples
Basic Operations
// Initialise storage
let store = MemStore::new();
// Create document with versioned properties
let mut doc = Document::new(DocumentType::Generic);
doc.set_property("name", Value::String(Versioned::new("Alice".to_string())));
doc.set_property("metadata", Value::Object(Versioned::new(ImHashMap::new())));
// Store document
store.insert_document(user_id, doc_id, doc);
Concurrent Access Patterns
// Multiple readers - no blocking
let doc = Arc::new(store.get_document(user_id, doc_id)?);
// Spawn multiple reader threads
for _ in 0..10 {
let doc_clone = Arc::clone(&doc);
thread::spawn(move || {
let name = doc_clone.read_property(&["name"]); // No blocking
});
}
// Writer thread - only blocks specific property
thread::spawn(move || {
doc.update_property(&["name"],
Value::String(Versioned::new("Alice Smith".to_string()))
);
});
Delta Processing with Audit Trail
// Complete delta flow from network to storage to audit
async fn process_incoming_delta(
processor: &DeltaProcessor,
doc_id: DocumentId,
wire_bytes: Vec<u8>
) -> Result<DeltaRef> {
// Stage 1: queue for processing (receive_delta converts the bytes
// to Arc<[u8]> internally, as shown earlier)
processor.receive_delta(doc_id, wire_bytes);
// Stage 2: Worker processes delta (happens asynchronously)
// - Applies operations to document
// - Stores in delta pool with state
// - Records in document's audit trail
// - Propagates to subscribers
// Check processing status
let delta_ref = processor.await_processing(doc_id).await?;
Ok(delta_ref)
}
// Access document's complete delta history
fn get_document_history(
store: &MemStore,
user_id: UserId,
doc_id: DocumentId
) -> Vec<DeltaRef> {
let doc = store.get_document(user_id, doc_id).unwrap();
if let Some(Value::DocumentStream(ref stream)) =
doc.get_property("__delta_history__") {
// Iterate through all delta references
stream.iter()
.map(|entry| DeltaRef::from_bytes(&entry.data))
.collect()
} else {
Vec::new()
}
}
// Retrieve historical delta with its state
fn get_historical_delta(
pool: &DeltaPool,
delta_ref: &DeltaRef
) -> Option<Arc<StoredDelta>> {
// O(1) lookup using coordinates
pool.get(delta_ref)
}
Stream Operations
// Create audit log stream on document
let mut doc = Document::new(DocumentType::UserProfile);
doc.create_stream("audit_log".to_string(), StreamType::Text);
// Append audit entries
fn log_action(doc: &Document, action: &str) {
let timestamp = current_timestamp();
let entry = format!("{}: {}", timestamp, action);
doc.append_to_stream("audit_log", entry.into_bytes());
}
// Read audit history
fn read_audit_log(doc: &Document) -> Vec<String> {
if let Some(Value::TextStream(ref pool)) = doc.get_property("audit_log") {
pool.iter()
.map(|entry| String::from_utf8_lossy(&entry.data).to_string())
.collect()
} else {
Vec::new()
}
}