3:18 AM. The alarm cuts through my sleep like a hot knife through butter. Another incident alert. I grab my phone, squinting at the bright screen: "CRITICAL: Event pipeline latency threshold exceeded - p99 > 5ms."
I pull myself out of bed and shuffle to my laptop. Coffee can wait; production can't.
Our real-time event processing system - the backbone of our financial analytics platform - is drowning under peak market load. I pull up the monitoring dashboard and watch the latency graphs paint a horror story. Event throughput has tanked from our usual 500k events/sec to a measly 10k events/sec. Worse, latency has spiked from sub-millisecond to an unacceptable 5ms.
Five milliseconds might not sound like much, but in financial services, it's an eternity. Trades get executed in microseconds. Arbitrage opportunities vanish in nanoseconds. When your analytics pipeline is 5ms behind, you're analyzing history, not helping traders.
I start digging through the flame graphs and thread dumps. The culprit reveals itself quickly: our three-stage event pipeline - parse, enrich, aggregate - is using LinkedBlockingQueue for inter-stage communication. Each queue is a bottleneck, each stage is contending for locks, and the garbage collector is working overtime to clean up the queue node allocations.
The individual stages are fast - microseconds per event. But each blocking queue adds 1-5 microseconds of latency, and under contention, that explodes to tens of microseconds. Multiply by three stages, add some GC pauses from all those queue node allocations, and you've got 5ms of pain.
I've seen this pattern before. I've fixed it before. It's time to bring out the big guns: the Disruptor pattern.
By 6 AM, I have a proof-of-concept running. By 7 AM, it's in production. Latency drops from 5ms to 100 nanoseconds per stage. Throughput jumps from 10k to 500k events/sec. The garbage collector barely breaks a sweat.
This is the story of that transformation - the architecture, the implementation, and the lessons learned.
A typical event processing pipeline looks like this:
[Diagram: three-stage pipeline - Parse, Enrich, Aggregate - with a LinkedBlockingQueue between each stage]
Each stage has multiple worker threads processing events. Between stages, a LinkedBlockingQueue buffers events. Simple, well-understood, and it works great - until it doesn't.
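To make the baseline concrete, here is a minimal sketch of such a blocking pipeline. All names (`BlockingPipelineSketch`, the `:parsed`/`:enriched` string stages) are illustrative stand-ins for the real parse/enrich/aggregate logic, not code from our system:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.UnaryOperator;

// Minimal sketch of a naive multi-stage pipeline: each stage drains an
// inbound LinkedBlockingQueue and puts results on the next one.
public class BlockingPipelineSketch {
    private final BlockingQueue<String> parseQueue = new LinkedBlockingQueue<>(1024);
    private final BlockingQueue<String> enrichQueue = new LinkedBlockingQueue<>(1024);
    private final BlockingQueue<String> aggregateQueue = new LinkedBlockingQueue<>(1024);

    public void start() {
        startStage("parse", parseQueue, enrichQueue, s -> s + ":parsed");
        startStage("enrich", enrichQueue, aggregateQueue, s -> s + ":enriched");
    }

    private void startStage(String name, BlockingQueue<String> in,
                            BlockingQueue<String> out, UnaryOperator<String> stage) {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    out.put(stage.apply(in.take())); // take() and put() each acquire a lock
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, name);
        t.setDaemon(true); // don't block JVM exit
        t.start();
    }

    public void publish(String event) throws InterruptedException {
        parseQueue.put(event);
    }

    public String poll() throws InterruptedException {
        return aggregateQueue.take();
    }
}
```

Every hand-off between stages goes through a lock-protected queue - exactly the cost we dissect next.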
Let's dissect what happens when you offer() to a LinkedBlockingQueue:
```java
public class LinkedBlockingQueue<E> {
    // Simplified from actual JDK source
    private final ReentrantLock putLock = new ReentrantLock();
    private final ReentrantLock takeLock = new ReentrantLock();

    public boolean offer(E e) {
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // <- CONTENTION POINT 1
        try {
            Node<E> node = new Node<>(e); // Create new Node - ALLOCATION 1
            enqueue(node);
        } finally {
            putLock.unlock();
        }
        // Potentially signal waiting consumers - CONTENTION POINT 2
        signalNotEmpty();
        return true;
    }
}
```
Every offer() operation:
Acquires a lock - Under contention, this means context switches, thread parking, and scheduling overhead
Allocates a Node object - 24-32 bytes per event, depending on compressed OOPs
May signal waiting consumers - More lock contention and potential context switches
With four producer threads offering to the same queue at ~100k events/sec each, you're looking at 400k lock acquisitions/sec (each with potential context switch overhead), 400k object allocations/sec (roughly 10 MB/sec of garbage), and constant cache line bouncing as the lock state changes ownership continuously.
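You can see this contention yourself with a crude timing sketch (this is a rough illustration, not a rigorous JMH benchmark - `QueueContentionDemo` and its numbers are mine, not measurements from our system):

```java
import java.util.concurrent.LinkedBlockingQueue;

// Crude illustration of multi-producer contention on one LinkedBlockingQueue:
// total wall time divided by total offers, for N producer threads.
public class QueueContentionDemo {
    public static long nanosPerOffer(int producers, int offersEach) throws InterruptedException {
        LinkedBlockingQueue<Long> queue = new LinkedBlockingQueue<>();
        Thread[] threads = new Thread[producers];
        long start = System.nanoTime();
        for (int i = 0; i < producers; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < offersEach; j++) {
                    queue.offer(0L); // lock acquisition + node allocation per call
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) t.join();
        return (System.nanoTime() - start) / ((long) producers * offersEach);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("1 producer:  " + nanosPerOffer(1, 1_000_000) + " ns/offer");
        System.out.println("4 producers: " + nanosPerOffer(4, 1_000_000) + " ns/offer");
    }
}
```

On most multi-core machines the per-offer cost climbs noticeably as producers are added, even though each thread is doing the same work.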
The Disruptor pattern, pioneered by LMAX Exchange for their foreign exchange trading platform, provides exactly what we need: a lock-free, allocation-free, cache-friendly mechanism for passing events between stages.
The key insight of the Disruptor is beautifully simple: instead of multiple queues between stages, use a single, pre-allocated ring buffer shared by all stages. Each stage processes events at its own pace, tracked by sequence numbers.
[Diagram: eight-slot ring buffer with color-coded slots showing each stage's progress]
In this diagram, green (0-1) marks slots fully processed by all stages, yellow (2-3) shows slots processed by Stage 1 and currently being processed by Stage 2, orange (4-5) indicates slots processed by Stage 1 but waiting for Stage 2, and red (6-7) represents slots being published that are not yet available for Stage 1.
The publisher tracks the slowest consumer. If the slowest consumer is N slots behind, the publisher has N slots of buffer space. If it catches up to the slowest consumer, it must wait - natural backpressure without explicit flow control.
This is fundamentally different from unbounded queues that can grow until you OOM, or bounded blocking queues where producers block and cause unpredictable latency spikes.
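The backpressure arithmetic is simple enough to show in isolation. This toy sketch uses my own names (`BackpressureMath`, `canClaim`), not the pipeline's API: the publisher may claim a slot only if doing so would not lap the slowest consumer.

```java
// Toy illustration of ring-buffer backpressure: claiming sequence N reuses
// slot N % bufferSize, so the slowest consumer must have passed N - bufferSize.
public class BackpressureMath {
    public static boolean canClaim(long nextSequence, long slowestConsumer, int bufferSize) {
        long wrapPoint = nextSequence - bufferSize;
        return wrapPoint <= slowestConsumer; // otherwise the publisher must wait
    }

    public static void main(String[] args) {
        int size = 8;
        System.out.println(canClaim(7, -1, size)); // first lap: true
        System.out.println(canClaim(8, -1, size)); // would overwrite slot 0: false
        System.out.println(canClaim(8, 0, size));  // consumer finished slot 0: true
    }
}
```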
Events are processed in sequence order. Stage 1 processes slot 0, then slot 1, then slot 2. Stage 2 follows the same pattern, perhaps a few slots behind. This sequential access pattern is perfect for CPU prefetchers - they can load the next cache line before you need it.
Contrast this with linked lists where each node can be anywhere in memory, causing cache misses on every traversal.
```java
public class RingBuffer<E> {
    private final E[] entries;
    private final int bufferSize;
    private final int indexMask;

    // Padded sequence to prevent false sharing
    private final PaddedAtomicLong cursor = new PaddedAtomicLong(-1L);

    @SuppressWarnings("unchecked")
    public RingBuffer(EventFactory<E> factory, int bufferSize) {
        // Must be power of 2 for fast modulo via bitwise AND
        this.bufferSize = bufferSize;
        this.indexMask = bufferSize - 1;
        this.entries = (E[]) new Object[bufferSize];
        // Pre-allocate all event objects
        for (int i = 0; i < bufferSize; i++) {
            entries[i] = factory.newInstance();
        }
    }

    public E get(long sequence) {
        return entries[(int) (sequence & indexMask)];
    }

    public long getCursor() {
        return cursor.get();
    }
}
```
The ring buffer owns all event objects. They're created at initialization and never garbage collected (as long as the ring buffer lives). The indexMask trick (sequence & (size - 1)) gives us fast modulo for power-of-2 sizes.
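A quick sanity check of the mask trick (names here are illustrative):

```java
// For power-of-2 sizes, (sequence & (size - 1)) equals (sequence % size)
// for non-negative sequences, without a division instruction.
public class IndexMaskDemo {
    public static void main(String[] args) {
        int bufferSize = 1024;          // must be a power of 2
        int indexMask = bufferSize - 1; // 0x3FF: keeps the low 10 bits
        for (long sequence : new long[] {0, 1, 1023, 1024, 5000}) {
            int masked = (int) (sequence & indexMask);
            int modulo = (int) (sequence % bufferSize);
            System.out.println(sequence + " -> " + masked + " (mod: " + modulo + ")");
        }
    }
}
```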
Each processing stage tracks its progress with a Sequence:
```java
public class Sequence {
    // Padding to prevent false sharing
    private long p1, p2, p3, p4, p5, p6, p7;
    private volatile long value;
    private long p8, p9, p10, p11, p12, p13, p14;

    public Sequence(long initialValue) {
        this.value = initialValue;
    }

    public long get() {
        return value;
    }

    public void set(long value) {
        this.value = value;
    }

    public boolean compareAndSet(long expected, long update) {
        return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expected, update);
    }
}
```
The massive padding ensures each sequence occupies its own cache line. Without this, updating one sequence would invalidate others in other cores' caches - false sharing is the silent performance killer.
A SequenceBarrier lets a stage wait for upstream stages to make progress:
```java
public class SequenceBarrier {
    private final Sequence cursorSequence;
    private final Sequence[] dependentSequences;
    private final WaitStrategy waitStrategy;

    public SequenceBarrier(Sequence cursorSequence, Sequence[] dependentSequences,
                           WaitStrategy waitStrategy) {
        this.cursorSequence = cursorSequence;
        this.dependentSequences = dependentSequences;
        this.waitStrategy = waitStrategy;
    }

    public long waitFor(long sequence) throws InterruptedException {
        long availableSequence;
        // Wait until the cursor reaches our target
        while ((availableSequence = cursorSequence.get()) < sequence) {
            waitStrategy.waitFor(sequence, cursorSequence);
        }
        // Now wait for all dependent sequences (upstream stages)
        if (dependentSequences.length > 0) {
            while ((availableSequence = getMinimumSequence(dependentSequences)) < sequence) {
                waitStrategy.waitFor(sequence, dependentSequences[0]);
            }
        }
        return availableSequence;
    }

    private long getMinimumSequence(Sequence[] sequences) {
        long minimum = Long.MAX_VALUE;
        for (Sequence sequence : sequences) {
            minimum = Math.min(minimum, sequence.get());
        }
        return minimum;
    }
}
```
Each stage runs one or more EventProcessor instances:
```java
public abstract class EventProcessor implements Runnable {
    private final RingBuffer<Event> ringBuffer;
    private final SequenceBarrier barrier;
    private final Sequence sequence = new Sequence(-1L);
    private volatile boolean running = true;

    protected EventProcessor(RingBuffer<Event> ringBuffer, SequenceBarrier barrier) {
        this.ringBuffer = ringBuffer;
        this.barrier = barrier;
    }

    @Override
    public void run() {
        long nextSequence = sequence.get() + 1L;
        while (running) {
            try {
                // Wait for events to be available
                long availableSequence = barrier.waitFor(nextSequence);
                // Process all available events
                while (nextSequence <= availableSequence) {
                    Event event = ringBuffer.get(nextSequence);
                    onEvent(event, nextSequence);
                    nextSequence++;
                }
                // Update our sequence (publish our progress)
                sequence.set(availableSequence);
            } catch (InterruptedException e) {
                // Handle shutdown
            }
        }
    }

    protected abstract void onEvent(Event event, long sequence);
}
```
The wait strategy determines how processors wait for events when the ring buffer is empty:
```java
public interface WaitStrategy {
    long waitFor(long sequence, Sequence cursorSequence) throws InterruptedException;
}

// Maximum performance, maximum CPU usage
public class BusySpinWaitStrategy implements WaitStrategy {
    @Override
    public long waitFor(long sequence, Sequence cursor) {
        while (cursor.get() < sequence) {
            Thread.onSpinWait(); // CPU hint for spin-waiting
        }
        return cursor.get();
    }
}

// Lower CPU usage, slightly higher latency
public class YieldingWaitStrategy implements WaitStrategy {
    @Override
    public long waitFor(long sequence, Sequence cursor) {
        int counter = 100;
        while (cursor.get() < sequence) {
            if (counter > 0) {
                counter--;
                Thread.onSpinWait();
            } else {
                Thread.yield();
            }
        }
        return cursor.get();
    }
}

// Lowest CPU usage, highest latency
public class BlockingWaitStrategy implements WaitStrategy {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    @Override
    public long waitFor(long sequence, Sequence cursor) throws InterruptedException {
        if (cursor.get() < sequence) {
            lock.lock();
            try {
                while (cursor.get() < sequence) {
                    condition.await();
                }
            } finally {
                lock.unlock();
            }
        }
        return cursor.get();
    }
}
```
For ultra-low latency, use BusySpinWaitStrategy. Your CPU will run hot, but latency will be minimal. For background processing where some latency is acceptable, YieldingWaitStrategy or BlockingWaitStrategy save power.
Let's build a complete implementation from scratch. I'll show you both the naive blocking approach and the Disruptor-style approach so you can see the difference.
```java
/**
 * Pre-allocated event object for the ring buffer.
 * Mutable to avoid allocation in the hot path.
 */
public class PipelineEvent {
    // Event data - reset and reused for each event
    private long timestamp;
    private int eventType;
    private long sourceId;
    private int payloadLength;
    private byte[] payload;

    // Processing state - tracks which stages have processed this event
    private volatile int processedStages;

    // Pre-allocate payload buffer
    public PipelineEvent() {
        this.payload = new byte[1024]; // Max payload size
    }

    public void reset() {
        this.timestamp = 0;
        this.eventType = 0;
        this.sourceId = 0;
        this.payloadLength = 0;
        this.processedStages = 0;
        // Note: we don't reallocate payload, just overwrite
    }

    // Getters and setters omitted for brevity

    public void setData(long timestamp, int eventType, long sourceId,
                        byte[] data, int offset, int length) {
        this.timestamp = timestamp;
        this.eventType = eventType;
        this.sourceId = sourceId;
        this.payloadLength = length;
        System.arraycopy(data, offset, this.payload, 0, length);
    }
}
```
```java
/**
 * Disruptor-style event pipeline.
 * Single pre-allocated ring buffer shared by all stages.
 * Lock-free, allocation-free in the hot path.
 */
public class DisruptorStylePipeline {
    // Ring buffer configuration
    private static final int BUFFER_SIZE = 1024; // Must be power of 2
    private static final int INDEX_MASK = BUFFER_SIZE - 1;
    // log2(BUFFER_SIZE): shifting by this yields the "round" number for a sequence
    private static final int INDEX_SHIFT = Integer.numberOfTrailingZeros(BUFFER_SIZE);

    // VarHandle for release/acquire access to the per-slot flags
    private static final VarHandle AVAILABLE_FLAGS =
            MethodHandles.arrayElementVarHandle(int[].class);

    // Pre-allocated event buffer
    private final PipelineEvent[] ringBuffer;

    // Sequence tracking with cache line padding
    private final PaddedAtomicLong publisherSequence;
    private final PaddedAtomicLong stage1Sequence;
    private final PaddedAtomicLong stage2Sequence;
    private final PaddedAtomicLong stage3Sequence;

    // Per-slot availability flags for multi-producer support
    private final int[] availableFlags;

    // Worker threads
    private final Thread[] stage1Workers;
    private final Thread[] stage2Workers;
    private final Thread[] stage3Workers;

    private volatile boolean running = true;

    public DisruptorStylePipeline() {
        // Pre-allocate everything
        this.ringBuffer = new PipelineEvent[BUFFER_SIZE];
        for (int i = 0; i < BUFFER_SIZE; i++) {
            this.ringBuffer[i] = new PipelineEvent();
        }
        // Initialize sequences to -1 (nothing published yet)
        this.publisherSequence = new PaddedAtomicLong(-1L);
        this.stage1Sequence = new PaddedAtomicLong(-1L);
        this.stage2Sequence = new PaddedAtomicLong(-1L);
        this.stage3Sequence = new PaddedAtomicLong(-1L);
        // Available flags for multi-producer coordination
        this.availableFlags = new int[BUFFER_SIZE];
        Arrays.fill(availableFlags, -1);
        // Create worker threads (but don't start yet)
        this.stage1Workers = new Thread[4];
        this.stage2Workers = new Thread[4];
        this.stage3Workers = new Thread[2];
    }

    public void start() {
        // Stage 1: Wait on publisher, process, update stage1Sequence
        for (int i = 0; i < stage1Workers.length; i++) {
            final int workerId = i;
            stage1Workers[i] = new Thread(() -> runStage1Worker(workerId));
            stage1Workers[i].setName("Stage1-Worker-" + i);
            stage1Workers[i].start();
        }
        // Stage 2: Wait on stage1, process, update stage2Sequence
        for (int i = 0; i < stage2Workers.length; i++) {
            final int workerId = i;
            stage2Workers[i] = new Thread(() -> runStage2Worker(workerId));
            stage2Workers[i].setName("Stage2-Worker-" + i);
            stage2Workers[i].start();
        }
        // Stage 3: Wait on stage2, process, update stage3Sequence
        for (int i = 0; i < stage3Workers.length; i++) {
            final int workerId = i;
            stage3Workers[i] = new Thread(() -> runStage3Worker(workerId));
            stage3Workers[i].setName("Stage3-Worker-" + i);
            stage3Workers[i].start();
        }
    }

    /**
     * Publish an event to the pipeline.
     * Returns the sequence number for tracking.
     */
    public long publish(long timestamp, int eventType, long sourceId,
                        byte[] data, int offset, int length) {
        // Step 1: Claim a slot (atomic increment)
        long sequence = claimNext();
        // Step 2: Wait for the slot to be free (slowest consumer caught up)
        waitForSlotAvailable(sequence);
        // Step 3: Write event data to the pre-allocated slot
        int index = (int) (sequence & INDEX_MASK);
        PipelineEvent event = ringBuffer[index];
        event.reset();
        event.setData(timestamp, eventType, sourceId, data, offset, length);
        // Step 4: Publish (make visible to consumers)
        publish(sequence);
        return sequence;
    }

    private long claimNext() {
        // Multi-producer safe: atomic increment
        return publisherSequence.getAndIncrement() + 1;
    }

    private void waitForSlotAvailable(long sequence) {
        // Wait until the slowest consumer has processed this slot
        // in the previous cycle
        long wrapPoint = sequence - BUFFER_SIZE;
        while (stage3Sequence.get() < wrapPoint) {
            Thread.onSpinWait(); // Busy spin for lowest latency
        }
    }

    private void publish(long sequence) {
        // Mark this slot as available by recording its round number
        int index = (int) (sequence & INDEX_MASK);
        AVAILABLE_FLAGS.setRelease(availableFlags, index, (int) (sequence >>> INDEX_SHIFT));
    }

    private boolean isAvailable(long sequence) {
        int index = (int) (sequence & INDEX_MASK);
        int flag = (int) AVAILABLE_FLAGS.getAcquire(availableFlags, index);
        return flag == (int) (sequence >>> INDEX_SHIFT);
    }

    // Worker implementations...
}
```
```java
private void runStage1Worker(int workerId) {
    // Each worker tracks its own position; workers interleave slots
    long nextSequence = workerId;
    int numWorkers = stage1Workers.length;
    while (running) {
        // Wait for this sequence to be published
        while (!isAvailable(nextSequence)) {
            if (!running) return;
            Thread.onSpinWait();
        }
        // Process the event
        int index = (int) (nextSequence & INDEX_MASK);
        PipelineEvent event = ringBuffer[index];
        processStage1(event);
        // Update stage1 sequence (coordinate with other stage1 workers)
        updateStage1Sequence(nextSequence);
        // Move to next slot for this worker
        nextSequence += numWorkers;
    }
}

private void processStage1(PipelineEvent event) {
    // Parse the event.
    // In real code, this would decode the payload, validate fields, etc.
    event.markStage1Complete();
}

private void updateStage1Sequence(long completedSequence) {
    // For a single-threaded stage, a simple set is enough.
    // For a multi-threaded stage, we need CAS coordination.
    long expected;
    do {
        expected = stage1Sequence.get();
        if (completedSequence <= expected) {
            return; // Already updated by another worker
        }
    } while (!stage1Sequence.compareAndSet(expected, completedSequence));
}
```
Modern CPUs load memory in cache lines, typically 64 bytes. If two variables share a cache line and are accessed by different cores, every write to one invalidates the other in all cores' caches - even if they're logically independent.
Our sequence variables are classic false sharing candidates:
```java
// BAD: All sequences on the same cache line
private volatile long publisherSequence;
private volatile long stage1Sequence;
private volatile long stage2Sequence;
private volatile long stage3Sequence;
// These four longs fit in one 64-byte cache line!
```
Every time the publisher updates publisherSequence, it invalidates stage1Sequence in Stage 1's cache. Stage 1 must re-fetch it from main memory - a 40-100ns penalty on every iteration.
Solution: Pad each sequence to its own cache line:
```java
/**
 * Padded atomic long that occupies exactly one cache line.
 * Prevents false sharing between sequences.
 */
public class PaddedAtomicLong {
    // 7 longs = 56 bytes of padding before value
    private long p1, p2, p3, p4, p5, p6, p7;
    private volatile long value;
    // 7 longs = 56 bytes of padding after value
    private long p8, p9, p10, p11, p12, p13, p14;

    public PaddedAtomicLong(long initialValue) {
        this.value = initialValue;
    }

    public long get() {
        return value;
    }

    public void set(long newValue) {
        this.value = newValue;
    }

    public long getAndIncrement() {
        return UNSAFE.getAndAddLong(this, VALUE_OFFSET, 1L);
    }

    public boolean compareAndSet(long expected, long update) {
        return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expected, update);
    }

    // VarHandle or Unsafe setup for atomic operations
    private static final long VALUE_OFFSET;
    static {
        try {
            VALUE_OFFSET = UNSAFE.objectFieldOffset(
                PaddedAtomicLong.class.getDeclaredField("value")
            );
        } catch (NoSuchFieldException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
}
```
Now each sequence has its own cache line. Updating publisherSequence doesn't affect stage1Sequence at all.
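You can observe the effect with a rough demonstration. Caveats: this is my sketch, not production code; the timing gap varies by hardware; and the JVM is free to reorder fields, so manual long-padding is best-effort (the supported mechanism is `jdk.internal.vm.annotation.@Contended` with `-XX:-RestrictContended`):

```java
// Rough false-sharing demo: two threads hammer two adjacent volatile longs
// versus two padded ones. The padded version is typically several times faster.
public class FalseSharingDemo {
    // Adjacent volatile longs: likely share a cache line.
    static class Unpadded { volatile long a; volatile long b; }

    // 7 longs of padding push b toward its own cache line (JVM may reorder fields).
    static class Padded {
        volatile long a;
        long p1, p2, p3, p4, p5, p6, p7;
        volatile long b;
    }

    public static long time(Runnable r1, Runnable r2) throws InterruptedException {
        Thread t1 = new Thread(r1), t2 = new Thread(r2);
        long start = System.nanoTime();
        t1.start(); t2.start();
        t1.join(); t2.join();
        return (System.nanoTime() - start) / 1_000_000; // ms
    }

    public static void main(String[] args) throws InterruptedException {
        final long n = 50_000_000L;
        Unpadded u = new Unpadded();
        Padded p = new Padded();
        long unpaddedMs = time(() -> { for (long i = 0; i < n; i++) u.a++; },
                               () -> { for (long i = 0; i < n; i++) u.b++; });
        long paddedMs = time(() -> { for (long i = 0; i < n; i++) p.a++; },
                             () -> { for (long i = 0; i < n; i++) p.b++; });
        System.out.println("unpadded: " + unpaddedMs + " ms, padded: " + paddedMs + " ms");
    }
}
```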
The ring buffer array itself benefits from sequential layout:
```java
// The reference array is contiguous in memory (and events allocated
// together in one loop usually end up adjacent on the heap too)
PipelineEvent[] ringBuffer = new PipelineEvent[BUFFER_SIZE];

// Access pattern is sequential
event = ringBuffer[0]; // Cache line loaded
event = ringBuffer[1]; // Same cache line! No fetch needed
event = ringBuffer[2]; // Same cache line!
// ...
event = ringBuffer[8]; // New cache line (8 references = 64 bytes)
```
The CPU prefetcher recognizes this sequential pattern and proactively loads upcoming cache lines. By the time you access ringBuffer[8], it's already in L1 cache.
Compare this to a linked list:
```java
// Linked list nodes are scattered in memory
Node node0 = new Node(); // Address: 0x1000
Node node1 = new Node(); // Address: 0x5000 (who knows?)
Node node2 = new Node(); // Address: 0x2000 (random location)

// Access pattern is random
node = node0;     // Cache miss
node = node.next; // Cache miss (different cache line)
node = node.next; // Cache miss (different cache line)
```
Every linked list traversal is a cache miss. No prefetcher can predict where the next node will be.
Inside each event object, we also consider layout:
```java
public class PipelineEvent {
    // Hot fields first - accessed on every event
    private long timestamp;    // 8 bytes
    private int eventType;     // 4 bytes
    private long sourceId;     // 8 bytes
    private int payloadLength; // 4 bytes
    // Total: 24 bytes - fits in the first cache line with the object header

    // Cold fields - only accessed sometimes
    private volatile int processedStages; // 4 bytes

    // Large fields last - may span cache lines
    private byte[] payload; // Reference (4-8 bytes) + array elsewhere
}
```
By placing frequently accessed fields first, we maximize the chance that a single cache line fetch gives us everything we need. One caveat: the JVM is free to reorder fields at its discretion, so verify the actual layout with a tool like JOL (Java Object Layout).
On multi-socket systems (common in servers), memory is not uniform - each CPU socket has "local" memory that's faster to access than "remote" memory on other sockets.
For maximum performance, pin your pipeline threads to cores on the same socket:
```shell
# Pin all pipeline threads to cores 0-7 (socket 0)
taskset -c 0-7 java -jar pipeline.jar

# Or use numactl
numactl --cpunodebind=0 --membind=0 java -jar pipeline.jar
```
And allocate your ring buffer on startup, before any work happens:
```java
public class DisruptorStylePipeline {
    // Allocate the ring buffer in the constructor (before threads start).
    // This ensures the array is allocated on the main thread's local memory.
    public DisruptorStylePipeline() {
        this.ringBuffer = new PipelineEvent[BUFFER_SIZE];
        // Force allocation of all events now
        for (int i = 0; i < BUFFER_SIZE; i++) {
            this.ringBuffer[i] = new PipelineEvent();
        }
    }
}
```
```java
public class BusySpinWaitStrategy implements WaitStrategy {
    @Override
    public long waitFor(long sequence, Sequence cursorSequence) {
        long availableSequence;
        while ((availableSequence = cursorSequence.get()) < sequence) {
            Thread.onSpinWait();
        }
        return availableSequence;
    }
}
```
Characteristics:
Latency: ~10-50ns response time
CPU: 100% utilization while waiting
Power: Maximum consumption
Use when: Ultra-low latency is critical, dedicated cores available
The Thread.onSpinWait() hint (Java 9+) tells the CPU we're in a spin loop. On x86-64, this compiles to PAUSE, which:
Reduces power consumption slightly
Improves performance on hyper-threaded cores
Prevents memory ordering violations from speculative loads
```java
public class MultiProducerSequencer {
    private static final VarHandle AVAILABLE_BUFFER =
            MethodHandles.arrayElementVarHandle(int[].class);

    private final int bufferSize;
    private final int indexMask;
    private final int indexShift; // log2(bufferSize): shift yields the round number

    // Cursor tracks the highest claimed sequence
    private final PaddedAtomicLong cursor = new PaddedAtomicLong(-1L);

    // Gating sequences track the slowest consumers
    private final Sequence[] gatingSequences;

    // Per-slot flags track which sequences are fully published
    private final int[] availableBuffer;

    public MultiProducerSequencer(int bufferSize, Sequence[] gatingSequences) {
        this.bufferSize = bufferSize;
        this.indexMask = bufferSize - 1;
        this.indexShift = Integer.numberOfTrailingZeros(bufferSize);
        this.gatingSequences = gatingSequences;
        this.availableBuffer = new int[bufferSize];
        // Initialize available buffer
        for (int i = 0; i < bufferSize; i++) {
            availableBuffer[i] = -1;
        }
    }

    /**
     * Claim the next sequence for publishing.
     * Multiple threads may claim concurrently.
     */
    public long next() {
        while (true) {
            long current = cursor.get();
            long next = current + 1;
            // Check if we'd wrap past the slowest consumer
            long wrapPoint = next - bufferSize;
            long cachedGatingSequence = getMinimumSequence(gatingSequences);
            if (wrapPoint > cachedGatingSequence) {
                // Must wait for consumers to catch up
                LockSupport.parkNanos(1);
                continue;
            }
            if (cursor.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    /**
     * Publish a sequence, making it visible to consumers.
     * Must be called after writing to the slot.
     */
    public void publish(long sequence) {
        int index = (int) (sequence & indexMask);
        int flag = (int) (sequence >>> indexShift);
        AVAILABLE_BUFFER.setRelease(availableBuffer, index, flag);
    }

    /**
     * Check if a sequence is available for consumption.
     */
    public boolean isAvailable(long sequence) {
        int index = (int) (sequence & indexMask);
        int flag = (int) (sequence >>> indexShift);
        return (int) AVAILABLE_BUFFER.getAcquire(availableBuffer, index) == flag;
    }

    /**
     * Get the highest available sequence for a consumer.
     * Handles gaps from incomplete publications.
     */
    public long getHighestPublishedSequence(long lowerBound, long availableSequence) {
        for (long sequence = lowerBound; sequence <= availableSequence; sequence++) {
            if (!isAvailable(sequence)) {
                return sequence - 1;
            }
        }
        return availableSequence;
    }

    private long getMinimumSequence(Sequence[] sequences) {
        long minimum = Long.MAX_VALUE;
        for (Sequence sequence : sequences) {
            minimum = Math.min(minimum, sequence.get());
        }
        return minimum;
    }
}
```
Understanding memory ordering is critical for Disruptor correctness. Let me walk through the VarHandle operations we use and why they matter.
```java
public class DisruptorSequencer {
    private static final VarHandle CURSOR;
    private static final VarHandle AVAILABLE_BUFFER;
    static {
        try {
            MethodHandles.Lookup lookup = MethodHandles.lookup();
            CURSOR = lookup.findVarHandle(DisruptorSequencer.class, "cursor", long.class);
            AVAILABLE_BUFFER = MethodHandles.arrayElementVarHandle(int[].class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    private volatile long cursor = -1L;
    private final int indexMask;
    private final int indexShift;
    private final int[] availableBuffer;

    public DisruptorSequencer(int bufferSize) {
        this.indexMask = bufferSize - 1;
        this.indexShift = Integer.numberOfTrailingZeros(bufferSize);
        this.availableBuffer = new int[bufferSize];
    }

    /**
     * Claim the next sequence using CAS with volatile semantics.
     * A full memory fence ensures all prior writes are visible.
     */
    public long next() {
        long current;
        long next;
        do {
            current = (long) CURSOR.getVolatile(this); // Acquire semantics
            next = current + 1;
        } while (!CURSOR.compareAndSet(this, current, next)); // Full fence
        return next;
    }

    /**
     * Publish with release semantics.
     * All prior writes (to the event) happen-before this write.
     */
    public void publish(long sequence) {
        int index = (int) (sequence & indexMask);
        int flag = calculateAvailabilityFlag(sequence);
        // Release ensures event data is visible before the flag update
        AVAILABLE_BUFFER.setRelease(availableBuffer, index, flag);
    }

    /**
     * Check availability with acquire semantics.
     * If we see the flag, we're guaranteed to see the event data.
     */
    public boolean isAvailable(long sequence) {
        int index = (int) (sequence & indexMask);
        int flag = calculateAvailabilityFlag(sequence);
        // Acquire ensures we see event data if we see the flag
        return (int) AVAILABLE_BUFFER.getAcquire(availableBuffer, index) == flag;
    }

    // Round number: how many times the ring has wrapped at this sequence
    private int calculateAvailabilityFlag(long sequence) {
        return (int) (sequence >>> indexShift);
    }
}
```
Memory Access Modes Explained:
Plain (getPlain/setPlain): No ordering guarantees. Fastest but dangerous for concurrent access. Only use when you don't care about visibility to other threads.
Opaque (getOpaque/setOpaque): Guarantees the read/write happens, but no ordering with other operations. Useful for progress flags that don't need synchronization.
Acquire (getAcquire): All reads and writes after this load cannot be reordered before it. Use for reading synchronization variables before reading protected data.
Release (setRelease): All reads and writes before this store cannot be reordered after it. Use for writing synchronization variables after writing protected data.
Volatile (getVolatile/setVolatile): Both acquire and release semantics plus total ordering with other volatile operations. Most expensive but easiest to reason about.
The Happens-Before Relationship:
```
Producer:                     Consumer:
  write event data
        |
  setRelease(flag) ---------> getAcquire(flag)
                                    |
                              read event data
```
The release-acquire pairing creates a happens-before relationship: if the consumer sees the flag update, it's guaranteed to see all writes that happened before the release (including the event data).
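The hand-off can be reduced to a self-contained sketch (class and field names are mine, not the pipeline's): a plain payload write, then a release store of the flag; the consumer spins with an acquire load and is guaranteed to observe the payload once it sees the flag.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Minimal release/acquire message-passing demo using VarHandle access modes.
public class ReleaseAcquireDemo {
    static final VarHandle FLAG;
    static {
        try {
            FLAG = MethodHandles.lookup()
                    .findVarHandle(ReleaseAcquireDemo.class, "flag", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    private long payload;           // plain write, ordered by the flag hand-off
    private volatile int flag;      // accessed via FLAG with release/acquire
    private volatile long observed; // what the consumer saw

    public long handOff(long value) throws InterruptedException {
        Thread consumer = new Thread(() -> {
            // Spin until the producer publishes the flag
            while ((int) FLAG.getAcquire(this) == 0) {
                Thread.onSpinWait();
            }
            // Happens-before: the payload write is guaranteed visible here
            observed = payload;
        });
        consumer.start();
        payload = value;          // 1. write the data (plain store)
        FLAG.setRelease(this, 1); // 2. publish (release store)
        consumer.join();
        return observed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(new ReleaseAcquireDemo().handOff(42L)); // prints 42
    }
}
```

Without the release/acquire pairing (e.g. plain mode on the flag), the consumer could legally observe the flag before the payload.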
When publishing many events, batching improves throughput:
```java
/**
 * Batch claim for higher throughput.
 * Claims multiple slots at once, reducing CAS operations.
 */
public long[] nextBatch(int batchSize) {
    long current;
    long next;
    while (true) {
        current = cursor.get();
        next = current + batchSize;
        long wrapPoint = next - bufferSize;
        long cachedGatingSequence = getMinimumSequence(gatingSequences);
        if (wrapPoint > cachedGatingSequence) {
            LockSupport.parkNanos(1);
            continue;
        }
        if (cursor.compareAndSet(current, next)) {
            break;
        }
    }
    // Return all claimed sequences
    long[] sequences = new long[batchSize];
    for (int i = 0; i < batchSize; i++) {
        sequences[i] = current + 1 + i;
    }
    return sequences;
}

/**
 * Publish all sequences in the batch at once.
 */
public void publishBatch(long from, long to) {
    for (long seq = from; seq <= to; seq++) {
        publish(seq);
    }
}
```
The improvement is dramatic across all percentiles. The tail latency improvement (p99.9 at 72x) is particularly important - this is where blocking queues cause the most damage.
During a 5-minute sustained load test with 4 publishers:
Blocking Pipeline:
Young GC events: 234
Total GC pause time: 4,567ms
Average pause: 19.5ms
Max pause: 127ms
Allocation rate: 23.4 MB/sec
Disruptor Pipeline:
Young GC events: 3
Total GC pause time: 45ms
Average pause: 15ms
Max pause: 18ms
Allocation rate: 0.1 MB/sec
GC Impact Reduction: 78x fewer events, 100x less pause time
The near-zero allocation rate of the Disruptor pipeline means the garbage collector has almost nothing to do. Those three GC events were likely triggered by monitoring code, not the pipeline itself.
```java
public class MonitoredPipeline extends DisruptorStylePipeline {
    private final LongAdder publishCount = new LongAdder();
    private final LongAdder stage1Count = new LongAdder();
    private final LongAdder stage2Count = new LongAdder();
    private final LongAdder stage3Count = new LongAdder();
    private final AtomicLong lastPublishLatency = new AtomicLong();
    private final AtomicLong lastE2ELatency = new AtomicLong();

    @Override
    public long publish(long timestamp, int eventType, long sourceId,
                        byte[] data, int offset, int length) {
        long startNanos = System.nanoTime();
        long sequence = super.publish(timestamp, eventType, sourceId, data, offset, length);
        lastPublishLatency.set(System.nanoTime() - startNanos);
        publishCount.increment();
        return sequence;
    }

    // Export via JMX or a metrics framework
    public long getPublishCount() {
        return publishCount.sum();
    }

    public long getPublishLatencyNanos() {
        return lastPublishLatency.get();
    }

    public long getBufferUtilization() {
        long head = publisherSequence.get();
        long tail = stage3Sequence.get();
        return head - tail;
    }

    public double getBufferUtilizationPercent() {
        return (double) getBufferUtilization() / BUFFER_SIZE * 100;
    }
}
```
For ultra-low latency, pin threads to specific CPU cores:
```java
public class AffinityPipeline extends DisruptorStylePipeline {
    @Override
    public void start() {
        // Pin publisher threads to cores 0-3
        // Pin stage 1 workers to cores 4-7
        // Pin stage 2 workers to cores 8-11
        // Pin stage 3 workers to cores 12-13
        for (int i = 0; i < stage1Workers.length; i++) {
            final int workerId = i;       // effectively final for the lambda
            final int core = 4 + i;
            stage1Workers[i] = new Thread(() -> {
                setThreadAffinity(core);
                runStage1Worker(workerId);
            });
            stage1Workers[i].start();
        }
        // Similarly for other stages...
    }

    private void setThreadAffinity(int core) {
        // Use JNA or JNI to call sched_setaffinity,
        // or a library like Java-Thread-Affinity
        Affinity.setAffinity(1L << core);
    }
}
```
```shell
# Linux: Disable CPU frequency scaling (use performance governor)
echo performance | sudo tee /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor

# Linux: Disable transparent huge pages (THP) if using explicit large pages
echo never | sudo tee /sys/kernel/mm/transparent_hugepage/enabled

# Linux: Set real-time scheduling priority for pipeline threads
sudo chrt -f 99 java -jar pipeline.jar

# Linux: Isolate CPUs from the scheduler for dedicated pipeline use
# In /etc/default/grub: GRUB_CMDLINE_LINUX="isolcpus=4-15"

# Linux: Pin IRQ handlers away from pipeline cores
# (a shell glob can't be a redirection target, so loop over the files)
for irq in /proc/irq/*/smp_affinity_list; do echo 0-3 | sudo tee "$irq"; done
```
The JIT compiler needs time to optimize hot paths. Warm up your pipeline before handling production traffic:
```java
public class WarmUpPipeline extends DisruptorStylePipeline {
    public void warmUp(int iterations) {
        System.out.println("Starting warm-up phase...");
        byte[] warmUpData = new byte[100];
        long startTime = System.currentTimeMillis();

        // Run through enough iterations to trigger JIT compilation
        for (int i = 0; i < iterations; i++) {
            publish(System.nanoTime(), 1, i, warmUpData, 0, 100);
        }

        // Wait for all warm-up events to process
        waitForDrain();

        long elapsed = System.currentTimeMillis() - startTime;
        System.out.println("Warm-up complete: " + iterations + " events in "
                + elapsed + "ms (" + (iterations * 1000 / elapsed) + " ops/sec)");

        // Force full GC to clear warm-up garbage
        System.gc();
        System.gc();

        // Reset metrics
        resetCounters();
        System.out.println("Pipeline ready for production traffic");
    }

    private void waitForDrain() {
        while (stage3Sequence.get() < publisherSequence.get()) {
            Thread.onSpinWait();
        }
    }
}
```
Recommended warm-up: at least 100,000 iterations, or until you see consistent latency numbers in monitoring.
Low Event Rates
If you're processing fewer than 10,000 events/second, the complexity of the Disruptor pattern isn't justified. A simple BlockingQueue will work fine.
Variable-Size Events
The Disruptor works best with fixed-size events that can be pre-allocated. If your events vary significantly in size, you'll need complex memory management.
Unbounded Queues
The ring buffer has fixed capacity. If you truly need unbounded buffering, ConcurrentLinkedQueue or similar is simpler.
Teams Without Low-Level Experience
Disruptor-style code requires understanding of:
Memory ordering and barriers
Cache architecture and false sharing
Lock-free algorithm correctness
JVM memory model
If your team isn't comfortable with these concepts, the maintenance burden will outweigh the performance benefits.
That 3 AM incident taught me something fundamental about event processing systems: the transport between processing stages often matters more than the processing itself.
Our original pipeline spent more time coordinating between stages than actually processing events. Each LinkedBlockingQueue was a bottleneck of lock contention, memory allocation, and cache thrashing. The business logic was fast; the plumbing was slow.
The Disruptor pattern eliminates all three problems:
Lock contention eliminated - Atomic sequence numbers replace locks. No thread ever blocks waiting for another.
Allocation eliminated - Pre-allocated ring buffer and events mean zero garbage in the hot path. GC pauses drop from seconds to milliseconds.
Cache efficiency maximized - Sequential access patterns, padded sequences, and contiguous memory make the CPU prefetcher happy.
The results speak for themselves:
| Metric | Before (Blocking) | After (Disruptor) | Improvement |
|---|---|---|---|
| Mean Latency | 2,340ns | 98ns | 23.9x |
| p99.9 Latency | 45,123ns | 623ns | 72.4x |
| Throughput | 48k ops/s | 523k ops/s | 10.9x |
| GC Pauses | 4,567ms/5min | 45ms/5min | 101x |
But the pattern isn't magic, and it's not free. The complexity of lock-free code makes it harder to write, debug, and maintain. Fixed capacity ring buffers have size limits, so you must plan for backpressure. The expertise required spans memory ordering, cache architecture, and JVM internals. And it's not always necessary — for many workloads, simple blocking queues are perfectly adequate.
The key insight from this journey: measure first, then optimize. We knew we had a latency problem because we measured it. We knew blocking queues were the bottleneck because we profiled it. We knew the Disruptor pattern would help because we benchmarked it.
Performance optimization is about understanding where time goes and making informed trade-offs. The Disruptor pattern trades simplicity for speed. That trade-off made sense for our trading analytics pipeline. It might make sense for yours too - or it might not.
When you do reach for the Disruptor pattern, remember:
Pre-allocate everything at startup
Pad sequences to prevent false sharing
Choose your wait strategy based on latency vs. CPU trade-offs
Monitor buffer utilization and plan for backpressure
Test under realistic load before production
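Monitoring utilization is cheap because the ring buffer's state is just two sequence numbers. A sketch of that check — the field names `cursor` and `gatingSequence` are assumptions about your pipeline's internals, standing in for the producer's last claimed sequence and the slowest consumer's sequence:

```java
import java.util.concurrent.atomic.AtomicLong;

public class RingBufferMonitor {
    private final AtomicLong cursor;         // Last sequence claimed by producers
    private final AtomicLong gatingSequence; // Sequence of the slowest consumer
    private final int bufferSize;

    public RingBufferMonitor(AtomicLong cursor, AtomicLong gatingSequence, int bufferSize) {
        this.cursor = cursor;
        this.gatingSequence = gatingSequence;
        this.bufferSize = bufferSize;
    }

    /** Fraction of ring buffer slots currently in flight (0.0 = empty, 1.0 = full). */
    public double utilization() {
        long inFlight = cursor.get() - gatingSequence.get();
        return Math.min(1.0, (double) inFlight / bufferSize);
    }

    /** Sustained readings above this threshold mean backpressure is imminent. */
    public boolean nearBackpressure() {
        return utilization() > 0.75;
    }
}
```

Alert on sustained high utilization, not single spikes: a momentary 90% reading during a burst is normal, but minutes at 90% means consumers can't keep up.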
That 3 AM incident is now a distant memory. Our pipeline handles 10x the original load with latency consistently under 200 nanoseconds. The on-call rotation is much quieter these days.
Let me distill the most important lessons from this journey:
Architecture Decisions:
The transport layer between processing stages often dominates overall latency
Pre-allocation eliminates GC pressure in the hot path
A single shared buffer outperforms multiple independent queues
Sequential memory access patterns enable CPU prefetching
Implementation Details:
Use power-of-2 buffer sizes for fast index calculation via bitwise AND
Pad sequences to separate cache lines (128 bytes total per sequence)
Choose wait strategies based on your latency vs. CPU trade-off
Use VarHandle with proper memory ordering (release for publish, acquire for consume)
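The implementation details above fit in one small sketch: a padded sequence with release/acquire access, plus the power-of-2 index mask (this is a skeleton, not the full pipeline):

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

public class PaddedSequence {
    // 7 longs (56 bytes) on each side keep `value` isolated; 128 bytes total
    // also covers adjacent-cache-line prefetching on modern CPUs
    private long p1, p2, p3, p4, p5, p6, p7;
    private volatile long value = -1;
    private long p9, p10, p11, p12, p13, p14, p15;

    private static final VarHandle VALUE;
    static {
        try {
            VALUE = MethodHandles.lookup()
                    .findVarHandle(PaddedSequence.class, "value", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // Producer side: release ensures all prior writes (the event data)
    // are visible before the new sequence value
    public void setRelease(long v) { VALUE.setRelease(this, v); }

    // Consumer side: acquire ensures we see every write that happened
    // before the matching release
    public long getAcquire() { return (long) VALUE.getAcquire(this); }
}

class IndexMask {
    // Power-of-2 size lets bitwise AND replace the modulo operation
    static int indexOf(long sequence, int bufferSize) {
        assert Integer.bitCount(bufferSize) == 1 : "buffer size must be a power of 2";
        return (int) (sequence & (bufferSize - 1));
    }
}
```

The mask trick is why buffer sizes must be powers of two: `seq & (size - 1)` compiles to a single AND instruction, where `seq % size` would need an integer division.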
Production Operations:
Warm up the JIT compiler before handling production traffic
Monitor buffer utilization to detect backpressure early
Plan for graceful shutdown that drains in-flight events
Test under realistic load for extended periods
Team Considerations:
Lock-free code is harder to debug than locked code
Ensure your team understands memory ordering before deploying
Consider using the LMAX Disruptor library instead of rolling your own
Document your design decisions for future maintainers
The Disruptor pattern isn't magic, but it is a powerful tool when applied correctly. The key is understanding when the complexity is justified and implementing it with care.
The most common bug in Disruptor-style code is assuming operations happen in program order. Without proper memory barriers, the CPU and compiler can reorder operations, leading to subtle bugs.
```java
// WRONG: No memory barrier between write and publish
buffer[index] = event;
sequence.set(newSequence); // Consumer might see sequence before event!

// CORRECT: Use release semantics
buffer[index] = event;
SEQUENCE.setRelease(sequence, newSequence); // Barrier ensures order
```
Debugging tip: If you see occasional corrupted data that you can't reproduce reliably, suspect memory ordering issues. Add explicit barriers and see if the problem disappears.
Even with padded sequences, false sharing can creep in through other means:
```java
// DANGEROUS: Worker state on same cache line
public class Worker {
    private long sequence;        // One core writes
    private long processingCount; // Same core writes - fine
    private long errorCount;      // Monitoring thread reads - FALSE SHARING!
}
```
Any field read by a different thread should be on its own cache line or accept the performance penalty.
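One fix is manual padding to push the cross-thread field onto its own cache line (the JDK's `@Contended` annotation does the same job, but outside the JDK it requires `-XX:-RestrictContended`). A padded version of the Worker sketch above:

```java
public class Worker {
    // Written only by the worker's core - these can safely share a line
    private long sequence;
    private long processingCount;

    // 56 bytes of padding push errorCount away from the hot fields above
    private long p1, p2, p3, p4, p5, p6, p7;

    // Read by the monitoring thread; volatile for visibility.
    // Single-writer, so the non-atomic ++ below is safe.
    private volatile long errorCount;
    private long p9, p10, p11, p12, p13, p14, p15;

    void recordError() { errorCount++; }      // Called only by the worker thread
    long errors()      { return errorCount; } // Called by the monitoring thread
}
```

Now the monitoring thread's reads of `errorCount` no longer invalidate the cache line holding `sequence`, so the worker's hot loop is unaffected.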
Using int for sequences leads to overflow after 2^31 operations:
```java
// DANGEROUS: Int overflow after ~2 billion operations
private int sequence = 0;  // Overflows in ~4 minutes at 10M ops/sec

// SAFE: Long lasts effectively forever
private long sequence = 0; // Overflows in 29,000 years at 10M ops/sec
```
With multiple consumers processing different events, one slow consumer can starve others:
```
// Each consumer processes every Nth event
Worker 0: events 0, 4, 8, 12...
Worker 1: events 1, 5, 9, 13...
Worker 2: events 2, 6, 10, 14...
Worker 3: events 3, 7, 11, 15...

// If Worker 0 is slow, it blocks the ring buffer from advancing
// Workers 1-3 finish their work but can't get more events
```
Solution: Implement work-stealing or use batched processing to rebalance load.
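A batched variant looks like this: instead of claiming one event per pass, each consumer drains everything published up to the producer cursor and advances its own sequence once per batch. The class and field names here are illustrative, not our production code:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

public class BatchingConsumer {
    private final AtomicLong publishedCursor;          // Highest sequence published so far
    private final AtomicLong consumed = new AtomicLong(-1);
    private final LongConsumer handler;                // Processes the event at a sequence

    public BatchingConsumer(AtomicLong publishedCursor, LongConsumer handler) {
        this.publishedCursor = publishedCursor;
        this.handler = handler;
    }

    /** Process every available event in one pass; returns the batch size. */
    public long drainBatch() {
        long next = consumed.get() + 1;
        long available = publishedCursor.get();

        for (long seq = next; seq <= available; seq++) {
            handler.accept(seq); // Handle the event slot at this sequence
        }

        long batch = available - next + 1;
        if (batch > 0) {
            consumed.set(available); // One bulk advance amortizes the coordination cost
        }
        return Math.max(0, batch);
    }
}
```

Batching helps twice over: a fast consumer catches up in one pass instead of N, and the per-event cost of advancing the sequence is amortized across the whole batch.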
To see where contention is building, count CAS retries and wait cycles with `LongAdder`s, which stay cheap even under heavy concurrent updates:

```java
private final LongAdder casRetries = new LongAdder();
private final LongAdder casSuccesses = new LongAdder();
private final LongAdder waitCycles = new LongAdder();

public long claim() {
    long seq;
    int retries = 0;
    while (!cursor.compareAndSet(seq = cursor.get(), seq + 1)) {
        retries++;
    }
    casRetries.add(retries);
    casSuccesses.increment();
    return seq + 1;
}
```
```java
@Test
void stressTest() throws InterruptedException {
    DisruptorStylePipeline pipeline = new DisruptorStylePipeline();
    pipeline.start();

    AtomicLong published = new AtomicLong();
    AtomicLong processed = new AtomicLong();
    // NOTE: wire `processed` into the pipeline's final stage (e.g. a completion
    // callback that invokes processed::incrementAndGet), or the assertion below
    // can never pass

    // Spawn many producer threads
    for (int i = 0; i < 16; i++) {
        new Thread(() -> {
            byte[] data = new byte[100];
            for (int j = 0; j < 1_000_000; j++) {
                pipeline.publish(System.nanoTime(), 1, j, data, 0, 100);
                published.incrementAndGet();
            }
        }).start();
    }

    // Wait and verify
    Thread.sleep(60_000);
    assertEquals(published.get(), processed.get(),
            "All published events should be processed");
}
```
Run stress tests for extended periods (hours, not seconds) to catch rare race conditions.
LMAX Exchange, a multilateral trading facility, processes over 6 million transactions per second with average latency under 1 millisecond. Their architecture centers on the Disruptor pattern:
Single-threaded business logic eliminates synchronization
Ring buffer connects market data feed to matching engine
Events processed in strict sequence order
Zero garbage in the critical path
Key insight: They realized that single-threaded processing with a fast event bus outperformed multi-threaded processing with locks.
Application threads publish log events to a ring buffer. A dedicated logging thread drains the buffer and writes to disk. Application latency is minimized because logging never blocks.
```java
// MPSC (Multi-Producer Single-Consumer) queue
MpscArrayQueue<Event> mpscQueue = new MpscArrayQueue<>(1024);

// SPSC (Single-Producer Single-Consumer) queue - fastest
SpscArrayQueue<Event> spscQueue = new SpscArrayQueue<>(1024);

// Usage is similar to standard queues
mpscQueue.offer(event);      // Non-blocking
Event e = mpscQueue.poll();  // Non-blocking
```
JCTools queues are simpler than full Disruptor pipelines but still provide excellent performance for point-to-point messaging.
As I reflect on this journey from that 3 AM incident to a pipeline that handles half a million events per second with sub-microsecond latency, a few thoughts stand out.
Performance optimization is not about clever tricks or obscure techniques. It's about understanding where time goes and systematically eliminating waste. In our case, the waste was in coordination overhead - locks, context switches, and memory allocations that had nothing to do with our actual business logic.
The Disruptor pattern taught me that sometimes the best optimization is not doing something at all. We eliminated locks by not needing them. We eliminated allocations by pre-allocating. We eliminated cache misses by controlling memory layout. Each "not doing" required understanding what we were avoiding and why.
But perhaps the most important lesson is humility. Before that incident, I thought our pipeline was well-designed. It used standard, battle-tested components. It followed best practices. And yet it was 50x slower than it could have been. The lesson: measure everything, assume nothing, and always be willing to question your assumptions.
The Disruptor pattern may not be the right tool for your next project. But the principles behind it - understanding your hardware, minimizing coordination, and measuring relentlessly - apply everywhere. Whether you're building a trading system or a web application, these principles will serve you well.
Happy coding, and may your latencies be low and your throughput high.