
DataDriven

Top 12 Python Interview Problems for Data Engineers, With Answers

I've sat on both sides of the Python interview table more times than I can count, including somewhere around 20 loops in a single job search. Here's what I've learned: the problems that actually separate candidates in data engineering screens aren't LeetCode hards. They're medium-difficulty Python problems reframed around pipeline logic, messy data, and memory trade-offs.

The Python portion of a DE loop is fundamentally different from a backend SWE loop. It's a pipeline-correctness loop. You're not being asked if you can write the code. You're being asked if you'd approve this code at 3 AM on a pipeline you own. These 12 coding interview problems cover the patterns I've seen repeatedly across FAANG, Databricks, Stripe, and mid-market companies. Every one tests a concept that transfers across tools.

Want to practice these for real? Solve these problems live here with a real editor and graded solutions.

1. Parse Log Lines Into Structured Records

The question: Given a list of log strings in the format "2026-06-11 14:23:01 ERROR db_connection timeout after 30s", parse each line into a dict with keys timestamp, level, and message. Some lines are malformed (missing fields, extra whitespace). Return only valid records.

VALID_LEVELS = {'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'}

def parse_logs(lines):
    records = []
    for line in lines:
        parts = line.strip().split(None, 3)
        if len(parts) < 4:
            continue
        date, time_str, level, message = parts
        if level not in VALID_LEVELS:
            continue
        records.append({
            'timestamp': f'{date} {time_str}',
            'level': level,
            'message': message,
        })
    return records

Why it matters: String parsing is the gatekeeper problem in DE screens. Interviewers hand you messy input on purpose. The trap is coding the happy path and ignoring malformed lines. Quarantining bad rows instead of crashing is the single most senior-signaling habit you can show. The follow-up is always: "What if this file is 10GB?" Answer: swap the list for a generator. Which brings us to...
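One gap worth naming out loud: the parser above accepts any two tokens as the date and time. Below is a minimal sketch of tightening that with datetime.strptime, assuming the timestamp format is exactly the one in the sample line. The names TS_FORMAT and is_valid_timestamp are illustrative, not part of the original solution.

```python
from datetime import datetime

# Assumed format, inferred from the sample log line in the question.
TS_FORMAT = '%Y-%m-%d %H:%M:%S'

def is_valid_timestamp(date, time_str):
    """Return True if 'date time_str' parses under TS_FORMAT."""
    try:
        datetime.strptime(f'{date} {time_str}', TS_FORMAT)
    except ValueError:
        return False
    return True
```

Calling this before appending the record would reject lines whose first two tokens only look like a timestamp by position.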

2. Stream a Large File With a Generator

The question: Rewrite the log parser above to handle a 10GB file using O(1) memory. Accept a file path, yield one parsed record at a time.

def stream_logs(filepath):
    valid = {'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'}
    with open(filepath) as f:
        for line in f:
            parts = line.strip().split(None, 3)
            if len(parts) < 4:
                continue
            date, time_str, level, message = parts
            if level not in valid:
                continue
            yield {
                'timestamp': f'{date} {time_str}',
                'level': level,
                'message': message,
            }

Why it matters: For 100 million integers, a list consumes on the order of 411 MB while a generator uses about 208 bytes. That's a roughly 1.97 million-fold difference. The exact figures depend on the Python version and on how you measure, but the shape doesn't: the list grows with n and the generator stays constant. Interviewers don't just want you to know that; they want you to reach for it instinctively. The gotcha they'll push: "What happens if you iterate this generator twice?" It's exhausted after one pass, so a second loop silently yields nothing. If you need multiple passes, you need to recreate it or cache results. Knowing when not to use a generator is what separates memorization from understanding.
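Both points are easy to demonstrate at a whiteboard. The sketch below uses a toy generator (squares is a made-up example, not the log parser) to show exhaustion after one pass and the size gap between a generator object and a materialized list. Note that sys.getsizeof reports only the container itself, so treat the sizes as rough.

```python
import sys

def squares(n):
    """Yield squares lazily; nothing is stored between values."""
    for i in range(n):
        yield i * i

gen = squares(5)
first_pass = list(gen)   # consumes the generator
second_pass = list(gen)  # already exhausted, so this is empty

# The generator object stays small regardless of n; the list grows with n.
gen_size = sys.getsizeof(squares(1_000_000))
list_size = sys.getsizeof(list(range(1_000_000)))
```

Here first_pass holds the five squares and second_pass is an empty list, with no error raised. That silence is exactly why the bug is easy to ship.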

3. Count Event Frequencies

The question: Given a list of event strings ["click", "view", "click", "purchase", "view", "view"], return the K most frequent events and their counts.

from collections import Counter

def top_events(events, k=2):
    return Counter(events).most_common(k)

Why it matters: This is three lines. Most candidates write fifteen. Counting is a single O(n) pass, and when you pass k, CPython's Counter.most_common(k) selects the winners with heapq.nlargest, which costs about O(u log k) over the u unique keys rather than a full O(u log u) sort. You get that without building a heap yourself. The real test is whether you know the API exists. The follow-up: "What's the space complexity of your frequency map?" Most people say O(n). It's O(u) where u is unique elements. If your input is user click types, u is tiny; if it's session IDs, u could be enormous. That distinction matters in production.

4. Deduplicate Records by Composite Key

The question: Given a list of event dicts with keys user_id, event_type, and timestamp, deduplicate by (user_id, event_type), keeping only the record with the latest timestamp.

def dedup_latest(events):
    latest = {}
    for e in events:
        key = (e['user_id'], e['event_type'])
        if key not in latest or e['timestamp'] > latest[key]['timestamp']:
            latest[key] = e
    return list(latest.values())

Why it matters: This is one of the five recurring data engineering interview patterns. It tests whether you think in terms of grain. Every pipeline you'll ever build has a grain, and dedup logic follows from it. The follow-up: "What if there are 100M events?" At that scale, storing all unique keys in a dict might blow memory. Partitioning by key, so that each partition's keys fit in memory and can be deduplicated independently, signals systems thinking. So does mentioning a Bloom filter, with the caveat that it only answers "have I probably seen this key?", which suits approximate drop-the-duplicate dedup better than keep-the-latest.
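The partitioning idea can be sketched in a few lines. This is an illustration under loud assumptions: the in-memory lists stand in for partition files or shards that a real job would spill to disk, and dedup_partitioned and num_partitions are hypothetical names. The point it shows is that a given key always lands in the same partition, so each partition can be deduplicated on its own.

```python
def dedup_partitioned(events, num_partitions=4):
    """Dedup by (user_id, event_type), one partition at a time."""
    # Stand-in for spilling to num_partitions files on disk.
    partitions = [[] for _ in range(num_partitions)]
    for e in events:
        key = (e['user_id'], e['event_type'])
        partitions[hash(key) % num_partitions].append(e)

    # Only one partition's keys are held in the dict at any moment.
    for part in partitions:
        latest = {}
        for e in part:
            key = (e['user_id'], e['event_type'])
            if key not in latest or e['timestamp'] > latest[key]['timestamp']:
                latest[key] = e
        yield from latest.values()
```

Python's built-in hash is stable within one process but not across runs for strings, so a multi-process version would need a deterministic hash instead.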

5. Top K Most Frequent User IDs

The question: Given a stream of event dicts, each with a user_id field, return the K users with the most events. Do it in better than O(n log n).

import heapq
from collections import Counter

def top_k_users(events, k):
    counts = Counter(e['user_id'] for e in events)
    return heapq.nlargest(k, counts.items(), key=lambda x: x[1])

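Why it matters: Counting is O(n) either way. The difference is in selection: sorting every unique user by count is O(u log u), which approaches O(n log n) when most events come from distinct users, while a heap capped at k entries is O(u log k). When k is 10 and n is 50 million, that's not academic; it's the difference between your job finishing and your job timing out. But here's the real test: candidates who can recite "use a min-heap for top K" often can't explain why it beats sorting when k is small. Pattern memorization is a liability if you can't reason through the trade-off when the interviewer changes k to n/2, at which point log k is nearly log n and the heap no longer buys you anything over a plain sort.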
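To explain why the heap wins, it helps to write out roughly what heapq.nlargest saves you from writing. The sketch below is a hand-rolled version, not the library's actual implementation, and top_k_users_manual is a hypothetical name. It assumes user IDs are mutually comparable, since tuples fall back to comparing the ID when counts tie.

```python
import heapq
from collections import Counter

def top_k_users_manual(events, k):
    """Top-k users by event count using a min-heap capped at k entries."""
    if k <= 0:
        return []
    counts = Counter(e['user_id'] for e in events)
    heap = []  # min-heap of (count, user_id); heap[0] is the weakest keeper
    for user_id, count in counts.items():
        if len(heap) < k:
            heapq.heappush(heap, (count, user_id))
        elif count > heap[0][0]:
            # Evict the current weakest and insert the newcomer in one step.
            heapq.heapreplace(heap, (count, user_id))
    # Largest counts first, reshaped to (user_id, count) pairs.
    return [(user_id, count) for count, user_id in sorted(heap, reverse=True)]
```

Each push or replace touches a heap of at most k items, which is where the log k factor comes from.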

6. Multi-Key Sort

The question: Sort a list of task dicts by priority (ascending) then by created_at (descending) for ties.

def sort_tasks(tasks):
    return sorted(tasks, key=lambda t: (t['priority'], -t['created_at']))

Why it matters: Python's sorted() is stable and accepts tuple keys. This is the standard pattern for rank aggregation in interview questions and in production. The negative sign trick only works on numeric fields; for strings, you'd need a second sorted() call leveraging stability. Interviewers push: "What if created_at is an ISO string?" Then you either parse it into a number (an epoch timestamp, for example) or sort twice: first by created_at descending, then by priority ascending, relying on stability to preserve the first ordering within each priority. Knowing that Python's sort is stable isn't trivia; it's what makes multi-key sorting work without a custom comparator.
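A minimal sketch of the two-pass approach, assuming created_at is an ISO 8601 string in one consistent format and timezone (so that text order matches chronological order). The name sort_tasks_iso is illustrative.

```python
def sort_tasks_iso(tasks):
    """Sort by priority ascending, then created_at (ISO string) descending."""
    # Pass 1: the secondary key, newest first.
    by_created = sorted(tasks, key=lambda t: t['created_at'], reverse=True)
    # Pass 2: the primary key. Stability keeps pass-1 order among ties.
    return sorted(by_created, key=lambda t: t['priority'])
```

The order of the passes is the part people get backwards: the least significant key is sorted first.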

7. Sliding Window: Maximum Throughput

The question: Given a list of timestamps (sorted integers representing seconds), find the maximum number of events in any 60-second window.

def max_throughput(timestamps, window=60):
    max_count = 0
    left = 0
    for right in range(len(timestamps)):
        while timestamps[right] - timestamps[left] >= window:
            left += 1
        max_count = max(max_count, right - left + 1)
    return max_count

Why it matters: This is O(n) instead of the O(n^2) brute force, because left and right each only ever move forward. Sliding window isn't just an algorithm pattern; it's how you monitor pipeline throughput, detect burst traffic, and build alerting logic. The follow-up is usually about sessionization: "Now group events into sessions where any gap > 30 minutes starts a new session." Same pointer mechanics, different business logic.
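One way the sessionization follow-up might look, assuming the same input as above (sorted integer timestamps in seconds). The function name sessionize and the max_gap parameter are illustrative choices.

```python
def sessionize(timestamps, max_gap=30 * 60):
    """Group sorted timestamps into sessions split on gaps > max_gap seconds."""
    sessions = []
    for ts in timestamps:
        # Compare against the last event of the current session.
        if sessions and ts - sessions[-1][-1] <= max_gap:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])
    return sessions
```

A gap of exactly 30 minutes stays in the same session here, because the question says only a gap greater than 30 minutes starts a new one. That boundary is worth confirming with the interviewer.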

8. Group and Aggregate With Plain Python

The question: Given records with department and salary, compute the average salary per department without pandas.

from collections import defaultdict

def avg_salary_by_dept(records):
    totals = defaultdict(lambda: [0, 0])
    for r in records:
        totals[r['department']][0] += r['salary']
        totals[r['department']][1] += 1
    return {dept: s / c for dept, (s, c) in totals.items()}

Why it matters: Candidates who reach for pandas in a vanilla Python round are telling the interviewer they can't think without a framework. The defaultdict pattern here is the same logic under the hood of GROUP BY. And the most common failure mode in pandas groupby? Using apply() instead of agg(), which can be 10-100x slower because it runs a Python function per group instead of a built-in aggregation. Interviewers want to see that you understand the mechanics, not just the API.

9. Flatten Nested JSON

The question: Given a nested dict like {"a": 1, "b": {"c": 2, "d": {"e": 3}}}, flatten it to {"a": 1, "b.c": 2, "b.d.e": 3}.

def flatten_json(obj, prefix=''):
    out = {}
    for k, v in obj.items():
        key = f'{prefix}.{k}' if prefix else k
        if isinstance(v, dict):
            out.update(flatten_json(v, key))
        else:
            out[key] = v
    return out

Why it matters: Every data engineer has dealt with APIs that return arbitrarily nested JSON. Real Databricks scenarios demand handling inconsistent fields, shifting data types, and partial records. The recursive approach is clean but raises RecursionError on deeply nested docs once the depth passes Python's recursion limit (1000 frames by default). The follow-up: "Make this iterative." That's where you use an explicit stack. Bonus points if you mention that production systems should schema-validate before flattening, not after.
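A sketch of the iterative version, using a plain list as the stack. The name flatten_json_iter is illustrative. Like the recursive version, it treats only dicts as nested and leaves lists as leaf values.

```python
def flatten_json_iter(obj):
    """Flatten nested dicts into dotted keys without recursion."""
    out = {}
    stack = [('', obj)]  # (key prefix, dict still to be expanded)
    while stack:
        prefix, current = stack.pop()
        for k, v in current.items():
            key = f'{prefix}.{k}' if prefix else k
            if isinstance(v, dict):
                stack.append((key, v))  # expand later instead of recursing
            else:
                out[key] = v
    return out
```

The set of key/value pairs matches the recursive version, but the insertion order of keys can differ because nested dicts are expanded after their siblings.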

10. Merge Two Sorted Iterators

The question: Given two sorted iterators of integers, merge them into a single sorted iterator without loading either into memory.

def merge_sorted(iter_a, iter_b):
    a = next(iter_a, None)
    b = next(iter_b, None)
    while a is not None and b is not None:
        if a <= b:
            yield a
            a = next(iter_a, None)
        else:
            yield b
            b = next(iter_b, None)
    while a is not None:
        yield a
        a = next(iter_a, None)
    while b is not None:
        yield b
        b = next(iter_b, None)

Why it matters: This is the merge step of merge sort, but it's also how you combine two sorted log files, two time-series feeds, or two partitions of a distributed pipeline. O(n + m) time, O(1) memory. One caveat: using None as the end-of-stream sentinel is safe only because the inputs are integers; for data that can itself contain None, use a dedicated sentinel object. The iterator-in, iterator-out pattern is exactly what coaching platforms mean when they say "rewrite every problem to accept an iterator and return an iterator." If your first instinct is sorted(list(a) + list(b)), you just loaded both files into memory. That's a red flag.
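Once you've shown you can write the merge by hand, it's worth mentioning that the standard library covers this with heapq.merge, which is lazy and takes any number of sorted inputs. A short sketch follows; the wrapper name merge_sorted_stdlib is only for illustration.

```python
import heapq

def merge_sorted_stdlib(*iterables):
    """Lazily merge any number of already-sorted iterables."""
    # heapq.merge holds one pending item per input, not whole inputs.
    return heapq.merge(*iterables)
```

Usage mirrors the hand-written version: list(merge_sorted_stdlib(iter([1, 4, 7]), iter([2, 3, 8]))) gives [1, 2, 3, 4, 7, 8].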

11. Validate and Quarantine Bad Rows

The question: Given a CSV-like list of row strings with the header "name,age,email", parse each row, validate that age is a positive integer and email contains @, and return two lists: valid records and quarantined rows with the reason.

def validate_rows(rows):
    valid, quarantined = [], []
    for i, row in enumerate(rows):
        fields = row.split(',')
        if len(fields) != 3:
            quarantined.append((i, row, 'wrong field count'))
            continue
        name, age_str, email = (f.strip() for f in fields)
        # Validate explicitly; assert statements are stripped under python -O.
        try:
            age = int(age_str)
        except ValueError:
            age = None
        if age is None or age <= 0:
            quarantined.append((i, row, 'invalid age'))
            continue
        if '@' not in email:
            quarantined.append((i, row, 'invalid email'))
            continue
        valid.append({'name': name, 'age': age, 'email': email})
    return valid, quarantined

Why it matters: This is the problem that exposes whether you've shipped production pipelines. A candidate who writes a five-line happy-path parser and says "done" hasn't touched real CSV files. Quoted commas, multi-line fields, non-standard date formats: these aren't gotchas, they're the job. Even the split(',') above breaks on a quoted comma, and saying so unprompted counts in your favor. The follow-up: "Should bad rows halt the pipeline or get logged?" The answer is always logged. Dead-letter queues exist for a reason.
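Quoted commas and multi-line fields are the two cases where a plain split on commas goes wrong, and Python's csv module handles both. A small sketch, where parse_csv_text is a hypothetical helper and io.StringIO stands in for a real file handle:

```python
import csv
import io

def parse_csv_text(text):
    """Parse CSV text into a list of rows, honoring quoted fields."""
    # csv.reader understands quotes, so embedded commas and newlines
    # inside a quoted field stay within that one field.
    return list(csv.reader(io.StringIO(text)))

sample = 'name,age,email\n"Smith, Jane",34,jane@example.com\n'
rows = parse_csv_text(sample)
naive = sample.splitlines()[1].split(',')  # what str.split would produce
```

In this sample, rows[1] has three fields with the name intact, while naive has four because the quoted comma was treated as a delimiter.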

12. Chunk a File for Parallel Processing

The question: Write a generator that reads a file in chunks of N lines, yielding each chunk as a list. The last chunk may be smaller than N.

def chunk_file(filepath, n=1000):
    chunk = []
    with open(filepath) as f:
        for line in f:
            chunk.append(line.rstrip('\n'))
            if len(chunk) == n:
                yield chunk
                chunk = []
    if chunk:
        yield chunk

Why it matters: This is the idea behind pandas.read_csv(chunksize=N), built from scratch (pandas yields DataFrames rather than lists of lines). Interviewers want to see that you understand the mechanics underneath the convenience function. It's also the foundation for fan-out parallelism: yield chunks, distribute to workers. The follow-up hits the real complexity: "What if records span multiple lines?" Now you need a stateful parser, and the problem stops being about chunking and starts being about protocol awareness.
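The same chunking logic can be written against any iterator rather than a file path, which lets it sit downstream of a streaming parser. A minimal sketch using itertools.islice; the name chunked is illustrative, and n is assumed to be a positive integer.

```python
from itertools import islice

def chunked(iterable, n=1000):
    """Yield lists of up to n items from any iterable; the last may be shorter."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, n))  # pulls at most n items, lazily
        if not chunk:
            return
        yield chunk
```

For example, list(chunked(range(7), 3)) gives [[0, 1, 2], [3, 4, 5], [6]]. Each yielded list is an independent unit of work that could be handed to a worker.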

The actual job is less "write a DAG" and more "figure out why this pipeline silently dropped 2M rows last Tuesday." Nobody interviews for that. They interview for Python and SQL. These 12 problems are the closest the coding interview gets to the real thing.

Every one of these problems tests a concept, not a tool: data modeling, memory trade-offs, grain awareness, error handling as architecture. The syntax is the easy part. If you can explain why you chose a generator over a list, why a heap beats a sort for small K, and why you quarantine instead of crash, you're demonstrating the thinking that gets you hired.

What problem would you add to this list? I'm curious what patterns are showing up in your loops that aren't covered here.
