Wednesday afternoon, 3:47 PM. I was reviewing our ring buffer implementation's performance metrics when something caught my eye. Our single-producer single-consumer (SPSC) queue was handling orders reliably, but the numbers didn't add up. We were processing individual messages at roughly 20-30 nanoseconds each - respectable by most standards, but our throughput calculations showed we were leaving significant performance on the table.
I pulled up the profiler and there it was: memory barriers. Every single enqueue operation and every single dequeue operation was executing a full memory fence. That's the cost of inter-thread visibility in modern CPUs. The useful copy into the buffer took perhaps 2-3 nanoseconds. The remaining 18-27 nanoseconds were pure synchronization overhead.
Let me put this in perspective. Our trading system was processing about 25 million events per second at peak. At 25ns average per operation, we were spending roughly 625 milliseconds per second on memory barriers alone. That's 62.5% of our CPU time on synchronization, not actual work.
I'd seen this pattern before in network packet processing. When you're handling individual packets, the per-packet overhead dominates. But what if you could batch packets together and amortize that overhead? The same principle had to apply here.
That evening, I started sketching out what would become our batched queue. The idea was simple in concept but tricky in execution: instead of publishing each element individually with its own memory barrier, collect elements into batches and publish them with a single fence. Same correctness guarantees, fraction of the overhead.
What followed was a two-week deep dive into memory ordering, cache behavior, and the subtle art of batch processing. The end result? A queue that processes elements at 5-10 nanoseconds each under batched workloads - a 2-4x improvement that transformed our system's throughput characteristics. But more importantly, I gained a much deeper understanding of where performance actually goes in lock-free data structures.
This is the story of that journey.
Part 2: Understanding Memory Barriers
Before we can optimize memory barriers, we need to understand what they actually do and why they cost so much. This section gets into the weeds of CPU architecture, but the concepts are essential for understanding our optimization.
What Is a Memory Barrier?
Modern CPUs don't execute instructions in strict program order. They reorder, speculate, and parallelize to maximize throughput. This is great for performance but creates a problem for multi-threaded code: Thread A might write to memory locations X and Y in that order, but Thread B might observe the writes in reverse order, or see only one of them.
A memory barrier (also called a memory fence) is a CPU instruction that enforces ordering. There are several types:
Store-Store Barrier: Ensures all stores before the barrier complete before any stores after it. This prevents the CPU from reordering writes.
Load-Load Barrier: Ensures all loads before the barrier complete before any loads after it. This prevents the CPU from speculatively reading ahead.
Load-Store Barrier: Ensures all loads before complete before any stores after.
Store-Load Barrier: The most expensive. Ensures all stores before complete and are visible to other CPUs before any loads after. This typically requires flushing the store buffer.
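Since Java 9, these four barrier kinds map directly onto static methods of `VarHandle`, so we can make the taxonomy concrete. A minimal sketch (the class name `FenceKinds` is mine; note that `acquireFence()` combines Load-Load and Load-Store, `releaseFence()` combines Load-Store and Store-Store, and only `fullFence()` provides the expensive Store-Load ordering):

```java
import java.lang.invoke.VarHandle;

public class FenceKinds {
    static int data;
    static boolean ready;

    public static void main(String[] args) {
        // Store-Store: writes before the fence become visible before writes after it
        data = 42;
        VarHandle.storeStoreFence();
        ready = true;

        // Load-Load: reads before the fence complete before reads after it
        boolean r = ready;
        VarHandle.loadLoadFence();
        int d = data;

        // Store-Load: the expensive one - only the full fence provides it
        VarHandle.fullFence();

        System.out.println(r + " " + d); // prints "true 42"
    }
}
```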
The Cost of Barriers
On x86-64 (Intel/AMD), most memory operations have implicit ordering that reduces the need for explicit barriers. But volatile writes in Java still typically compile to a LOCK-prefixed instruction or an explicit MFENCE, which:
Flushes the store buffer: The CPU maintains a buffer of pending writes for performance. A barrier forces these to drain to the cache.
Waits for cache coherency: The write must propagate to other cores' caches via the MESI protocol. This involves inter-core communication over the CPU's interconnect.
Prevents speculative execution: The CPU can't proceed past the barrier until the ordering is established, stalling the pipeline.
The actual cost varies by operation and CPU:
| Operation | Typical Cost (cycles) |
| --- | --- |
| Normal load | ~4 (L1 hit) |
| Normal store | ~4 (L1 hit) |
| Volatile load | ~4-10 (depending on cache state) |
| Volatile store | ~20-50 (includes store buffer drain) |
| CAS (Compare-And-Swap) | ~15-40 (plus cache line bouncing) |
| Full memory fence (MFENCE) | ~30-100 |
On a 3 GHz CPU, a 30-cycle operation takes 10 nanoseconds. That's our memory barrier overhead per element.
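You can get a rough feel for this gap yourself. The sketch below (class name `BarrierCostProbe` is mine) times a loop of plain stores against a loop of volatile stores. It is a crude probe, not a JMH-quality benchmark: the JIT may partially optimize the plain-store loop, and results vary by CPU, so treat the numbers as indicative only:

```java
public class BarrierCostProbe {
    static long plainField;
    static volatile long volatileField;

    static long timePlain(int n) {
        long t0 = System.nanoTime();
        for (int i = 0; i < n; i++) plainField = i;   // no ordering constraints
        return System.nanoTime() - t0;
    }

    static long timeVolatile(int n) {
        long t0 = System.nanoTime();
        for (int i = 0; i < n; i++) volatileField = i; // lock-prefixed store on x86
        return System.nanoTime() - t0;
    }

    public static void main(String[] args) {
        int n = 5_000_000;
        timePlain(n);
        timeVolatile(n); // warm-up pass for the JIT
        System.out.printf("plain: %.2f ns/op, volatile: %.2f ns/op%n",
                (double) timePlain(n) / n, (double) timeVolatile(n) / n);
    }
}
```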
How Ring Buffers Use Barriers
A typical SPSC ring buffer needs barriers at specific points:
```java
// Producer side
public void offer(T element) {
    long pos = head;
    buffer[(int) (pos & mask)] = element;
    // BARRIER: Ensure buffer write is visible before head update
    head = pos + 1; // volatile write includes barrier
}

// Consumer side
public T poll() {
    long pos = tail;
    if (pos == head) { // volatile read
        return null;
    }
    // BARRIER: Ensure we see the buffer write after seeing head update
    T element = buffer[(int) (pos & mask)];
    tail = pos + 1; // volatile write
    return element;
}
```
Every offer() requires at least one store barrier (the volatile write to head). Every poll() requires at least one load barrier (the volatile read of head) and one store barrier (the volatile write to tail). That's 2-3 barriers per element processed.
The Insight: Barrier Amortization
Here's the key insight that drives our optimization: synchronization fences have a fixed cost regardless of payload size. Guarding one write costs roughly the same as guarding a thousand.
With a batch of 32, the batched approach issues one fence where the naive approach issues 32 - roughly 97% fewer cycles spent on barriers while moving the same data. This is the foundation of our optimization.
Visualizing Memory Barrier Impact
To truly understand the impact, picture the CPU timeline. For a single operation, each ~3ns buffer write is followed by a ~20ns fence, so the core spends most of its time stalled waiting for the store buffer to drain. For batched operations, N writes execute back-to-back with no stalls between them, followed by one fence at the end.
The cost of the barrier is amortized across all N elements. If N=64, that's 0.4ns per element instead of 25ns.
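The amortization arithmetic is worth writing down. A tiny sketch (class and method names are mine) computing the per-element barrier overhead for a few batch sizes, assuming the ~25ns fence cost from the table above:

```java
public class BarrierAmortization {
    /** Assumed cost of one fence in ns, from the cost table above. */
    static final double BARRIER_NS = 25.0;

    /** Barrier overhead per element when one fence covers a batch of n. */
    static double barrierNsPerElement(int n) {
        return BARRIER_NS / n;
    }

    public static void main(String[] args) {
        for (int n : new int[]{1, 8, 64, 256}) {
            System.out.printf("batch=%d -> %.2f ns of barrier cost per element%n",
                    n, barrierNsPerElement(n));
        }
    }
}
```

For a batch of 64 this gives 25/64 ≈ 0.39ns of barrier cost per element, matching the figure above.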
CPU Pipeline Considerations
Modern CPUs use deep pipelines (15-20 stages on Intel/AMD) to achieve high instruction-level parallelism. A memory barrier disrupts this pipeline in several ways:
Retirement stall: Instructions after the barrier can't retire until the barrier completes
Issue stall: Some CPUs can't issue new instructions while waiting for the barrier
Speculation rollback: Any speculative execution past the barrier must be discarded
This pipeline disruption is why barriers cost dozens of cycles rather than single-digit cycles. The CPU has to drain and refill its pipeline, losing significant throughput.
By batching, we suffer this pipeline disruption once per batch instead of once per element. For a 64-element batch, that's a 64x reduction in pipeline disruptions.
Part 3: The Naive Single-Operation Approach
Let's establish our baseline with a straightforward SPSC queue implementation. This is the "obvious" approach that most developers would write.
```java
package com.techishthoughts.batch;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

/**
 * Single-Producer Single-Consumer queue with per-element synchronization.
 *
 * This is the baseline implementation that we'll optimize.
 * Each offer/poll operation includes full memory barrier overhead.
 */
public class SingleOperationQueue<T> {

    // VarHandles for atomic operations
    private static final VarHandle HEAD;
    private static final VarHandle TAIL;

    static {
        try {
            MethodHandles.Lookup lookup = MethodHandles.lookup();
            HEAD = lookup.findVarHandle(SingleOperationQueue.class, "head", long.class);
            TAIL = lookup.findVarHandle(SingleOperationQueue.class, "tail", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // Cache line padding to prevent false sharing
    private long p01, p02, p03, p04, p05, p06, p07;

    /** Producer's write position */
    private volatile long head = 0;

    private long p11, p12, p13, p14, p15, p16, p17;

    /** Consumer's read position */
    private volatile long tail = 0;

    private long p21, p22, p23, p24, p25, p26, p27;

    /** Ring buffer storage */
    private final Object[] buffer;

    /** Capacity (power of 2) */
    private final int capacity;

    /** Mask for fast modulo: index = position & mask */
    private final int mask;

    /**
     * Creates a new queue with the specified capacity.
     * Capacity is rounded up to the next power of 2.
     */
    public SingleOperationQueue(int requestedCapacity) {
        this.capacity = roundUpToPowerOf2(Math.max(requestedCapacity, 2));
        this.mask = this.capacity - 1;
        this.buffer = new Object[this.capacity];
    }

    private static int roundUpToPowerOf2(int value) {
        int highBit = Integer.highestOneBit(value);
        return (highBit == value) ? value : highBit << 1;
    }

    /**
     * Adds an element to the queue.
     *
     * BARRIER COST: One volatile write (store barrier)
     *
     * @return true if successful, false if queue is full
     */
    public boolean offer(T element) {
        if (element == null) {
            throw new NullPointerException("Null elements not allowed");
        }
        long currentHead = head; // We're the only writer of head
        long currentTail = (long) TAIL.getAcquire(this); // Acquire semantics

        // Check if full (head would catch up to tail)
        if (currentHead - currentTail >= capacity) {
            return false;
        }

        // Write element to buffer
        int index = (int) (currentHead & mask);
        buffer[index] = element;

        // Publish: volatile write ensures element is visible before head advances
        // This is our MEMORY BARRIER - costs ~20-30ns
        HEAD.setRelease(this, currentHead + 1);
        return true;
    }

    /**
     * Removes and returns an element from the queue.
     *
     * BARRIER COST: One volatile read (acquire) + one volatile write (release)
     *
     * @return the next element, or null if queue is empty
     */
    @SuppressWarnings("unchecked")
    public T poll() {
        long currentTail = tail; // We're the only writer of tail
        long currentHead = (long) HEAD.getAcquire(this); // Acquire semantics

        // Check if empty
        if (currentTail >= currentHead) {
            return null;
        }

        // Read element from buffer
        int index = (int) (currentTail & mask);
        T element = (T) buffer[index];
        buffer[index] = null; // Help GC

        // Advance tail: volatile write ensures we don't re-read this slot
        TAIL.setRelease(this, currentTail + 1);
        return element;
    }

    /** Returns approximate number of elements in queue. */
    public int size() {
        long h = (long) HEAD.getAcquire(this);
        long t = (long) TAIL.getAcquire(this);
        long size = h - t;
        return (int) Math.max(0, Math.min(size, capacity));
    }

    public boolean isEmpty() {
        return head == tail;
    }

    public int capacity() {
        return capacity;
    }
}
```
Analyzing the Barrier Overhead
Let's trace through what happens when we process 100 elements. Each offer() pays one release store on head (~20ns) plus a ~3ns buffer write; each poll() pays an acquire load of head plus a release store on tail. Across 100 elements, that's roughly 2.5μs total, of which only ~0.3μs is actual copying. So we're at roughly 25ns per element. Our target: get this under 10ns through batching.
Part 4: The Batched SPSC Queue Design
The key insight is that memory barriers order all preceding operations, not just the immediately previous one. If we batch multiple writes before issuing a barrier, all of them become visible simultaneously at a fraction of the per-element cost.
Design Goals
Reduce barrier frequency: Issue one barrier per batch instead of one per element
Maintain correctness: Consumer must never see partially-written batches
Preserve API simplicity: Caller shouldn't need to manage batching manually
Adaptive batching: Handle both bursty and steady workloads efficiently
The Two-Phase Commit Pattern
Our batched queue uses a two-phase commit pattern:
Phase 1 - Accumulate: Producer writes elements to the buffer without barriers
Phase 2 - Commit: A single barrier makes all accumulated elements visible
The Shadow Head Pattern
To implement this safely, we use a "shadow head" pattern. localHead is the producer's private write position with no synchronization overhead, while publishedHead is the position visible to the consumer, updated with volatile semantics.
The producer advances localHead with each write but only updates publishedHead when committing a batch.
```java
// Producer state
private long localHead = 0;              // Private, no barriers needed
private volatile long publishedHead = 0; // Visible to consumer

public void offerBatch(T[] elements, int count) {
    // Phase 1: Write all elements (no barriers)
    for (int i = 0; i < count; i++) {
        int index = (int) (localHead & mask);
        buffer[index] = elements[i];
        localHead++; // Plain increment, no barrier
    }
    // Phase 2: Single barrier publishes entire batch
    PUBLISHED_HEAD.setRelease(this, localHead);
}
```
Consumer Batch Reading
The consumer can also batch its operations:
```java
@SuppressWarnings("unchecked")
public int pollBatch(T[] output, int maxCount) {
    long currentTail = localTail;
    long availableHead = (long) PUBLISHED_HEAD.getAcquire(this);

    // Calculate how many elements are available
    long available = availableHead - currentTail;
    int count = (int) Math.min(available, maxCount);
    if (count == 0) {
        return 0;
    }

    // Phase 1: Read all elements (no barriers between reads)
    for (int i = 0; i < count; i++) {
        int index = (int) (currentTail & mask);
        output[i] = (T) buffer[index];
        buffer[index] = null; // Help GC
        currentTail++;
    }

    // Phase 2: Single barrier publishes consumption
    localTail = currentTail;
    PUBLISHED_TAIL.setRelease(this, currentTail);
    return count;
}
```
Part 5: Complete Implementation
Here's the full batched SPSC queue implementation with detailed commentary:
```java
package com.techishthoughts.batch;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.function.Consumer;

/**
 * Batched Single-Producer Single-Consumer Queue.
 *
 * Optimizes throughput by amortizing memory barrier costs across multiple elements.
 *
 * Key optimizations:
 * 1. Shadow head/tail pattern: separate local and published positions
 * 2. Batch commit: single barrier for multiple elements
 * 3. Lazy publication: accumulate before publishing
 * 4. Adaptive batching: automatic batch size based on workload
 *
 * Performance characteristics:
 * - Per-element cost: 5-10ns (vs 20-30ns for single-operation queue)
 * - Optimal batch size: 10-100 elements
 * - 2-4x throughput improvement over naive implementation
 *
 * @param <T> Element type
 */
public class BatchedSPSCQueue<T> {

    // ========== VarHandle Setup ==========
    // (cachedHead/cachedTail are plain fields accessed only by their owning
    // thread, so they need no VarHandles.)

    private static final VarHandle PUBLISHED_HEAD;
    private static final VarHandle PUBLISHED_TAIL;

    static {
        try {
            MethodHandles.Lookup lookup = MethodHandles.lookup();
            PUBLISHED_HEAD = lookup.findVarHandle(
                BatchedSPSCQueue.class, "publishedHead", long.class);
            PUBLISHED_TAIL = lookup.findVarHandle(
                BatchedSPSCQueue.class, "publishedTail", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // ========== Configuration ==========

    /** Default batch size for auto-commit */
    private static final int DEFAULT_BATCH_SIZE = 64;

    /** Maximum batch size to prevent unbounded latency */
    private static final int MAX_BATCH_SIZE = 1024;

    // ========== Producer Fields (Cache Line 1) ==========

    @SuppressWarnings("unused")
    private long p01, p02, p03, p04, p05, p06, p07;

    /** Producer's local write position (no synchronization needed) */
    private long localHead = 0;

    /** Last position published to consumer */
    private volatile long publishedHead = 0;

    /** Producer's cached view of consumer's tail (reduces volatile reads) */
    private long cachedTail = 0;

    /** Elements accumulated since last publish */
    private int pendingCount = 0;

    /** Configured batch size */
    private final int batchSize;

    @SuppressWarnings("unused")
    private long p08, p09, p10;

    // ========== Consumer Fields (Cache Line 2) ==========

    @SuppressWarnings("unused")
    private long p11, p12, p13, p14, p15, p16, p17;

    /** Consumer's local read position (no synchronization needed) */
    private long localTail = 0;

    /** Last position published to producer */
    private volatile long publishedTail = 0;

    /** Consumer's cached view of producer's head (reduces volatile reads) */
    private long cachedHead = 0;

    @SuppressWarnings("unused")
    private long p18, p19, p20;

    // ========== Shared State (Cache Line 3) ==========

    @SuppressWarnings("unused")
    private long p21, p22, p23, p24, p25, p26, p27;

    /** Ring buffer storage */
    private final Object[] buffer;

    /** Buffer capacity (power of 2) */
    private final int capacity;

    /** Bit mask for fast modulo: index = position & mask */
    private final int mask;

    // ========== Constructors ==========

    /** Creates a batched queue with default batch size. */
    public BatchedSPSCQueue(int requestedCapacity) {
        this(requestedCapacity, DEFAULT_BATCH_SIZE);
    }

    /**
     * Creates a batched queue with specified batch size.
     *
     * @param requestedCapacity Queue capacity (rounded to power of 2)
     * @param batchSize Elements per batch (clamped to 1-MAX_BATCH_SIZE)
     */
    public BatchedSPSCQueue(int requestedCapacity, int batchSize) {
        this.capacity = roundUpToPowerOf2(Math.max(requestedCapacity, 2));
        this.mask = this.capacity - 1;
        this.buffer = new Object[this.capacity];
        this.batchSize = Math.max(1, Math.min(batchSize, MAX_BATCH_SIZE));
    }

    private static int roundUpToPowerOf2(int value) {
        int highBit = Integer.highestOneBit(value);
        return (highBit == value) ? value : highBit << 1;
    }

    // ========== Single Element API (with auto-batching) ==========

    /**
     * Adds an element to the queue.
     *
     * Elements are accumulated locally until batch size is reached,
     * then published with a single memory barrier.
     *
     * @return true if successful, false if queue is full
     */
    public boolean offer(T element) {
        if (element == null) {
            throw new NullPointerException("Null elements not allowed");
        }

        // Check if we have space (use cached tail to avoid a volatile read)
        if (localHead - cachedTail >= capacity) {
            // Cached tail is stale - refresh it
            cachedTail = (long) PUBLISHED_TAIL.getAcquire(this);
            if (localHead - cachedTail >= capacity) {
                // Publish any pending elements before failing
                if (pendingCount > 0) {
                    commit();
                }
                return false; // Queue is genuinely full
            }
        }

        // Write element to buffer (no barrier)
        int index = (int) (localHead & mask);
        buffer[index] = element;
        localHead++;
        pendingCount++;

        // Auto-commit when batch size reached
        if (pendingCount >= batchSize) {
            commit();
        }
        return true;
    }

    /**
     * Forces publication of all pending elements.
     *
     * Call this when you need low latency and can't wait for the batch to fill.
     * Also called automatically when batch size is reached.
     */
    public void commit() {
        if (pendingCount > 0) {
            // Single barrier publishes all pending elements
            PUBLISHED_HEAD.setRelease(this, localHead);
            pendingCount = 0;
        }
    }

    /**
     * Retrieves and removes an element from the queue.
     *
     * @return the next element, or null if queue is empty
     */
    @SuppressWarnings("unchecked")
    public T poll() {
        // Check if we have elements (use cached head to avoid a volatile read)
        if (localTail >= cachedHead) {
            // Cached head is stale - refresh it
            cachedHead = (long) PUBLISHED_HEAD.getAcquire(this);
            if (localTail >= cachedHead) {
                return null; // Queue is genuinely empty
            }
        }

        // Read element from buffer
        int index = (int) (localTail & mask);
        T element = (T) buffer[index];
        buffer[index] = null; // Help GC
        localTail++;

        // Lazy publication: only update the published tail periodically.
        // Safe because the producer only needs to know we're not full.
        // Note: modulo, not a bit mask - batchSize is not required to be a
        // power of two, so (localTail & (batchSize - 1)) would be incorrect.
        if (localTail % batchSize == 0) {
            PUBLISHED_TAIL.setRelease(this, localTail);
        }
        return element;
    }

    // ========== Batch API ==========

    /**
     * Adds multiple elements in a single batch.
     *
     * This is the most efficient way to use this queue when you have
     * multiple elements ready to enqueue.
     *
     * @param elements Array of elements to add
     * @param offset Starting index in the array
     * @param count Number of elements to add
     * @return Number of elements actually added
     */
    public int offerBatch(T[] elements, int offset, int count) {
        if (count <= 0) return 0;

        // Calculate available space
        if (localHead - cachedTail >= capacity) {
            cachedTail = (long) PUBLISHED_TAIL.getAcquire(this);
        }
        long available = capacity - (localHead - cachedTail);
        int toAdd = (int) Math.min(count, available);
        if (toAdd == 0) {
            return 0;
        }

        // Write all elements without barriers
        for (int i = 0; i < toAdd; i++) {
            T element = elements[offset + i];
            if (element == null) {
                throw new NullPointerException(
                    "Null element at index " + (offset + i));
            }
            int index = (int) (localHead & mask);
            buffer[index] = element;
            localHead++;
        }

        // Single barrier publishes the entire batch
        PUBLISHED_HEAD.setRelease(this, localHead);
        pendingCount = 0;
        return toAdd;
    }

    /**
     * Retrieves multiple elements in a single batch.
     *
     * @param output Array to store retrieved elements
     * @param offset Starting index in output array
     * @param maxCount Maximum elements to retrieve
     * @return Number of elements retrieved
     */
    @SuppressWarnings("unchecked")
    public int pollBatch(T[] output, int offset, int maxCount) {
        if (maxCount <= 0) return 0;

        // Calculate available elements
        if (localTail >= cachedHead) {
            cachedHead = (long) PUBLISHED_HEAD.getAcquire(this);
        }
        long available = cachedHead - localTail;
        int toRead = (int) Math.min(maxCount, available);
        if (toRead == 0) {
            return 0;
        }

        // Read all elements
        for (int i = 0; i < toRead; i++) {
            int index = (int) (localTail & mask);
            output[offset + i] = (T) buffer[index];
            buffer[index] = null;
            localTail++;
        }

        // Single barrier publishes consumption
        PUBLISHED_TAIL.setRelease(this, localTail);
        return toRead;
    }

    /**
     * Drains all available elements to a consumer function.
     *
     * More efficient than repeated poll() calls.
     *
     * @param consumer Function to process each element
     * @return Number of elements processed
     */
    @SuppressWarnings("unchecked")
    public int drain(Consumer<T> consumer) {
        // Get all available elements
        cachedHead = (long) PUBLISHED_HEAD.getAcquire(this);
        long available = cachedHead - localTail;
        if (available <= 0) {
            return 0;
        }
        int count = (int) available;

        // Process all elements
        for (int i = 0; i < count; i++) {
            int index = (int) (localTail & mask);
            T element = (T) buffer[index];
            buffer[index] = null;
            localTail++;
            consumer.accept(element);
        }

        // Single barrier at the end
        PUBLISHED_TAIL.setRelease(this, localTail);
        return count;
    }

    /**
     * Fills the queue from a supplier function.
     *
     * Efficient for bulk loading.
     *
     * @param supplier Function that supplies elements (return null to stop)
     * @param maxCount Maximum elements to add
     * @return Number of elements added
     */
    public int fill(java.util.function.Supplier<T> supplier, int maxCount) {
        // Calculate available space
        if (localHead - cachedTail >= capacity) {
            cachedTail = (long) PUBLISHED_TAIL.getAcquire(this);
        }
        long available = capacity - (localHead - cachedTail);
        int canAdd = (int) Math.min(maxCount, available);

        int added = 0;
        for (int i = 0; i < canAdd; i++) {
            T element = supplier.get();
            if (element == null) {
                break; // Supplier exhausted
            }
            int index = (int) (localHead & mask);
            buffer[index] = element;
            localHead++;
            added++;
        }

        if (added > 0) {
            PUBLISHED_HEAD.setRelease(this, localHead);
            pendingCount = 0;
        }
        return added;
    }

    // ========== Query Operations ==========

    /**
     * Returns the number of pending (uncommitted) elements.
     * Only meaningful from the producer thread.
     */
    public int pendingCount() {
        return pendingCount;
    }

    /** Returns approximate size of the queue. */
    public int size() {
        long h = (long) PUBLISHED_HEAD.getAcquire(this);
        long t = (long) PUBLISHED_TAIL.getAcquire(this);
        long size = h - t;
        return (int) Math.max(0, Math.min(size, capacity));
    }

    public boolean isEmpty() {
        return publishedHead == publishedTail;
    }

    public int capacity() {
        return capacity;
    }

    /** Returns configured batch size. */
    public int batchSize() {
        return batchSize;
    }
}
```
Design Decisions Explained
Why separate local and published positions?
The producer's localHead can advance freely without any synchronization - it's only accessed by the producer thread. Only when we commit do we update publishedHead with a barrier. This separation is what enables batching.
Why cache the opposite position?
The producer needs to know the consumer's tail to check if the queue is full. But reading a volatile is expensive. By caching the tail locally and only refreshing when we think the queue might be full, we eliminate most volatile reads.
Why auto-commit at batch size?
Without auto-commit, elements could sit unpublished indefinitely, hurting latency. Auto-commit at batch size provides predictable latency while preserving throughput.
Why lazy tail publication?
The consumer doesn't need to publish its tail after every element - the producer only cares that the queue isn't full. Publishing every batchSize elements is sufficient and reduces barriers.
Part 6: Memory Ordering Deep Dive
Now let's examine how the batched queue maintains correctness despite reduced synchronization.
The Happens-Before Relationship
Java's memory model defines a "happens-before" relationship. If action A happens-before action B, then A's effects are visible to B. Key rules:
Program order: Within a thread, earlier actions happen-before later ones
Volatile write/read: A volatile write happens-before subsequent volatile reads of the same variable
Transitivity: If A happens-before B, and B happens-before C, then A happens-before C
Our queue relies on exactly these relationships: the release-acquire pairing on publishedHead ensures that when the consumer sees head = 3, it also sees all buffer writes that preceded the release.
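The pairing can be demonstrated in a few lines. A minimal, runnable sketch (class name `ReleaseAcquireDemo` is mine): a plain write to the buffer becomes visible to the consumer purely because the acquire read of the position pairs with the release write.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

public class ReleaseAcquireDemo {
    static final VarHandle HEAD;
    static {
        try {
            HEAD = MethodHandles.lookup()
                .findStaticVarHandle(ReleaseAcquireDemo.class, "head", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    static final long[] buffer = new long[4];
    static long head = 0; // plain field; ordering supplied by the VarHandle

    public static void main(String[] args) throws InterruptedException {
        Thread producer = new Thread(() -> {
            buffer[0] = 42;      // plain write, no barrier of its own
            HEAD.setRelease(1L); // release: the buffer write is ordered before this
        });
        Thread consumer = new Thread(() -> {
            while ((long) HEAD.getAcquire() == 0) { // acquire: pairs with the release
                Thread.onSpinWait();
            }
            // Happens-before via release-acquire: the plain write is visible here
            System.out.println(buffer[0]); // prints 42
        });
        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}
```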
Why setRelease/getAcquire Instead of Volatile?
Full volatile has stronger semantics than we need:
```java
// Full volatile write includes:
// 1. StoreStore barrier before (our writes complete before this write)
// 2. StoreLoad barrier after (this write completes before any loads)

// Release semantics only include:
// 1. StoreStore barrier before (our writes complete before this write)
// No StoreLoad barrier - we don't need to order with subsequent loads
```
The StoreLoad barrier is the most expensive part of a volatile write. Release semantics give us what we need at lower cost.
Proving Correctness
We can prove the batched queue is correct:
Claim: The consumer never reads a buffer slot before the producer has written to it.
Proof:
Producer writes to buffer[i] before updating publishedHead (program order)
publishedHead update uses release semantics
Consumer reads publishedHead with acquire semantics before reading buffer[i]
Release-acquire creates happens-before edge
By transitivity: buffer[i] write happens-before buffer[i] read
Therefore consumer sees producer's write. QED.
Claim: Producer never overwrites a slot the consumer hasn't read.
Proof:
Consumer reads buffer[i] before updating publishedTail (program order)
publishedTail update uses release semantics
Producer reads publishedTail with acquire semantics before writing buffer[i]
By transitivity: consumer's buffer[i] read happens-before producer's buffer[i] write
Therefore producer writes only to consumed slots. QED.
The ABA Problem (And Why We Don't Have It)
The ABA problem occurs when:
Thread reads value A
Thread is preempted
Value changes A -> B -> A
Thread resumes, sees A, incorrectly assumes no change
Our queue avoids this by using monotonically increasing positions:
Head only increases
Tail only increases
We use long (64-bit) positions - would take 2^63 operations to wrap
Even if positions could wrap, the slot index derived from a position (position & mask) repeats every cycle, but the 64-bit position itself acts as a cycle counter: two visits to the same slot always carry different positions, so we always know which cycle we're in.
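This slot-versus-position distinction is easy to show concretely. A tiny sketch (names are mine): two logical positions one capacity apart map to the same physical slot, yet remain distinguishable, which is precisely what rules out ABA.

```java
public class MonotonicPositions {
    static final int CAPACITY = 8;     // power of two, as in our queues
    static final int MASK = CAPACITY - 1;

    /** Physical slot for a logical (monotonically increasing) position. */
    static int slotOf(long position) {
        return (int) (position & MASK);
    }

    public static void main(String[] args) {
        long firstVisit = 5;                      // first pass over slot 5
        long secondVisit = firstVisit + CAPACITY; // next wrap lands on the same slot
        System.out.println(slotOf(firstVisit) == slotOf(secondVisit)); // true: same slot
        System.out.println(firstVisit == secondVisit); // false: positions differ, no ABA
    }
}
```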
Part 7: Batch Size Optimization
Choosing the right batch size is crucial. Too small and we don't amortize barriers effectively. Too large and we add latency.
The Trade-off
Analyzing Barrier Cost vs Batch Size
Let's model the cost mathematically:
```
T(n) = total time to process n elements
B    = cost of one memory barrier (~25ns)
W    = cost of one buffer write (~3ns)
S    = batch size

Without batching:
T(n) = n × (W + B) = n × 28ns

With batching:
T(n) = n × W + (n/S) × B = n × (3 + 25/S) ns

For S=10:  T(n) = n × 5.50ns (~80% reduction, 5.1x faster)
For S=50:  T(n) = n × 3.50ns (~87% reduction, 8.0x faster)
For S=100: T(n) = n × 3.25ns (~88% reduction - diminishing returns)
```
Empirical Batch Size Analysis
We benchmarked various batch sizes:
| Batch Size | Per-Element Latency | Throughput (M/s) | Commit Latency |
| --- | --- | --- | --- |
| 1 | 24.3ns | 41.2 | <1ns |
| 10 | 8.7ns | 114.9 | ~100ns |
| 25 | 6.2ns | 161.3 | ~250ns |
| 50 | 5.1ns | 196.1 | ~500ns |
| 100 | 4.6ns | 217.4 | ~1μs |
| 250 | 4.2ns | 238.1 | ~2.5μs |
| 500 | 4.0ns | 250.0 | ~5μs |
| 1000 | 3.9ns | 256.4 | ~10μs |
Key observations:
Sweet spot at 50-100: Best throughput/latency balance
Diminishing returns past 100: Throughput improvement flattens
Latency concern at 250+: Commit latency becomes noticeable
Adaptive Batch Sizing
For workloads with variable arrival rates, consider adaptive batching:
```java
/**
 * Adaptive batch sizing based on arrival rate.
 *
 * Sketch: derives a target batch size from the observed commit frequency.
 */
public class AdaptiveBatchedQueue<T> extends BatchedSPSCQueue<T> {

    private long lastCommitTime = System.nanoTime();
    private int adaptiveBatchSize = 32;

    // Target commit latency in nanoseconds (1ms)
    private static final long TARGET_LATENCY_NS = 1_000_000;

    public AdaptiveBatchedQueue(int requestedCapacity) {
        super(requestedCapacity);
    }

    @Override
    public boolean offer(T element) {
        boolean result = super.offer(element);

        // Adapt batch size based on actual commit frequency
        if (pendingCount() == 0) { // Just committed
            long now = System.nanoTime();
            long elapsed = now - lastCommitTime;
            lastCommitTime = now;

            if (elapsed > TARGET_LATENCY_NS * 2) {
                // Commits are too infrequent - reduce batch size
                adaptiveBatchSize = Math.max(8, adaptiveBatchSize / 2);
            } else if (elapsed < TARGET_LATENCY_NS / 2) {
                // Commits are too frequent - increase batch size
                adaptiveBatchSize = Math.min(512, adaptiveBatchSize * 2);
            }
            // In a full implementation adaptiveBatchSize would drive the
            // commit decision; the parent's batchSize is final, so this
            // sketch only tracks the target value.
        }
        return result;
    }
}
```
Latency vs Throughput Profiles
Different applications need different trade-offs:
High-Frequency Trading (Latency-Critical):
Batch size: 8-16
Auto-commit timeout: 100μs
Accept lower throughput for predictable latency
Log Aggregation (Throughput-Critical):
Batch size: 256-512
Auto-commit timeout: 10ms
Maximize throughput, latency less important
Real-Time Analytics (Balanced):
Batch size: 32-64
Auto-commit timeout: 1ms
Balance both concerns
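The three profiles above can be captured as a small preset helper. This is a hypothetical convenience class of my own (names `QueueProfiles`, `Profile`, etc. are not from the implementation); the values simply mirror the profiles listed:

```java
/** Hypothetical preset helper - values mirror the workload profiles above. */
public final class QueueProfiles {

    public enum Profile { LATENCY_CRITICAL, THROUGHPUT_CRITICAL, BALANCED }

    /** Suggested batch size for a profile. */
    public static int batchSizeFor(Profile p) {
        switch (p) {
            case LATENCY_CRITICAL:    return 16;  // HFT: small batches
            case THROUGHPUT_CRITICAL: return 512; // log aggregation: big batches
            default:                  return 64;  // balanced analytics
        }
    }

    /** Suggested auto-commit timeout for a profile, in nanoseconds. */
    public static long commitTimeoutNanos(Profile p) {
        switch (p) {
            case LATENCY_CRITICAL:    return 100_000L;    // 100μs
            case THROUGHPUT_CRITICAL: return 10_000_000L; // 10ms
            default:                  return 1_000_000L;  // 1ms
        }
    }

    private QueueProfiles() {}
}
```

A caller would then construct the queue with `batchSizeFor(profile)` and schedule a periodic `commit()` at the profile's timeout.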
Part 8: Comprehensive Benchmarks
Let's rigorously compare our implementations across various scenarios.
Benchmark Suite
```java
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;

@BenchmarkMode({Mode.Throughput, Mode.AverageTime, Mode.SampleTime})
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 5, time = 2)
@Measurement(iterations = 10, time = 5)
@Fork(value = 3, jvmArgs = {
    "-Xms4g", "-Xmx4g",
    "-XX:+UseG1GC",
    "-XX:+AlwaysPreTouch",
    "-XX:-UseBiasedLocking"
})
@State(Scope.Benchmark)
public class BatchProcessingBenchmark {

    @Param({"16", "64", "256", "1024"})
    private int batchSize;

    private SingleOperationQueue<Long> singleOpQueue;
    private BatchedSPSCQueue<Long> batchedQueue;
    private Long[] batchBuffer;
    private long producerCounter = 0;
    private volatile boolean running = true;
    private Thread consumerThread;

    @Setup(Level.Trial)
    public void setup() {
        singleOpQueue = new SingleOperationQueue<>(64 * 1024);
        batchedQueue = new BatchedSPSCQueue<>(64 * 1024, batchSize);
        batchBuffer = new Long[batchSize];

        // Pre-populate batch buffer
        for (int i = 0; i < batchSize; i++) {
            batchBuffer[i] = (long) i;
        }

        // Background consumer for throughput tests
        running = true;
        consumerThread = new Thread(() -> {
            Long[] drainBuffer = new Long[1024];
            while (running) {
                batchedQueue.pollBatch(drainBuffer, 0, 1024);
                singleOpQueue.poll();
            }
        });
        consumerThread.setDaemon(true);
        consumerThread.start();
    }

    @TearDown(Level.Trial)
    public void teardown() throws InterruptedException {
        running = false;
        consumerThread.join(1000);
    }

    // ========== Single Operation Benchmarks ==========

    @Benchmark
    public boolean singleOp_offer() {
        return singleOpQueue.offer(producerCounter++);
    }

    @Benchmark
    public Long singleOp_poll() {
        return singleOpQueue.poll();
    }

    // ========== Batched Operation Benchmarks ==========

    @Benchmark
    public boolean batched_offer() {
        boolean result = batchedQueue.offer(producerCounter++);
        if (producerCounter % batchSize == 0) {
            batchedQueue.commit();
        }
        return result;
    }

    @Benchmark
    public int batched_offerBatch() {
        for (int i = 0; i < batchSize; i++) {
            batchBuffer[i] = producerCounter++;
        }
        return batchedQueue.offerBatch(batchBuffer, 0, batchSize);
    }

    @Benchmark
    public int batched_pollBatch() {
        return batchedQueue.pollBatch(batchBuffer, 0, batchSize);
    }

    // ========== Latency Distribution Benchmarks ==========

    @Benchmark
    @BenchmarkMode(Mode.SampleTime)
    public void latencyDistribution_singleOp(Blackhole bh) {
        bh.consume(singleOpQueue.offer(producerCounter++));
    }

    @Benchmark
    @BenchmarkMode(Mode.SampleTime)
    public void latencyDistribution_batched(Blackhole bh) {
        bh.consume(batchedQueue.offer(producerCounter++));
        if (producerCounter % batchSize == 0) {
            batchedQueue.commit();
        }
    }
}
```
Throughput Results
Operations per Second (higher is better):
| Implementation | Batch=16 | Batch=64 | Batch=256 | Batch=1024 |
|---|---|---|---|---|
| SingleOp offer | 41.2M | 41.2M | 41.2M | 41.2M |
| Batched offer | 89.7M | 156.3M | 198.4M | 215.2M |
| Batched offerBatch | 124.5M | 245.8M | 412.7M | 498.3M |
| Improvement (offer) | 2.18x | 3.79x | 4.82x | 5.22x |
| Improvement (batch) | 3.02x | 5.97x | 10.02x | 12.10x |
Key findings:
Batched offer with auto-commit: 2-5x improvement
Explicit batch API: 3-12x improvement
Larger batches yield better throughput but have diminishing returns
Why does batching improve cache behavior so dramatically? Several factors:
1. Spatial Locality
When we batch writes, consecutive buffer slots are accessed together. Modern CPUs prefetch cache lines speculatively - by accessing slots sequentially in a tight loop, we benefit from hardware prefetching.
```java
// Single operation: random access pattern from the CPU's perspective
for (each_operation) {
    // barrier overhead
    buffer[index] = element;
    // barrier overhead
}

// Batched: sequential access pattern
for (each_element_in_batch) {
    buffer[index++] = element;  // No barrier between writes
}
// Single barrier at end
```
2. Reduced Cache Line Bouncing
Each volatile write can cause the cache line containing head to bounce between cores. With batching, this bouncing happens once per batch instead of once per element.
3. Store Buffer Utilization
The CPU's store buffer is finite (typically 42-56 entries on modern Intel). Without batching, the store buffer fills with barrier-waiting entries. With batching, the store buffer holds actual data writes and drains efficiently during the single barrier.
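To make the barrier accounting concrete, here is a minimal single-threaded sketch (class and method names are mine, not the article's queue) contrasting one release store per element with plain writes followed by a single release store for the whole batch:

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

public class BarrierAmortizationSketch {
    static final VarHandle HEAD;
    static {
        try {
            HEAD = MethodHandles.lookup()
                .findVarHandle(BarrierAmortizationSketch.class, "head", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    final long[] buffer = new long[1024];
    volatile long head; // published producer index

    // One release store per element: a fence on every write
    void publishEach(long[] src) {
        long h = head;
        for (long v : src) {
            buffer[(int) (h & 1023)] = v;
            h++;
            HEAD.setRelease(this, h); // barrier per element
        }
    }

    // Plain writes for the whole batch, then a single release store
    void publishBatch(long[] src) {
        long h = head;
        for (long v : src) {
            buffer[(int) (h & 1023)] = v; // plain write, drains from the store buffer freely
            h++;
        }
        HEAD.setRelease(this, h); // one barrier for the whole batch
    }
}
```

Both methods leave the buffer and the published index in the same state; only the number of fences differs.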
Flame Graph Analysis
Profiling with async-profiler shows where time is spent:
Single Operation Queue - CPU Flame Graph:
38% of time is spent in setRelease - that's our memory barrier overhead.
Batched Queue - CPU Flame Graph:
With batching, setRelease drops from 38% to 11%. The time is now dominated by actual buffer operations (45%), which is exactly what we want.
Part 9: Real-World Integration Patterns
Let's explore how to integrate batched queues into production systems.
Pattern 1: Timed Commit
For latency-sensitive applications that can't wait for full batches:
```java
/**
 * Queue with time-based auto-commit.
 * Commits pending elements if there has been no activity for the specified duration.
 */
public class TimedBatchedQueue<T> extends BatchedSPSCQueue<T> {

    private final long commitTimeoutNanos;
    private long lastActivityTime;
    private final ScheduledExecutorService scheduler;
    private ScheduledFuture<?> commitTask;

    public TimedBatchedQueue(int capacity, int batchSize,
                             long commitTimeout, TimeUnit unit) {
        super(capacity, batchSize);
        this.commitTimeoutNanos = unit.toNanos(commitTimeout);
        this.lastActivityTime = System.nanoTime();
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "batch-commit");
            t.setDaemon(true);
            return t;
        });
        // Schedule periodic commit check
        scheduleCommitCheck();
    }

    @Override
    public boolean offer(T element) {
        lastActivityTime = System.nanoTime();
        return super.offer(element);
    }

    private void scheduleCommitCheck() {
        commitTask = scheduler.scheduleAtFixedRate(() -> {
            long elapsed = System.nanoTime() - lastActivityTime;
            if (elapsed >= commitTimeoutNanos && pendingCount() > 0) {
                commit();
            }
        }, commitTimeoutNanos, commitTimeoutNanos / 2, TimeUnit.NANOSECONDS);
    }

    public void shutdown() {
        commitTask.cancel(false);
        scheduler.shutdown();
    }
}
```
Pattern 2: Backpressure-Aware Batching
When the queue fills up, we want to apply backpressure rather than drop elements:
```java
/**
 * Batched queue with blocking backpressure.
 */
public class BackpressureBatchedQueue<T> extends BatchedSPSCQueue<T> {

    private volatile Thread waitingProducer;

    public BackpressureBatchedQueue(int capacity, int batchSize) {
        super(capacity, batchSize);
    }

    @Override
    public boolean offer(T element) {
        while (!super.offer(element)) {
            // Queue full - apply backpressure by parking briefly
            waitingProducer = Thread.currentThread();
            LockSupport.parkNanos(1_000_000); // 1ms
            if (Thread.interrupted()) {
                return false;
            }
        }
        return true;
    }

    @Override
    public T poll() {
        T element = super.poll();
        // Wake up the waiting producer if we made space
        Thread waiting = waitingProducer;
        if (waiting != null) {
            waitingProducer = null;
            LockSupport.unpark(waiting);
        }
        return element;
    }
}
```
Pattern 3: Metrics Integration
Adding observability to understand batch behavior:
```java
/**
 * Batched queue with metrics collection.
 */
public class MetricsBatchedQueue<T> extends BatchedSPSCQueue<T> {

    // Metrics counters
    private final LongAdder totalOffers = new LongAdder();
    private final LongAdder totalPolls = new LongAdder();
    private final LongAdder totalCommits = new LongAdder();
    private final LongAdder batchedElements = new LongAdder();

    // Histogram of committed batch sizes (log2 buckets)
    private final long[] batchSizeHistogram = new long[17];

    public MetricsBatchedQueue(int capacity, int batchSize) {
        super(capacity, batchSize);
    }

    @Override
    public boolean offer(T element) {
        boolean result = super.offer(element);
        if (result) {
            totalOffers.increment();
        }
        return result;
    }

    @Override
    public void commit() {
        int pending = pendingCount();
        if (pending > 0) {
            totalCommits.increment();
            batchedElements.add(pending);
            // Record in histogram (log2 buckets)
            int bucket = Math.min(16, 32 - Integer.numberOfLeadingZeros(pending));
            batchSizeHistogram[bucket]++;
        }
        super.commit();
    }

    @Override
    public T poll() {
        T element = super.poll();
        if (element != null) {
            totalPolls.increment();
        }
        return element;
    }

    // Metrics accessors
    public long getTotalOffers()  { return totalOffers.sum(); }
    public long getTotalPolls()   { return totalPolls.sum(); }
    public long getTotalCommits() { return totalCommits.sum(); }

    public double getAverageBatchSize() {
        long commits = totalCommits.sum();
        return commits == 0 ? 0 : (double) batchedElements.sum() / commits;
    }

    public String getBatchSizeDistribution() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < batchSizeHistogram.length; i++) {
            if (batchSizeHistogram[i] > 0) {
                int low = i == 0 ? 0 : (1 << (i - 1));
                int high = (1 << i) - 1;
                sb.append(String.format("[%d-%d]: %d%n",
                        low, high, batchSizeHistogram[i]));
            }
        }
        return sb.toString();
    }
}
```
Pattern 4: Event Pipeline with Batching
A complete event processing pipeline using batched queues:
```java
/**
 * High-throughput event pipeline using batched SPSC queues.
 */
public class BatchedEventPipeline<E> {

    private static final int PROCESS_BATCH_SIZE = 64;

    private final BatchedSPSCQueue<E> inputQueue;
    private final BatchedSPSCQueue<E> outputQueue;
    private final Function<E, E> processor;
    private final Thread workerThread;
    private volatile boolean running = true;

    // Batch processing buffers
    private final Object[] inputBatch = new Object[PROCESS_BATCH_SIZE];
    private final Object[] outputBatch = new Object[PROCESS_BATCH_SIZE];

    public BatchedEventPipeline(int queueCapacity, Function<E, E> processor) {
        this.inputQueue = new BatchedSPSCQueue<>(queueCapacity, 32);
        this.outputQueue = new BatchedSPSCQueue<>(queueCapacity, 32);
        this.processor = processor;
        this.workerThread = new Thread(this::processLoop, "event-processor");
        this.workerThread.setDaemon(true);
        this.workerThread.start();
    }

    /** Submit an event for processing. */
    public boolean submit(E event) {
        return inputQueue.offer(event);
    }

    /** Retrieve a processed event. */
    public E retrieve() {
        return outputQueue.poll();
    }

    /** Retrieve multiple processed events. */
    public int retrieveBatch(E[] output, int maxCount) {
        return outputQueue.pollBatch(output, 0, maxCount);
    }

    @SuppressWarnings("unchecked")
    private void processLoop() {
        while (running) {
            // Batch read from input
            int count = inputQueue.pollBatch((E[]) inputBatch, 0, PROCESS_BATCH_SIZE);
            if (count == 0) {
                // No input - brief pause to avoid burning the core
                Thread.onSpinWait();
                continue;
            }

            // Process the batch
            for (int i = 0; i < count; i++) {
                E input = (E) inputBatch[i];
                outputBatch[i] = processor.apply(input);
                inputBatch[i] = null; // Help GC
            }

            // Batch write to output
            outputQueue.offerBatch((E[]) outputBatch, 0, count);

            // Clear output batch references
            for (int i = 0; i < count; i++) {
                outputBatch[i] = null;
            }
        }
    }

    public void shutdown() throws InterruptedException {
        running = false;
        workerThread.join(5000);
    }
}
```
Pattern 5: Multi-Stage Pipeline
Chaining multiple batched stages:
```java
/**
 * Multi-stage pipeline with batched queues between stages.
 * (Simplified sketch: stage 2 wiring is elided. BatchedEventPipeline is
 * homogeneous (E -> E), so this sketch erases element types between stages.)
 */
public class MultiStagePipeline<I, M, O> {

    private final BatchedEventPipeline<Object> stage1;
    private final BatchedEventPipeline<Object> stage2;
    private final Thread bridgeThread;
    private volatile boolean running = true;

    @SuppressWarnings("unchecked")
    public MultiStagePipeline(
            int queueCapacity,
            Function<I, M> stage1Processor,
            Function<M, O> stage2Processor) {

        // Stage 1 processes I and produces the intermediate type M
        this.stage1 = new BatchedEventPipeline<>(queueCapacity,
                e -> stage1Processor.apply((I) e));

        // Bridge connects stage1 output to stage2 input
        this.bridgeThread = new Thread(() -> {
            Object[] batch = new Object[64];
            while (running) {
                int count = stage1.retrieveBatch(batch, 64);
                if (count > 0) {
                    // Forward to stage 2
                    // (in real code, stage2 would accept the M type)
                }
                Thread.onSpinWait();
            }
        }, "pipeline-bridge");
        bridgeThread.setDaemon(true);
        bridgeThread.start();

        // Stage 2 would be wired the same way
        this.stage2 = null; // Simplified for the example
    }

    public void submit(I input) {
        stage1.submit(input);
    }

    public void shutdown() throws InterruptedException {
        running = false;
        stage1.shutdown();
        bridgeThread.join(5000);
    }
}
```
Part 10: Advanced Optimizations
For those seeking maximum performance, here are additional optimizations to consider.
Optimization 1: Prefetching
Modern CPUs can prefetch data before it's needed. We can hint at upcoming accesses:
```java
/**
 * Batched queue with software prefetching hints.
 * (Fragment: the availability check and the index fields are elided.)
 */
public class PrefetchingBatchedQueue<T> extends BatchedSPSCQueue<T> {

    private static final VarHandle ARRAY_HANDLE =
            MethodHandles.arrayElementVarHandle(Object[].class);

    public PrefetchingBatchedQueue(int capacity, int batchSize) {
        super(capacity, batchSize);
    }

    @Override
    @SuppressWarnings("unchecked")
    public int pollBatch(T[] output, int offset, int maxCount) {
        // ... availability check: compute localTail, mask, toRead ...

        for (int i = 0; i < toRead; i++) {
            int index = (int) (localTail & mask);

            // Touch a slot several elements ahead so the next
            // cache line is pulled in before we need it
            if (i < toRead - 8) {
                int prefetchIndex = (int) ((localTail + 8) & mask);
                ARRAY_HANDLE.getOpaque(buffer, prefetchIndex);
            }

            output[offset + i] = (T) buffer[index];
            buffer[index] = null;
            localTail++;
        }

        // ... publish the new tail with a release store ...
        return toRead;
    }
}
```
Optimization 2: Cache Line Alignment for Buffer
Ensuring buffer elements are cache-line aligned can improve performance:
```java
/**
 * Queue with cache-aligned buffer slots.
 * Each slot occupies a full cache line to prevent false sharing.
 */
public class CacheAlignedBatchedQueue<T> {

    // Each slot is padded to 64 bytes (one cache line): 8 longs per slot
    private static final int SLOT_PADDING = 64 / 8;

    private final long[] paddedBuffer;      // longs used purely as padding
    private final Object[] actualElements;

    public CacheAlignedBatchedQueue(int capacity) {
        // Actual elements stored separately
        this.actualElements = new Object[capacity];
        // Padded buffer for index tracking (optional, for demonstration)
        this.paddedBuffer = new long[capacity * SLOT_PADDING];
    }
}
```
Optimization 3: NUMA-Aware Placement
On multi-socket systems, memory locality matters:
```java
/**
 * NUMA-aware queue that keeps producer and consumer data
 * on their respective NUMA nodes.
 *
 * Hint: use JVM flags to control thread/memory placement:
 *   -XX:+UseNUMA
 *   -XX:+UseNUMAInterleaving
 * Or use a library like OpenHFT's Java-Thread-Affinity for explicit control.
 */
public class NumaAwareBatchedQueue<T> extends BatchedSPSCQueue<T> {

    public NumaAwareBatchedQueue(int capacity, int batchSize) {
        super(capacity, batchSize);
    }

    public static void pinProducerToNode(int numaNode) {
        // Implementation depends on the affinity library, e.g.:
        // AffinityLock.acquireLock(numaNode);
    }

    public static void pinConsumerToNode(int numaNode) {
        // AffinityLock.acquireLock(numaNode);
    }
}
```
Optimization 4: Busy-Spin vs. Yield Trade-off
For ultra-low-latency, busy-spinning beats yielding:
```java
/**
 * Configurable wait strategy for the batched queue.
 */
public enum WaitStrategy {

    BUSY_SPIN {
        @Override
        public void waitFor() {
            Thread.onSpinWait();
        }
    },

    YIELD {
        @Override
        public void waitFor() {
            Thread.yield();
        }
    },

    PARK {
        @Override
        public void waitFor() {
            LockSupport.parkNanos(1000);
        }
    },

    ADAPTIVE {
        private int spins = 0;

        @Override
        public void waitFor() {
            if (++spins < 100) {
                Thread.onSpinWait();
            } else if (spins < 200) {
                Thread.yield();
            } else {
                LockSupport.parkNanos(1000);
                spins = 0;
            }
        }
    };

    public abstract void waitFor();
}
```
Pattern 6: Graceful Degradation Under Load
When the system is under stress, graceful degradation is crucial:
```java
/**
 * Queue with load-adaptive behavior.
 * Reduces batch size under backpressure to maintain responsiveness.
 */
public class LoadAdaptiveQueue<T> extends BatchedSPSCQueue<T> {

    private final int normalBatchSize;
    private final int stressedBatchSize;
    private volatile int currentBatchSize;

    public LoadAdaptiveQueue(int capacity, int normalBatch, int stressedBatch) {
        super(capacity, normalBatch);
        this.normalBatchSize = normalBatch;
        this.stressedBatchSize = stressedBatch;
        this.currentBatchSize = normalBatch;
    }

    @Override
    public boolean offer(T element) {
        // Detect stress: if the queue is >75% full, shrink the batch size;
        // recover once it drains below 25%
        int currentSize = size();
        if (currentSize > capacity() * 0.75) {
            currentBatchSize = stressedBatchSize;
        } else if (currentSize < capacity() * 0.25) {
            currentBatchSize = normalBatchSize;
        }

        boolean result = super.offer(element);

        // Auto-commit at the current (possibly reduced) batch size
        if (pendingCount() >= currentBatchSize) {
            commit();
        }
        return result;
    }

    public int getCurrentBatchSize() {
        return currentBatchSize;
    }

    public boolean isUnderStress() {
        return currentBatchSize < normalBatchSize;
    }
}
```
Part 10B: Production Deployment Considerations
Deploying batched queues in production requires attention to several operational concerns.
JVM Configuration
Recommended JVM flags for batched queue performance:
```shell
java \
  -Xms8g -Xmx8g \
  -XX:+UseG1GC \
  -XX:MaxGCPauseMillis=10 \
  -XX:+AlwaysPreTouch \
  -XX:-UseBiasedLocking \
  -XX:+UseNUMA \
  -XX:+PerfDisableSharedMem \
  -Djava.lang.Integer.IntegerCache.high=10000 \
  -jar application.jar

# -Xms8g -Xmx8g             : fixed heap size to avoid resizing pauses
# -XX:+UseG1GC              : or ZGC for ultra-low latency
# -XX:MaxGCPauseMillis=10   : aggressive GC pause target
# -XX:+AlwaysPreTouch       : touch heap pages at startup
# -XX:-UseBiasedLocking     : disable biased locking
# -XX:+UseNUMA              : NUMA-aware allocation
# -XX:+PerfDisableSharedMem : disable perf shared memory
# IntegerCache.high=10000   : larger Integer autobox cache
```
Why disable biased locking?
Biased locking assumes low contention and adds overhead when revoking biases. In high-throughput systems, this revocation cost can spike latency, so disabling it gives more consistent performance. (Note: biased locking was disabled by default in JDK 15 and removed entirely in JDK 18, so the flag only matters on older runtimes.)
Why AlwaysPreTouch?
Without pre-touching, the JVM lazily allocates physical memory pages. First access to a new page triggers a page fault, adding latency jitter. Pre-touching at startup makes memory access times predictable.
Thread Affinity
For maximum performance, pin threads to specific CPU cores:
```java
/**
 * Thread affinity helper (sketch; real pinning needs a native library
 * such as OpenHFT's Java-Thread-Affinity or a JNA call to sched_setaffinity).
 */
public class AffinityHelper {

    /**
     * Pin the CURRENT thread to a specific CPU core.
     * Must be called from the thread being pinned.
     */
    public static void pinToCore(int coreId) {
        try {
            // Using OpenHFT Affinity:
            // AffinityLock lock = AffinityLock.acquireLock(coreId);
            // Or via JNA: sched_setaffinity(...)
            System.out.println("Pinned thread " + Thread.currentThread().getName()
                    + " to core " + coreId);
        } catch (Exception e) {
            System.err.println("Failed to pin thread: " + e.getMessage());
        }
    }

    /**
     * Pin producer and consumer to different cores.
     * Ideally the same socket but different physical cores -
     * avoid hyperthread siblings of the same core.
     * Example: producer on core 0, consumer on core 2
     * (assuming 0/1 and 2/3 are hyperthread pairs).
     *
     * Note: affinity applies to the calling thread, so each thread
     * should pin itself at the top of its own run() method.
     */
    public static void setupAffinityForSPSC(Thread producer, Thread consumer) {
        producer.start();  // producer's run() should call pinToCore(0)
        consumer.start();  // consumer's run() should call pinToCore(2)
    }
}
```
Monitoring and Alerting
Set up monitoring for queue health:
```java
/**
 * Queue health metrics for monitoring systems (Micrometer).
 */
public class QueueHealthMetrics {

    private final BatchedSPSCQueue<?> queue;
    private final MeterRegistry registry;

    public QueueHealthMetrics(BatchedSPSCQueue<?> queue, MeterRegistry registry) {
        this.queue = queue;
        this.registry = registry;

        // Register gauges
        Gauge.builder("queue.size", queue, BatchedSPSCQueue::size)
                .description("Current queue size")
                .register(registry);

        Gauge.builder("queue.capacity", queue, BatchedSPSCQueue::capacity)
                .description("Queue capacity")
                .register(registry);

        Gauge.builder("queue.utilization", queue,
                        q -> (double) q.size() / q.capacity())
                .description("Queue utilization (0.0-1.0)")
                .register(registry);

        Gauge.builder("queue.pending", queue, BatchedSPSCQueue::pendingCount)
                .description("Uncommitted elements")
                .register(registry);
    }

    /** Check if the queue is healthy. */
    public boolean isHealthy() {
        double utilization = (double) queue.size() / queue.capacity();

        // Unhealthy if >90% full
        if (utilization > 0.9) {
            return false;
        }
        // Unhealthy if many uncommitted elements are pending
        if (queue.pendingCount() > queue.batchSize() * 2) {
            return false;
        }
        return true;
    }
}
```
Alerting Thresholds
| Metric | Warning | Critical | Action |
|---|---|---|---|
| Queue utilization | > 70% | > 90% | Scale consumers |
| Pending elements | > 2x batch | > 5x batch | Check producer |
| Commit latency | > 10ms | > 100ms | Reduce batch size |
| Poll empty rate | > 50% | > 90% | Check producers |
Capacity Planning
To size your queue appropriately:
```
Required capacity = Peak rate × Maximum latency tolerance × Safety factor

Example:
- Peak rate: 100,000 events/second
- Latency tolerance: 100ms
- Safety factor: 2x

Capacity = 100,000 × 0.1 × 2 = 20,000 elements
Round up to the next power of 2: 32,768 elements
```
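That calculation is easy to encode; here is a small illustrative helper (class and method names are mine) that also rounds up to a power of two, since ring buffers rely on power-of-two sizes for cheap index masking:

```java
public class CapacityPlanner {

    /**
     * Required capacity = peak rate x latency tolerance x safety factor,
     * rounded up to the next power of two.
     */
    static int requiredCapacity(double peakEventsPerSec,
                                double latencyToleranceSec,
                                double safetyFactor) {
        double raw = peakEventsPerSec * latencyToleranceSec * safetyFactor;
        int needed = (int) Math.ceil(raw);

        // Round up to the next power of two
        int pow2 = 1;
        while (pow2 < needed) {
            pow2 <<= 1;
        }
        return pow2;
    }
}
```

With the example numbers above, `requiredCapacity(100_000, 0.1, 2)` yields 32,768.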
Part 11: Trade-offs and When to Use
Summary of Trade-offs
Advantages:
✓ 2-4x lower per-element latency
✓ 2-4x higher throughput
✓ Better cache utilization
✓ Fewer memory barriers
✓ Same correctness guarantees
Disadvantages:
✗ Increased complexity
✗ Commit latency for partial batches
✗ Requires batch-aware consumers
✗ May need tuning for batch size
✗ Slightly higher memory footprint
Use When:
✓ High throughput required
✓ Batch workloads natural
✓ Can tolerate commit delay
✓ SPSC pattern fits architecture
✓ Memory barriers are bottleneck
Avoid When:
✗ Single-element latency critical
✗ Unpredictable arrival patterns
✗ Team unfamiliar with batching
✗ Simple queue is fast enough
✗ Other bottlenecks dominate
Decision Matrix
| Scenario | Recommendation | Batch Size |
|---|---|---|
| HFT order processing | Batched + timed commit | 16-32, 100μs timeout |
| Log aggregation | Batched + large batches | 256-512 |
| Real-time analytics | Batched + adaptive | 32-64, 1ms timeout |
| Network packet processing | Batched + full batches | 64-128, no timeout |
| Database write-behind | Batched + large batches | 512-1024 |
| Game server events | Single operation | N/A |
| GUI event handling | Single operation | N/A |
When Batching Doesn't Help
Batching provides minimal benefit when:
Throughput is already sufficient: If single-operation performance meets requirements, added complexity isn't justified
Memory barriers aren't the bottleneck: Profile first. If time is spent in business logic, batching won't help
Single-element latency is critical: Each uncommitted element adds latency. For strict latency SLAs, batching may not be suitable
Arrival pattern is truly random: If elements arrive one at a time with significant gaps, batches never fill naturally
Part 12: Conclusion and Key Takeaways
The journey from 25ns to 5ns per element taught me something fundamental about performance optimization: the overhead often isn't where you expect it.
When I first profiled our queue, I assumed the bottleneck was data movement - copying references, updating indices, and checking bounds. In fact, that useful work took just 2-3 nanoseconds; the remaining ~20 nanoseconds came from the memory fences that make one thread's writes visible to the other.
The lesson is simple: fence cost stays roughly flat even as the protected batch grows. By batching our operations - accumulating multiple elements before a single publication - we amortized that fixed cost across many elements.
Key Takeaways
1. Memory Barriers Have Fixed Cost
A release/acquire barrier costs roughly 20-30 nanoseconds regardless of how much data it orders. Design your synchronization around this.
2. Batch Size Matters
Too small: you don't amortize barriers effectively. Too large: you add latency waiting for batches to fill. The sweet spot is typically 50-100 elements for throughput, 10-30 for latency-sensitive applications.
3. Shadow State Enables Batching
Separating local state (private to one thread) from published state (visible to other threads) is the key pattern. Advance local state freely; update published state only when committing.
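A minimal sketch of that pattern (illustrative names, not the article's full queue): the producer advances a plain local index freely and publishes it with a single release store on commit.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

public class ShadowStateSketch {
    static final VarHandle PUBLISHED_HEAD;
    static {
        try {
            PUBLISHED_HEAD = MethodHandles.lookup()
                .findVarHandle(ShadowStateSketch.class, "publishedHead", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    final Object[] buffer = new Object[16];
    long localHead;               // private to the producer: plain reads/writes
    volatile long publishedHead;  // visible to the consumer

    boolean offer(Object element) {
        buffer[(int) (localHead & 15)] = element; // plain write
        localHead++;                              // no fence yet
        return true;
    }

    void commit() {
        PUBLISHED_HEAD.setRelease(this, localHead); // one fence per batch
    }
}
```

Between `offer` calls nothing is published; the consumer only learns about new elements when `commit()` performs the release store.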
4. Profile Before Optimizing
We only discovered memory barriers as the bottleneck through profiling. The fix is specific to this bottleneck - it wouldn't help if the issue were elsewhere.
5. Correctness Through Memory Ordering
Our batched queue maintains the same correctness guarantees as single-operation queues. Release-acquire semantics ensure that when the consumer sees the updated position, it also sees all the data that was written.
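The visibility guarantee can be demonstrated directly with a two-thread handoff (a standalone sketch with illustrative names, not the article's queue): once the consumer's acquire read observes the published index, the plain writes made before the release store are guaranteed visible.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

public class ReleaseAcquireDemo {
    static final VarHandle HEAD;
    static {
        try {
            HEAD = MethodHandles.lookup()
                .findVarHandle(ReleaseAcquireDemo.class, "head", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    final long[] buffer = new long[8];
    volatile long head;

    /** Runs a producer/consumer handoff and returns the sum the consumer saw. */
    static long observedSum(ReleaseAcquireDemo q) {
        final long[] sum = new long[1];

        Thread consumer = new Thread(() -> {
            // Spin until the producer's release store becomes visible
            while ((long) HEAD.getAcquire(q) < 3) {
                Thread.onSpinWait();
            }
            // Acquire ordering guarantees these plain reads see the data
            sum[0] = q.buffer[0] + q.buffer[1] + q.buffer[2];
        });
        consumer.start();

        // Producer side: plain writes, then one release store to publish
        q.buffer[0] = 10;
        q.buffer[1] = 20;
        q.buffer[2] = 30;
        HEAD.setRelease(q, 3);

        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum[0];
    }
}
```

If the release/acquire pair were replaced with plain accesses, the consumer could legally observe the index update without the data - exactly the bug class the queue's memory ordering prevents.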
The Numbers
| Queue Type | Per-element Latency | Throughput | Memory Barriers |
|---|---|---|---|
| Single Operation | 20-30ns | ~40M ops/sec | 2-3 per element |
| Batched (size 64) | 5-10ns | ~160M ops/sec | 1 per batch (64 elements) |
| Improvement | 2-4x | 2-4x | N/A |
Final Thoughts
The batched SPSC queue is now a core component of our trading infrastructure. It handles market data feeds where every nanosecond counts and log aggregation pipelines where throughput is king. The same fundamental optimization - amortizing barrier costs - applies in both cases.
But the real lesson isn't about batching specifically. It's about understanding where time actually goes. Modern hardware is incredibly fast at moving data but surprisingly slow at synchronization. When you find yourself constrained by synchronization overhead rather than actual work, batching is often the answer.
The next time you're optimizing a concurrent data structure, don't just focus on the algorithm. Look at the synchronization. Count the barriers. Measure their cost. And consider whether you can batch your way to better performance.
Sometimes the biggest wins come from doing less, not more - fewer barriers, fewer cache line transfers, fewer synchronization events. In the world of lock-free programming, less is often more.
Appendix A: Quick Reference
API Summary
```java
// Creation
BatchedSPSCQueue<T> queue = new BatchedSPSCQueue<>(capacity);
BatchedSPSCQueue<T> queue = new BatchedSPSCQueue<>(capacity, batchSize);

// Single-element API (auto-batching)
boolean success = queue.offer(element); // Add element; auto-commits at batch size
queue.commit();                         // Force publication of pending elements
T element = queue.poll();               // Remove and return an element

// Batch API (explicit batching)
int added   = queue.offerBatch(array, offset, count); // Add multiple elements
int removed = queue.pollBatch(array, offset, count);  // Remove multiple elements
int drained = queue.drain(consumer);                  // Drain all to a consumer
int filled  = queue.fill(supplier, maxCount);         // Fill from a supplier

// Query
int size      = queue.size();         // Approximate size
boolean empty = queue.isEmpty();      // Check if empty
int pending   = queue.pendingCount(); // Uncommitted elements
```
Optimal Batch Sizes
| Use Case | Recommended Batch Size | Commit Strategy |
|---|---|---|
| Ultra-low latency | 8-16 | Timed (100μs) |
| Low latency | 16-32 | Timed (500μs) |
| Balanced | 32-64 | Timed (1ms) |
| High throughput | 128-256 | Size-based |
| Maximum throughput | 512-1024 | Size-based |
Memory Ordering Summary
| Operation | Semantics | Cost |
|---|---|---|
| offer() - write | Plain | ~3ns |
| commit() | Release | ~25ns |
| poll() - read head | Acquire | ~15ns |
| poll() - write tail | Release | ~25ns |
Per batch (64 elements):
Writes: 64 × 3ns = 192ns
Barriers: 1 × 25ns = 25ns
Total: 217ns / 64 = 3.4ns per element
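That arithmetic generalizes to a one-line formula: per-element cost is the plain-write cost plus the barrier cost amortized over the batch. A tiny illustrative helper (names are mine):

```java
public class AmortizedCost {

    /** Per-element cost = write cost + (barrier cost / batch size). */
    static double perElementNanos(double writeNs, double barrierNs, int batchSize) {
        return writeNs + barrierNs / batchSize;
    }
}
```

With the document's numbers, `perElementNanos(3, 25, 64)` is about 3.4ns, matching the figure above, while a batch size of 1 degenerates to 28ns - the single-operation regime.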
Appendix B: Troubleshooting
Common Issues and Solutions
Problem: Elements not visible to consumer
Cause: Forgot to call commit()
Solution: Enable auto-commit or call commit() explicitly after writing
Cause: Race condition - likely bug in implementation
Solution: Verify memory ordering, check for missing acquire/release
```java
// Verify these patterns:
PUBLISHED_HEAD.setRelease(this, localHead);          // After all writes
long head = (long) PUBLISHED_HEAD.getAcquire(this);  // Before any reads
```
Appendix C: Further Reading
Books
"The Art of Multiprocessor Programming" by Herlihy & Shavit - Definitive guide to lock-free algorithms
"Java Concurrency in Practice" by Goetz et al. - Java memory model and concurrent programming
"Is Parallel Programming Hard?" by Paul McKenney - Deep dive into memory ordering
Papers
"Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms" by Michael and Scott
"A Scalable, Correct Time-Stamped Stack" by Dodds et al.
Batching provides the best improvement for SPSC scenarios because it directly addresses the dominant cost (memory barriers) rather than secondary factors.
Combining Optimizations
For maximum performance, combine batching with other techniques:
```java
/**
 * Fully optimized batched queue combining all techniques.
 * (@Contended is jdk.internal.vm.annotation.Contended and requires
 * -XX:-RestrictContended when used outside the JDK.)
 */
@Contended // Prevent false sharing at the object level
public class FullyOptimizedQueue<T> extends BatchedSPSCQueue<T> {

    // Cores for thread affinity, fixed at construction
    private final int producerCore;
    private final int consumerCore;

    public FullyOptimizedQueue(int capacity, int batchSize,
                               int producerCore, int consumerCore) {
        super(capacity, batchSize);
        this.producerCore = producerCore;
        this.consumerCore = consumerCore;
    }

    /** Call from the producer thread to pin its affinity. */
    public void initProducer() {
        AffinityHelper.pinToCore(producerCore);
    }

    /** Call from the consumer thread to pin its affinity. */
    public void initConsumer() {
        AffinityHelper.pinToCore(consumerCore);
    }
}
```
When Each Approach Shines
Batching alone: When memory barriers are the bottleneck and you have bursty or continuous workloads.
Batching + affinity: When you need the lowest possible latency and can dedicate CPU cores.
Batching + NUMA: On multi-socket systems where memory locality matters.
All combined: For trading systems, network packet processing, and other ultra-low-latency applications.
The key is to measure first and apply optimizations that address your actual bottlenecks. Batching is often the highest-impact optimization for SPSC queues, but your mileage may vary based on workload characteristics.