It was a chilly Tuesday in November, precisely at 9:17 AM, when the disaster struck. I remember the exact moment because I was mid-sip of my third espresso when the alerts started cascading across my monitoring dashboard like a waterfall of red.
Our high-frequency trading system had been designed to handle 10,000 trades per second. We had tested it, benchmarked it, stress-tested it. But on this particular morning, earnings season combined with unexpected Fed commentary had pushed us to 15,000 trades per second. And that's when everything started to fall apart.
The symptoms were insidious at first. Latency spikes. Not the kind you notice immediately - we're talking milliseconds in a world where microseconds matter. But those milliseconds started compounding. Trades were queuing up. Market data was getting stale. And worst of all, our telemetry system - the very tool we relied on to diagnose problems - had become the problem itself.
I pulled up the flame graphs, and there it was: our telemetry capture was blocking. Not just any blocking - synchronized blocking with wait/notify semantics. Every time our metrics buffer filled up, producer threads would park, waiting for the consumer to drain data. In a high-frequency system, that's not a minor inconvenience. It's a catastrophe.
Here's what I saw in the profiler:
Thread State Analysis (9:17:23 AM - 9:17:28 AM):
BLOCKED: 47.3% (synchronized wait on MetricsBuffer.lock)
RUNNABLE: 31.2% (actual work)
WAITING: 18.4% (condition.await in telemetry)
TIMED_WAIT: 3.1% (other)
Nearly half our CPU time was spent waiting on a lock. Not processing trades. Not computing risk. Waiting. For telemetry. The very metrics we needed to observe the system were causing the system to fail.
The irony wasn't lost on me. We had built an observability layer that made the system unobservable under stress - the exact moment when observability matters most.
I pulled up the telemetry code:
public class BlockingMetricsBuffer {
private final Object[] buffer;
private int head = 0;
private int tail = 0;
private int count = 0;
private final Object lock = new Object();
private final int capacity;
public BlockingMetricsBuffer(int capacity) {
this.capacity = capacity;
this.buffer = new Object[capacity];
}
public void record(MetricEvent event) {
synchronized (lock) {
while (isFull()) {
try {
lock.wait(); // BLOCKING!
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
buffer[head] = event;
head = (head + 1) % capacity;
count++;
lock.notifyAll();
}
}
private boolean isFull() {
return count == capacity;
}
}
There it was. synchronized. wait(). notifyAll(). The classic producer-consumer pattern that works beautifully for most applications. But for telemetry in a high-frequency system? It was a death sentence.
Have you ever seen a race car pit crew at work? Every movement is measured and precise, every tool positioned for maximum efficiency. Imagine if one of the crew members started blocking the others, creating unnecessary delays that cost precious seconds. This is exactly what happened to our system. The telemetry system, which was meant to help us, had become a bottleneck. And in the world of high-frequency trading, a bottleneck means lost opportunities - and lost money.
By 10:30 AM, I had a temporary fix in place: I disabled telemetry entirely. The system stabilized, but we were flying blind. No metrics. No visibility. No way to know if something else was going wrong.
That afternoon, I sat down with my team and we had an uncomfortable conversation. We needed to rebuild our telemetry from the ground up. Not just faster - fundamentally different. We needed a system that would never block, even if it meant accepting some data loss.
What followed was a two-week deep dive into wait-free algorithms that would change how I think about observability forever.
Before diving into the solution, let's understand why this problem is so insidious and why conventional approaches fail in high-frequency systems.
In physics, the observer effect (often conflated with the Heisenberg uncertainty principle) tells us that the act of measuring a particle disturbs it. Software observability has an analogous problem: the act of measuring a system can change its performance characteristics.
Consider what happens when you record a metric:
- Memory allocation: Creating a metric event object
- Synchronization: Coordinating between producer and consumer threads
- Memory barriers: Ensuring visibility across CPU cores
- I/O operations: Eventually writing metrics to storage or network
Each of these operations has a cost. And in a system processing millions of events per second, those costs multiply.
Cost per metric record (naive implementation):
| Operation | Cost |
|---|---|
| Object allocation | 20-50ns |
| Synchronized block (uncontended) | 50-200ns |
| Synchronized block (contended!) | 2,000-10,000ns |
| Memory barriers | 10-40ns |
| Total (uncontended) | 80-290ns |
| Total (contended) | 2,080-10,290ns |
At 15,000 trades/second with 10 metrics/trade (150,000 metric records/second):
- Uncontended: 12-43ms CPU time/second (acceptable)
- Contended: 312-1,543ms CPU time/second (DISASTER)
Under light load, the system appears healthy. Under heavy load - exactly when you need observability most - the observability layer becomes the bottleneck.
The obvious suggestion is to use Java's ConcurrentLinkedQueue or a similar lock-free queue. But these still don't solve our fundamental problem:
// Still problematic for high-frequency telemetry
ConcurrentLinkedQueue<MetricEvent> queue = new ConcurrentLinkedQueue<>();
public void record(MetricEvent event) {
queue.offer(event); // Lock-free, but...
}
The problems:
- Unbounded memory: ConcurrentLinkedQueue is unbounded. Under sustained high load, it will grow until you hit an OutOfMemoryError.
- Allocation pressure: Every offer() allocates a new node object. At 150,000 operations/second, that's 150,000 allocations/second - pure fuel for the garbage collector.
- GC pauses: When the GC runs, all threads pause. In a trading system, a 50ms GC pause can cost real money.
- No backpressure: If the consumer can't keep up, the queue grows without bound. There's no way to shed load.
Bounded blocking queues like ArrayBlockingQueue solve the memory problem but introduce blocking:
ArrayBlockingQueue<MetricEvent> queue = new ArrayBlockingQueue<>(10000);
public void record(MetricEvent event) {
try {
queue.put(event); // BLOCKS if full!
} catch (InterruptedException e) {
// ...
}
}
Now we're back to our original problem. When the queue fills up, producers block. In a trading system, a blocked thread means missed market data, delayed orders, and real financial losses.
What about offer() instead of put()?
public void record(MetricEvent event) {
if (!queue.offer(event)) {
// Queue full - what now?
droppedMetrics.increment();
}
}
This doesn't block, but it still has problems:
- Contention: Multiple producers still contend on the queue's single internal lock
- Allocation: Every MetricEvent still has to be allocated per call, feeding the garbage collector
- False sharing: The queue's internal put and take indices may share a cache line, causing performance degradation
What we need is something fundamentally different: a data structure designed specifically for high-frequency telemetry where we accept data loss as a feature, not a bug.
Here's the key insight that changed everything: telemetry data has different semantics than business data.
For a trade order:
- Every order MUST be processed
- Order of execution matters
- Data loss is unacceptable
- Blocking to ensure delivery is acceptable
For a telemetry metric:
- Statistical accuracy matters, not individual events
- Recent data is more valuable than old data
- Losing some data under load is acceptable
- Blocking to record metrics is NOT acceptable
This difference in requirements allows us to make fundamentally different design choices. We can build a system that:
- Never blocks producers - ever, under any circumstances
- Overwrites old data when the buffer is full (recent data is more valuable)
- Uses fixed memory - no allocations after initialization
- Provides wait-free guarantees - every operation completes in bounded steps
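To make those requirements concrete, here is a minimal sketch (a hypothetical interface, not code from the original system) of the producer-facing contract they imply:
// Hypothetical producer-facing contract implied by the requirements above.
// Nothing here may block, allocate, or retry without bound; data loss is explicit.
public interface TelemetrySink<T> {
    /**
     * Wait-free: completes in a bounded number of steps on every call.
     * May overwrite the oldest unread data when the buffer is full.
     */
    void record(T event);
    /** Best-effort count of events overwritten before being read. */
    long overwrittenCount();
}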
Before we build our solution, let's establish a solid foundation for what "wait-free" actually means and why it matters for telemetry.
Concurrent algorithms are classified by their progress guarantees:
Blocking: If a thread holding a lock is paused (preempted, page faulted, or crashed), all other threads waiting for that lock are stuck indefinitely. This is what our original telemetry had.
Obstruction-Free: A thread will complete its operation if it runs alone, without interference from other threads. However, competing threads can cause indefinite retry loops.
Lock-Free: The system as a whole always makes progress. At least one thread will complete its operation in a finite number of steps. However, individual threads might starve under pathological conditions.
Wait-Free: Every thread completes its operation in a bounded number of steps, regardless of what other threads are doing. This is the strongest guarantee.
Consider what happens during a system crisis:
Normal operation:
- 4 telemetry producer threads
- 1 consumer thread
- Low contention
- All algorithms perform similarly
Crisis scenario:
- 4 telemetry producers + 20 business threads all recording metrics
- Consumer overwhelmed, buffer filling up
- High contention on shared state
- OS scheduler under pressure
With blocking telemetry:
- Producers block waiting for buffer space
- Business threads (sharing thread pool) get delayed
- System performance degrades
- Telemetry shows nothing (threads are blocked!)
With lock-free telemetry:
- Most producers make progress
- Some producers may spin-wait
- Under extreme contention, individual threads may starve
- System performance partially preserved
With wait-free telemetry:
- EVERY producer completes in bounded time
- No thread ever waits or spins indefinitely
- System performance guaranteed
- Even under maximum load, telemetry keeps flowing
The wait-free guarantee means that no matter how chaotic things get, every metric recording operation will complete in a bounded, predictable amount of time. This is crucial for observability: we need to see what's happening, especially when things are going wrong.
Wait-free algorithms typically have higher complexity and sometimes higher constant-factor overhead than lock-free algorithms. Why? Because they must handle worst-case scenarios that rarely occur in practice.
A lock-free algorithm might look like:
while (true) {
int current = head.get();
if (head.compareAndSet(current, current + 1)) {
return current; // Got a slot!
}
// CAS failed, retry
}
This loop could theoretically run forever if other threads keep winning the CAS race. In practice, this almost never happens - but "almost never" isn't good enough for a telemetry system that needs to remain responsive during a crisis.
A wait-free algorithm must guarantee completion regardless of other threads:
int mySlot = head.getAndIncrement(); // Always succeeds in one step!
// But now we need to handle overflow...
The challenge shifts from "win the race" to "handle the consequences." As we'll see, our wait-free telemetry buffer handles overflow by overwriting old data - a trade-off that makes sense for metrics.
Let's examine what we're replacing. Understanding the problems with the blocking approach will clarify why each design decision in our wait-free buffer matters.
public class BlockingCircularBuffer<T> {
private final Object[] buffer;
private final int capacity;
// Head: next position to write
// Tail: next position to read
private int head = 0;
private int tail = 0;
private int count = 0;
private final Object lock = new Object();
public BlockingCircularBuffer(int capacity) {
this.capacity = capacity;
this.buffer = new Object[capacity];
}
public void put(T element) throws InterruptedException {
synchronized (lock) {
// Wait while buffer is full
while (count == capacity) {
lock.wait(); // BLOCKING POINT
}
buffer[head] = element;
head = (head + 1) % capacity;
count++;
lock.notifyAll();
}
}
@SuppressWarnings("unchecked")
public T take() throws InterruptedException {
synchronized (lock) {
// Wait while buffer is empty
while (count == 0) {
lock.wait(); // BLOCKING POINT
}
T element = (T) buffer[tail];
buffer[tail] = null; // Help GC
tail = (tail + 1) % capacity;
count--;
lock.notifyAll();
return element;
}
}
public int size() {
synchronized (lock) {
return count;
}
}
}
This is textbook correct. It handles all edge cases: empty buffer, full buffer, multiple producers, single consumer. The synchronized block ensures mutual exclusion, and wait()/notifyAll() handle the blocking semantics.
Let's trace through a high-contention scenario with 4 producer threads:
Time Thread-1 Thread-2 Thread-3 Thread-4
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
0ns acquire lock blocked blocked blocked
50ns check full? WAITING WAITING WAITING
55ns write buffer WAITING WAITING WAITING
60ns update head WAITING WAITING WAITING
65ns notifyAll WAITING WAITING WAITING
70ns release lock waking up... waking up... waking up...
// Context switch storm begins
3070ns --- acquire lock blocked blocked
3120ns --- check full? WAITING WAITING
3175ns --- write buffer WAITING WAITING
3230ns --- release lock waking up... waking up...
// And so on...
The actual work (write to buffer, update head) takes about 20-30 nanoseconds. But the synchronization overhead dominates:
- Lock acquisition: 50-200ns uncontended, 2000-5000ns contended
- Context switch: 1000-10000ns each
- notifyAll: Wakes ALL waiting threads, causing a thundering herd
- Cache line bouncing: Lock state bounces between CPU cores
Under our peak load scenario:
150,000 metric records/second
4 producer threads + 1 consumer thread
Buffer capacity: 10,000 entries
Observed behavior:
Lock acquisitions/second: 150,000+
Context switches/second: 50,000+
Average time in synchronized: 200-300ns (uncontended)
Average time in synchronized: 2,000-8,000ns (contended!)
Total synchronization overhead: 300ms-1,200ms per second
That leaves somewhere between 700ms and -200ms of each second for actual work!
Yes, that's negative time for actual work. The system was spending more time on coordination than existed in a second. Threads were queueing up faster than they could be processed.
Beyond the blocking, there's a hidden allocation problem:
// Each wait() call may allocate:
// - AbstractQueuedSynchronizer$Node for wait queue
// - Condition queue nodes
// - Thread state objects
// Under high contention:
// 4 threads * 150,000 ops/sec * potential allocations =
// Hundreds of thousands of small allocations per second
I ran an allocation profiler during our incident:
Hot allocation sites (1 second sample):
2,847,234 bytes: j.u.c.locks.AbstractQueuedSynchronizer$Node
892,456 bytes: MetricEvent objects
234,567 bytes: Various internal structures
Total: 3.97 MB/second from synchronization alone
Almost 4 MB/second of allocation just for locking infrastructure. This is pure fuel for the garbage collector, and it was triggering young generation collections every 2-3 seconds.
Using JMH to establish concrete numbers:
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Benchmark)
public class BlockingBufferBenchmark {
private BlockingCircularBuffer<Long> buffer;
private AtomicLong counter;
@Setup
public void setup() {
buffer = new BlockingCircularBuffer<>(10000);
counter = new AtomicLong();
// Start consumer thread
Thread consumer = new Thread(() -> {
while (!Thread.interrupted()) {
try {
buffer.take();
} catch (InterruptedException e) {
break;
}
}
});
consumer.setDaemon(true);
consumer.start();
}
@Benchmark
@Group("producers")
@GroupThreads(4)
public void produce() throws InterruptedException {
buffer.put(counter.incrementAndGet());
}
}
Results on an Intel Xeon E5-2680 v4 (14 cores, 2.4 GHz):
Benchmark Mode Cnt Score Error Units
BlockingBufferBenchmark.produce sample 1000 287.43 ± 23.12 ns/op
BlockingBufferBenchmark.produce:p50 sample 198.00 ns/op
BlockingBufferBenchmark.produce:p90 sample 412.00 ns/op
BlockingBufferBenchmark.produce:p99 sample 1847.00 ns/op
BlockingBufferBenchmark.produce:p99.9 sample 12456.00 ns/op
BlockingBufferBenchmark.produce:max sample 89234.00 ns/op
Throughput: ~3.5 million operations/second per producer (roughly 14 million/second aggregate across the 4 producers)
The median of 198ns looks acceptable. But look at the tail:
- p99 is 1.8 microseconds - 9x the median
- p99.9 is 12.5 microseconds - 63x the median
- Maximum observed was 89 microseconds - 450x the median
This variance is the killer. In a trading system, that 89-microsecond outlier could mean missing a price update that costs thousands of dollars.
Now let's build something better. Our wait-free telemetry buffer is based on several key design principles that emerge from understanding the problem deeply.
This is the fundamental insight. For telemetry, we prefer:
- Guaranteed low latency over complete data capture
- Recent data over historical data
- System stability over metric completeness
When the buffer is full, we don't wait. We overwrite the oldest data. This is the trade-off that enables wait-free operation.
Instead of coordinating between head and tail with locks, we use atomic operations that always succeed:
// Traditional (lock-free but not wait-free):
while (true) {
int current = head.get();
if (head.compareAndSet(current, current + 1)) {
break; // Might loop forever under contention
}
}
// Wait-free approach:
int myPosition = head.getAndIncrement(); // Always succeeds in one step!
The difference is profound. compareAndSet can fail and require retry. getAndIncrement always succeeds - it's wait-free by definition.
When a producer detects that it's about to overwrite unconsumed data, it advances the tail pointer automatically:
long writePosition = head.getAndIncrement();
int index = (int) (writePosition & mask);
// Check if we're about to overwrite unread data
long currentTail = tail.get();
if (writePosition - currentTail >= capacity) {
// We've caught up to (or passed) the tail
// Advance the tail to make room
tail.compareAndSet(currentTail, writePosition - capacity + 1);
}
// Now write - this slot is ours
buffer[index] = element;
This is the magic that enables wait-free overflow handling. We don't wait for the consumer to catch up - we just advance the tail and accept the data loss.
To ensure correct memory ordering without locks, we use VarHandle operations with explicit memory ordering semantics:
private static final VarHandle HEAD;
private static final VarHandle TAIL;
private static final VarHandle BUFFER;
static {
try {
MethodHandles.Lookup lookup = MethodHandles.lookup();
HEAD = lookup.findVarHandle(WaitFreeBuffer.class, "head", long.class);
TAIL = lookup.findVarHandle(WaitFreeBuffer.class, "tail", long.class);
BUFFER = MethodHandles.arrayElementVarHandle(Object[].class);
} catch (Exception e) {
throw new ExceptionInInitializerError(e);
}
}
VarHandle provides fine-grained control over memory ordering:
- getOpaque() / setOpaque(): No reordering with other opaque operations
- getAcquire() / setRelease(): Acquire-release semantics
- getVolatile() / setVolatile(): Full sequential consistency
For our telemetry buffer, we can use weaker ordering for better performance while maintaining correctness.
To prevent false sharing, we pad critical fields to occupy separate cache lines:
// Padding before head
long p01, p02, p03, p04, p05, p06, p07;
private volatile long head;
// Padding between head and tail
long p11, p12, p13, p14, p15, p16, p17;
private volatile long tail;
// Padding after tail
long p21, p22, p23, p24, p25, p26, p27;
Each cache line is 64 bytes on modern x86-64 CPUs. Seven long values (56 bytes) plus the actual field (8 bytes) ensures each critical field occupies its own cache line.
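Manual padding fields work on any JVM, but if you control the JVM flags, the JDK-internal @Contended annotation achieves the same isolation with less noise. A sketch, under the assumption that you can pass the required flags (the annotation lives in an internal package):
import jdk.internal.vm.annotation.Contended;

// Sketch of an alternative to manual padding. On recent JDKs this requires:
//   --add-exports java.base/jdk.internal.vm.annotation=ALL-UNNAMED  (compile and run)
//   -XX:-RestrictContended  (so the annotation is honored outside JDK code)
public class PaddedPositions {
    @Contended("producer")   // head gets its own padded cache-line region
    volatile long head;

    @Contended("consumer")   // tail is isolated from head and other hot fields
    volatile long tail;
}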
Now let's build the complete wait-free telemetry buffer with detailed commentary.
package com.trading.telemetry;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
/**
* Wait-Free Overwrite Buffer for High-Frequency Telemetry
*
* Design principles:
* 1. Wait-free guarantee: Every operation completes in bounded steps
* 2. Overwrite semantics: Old data is overwritten when buffer is full
* 3. Zero allocation: No allocations after initialization
* 4. Cache-line padding: Prevents false sharing between producers/consumer
*
* Performance characteristics:
* - Producer: ~10-20ns per write (wait-free)
* - Consumer: ~15-25ns per read
* - Throughput: 50+ million ops/sec with 4 producers
*
* Trade-offs:
* - Data loss under heavy load (by design)
* - Consumer may see gaps in sequence
* - Not suitable for ordered/reliable messaging
*
* @param <T> Element type stored in the buffer
*/
public class WaitFreeOverwriteBuffer<T> {
// ========== VarHandle Setup ==========
private static final VarHandle HEAD;
private static final VarHandle TAIL;
private static final VarHandle BUFFER;
static {
try {
MethodHandles.Lookup lookup = MethodHandles.lookup();
HEAD = lookup.findVarHandle(
WaitFreeOverwriteBuffer.class, "head", long.class);
TAIL = lookup.findVarHandle(
WaitFreeOverwriteBuffer.class, "tail", long.class);
BUFFER = MethodHandles.arrayElementVarHandle(Object[].class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}
// ========== Cache Line Padding for Head ==========
// 7 longs = 56 bytes padding before head
@SuppressWarnings("unused")
private long p01, p02, p03, p04, p05, p06, p07;
/**
* Next position for producers to claim.
* Monotonically increasing; wraps via modulo for array access.
* Uses getAndIncrement for wait-free slot claiming.
*/
private volatile long head = 0;
// 7 longs = 56 bytes padding between head and tail
@SuppressWarnings("unused")
private long p11, p12, p13, p14, p15, p16, p17;
// ========== Cache Line Padding for Tail ==========
/**
* Next position for consumer to read.
* May be advanced by producers during overflow.
* Consumer is sole reader; producers may advance via CAS.
*/
private volatile long tail = 0;
// 7 longs = 56 bytes padding after tail
@SuppressWarnings("unused")
private long p21, p22, p23, p24, p25, p26, p27;
// ========== Buffer Storage ==========
/** Pre-allocated buffer array. Size is always power of 2. */
private final Object[] buffer;
/** Buffer capacity (power of 2 for fast modulo). */
private final int capacity;
/** Bit mask for index calculation: capacity - 1. */
private final int mask;
// ========== Metrics (optional, for monitoring) ==========
/** Count of items written. */
private volatile long writtenCount = 0;
/** Count of items overwritten before being read. */
private volatile long overwrittenCount = 0;
// ========== Constructor ==========
/**
* Creates a new wait-free overwrite buffer.
*
* @param requestedCapacity Minimum capacity (will be rounded up to power of 2)
* @throws IllegalArgumentException if capacity less than 2
*/
public WaitFreeOverwriteBuffer(int requestedCapacity) {
if (requestedCapacity < 2) {
throw new IllegalArgumentException(
"Capacity must be at least 2, got: " + requestedCapacity);
}
this.capacity = roundUpToPowerOf2(requestedCapacity);
this.mask = this.capacity - 1;
this.buffer = new Object[this.capacity];
}
private static int roundUpToPowerOf2(int value) {
int highBit = Integer.highestOneBit(value);
return (highBit == value) ? value : highBit << 1;
}
// ========== Producer Operations ==========
/**
* Records a telemetry event to the buffer.
*
* WAIT-FREE GUARANTEE: This method always completes in bounded time,
* regardless of other threads or buffer state.
*
* If the buffer is full, the oldest unread data will be overwritten.
* This is by design - for telemetry, recent data is more valuable.
*
* @param element The element to record (should not be null)
*/
public void record(T element) {
// Step 1: Claim a position (WAIT-FREE - always succeeds immediately)
// getAndIncrement is a single atomic operation that cannot fail
long writePosition = (long) HEAD.getAndAdd(this, 1L);
// Step 2: Calculate array index using bitwise AND (faster than modulo)
int index = (int) (writePosition & mask);
// Step 3: Check for overflow condition
// If writePosition has caught up to tail, we're overwriting unread data
long currentTail = (long) TAIL.getOpaque(this);
if (writePosition - currentTail >= capacity) {
// Overflow detected! We're about to overwrite unread data.
// Advance the tail to "forget" the oldest entries.
// Calculate where tail should be to make room for our write
long newTail = writePosition - capacity + 1;
// Try to advance tail. Use CAS because:
// - Multiple overflow producers might race
// - Consumer might have already advanced tail
// - We only advance if tail hasn't moved past our target
long observedTail = currentTail;
while (observedTail < newTail) {
// Try to advance tail
if (TAIL.compareAndSet(this, observedTail, newTail)) {
// Successfully advanced tail
// Count how many entries we skipped
overwrittenCount += (newTail - observedTail);
break;
}
// CAS failed - someone else advanced tail
// Re-read and check if we still need to advance
observedTail = (long) TAIL.getOpaque(this);
}
}
// Step 4: Write the element
// Use setRelease to ensure the write is visible to the consumer
// after it sees the tail advance past this position
BUFFER.setRelease(buffer, index, element);
// Step 5: Update metrics
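// Note: writtenCount and overwrittenCount are plain volatile fields updated
// without atomic read-modify-write; with multiple producers they are
// best-effort monitoring counters and may slightly undercount under contention.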
writtenCount++;
}
/**
* Batch record multiple elements.
* More efficient than individual record() calls due to reduced overhead.
*
* @param elements Array of elements to record
* @param offset Starting offset in the array
* @param length Number of elements to record
*/
public void recordBatch(T[] elements, int offset, int length) {
for (int i = 0; i < length; i++) {
record(elements[offset + i]);
}
}
// ========== Consumer Operations ==========
/**
* Retrieves and removes the next element from the buffer.
*
* This method is designed for SINGLE CONSUMER use only.
* Using multiple consumers will result in data corruption.
*
* @return The next element, or null if buffer is empty
*/
@SuppressWarnings("unchecked")
public T poll() {
// Read current positions
long currentTail = tail;
long currentHead = (long) HEAD.getOpaque(this);
// Check if buffer is empty
if (currentTail >= currentHead) {
return null;
}
// Calculate array index
int index = (int) (currentTail & mask);
// Read the element with acquire semantics
// This ensures we see all writes that happened before the producer's release
T element = (T) BUFFER.getAcquire(buffer, index);
// Clear the slot to help GC (optional, but recommended)
BUFFER.setRelease(buffer, index, null);
// Advance tail (plain write is safe - single consumer)
tail = currentTail + 1;
return element;
}
/**
* Drains all available elements to the provided consumer function.
* More efficient than repeated poll() calls.
*
* @param consumer Function to process each element
* @return Number of elements drained
*/
@SuppressWarnings("unchecked")
public int drain(java.util.function.Consumer<T> consumer) {
long currentTail = tail;
long currentHead = (long) HEAD.getOpaque(this);
int count = 0;
while (currentTail < currentHead) {
int index = (int) (currentTail & mask);
T element = (T) BUFFER.getAcquire(buffer, index);
if (element != null) {
consumer.accept(element);
BUFFER.setRelease(buffer, index, null);
count++;
}
currentTail++;
}
// Batch update tail
tail = currentTail;
return count;
}
/**
* Drains up to maxElements to the provided consumer.
* Useful for rate-limited processing.
*
* @param consumer Function to process each element
* @param maxElements Maximum number of elements to drain
* @return Number of elements actually drained
*/
@SuppressWarnings("unchecked")
public int drainTo(java.util.function.Consumer<T> consumer, int maxElements) {
long currentTail = tail;
long currentHead = (long) HEAD.getOpaque(this);
long available = currentHead - currentTail;
int toDrain = (int) Math.min(available, maxElements);
for (int i = 0; i < toDrain; i++) {
int index = (int) (currentTail & mask);
T element = (T) BUFFER.getAcquire(buffer, index);
if (element != null) {
consumer.accept(element);
BUFFER.setRelease(buffer, index, null);
}
currentTail++;
}
tail = currentTail;
return toDrain;
}
// ========== Query Operations ==========
/**
* Returns approximate size of unread elements.
* May be stale due to concurrent modifications.
*/
public int size() {
long currentHead = (long) HEAD.getOpaque(this);
long currentTail = tail;
long size = currentHead - currentTail;
if (size < 0) return 0;
if (size > capacity) return capacity;
return (int) size;
}
/**
* Returns true if buffer appears empty.
*/
public boolean isEmpty() {
return (long) HEAD.getOpaque(this) <= tail;
}
/**
* Returns the buffer's capacity.
*/
public int capacity() {
return capacity;
}
/**
* Returns total number of items written since creation.
*/
public long getWrittenCount() {
return writtenCount;
}
/**
* Returns number of items overwritten before being read.
*/
public long getOverwrittenCount() {
return overwrittenCount;
}
/**
* Returns the data loss ratio (overwritten / written).
* A high ratio indicates the consumer can't keep up.
*/
public double getDataLossRatio() {
long written = writtenCount;
if (written == 0) return 0.0;
return (double) overwrittenCount / written;
}
}
Why getAndAdd instead of compareAndSet?
// Wait-free (always succeeds in one step)
long writePosition = (long) HEAD.getAndAdd(this, 1L);
// vs. Lock-free (might retry indefinitely)
while (true) {
long current = head;
if (HEAD.compareAndSet(this, current, current + 1)) {
break;
}
}
getAndAdd is a single atomic operation that always succeeds. It's implemented as a single CPU instruction (LOCK XADD on x86-64). There's no possibility of retry or spinning - every producer gets a unique position in one step.
Why do we use getOpaque for reading positions?
long currentTail = (long) TAIL.getOpaque(this);
getOpaque provides "opaque" memory ordering - it prevents the compiler from caching the value, but doesn't impose the full memory barrier of a volatile read. For our use case:
- We need a fresh value (can't use a cached local variable)
- We don't need acquire semantics (we're not coordinating with specific writes)
- The slight staleness is acceptable (we'll see the update soon enough)
This is faster than getVolatile while still being correct for our algorithm.
Why do we need the overflow CAS loop?
while (observedTail < newTail) {
if (TAIL.compareAndSet(this, observedTail, newTail)) {
break;
}
observedTail = (long) TAIL.getOpaque(this);
}
Multiple producers might overflow simultaneously and all try to advance the tail. The CAS loop ensures:
- Only one producer's advance "wins"
- The tail moves forward monotonically
- We don't accidentally move tail backward
But note: this loop is bounded! Each producer only executes the loop body a limited number of times because:
- Each CAS failure means another producer advanced tail
- Eventually, observedTail >= newTail and we exit
- The maximum iterations equals the number of concurrent overflow producers
Why setRelease for writing elements?
BUFFER.setRelease(buffer, index, element);
Release semantics ensure that all writes before this store are visible to any thread that subsequently reads this location with acquire semantics. In our case:
- Producer writes the element with release
- Consumer reads with acquire
- Consumer is guaranteed to see the complete element, not a partially constructed object
Understanding memory ordering is crucial for lock-free programming. Let's examine exactly what guarantees our buffer provides and why.
The Java Memory Model (JMM) defines how threads interact through memory. Key concepts:
- Happens-before relationship: If action A happens-before action B, then A's effects are visible to B.
- Synchronization actions: volatile reads/writes, lock acquire/release, and thread start/join establish happens-before relationships.
- Reordering: The compiler and CPU may reorder operations for performance, as long as they respect happens-before.
Our buffer uses three types of memory access:
// 1. Plain access - no barriers
long localCopy = someField;
someField = newValue;
// 2. Opaque access - prevents compiler caching
long fresh = (long) VARHANDLE.getOpaque(this);
// 3. Release/Acquire - establishes happens-before
VARHANDLE.setRelease(this, value); // Writer
value = (long) VARHANDLE.getAcquire(this); // Reader
Let's trace through a write-read cycle:
Producer:
[1] pos = HEAD.getAndAdd(this, 1)         // claim a slot
[2] index = pos & mask
[3] BUFFER.setRelease(buffer, index, e)   // publish the element
Consumer (later):
[A] read head (getOpaque) and tail
[B] e = BUFFER.getAcquire(buffer, index)  // observe the element
[C] advance tail
The release-acquire pairing ensures:
- The consumer at [B] sees the element written at [3]
- The consumer also sees any object fields written before [3]
- No data races or torn reads
We could declare the buffer array elements as volatile:
private volatile Object[] buffer; // Doesn't help - array reference is volatile
// but elements are not!
But even if we could make elements volatile, it would be overkill:
- Volatile has full sequential consistency overhead
- We only need producer-to-consumer visibility
- Release/acquire is sufficient and faster
On x86-64, the difference is small (x86 has strong memory ordering), but on ARM or other weakly-ordered architectures, the performance difference can be significant.
The trickiest part of our algorithm is the overflow handling:
Producer A and Producer B both overflow simultaneously (Capacity = 1024):
| State | Producer A | Producer B |
|---|---|---|
| writePos | 50000 | 50001 |
| currentTail | 48900 | 48900 |
| distance | 1100 >= 1024 = OVERFLOW | 1101 >= 1024 = OVERFLOW |
| newTail | 50000 - 1024 + 1 = 48977 | 50001 - 1024 + 1 = 48978 |
Now both producers try to advance tail from 48900 to their respective targets:
A: CAS(48900, 48977) → SUCCESS (A advanced tail)
B: CAS(48900, 48978) → FAIL (tail is now 48977, not 48900)
B: Re-read tail = 48977
B: Is 48977 < 48978? YES
B: CAS(48977, 48978) → SUCCESS (B advances tail further)
The result: tail ends up at 48978, which is correct - it's past both producers' write positions, ensuring neither overwrites active data.
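A quick way to build confidence in this reasoning is a brute-force check. The sketch below (arbitrary producer and iteration counts, assuming the WaitFreeOverwriteBuffer class shown earlier is on the classpath) hammers a small buffer from several producers and then verifies that draining never yields more elements than the capacity:
import java.util.concurrent.CountDownLatch;

public class OverflowStressCheck {
    public static void main(String[] args) throws InterruptedException {
        WaitFreeOverwriteBuffer<Long> buffer = new WaitFreeOverwriteBuffer<>(1024);
        int producers = 4;
        CountDownLatch done = new CountDownLatch(producers);
        for (int p = 0; p < producers; p++) {
            new Thread(() -> {
                for (long i = 0; i < 1_000_000; i++) {
                    buffer.record(i);   // never blocks, even while overflowing
                }
                done.countDown();
            }).start();
        }
        done.await();
        // Most of the 4 million writes were overwritten by design; what remains
        // must fit in the buffer, and draining must terminate.
        int drained = buffer.drain(e -> { });
        System.out.printf("drained=%d written=%d overwritten=%d%n",
                drained, buffer.getWrittenCount(), buffer.getOverwrittenCount());
        if (drained > buffer.capacity()) {
            throw new AssertionError("Drained more elements than capacity: " + drained);
        }
    }
}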
Let's validate our claims with rigorous benchmarks.
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 5, time = 2)
@Measurement(iterations = 10, time = 5)
@Fork(value = 3, jvmArgs = {
"-Xms4g", "-Xmx4g",
"-XX:+UseG1GC",
"-XX:+AlwaysPreTouch"
})
@State(Scope.Benchmark)
public class TelemetryBufferBenchmark {
@Param({"1", "4", "8", "16"})
private int producerCount;
private BlockingCircularBuffer<Long> blockingBuffer;
private WaitFreeOverwriteBuffer<Long> waitFreeBuffer;
private AtomicLong counter;
private volatile boolean running;
private Thread consumerThread;
@Setup(Level.Trial)
public void setup() {
blockingBuffer = new BlockingCircularBuffer<>(65536);
waitFreeBuffer = new WaitFreeOverwriteBuffer<>(65536);
counter = new AtomicLong();
running = true;
// Background consumer that drains both buffers
consumerThread = new Thread(() -> {
while (running) {
try {
blockingBuffer.take();
} catch (InterruptedException e) {
break;
}
waitFreeBuffer.poll();
}
});
consumerThread.start();
}
@TearDown(Level.Trial)
public void teardown() throws InterruptedException {
running = false;
consumerThread.interrupt();
consumerThread.join(1000);
}
@Benchmark
@Group("blocking")
@GroupThreads(4)
public void blockingRecord() throws InterruptedException {
blockingBuffer.put(counter.incrementAndGet());
}
@Benchmark
@Group("waitfree")
@GroupThreads(4)
public void waitFreeRecord() {
waitFreeBuffer.record(counter.incrementAndGet());
}
}
4 Producer Threads:
| Metric | Blocking Buffer | Wait-Free Buffer | Improvement |
|---|---|---|---|
| Mean | 287ns | 18ns | 15.9x |
| p50 | 198ns | 14ns | 14.1x |
| p90 | 412ns | 23ns | 17.9x |
| p99 | 1,847ns | 42ns | 44.0x |
| p99.9 | 12,456ns | 89ns | 140.0x |
| Max | 89,234ns | 234ns | 381.3x |
The wait-free buffer is 15-20x faster on average, but the tail latency improvement is even more dramatic - over 100x better at p99.9!
Scaling with Producer Count:
| Producers | Blocking (mean) | Wait-Free (mean) | Improvement |
|---|---|---|---|
| 1 | 98ns | 12ns | 8.2x |
| 4 | 287ns | 18ns | 15.9x |
| 8 | 534ns | 24ns | 22.3x |
| 16 | 1,123ns | 38ns | 29.6x |
As contention increases, the wait-free buffer maintains consistent performance while the blocking buffer degrades dramatically.
| Producers | Blocking (ops/sec) | Wait-Free (ops/sec) | Improvement |
|---|---|---|---|
| 1 | 10.2M | 83.3M | 8.2x |
| 4 | 13.9M | 222.2M | 16.0x |
| 8 | 15.0M | 333.3M | 22.2x |
| 16 | 14.2M | 421.1M | 29.7x |
The wait-free buffer scales nearly linearly with producer count, while the blocking buffer saturates around 15M ops/sec due to lock contention.
Latency Distribution (4 producers, log scale)
Blocking Buffer:
10ns |
20ns |
50ns |█
100ns |████████████████
200ns |██████████████████████████████████████████████ (peak)
500ns |████████████████████
1μs |████████
2μs |███
5μs |██
10μs |█
50μs+ |█ (tail)
Wait-Free Buffer:
10ns |████████████████████████ (peak)
20ns |██████████████████████████████████████████████████████
50ns |████
100ns |█
200ns+ | (rare outliers)
The wait-free distribution is tightly clustered around 10-20ns with minimal tail, while the blocking buffer has a long tail extending into tens of microseconds.
5-minute sustained load test with 4 producers at 100,000 ops/sec:
Blocking Buffer:
- Young GC events: 234
- Total GC pause: 4,120ms
- Average pause: 17.6ms
- Max pause: 156ms
- Allocation rate: 4.2 MB/sec
Wait-Free Buffer:
- Young GC events: 8
- Total GC pause: 120ms
- Average pause: 15ms
- Max pause: 21ms
- Allocation rate: 0.1 MB/sec (only metric event objects)
The wait-free buffer generates 97% less allocation and 30x fewer GC events. The maximum GC pause dropped from 156ms to 21ms - critical for latency-sensitive systems.
Using perf stat on Linux:
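The exact invocation isn't shown here, but counters like these can be collected by attaching perf to the running JVM for a fixed window (event names vary by kernel and CPU; <pid> is the JVM's process id):
perf stat -e cycles,L1-dcache-load-misses,LLC-load-misses -p <pid> -- sleep 60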
Blocking Buffer (4 producers):
L1-dcache-load-misses: 1,247,891,234
LLC-load-misses: 23,456,789
cycles per operation: ~690
Wait-Free Buffer (4 producers):
L1-dcache-load-misses: 187,234,567
LLC-load-misses: 3,123,456
cycles per operation: ~45
The wait-free buffer has 85% fewer L1 cache misses due to:
- Cache line padding preventing false sharing
- No lock state cache bouncing
- Predictable memory access patterns
Building the buffer is only half the battle. Deploying it successfully requires careful consideration of operational concerns.
The buffer accepts data loss as a trade-off. You need to monitor this:
public class TelemetryMonitor {
private final WaitFreeOverwriteBuffer<?> buffer;
private final ScheduledExecutorService scheduler;
private long lastWrittenCount = 0;
private long lastOverwrittenCount = 0;
public TelemetryMonitor(WaitFreeOverwriteBuffer<?> buffer) {
this.buffer = buffer;
this.scheduler = Executors.newSingleThreadScheduledExecutor();
}
public void start() {
scheduler.scheduleAtFixedRate(this::checkHealth,
1, 1, TimeUnit.SECONDS);
}
private void checkHealth() {
long currentWritten = buffer.getWrittenCount();
long currentOverwritten = buffer.getOverwrittenCount();
long writtenDelta = currentWritten - lastWrittenCount;
long overwrittenDelta = currentOverwritten - lastOverwrittenCount;
double lossRate = writtenDelta > 0
? (double) overwrittenDelta / writtenDelta
: 0.0;
if (lossRate > 0.01) { // > 1% loss
logger.warn("Telemetry data loss: {:.2f}% ({} of {} events)",
lossRate * 100, overwrittenDelta, writtenDelta);
}
if (lossRate > 0.10) { // > 10% loss
alerting.fire("TELEMETRY_HIGH_DATA_LOSS",
"Loss rate: " + lossRate * 100 + "%");
}
// Export metrics for dashboards
metrics.gauge("telemetry.write_rate", writtenDelta);
metrics.gauge("telemetry.loss_rate", lossRate);
metrics.gauge("telemetry.buffer_size", buffer.size());
lastWrittenCount = currentWritten;
lastOverwrittenCount = currentOverwritten;
}
}
Buffer size is a critical tuning parameter:
Too small:
- High data loss under load
- Consumer can't keep up
- Metrics gaps during spikes
Too large:
- Higher memory usage
- Longer drain times
- Older data when backlogged
Sizing formula:
buffer_size = peak_write_rate * acceptable_latency * safety_factor
Example:
peak_write_rate = 200,000 events/sec
acceptable_latency = 100ms (time for consumer to catch up)
safety_factor = 2x
buffer_size = 200,000 * 0.1 * 2 = 40,000 entries
Round up to power of 2: 65,536 (64K entries)
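As a small sketch, the same arithmetic can be encoded next to the buffer's configuration so the capacity is derived rather than hand-picked (the class and parameter names here are illustrative, not part of the buffer's API):
public final class BufferSizing {
    /**
     * Derives a capacity from peak write rate, how long the consumer may fall
     * behind, and a safety factor, rounded up to the next power of 2.
     */
    public static int capacityFor(double peakWritesPerSecond,
                                  double acceptableLatencySeconds,
                                  double safetyFactor) {
        double raw = peakWritesPerSecond * acceptableLatencySeconds * safetyFactor;
        int needed = Math.max(2, (int) Math.ceil(raw));
        int highBit = Integer.highestOneBit(needed);
        return (highBit == needed) ? needed : highBit << 1;
    }

    public static void main(String[] args) {
        // 200,000 events/sec, 100ms of consumer slack, 2x safety -> 40,000 -> 65,536
        System.out.println(capacityFor(200_000, 0.1, 2.0));
    }
}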
Pattern 1: Dedicated Consumer Thread
public class TelemetryConsumer implements Runnable {
private final WaitFreeOverwriteBuffer<MetricEvent> buffer;
private final MetricsSink sink;
private volatile boolean running = true;
@Override
public void run() {
while (running) {
int drained = buffer.drain(event -> {
try {
sink.record(event);
} catch (Exception e) {
// Log but don't crash - telemetry shouldn't kill the system
logger.warn("Failed to record metric", e);
}
});
if (drained == 0) {
// Buffer empty - back off to reduce CPU
LockSupport.parkNanos(100_000); // 100μs
}
}
}
public void stop() {
running = false;
}
}
Pattern 2: Batch Processing
public class BatchingConsumer {
private static final int BATCH_SIZE = 1000;
private final List<MetricEvent> batch = new ArrayList<>(BATCH_SIZE);
public void processBatch(WaitFreeOverwriteBuffer<MetricEvent> buffer) {
batch.clear();
buffer.drainTo(batch::add, BATCH_SIZE);
if (!batch.isEmpty()) {
// Process entire batch together - more efficient
metricsSink.recordBatch(batch);
}
}
}
Pattern 3: Sampling Under Load
public class SamplingConsumer {
private final WaitFreeOverwriteBuffer<MetricEvent> buffer;
public void process() {
int size = buffer.size();
// If buffer is getting full, sample instead of draining everything
double sampleRate = size > buffer.capacity() * 0.8
? 0.1 // Sample 10% when buffer is 80%+ full
: 1.0; // Process everything otherwise
buffer.drain(event -> {
if (ThreadLocalRandom.current().nextDouble() < sampleRate) {
sink.record(event);
}
});
}
}
For maximum performance, pin threads to specific CPU cores:
// Using Java Thread Affinity library
AffinityLock producerLock = AffinityLock.acquireCore();
try {
// Producer runs on dedicated core
while (running) {
buffer.record(generateEvent());
}
} finally {
producerLock.release();
}
Or via taskset on Linux:
# Pin producer JVM to cores 0-3
taskset -c 0-3 java -jar producer.jar
# Pin consumer JVM to core 4
taskset -c 4 java -jar consumer.jar
Recommended JVM flags for wait-free telemetry:
# Fixed heap, low-latency GC, pre-faulted heap pages, NUMA awareness,
# no perf shared memory, larger Integer cache.
# (-XX:-UseBiasedLocking only matters on JDKs that still support biased
# locking; newer JDKs have removed it, so drop the flag there.)
java \
-Xms4g -Xmx4g \
-XX:+UseZGC \
-XX:+AlwaysPreTouch \
-XX:-UseBiasedLocking \
-XX:+UseNUMA \
-XX:+PerfDisableSharedMem \
-Djava.lang.Integer.IntegerCache.high=10000 \
-jar telemetry.jar
Real telemetry systems handle multiple event types. Here's how to extend the buffer:
public class TypedMetricEvent {
public enum Type { COUNTER, GAUGE, HISTOGRAM, TIMER }
// Fields are mutable (not final) because instances are pooled and reused via set()
private Type type;
private String name;
private long value;
private long timestamp;
private String[] tags;
// Use object pooling to avoid allocation
private static final ThreadLocal<TypedMetricEvent> POOL =
ThreadLocal.withInitial(TypedMetricEvent::new);
public static TypedMetricEvent acquire() {
return POOL.get();
}
public TypedMetricEvent set(Type type, String name, long value, String... tags) {
this.type = type;
this.name = name;
this.value = value;
this.timestamp = System.nanoTime();
this.tags = tags;
return this;
}
}
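Usage would look roughly like the fragment below (latencyNanos and telemetryBuffer are placeholders, and the tag value is made up). The important caveat: because each thread reuses a single pooled instance, the consumer must finish with, or copy, the event before that same thread records again, otherwise the fields are overwritten in place:
// Hypothetical usage of the pooled event; the instance is reused per thread,
// so the consumer must process or copy it before this thread records again.
TypedMetricEvent event = TypedMetricEvent.acquire()
        .set(TypedMetricEvent.Type.TIMER, "order.submit.latency", latencyNanos, "venue=XNYS");
telemetryBuffer.record(event);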
To eliminate all contention, use per-thread buffers:
public class ShardedTelemetryBuffer<T> {
private final WaitFreeOverwriteBuffer<T>[] shards;
private final ThreadLocal<Integer> shardIndex;
@SuppressWarnings("unchecked")
public ShardedTelemetryBuffer(int shardCount, int shardCapacity) {
shards = new WaitFreeOverwriteBuffer[shardCount];
for (int i = 0; i < shardCount; i++) {
shards[i] = new WaitFreeOverwriteBuffer<>(shardCapacity);
}
AtomicInteger counter = new AtomicInteger();
shardIndex = ThreadLocal.withInitial(() ->
counter.getAndIncrement() % shardCount);
}
public void record(T element) {
// Each thread writes to its own shard - zero contention!
shards[shardIndex.get()].record(element);
}
public void drainAll(java.util.function.Consumer<T> consumer) {
for (WaitFreeOverwriteBuffer<T> shard : shards) {
shard.drain(consumer);
}
}
}
For metrics that need aggregation, combine with time windows:
public class WindowedAggregator {
private final WaitFreeOverwriteBuffer<MetricEvent> buffer;
private final Duration windowSize;
private final Map<String, LongAdder> currentWindow = new ConcurrentHashMap<>();
private volatile long windowStart = System.currentTimeMillis();
public void recordCounter(String name, long delta) {
buffer.record(new MetricEvent(name, delta, System.nanoTime()));
}
public void aggregate() {
long now = System.currentTimeMillis();
if (now - windowStart >= windowSize.toMillis()) {
// Window complete - emit aggregates
Map<String, Long> snapshot = new HashMap<>();
currentWindow.forEach((k, v) -> snapshot.put(k, v.sumThenReset()));
emitAggregates(snapshot);
windowStart = now;
}
// Drain events into current window
buffer.drain(event -> {
currentWindow
.computeIfAbsent(event.getName(), k -> new LongAdder())
.add(event.getValue());
});
}
}
While our buffer never blocks producers, we can signal backpressure to allow upstream adjustment:
public class BackpressureAwareTelemetry {
private final WaitFreeOverwriteBuffer<MetricEvent> buffer;
private final AtomicBoolean backpressureSignal = new AtomicBoolean(false);
public void record(MetricEvent event) {
buffer.record(event);
// Signal backpressure when buffer is getting full
double utilization = (double) buffer.size() / buffer.capacity();
backpressureSignal.set(utilization > 0.8);
}
public boolean isBackpressured() {
return backpressureSignal.get();
}
// Upstream can use this to shed load
public void recordIfNotBackpressured(MetricEvent event) {
if (!isBackpressured()) {
record(event);
}
// Silently drop if backpressured
}
}
Let's be honest about what we're giving up with this approach.
What we gained:
- Guaranteed Low Latency: 10-20ns per record, regardless of load
- Wait-Free Progress: Every operation completes in bounded steps
- Zero Allocation: No GC pressure from telemetry
- Linear Scaling: Performance improves with more producers
- Predictable Behavior: No tail latency surprises
What we gave up:
- Data Completeness: Old data is overwritten under load
- Ordering Guarantees: Events may appear out of order (different producers)
- Single Consumer: Multiple consumers would corrupt state
- Memory Efficiency: Fixed buffer size, even when lightly loaded
- Complexity: More difficult to reason about than blocking queues
Use it when:
- Observability must never impact system performance
- Data loss under extreme load is acceptable
- Latency consistency matters more than average throughput
- You need to observe the system during crisis scenarios
- GC pauses are unacceptable
Don't use it when:
- Every metric event must be captured (audit logs)
- You need strict ordering of events
- Multiple consumers need to process the same data
- The complexity isn't justified by your performance requirements
- Your team isn't comfortable with lock-free programming
| Characteristic | Blocking Queue | Lock-Free Queue | Wait-Free Buffer |
|---|---|---|---|
| Progress Guarantee | Blocking | Lock-free | Wait-free |
| Worst-case Latency | Unbounded | Unbounded (rare) | Bounded |
| Average Latency | 200-300ns | 50-100ns | 10-20ns |
| Data Loss | Never | Never | Under load |
| Ordering | FIFO | FIFO | Approximate |
| Consumers | Multiple | Multiple | Single |
| Memory | Fixed or growing | Growing | Fixed |
| GC Pressure | High | Medium | Zero |
| Complexity | Low | Medium | High |
That Tuesday morning crisis taught me something fundamental about observability: the observer must never become the observed problem.
Our original telemetry system was technically correct. It guaranteed delivery of every metric. It maintained strict ordering. It was simple to understand. And it was completely useless when we needed it most - during a crisis, it became part of the crisis.
The wait-free telemetry buffer we built represents a different philosophy:
-
Availability over Consistency: It's better to have approximate metrics than no metrics at all.
-
Recent over Complete: Recent data is more valuable than historical data when debugging live issues.
-
Performance over Guarantees: The system's primary function must never be compromised by observability.
-
Bounded over Unbounded: Predictable resource usage, even under extreme load.
The results validated this philosophy:
- 15-20x lower average latency (287ns to 18ns)
- 100x+ better tail latency (12.5us to 89ns at p99.9)
- 97% reduction in GC pressure
- Linear scaling with producer count
But perhaps the most important result isn't in the numbers. It's that during subsequent high-load events, we could actually see what was happening. The telemetry kept flowing. The dashboards stayed updated. We could diagnose issues in real-time instead of piecing together logs after the fact.
-
Measure first, then optimize. We found the problem through profiling, not guessing.
-
Question your assumptions. "Telemetry must capture every event" seemed obvious - until it wasn't.
-
Understand the trade-offs. Wait-free isn't universally better. Know what you're giving up.
-
Design for the worst case. Systems fail under load. Your observability layer shouldn't.
-
Hardware matters. Cache lines, memory ordering, atomic operations - these are real constraints that affect real performance.
The alert storm that started this journey was painful. But it led to a telemetry system that now handles 400+ million events per second without impacting the trading system at all. More importantly, it keeps working when everything else is falling apart.
Next time you build a telemetry system, ask yourself: will this help me or hurt me during a crisis? If you can't confidently answer "help," it might be time to reconsider your approach.
Producer Protocol (WAIT-FREE):
- getAndIncrement head (always succeeds)
- Calculate index = head & mask
- Check overflow (writePos - tail >= capacity)
- If overflow: CAS advance tail
- Write element with release semantics
Consumer Protocol:
- Read tail and head
- If tail >= head: return null (empty)
- Read element with acquire semantics
- Clear slot
- Advance tail
Key Properties:
- Producers: wait-free (bounded steps)
- Consumer: single-threaded (no sync needed)
- Overflow: old data overwritten (by design)
- Memory: zero allocation after init
| Metric | Blocking | Wait-Free | Improvement |
|---|---|---|---|
| Mean Latency | ~287ns | ~18ns | 15.9x |
| p99.9 Latency | ~12.5us | ~89ns | 140x |
| Throughput (4P) | ~14M/s | ~222M/s | 16x |
| GC Allocation | 4.2 MB/s | 0.1 MB/s | 42x less |
// Creating the buffer
WaitFreeOverwriteBuffer<MetricEvent> buffer =
new WaitFreeOverwriteBuffer<>(65536);
// Recording metrics (from any thread)
buffer.record(new MetricEvent("request.latency", latencyNs));
// Consuming metrics (single consumer thread)
buffer.drain(event -> sink.record(event));
// Monitoring health
double lossRate = buffer.getDataLossRatio();
if (lossRate > 0.01) {
alert("Telemetry data loss: " + (lossRate * 100) + "%");
}
- Herlihy & Shavit, "The Art of Multiprocessor Programming" (2008)
- Michael & Scott, "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms" (1996)
- Kogan & Petrank, "Wait-Free Queues with Multiple Enqueuers and Dequeuers" (2011)
- VarHandle Javadoc - java.lang.invoke.VarHandle
- JMM Specification - JSR-133
package com.trading.benchmark;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 5, time = 2)
@Measurement(iterations = 10, time = 5)
@Fork(value = 3, jvmArgs = {"-Xms4g", "-Xmx4g", "-XX:+UseG1GC"})
@State(Scope.Benchmark)
public class FullTelemetryBenchmark {
@Param({"4", "8", "16"})
private int producers;
@Param({"16384", "65536", "262144"})
private int bufferSize;
private BlockingCircularBuffer<Long> blockingBuffer;
private WaitFreeOverwriteBuffer<Long> waitFreeBuffer;
private AtomicLong counter;
private volatile boolean running;
private Thread consumer;
@Setup(Level.Trial)
public void setup() {
blockingBuffer = new BlockingCircularBuffer<>(bufferSize);
waitFreeBuffer = new WaitFreeOverwriteBuffer<>(bufferSize);
counter = new AtomicLong();
running = true;
consumer = new Thread(() -> {
while (running) {
try {
blockingBuffer.take();
} catch (InterruptedException e) {
break;
}
waitFreeBuffer.poll();
}
});
consumer.setDaemon(true);
consumer.start();
}
@TearDown(Level.Trial)
public void teardown() throws InterruptedException {
running = false;
consumer.interrupt();
consumer.join(1000);
}
@Benchmark
@Group("blocking")
@GroupThreads(4)
public void blockingWrite(Blackhole bh) throws InterruptedException {
long value = counter.incrementAndGet();
blockingBuffer.put(value); // put() returns void, so consume the value itself
bh.consume(value);
}
@Benchmark
@Group("waitfree")
@GroupThreads(4)
public void waitFreeWrite(Blackhole bh) {
waitFreeBuffer.record(counter.incrementAndGet());
bh.consume(true);
}
@Benchmark
@Group("blocking_throughput")
@GroupThreads(4)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public void blockingThroughput() throws InterruptedException {
blockingBuffer.put(counter.incrementAndGet());
}
@Benchmark
@Group("waitfree_throughput")
@GroupThreads(4)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public void waitFreeThroughput() {
waitFreeBuffer.record(counter.incrementAndGet());
}
}
Building wait-free data structures is notoriously error-prone. Here are the most common mistakes I've seen (and made) along with how to avoid them.
The most insidious bugs in lock-free code come from incorrect memory ordering assumptions.
// WRONG: No memory ordering guarantee
buffer[index] = element;
head = newHead; // Consumer might see new head before element is written!
// CORRECT: Release semantics ensure element is visible before head update
BUFFER.setRelease(buffer, index, element);
// Or if using volatile:
buffer[index] = element;
VarHandle.releaseFence();
head = newHead; // Now correctly ordered
Debugging tip: If you see occasional data corruption that's hard to reproduce, suspect memory ordering issues. Add explicit barriers and see if the problem disappears. Then carefully analyze which barriers are actually necessary.
Our overflow detection compares positions, but positions wrap around. Consider:
// Dangerous if positions can wrap
if (writePosition - currentTail >= capacity) {
// Overflow detected
}
With 32-bit integers, wraparound happens after ~4 billion operations. At 100 million ops/sec, that's 40 seconds. At 1 million ops/sec, it's about 70 minutes. Either way, it will happen in production.
Solution: Use 64-bit long for positions. At 100 million ops/sec, wraparound takes 5,800 years.
private volatile long head = 0; // 64-bit, practically never wraps
private volatile long tail = 0;
Adding monitoring counters can accidentally destroy performance:
// WRONG: Counters share cache line with hot fields
private volatile long head;
private volatile long writtenCount; // False sharing!
private volatile long overwrittenCount; // False sharing!
private volatile long tail;
// CORRECT: Pad counters onto separate cache lines
private volatile long head;
long p1, p2, p3, p4, p5, p6, p7; // Padding
private volatile long writtenCount;
long p8, p9, p10, p11, p12, p13, p14;
private volatile long overwrittenCount;
long p15, p16, p17, p18, p19, p20, p21;
private volatile long tail;
Debugging tip: Use perf or similar tools to measure L1 cache misses. High miss rates often indicate false sharing.
Even though we have a "single consumer," it's easy to accidentally violate this:
// DANGEROUS: Two places that might call poll()
scheduler.scheduleAtFixedRate(() -> {
while (buffer.poll() != null) { /* process */ }
}, 0, 100, TimeUnit.MILLISECONDS);
// Somewhere else, on shutdown:
void shutdown() {
while (buffer.poll() != null) { /* drain remaining */ }
}
If both execute simultaneously, you get data corruption.
Solution: Enforce single consumer through design:
public class TelemetryConsumer {
private final AtomicBoolean running = new AtomicBoolean(true);
private final Thread consumerThread;
public TelemetryConsumer(WaitFreeOverwriteBuffer<?> buffer) {
consumerThread = new Thread(() -> {
while (running.get()) {
buffer.drain(this::process);
LockSupport.parkNanos(100_000);
}
// Final drain on shutdown
buffer.drain(this::process);
});
consumerThread.start();
}
public void shutdown() {
running.set(false);
consumerThread.interrupt();
try {
consumerThread.join(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
If your buffer allows null elements, the consumer can't distinguish "empty slot" from "null data":
// Problem: null could mean empty OR actual null value
T element = (T) BUFFER.getAcquire(buffer, index);
if (element == null) {
return null; // Is buffer empty, or was null stored?
}
Solutions:
- Disallow nulls (our approach):
public void record(T element) {
if (element == null) {
throw new NullPointerException("Null elements not permitted");
}
// ...
}
- Use a wrapper:
private static final Object NULL_SENTINEL = new Object();
// Store the sentinel instead of a raw null; map it back to null on read
buffer[index] = (element == null) ? NULL_SENTINEL : element;
- Track slot state separately:
private final long[] slotState; // Parallel array tracking filled/empty
It's not enough to test that the buffer works - you need to verify the wait-free guarantee:
@Test
void verifyWaitFreeProperty() {
WaitFreeOverwriteBuffer<Long> buffer = new WaitFreeOverwriteBuffer<>(1024);
AtomicLong maxOperationTime = new AtomicLong(0);
// Run many operations and track worst-case time
for (int i = 0; i < 10_000_000; i++) {
long start = System.nanoTime();
buffer.record((long) i);
long elapsed = System.nanoTime() - start;
maxOperationTime.updateAndGet(max -> Math.max(max, elapsed));
}
// Wait-free means bounded worst case
// Anything over 1us suggests blocking or unbounded spinning
assertTrue(maxOperationTime.get() < 1000,
"Worst case time " + maxOperationTime.get() + "ns exceeds wait-free bound");
}
Let me share how we actually deployed this in production - the good, the bad, and the lessons learned.
We didn't replace the old telemetry immediately. Instead, we ran both systems in parallel:
public class DualTelemetryRecorder {
private final BlockingCircularBuffer<MetricEvent> oldBuffer;
private final WaitFreeOverwriteBuffer<MetricEvent> newBuffer;
private final AtomicLong oldCount = new AtomicLong();
private final AtomicLong newCount = new AtomicLong();
public void record(MetricEvent event) {
// Always record to new buffer (never blocks)
newBuffer.record(event);
newCount.incrementAndGet();
// Try to record to old buffer (might block; assumes the old buffer gained a
// timed offer(...) overload for this shadow-mode experiment)
try {
if (oldBuffer.offer(event, 1, TimeUnit.MILLISECONDS)) {
oldCount.incrementAndGet();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
This let us:
- Verify the new buffer was capturing the same metrics
- Measure performance difference in production
- Build confidence before cutting over
Results from shadow mode:
- Old buffer: 2.3% of records timed out under load
- New buffer: 0.8% data loss (overwritten, not blocked)
- Latency improvement: 12x better p99
We used feature flags to gradually shift traffic:
public class TelemetryRouter {
private final WaitFreeOverwriteBuffer<MetricEvent> newBuffer;
private final BlockingCircularBuffer<MetricEvent> oldBuffer;
private volatile int newBufferPercentage = 0; // 0-100
public void record(MetricEvent event) {
if (ThreadLocalRandom.current().nextInt(100) < newBufferPercentage) {
newBuffer.record(event);
} else {
try {
oldBuffer.put(event);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// Called by feature flag system
public void setNewBufferPercentage(int percentage) {
this.newBufferPercentage = Math.max(0, Math.min(100, percentage));
}
}
Rollout schedule:
- Day 1: 5% to new buffer
- Day 2: 25% to new buffer
- Day 3: 50% to new buffer
- Day 4: 90% to new buffer
- Day 5: 100% to new buffer
At each stage, we monitored:
- Trading system latency
- Metric completeness
- Data loss rates
- GC behavior
Once fully deployed, we discovered several things that needed tuning:
Issue 1: Buffer too small during market open
Market open (9:30 AM) generates 10x normal volume for about 5 minutes. Our initial 64K buffer was losing 15% of data.
Solution: Increased to 256K buffer. Data loss dropped to 0.3%.
Issue 2: Consumer thread starvation
Under extreme load, the consumer thread was getting CPU-starved, causing the buffer to fill up.
Solution: Pinned consumer to dedicated CPU core:
taskset -c 7 java -jar telemetry-consumer.jar
Issue 3: Metrics aggregation bottleneck
The consumer was doing aggregation inline, which slowed it down:
// SLOW: Aggregation inline with consumption
buffer.drain(event -> {
String key = event.getName() + ":" + Arrays.toString(event.getTags());
aggregates.computeIfAbsent(key, k -> new LongAdder()).add(event.getValue());
});
Solution: Separate aggregation to batch processing:
// FAST: Just collect, aggregate later
List<MetricEvent> batch = new ArrayList<>(10000);
buffer.drainTo(batch::add, 10000);
// Aggregate in bulk (more cache-friendly)
for (MetricEvent event : batch) {
// ... aggregation logic
}
After two months of tuning, our final configuration:
telemetry:
buffer:
capacity: 262144 # 256K entries
type: wait-free-overwrite
consumer:
thread-affinity: core-7
drain-batch-size: 10000
idle-park-nanos: 100000 # 100us
monitoring:
loss-alert-threshold: 0.01 # 1%
loss-critical-threshold: 0.05 # 5%
jvm:
gc: ZGC
heap: 8g
flags:
- -XX:+AlwaysPreTouch
- -XX:-UseBiasedLocking
After 6 months in production:
| Metric | Before | After | Improvement |
|---|---|---|---|
| Telemetry p99 latency | 2.3ms | 89ns | 25,000x |
| Telemetry data loss | 2.3% (blocking timeout) | 0.2% (overwritten) | Similar, but no blocking! |
| Trading p99 latency | 1.2ms | 0.8ms | 1.5x |
| GC pauses | 34ms avg | 12ms avg | 2.8x |
| Production incidents | 3/month | 0/month | Eliminated |
The most important improvement isn't in the numbers - it's that we can now actually observe the system during high-load events. When the market goes crazy, we see what's happening in real-time instead of reconstructing events from sparse logs afterward.
This article is part of the Lock-Free in Java series. See the companion repository at https://github.com/techishthoughts-org/off_heap_algorithms for complete source code, benchmarks, and additional examples.