The pipeline finished. All four checkpoints evaluated. Metrics written to cloud storage. Zero errors in the logs. And 68% of the results were empty strings.
Not errors. Not exceptions. Not null values with a helpful stack trace. Empty strings. The pipeline had done everything it was supposed to do -- read images, distribute them to GPU workers, run inference, collect results, write output -- and two-thirds of the results were blank. The metrics dashboard showed accuracy numbers. They were plausible. They were also computed on one-third of the actual data.
We had built a system that was confidently wrong.
This is the story of how we found and fixed 21 bugs in 9 days, turning a silently broken GPU inference pipeline into one that survives actor crashes, CUDA deadlocks, and stalled workers with zero data loss. The pipeline evaluates driving scenes through a fine-tuned vision-language model -- classifying road conditions and predicting vehicle waypoints. Safety-critical decisions. Full dataset coverage is not optional.
The tools were fine. Ray Data, vLLM, Kubernetes, experiment tracking -- none of them were broken. Every single one of the 21 bugs was in how they were integrated.
Act I: "The Pipeline Works!"
The False Sense of Security
The pipeline reported success because it was configured to tolerate failure. Early in development, someone had set max_errored_blocks to a generous value. The reasoning was sound: if a handful of samples fail due to transient cloud storage errors, don't crash the entire 8,600-sample evaluation. Keep going. Report what you have.
The problem is that "keep going" and "silently discard 68% of your data" are the same instruction when the errors are systemic rather than transient.
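For reference, this is roughly what that knob looks like in Ray Data. The value here is illustrative, not our exact config:
import ray

# A generous error budget: each "errored" block is simply dropped from the output.
ctx = ray.data.DataContext.get_current()
ctx.max_errored_blocks = 100  # anything > 0 trades data loss for uptime -- silently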
The first clue was a boolean argument that wasn't being parsed correctly. The evaluation config had a flag -- something like --use_detailed_metrics -- that was being read as a string. In Python, bool("False") is True. Every string except the empty string is truthy. So the flag was always on, regardless of what you passed. This wasn't causing the 68% empty results directly, but it was a symptom of a larger disease: nobody was validating that configurations were doing what they claimed.
# The bug: bool("False") == True
use_detailed = bool(args.use_detailed_metrics) # Always True
# The fix: explicit parsing
use_detailed = args.use_detailed_metrics.lower() in ("true", "1", "yes")
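One way to keep this class of bug out entirely is to reject ambiguous values at the argparse boundary. A minimal sketch -- the flag name is reused from above for illustration, not our actual CLI:
import argparse

def str2bool(value: str) -> bool:
    # Explicitly map the accepted spellings; anything else is an error, not a guess.
    lowered = value.strip().lower()
    if lowered in ("true", "1", "yes"):
        return True
    if lowered in ("false", "0", "no"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean, got {value!r}")

parser = argparse.ArgumentParser()
parser.add_argument("--use_detailed_metrics", type=str2bool, default=False)

args = parser.parse_args(["--use_detailed_metrics", "False"])
assert args.use_detailed_metrics is False  # bool("False") would have been True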
A TypeError guard in the inference wrapper was catching exceptions from malformed inputs and returning empty dictionaries. Silently. No log line. No counter. Just an empty result that looked indistinguishable from a successful inference that happened to produce no output.
# The silent swallower
try:
result = engine.generate(inputs)
except TypeError:
result = {} # Looks fine. Is not fine.
This is the student-turning-in-blank-pages problem. The teacher collects 30 papers, sees 30 papers in the stack, and assumes everyone answered the questions. It isn't until you actually read them that you realize 20 are blank. Our pipeline was counting papers, not reading them.
The core insight: error tolerance without error visibility is just data loss with extra steps. The pipeline was designed to be resilient. It was resilient to the wrong things. It tolerated failures that should have been hard crashes, and it reported success when it should have reported catastrophe.
Act II: "The Machine Runs Out of Room"
Three Places Data Piles Up
In a Ray pipeline, data accumulates in three places: the driver node (the coordinator), the workers (the GPU actors), and the object store (shared memory between them). Understanding which one is drowning is the difference between a targeted fix and a week of thrashing.
Our pipeline had the architectural equivalent of an hourglass. All data flowed through a single point -- the driver -- before fanning out to workers. Here's what that looked like:
Cloud Storage (8,600 samples x 1-2 MB = ~17 GB)
│
▼
┌──────────────┐
│ Ray Driver │ ← deserializes ALL images here
│ (1 node) │ ← holds ~17 GB in Python heap
└──────┬───────┘
│ ray.data.from_items() serializes to object store
┌────┴────┬────────┐
▼ ▼ ▼
┌──────┐ ┌──────┐ ┌──────┐
│ GPU │ │ GPU │ │ GPU │ 6 actors
│ W-1 │ │ W-2 │ │ W-3 │ (VLM inference)
└──┬───┘ └──┬───┘ └──┬───┘
│ │ │
└────┬────┴────────┘
▼
┌──────────────┐
│ Results │ ← take_all() pulls everything back to driver
└──────────────┘
The driver was doing all the heavy lifting: reading every sample from cloud storage, decoding every base64 image into memory, building Python dictionaries with ~2 MB of image data each, then handing the entire collection to ray.data.from_items(). That call serializes everything into the object store. For 8,600 samples at 2 MB each, that's roughly 17 GB of Python objects materialized on a single machine before any GPU touches a single pixel.
The Before: Driver-Side Materialization
def load_dataset(self, dataset_path, num_samples):
mds_dataset = StreamingDataset(streams=[stream], shuffle=False)
samples = []
for idx in range(len(mds_dataset)):
raw = mds_dataset[idx]
prompt_blocks = self._parse_images_to_base64(raw)
samples.append({
"sample_id": idx,
"prompt_blocks": prompt_blocks, # ~2 MB per sample
"target": raw["target"],
"metadata": raw["metadata"],
})
# All 17 GB lives in driver memory at this point
dataset = ray.data.from_items(samples) # 8,600 x 2 MB = OOM
return dataset
Under load, the driver ran out of memory. Python's garbage collector fought with Ray's object store for the same physical RAM. Workers received empty batches -- not because they failed, but because the objects they were trying to read had been evicted from the object store to make room for newer ones. No error. No exception. Just empty data.
The After: Worker-Side Shard Reading
def load_dataset(self, dataset_path, num_samples):
return ray.data.read_datasource(
MDSShardedDatasource(dataset_path, num_samples),
parallelism=64,
)
class MDSShardedDatasource(Datasource):
"""Each worker reads its own shard directly from cloud storage."""
def get_read_tasks(self, parallelism):
# Driver only reads index.json (~1 KB)
shard_paths = self._compute_shard_assignments(parallelism)
for shard in shard_paths:
yield ReadTask(
read_fn=lambda p=shard: self._read_mds_shard(p),
metadata=BlockMetadata(
num_rows=shard.num_samples,
size_bytes=shard.estimated_size,
),
)
The driver now reads a single index file -- roughly 1 KB -- and tells each of 64 workers which shard to pull from cloud storage. Each worker deserializes only its own portion. Memory usage on the driver drops from 17 GB to effectively nothing.
The same antipattern existed in reverse for output. results.materialize() followed by take_all() pulled every result back to the driver before writing anything out. We replaced it with write_parquet(), which streams results directly from workers to cloud storage. The driver never touches the result data.
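A sketch of the before and after -- the output path and the write_rows_to_disk helper are placeholders, not our real code:
# Before: every result is pulled back to the driver
results = processed.materialize()
rows = results.take_all()          # entire result set in driver memory
write_rows_to_disk(rows)           # hypothetical helper

# After: workers write their own blocks; the driver never sees the data
processed.write_parquet("s3://bucket/eval-results/run-001/")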
The Object Store Spill Cascade
Even after fixing the driver bottleneck, we hit a second memory wall when running multiple evaluation pipelines concurrently. Four pipelines, each preprocessing 8,600 samples with PIL Images and PyTorch tensors (not Arrow-safe, so Ray falls back to pickle serialization -- full copy per consumer):
4 pipelines x 8,600 samples x ~25 MB preprocessed = ~860 GB object store pressure
Our node had 154 GB of object store (30% of 512 GB RAM). The cascade:
- Object store hits 100%
- Ray spills to /tmp (10-50x slower)
- /tmp fills up
- Ray evicts unconsumed objects
- max_errored_blocks absorbs the errors
- Pipeline "succeeds" with missing data
The fix was architectural: preprocess once, fan out to N inference pools via ray.put(). One copy in the object store, N readers. Object store usage dropped from hundreds of GB to single-digit GB.
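A sketch of that shape, with the preprocessing and evaluation functions stubbed out for illustration (preprocess_once and run_inference_pool are invented names, not our actual code):
import ray

def preprocess_once(samples):
    # Stand-in for the expensive PIL/tensor preprocessing step.
    return [{"sample_id": i, "pixels": s} for i, s in enumerate(samples)]

@ray.remote(num_gpus=1)
def run_inference_pool(data, checkpoint_path):
    # Ray resolves the ObjectRef argument to the one stored copy -- no per-pool re-serialization.
    return f"evaluated {len(data)} samples with {checkpoint_path}"

samples = list(range(8600))
checkpoints = ["ckpt-001", "ckpt-002", "ckpt-003", "ckpt-004"]

# Preprocess once; ray.put stores a single copy in the shared object store.
preprocessed_ref = ray.put(preprocess_once(samples))

# Four consumers, one stored object, instead of re-serializing the dataset per pipeline.
futures = [run_inference_pool.remote(preprocessed_ref, ckpt) for ckpt in checkpoints]
results = ray.get(futures)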
Act III: "Everything Fails All the Time" -- The Eight-Fix Monday
This was the centerpiece. Eight fixes shipped in a single day, each addressing a different failure mode. Together, they form a defense-in-depth pattern -- six layers of protection between a failure and data loss.
Think of it like a substitute teacher managing a classroom of 32 students (GPU workers). Any individual student might fall asleep (stall), have a meltdown (crash), or turn in a blank worksheet (empty result). The substitute can't prevent any of these. But they can notice them, respond to them, and make sure the class still finishes the assignment.
Layer 1: Error Containment -- The _error Column
The first layer is the simplest. When any stage encounters an error, it tags the row instead of crashing.
def preprocess(self, row):
try:
return self._process(row)
except Exception as e:
row["_error"] = f"preprocess: {e}"
return row # Row survives with error tag
def postprocess(self, row):
if row.get("_error") or row.get("generated_text") is None:
row["_error"] = row.get("_error", "inference: empty response")
return row
return self._normal_postprocess(row)
Once _error is set, every downstream stage checks for it and passes the row through untouched. The row shows up in the final output with a clear failure reason. No data loss. No silent swallowing. You can grep the output for _error and see exactly which samples failed and why.
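Auditing a run then takes a few lines over the output. A sketch, assuming the results were written as Parquet with the _error column intact (the path is illustrative):
import pandas as pd

df = pd.read_parquet("eval-results/run-001/")   # wherever write_parquet() landed
failed = df[df["_error"].notna()]

print(f"{len(failed)} / {len(df)} samples failed")
print(failed["_error"].value_counts())          # failure reasons, grouped by stage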
Layer 2: Per-Sample Fallback
Batch inference is fast -- 64 samples at once through the GPU. But one corrupted image in a batch of 64 can crash the entire batch. Without isolation, that one bad apple kills 63 good results.
def __call__(self, batch):
try:
return self._generate_batch(batch) # Fast path: full batch
except Exception as batch_err:
logger.warning("Batch of %d failed (%s), retrying per-sample",
len(batch["prompt"]), batch_err)
results = []
for i in range(len(batch["prompt"])):
try:
result = self._generate_single(batch, i)
results.append(result)
except Exception as sample_err:
results.append({
"generated_text": None,
"_error": f"inference: {sample_err}",
})
return self._merge_results(results)
The happy path tries the full batch. On failure, it falls back to per-sample inference. Only the broken sample gets an error tag. The other 63 flow through normally. The cost is one extra inference call per bad sample. The benefit is 63 results that would otherwise be lost.
Layer 3: Stall Watchdog
This is the one that saved us at 98% progress. A vLLM actor hit a multimodal cache bug and froze -- not crashed, not errored, frozen. A futex deadlock in the CUDA runtime. No error signal. No exit code. Just silence.
class VLMInferenceEngine:
def __init__(self, config):
self._last_progress = time.time()
self._watchdog = threading.Thread(
target=self._watchdog_loop, daemon=True
)
self._watchdog.start()
def _watchdog_loop(self):
"""Dead man's switch: if no batch completes in 120s, force-kill."""
while True:
time.sleep(30) # Check every 30 seconds
stall_seconds = time.time() - self._last_progress
if stall_seconds > 120:
logger.error(
"WATCHDOG: No progress for %ds. Force-killing actor.",
stall_seconds
)
os._exit(1) # Not sys.exit(). Not an exception. Hard kill.
def __call__(self, batch):
outputs = self.llm.generate(inputs, sampling_params)
self._last_progress = time.time() # Pet the watchdog
return self._format_outputs(outputs, batch)
Why os._exit(1) instead of sys.exit() or raising an exception? Because when you're in a CUDA deadlock, polite shutdown is not an option. sys.exit() raises SystemExit, which Python's exit handlers try to process. Those handlers can themselves deadlock on the same futex. os._exit() calls the C runtime's _exit() directly -- no handlers, no cleanup, no chance to hang. The process dies. Ray detects the death. Ray restarts the actor on a fresh node.
A train's dead man's switch requires the engineer to press a button every 30 seconds. If the engineer is incapacitated, the switch is released and the train brakes. Our watchdog requires the actor to complete a batch every 120 seconds. If it doesn't, the actor is killed and restarted. The system doesn't diagnose why the actor stopped responding. It just knows that silence is dangerous.
Layer 4: Engine Retry with Exponential Backoff
Initializing a GPU inference engine can fail for transient reasons: stale CUDA state from a previously crashed process, fragmented GPU memory, a race condition during model loading. Without retry logic, a single init failure means a permanently broken actor.
def _initialize_engine(self):
"""Retry engine init with exponential backoff and GPU cleanup."""
for attempt in range(3):
try:
self.llm = LLM(
model=self.model_path,
gpu_memory_utilization=0.85, # 10% headroom for restarts
)
self._initialized = True
logger.info("Engine initialized on attempt %d/3", attempt + 1)
return
except Exception as e:
logger.warning("Init attempt %d/3 failed: %s", attempt + 1, e)
# Aggressive GPU cleanup between attempts
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.synchronize()
gc.collect()
time.sleep(10 * (2 ** attempt)) # 10s, 20s, 40s
# All retries exhausted: mark permanently dead
self._init_error = True
logger.error("Engine init failed after 3 attempts. Marking actor as dead.")
The gpu_memory_utilization=0.85 is deliberate. We dropped it from 0.95, giving up 10% of GPU memory to leave extra headroom for cold starts after a crash. That 10% sounds wasteful until you realize it's the difference between a restart succeeding and entering a death spiral.
Layer 5: Dead Engine Fast-Path (Circuit Breaker)
Once an engine is permanently dead, every batch that arrives should be immediately returned as an error, not retried.
def __call__(self, batch):
if self._init_error:
# Don't waste time. Don't waste GPU. Fast-return errors.
return {
col: batch[col] for col in batch
} | {
"generated_text": [None] * len(batch["prompt"]),
"_error": ["engine_init_failed"] * len(batch["prompt"]),
}
if not self._initialized:
self._initialize_engine()
if self._init_error:
return self.__call__(batch) # Will hit fast-path above
return self._run_inference(batch)
This is a circuit breaker pattern. Once the circuit opens (engine dead), all requests fast-fail. Ray detects that the actor is producing only errors and stops routing new work to it. Without this, the dead actor would keep accepting batches, running through the retry logic on every single one, and returning empty results after a 70-second delay each time.
Layer 6: Pipeline-Level Fault Tolerance
All five layers above protect individual actors and samples. The sixth layer protects the pipeline itself.
ctx = ray.data.DataContext.get_current()
ctx.max_errored_blocks = 10 # Tolerate up to 10 failed blocks
ctx.actor_task_retry_on_errors = True
ctx.actor_init_retry_on_errors = True
processed = dataset.map_batches(
VLMInferenceEngine,
batch_size=64,
num_gpus=1,
compute=ActorPoolStrategy(size=6, max_tasks_in_flight_per_actor=4),
max_restarts=3, # Actor dies 3 times -> permanent failure
max_task_retries=3, # Block retried 3 times -> marked errored
)
max_restarts=3 is actor-level: a GPU that crashes three times is considered permanently unhealthy. max_task_retries=3 is block-level: a batch of data that fails on three different actors is considered poison. Together, they create a two-tier retry policy. A transiently sick GPU gets three chances to recover. A poison batch gets three tries across different actors.

The combination of these six layers meant that when our actor froze at 98% progress, the watchdog killed it, Ray restarted it on a fresh node, the remaining 178 samples were processed, and the final output contained every single one of the 8,600 input samples. Zero dropped.
Act IV: "The Fleet"
With the pipeline itself hardened, we turned to the operational layer -- running dozens of evaluations across multiple checkpoints on a shared cluster. This introduced a new category of problems: not "does the pipeline work?" but "does the fleet of pipelines play nice together?"
The Metrics Deadlock
Our experiment tracking SDK had a method -- experiment.end() -- that blocks until its internal upload queue drains. Under normal conditions, this takes a few seconds. Under throttled conditions (the SDK rate-limits itself when you log too many metrics too fast), the queue never drains. The call blocks forever.
This was happening inside Ray actors. A deadlocked actor holds its GPU allocation, accepts no new work, and never exits. We were losing GPUs to a metrics library.
# Before: blocks forever under throttling
def close(self):
self.experiment.end() # Can hang for hours
# After: bounded timeout in a daemon thread
def close(self, timeout=60):
if not self.experiment:
return
self._flush_metrics_queue()
thread = threading.Thread(target=self.experiment.end, daemon=True)
thread.start()
thread.join(timeout=timeout)
if thread.is_alive():
logger.warning(
"Experiment end() did not complete in %ds. "
"Server will auto-close the orphaned experiment.", timeout
)
The daemon thread ensures that if end() hangs, the actor can still exit cleanly. The tracking server auto-closes orphaned experiments after a timeout, so no data is permanently lost. We also batched all log_metrics() calls into a single call per batch (was 10 separate network round-trips), disabled auto-logging of environment info and git state, and set explicit timeouts of 60 seconds instead of the default 3,600.
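The batching change looked roughly like this -- a sketch of the buffering wrapper, where the experiment object and its log_metrics(dict, step=...) signature are assumptions about the tracking SDK, to be adjusted to whichever one you use:
class BufferedMetrics:
    """Accumulate metrics locally, then ship one payload per batch instead of ten."""

    def __init__(self, experiment):
        self.experiment = experiment
        self._buffer = {}

    def log(self, name, value):
        self._buffer[name] = value  # no network call here

    def flush(self, step):
        if self._buffer:
            self.experiment.log_metrics(self._buffer, step=step)  # single round-trip
            self._buffer.clear()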
Job Queuing: The Bouncer Pattern
When you submit 80 evaluation jobs to a shared Ray cluster, you need to control how many run concurrently. Without resource gating, all 80 try to start simultaneously, each requesting 6 GPUs. The cluster has 24 GPUs. Four jobs fit. The other 76 sit in PENDING state, holding driver-node resources (CPU, memory for the coordinator process) while waiting for GPUs that won't be free for hours.
# entrypoint_resources acts as a bouncer:
# each job "holds" 6 GPUs worth of admission tickets
ray.job_submission.JobSubmissionClient(
address=cluster_address
).submit_job(
    entrypoint="python run_eval.py --checkpoint $CHECKPOINT",
entrypoint_resources={"GPU": 6}, # Job won't start until 6 GPUs free
runtime_env={"env_vars": {"CHECKPOINT": ckpt_path}},
)
entrypoint_resources is the bouncer at the door. Each job declares upfront how many GPUs it needs. Ray's scheduler won't start the job until those GPUs are available. Jobs queue cleanly instead of stampeding. Four jobs run concurrently on 24 GPUs. As each finishes, the next one starts. No manual orchestration.
The Version Mismatch
We lost half a day to a Ray version mismatch. The cluster was running Ray 2.38. Our container image had Ray 2.35. The job submitted successfully, the actors started, and then inference produced subtly wrong results. No crash. No version check error. Just a different code path in the data serialization layer that handled edge cases differently.
The fix was pinning the Ray version in the container image to match the cluster exactly. But the real lesson was adding a version check to the job entrypoint:
import ray
expected = "2.38.0"
actual = ray.__version__
if actual != expected:
raise RuntimeError(
f"Ray version mismatch: expected {expected}, got {actual}. "
f"Update your container image."
)
Force-Stopping Stuck Jobs
Even with all the resilience logic, some jobs get stuck in PENDING state and never start -- usually because a previous job didn't release its resources cleanly. The Ray dashboard shows the job as PENDING, but there's no built-in "force stop" button that works reliably for jobs that haven't started yet.
import time
from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient(address=cluster_address)
for job_info in client.list_jobs():
    if job_info.status in ("PENDING", "RUNNING"):
        # Job start times are reported in milliseconds since the epoch
        elapsed = time.time() - job_info.start_time / 1000
        if elapsed > 7200:  # 2 hours without progress
            logger.warning("Force-stopping stuck job %s", job_info.job_id)
            client.stop_job(job_info.job_id)
We wrapped this in a cron job that runs every 30 minutes. Aggressive, but the alternative was waking up to find 6 GPUs held hostage by a zombie job.
Act V: "The Result"
Here is where we ended up, 9 days after finding the 68% empty results:
| Metric | Day 1 (broken) | Day 9 (fixed) |
|---|---|---|
| Results present | 2,752 / 8,600 (32%) | 8,600 / 8,600 (100%) |
| Results empty | 5,848 (68%) | 0 |
| Crash recovery | None (manual restart) | Automatic (watchdog + Ray retry) |
| Actor stall detection | None | 120s watchdog, os._exit(1) |
| Per-sample isolation | None (one bad sample kills batch) | Batch fallback to per-sample |
| Concurrent checkpoints | 1 (manual) | 4 concurrent, 80+ queued |
| Job control | Cannot kill stuck jobs | REST API + cron reaper |
| Metrics deadlocks | Frequent (hours-long hangs) | Zero (60s timeout) |
We ran 80+ checkpoint evaluations unattended over the following week. Every single one produced a complete result set. Two actors crashed during that period -- one due to a CUDA OOM, one due to the same multimodal cache bug. Both were detected by the watchdog within 150 seconds, restarted by Ray, and the affected batches were reprocessed. Zero data loss in both cases.
What We Learned
The 21 fixes fell into a clear pattern. They weren't individually difficult. Most were 5-20 lines of code. The hard part was finding them, because the pipeline's error-tolerance features were actively hiding the problems.
Three principles emerged:
Reconcile relentlessly. Input count must equal output count. If they don't match, the run failed, regardless of what the logs say. We added a hard assertion at the end of every pipeline run: assert output_count == input_count (a minimal version is sketched after these principles).
Fail loud, recover quiet. Every failure should produce a visible signal -- a log line, an error tag, a metric increment. Recovery should be automatic and silent. The worst failure mode is one that is both invisible and unrecoverable.
Defense in depth. No single layer prevents all failures. The watchdog doesn't prevent CUDA deadlocks -- it detects and recovers from them. The per-sample fallback doesn't prevent corrupted images -- it isolates them. The circuit breaker doesn't prevent init failures -- it stops wasting time on dead actors. Each layer handles the failures that slip past the layer above it.
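Here is a minimal version of that reconciliation gate from the first principle -- the Parquet output path is illustrative, and the _error column follows the convention from Layer 1:
import logging

import pandas as pd

logger = logging.getLogger(__name__)

def reconcile(input_count: int, output_path: str) -> None:
    """Hard gate at the end of a run: every input row must appear in the output."""
    df = pd.read_parquet(output_path)
    output_count = len(df)
    errored = int(df["_error"].notna().sum()) if "_error" in df.columns else 0

    assert output_count == input_count, (
        f"Reconciliation failed: {output_count} outputs for {input_count} inputs"
    )
    logger.info("Reconciled %d samples (%d tagged with _error)", output_count, errored)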
Production is not a feature you ship. It is a place your code lives. The weather there is harsh and unpredictable. You don't make code "production-ready" by adding a feature flag. You make it production-ready by assuming everything will fail, and building the systems to notice, contain, and recover -- automatically, silently, completely.
Our pipeline doesn't crash less than it used to. It crashes exactly as often. It just recovers now.
All code examples are simplified for clarity. The actual implementations include additional error handling, logging, and configuration.