Comprehensive Guide to Queues
1. Foundational Understanding
1.1 What is a Queue?
A queue is a linear data structure that follows the First-In-First-Out (FIFO) principle, where the first element added is the first one to be removed. Think of it as a line of people waiting: the person who arrives first is served first.
Key Characteristics:
- FIFO ordering: First element inserted is first to be removed
- Two-end access: Insertion at rear (back), deletion at front
- Sequential processing: Elements processed in arrival order
- Two primary operations: Enqueue (add) and Dequeue (remove)
Real-World Analogies:
- Line at a ticket counter
- Print queue in operating systems
- Customer service call waiting
- Cars at a toll booth
- Playlist in a music player
- Task queue in a CPU scheduler
1.2 Why Use Queues?
Problem-Solving Benefits:
- Order preservation: Maintains arrival order for fair processing
- Buffering: Handles speed mismatch between producer and consumer
- Scheduling: Fair resource allocation (first-come, first-served)
- BFS traversal: Level-by-level graph/tree exploration
- Asynchronous processing: Decouples task generation from execution
When to Use Queues:
- Task scheduling and job processing
- Breadth-First Search (BFS) algorithms
- Request handling in servers
- Print job management
- Message passing between processes
- Cache implementation (LRU with queue)
- Resource sharing (printer, CPU)
- Event handling systems
When NOT to Use Queues:
- Need LIFO ordering (use stack)
- Need random access (use array/list)
- Need priority-based processing (use priority queue/heap)
- Need access to middle elements (use array/deque)
1.3 Queue Operations Overview
Core Operations:
class Queue:
    """Basic queue interface"""

    def enqueue(self, item):
        """Add item to rear - O(1)"""
        pass

    def dequeue(self):
        """Remove and return front item - O(1)"""
        pass

    def front(self):
        """View front item without removing - O(1)"""
        pass

    def is_empty(self):
        """Check if queue is empty - O(1)"""
        pass

    def size(self):
        """Get number of elements - O(1)"""
        pass
Visual Representation:
Enqueue operations:
    Start:      Front → [ ]        ← Rear
    Enqueue(1): Front → [1]        ← Rear
    Enqueue(2): Front → [1][2]     ← Rear
    Enqueue(3): Front → [1][2][3]  ← Rear

Dequeue operations:
    Dequeue() returns 1: Front → [2][3]  ← Rear
    Dequeue() returns 2: Front → [3]     ← Rear
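The same sequence can be reproduced with Python's standard collections.deque, a common off-the-shelf FIFO queue:

```python
from collections import deque

# deque as a FIFO queue: append() enqueues at the rear,
# popleft() dequeues from the front.
q = deque()
q.append(1)          # Front → [1] ← Rear
q.append(2)          # Front → [1][2] ← Rear
q.append(3)          # Front → [1][2][3] ← Rear

print(q.popleft())   # 1 (first in, first out)
print(q.popleft())   # 2
print(list(q))       # [3]
```

deque gives O(1) operations at both ends, unlike list.pop(0), which shifts every remaining element.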
1.4 Classification of Queue Topics
By Implementation:
- Array-based queue: Fixed or dynamic size using arrays
- Linked list-based queue: Dynamic size using linked nodes
- Circular queue: Array with wrap-around (efficient space usage)
By Functionality:
- Simple queue: Standard FIFO operations
- Circular queue: Fixed-size with circular indexing
- Priority queue: Elements have priorities (heap-based)
- Deque: Double-ended queue (insert/delete both ends)
By Application Domain:
- BFS traversal: Level-order tree/graph traversal
- Task scheduling: CPU scheduling, job queues
- Sliding window: Maximum/minimum in window
- Monotonic queue: Maintains elements in order
- Multi-structure: Queue using stacks, stack using queues
1.5 Queue Variants
Standard Queue:
Front → [1][2][3][4] ← Rear
Enqueue at rear, dequeue at front
Circular Queue:
[3][4]
↑ ↓
| |
[2] [5] ← Rear
↑
Front
Wrap-around when reaching end
Priority Queue:
Front → [1(p3)][5(p2)][7(p1)] ← Rear
Higher priority elements at front
Deque (Double-Ended Queue):
Front ⇄ [1][2][3][4] ⇄ Rear
Can enqueue/dequeue from both ends
Monotonic Queue:
Front → [4][3][2] ← Rear
Maintains monotonic decreasing order
1.6 Queue in Different Contexts
In Operating Systems:
- Process scheduling: Ready queue, waiting queue
- I/O requests: Disk I/O queue, printer queue
- Network packets: Router queue buffering
- Interrupt handling: Interrupt queue
In Algorithms:
- BFS traversal: Level-order exploration
- Shortest path: BFS for unweighted graphs
- Cache replacement: FIFO cache policy
- Producer-consumer: Buffer between threads
In System Design:
- Message queues: RabbitMQ, Kafka, SQS
- Task queues: Celery, Redis Queue
- Request handling: Web server request queue
- Rate limiting: Token bucket with queue
1.7 Historical Context and Evolution
Early Computing Era (1940s-1950s):
The concept of queues emerged from the need to manage sequential task processing in early computers. The first documented use of queue-like structures appeared in:
- 1945: John von Neumann's EDVAC report described sequential processing
- 1946: The ENIAC used input/output buffers (primitive queues)
- 1951: UNIVAC I implemented job scheduling using queue principles
Theoretical Foundation (1950s-1960s):
Key Contributions:
1. 1909: Agner Krarup Erlang publishes the first queueing-theory paper, modeling telephone-exchange traffic
2. 1950s: FIFO buffering becomes standard practice in batch-processing systems
3. 1960: Circular buffer techniques appear in IBM mainframe I/O handling (IBM 7090 era)
4. 1962: Priority queues formalized in scheduling algorithms
Example of Early Queue Usage:
# Simulating early punch card job processing (1960s style)
class PunchCardQueue:
    """
    Simulates how mainframe computers processed batch jobs.
    Jobs submitted via punch cards were queued for processing.
    """
    def __init__(self):
        self.job_queue = []
        self.job_counter = 0

    def submit_job(self, job_name, cards):
        """Submit a job (deck of punch cards)"""
        self.job_counter += 1
        job = {
            'id': self.job_counter,
            'name': job_name,
            'cards': cards,
            'submit_time': self.job_counter
        }
        self.job_queue.append(job)
        print(f"Job #{job['id']} '{job_name}' submitted with {cards} cards")

    def process_next_job(self):
        """Process next job in queue (FIFO)"""
        if not self.job_queue:
            print("No jobs in queue")
            return None
        job = self.job_queue.pop(0)
        wait_time = self.job_counter - job['submit_time']
        print(f"Processing Job #{job['id']}: '{job['name']}' (waited {wait_time} time units)")
        return job

# Historical example
mainframe = PunchCardQueue()
mainframe.submit_job("Payroll Calculation", 500)
mainframe.submit_job("Inventory Report", 300)
mainframe.submit_job("Sales Analysis", 750)
mainframe.process_next_job()  # Processes Payroll (first in)
Modern Evolution (1970s-Present):
- 1970s: Introduction of circular queues in operating systems
- 1980s: Priority queues become standard in CPU scheduling
- 1990s: Message queues emerge for distributed systems
- 2000s: Lock-free concurrent queues for multi-core processors
- 2010s: Distributed message queues (Kafka, RabbitMQ) at scale
- 2020s: Queue-based microservices and event-driven architectures
1.8 Mathematical Foundations
Set Theory Representation:
A queue Q can be formally defined as:
- Q = (E, ≤, front, rear) where:
- E is a finite set of elements
- ≤ is a total ordering relation
- front: Q → E returns the oldest element
- rear: Q → E returns the newest element
Sequence Notation:
Queue as ordered sequence: Q = ⟨e₁, e₂, e₃, ..., eₙ⟩
Operations:
- enqueue(Q, x) = ⟨e₁, e₂, ..., eₙ, x⟩
- dequeue(Q) = ⟨e₂, e₃, ..., eₙ⟩ and returns e₁
- front(Q) = e₁
- isEmpty(Q) = (n = 0)
- size(Q) = n
Invariants (Properties that always hold):
def queue_invariants():
    """
    Mathematical properties that must always be true for a valid queue
    """
    invariants = {
        "FIFO Property": """
            For any two elements a, b where a enqueued before b:
            If both are in queue Q, then dequeue(Q) will return a before b
        """,
        "Size Conservation": """
            |Q| after enqueue = |Q| before + 1
            |Q| after dequeue = |Q| before - 1 (if not empty)
        """,
        "Boundary Conditions": """
            enqueue on full queue → error (bounded queue)
            dequeue on empty queue → error
            front on empty queue → error
        """,
        "Ordering Preservation": """
            If sequence S = [a₁, a₂, ..., aₙ] enqueued in order,
            then dequeue sequence = [a₁, a₂, ..., aₙ] (same order)
        """,
        "Position Invariant": """
            For circular queue with capacity C:
            Number of elements = (rear - front + C) % C
            Queue is full when (rear + 1) % C == front
        """
    }
    return invariants
Proof: FIFO Property Preservation
Theorem: Queue maintains FIFO ordering
Proof by induction:
Base case: Empty queue Q = ⟨⟩
- Property holds trivially (no elements to order)
Inductive hypothesis:
- Assume FIFO holds for queue Q = ⟨e₁, e₂, ..., eₖ⟩
- Where e₁ was enqueued first, e₂ second, etc.
Inductive step:
Case 1: enqueue(Q, eₖ₊₁)
- New queue Q' = ⟨e₁, e₂, ..., eₖ, eₖ₊₁⟩
- front(Q') = e₁ (unchanged, still oldest)
- eₖ₊₁ at rear (newest)
- Relative order of e₁...eₖ preserved
- Therefore FIFO holds for Q'
Case 2: dequeue(Q)
- Returns e₁ (oldest element)
- New queue Q' = ⟨e₂, e₃, ..., eₖ⟩
- front(Q') = e₂ (now the oldest)
- Relative order of e₂...eₖ preserved
- Therefore FIFO holds for Q'
Conclusion: FIFO property preserved under all operations ∎
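The inductive argument can also be spot-checked mechanically; a minimal sketch (the helper name is illustrative) that enqueues an arbitrary sequence and asserts the dequeue order matches:

```python
import random
from collections import deque

def check_fifo_property(items):
    """Enqueue items in order, dequeue them all, and verify the
    output order equals the input order (the FIFO property)."""
    q = deque()
    for x in items:
        q.append(x)                          # enqueue at rear
    out = [q.popleft() for _ in range(len(q))]  # dequeue from front
    return out == list(items)

random.seed(42)
sample = [random.randint(0, 99) for _ in range(1000)]
assert check_fifo_property(sample)  # FIFO order preserved
```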
1.9 Cognitive Models and Intuition
Mental Model 1: Physical Queue
Real-world mapping:
Queue = Line of people
Enqueue = Person joins end of line
Dequeue = Person at front is served and leaves
Front = First person in line
Rear = Last person in line
Key insight: Position in line = arrival order
Fair processing: First come, first served
Mental Model 2: Conveyor Belt
Queue = Conveyor belt moving items
Enqueue = Place item on belt at loading end
Dequeue = Remove item from belt at unloading end
Movement direction = One way only (FIFO)
Key insight: Items move in sequence
Natural flow: Continuous processing
Mental Model 3: Pipe/Tube
Queue = Pipe with items flowing through
Enqueue = Insert at one end
Dequeue = Remove from other end
Capacity = Pipe diameter (bounded queue)
Key insight: First in is first out
Physical constraint: Can't access middle items
Mental Model 4: Timeline
def timeline_visualization():
    """
    Queue as a timeline of events:

        Past ← [Event1][Event2][Event3][Event4] → Future
                  ↑                       ↑
                Front                    Rear
               (oldest)                 (newest)

    Dequeue = Process oldest event (move to past)
    Enqueue = Add new event to future
    """
    timeline = []
    # Events arrive in chronological order
    timeline.append(("08:00", "Morning standup"))
    timeline.append(("09:00", "Code review"))
    timeline.append(("10:00", "Feature development"))
    # Process events in order
    while timeline:
        time, event = timeline.pop(0)
        print(f"{time}: {event}")  # Processes in chronological order
1.10 Queue Patterns and Idioms
Pattern 1: Producer-Consumer
from collections import deque

class ProducerConsumerQueue:
    """
    Classic producer-consumer pattern using a queue.
    Producers add work items, consumers process them.
    """
    def __init__(self):
        self.queue = deque()
        self.total_produced = 0
        self.total_consumed = 0

    def producer(self, items):
        """Producer adds items to queue"""
        for item in items:
            self.queue.append(item)
            self.total_produced += 1
            print(f"Produced: {item} (Queue size: {len(self.queue)})")

    def consumer(self, count=1):
        """Consumer processes items from queue"""
        results = []
        for _ in range(count):
            if self.queue:
                item = self.queue.popleft()
                self.total_consumed += 1
                result = self.process(item)
                results.append(result)
                print(f"Consumed: {item} → {result}")
        return results

    def process(self, item):
        """Process item (placeholder for actual work)"""
        return f"Processed({item})"

# Example usage
pc = ProducerConsumerQueue()
pc.producer([1, 2, 3, 4, 5])  # Producer adds 5 items
pc.consumer(2)                # Consumer processes 2 items
pc.producer([6, 7])           # Producer adds 2 more
pc.consumer(3)                # Consumer processes 3 items
Pattern 2: Request Buffering
import time
from collections import deque

class RequestBuffer:
    """
    Buffer incoming requests to handle traffic bursts.
    Smooths out processing load over time.
    """
    def __init__(self, max_size=100):
        self.buffer = deque(maxlen=max_size)
        self.dropped_requests = 0

    def receive_request(self, request):
        """Receive incoming request"""
        if len(self.buffer) >= self.buffer.maxlen:
            self.dropped_requests += 1
            print(f"⚠️ Buffer full! Dropped: {request}")
            return False
        self.buffer.append({
            'request': request,
            'timestamp': self.get_time()
        })
        return True

    def process_batch(self, batch_size=10):
        """Process batch of requests"""
        processed = []
        for _ in range(min(batch_size, len(self.buffer))):
            item = self.buffer.popleft()
            latency = self.get_time() - item['timestamp']
            processed.append({
                'request': item['request'],
                'latency': latency
            })
        return processed

    def get_time(self):
        """Get current time (simplified)"""
        return time.time()
Pattern 3: Level Processing
def level_processing_pattern(root):
    """
    Process a tree/graph level by level.
    Common in BFS, social network traversal, etc.
    """
    if not root:
        return []
    queue = [root]
    levels = []
    while queue:
        level_size = len(queue)
        current_level = []
        # Process entire level
        for _ in range(level_size):
            node = queue.pop(0)
            current_level.append(node.value)
            # Add next level
            if hasattr(node, 'children'):
                queue.extend(node.children)
        levels.append(current_level)
    return levels
Pattern 4: Sliding Window with Queue
from collections import deque

def sliding_window_queue_pattern(arr, k):
    """
    Process sliding windows using a deque of indices.
    Efficient window management.
    """
    result = []
    window = deque()
    for i in range(len(arr)):
        # Remove indices that fell outside the window
        while window and window[0] <= i - k:
            window.popleft()
        # Add current index
        window.append(i)
        # Process window when it's full
        if i >= k - 1:
            # Window contains indices [i-k+1, ..., i]
            window_values = [arr[idx] for idx in window]
            result.append(process_window(window_values))
    return result

def process_window(values):
    """Process window (example: return max)"""
    return max(values)
2. Theory and Principles
2.1 FIFO Principle
Core Concept: First-In-First-Out (FIFO) is the fundamental principle governing queue behavior.
def demonstrate_fifo():
    """FIFO principle demonstration"""
    queue = []
    # Add elements in order: A, B, C
    queue.append('A')
    queue.append('B')
    queue.append('C')
    # Remove elements in same order: A, B, C
    print(queue.pop(0))  # A (first in, first out)
    print(queue.pop(0))  # B
    print(queue.pop(0))  # C (last in, last out)
Mathematical Formalization:
For a queue Q with operations:
- enqueue(x): Q → Q ∪ {x} at rear
- dequeue(): Q → Q \ {front(Q)}, returns front(Q)
- front(Q): returns the element that has been in Q longest
Properties:
- If elements enqueued in order: e₁, e₂, e₃, ..., eₙ
- They will be dequeued in same order: e₁, e₂, e₃, ..., eₙ
- At any time t, accessible element is oldest in queue
2.2 Queue as Abstract Data Type (ADT)
ADT Specification:
ADT Queue:
Data:
- Collection of elements with linear ordering
- Front pointer indicating first element
- Rear pointer indicating last element
Operations:
- enqueue(item): Add item to rear
Precondition: Queue not full (if bounded)
Postcondition: item is new rear element
- dequeue(): Remove and return front element
Precondition: Queue not empty
Postcondition: Second element becomes new front
- front(): Return front element without removal
Precondition: Queue not empty
Postcondition: Queue unchanged
- isEmpty(): Return true if queue empty
Precondition: None
Postcondition: Queue unchanged
- size(): Return number of elements
Precondition: None
Postcondition: Queue unchanged
2.3 Circular Queue Theory
Concept: Circular queue uses modular arithmetic to wrap around array, avoiding wasted space.
def circular_indexing():
    """
    Circular queue indexing.
    For an array of size n:
    - Next index: (index + 1) % n
    - Previous index: (index - 1 + n) % n
    """
    capacity = 5
    front = 0
    rear = 0
    # Move rear forward (with wrap-around)
    rear = (rear + 1) % capacity  # 1
    rear = (rear + 1) % capacity  # 2
    # ... continues to 4, then wraps to 0

    # When front=4, rear=2:
    # Size = (rear - front + capacity) % capacity
    #      = (2 - 4 + 5) % 5 = 3
Key Formulas:
For circular queue with capacity C:
1. Next position: (index + 1) % C
2. Queue size: (rear - front + C) % C
3. Is full: (rear + 1) % C == front
4. Is empty: front == rear
Advantage Over Linear Queue:
Linear Queue (wasteful):
After dequeue operations:
[_][_][_][4][5] ← Can't enqueue even with empty space
Circular Queue (efficient):
After wrap-around:
[6][7][_][4][5] ← Can reuse space
↑ ↑
Rear Front
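The formulas above translate directly into code. A minimal fixed-capacity sketch (class and method names are illustrative), using the common convention of one unused sentinel slot so that front == rear unambiguously means "empty":

```python
class CircularQueue:
    """Fixed-capacity circular queue backed by an array.
    One slot is kept unused so front == rear means 'empty'."""
    def __init__(self, capacity):
        self.cap = capacity + 1          # +1 sentinel slot
        self.items = [None] * self.cap
        self.front = 0                   # index of oldest element
        self.rear = 0                    # index of next free slot

    def is_empty(self):
        return self.front == self.rear

    def is_full(self):
        return (self.rear + 1) % self.cap == self.front

    def size(self):
        return (self.rear - self.front + self.cap) % self.cap

    def enqueue(self, item):
        if self.is_full():
            raise OverflowError("Queue is full")
        self.items[self.rear] = item
        self.rear = (self.rear + 1) % self.cap

    def dequeue(self):
        if self.is_empty():
            raise IndexError("Queue is empty")
        item = self.items[self.front]
        self.items[self.front] = None    # free the slot
        self.front = (self.front + 1) % self.cap
        return item

cq = CircularQueue(3)
for x in (1, 2, 3):
    cq.enqueue(x)
cq.dequeue()      # returns 1, frees a slot
cq.enqueue(4)     # rear wraps around into the freed space
```

Note how enqueue(4) succeeds after a dequeue even though the rear index has already reached the end of the array, which is exactly the reuse the linear queue cannot manage.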
2.4 Priority Queue Theory
Concept: Elements have associated priorities; highest priority element dequeued first.
Heap Property:
For a min-heap priority queue:
- Parent ≤ all children
- Root = minimum element = highest priority
Min-Heap Structure:
1
/ \
3 2
/ \ / \
5 4 6 7
Dequeue order: 1, 2, 3, 4, 5, 6, 7
Operations Complexity:
Using Binary Heap:
- enqueue (insert): O(log n) - bubble up
- dequeue (extract-min): O(log n) - bubble down
- peek (get-min): O(1) - access root
- heapify: O(n) - build heap from array
Mathematical Properties:
For heap with n elements:
- Height: ⌊log₂ n⌋
- Leaf nodes: ⌈n/2⌉
- Internal nodes: ⌊n/2⌋
- Parent of node i: ⌊(i-1)/2⌋
- Left child of node i: 2i + 1
- Right child of node i: 2i + 2
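In Python, the standard-library heapq module provides exactly this binary min-heap behavior on top of a plain list; a short sketch:

```python
import heapq

# heapq maintains the min-heap invariant on a plain list:
# the smallest element is always heap[0].
heap = []
for value in [5, 1, 7, 3]:
    heapq.heappush(heap, value)   # O(log n) insert (bubble up)

print(heap[0])                    # 1 — peek at the minimum, O(1)
order = [heapq.heappop(heap) for _ in range(len(heap))]  # O(log n) each
print(order)                      # [1, 3, 5, 7] — extract-min order
```

heapq is a min-heap only; a common idiom for max-heap or priority behavior is to push (priority, item) tuples, negating priorities when higher numbers should come out first.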
2.5 Deque Theory
Double-Ended Queue Concept:
Deque supports insertion and deletion at both ends.
Operations:
- addFront(x): Insert at front
- addRear(x): Insert at rear
- removeFront(): Delete from front
- removeRear(): Delete from rear
Visualization:
addFront addRear
↓ ↓
→ [1][2][3] ←
← →
removeFront removeRear
Use Cases:
- Sliding Window: Maintain window extrema
- Palindrome Check: Compare from both ends
- Undo-Redo: Stack at both ends
- Steal Work: Work-stealing scheduler
Deque vs Queue vs Stack:
Queue: FIFO
[1][2][3]
↑ ↑
Front Rear
Stack: LIFO
[1][2][3]
↑
Top
Deque: Both
[1][2][3]
↑ ↑
Both ends accessible
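collections.deque supports all four operations in O(1); a quick sketch mapping them to the names above:

```python
from collections import deque

d = deque([2, 3])
d.appendleft(1)      # addFront    → [1, 2, 3]
d.append(4)          # addRear     → [1, 2, 3, 4]
print(d.popleft())   # removeFront → 1
print(d.pop())       # removeRear  → 4
print(list(d))       # [2, 3]
```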
2.6 Monotonic Queue Theory
Concept: Maintains elements in monotonic (increasing/decreasing) order.
Types:
- Monotonic Increasing: each element ≥ the previous (front to rear)
  Front → [1, 3, 5, 7] ← Rear
- Monotonic Decreasing: each element ≤ the previous (front to rear)
  Front → [9, 7, 5, 3] ← Rear
Key Insight:
When adding element x:
- For increasing: Remove all elements < x from rear
- For decreasing: Remove all elements > x from rear
Application: Sliding Window Maximum
Array: [1, 3, -1, -3, 5, 3, 6, 7]
Window size: k = 3
For each window:
- Maintain decreasing monotonic queue
- Front element = maximum in window
Window [1, 3, -1]:
Queue: [3, -1] (removed 1 as it's < 3)
Max: 3
Window [3, -1, -3]:
Queue: [3, -1, -3]
Max: 3
Time Complexity:
Each element is:
- Enqueued once: O(1) amortized
- Dequeued at most once: O(1) amortized
Total: O(n) for n elements
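Putting the insight together, a sketch of sliding-window maximum using a decreasing monotonic deque of indices (O(n) overall; the function name is illustrative):

```python
from collections import deque

def sliding_window_max(arr, k):
    """Return the maximum of each length-k window.
    The deque holds indices whose values are in decreasing order,
    so the front index always points at the current window's maximum."""
    dq = deque()
    result = []
    for i, x in enumerate(arr):
        if dq and dq[0] <= i - k:       # front index fell out of the window
            dq.popleft()
        while dq and arr[dq[-1]] < x:   # evict smaller elements from the rear
            dq.pop()
        dq.append(i)
        if i >= k - 1:                  # window is full: front holds the max
            result.append(arr[dq[0]])
    return result

print(sliding_window_max([1, 3, -1, -3, 5, 3, 6, 7], 3))
# [3, 3, 5, 5, 6, 7]
```

This matches the walkthrough above: in window [1, 3, -1] the element 1 is evicted when 3 arrives, leaving the maximum 3 at the front.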
2.7 Queue with Stacks Theory
Implementing Queue Using Two Stacks:
Concept: Use two stacks to reverse order twice
Stack 1 (input): For enqueue
Stack 2 (output): For dequeue
Enqueue(1), Enqueue(2), Enqueue(3):
Input: [1, 2, 3] ← top
Output: []
Dequeue():
Transfer input → output (reverses order):
Input: []
Output: [3, 2, 1] ← top
Return output.pop() = 1 (FIFO maintained!)
Amortized Analysis:
n operations:
- Each element pushed to input once: O(1)
- Each element moved to output once: O(1)
- Each element popped from output once: O(1)
Total work: 3n operations
Amortized per operation: 3n/n = O(1)
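The scheme above, as a runnable sketch (class name is illustrative):

```python
class QueueWithStacks:
    """FIFO queue built from two LIFO stacks (Python lists)."""
    def __init__(self):
        self.in_stack = []    # receives enqueues
        self.out_stack = []   # serves dequeues in reversed (FIFO) order

    def enqueue(self, item):
        self.in_stack.append(item)               # O(1)

    def dequeue(self):
        if not self.out_stack:                   # lazy transfer: O(1) amortized
            while self.in_stack:
                self.out_stack.append(self.in_stack.pop())
        if not self.out_stack:
            raise IndexError("Queue is empty")
        return self.out_stack.pop()

q = QueueWithStacks()
for x in (1, 2, 3):
    q.enqueue(x)
print(q.dequeue())   # 1 — FIFO maintained
print(q.dequeue())   # 2
```

The transfer only happens when out_stack runs dry, which is what makes each element move at most once and keeps the amortized cost O(1).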
Implementing Stack Using Two Queues:
Concept: Keep newest element at front
Queue 1 (main): Current stack
Queue 2 (temp): For reordering
Push(1):
    Q1: [1]

Push(2):
    Enqueue 2 to Q2 (the empty temp queue):  Q2: [2]
    Move all of Q1 into Q2:                  Q2: [2, 1]
    Swap Q1 and Q2:                          Q1: [2, 1], Q2: []

Pop():
    Return Q1.dequeue() = 2 (LIFO maintained!)
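The same idea is often written with a single queue that is rotated after each push, rather than two literal queues; a sketch under that variant:

```python
from collections import deque

class StackWithQueues:
    """LIFO stack built from a FIFO queue: after each push, the queue
    is rotated so the newest element sits at the front."""
    def __init__(self):
        self.q = deque()

    def push(self, item):
        self.q.append(item)
        # Rotate the older elements behind the new one: O(n) push
        for _ in range(len(self.q) - 1):
            self.q.append(self.q.popleft())

    def pop(self):
        if not self.q:
            raise IndexError("Stack is empty")
        return self.q.popleft()   # newest element at front: O(1) pop

s = StackWithQueues()
s.push(1)
s.push(2)
print(s.pop())   # 2 — LIFO maintained
```

This variant makes push O(n) and pop O(1); the two-queue formulation above has the same asymptotic trade-off.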
2.8 Level-Order Traversal Theory
BFS with Queue:
Tree:
1
/ \
2 3
/ \
4 5
BFS traversal: 1, 2, 3, 4, 5
Algorithm:
1. Enqueue root
2. While queue not empty:
a. Dequeue node
b. Process node
c. Enqueue children (left to right)
Queue states:
Initial: [1]
After dequeue 1: [2, 3]
After dequeue 2: [3, 4, 5]
After dequeue 3: [4, 5]
After dequeue 4: [5]
After dequeue 5: []
Level Tracking:
def level_order_with_levels(root):
    """
    Track levels during BFS.
    Method 1: Null separator
    Method 2: Size at each level (used here)
    """
    if not root:
        return []
    queue = [root]
    result = []
    while queue:
        level_size = len(queue)  # Nodes at current level
        level_nodes = []
        for _ in range(level_size):
            node = queue.pop(0)
            level_nodes.append(node.val)
            if node.left:
                queue.append(node.left)
            if node.right:
                queue.append(node.right)
        result.append(level_nodes)
    return result
2.9 Queuing Theory and Performance Analysis
Little's Law:
A fundamental theorem in queuing theory relating average number of items, arrival rate, and wait time:
L = λ × W
Where:
L = Average number of items in queue
λ = Average arrival rate (items per time unit)
W = Average time an item spends in system
Example:
If customers arrive at 5/minute (λ=5) and spend 2 minutes in system (W=2):
Average queue length L = 5 × 2 = 10 customers
Implementation:
class QueuePerformanceAnalyzer:
    """
    Analyze queue performance using Little's Law and other metrics
    """
    def __init__(self):
        self.arrival_times = []
        self.departure_times = []
        self.queue_lengths = []
        self.wait_times = []

    def record_arrival(self, timestamp, queue_length):
        """Record customer arrival"""
        self.arrival_times.append(timestamp)
        self.queue_lengths.append(queue_length)

    def record_departure(self, timestamp, arrival_time):
        """Record customer departure"""
        self.departure_times.append(timestamp)
        wait_time = timestamp - arrival_time
        self.wait_times.append(wait_time)

    def analyze(self):
        """Compute performance metrics"""
        if not self.arrival_times or not self.departure_times:
            return {}
        # Average arrival rate (λ)
        time_span = max(self.arrival_times) - min(self.arrival_times)
        arrival_rate = len(self.arrival_times) / time_span if time_span > 0 else 0
        # Average wait time (W)
        avg_wait_time = sum(self.wait_times) / len(self.wait_times)
        # Average queue length (L) - from Little's Law
        avg_queue_length_calculated = arrival_rate * avg_wait_time
        # Observed average queue length
        avg_queue_length_observed = (sum(self.queue_lengths) /
                                     len(self.queue_lengths))
        return {
            'arrival_rate': arrival_rate,
            'avg_wait_time': avg_wait_time,
            'avg_queue_length_calculated': avg_queue_length_calculated,
            'avg_queue_length_observed': avg_queue_length_observed,
            'total_served': len(self.departure_times),
            'max_wait_time': max(self.wait_times),
            'min_wait_time': min(self.wait_times)
        }

# Example usage
import random

analyzer = QueuePerformanceAnalyzer()
# Simulate queue operations
current_time = 0
for i in range(100):
    current_time += random.uniform(0.1, 0.5)  # Random inter-arrival time
    queue_len = random.randint(0, 20)
    analyzer.record_arrival(current_time, queue_len)
    # Record some departures
    if i > 0 and i % 3 == 0:
        departure_time = current_time + random.uniform(1, 3)
        analyzer.record_departure(departure_time,
                                  analyzer.arrival_times[i - 2])

metrics = analyzer.analyze()
M/M/1 Queue Model:
Single server queue with:
- M: Markovian (exponential) arrival process
- M: Markovian (exponential) service process
- 1: Single server
Key formulas:
ρ = λ/μ (utilization, must be < 1 for stability)
L = ρ/(1-ρ) (average queue length)
W = 1/(μ-λ) (average wait time)
Where:
λ = arrival rate
μ = service rate
def mm1_queue_analysis(arrival_rate, service_rate):
    """
    Analyze M/M/1 queue performance.

    Args:
        arrival_rate (λ): Customers per time unit
        service_rate (μ): Service capacity per time unit

    Returns:
        Dictionary of performance metrics
    """
    if arrival_rate >= service_rate:
        return {
            'error': 'Unstable queue: arrival_rate must be < service_rate'
        }
    # Utilization (ρ)
    rho = arrival_rate / service_rate
    # Average number in system (L)
    L = rho / (1 - rho)
    # Average number in queue (Lq)
    Lq = (rho ** 2) / (1 - rho)
    # Average time in system (W)
    W = 1 / (service_rate - arrival_rate)
    # Average time in queue (Wq)
    Wq = rho / (service_rate - arrival_rate)

    # Probability of n customers in system
    def prob_n_customers(n):
        return (1 - rho) * (rho ** n)

    return {
        'utilization': rho,
        'avg_customers_in_system': L,
        'avg_customers_in_queue': Lq,
        'avg_time_in_system': W,
        'avg_time_in_queue': Wq,
        'prob_empty': prob_n_customers(0),
        'prob_1_customer': prob_n_customers(1),
        'prob_2_customers': prob_n_customers(2)
    }

# Example: Coffee shop queue (rates per minute)
results = mm1_queue_analysis(arrival_rate=20, service_rate=25)
print(f"Utilization: {results['utilization']:.2%}")
print(f"Average customers in queue: {results['avg_customers_in_queue']:.2f}")
print(f"Average wait time: {results['avg_time_in_queue']:.2f} minutes")
2.10 Information Theory Perspective
Queue as Information Channel:
A queue can be viewed as a communication channel that preserves information (order).
Information Properties:
1. Order preservation: I(output) = I(input)
2. No information loss in FIFO
3. Capacity = maximum throughput
4. Channel delay = queue latency
Entropy and Queue States:
import math
from collections import Counter

def queue_entropy(queue_states):
    """
    Calculate entropy of queue states:
        H(X) = -Σ p(x) log₂ p(x)
    Higher entropy = more unpredictable queue behavior.
    """
    # Count frequency of each state
    state_counts = Counter(queue_states)
    total = len(queue_states)
    entropy = 0
    for count in state_counts.values():
        probability = count / total
        if probability > 0:
            entropy -= probability * math.log2(probability)
    return entropy

# Example: Analyze queue size variation
queue_sizes = [0, 1, 1, 2, 3, 2, 1, 0, 1, 2, 2, 1, 0]
H = queue_entropy(queue_sizes)
print(f"Queue entropy: {H:.3f} bits")

# Low entropy (predictable):    [1, 1, 1, 1, 1, 1] → H = 0
# High entropy (unpredictable): [0, 5, 2, 8, 1, 3] → H > 2
2.11 Graph Theory and Queue Traversal
Queue in Graph Algorithms:
BFS as Level-Set Propagation:
- Queue maintains frontier of exploration
- Each dequeue expands frontier by one edge
- Level structure emerges naturally
Graph G = (V, E):
- Queue Q represents current exploration frontier
- Visited set V tracks discovered vertices
- BFS tree T emerges from parent pointers
Formal BFS Properties:
from collections import deque

def bfs_theoretical_properties(graph, start):
    """
    Demonstrate theoretical properties of BFS.

    Properties shown:
    1. Shortest path in unweighted graph
    2. Level-order discovery
    3. O(V + E) time complexity
    4. O(V) space complexity
    """
    # Initialize
    queue = deque([start])
    distance = {start: 0}   # Distance from start
    parent = {start: None}  # BFS tree
    level = {start: 0}      # Level in BFS tree
    # Track statistics
    edges_explored = 0
    vertices_discovered = 1
    while queue:
        u = queue.popleft()
        for v in graph.get(u, []):
            edges_explored += 1
            if v not in distance:
                # First time discovering v
                vertices_discovered += 1
                distance[v] = distance[u] + 1  # Shortest path property
                parent[v] = u                  # Build BFS tree
                level[v] = level[u] + 1        # Track level
                queue.append(v)
    return {
        'distances': distance,  # Shortest paths from start
        'parent': parent,       # BFS spanning tree
        'levels': level,        # Level structure
        'vertices_discovered': vertices_discovered,
        'edges_explored': edges_explored
    }

# Example graph
graph = {
    'A': ['B', 'C'],
    'B': ['A', 'D', 'E'],
    'C': ['A', 'F'],
    'D': ['B'],
    'E': ['B', 'F'],
    'F': ['C', 'E']
}
result = bfs_theoretical_properties(graph, 'A')
Proof: BFS Finds Shortest Paths
Theorem: BFS finds shortest path in unweighted graph
Proof by induction on distance from source s:
Base case (d=0):
- Only s has distance 0 from itself
- BFS correctly sets distance[s] = 0
Inductive hypothesis:
- BFS correctly computes shortest distance for all vertices
at distance ≤ k from s
Inductive step (distance k+1):
- Let v be a vertex at distance k+1 from s
- Shortest path to v: s → ... → u → v (length k+1)
- u is at distance k (by definition)
- By inductive hypothesis, BFS discovers u with distance[u] = k
- When processing u, BFS enqueues v with distance[v] = k+1
- v cannot be discovered earlier (would contradict shortest path)
- Therefore BFS correctly sets distance[v] = k+1
Conclusion: BFS computes shortest paths for all reachable vertices ∎
2.12 Concurrent Queue Theory
Linearizability:
Property: Concurrent operations appear to occur atomically
at some point between invocation and response
For queue:
- enqueue(x) appears to insert x atomically
- dequeue() appears to remove item atomically
- FIFO order must be preserved across threads
Lock-Free Queue Challenges:
def demonstrate_race_condition():
    """
    Show why a naive concurrent queue fails.
    Problem: lost updates without synchronization.
    """
    # Shared queue (UNSAFE for concurrent access)
    queue = []

    # Thread 1 and Thread 2 both execute:
    def enqueue(item):
        # Race condition here!
        current_size = len(queue)  # Both read the same size
        queue.append(item)         # Both append
        # Result: size may be wrong, items may be lost

# Solution: Use locks or lock-free algorithms
import threading

class SafeQueue:
    def __init__(self):
        self.queue = []
        self.lock = threading.Lock()

    def enqueue(self, item):
        with self.lock:  # Atomic operation
            self.queue.append(item)

    def dequeue(self):
        with self.lock:  # Atomic operation
            if self.queue:
                return self.queue.pop(0)
            return None
ABA Problem in Lock-Free Queues:
Problem: Value changes from A→B→A, appears unchanged
Scenario:
Thread 1: Read head pointer (A)
Thread 2: Dequeue A, dequeue B, enqueue A (head is A again)
Thread 1: CAS succeeds (head still A), but queue state changed!
Solution: Tagged pointers or version numbers
class TaggedNode:
    """
    Node with version tag to prevent the ABA problem.
    """
    def __init__(self, value, next_node=None, version=0):
        self.value = value
        self.next = next_node
        self.version = version  # Increment on each modification

    def compare_and_swap(self, expected_version, new_next, new_version):
        """
        CAS that checks both pointer and version
        (illustrative only — not actually atomic in Python).
        """
        if self.version == expected_version:
            self.next = new_next
            self.version = new_version
            return True
        return False
2.13 Complexity Theory and Queue Operations
Decision Tree Lower Bound:
Theorem: Any comparison-based priority queue must have
Ω(log n) worst-case time for insert or extract-min
Proof sketch:
- n! possible orderings of n elements
- Decision tree for building priority queue has ≥ n! leaves
- Height h ≥ log₂(n!) = Ω(n log n)
- Average operation: Ω(log n)
Amortized Analysis Deep Dive:
class AmortizedQueueAnalysis:
    """
    Demonstrate amortized analysis techniques for queues.

    Three methods:
    1. Aggregate analysis
    2. Accounting method
    3. Potential method
    """
    def aggregate_analysis(self, operations):
        """
        Method 1: Aggregate Analysis
        Total cost / number of operations
        """
        # Queue using two stacks
        in_stack = []
        out_stack = []
        total_cost = 0
        for op_type, value in operations:
            if op_type == 'enqueue':
                in_stack.append(value)
                total_cost += 1  # One operation
            elif op_type == 'dequeue':
                if not out_stack:
                    # Transfer from in_stack to out_stack
                    transfer_cost = len(in_stack)
                    while in_stack:
                        out_stack.append(in_stack.pop())
                    total_cost += transfer_cost
                if out_stack:
                    out_stack.pop()
                    total_cost += 1
        amortized_cost = total_cost / len(operations)
        return {
            'total_cost': total_cost,
            'operations': len(operations),
            'amortized_cost': amortized_cost
        }

    def accounting_method(self):
        """
        Method 2: Accounting Method
        Charge each operation; use saved credits for expensive operations.
        """
        # Assign costs:
        # enqueue: charge 3 (1 for the push, 2 credits saved)
        # dequeue: charge 1 (transfers are paid by saved credits)
        explanation = """
        Accounting method for queue with two stacks:

        enqueue(x): Charge 3
        - 1 for push on in_stack
        - 1 credit for future pop from in_stack (transfer)
        - 1 credit for future push to out_stack (transfer)

        dequeue(): Charge 1
        - If out_stack not empty: pop (use the 1)
        - If out_stack empty: transfer uses saved credits
          (each element carries 2 credits: 1 for pop from in_stack,
          1 for push to out_stack)

        Invariant: Each element in in_stack has 2 credits
        Amortized cost: O(1) per operation
        """
        return explanation

    def potential_method(self, operations):
        """
        Method 3: Potential Method
        Define potential function Φ; amortized cost = actual + ΔΦ
        """
        in_stack = []
        out_stack = []

        def potential():
            """Potential function: Φ = size of in_stack"""
            return len(in_stack)

        results = []
        for op_type, value in operations:
            phi_before = potential()
            actual_cost = 0
            if op_type == 'enqueue':
                in_stack.append(value)
                actual_cost = 1
            elif op_type == 'dequeue':
                if not out_stack:
                    # Transfer
                    actual_cost = len(in_stack) + 1
                    while in_stack:
                        out_stack.append(in_stack.pop())
                else:
                    actual_cost = 1
                if out_stack:
                    out_stack.pop()
            phi_after = potential()
            delta_phi = phi_after - phi_before
            amortized_cost = actual_cost + delta_phi
            results.append({
                'operation': op_type,
                'actual_cost': actual_cost,
                'delta_phi': delta_phi,
                'amortized_cost': amortized_cost
            })
        return results

# Example usage
analyzer = AmortizedQueueAnalysis()
# Sequence of operations
ops = [
    ('enqueue', 1),
    ('enqueue', 2),
    ('enqueue', 3),
    ('dequeue', None),
    ('dequeue', None),
    ('enqueue', 4),
    ('dequeue', None)
]
aggregate_result = analyzer.aggregate_analysis(ops)
print(f"Amortized cost (aggregate): {aggregate_result['amortized_cost']:.2f}")
potential_results = analyzer.potential_method(ops)
avg_amortized = sum(r['amortized_cost'] for r in potential_results) / len(ops)
print(f"Amortized cost (potential): {avg_amortized:.2f}")
3. Implementation and Mechanics
3.1 Array-Based Queue Implementation
Simple Array Queue (Inefficient):
class SimpleArrayQueue:
"""
Simple array-based queue
Pros:
- Simple implementation
- Cache-friendly
Cons:
- O(n) dequeue operation
- Wasted space as front moves
"""
def __init__(self):
self.items = []
def enqueue(self, item):
"""O(1) - Add to rear"""
self.items.append(item)
def dequeue(self):
"""O(n) - Remove from front"""
if self.is_empty():
raise IndexError("Queue is empty")
return self.items.pop(0) # Expensive!
def front(self):
"""O(1) - View front"""
if self.is_empty():
raise IndexError("Queue is empty")
return self.items[0]
def is_empty(self):
"""O(1) - Check empty"""
return len(self.items) == 0
def size(self):
"""O(1) - Get size"""
return len(self.items)
Optimized Array Queue with Front Pointer:
class ArrayQueue:
"""
Array queue with front pointer
Maintains a front index to avoid shifting elements on dequeue
Caveat: freed front slots are never reused, so is_full() reports
full once rear_idx reaches capacity - 1, even after dequeues;
the circular queue in 3.3 fixes this
"""
def __init__(self, capacity=100):
self.capacity = capacity
self.items = [None] * capacity
self.front_idx = 0
self.rear_idx = -1
self._size = 0
def enqueue(self, item):
"""O(1) - Add to rear"""
if self.is_full():
raise OverflowError("Queue is full")
self.rear_idx += 1
self.items[self.rear_idx] = item
self._size += 1
def dequeue(self):
"""O(1) - Remove from front"""
if self.is_empty():
raise IndexError("Queue is empty")
item = self.items[self.front_idx]
self.items[self.front_idx] = None # Help GC
self.front_idx += 1
self._size -= 1
return item
def front(self):
"""O(1) - View front"""
if self.is_empty():
raise IndexError("Queue is empty")
return self.items[self.front_idx]
def is_empty(self):
"""O(1) - Check empty"""
return self._size == 0
def is_full(self):
"""O(1) - Check full"""
return self.rear_idx == self.capacity - 1
def size(self):
"""O(1) - Get size"""
return self._size
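The drift just described can be shown with the bare index arithmetic (a minimal sketch using the same `front_idx`/`rear_idx` convention as the class above):

```python
# After `capacity` enqueues the rear index hits the end, so the queue
# reports full even though dequeues have already freed earlier slots.
capacity = 4
items = [None] * capacity
front_idx, rear_idx, size = 0, -1, 0

for v in range(capacity):       # fill the queue
    rear_idx += 1
    items[rear_idx] = v
    size += 1

items[front_idx] = None         # dequeue one element
front_idx += 1
size -= 1

is_full = rear_idx == capacity - 1   # still "full": rear never moves back
```

Here `is_full` is `True` while `size` is only 3, which is why the next section wraps the indices around instead.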
3.2 Linked List-Based Queue Implementation
class Node:
"""Node for linked queue"""
def __init__(self, data):
self.data = data
self.next = None
class LinkedQueue:
"""
Linked list-based queue
Pros:
- Dynamic size (no fixed capacity)
- O(1) enqueue and dequeue
- No wasted space
Cons:
- Extra memory for pointers
- Poor cache locality
"""
def __init__(self):
self.front_node = None
self.rear_node = None
self._size = 0
def enqueue(self, item):
"""O(1) - Add to rear"""
new_node = Node(item)
if self.rear_node is None:
# Empty queue
self.front_node = self.rear_node = new_node
else:
self.rear_node.next = new_node
self.rear_node = new_node
self._size += 1
def dequeue(self):
"""O(1) - Remove from front"""
if self.is_empty():
raise IndexError("Queue is empty")
item = self.front_node.data
self.front_node = self.front_node.next
# If queue becomes empty, update rear
if self.front_node is None:
self.rear_node = None
self._size -= 1
return item
def front(self):
"""O(1) - View front"""
if self.is_empty():
raise IndexError("Queue is empty")
return self.front_node.data
def is_empty(self):
"""O(1) - Check empty"""
return self.front_node is None
def size(self):
"""O(1) - Get size"""
return self._size
def __str__(self):
"""String representation"""
if self.is_empty():
return "Queue: [] (front → rear)"
elements = []
current = self.front_node
while current:
elements.append(str(current.data))
current = current.next
return f"Queue: {' → '.join(elements)} (front → rear)"
3.3 Circular Queue Implementation
class CircularQueue:
"""
Circular queue using array with wrap-around
Efficient space utilization through circular indexing
"""
def __init__(self, capacity):
self.capacity = capacity
self.items = [None] * capacity
self.front_idx = 0
self.rear_idx = -1
self._size = 0
def enqueue(self, item):
"""O(1) - Add to rear with wrap-around"""
if self.is_full():
raise OverflowError("Queue is full")
# Circular increment
self.rear_idx = (self.rear_idx + 1) % self.capacity
self.items[self.rear_idx] = item
self._size += 1
def dequeue(self):
"""O(1) - Remove from front with wrap-around"""
if self.is_empty():
raise IndexError("Queue is empty")
item = self.items[self.front_idx]
self.items[self.front_idx] = None
# Circular increment
self.front_idx = (self.front_idx + 1) % self.capacity
self._size -= 1
return item
def front(self):
"""O(1) - View front"""
if self.is_empty():
raise IndexError("Queue is empty")
return self.items[self.front_idx]
def is_empty(self):
"""O(1) - Check empty"""
return self._size == 0
def is_full(self):
"""O(1) - Check full"""
return self._size == self.capacity
def size(self):
"""O(1) - Get size"""
return self._size
def __str__(self):
"""Visualize circular queue"""
if self.is_empty():
return "Circular Queue: [] (empty)"
# Show elements from front to rear
elements = []
idx = self.front_idx
for _ in range(self._size):
elements.append(str(self.items[idx]))
idx = (idx + 1) % self.capacity
return f"Circular Queue: [{', '.join(elements)}] (F={self.front_idx}, R={self.rear_idx})"
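The wrap-around behavior is easiest to see with the raw modular arithmetic. This inline sketch follows the same index convention as `CircularQueue` (it is not the class itself, just the core mechanics):

```python
capacity = 3
items = [None] * capacity
front_idx, rear_idx, size = 0, -1, 0
log = []

# enqueue 1, 2, 3 — rear walks 0, 1, 2
for v in (1, 2, 3):
    rear_idx = (rear_idx + 1) % capacity
    items[rear_idx] = v
    size += 1

# dequeue twice — frees slots 0 and 1
for _ in range(2):
    log.append(items[front_idx])
    items[front_idx] = None
    front_idx = (front_idx + 1) % capacity
    size -= 1

# enqueue 4, 5 — rear wraps past the end to reuse the freed slots
for v in (4, 5):
    rear_idx = (rear_idx + 1) % capacity
    items[rear_idx] = v
    size += 1

wrapped_rear = rear_idx     # rear has wrapped to index 1
remaining = []
for _ in range(size):
    remaining.append(items[front_idx])
    front_idx = (front_idx + 1) % capacity
```

FIFO order is preserved (`remaining` comes out as 3, 4, 5) even though the elements are no longer contiguous in the array.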
# Dynamic circular queue
class DynamicCircularQueue:
"""Circular queue that resizes when full"""
def __init__(self, initial_capacity=4):
self.capacity = initial_capacity
self.items = [None] * initial_capacity
self.front_idx = 0
self.rear_idx = -1
self._size = 0
def enqueue(self, item):
"""O(1) amortized - Resize if needed"""
if self.is_full():
self._resize()
self.rear_idx = (self.rear_idx + 1) % self.capacity
self.items[self.rear_idx] = item
self._size += 1
def _resize(self):
"""Double capacity and reorganize"""
new_capacity = self.capacity * 2
new_items = [None] * new_capacity
# Copy elements in order
idx = self.front_idx
for i in range(self._size):
new_items[i] = self.items[idx]
idx = (idx + 1) % self.capacity
self.items = new_items
self.front_idx = 0
self.rear_idx = self._size - 1
self.capacity = new_capacity
def dequeue(self):
"""O(1) - Remove from front"""
if self.is_empty():
raise IndexError("Queue is empty")
item = self.items[self.front_idx]
self.items[self.front_idx] = None
self.front_idx = (self.front_idx + 1) % self.capacity
self._size -= 1
return item
def is_empty(self):
return self._size == 0
def is_full(self):
return self._size == self.capacity
def size(self):
return self._size
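The key subtlety in `_resize` is the reordering copy: elements are written front-to-rear starting at index 0 of the new array, which is why `front_idx` resets to 0. A minimal sketch of just that copy (the ring state below is illustrative):

```python
# Ring state before resize: logical order is 10, 20, 30, 40,
# but they sit at indices 3, 4, 0, 1 with front_idx = 3.
old = [30, 40, None, 10, 20]
front_idx, size, capacity = 3, 4, 5

new = [None] * (capacity * 2)
idx = front_idx
for i in range(size):           # copy in logical (front→rear) order
    new[i] = old[idx]
    idx = (idx + 1) % capacity
# new front_idx would be 0, new rear_idx would be size - 1
```

After the copy, `new` holds 10, 20, 30, 40 contiguously at the start, followed by empty slots.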
3.4 Priority Queue (Heap-Based) Implementation
class PriorityQueue:
"""
Min-heap based priority queue
Lower value = higher priority
"""
def __init__(self):
self.heap = []
def enqueue(self, item, priority=None):
"""O(log n) - Add with priority"""
if priority is None:
priority = item
self.heap.append((priority, item))
self._bubble_up(len(self.heap) - 1)
def dequeue(self):
"""O(log n) - Remove highest priority"""
if self.is_empty():
raise IndexError("Priority queue is empty")
if len(self.heap) == 1:
return self.heap.pop()[1]
# Swap first and last, then bubble down
self._swap(0, len(self.heap) - 1)
priority, item = self.heap.pop()
if self.heap:
self._bubble_down(0)
return item
def front(self):
"""O(1) - View highest priority"""
if self.is_empty():
raise IndexError("Priority queue is empty")
return self.heap[0][1]
def is_empty(self):
"""O(1) - Check empty"""
return len(self.heap) == 0
def size(self):
"""O(1) - Get size"""
return len(self.heap)
def _bubble_up(self, index):
"""Restore heap property upward"""
parent_idx = (index - 1) // 2
while index > 0 and self.heap[index][0] < self.heap[parent_idx][0]:
self._swap(index, parent_idx)
index = parent_idx
parent_idx = (index - 1) // 2
def _bubble_down(self, index):
"""Restore heap property downward"""
while True:
smallest = index
left = 2 * index + 1
right = 2 * index + 2
# Find smallest among node and children
if (left < len(self.heap) and
self.heap[left][0] < self.heap[smallest][0]):
smallest = left
if (right < len(self.heap) and
self.heap[right][0] < self.heap[smallest][0]):
smallest = right
if smallest == index:
break
self._swap(index, smallest)
index = smallest
def _swap(self, i, j):
"""Swap two elements"""
self.heap[i], self.heap[j] = self.heap[j], self.heap[i]
# Using Python's heapq module
import heapq
class PriorityQueueHeapq:
"""Priority queue using heapq"""
def __init__(self):
self.heap = []
self.counter = 0 # For stable sorting
def enqueue(self, item, priority=None):
"""O(log n) - Add with priority"""
if priority is None:
priority = item
# Use counter for stable sort (preserve order for equal priorities)
heapq.heappush(self.heap, (priority, self.counter, item))
self.counter += 1
def dequeue(self):
"""O(log n) - Remove highest priority"""
if self.is_empty():
raise IndexError("Priority queue is empty")
priority, _, item = heapq.heappop(self.heap)
return item
def front(self):
"""O(1) - View highest priority"""
if self.is_empty():
raise IndexError("Priority queue is empty")
return self.heap[0][2]
def is_empty(self):
return len(self.heap) == 0
def size(self):
return len(self.heap)
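The counter tie-breaker matters whenever priorities collide: without it, `heapq` would compare the items themselves (which may not even be comparable), and insertion order would not be guaranteed. A quick standalone check using `heapq` directly:

```python
import heapq

# Three tasks pushed with the same priority; the counter preserves FIFO order.
heap, counter = [], 0
for task in ["write", "review", "ship"]:
    heapq.heappush(heap, (1, counter, task))   # all priority 1
    counter += 1

order = [heapq.heappop(heap)[2] for _ in range(3)]
# equal priorities come out in insertion order
```

This is the same trick the `PriorityQueueHeapq` class above relies on.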
3.5 Deque (Double-Ended Queue) Implementation
class Deque:
"""
Double-ended queue using circular array
Supports insertion/deletion at both ends
"""
def __init__(self, capacity=10):
self.capacity = capacity
self.items = [None] * capacity
self.front_idx = 0
self.rear_idx = 0
self._size = 0
def add_front(self, item):
"""O(1) - Add to front"""
if self.is_full():
raise OverflowError("Deque is full")
# Move front backward (circular)
self.front_idx = (self.front_idx - 1 + self.capacity) % self.capacity
self.items[self.front_idx] = item
self._size += 1
def add_rear(self, item):
"""O(1) - Add to rear"""
if self.is_full():
raise OverflowError("Deque is full")
self.items[self.rear_idx] = item
self.rear_idx = (self.rear_idx + 1) % self.capacity
self._size += 1
def remove_front(self):
"""O(1) - Remove from front"""
if self.is_empty():
raise IndexError("Deque is empty")
item = self.items[self.front_idx]
self.items[self.front_idx] = None
self.front_idx = (self.front_idx + 1) % self.capacity
self._size -= 1
return item
def remove_rear(self):
"""O(1) - Remove from rear"""
if self.is_empty():
raise IndexError("Deque is empty")
self.rear_idx = (self.rear_idx - 1 + self.capacity) % self.capacity
item = self.items[self.rear_idx]
self.items[self.rear_idx] = None
self._size -= 1
return item
def front(self):
"""O(1) - View front"""
if self.is_empty():
raise IndexError("Deque is empty")
return self.items[self.front_idx]
def rear(self):
"""O(1) - View rear"""
if self.is_empty():
raise IndexError("Deque is empty")
rear_element_idx = (self.rear_idx - 1 + self.capacity) % self.capacity
return self.items[rear_element_idx]
def is_empty(self):
return self._size == 0
def is_full(self):
return self._size == self.capacity
def size(self):
return self._size
# Python's collections.deque
from collections import deque
def using_collections_deque():
"""Demonstrate Python's built-in deque"""
dq = deque()
# Add to both ends
dq.append(1) # Add to right (rear)
dq.appendleft(0) # Add to left (front)
# dq: [0, 1]
# Remove from both ends
dq.pop() # Remove from right: 1
dq.popleft() # Remove from left: 0
# With maxlen (bounded deque)
bounded_dq = deque(maxlen=3)
bounded_dq.extend([1, 2, 3])
bounded_dq.append(4) # Automatically removes 1
# bounded_dq: [2, 3, 4]
# Rotation
dq = deque([1, 2, 3, 4, 5])
dq.rotate(2) # Rotate right: [4, 5, 1, 2, 3]
dq.rotate(-2) # Rotate left: [1, 2, 3, 4, 5]
return dq
3.6 Implementing Stack Using Queues
class StackUsingOneQueue:
    """
    Stack using a single queue
    Push: O(n) - rotate elements so the newest is at the front
    Pop: O(1)
    Uses collections.deque; with a plain list, pop(0) alone is O(n).
    """
    def __init__(self):
        from collections import deque
        self.queue = deque()
    def push(self, item):
        """O(n) - Add, then rotate previous elements behind it"""
        self.queue.append(item)
        for _ in range(len(self.queue) - 1):
            self.queue.append(self.queue.popleft())
    def pop(self):
        """O(1) - Remove from front"""
        if self.is_empty():
            raise IndexError("Stack is empty")
        return self.queue.popleft()
    def top(self):
        """O(1) - View top"""
        if self.is_empty():
            raise IndexError("Stack is empty")
        return self.queue[0]
    def is_empty(self):
        return len(self.queue) == 0
class StackUsingTwoQueues:
    """
    Stack using two queues
    Push: O(1)
    Pop: O(n) - transfer elements
    Uses collections.deque for O(1) queue operations; with lists,
    the repeated pop(0) would make pop O(n^2).
    """
    def __init__(self):
        from collections import deque
        self.q1 = deque()  # Main queue
        self.q2 = deque()  # Temporary queue
    def push(self, item):
        """O(1) - Add to q1"""
        self.q1.append(item)
    def pop(self):
        """O(n) - Transfer all but last to q2, pop last"""
        if self.is_empty():
            raise IndexError("Stack is empty")
        while len(self.q1) > 1:
            self.q2.append(self.q1.popleft())
        item = self.q1.popleft()
        # Swap q1 and q2
        self.q1, self.q2 = self.q2, self.q1
        return item
    def top(self):
        """O(n) - Like pop, but re-enqueue the last element"""
        if self.is_empty():
            raise IndexError("Stack is empty")
        while len(self.q1) > 1:
            self.q2.append(self.q1.popleft())
        item = self.q1[0]
        self.q2.append(self.q1.popleft())
        self.q1, self.q2 = self.q2, self.q1
        return item
    def is_empty(self):
        return len(self.q1) == 0
3.7 Queue Using Two Stacks
class QueueUsingTwoStacks:
"""
Queue using two stacks
Enqueue: O(1)
Dequeue: O(1) amortized
"""
def __init__(self):
self.input_stack = [] # For enqueue
self.output_stack = [] # For dequeue
def enqueue(self, item):
"""O(1) - Push to input stack"""
self.input_stack.append(item)
def dequeue(self):
"""O(1) amortized - Pop from output stack"""
if self.is_empty():
raise IndexError("Queue is empty")
# Transfer from input to output if output is empty
if not self.output_stack:
while self.input_stack:
self.output_stack.append(self.input_stack.pop())
return self.output_stack.pop()
def front(self):
"""O(1) amortized - View front"""
if self.is_empty():
raise IndexError("Queue is empty")
if not self.output_stack:
while self.input_stack:
self.output_stack.append(self.input_stack.pop())
return self.output_stack[-1]
def is_empty(self):
"""O(1) - Check if both stacks empty"""
return not self.input_stack and not self.output_stack
def size(self):
"""O(1) - Sum of both stacks"""
return len(self.input_stack) + len(self.output_stack)
# Example usage
def demonstrate_queue_with_stacks():
"""
Demonstrate how queue maintains FIFO with stacks
"""
q = QueueUsingTwoStacks()
# Enqueue 1, 2, 3
q.enqueue(1)
q.enqueue(2)
q.enqueue(3)
# State: input_stack = [1, 2, 3], output_stack = []
# First dequeue triggers transfer
print(q.dequeue()) # 1
# State: input_stack = [], output_stack = [3, 2]
# Subsequent dequeues are O(1)
print(q.dequeue()) # 2
print(q.dequeue()) # 3
3.8 Sliding Window Maximum Using Deque
def sliding_window_maximum(nums, k):
"""
Find maximum in each sliding window of size k
Uses monotonic decreasing deque
Time: O(n), Space: O(k)
"""
from collections import deque
if not nums or k == 0:
return []
dq = deque() # Stores indices
result = []
for i in range(len(nums)):
# Remove indices outside current window
while dq and dq[0] < i - k + 1:
dq.popleft()
# Remove smaller elements from rear (maintain decreasing order)
while dq and nums[dq[-1]] < nums[i]:
dq.pop()
# Add current element's index
dq.append(i)
# Window is fully formed
if i >= k - 1:
result.append(nums[dq[0]]) # Front = maximum
return result
# Example:
# nums = [1, 3, -1, -3, 5, 3, 6, 7], k = 3
# Output: [3, 3, 5, 5, 6, 7]
def sliding_window_minimum(nums, k):
"""
Find minimum in each sliding window
Uses monotonic increasing deque
"""
from collections import deque
dq = deque()
result = []
for i in range(len(nums)):
# Remove indices outside window
while dq and dq[0] < i - k + 1:
dq.popleft()
# Remove larger elements (maintain increasing order)
while dq and nums[dq[-1]] > nums[i]:
dq.pop()
dq.append(i)
if i >= k - 1:
result.append(nums[dq[0]])
return result
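One way to gain confidence in the monotonic-deque approach is to cross-check it against the obvious O(n·k) brute force on the example from above. The sketch below restates the maximum variant compactly so it runs standalone:

```python
from collections import deque

def window_max(nums, k):
    # Monotonic decreasing deque of indices, as in the section above.
    dq, out = deque(), []
    for i, x in enumerate(nums):
        while dq and dq[0] < i - k + 1:     # drop indices outside the window
            dq.popleft()
        while dq and nums[dq[-1]] < x:      # drop dominated smaller elements
            dq.pop()
        dq.append(i)
        if i >= k - 1:
            out.append(nums[dq[0]])
    return out

nums, k = [1, 3, -1, -3, 5, 3, 6, 7], 3
fast = window_max(nums, k)
brute = [max(nums[i:i + k]) for i in range(len(nums) - k + 1)]
# both give [3, 3, 5, 5, 6, 7]
```

The minimum variant can be checked the same way with `min` and the comparison flipped.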
3.9 Level Order Traversal Implementation
class TreeNode:
def __init__(self, val=0, left=None, right=None):
self.val = val
self.left = left
self.right = right
def level_order_traversal(root):
    """
    BFS level-order traversal using a queue
    Time: O(n), Space: O(w) where w = max width
    Uses collections.deque: with a plain list, each pop(0)
    would cost O(n) and break the stated bound.
    """
    from collections import deque
    if not root:
        return []
    queue = deque([root])
    result = []
    while queue:
        node = queue.popleft()
        result.append(node.val)
        if node.left:
            queue.append(node.left)
        if node.right:
            queue.append(node.right)
    return result
def level_order_by_levels(root):
"""
Level-order with levels separated
Returns: [[level 0], [level 1], [level 2], ...]
"""
if not root:
return []
queue = [root]
result = []
while queue:
level_size = len(queue)
level_nodes = []
for _ in range(level_size):
node = queue.pop(0)
level_nodes.append(node.val)
if node.left:
queue.append(node.left)
if node.right:
queue.append(node.right)
result.append(level_nodes)
return result
def zigzag_level_order(root):
"""
Zigzag level-order traversal
Level 0: left to right
Level 1: right to left
Level 2: left to right
...
"""
if not root:
return []
queue = [root]
result = []
left_to_right = True
while queue:
level_size = len(queue)
level_nodes = []
for _ in range(level_size):
node = queue.pop(0)
level_nodes.append(node.val)
if node.left:
queue.append(node.left)
if node.right:
queue.append(node.right)
# Reverse if right to left
if not left_to_right:
level_nodes.reverse()
result.append(level_nodes)
left_to_right = not left_to_right
return result
def right_side_view(root):
"""
Return rightmost node at each level
Uses level-order traversal
"""
if not root:
return []
queue = [root]
result = []
while queue:
level_size = len(queue)
for i in range(level_size):
node = queue.pop(0)
# Last node in level = rightmost
if i == level_size - 1:
result.append(node.val)
if node.left:
queue.append(node.left)
if node.right:
queue.append(node.right)
return result
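All four traversals above derive from the same by-level BFS, so they can be demonstrated together on one small tree. The snippet below is self-contained (it carries its own `TreeNode` and a compact by-levels helper) and derives the right-side view and zigzag order directly from the level lists:

```python
from collections import deque

class TreeNode:
    def __init__(self, val=0, left=None, right=None):
        self.val, self.left, self.right = val, left, right

def levels(root):
    """Level-order with levels separated, using deque for O(1) pops."""
    if not root:
        return []
    queue, out = deque([root]), []
    while queue:
        level = []
        for _ in range(len(queue)):
            node = queue.popleft()
            level.append(node.val)
            if node.left:
                queue.append(node.left)
            if node.right:
                queue.append(node.right)
        out.append(level)
    return out

#      3
#     / \
#    9  20
#       / \
#      15  7
root = TreeNode(3, TreeNode(9), TreeNode(20, TreeNode(15), TreeNode(7)))
by_level = levels(root)
right_view = [level[-1] for level in by_level]                  # rightmost per level
zigzag = [lvl[::-1] if i % 2 else lvl for i, lvl in enumerate(by_level)]
```

Once the by-level structure exists, the variants are one-liners over it; the dedicated functions above avoid the extra pass by transforming each level as it is produced.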
3.10 Monotonic Queue Pattern
class MonotonicQueue:
"""
Generic monotonic queue
Supports both increasing and decreasing variants
"""
def __init__(self, increasing=True):
from collections import deque
self.dq = deque()
self.increasing = increasing
def push(self, value):
"""Add value maintaining monotonic property"""
if self.increasing:
# Remove elements larger than current from rear
while self.dq and self.dq[-1] > value:
self.dq.pop()
else:
# Remove elements smaller than current from rear
while self.dq and self.dq[-1] < value:
self.dq.pop()
self.dq.append(value)
def pop(self, value):
"""Remove value if it's at front"""
if self.dq and self.dq[0] == value:
self.dq.popleft()
def get_extreme(self):
"""Get min (increasing) or max (decreasing)"""
if not self.dq:
raise IndexError("Monotonic queue is empty")
return self.dq[0]
def is_empty(self):
return len(self.dq) == 0
# Application: Shortest Subarray with Sum at Least K
def shortest_subarray_with_sum_k(nums, k):
"""
Find shortest subarray with sum >= k
Uses monotonic increasing queue of prefix sums
"""
from collections import deque
n = len(nums)
prefix_sum = [0] * (n + 1)
# Compute prefix sums
for i in range(n):
prefix_sum[i + 1] = prefix_sum[i] + nums[i]
dq = deque()
min_length = float('inf')
for i in range(n + 1):
# Check if we can form subarray ending at i
while dq and prefix_sum[i] - prefix_sum[dq[0]] >= k:
min_length = min(min_length, i - dq.popleft())
# Maintain increasing order of prefix sums
while dq and prefix_sum[i] <= prefix_sum[dq[-1]]:
dq.pop()
dq.append(i)
return min_length if min_length != float('inf') else -1
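Negative numbers are what make the monotonic queue necessary here (with all-positive input, a plain sliding window would do). A compact, standalone restatement of the same algorithm with two quick checks:

```python
from collections import deque

def shortest_subarray_at_least_k(nums, k):
    """Monotonic increasing queue over prefix sums, as in the section above."""
    prefix = [0]
    for x in nums:
        prefix.append(prefix[-1] + x)
    dq, best = deque(), float('inf')
    for i, p in enumerate(prefix):
        # A candidate subarray ends at i whenever p - prefix[dq[0]] >= k
        while dq and p - prefix[dq[0]] >= k:
            best = min(best, i - dq.popleft())
        # Keep prefix sums strictly increasing on the deque
        while dq and p <= prefix[dq[-1]]:
            dq.pop()
        dq.append(i)
    return best if best != float('inf') else -1

r1 = shortest_subarray_at_least_k([2, -1, 2], 3)   # whole array sums to 3
r2 = shortest_subarray_at_least_k([1, 2], 4)       # impossible
```

In the first call the negative middle element forces the queue to discard the dominated prefix, which is exactly the case a naive two-pointer scan gets wrong.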
3.11 Task Scheduling with Queues
class TaskScheduler:
"""
Task scheduler using priority queue
Tasks have:
- Priority (lower number = higher priority)
- Arrival time
- Execution time
"""
def __init__(self):
import heapq
self.task_queue = [] # Min-heap by priority
self.current_time = 0
def add_task(self, task_id, priority, arrival_time, exec_time):
"""Add task to queue"""
import heapq
task = (priority, arrival_time, exec_time, task_id)
heapq.heappush(self.task_queue, task)
def execute_tasks(self):
"""Execute all tasks in priority order"""
import heapq
execution_log = []
while self.task_queue:
priority, arrival, exec_time, task_id = heapq.heappop(self.task_queue)
# Wait for task to arrive
if arrival > self.current_time:
self.current_time = arrival
# Execute task
start_time = self.current_time
self.current_time += exec_time
end_time = self.current_time
execution_log.append({
'task_id': task_id,
'priority': priority,
'start': start_time,
'end': end_time,
'waiting_time': start_time - arrival
})
return execution_log
# Round-robin scheduling with queue
class RoundRobinScheduler:
"""
Round-robin CPU scheduling
Each task gets fixed time quantum
"""
def __init__(self, quantum=2):
self.quantum = quantum
self.task_queue = []
self.current_time = 0
def add_task(self, task_id, burst_time):
"""Add task to ready queue"""
self.task_queue.append({
'id': task_id,
'remaining_time': burst_time,
'arrival_time': self.current_time
})
def execute(self):
"""Execute tasks in round-robin fashion"""
execution_log = []
while self.task_queue:
task = self.task_queue.pop(0)
# Execute for quantum or remaining time
exec_time = min(self.quantum, task['remaining_time'])
start_time = self.current_time
self.current_time += exec_time
task['remaining_time'] -= exec_time
execution_log.append({
'task_id': task['id'],
'start': start_time,
'end': self.current_time,
'quantum_used': exec_time
})
# Re-enqueue if not finished
if task['remaining_time'] > 0:
self.task_queue.append(task)
return execution_log
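The round-robin behavior is easiest to see as completion times. This standalone sketch condenses the scheduler above into one function using `collections.deque` (the task names and burst times are illustrative):

```python
from collections import deque

def round_robin(tasks, quantum=2):
    """tasks: list of (task_id, burst_time); returns (task_id, completion_time)
    pairs in the order tasks finish."""
    queue = deque(tasks)
    finished, t = [], 0
    while queue:
        task_id, remaining = queue.popleft()
        run = min(quantum, remaining)       # run for the quantum or until done
        t += run
        if remaining - run > 0:
            queue.append((task_id, remaining - run))   # back of the line
        else:
            finished.append((task_id, t))
    return finished

done = round_robin([("P1", 5), ("P2", 3), ("P3", 1)], quantum=2)
# P3 finishes first (shortest burst), P1 last despite arriving first
```

Note how the short task P3 finishes at time 5 rather than waiting behind the full bursts of P1 and P2; that responsiveness is the point of round-robin.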
4. Problem-Solving and Applications
4.1 BFS and Graph Traversal Problems
Problem 1: Binary Tree Level Order Traversal
def level_order_traversal_advanced(root):
"""
Advanced level-order traversal variations
1. Standard level-order
2. Bottom-up level-order
3. Level averages
4. Maximum value per level
"""
if not root:
return []
queue = [root]
result = []
while queue:
level_size = len(queue)
level_nodes = []
for _ in range(level_size):
node = queue.pop(0)
level_nodes.append(node.val)
if node.left:
queue.append(node.left)
if node.right:
queue.append(node.right)
result.append(level_nodes)
return result
def bottom_up_level_order(root):
"""Bottom-up level order (reverse of standard)"""
result = level_order_traversal_advanced(root)
return result[::-1]
def level_averages(root):
"""Average value at each level"""
if not root:
return []
queue = [root]
averages = []
while queue:
level_size = len(queue)
level_sum = 0
for _ in range(level_size):
node = queue.pop(0)
level_sum += node.val
if node.left:
queue.append(node.left)
if node.right:
queue.append(node.right)
averages.append(level_sum / level_size)
return averages
def largest_values_per_level(root):
"""Maximum value at each level"""
if not root:
return []
queue = [root]
max_values = []
while queue:
level_size = len(queue)
level_max = float('-inf')
for _ in range(level_size):
node = queue.pop(0)
level_max = max(level_max, node.val)
if node.left:
queue.append(node.left)
if node.right:
queue.append(node.right)
max_values.append(level_max)
return max_values
Problem 2: Shortest Path in Unweighted Graph
def shortest_path_bfs(graph, start, end):
    """
    Find shortest path in unweighted graph using BFS
    Time: O(V + E), Space: O(V)
    Uses collections.deque: with a plain list, each pop(0)
    would cost O(n) and break the stated bound.
    """
    from collections import deque
    if start == end:
        return [start]
    queue = deque([(start, [start])])
    visited = {start}
    while queue:
        node, path = queue.popleft()
        for neighbor in graph.get(node, []):
            if neighbor in visited:
                continue
            new_path = path + [neighbor]
            if neighbor == end:
                return new_path
            visited.add(neighbor)
            queue.append((neighbor, new_path))
    return None  # No path found
def shortest_path_distance(graph, start, end):
    """Return distance only (more efficient: avoids copying paths)"""
    from collections import deque
    if start == end:
        return 0
    queue = deque([(start, 0)])
    visited = {start}
    while queue:
        node, dist = queue.popleft()
        for neighbor in graph.get(node, []):
            if neighbor in visited:
                continue
            if neighbor == end:
                return dist + 1
            visited.add(neighbor)
            queue.append((neighbor, dist + 1))
    return -1  # No path found
Problem 3: Rotting Oranges
def oranges_rotting(grid):
"""
Time for all oranges to rot (multi-source BFS)
0 = empty, 1 = fresh orange, 2 = rotten orange
Rotten orange rots adjacent fresh oranges every minute
Time: O(m*n), Space: O(m*n)
"""
from collections import deque
rows, cols = len(grid), len(grid[0])
queue = deque()
fresh_count = 0
# Find all rotten oranges and count fresh ones
for r in range(rows):
for c in range(cols):
if grid[r][c] == 2:
queue.append((r, c, 0)) # (row, col, time)
elif grid[r][c] == 1:
fresh_count += 1
if fresh_count == 0:
return 0
directions = [(0, 1), (1, 0), (0, -1), (-1, 0)]
max_time = 0
while queue:
r, c, time = queue.popleft()
for dr, dc in directions:
nr, nc = r + dr, c + dc
# Check bounds and if fresh
if (0 <= nr < rows and 0 <= nc < cols and
grid[nr][nc] == 1):
grid[nr][nc] = 2 # Rot the orange
fresh_count -= 1
max_time = max(max_time, time + 1)
queue.append((nr, nc, time + 1))
return max_time if fresh_count == 0 else -1
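Two quick checks make the multi-source behavior concrete: one grid where rot spreads everywhere, and one where a fresh orange is walled off. The sketch below restates the BFS compactly so it runs standalone (the grids are the usual illustrative examples):

```python
from collections import deque

def minutes_to_rot(grid):
    rows, cols = len(grid), len(grid[0])
    q, fresh = deque(), 0
    for r in range(rows):
        for c in range(cols):
            if grid[r][c] == 2:
                q.append((r, c, 0))      # all rotten oranges start the BFS
            elif grid[r][c] == 1:
                fresh += 1
    t = 0
    while q:
        r, c, minute = q.popleft()
        for dr, dc in ((0, 1), (1, 0), (0, -1), (-1, 0)):
            nr, nc = r + dr, c + dc
            if 0 <= nr < rows and 0 <= nc < cols and grid[nr][nc] == 1:
                grid[nr][nc] = 2
                fresh -= 1
                t = max(t, minute + 1)
                q.append((nr, nc, minute + 1))
    return t if fresh == 0 else -1

result = minutes_to_rot([[2, 1, 1], [1, 1, 0], [0, 1, 1]])   # rot reaches every orange
stuck = minutes_to_rot([[2, 1, 1], [0, 1, 1], [1, 0, 1]])    # bottom-left is unreachable
```

Seeding the queue with every rotten orange at time 0 is what makes this "multi-source": a separate BFS per source would overcount the minutes.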
4.2 Sliding Window Problems
Problem 1: Sliding Window Maximum
def max_sliding_window(nums, k):
"""
Maximum in each sliding window using monotonic deque
Time: O(n), Space: O(k)
"""
from collections import deque
if not nums or k == 0:
return []
dq = deque() # Stores indices
result = []
for i in range(len(nums)):
# Remove indices outside window
while dq and dq[0] < i - k + 1:
dq.popleft()
# Remove smaller elements (maintain decreasing order)
while dq and nums[dq[-1]] < nums[i]:
dq.pop()
dq.append(i)
# Add to result when window is full
if i >= k - 1:
result.append(nums[dq[0]])
return result
def min_sliding_window(nums, k):
"""Minimum in each sliding window"""
from collections import deque
dq = deque()
result = []
for i in range(len(nums)):
while dq and dq[0] < i - k + 1:
dq.popleft()
# Maintain increasing order for minimum
while dq and nums[dq[-1]] > nums[i]:
dq.pop()
dq.append(i)
if i >= k - 1:
result.append(nums[dq[0]])
return result
Problem 2: Longest Continuous Subarray
def longest_continuous_subarray(nums, limit):
"""
Longest subarray where max - min <= limit
Uses two monotonic deques (one for max, one for min)
"""
from collections import deque
max_dq = deque() # Decreasing
min_dq = deque() # Increasing
left = 0
max_length = 0
for right in range(len(nums)):
# Maintain max deque
while max_dq and nums[max_dq[-1]] < nums[right]:
max_dq.pop()
max_dq.append(right)
# Maintain min deque
while min_dq and nums[min_dq[-1]] > nums[right]:
min_dq.pop()
min_dq.append(right)
# Shrink window if limit exceeded
while nums[max_dq[0]] - nums[min_dq[0]] > limit:
left += 1
if max_dq[0] < left:
max_dq.popleft()
if min_dq[0] < left:
min_dq.popleft()
max_length = max(max_length, right - left + 1)
return max_length
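The two deques give O(1) access to the current window's max and min, so the window only ever grows or slides. A standalone restatement with two illustrative checks:

```python
from collections import deque

def longest_within_limit(nums, limit):
    max_dq, min_dq = deque(), deque()   # decreasing / increasing index deques
    left = best = 0
    for right, x in enumerate(nums):
        while max_dq and nums[max_dq[-1]] < x:
            max_dq.pop()
        max_dq.append(right)
        while min_dq and nums[min_dq[-1]] > x:
            min_dq.pop()
        min_dq.append(right)
        # Shrink from the left until max - min fits the limit again
        while nums[max_dq[0]] - nums[min_dq[0]] > limit:
            left += 1
            if max_dq[0] < left:
                max_dq.popleft()
            if min_dq[0] < left:
                min_dq.popleft()
        best = max(best, right - left + 1)
    return best

a = longest_within_limit([8, 2, 4, 7], 4)         # best window: [2, 4] or [4, 7]
b = longest_within_limit([10, 1, 2, 4, 7, 2], 5)  # best window: [2, 4, 7, 2]
```

Each index enters and leaves each deque at most once, so the whole scan is O(n) despite the nested loops.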
4.3 Priority Queue Problems
Problem 1: Kth Largest Element
import heapq
def find_kth_largest(nums, k):
"""
Find kth largest element using min-heap
Maintain heap of size k with k largest elements
Time: O(n log k), Space: O(k)
"""
heap = []
for num in nums:
heapq.heappush(heap, num)
if len(heap) > k:
heapq.heappop(heap)
return heap[0] # Smallest in heap = kth largest overall
def find_kth_smallest(nums, k):
"""Find kth smallest using max-heap"""
heap = []
for num in nums:
# Python only has min-heap, so negate for max-heap
heapq.heappush(heap, -num)
if len(heap) > k:
heapq.heappop(heap)
return -heap[0]
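A quick standalone check of the size-k min-heap idea: after the scan, the heap holds exactly the k largest values, so its root is the kth largest overall.

```python
import heapq

def kth_largest(nums, k):
    heap = []   # min-heap holding the k largest values seen so far
    for n in nums:
        heapq.heappush(heap, n)
        if len(heap) > k:
            heapq.heappop(heap)   # evict the smallest of the k+1
    return heap[0]

a = kth_largest([3, 2, 1, 5, 6, 4], 2)             # → 5
b = kth_largest([3, 2, 3, 1, 2, 4, 5, 5, 6], 4)    # → 4
```

Keeping the heap at size k is what turns the naive O(n log n) full sort into O(n log k), which matters when k is small relative to n.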
Problem 2: Merge K Sorted Lists
class ListNode:
    """Singly linked list node (required by merge_k_sorted_lists)"""
    def __init__(self, val=0, next=None):
        self.val = val
        self.next = next
def merge_k_sorted_lists(lists):
"""
Merge k sorted linked lists using priority queue
Time: O(n log k) where n = total elements, k = lists
Space: O(k)
"""
import heapq
heap = []
# Add first element from each list
for i, lst in enumerate(lists):
if lst:
heapq.heappush(heap, (lst.val, i, lst))
dummy = ListNode(0)
current = dummy
while heap:
val, i, node = heapq.heappop(heap)
current.next = node
current = current.next
# Add next element from same list
if node.next:
heapq.heappush(heap, (node.next.val, i, node.next))
return dummy.next
def merge_k_sorted_arrays(arrays):
"""Merge k sorted arrays"""
import heapq
heap = []
# Add first element from each array with array index and element index
for i in range(len(arrays)):
if arrays[i]:
heapq.heappush(heap, (arrays[i][0], i, 0))
result = []
while heap:
val, arr_idx, elem_idx = heapq.heappop(heap)
result.append(val)
# Add next element from same array
if elem_idx + 1 < len(arrays[arr_idx]):
next_val = arrays[arr_idx][elem_idx + 1]
heapq.heappush(heap, (next_val, arr_idx, elem_idx + 1))
return result
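The array variant is easy to exercise end to end. This standalone sketch condenses it slightly (building the initial heap with `heapify` instead of k pushes) and checks it on the classic three-array example:

```python
import heapq

def merge_sorted(arrays):
    # Seed the heap with (value, array index, element index) for each array head.
    heap = [(arr[0], i, 0) for i, arr in enumerate(arrays) if arr]
    heapq.heapify(heap)
    out = []
    while heap:
        val, ai, ei = heapq.heappop(heap)
        out.append(val)
        if ei + 1 < len(arrays[ai]):                  # advance within the same array
            heapq.heappush(heap, (arrays[ai][ei + 1], ai, ei + 1))
    return out

merged = merge_sorted([[1, 4, 5], [1, 3, 4], [2, 6]])
# → [1, 1, 2, 3, 4, 4, 5, 6]
```

The heap never holds more than k entries, which is where the O(n log k) bound comes from.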
Problem 3: Top K Frequent Elements
def top_k_frequent(nums, k):
"""
Find k most frequent elements
Time: O(n log k), Space: O(n)
"""
from collections import Counter
import heapq
# Count frequencies
freq_map = Counter(nums)
# Use min-heap of size k
heap = []
for num, freq in freq_map.items():
heapq.heappush(heap, (freq, num))
if len(heap) > k:
heapq.heappop(heap)
return [num for freq, num in heap]  # order among the k is unspecified (heap order)
def top_k_frequent_bucket_sort(nums, k):
"""
Using bucket sort: O(n) time
Bucket i contains elements with frequency i
"""
from collections import Counter
freq_map = Counter(nums)
buckets = [[] for _ in range(len(nums) + 1)]
# Place elements in frequency buckets
for num, freq in freq_map.items():
buckets[freq].append(num)
# Collect from highest frequency buckets
result = []
for i in range(len(buckets) - 1, 0, -1):
result.extend(buckets[i])
if len(result) >= k:
break
return result[:k]
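Since the heap leaves the k results in arbitrary internal order, comparisons should be order-insensitive. A standalone check of the heap approach returning a set:

```python
from collections import Counter
import heapq

def top_k(nums, k):
    heap = []   # min-heap of (frequency, element), capped at size k
    for num, freq in Counter(nums).items():
        heapq.heappush(heap, (freq, num))
        if len(heap) > k:
            heapq.heappop(heap)   # evict the least frequent of the k+1
    return {num for _, num in heap}   # order among the k is unspecified

res = top_k([1, 1, 1, 2, 2, 3], 2)   # → {1, 2}
```

If ranked order is required, pop the heap k times and reverse, at no change in asymptotic cost.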
4.4 Task Scheduling Problems
Problem 1: Task Scheduler
def least_interval(tasks, n):
"""
Minimum intervals to complete all tasks with cooldown n
Example: tasks = ["A","A","A","B","B","B"], n = 2
Output: 8 (A -> B -> idle -> A -> B -> idle -> A -> B)
Time: O(m log m) where m = unique tasks
Space: O(m) for heap
Approach 1: Priority Queue (Greedy)
Strategy: Always execute most frequent task available
"""
from collections import Counter
import heapq
task_counts = Counter(tasks)
# Max-heap of frequencies (negate for max-heap)
heap = [-count for count in task_counts.values()]
heapq.heapify(heap)
time = 0
while heap:
temp = []
cycle = n + 1 # Need to wait n intervals
for _ in range(cycle):
if heap:
freq = heapq.heappop(heap)
temp.append(freq + 1) # Decrement frequency
# Re-add non-zero frequencies
for freq in temp:
if freq < 0:
heapq.heappush(heap, freq)
# If heap empty, we're done (no idle time needed)
time += cycle if heap else len(temp)
return time
def least_interval_optimized(tasks, n):
"""
Approach 2: Mathematical Formula (Optimal)
Key insight:
- Most frequent task determines minimum time
- Idle slots = (max_freq - 1) * n
- Fill idle slots with other tasks
Time: O(m) where m = total tasks
Space: O(1) - fixed 26 letters
"""
from collections import Counter
frequencies = Counter(tasks)
max_freq = max(frequencies.values())
# Count tasks with max frequency
max_count = sum(1 for f in frequencies.values() if f == max_freq)
# Minimum time formula
# (max_freq - 1) complete cycles + final group of max_freq tasks
min_time = (max_freq - 1) * (n + 1) + max_count
# Can't be less than total tasks
return max(min_time, len(tasks))
def least_interval_detailed_explanation(tasks, n):
"""
Detailed step-by-step explanation
Example walkthrough:
tasks = ["A","A","A","B","B","B"], n = 2
Step 1: Count frequencies
A: 3, B: 3
Step 2: max_freq = 3, max_count = 2 (both A and B)
Step 3: Calculate minimum intervals
- Need (3-1) = 2 complete cycles
- Each cycle has (2+1) = 3 slots
- Last cycle has 2 tasks (A and B)
- Total: 2 * 3 + 2 = 8
Timeline: A B _ | A B _ | A B
(cycle 1 | cycle 2 | final group)
Step 4: Verify
- Total tasks = 6
- Formula gives 8
- Return max(8, 6) = 8
"""
from collections import Counter
freq = Counter(tasks)
max_freq = max(freq.values())
max_count = sum(1 for f in freq.values() if f == max_freq)
# Detailed calculation
cycles = max_freq - 1
slots_per_cycle = n + 1
final_group_size = max_count
min_time = cycles * slots_per_cycle + final_group_size
print(f"Max frequency: {max_freq}")
print(f"Tasks with max frequency: {max_count}")
print(f"Complete cycles: {cycles}")
print(f"Slots per cycle: {slots_per_cycle}")
print(f"Formula: {cycles} * {slots_per_cycle} + {final_group_size} = {min_time}")
print(f"Total tasks: {len(tasks)}")
print(f"Result: max({min_time}, {len(tasks)}) = {max(min_time, len(tasks))}")
return max(min_time, len(tasks))
# Test cases
def test_task_scheduler():
"""Comprehensive test cases"""
print("Testing Task Scheduler...")
print("=" * 60)
test_cases = [
# (tasks, n, expected)
(["A","A","A","B","B","B"], 2, 8),
(["A","A","A","B","B","B"], 0, 6),
(["A","A","A","A","A","A","B","C","D","E","F","G"], 2, 16),
(["A","B","C","D","E","A","B","C","D","E"], 4, 10),
]
for i, (tasks, n, expected) in enumerate(test_cases, 1):
result1 = least_interval(tasks, n)
result2 = least_interval_optimized(tasks, n)
print(f"\nTest {i}:")
print(f" Tasks: {tasks}")
print(f" Cooldown: {n}")
print(f" Expected: {expected}")
print(f" Approach 1 (Priority Queue): {result1}")
print(f" Approach 2 (Formula): {result2}")
print(f" {'PASS' if result1 == expected and result2 == expected else 'FAIL'}")
if __name__ == "__main__":
test_task_scheduler()
Problem 1b: Task Scheduler with Execution Order
def task_scheduler_with_order(tasks, n):
"""
Return actual execution order (not just count)
Shows which task executes at each time slot
"""
from collections import Counter
import heapq
task_counts = Counter(tasks)
# Max-heap via negated counts: entries are (-frequency, task_name)
heap = [(-count, task) for task, count in task_counts.items()]
heapq.heapify(heap)
execution_order = []
time = 0
while heap:
temp = []
cycle_tasks = []
# Execute up to (n+1) tasks in this cycle
for _ in range(n + 1):
if heap:
neg_freq, task = heapq.heappop(heap)
cycle_tasks.append(task)
if neg_freq + 1 < 0: # Still has remaining
temp.append((neg_freq + 1, task))
elif temp or heap: # Add idle if more cycles needed
cycle_tasks.append("idle")
execution_order.extend(cycle_tasks)
# Re-add tasks with remaining frequency
for item in temp:
heapq.heappush(heap, item)
time += len(cycle_tasks)
return execution_order, time
def visualize_task_schedule(tasks, n):
"""Visualize task execution timeline"""
order, total_time = task_scheduler_with_order(tasks, n)
print(f"\nTask Schedule Visualization (cooldown n={n}):")
print(f"Tasks: {tasks}")
print(f"Total time: {total_time}\n")
# Print timeline
print("Time: ", end="")
for t in range(len(order)):
print(f"{t:3d}", end=" ")
print()
print("Task: ", end="")
for task in order:
print(f"{task:>3s}", end=" ")
print()
# Print cooldown indicators
print("\nCooldown periods marked:")
for i, task in enumerate(order):
if task != "idle":
cooldown_end = min(i + n, len(order) - 1)
print(f" {task} at time {i}: cooldown until time {cooldown_end}")
# Example usage
if __name__ == "__main__":
visualize_task_schedule(["A","A","A","B","B","B"], 2)
visualize_task_schedule(["A","A","A","A","A","A","B","C","D","E","F","G"], 2)
Problem 2: Course Schedule
def can_finish_courses(num_courses, prerequisites):
"""
Check if can finish all courses (detect cycle)
Uses BFS with in-degree (Kahn's algorithm)
Time: O(V + E), Space: O(V + E)
"""
from collections import defaultdict, deque
# Build adjacency list and in-degree count
graph = defaultdict(list)
in_degree = [0] * num_courses
for course, prereq in prerequisites:
graph[prereq].append(course)
in_degree[course] += 1
# Queue of courses with no prerequisites
queue = deque([i for i in range(num_courses) if in_degree[i] == 0])
completed = 0
while queue:
course = queue.popleft()
completed += 1
# Process dependent courses
for dependent in graph[course]:
in_degree[dependent] -= 1
if in_degree[dependent] == 0:
queue.append(dependent)
return completed == num_courses
def find_order(num_courses, prerequisites):
"""
Find valid course order (topological sort)
Returns one valid ordering or empty if impossible
"""
from collections import defaultdict, deque
graph = defaultdict(list)
in_degree = [0] * num_courses
for course, prereq in prerequisites:
graph[prereq].append(course)
in_degree[course] += 1
queue = deque([i for i in range(num_courses) if in_degree[i] == 0])
order = []
while queue:
course = queue.popleft()
order.append(course)
for dependent in graph[course]:
in_degree[dependent] -= 1
if in_degree[dependent] == 0:
queue.append(dependent)
return order if len(order) == num_courses else []
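As a quick sanity check of Kahn's algorithm, here is the same topological sort compacted into a standalone snippet (four courses; course 3 requires courses 1 and 2, which both require course 0):

```python
from collections import defaultdict, deque

def find_order(num_courses, prerequisites):
    """Kahn's algorithm: repeatedly take courses with no remaining prerequisites."""
    graph = defaultdict(list)          # prereq -> courses that depend on it
    in_degree = [0] * num_courses
    for course, prereq in prerequisites:
        graph[prereq].append(course)
        in_degree[course] += 1
    queue = deque(i for i in range(num_courses) if in_degree[i] == 0)
    order = []
    while queue:
        course = queue.popleft()
        order.append(course)
        for dependent in graph[course]:
            in_degree[dependent] -= 1
            if in_degree[dependent] == 0:
                queue.append(dependent)
    return order if len(order) == num_courses else []

print(find_order(4, [[1, 0], [2, 0], [3, 1], [3, 2]]))   # [0, 1, 2, 3]
print(find_order(2, [[0, 1], [1, 0]]))                   # [] (cycle: impossible)
```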
4.5 Monotonic Queue Applications
Problem 1: Shortest Subarray with Sum at Least K
def shortest_subarray_sum_k(nums, k):
"""
Find shortest subarray with sum >= k
Uses monotonic queue of prefix sums
Time: O(n), Space: O(n)
"""
from collections import deque
n = len(nums)
prefix = [0] * (n + 1)
for i in range(n):
prefix[i + 1] = prefix[i] + nums[i]
dq = deque()
min_len = float('inf')
for i in range(n + 1):
# Check if we can form valid subarray
while dq and prefix[i] - prefix[dq[0]] >= k:
min_len = min(min_len, i - dq.popleft())
# Maintain increasing order
while dq and prefix[i] <= prefix[dq[-1]]:
dq.pop()
dq.append(i)
return min_len if min_len != float('inf') else -1
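A standalone check of the function above (the first example is the classic [2, -1, 2] with k = 3, where the whole array is the shortest valid subarray):

```python
from collections import deque

def shortest_subarray_sum_k(nums, k):
    prefix = [0]
    for x in nums:
        prefix.append(prefix[-1] + x)
    dq, best = deque(), float('inf')
    for i, p in enumerate(prefix):
        # A small enough earlier prefix means a valid subarray ends at i
        while dq and p - prefix[dq[0]] >= k:
            best = min(best, i - dq.popleft())
        # Larger-or-equal earlier prefixes can never yield a shorter answer
        while dq and p <= prefix[dq[-1]]:
            dq.pop()
        dq.append(i)
    return best if best != float('inf') else -1

print(shortest_subarray_sum_k([2, -1, 2], 3))   # 3
print(shortest_subarray_sum_k([1, 2], 4))       # -1
print(shortest_subarray_sum_k([1], 1))          # 1
```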
Problem 2: Constrained Subsequence Sum
def constrained_subset_sum(nums, k):
"""
Maximum sum of subsequence where consecutive elements
are at most k indices apart
Uses monotonic decreasing deque
Time: O(n), Space: O(n) for dp (the deque itself holds at most k+1 indices)
"""
from collections import deque
n = len(nums)
dp = [0] * n
dp[0] = nums[0]
dq = deque([0]) # Stores indices
for i in range(1, n):
# Remove indices outside window
while dq and dq[0] < i - k:
dq.popleft()
# dp[i] = nums[i] + max(0, max_in_window)
dp[i] = nums[i] + max(0, dp[dq[0]])
# Maintain decreasing order
while dq and dp[dq[-1]] < dp[i]:
dq.pop()
dq.append(i)
return max(dp)
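A compact standalone restatement with two checks; in the first, skipping -10 is allowed because indices 1 and 3 are only two apart, giving 10 + 2 + 5 + 20 = 37:

```python
from collections import deque

def constrained_subset_sum(nums, k):
    n = len(nums)
    dp = nums[:]        # dp[i]: best sum of a valid subsequence ending at i
    dq = deque()        # indices with decreasing dp values
    for i in range(n):
        while dq and dq[0] < i - k:
            dq.popleft()            # outside the k-window
        if dq:
            dp[i] += max(0, dp[dq[0]])
        while dq and dp[dq[-1]] <= dp[i]:
            dq.pop()
        dq.append(i)
    return max(dp)

print(constrained_subset_sum([10, 2, -10, 5, 20], 2))   # 37
print(constrained_subset_sum([-1, -2, -3], 1))          # -1
```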
Problem 3: Jump Game VI (Monotonic Queue Application)
def max_result(nums, k):
"""
Maximum score jumping through array
Can jump at most k steps forward
Score = sum of values landed on
Example:
nums = [1,-1,-2,4,-7,3], k = 2
Path: 1 -> -1 -> 4 -> 3 = 7
Approach: DP + Monotonic Decreasing Queue
Time: O(n), Space: O(n)
"""
from collections import deque
n = len(nums)
dp = [float('-inf')] * n
dp[0] = nums[0]
# Monotonic decreasing deque (stores indices)
dq = deque([0])
for i in range(1, n):
# Remove indices outside jump range
while dq and dq[0] < i - k:
dq.popleft()
# Best score to reach i = current value + best in reachable range
dp[i] = nums[i] + dp[dq[0]]
# Maintain decreasing order (remove smaller values)
while dq and dp[dq[-1]] <= dp[i]:
dq.pop()
dq.append(i)
return dp[-1]
def max_result_with_path(nums, k):
"""
Return both maximum score and the actual path
"""
from collections import deque
n = len(nums)
dp = [float('-inf')] * n
parent = [-1] * n # Track path
dp[0] = nums[0]
dq = deque([0])
for i in range(1, n):
while dq and dq[0] < i - k:
dq.popleft()
dp[i] = nums[i] + dp[dq[0]]
parent[i] = dq[0] # Remember where we jumped from
while dq and dp[dq[-1]] <= dp[i]:
dq.pop()
dq.append(i)
# Reconstruct path
path = []
idx = n - 1
while idx != -1:
path.append(idx)
idx = parent[idx]
path.reverse()
return dp[-1], path
# Test and visualize
def test_jump_game():
"""Test jump game with path visualization"""
test_cases = [
([1, -1, -2, 4, -7, 3], 2),
([10, -5, -2, 4, 0, 3], 3),
([1, -5, -20, 4, -1, 3, -6, -3], 2)
]
print("Jump Game VI - Test Cases:")
print("=" * 60)
for i, (nums, k) in enumerate(test_cases, 1):
max_score, path = max_result_with_path(nums, k)
print(f"\nTest {i}:")
print(f" Array: {nums}")
print(f" Max jump: k={k}")
print(f" Maximum score: {max_score}")
print(f" Path (indices): {path}")
print(f" Path (values): {[nums[i] for i in path]}")
print(f" Verification: {sum(nums[i] for i in path)}")
if __name__ == "__main__":
test_jump_game()
Problem 4: Shortest Subarray with Sum at Least K (Step-by-Step Trace)
The O(n) solution itself already appears as Problem 1 at the top of this section; the version below traces the monotonic deque of prefix sums step by step:
def shortest_subarray_detailed(nums, k):
"""
Detailed explanation with step-by-step trace
"""
from collections import deque
n = len(nums)
prefix = [0] * (n + 1)
for i in range(n):
prefix[i + 1] = prefix[i] + nums[i]
print(f"Array: {nums}")
print(f"Target sum: {k}")
print(f"Prefix sums: {prefix}")
print()
dq = deque()
min_len = float('inf')
for i in range(n + 1):
print(f"Step {i}: prefix[{i}] = {prefix[i]}")
# Check valid subarrays
while dq and prefix[i] - prefix[dq[0]] >= k:
start = dq.popleft()
length = i - start
print(f" Found subarray [{start}:{i}] with sum {prefix[i] - prefix[start]}, length {length}")
min_len = min(min_len, length)
# Maintain monotonic property
while dq and prefix[i] <= prefix[dq[-1]]:
removed = dq.pop()
print(f" Removed index {removed} (prefix[{removed}] = {prefix[removed]} >= {prefix[i]})")
dq.append(i)
print(f" Deque: {list(dq)}")
print()
result = min_len if min_len != float('inf') else -1
print(f"Result: {result}")
return result
# Example usage
if __name__ == "__main__":
print("Example 1:")
shortest_subarray_detailed([2, -1, 2], 3)
print("\n" + "=" * 60)
print("Example 2:")
shortest_subarray_detailed([1, 2], 4)
4.6 Level-Order Variations
Problem 1: Connect Next Right Pointers
class Node:
def __init__(self, val=0, left=None, right=None, next=None):
self.val = val
self.left = left
self.right = right
self.next = next
def connect_next_right(root):
"""
Connect each node to its next right node at same level
Using BFS with level tracking
"""
if not root:
return None
from collections import deque
queue = deque([root])  # deque gives O(1) popleft; list.pop(0) is O(n)
while queue:
level_size = len(queue)
for i in range(level_size):
node = queue.popleft()
# Connect to next node in level (if not last)
if i < level_size - 1:
node.next = queue[0]
if node.left:
queue.append(node.left)
if node.right:
queue.append(node.right)
return root
def connect_constant_space(root):
"""
O(1) space solution using next pointers
"""
if not root:
return None
leftmost = root
while leftmost.left: # Has next level
head = leftmost
while head:
# Connect children
head.left.next = head.right
# Connect across parent nodes
if head.next:
head.right.next = head.next.left
head = head.next
leftmost = leftmost.left
return root
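A standalone check on a perfect tree of seven nodes (the Node class is restated so the snippet runs on its own); after connecting, walking next pointers from the leftmost leaf visits the whole bottom level:

```python
class Node:
    def __init__(self, val=0, left=None, right=None, next=None):
        self.val, self.left, self.right, self.next = val, left, right, next

def connect(root):
    leftmost = root
    while leftmost and leftmost.left:        # descend one level per pass
        head = leftmost
        while head:
            head.left.next = head.right      # connect siblings
            if head.next:                    # connect cousins via parent's next
                head.right.next = head.next.left
            head = head.next
        leftmost = leftmost.left
    return root

# Perfect tree:      1
#                  2   3
#                 4 5 6 7
leaves = [Node(v) for v in (4, 5, 6, 7)]
root = Node(1, Node(2, leaves[0], leaves[1]), Node(3, leaves[2], leaves[3]))
connect(root)
row, node = [], leaves[0]
while node:
    row.append(node.val)
    node = node.next
print(row)   # [4, 5, 6, 7]
```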
Problem 2: Binary Tree Vertical Order Traversal
def vertical_order(root):
"""
Vertical order traversal using BFS
Nodes at same column in same vertical line
"""
from collections import defaultdict, deque
if not root:
return []
column_table = defaultdict(list)
queue = deque([(root, 0)]) # (node, column)
while queue:
node, col = queue.popleft()
column_table[col].append(node.val)
if node.left:
queue.append((node.left, col - 1))
if node.right:
queue.append((node.right, col + 1))
# Sort by column and return
return [column_table[col] for col in sorted(column_table.keys())]
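A standalone check (a minimal TreeNode is defined here just for the demo):

```python
from collections import defaultdict, deque

class TreeNode:  # minimal node for the demo
    def __init__(self, val, left=None, right=None):
        self.val, self.left, self.right = val, left, right

def vertical_order(root):
    if not root:
        return []
    cols = defaultdict(list)
    queue = deque([(root, 0)])      # BFS keeps top-to-bottom order per column
    while queue:
        node, col = queue.popleft()
        cols[col].append(node.val)
        if node.left:
            queue.append((node.left, col - 1))
        if node.right:
            queue.append((node.right, col + 1))
    return [cols[c] for c in sorted(cols)]

#        3
#      9   20
#         15  7
root = TreeNode(3, TreeNode(9), TreeNode(20, TreeNode(15), TreeNode(7)))
print(vertical_order(root))   # [[9], [3, 15], [20], [7]]
```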
4.7 Deque Applications
Problem 1: Design Hit Counter
class HitCounter:
"""
Count hits in last 5 minutes using deque
Supports:
- hit(timestamp): Record hit
- getHits(timestamp): Get hits in last 300 seconds
"""
def __init__(self):
from collections import deque
self.hits = deque()
def hit(self, timestamp):
"""O(1) - Record a hit"""
self.hits.append(timestamp)
def get_hits(self, timestamp):
"""O(n) worst case - Count recent hits"""
# Remove hits older than 300 seconds
while self.hits and self.hits[0] <= timestamp - 300:
self.hits.popleft()
return len(self.hits)
# Optimized with bucketing
class HitCounterOptimized:
"""
O(1) hit and getHits using circular buffer
"""
def __init__(self):
self.times = [0] * 300
self.hits = [0] * 300
def hit(self, timestamp):
"""O(1) - Record hit"""
idx = timestamp % 300
if self.times[idx] != timestamp:
self.times[idx] = timestamp
self.hits[idx] = 1
else:
self.hits[idx] += 1
def get_hits(self, timestamp):
"""O(300) = O(1) - Count recent hits"""
total = 0
for i in range(300):
if timestamp - self.times[i] < 300:
total += self.hits[i]
return total
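A standalone check of the deque-based counter: three hits early on, one at t=300, then a query at t=301 by which time the hit at t=1 has aged out:

```python
from collections import deque

class HitCounter:
    def __init__(self):
        self.hits = deque()
    def hit(self, ts):
        self.hits.append(ts)
    def get_hits(self, ts):
        # Evict everything older than the 300-second window
        while self.hits and self.hits[0] <= ts - 300:
            self.hits.popleft()
        return len(self.hits)

hc = HitCounter()
for ts in (1, 2, 3):
    hc.hit(ts)
print(hc.get_hits(4))     # 3 (all within the last 300 seconds)
hc.hit(300)
print(hc.get_hits(300))   # 4
print(hc.get_hits(301))   # 3 (the hit at t=1 has expired)
```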
Problem 2: Shortest Subarray to Remove
def find_length_of_shortest_subarray(arr):
"""
Find shortest subarray to remove to make array sorted
Two pointers: merge the sorted prefix with the sorted suffix
"""
n = len(arr)
# Find longest non-decreasing prefix
left = 0
while left < n - 1 and arr[left] <= arr[left + 1]:
left += 1
if left == n - 1:
return 0 # Already sorted
# Find longest non-decreasing suffix
right = n - 1
while right > 0 and arr[right - 1] <= arr[right]:
right -= 1
# Remove either prefix or suffix
result = min(n - left - 1, right)
# Try to merge prefix and suffix
i, j = 0, right
while i <= left and j < n:
if arr[i] <= arr[j]:
result = min(result, j - i - 1)
i += 1
else:
j += 1
return result
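A standalone check; for [1, 2, 3, 10, 4, 2, 3, 5] the answer is 3, achieved by removing [10, 4, 2] to leave the sorted [1, 2, 3, 3, 5]:

```python
def shortest_removal(arr):
    n = len(arr)
    left = 0
    while left < n - 1 and arr[left] <= arr[left + 1]:
        left += 1                      # longest sorted prefix ends at left
    if left == n - 1:
        return 0                       # already sorted
    right = n - 1
    while right > 0 and arr[right - 1] <= arr[right]:
        right -= 1                     # longest sorted suffix starts at right
    best = min(n - left - 1, right)    # drop the suffix or the prefix entirely
    i, j = 0, right
    while i <= left and j < n:         # stitch prefix[:i+1] onto suffix[j:]
        if arr[i] <= arr[j]:
            best = min(best, j - i - 1)
            i += 1
        else:
            j += 1
    return best

print(shortest_removal([1, 2, 3, 10, 4, 2, 3, 5]))   # 3
print(shortest_removal([5, 4, 3, 2, 1]))             # 4
print(shortest_removal([1, 2, 3]))                   # 0
```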
5. Complexity Analysis
5.1 Time Complexity Analysis
Queue Operations:
| Implementation | Enqueue  | Dequeue  | Front | Space |
| -------------- | -------- | -------- | ----- | ----- |
| Simple Array   | O(1)     | O(n)     | O(1)  | O(n)  |
| Array + Front  | O(1)     | O(1)*    | O(1)  | O(n)  |
| Circular Array | O(1)     | O(1)     | O(1)  | O(n)  |
| Linked List    | O(1)     | O(1)     | O(1)  | O(n)  |
| Priority Queue | O(log n) | O(log n) | O(1)  | O(n)  |
| Deque          | O(1)     | O(1)     | O(1)  | O(n)  |
*Wasted space grows over time
Algorithm Complexities:
def queue_algorithm_complexities():
"""
Complexity analysis of queue-based algorithms
"""
analyses = {
"BFS Traversal": {
"time": "O(V + E)",
"space": "O(V)",
"explanation": "Visit each vertex once, explore all edges"
},
"Level Order Traversal": {
"time": "O(n)",
"space": "O(w)",
"explanation": "n nodes, max width w in queue"
},
"Sliding Window Maximum": {
"time": "O(n)",
"space": "O(k)",
"explanation": "Each element enqueued/dequeued once"
},
"Priority Queue Operations": {
"insert": "O(log n)",
"extract_min": "O(log n)",
"peek": "O(1)",
"explanation": "Heap operations"
},
"Queue using Stacks": {
"enqueue": "O(1)",
"dequeue": "O(1) amortized",
"explanation": "Each element moved at most twice"
},
"Stack using Queues": {
"push_approach1": "O(n)",
"pop_approach1": "O(1)",
"push_approach2": "O(1)",
"pop_approach2": "O(n)"
},
"Task Scheduling": {
"time": "O(n log n)",
"space": "O(n)",
"explanation": "Priority queue with n tasks"
},
"Topological Sort (BFS)": {
"time": "O(V + E)",
"space": "O(V)",
"explanation": "Process each vertex and edge once"
}
}
return analyses
5.2 Space Complexity Analysis
Queue Space Usage:
def space_complexity_analysis():
"""
Space complexity for different queue types
"""
analysis = {
"Array-Based Queue": {
"space": "O(n)",
"overhead": "None (just data)",
"issue": "Wasted space if not circular"
},
"Circular Queue": {
"space": "O(capacity)",
"overhead": "2 integers (front, rear)",
"advantage": "No wasted space"
},
"Linked Queue": {
"space": "O(n)",
"overhead": "1 pointer per node + 2 pointers (front, rear)",
"total": "~2n pointers"
},
"Priority Queue (Heap)": {
"space": "O(n)",
"overhead": "Array storage",
"note": "Compact array representation"
},
"Deque": {
"space": "O(n)",
"overhead": "Circular buffer or doubly-linked list",
"implementation_dependent": True
},
"Two Stack Queue": {
"space": "O(n)",
"overhead": "Two arrays for stacks",
"total": "~n elements distributed"
}
}
return analysis
BFS Space Analysis:
def bfs_space_analysis():
"""
Space complexity in different BFS scenarios
"""
scenarios = {
"Binary Tree BFS": {
"worst_case": "O(w) where w = max width",
"complete_tree": "O(2^h) where h = height",
"example": "Height 3 → width 8"
},
"Graph BFS": {
"queue_space": "O(V)",
"visited_space": "O(V)",
"total": "O(V)"
},
"Grid BFS": {
"queue_space": "O(m*n)",
"visited_space": "O(m*n)",
"example": "100x100 grid → up to 10,000"
},
"Multi-Source BFS": {
"initial_queue": "O(k) where k = sources",
"max_queue": "O(V)",
"example": "Rotting oranges with k rotten"
}
}
return scenarios
5.3 Amortized Analysis
Queue Using Two Stacks:
def amortized_analysis_two_stacks():
"""
Prove O(1) amortized dequeue for two-stack queue
Key insight: Each element moved at most twice
- Once from input to output
- Once removed from output
"""
proof = """
Two-Stack Queue Amortized Analysis:
Operations over n enqueues and dequeues:
Enqueue: Always O(1)
- Simply push to input stack
- Total enqueue cost: n × O(1) = O(n)
Dequeue:
- If output stack non-empty: O(1) pop
- If output stack empty: Transfer all from input (O(k))
Cost Analysis:
- Element pushed to input: 1 operation
- Element moved to output: 1 operation
- Element popped from output: 1 operation
- Total per element: 3 operations
For n operations:
- Total work: 3n
- Amortized per operation: 3n/n = O(1)
Therefore: Amortized O(1) dequeue
"""
# Accounting method
accounting = """
Assign 3 credits per enqueue:
- 1 credit: Push to input stack
- 1 credit: Future move to output stack
- 1 credit: Future pop from output stack
Dequeue:
- Use saved credits
- No additional cost beyond saved credits
Invariant: Always enough credits for all operations
"""
return proof, accounting
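The structure that this proof analyzes, as a minimal sketch; each element is pushed to the inbox, moved once to the outbox, and popped once, matching the 3-credit accounting above:

```python
class TwoStackQueue:
    """FIFO queue built from two LIFO stacks; dequeue is amortized O(1)."""
    def __init__(self):
        self.inbox = []    # receives enqueues
        self.outbox = []   # serves dequeues in reversed (FIFO) order
    def enqueue(self, item):
        self.inbox.append(item)          # 1st operation per element
    def dequeue(self):
        if not self.outbox:
            while self.inbox:            # 2nd operation per element
                self.outbox.append(self.inbox.pop())
        if not self.outbox:
            raise IndexError("Queue is empty")
        return self.outbox.pop()         # 3rd and final operation

q = TwoStackQueue()
for x in (1, 2, 3):
    q.enqueue(x)
print(q.dequeue(), q.dequeue())   # 1 2
q.enqueue(4)
print(q.dequeue(), q.dequeue())   # 3 4
```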
Dynamic Array Queue:
def dynamic_resize_analysis():
"""
Amortized analysis of dynamic circular queue
"""
analysis = """
Dynamic Circular Queue Resize:
Doubling strategy:
- Resize when full: Copy n elements
- New capacity: 2n
Sequence of n insertions:
- Regular inserts: n × 1 = n
- Resize costs: 1 + 2 + 4 + 8 + ... + n = 2n - 1
Total cost: n + (2n - 1) = 3n - 1
Amortized: (3n - 1)/n ≈ 3 = O(1)
Potential method:
Φ(Q) = 2 × size - capacity
- After resize: Φ = 2n - 2n = 0
- Before next resize: Φ = 2(2n) - 2n = 2n
- Amortized cost = actual + ΔΦ = O(1)
"""
return analysis
5.4 Comparative Analysis
def compare_queue_implementations():
"""
Compare different queue implementations
"""
comparison = {
"Operation": ["Enqueue", "Dequeue", "Space", "Pros", "Cons"],
"Simple Array": {
"Enqueue": "O(1)",
"Dequeue": "O(n)",
"Space": "O(n)",
"Pros": "Simple, cache-friendly",
"Cons": "Slow dequeue"
},
"Circular Array": {
"Enqueue": "O(1)",
"Dequeue": "O(1)",
"Space": "O(capacity)",
"Pros": "Fast, no wasted space",
"Cons": "Fixed size"
},
"Linked List": {
"Enqueue": "O(1)",
"Dequeue": "O(1)",
"Space": "O(n) + pointers",
"Pros": "Dynamic, true O(1)",
"Cons": "Pointer overhead, poor cache"
},
"Python collections.deque": {
"Enqueue": "O(1)",
"Dequeue": "O(1)",
"Space": "O(n)",
"Pros": "Optimized, both ends",
"Cons": "O(n) random access by index (but the best general-purpose choice)"
},
"Priority Queue": {
"Enqueue": "O(log n)",
"Dequeue": "O(log n)",
"Space": "O(n)",
"Pros": "Ordered retrieval",
"Cons": "Slower than simple queue"
}
}
return comparison
5.5 Benchmark Results
import time
import random
from collections import deque
def benchmark_queue_implementations():
"""
Benchmark different queue implementations
"""
def benchmark_operations(queue_impl, n):
"""Measure enqueue and dequeue performance"""
start = time.time()
# Enqueue n elements
for i in range(n):
queue_impl.enqueue(i)
# Dequeue n elements
for i in range(n):
queue_impl.dequeue()
return time.time() - start
results = {}
sizes = [1000, 10000, 100000]
for size in sizes:
# Test different implementations
simple_array_q = SimpleArrayQueue()
linked_q = LinkedQueue()
circular_q = CircularQueue(size)
collections_dq = deque()
results[size] = {
"simple_array": benchmark_operations(simple_array_q, size),
"linked_list": benchmark_operations(linked_q, size),
"circular_array": benchmark_operations(circular_q, size),
"collections_deque": time_deque(collections_dq, size)
}
return results
def time_deque(dq, n):
"""Time collections.deque"""
start = time.time()
for i in range(n):
dq.append(i)
for i in range(n):
dq.popleft()
return time.time() - start
# Typical results (approximate):
# Size: 1000
# Simple Array: 0.05s (O(n) dequeue)
# Linked List: 0.001s
# Circular Array: 0.0008s
# collections.deque: 0.0005s (fastest)
#
# Size: 100000
# Simple Array: 50s (quadratic!)
# Linked List: 0.1s
# Circular Array: 0.08s
# collections.deque: 0.05s
# Conclusion: collections.deque is fastest for general use
6. Optimization and Trade-offs
6.1 Implementation Trade-offs
Array vs Linked List Queue:
class QueueImplementationTradeoffs:
"""
Detailed comparison of queue implementations
"""
@staticmethod
def array_based_analysis():
return {
"Pros": [
"Cache-friendly (contiguous memory)",
"No pointer overhead",
"Better for small fixed-size queues",
"Simple index arithmetic"
],
"Cons": [
"Fixed capacity (or resize overhead)",
"Wasted space in linear variant",
"Circular indexing can be complex"
],
"Best For": [
"Known maximum size",
"Performance-critical applications",
"Embedded systems"
]
}
@staticmethod
def linked_list_analysis():
return {
"Pros": [
"True dynamic size",
"No resize overhead",
"Guaranteed O(1) operations",
"No wasted space"
],
"Cons": [
"Pointer overhead (~2x memory)",
"Poor cache locality",
"Memory fragmentation",
"Slower than array in practice"
],
"Best For": [
"Unpredictable size",
"Very large queues",
"Memory fragmented environment"
]
}
# Hybrid approach
class HybridQueue:
"""
Combine benefits of both implementations
Use array for small queues, linked list for large
"""
def __init__(self, threshold=1000):
self.threshold = threshold
self.array_queue = []
self.linked_queue = None
self.using_array = True
def enqueue(self, item):
"""Switch to linked list if threshold exceeded"""
if self.using_array:
self.array_queue.append(item)
if len(self.array_queue) > self.threshold:
self._switch_to_linked()
else:
self.linked_queue.enqueue(item)
def _switch_to_linked(self):
"""Migrate from array to linked list"""
self.linked_queue = LinkedQueue()
for item in self.array_queue:
self.linked_queue.enqueue(item)
self.array_queue = None
self.using_array = False
def dequeue(self):
if self.using_array:
if not self.array_queue:
raise IndexError("Queue is empty")
return self.array_queue.pop(0)
else:
return self.linked_queue.dequeue()
6.2 Space Optimization Techniques
Circular Queue Optimization:
class SpaceOptimizedCircularQueue:
"""
Circular queue with dynamic resizing and shrinking
"""
def __init__(self, initial_capacity=4):
self.items = [None] * initial_capacity
self.capacity = initial_capacity
self.front = 0
self.rear = -1
self.size = 0
self.load_factor_high = 0.75
self.load_factor_low = 0.25
def enqueue(self, item):
"""Resize up if load factor exceeded"""
if self.size >= self.capacity * self.load_factor_high:
self._resize(self.capacity * 2)
self.rear = (self.rear + 1) % self.capacity
self.items[self.rear] = item
self.size += 1
def dequeue(self):
"""Resize down if load factor too low"""
if self.size == 0:
raise IndexError("Queue is empty")
item = self.items[self.front]
self.items[self.front] = None
self.front = (self.front + 1) % self.capacity
self.size -= 1
# Shrink if using less than 25% capacity
if (self.size > 0 and
self.size <= self.capacity * self.load_factor_low and
self.capacity > 4):
self._resize(self.capacity // 2)
return item
def _resize(self, new_capacity):
"""Resize and reorganize elements"""
new_items = [None] * new_capacity
# Copy elements in order
for i in range(self.size):
new_items[i] = self.items[(self.front + i) % self.capacity]
self.items = new_items
self.front = 0
self.rear = self.size - 1
self.capacity = new_capacity
Memory-Efficient Priority Queue:
class CompactPriorityQueue:
"""
Priority queue optimized for memory
Use single array with tuples vs separate arrays
"""
def __init__(self):
self.heap = [] # Single array
def enqueue(self, item, priority):
"""Compact storage with tuples"""
import heapq
heapq.heappush(self.heap, (priority, item))
def dequeue(self):
"""Extract minimum"""
import heapq
if not self.heap:
raise IndexError("Queue is empty")
return heapq.heappop(self.heap)[1]
# Compare with less efficient version
class VerbosePriorityQueue:
"""Less memory efficient - separate arrays"""
def __init__(self):
self.priorities = []
self.items = []
# This wastes space with parallel arrays
6.3 Performance Optimizations
Batch Operations:
class BatchQueue:
"""
Queue optimized for batch operations
Enqueue/dequeue multiple items efficiently
"""
def __init__(self):
self.items = []
def enqueue_batch(self, items):
"""O(k) for k items vs k×O(1)"""
self.items.extend(items)
def dequeue_batch(self, count):
"""Dequeue multiple items at once"""
if count > len(self.items):
count = len(self.items)
result = self.items[:count]
self.items = self.items[count:]
return result
def enqueue(self, item):
self.items.append(item)
def dequeue(self):
if not self.items:
raise IndexError("Queue is empty")
return self.items.pop(0)
Lock-Free Queue for Concurrency:
import threading
from threading import Lock
class LockFreeQueue:
"""
Michael-Scott queue design based on CAS (Compare-And-Swap).
Python has no user-level atomic CAS, so a lock simulates it below;
with real hardware atomics this design avoids locks under contention.
"""
class Node:
def __init__(self, value):
self.value = value
self.next = None
def __init__(self):
dummy = self.Node(None)
self.head = dummy
self.tail = dummy
self._lock = Lock() # Simulating atomic operations
def enqueue(self, value):
"""Lock-free enqueue"""
new_node = self.Node(value)
while True:
tail = self.tail
tail_next = tail.next
if tail == self.tail: # Still valid
if tail_next is None:
# Try to link new node
if self._compare_and_swap_next(tail, None, new_node):
# Try to swing tail
self._compare_and_swap_tail(tail, new_node)
return
else:
# Help other thread
self._compare_and_swap_tail(tail, tail_next)
def dequeue(self):
"""Lock-free dequeue"""
while True:
head = self.head
tail = self.tail
head_next = head.next
if head == self.head:
if head == tail:
if head_next is None:
raise IndexError("Queue is empty")
self._compare_and_swap_tail(tail, head_next)
else:
value = head_next.value
if self._compare_and_swap_head(head, head_next):
return value
def _compare_and_swap_next(self, node, expected, new_value):
"""Atomic CAS on next pointer"""
with self._lock:
if node.next == expected:
node.next = new_value
return True
return False
def _compare_and_swap_head(self, expected, new_value):
"""Atomic CAS on head"""
with self._lock:
if self.head == expected:
self.head = new_value
return True
return False
def _compare_and_swap_tail(self, expected, new_value):
"""Atomic CAS on tail"""
with self._lock:
if self.tail == expected:
self.tail = new_value
return True
return False
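In day-to-day Python, hand-rolled CAS loops are rarely worth it: the standard library's thread-safe queue.Queue covers most producer/consumer needs. A minimal sketch using a sentinel value to signal completion:

```python
import queue
import threading

q = queue.Queue()
results = []

def producer():
    for i in range(5):
        q.put(i)
    q.put(None)            # sentinel: no more items

def consumer():
    while True:
        item = q.get()     # blocks until an item is available
        if item is None:
            break
        results.append(item * 10)

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()
print(results)   # [0, 10, 20, 30, 40]
```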
6.4 Algorithm-Specific Optimizations
Optimized BFS:
def optimized_bfs_bidirectional(graph, start, end):
"""
Bidirectional BFS for faster pathfinding
Search from both start and end simultaneously
Time: O(b^(d/2)) vs O(b^d) for unidirectional
"""
if start == end:
return [start]
# Two queues: forward and backward
forward_queue = [(start, [start])]
backward_queue = [(end, [end])]
forward_visited = {start: [start]}
backward_visited = {end: [end]}
while forward_queue and backward_queue:
# Expand smaller frontier
if len(forward_queue) <= len(backward_queue):
path = _bfs_step(graph, forward_queue, forward_visited,
backward_visited, forward=True)
else:
path = _bfs_step(graph, backward_queue, backward_visited,
forward_visited, forward=False)
if path:
return path
return None
def _bfs_step(graph, queue, visited, other_visited, forward):
"""Single BFS step"""
node, path = queue.pop(0)
for neighbor in graph.get(node, []):
if neighbor in other_visited:
# Found intersection!
if forward:
return path + other_visited[neighbor][::-1]
else:
return other_visited[neighbor] + path[::-1]
if neighbor not in visited:
new_path = path + [neighbor]
visited[neighbor] = new_path
queue.append((neighbor, new_path))
return None
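A compact standalone variant of the bidirectional search above, expanding one full level of the smaller frontier per round, checked on a four-node chain:

```python
from collections import deque

def bidirectional_bfs(graph, start, end):
    if start == end:
        return [start]
    # visited maps: node -> path from that search's own endpoint
    fwd, bwd = {start: [start]}, {end: [end]}
    fq, bq = deque([start]), deque([end])
    while fq and bq:
        # Expand the smaller frontier, one full level at a time
        if len(fq) <= len(bq):
            q, mine, other, forward = fq, fwd, bwd, True
        else:
            q, mine, other, forward = bq, bwd, fwd, False
        for _ in range(len(q)):
            node = q.popleft()
            for nb in graph.get(node, []):
                if nb in other:                      # frontiers met
                    a, b = mine[node] + [nb], other[nb]
                    return a + b[::-1][1:] if forward else b + a[::-1][1:]
                if nb not in mine:
                    mine[nb] = mine[node] + [nb]
                    q.append(nb)
    return None

chain = {'A': ['B'], 'B': ['A', 'C'], 'C': ['B', 'D'], 'D': ['C']}
print(bidirectional_bfs(chain, 'A', 'D'))   # ['A', 'B', 'C', 'D']
```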
Optimized Sliding Window:
def optimized_sliding_window_max(nums, k):
"""
Optimized sliding window with early termination
"""
from collections import deque
if not nums or k == 0:
return []
# Special cases
if k == 1:
return nums
if k >= len(nums):
return [max(nums)]
dq = deque()
result = []
for i in range(len(nums)):
# Remove outside window
if dq and dq[0] <= i - k:
dq.popleft()
# Maintain decreasing order
while dq and nums[dq[-1]] < nums[i]:
dq.pop()
dq.append(i)
if i >= k - 1:
result.append(nums[dq[0]])
return result
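A standalone check against the classic sliding-window maximum example:

```python
from collections import deque

def sliding_window_max(nums, k):
    dq, out = deque(), []       # dq holds indices; their values stay decreasing
    for i, x in enumerate(nums):
        if dq and dq[0] <= i - k:
            dq.popleft()        # index fell out of the window
        while dq and nums[dq[-1]] < x:
            dq.pop()            # smaller values can never be the max
        dq.append(i)
        if i >= k - 1:
            out.append(nums[dq[0]])
    return out

print(sliding_window_max([1, 3, -1, -3, 5, 3, 6, 7], 3))   # [3, 3, 5, 5, 6, 7]
```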
7. Comparative Analysis
7.1 Queue vs Stack
Fundamental Differences:
| Aspect        | Queue (FIFO)                    | Stack (LIFO)          |
| ------------- | ------------------------------- | --------------------- |
| Ordering      | First-In-First-Out              | Last-In-First-Out     |
| Operations    | Enqueue (rear), Dequeue (front) | Push (top), Pop (top) |
| Access Points | Two ends (front & rear)         | One end (top)         |
| Use Cases     | BFS, scheduling, buffering      | DFS, undo, recursion  |
| Real-World    | Line at store                   | Stack of plates       |
# Queue: FIFO
queue = []
queue.append(1) # Enqueue
queue.append(2)
queue.append(3)
queue.pop(0) # Dequeue: 1 (first in, first out)
# Stack: LIFO
stack = []
stack.append(1) # Push
stack.append(2)
stack.append(3)
stack.pop() # Pop: 3 (last in, first out)
7.2 Different Queue Variants Comparison
def compare_queue_variants():
"""Compare queue variants"""
return {
"Simple Queue": {
"structure": "Front → [1][2][3] ← Rear",
"operations": "Enqueue at rear, dequeue at front",
"use_case": "General FIFO processing"
},
"Circular Queue": {
"structure": "Wrap-around array",
"advantage": "Efficient space usage",
"use_case": "Fixed-size buffering"
},
"Priority Queue": {
"structure": "Heap-based",
"ordering": "By priority, not arrival",
"use_case": "Task scheduling, Dijkstra's"
},
"Deque": {
"structure": "Front ⇄ [1][2][3] ⇄ Rear",
"operations": "Both ends accessible",
"use_case": "Sliding window, palindrome"
},
"Monotonic Queue": {
"structure": "Maintains order",
"constraint": "Increasing or decreasing",
"use_case": "Sliding window max/min"
}
}
7.3 BFS vs DFS
# BFS uses Queue
def bfs(graph, start):
"""Level-by-level exploration"""
queue = [start]
visited = {start}
while queue:
node = queue.pop(0)
for neighbor in graph[node]:
if neighbor not in visited:
visited.add(neighbor)
queue.append(neighbor)
# DFS uses Stack (or recursion)
def dfs(graph, start):
"""Depth-first exploration"""
stack = [start]
visited = {start}
while stack:
node = stack.pop()
for neighbor in graph[node]:
if neighbor not in visited:
visited.add(neighbor)
stack.append(neighbor)
8. Edge Cases and Limitations
8.1 Common Edge Cases
def queue_edge_cases():
"""Handle queue edge cases"""
# Empty queue
queue = []
try:
queue.pop(0) # Error!
except IndexError:
print("Cannot dequeue from empty queue")
# Single element
queue = [1]
assert queue.pop(0) == 1
assert len(queue) == 0
# Full queue (bounded)
bounded_q = CircularQueue(capacity=3)
bounded_q.enqueue(1)
bounded_q.enqueue(2)
bounded_q.enqueue(3)
# bounded_q.enqueue(4) # Would raise OverflowError
# Priority queue with equal priorities
import heapq
pq = []
heapq.heappush(pq, (1, 'a'))
heapq.heappush(pq, (1, 'b'))
# Ties compare the next tuple element, so 'a' pops before 'b';
# for non-comparable items, add a sequence counter as a tie-breaker
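When items themselves may not be comparable, or insertion order must be preserved among equal priorities, the usual remedy is a monotonically increasing sequence number as the middle tuple element:

```python
import heapq
import itertools

tie_breaker = itertools.count()   # monotonically increasing sequence number
pq = []
for item in ('a', 'b', 'c'):
    # Same priority for all three; the counter guarantees insertion order
    heapq.heappush(pq, (1, next(tie_breaker), item))
order = [heapq.heappop(pq)[2] for _ in range(len(pq))]
print(order)   # ['a', 'b', 'c']
```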
9. Practical Mastery
This section provides complete, production-ready implementations for all 10 queue topics with extensive comments, test cases, and real-world examples.
9.1 Topic 48: Queue Data Structure (Array and Linked List Implementation)
Complete Array-Based Queue Implementation:
class ArrayQueue:
"""
Production-ready array-based queue with circular buffer
Features:
- Amortized O(1) enqueue and dequeue
- Dynamic resizing
- Memory efficient with circular indexing
- Comprehensive error handling
"""
def __init__(self, capacity=10):
"""
Initialize queue with given capacity
Args:
capacity: Initial capacity (default 10)
"""
self._data = [None] * capacity
self._front = 0
self._size = 0
self._capacity = capacity
def __len__(self):
"""Return number of elements in queue"""
return self._size
def is_empty(self):
"""Check if queue is empty - O(1)"""
return self._size == 0
def is_full(self):
"""Check if queue is at capacity - O(1)"""
return self._size == self._capacity
def enqueue(self, item):
"""
Add item to rear of queue - Amortized O(1)
Args:
item: Item to add
Note:
Never raises when full; the internal array resizes automatically
"""
if self.is_full():
self._resize(2 * self._capacity) # Double capacity
# Calculate rear position using circular indexing
rear = (self._front + self._size) % self._capacity
self._data[rear] = item
self._size += 1
def dequeue(self):
"""
Remove and return front item - O(1)
Returns:
Front item
Raises:
IndexError: If queue is empty
"""
if self.is_empty():
raise IndexError("Queue is empty")
item = self._data[self._front]
self._data[self._front] = None # Help garbage collection
self._front = (self._front + 1) % self._capacity
self._size -= 1
# Shrink if utilizing less than 25%
if 0 < self._size < self._capacity // 4:
self._resize(self._capacity // 2)
return item
def front(self):
"""
View front item without removing - O(1)
Returns:
Front item
Raises:
IndexError: If queue is empty
"""
if self.is_empty():
raise IndexError("Queue is empty")
return self._data[self._front]
def _resize(self, new_capacity):
"""
Resize internal array - O(n)
Args:
new_capacity: New capacity for internal array
"""
old_data = self._data
self._data = [None] * new_capacity
# Copy elements to new array starting at index 0
for i in range(self._size):
self._data[i] = old_data[(self._front + i) % self._capacity]
self._front = 0
self._capacity = new_capacity
def __str__(self):
"""String representation"""
if self.is_empty():
return "Queue([])"
items = []
for i in range(self._size):
items.append(str(self._data[(self._front + i) % self._capacity]))
return f"Queue([{' <- '.join(items)}])"
def __repr__(self):
"""Detailed representation"""
return (f"ArrayQueue(size={self._size}, capacity={self._capacity}, "
f"front={self._front})")
# Comprehensive test cases
def test_array_queue():
"""Test array-based queue implementation"""
print("Testing ArrayQueue...")
# Test 1: Basic operations
q = ArrayQueue(5)
assert q.is_empty()
# Enqueue elements
for i in range(1, 6):
q.enqueue(i)
print(f"After enqueuing 1-5: {q}")
# Test 2: Dequeue
assert q.dequeue() == 1
assert q.front() == 2
print(f"After dequeue: {q}")
# Test 3: Wrap-around (circular)
q.enqueue(6)
q.enqueue(7) # Should trigger resize
print(f"After enqueuing 6,7: {q}")
# Test 4: Empty queue
while not q.is_empty():
q.dequeue()
assert q.is_empty()
print("Queue emptied successfully")
# Test 5: Error handling
try:
q.dequeue()
assert False, "Should raise IndexError"
except IndexError:
print("✓ Correctly raised IndexError on empty dequeue")
print("All ArrayQueue tests passed!\n")
**Complete Linked List-Based Queue Implementation:**
```python
class Node:
"""Node for linked list"""
__slots__ = ['data', 'next'] # Memory optimization
def __init__(self, data):
self.data = data
self.next = None
class LinkedListQueue:
"""
Production-ready linked list-based queue
Features:
- True O(1) enqueue and dequeue (no resizing)
- No capacity limit (until memory exhausted)
- Efficient for unknown size queues
- Front and rear pointers for O(1) operations
"""
def __init__(self):
"""Initialize empty queue"""
self._front = None
self._rear = None
self._size = 0
def __len__(self):
"""Return number of elements - O(1)"""
return self._size
def is_empty(self):
"""Check if queue is empty - O(1)"""
return self._front is None
def enqueue(self, item):
"""
Add item to rear - O(1)
Args:
item: Item to add
"""
new_node = Node(item)
if self.is_empty():
# First element
self._front = new_node
self._rear = new_node
else:
# Link to rear and update rear pointer
self._rear.next = new_node
self._rear = new_node
self._size += 1
def dequeue(self):
"""
Remove and return front item - O(1)
Returns:
Front item
Raises:
IndexError: If queue is empty
"""
if self.is_empty():
raise IndexError("Queue is empty")
item = self._front.data
self._front = self._front.next
self._size -= 1
# If queue becomes empty, update rear
if self.is_empty():
self._rear = None
return item
def front(self):
"""
View front item - O(1)
Returns:
Front item
Raises:
IndexError: If queue is empty
"""
if self.is_empty():
raise IndexError("Queue is empty")
return self._front.data
def rear(self):
"""
View rear item - O(1)
Returns:
Rear item
Raises:
IndexError: If queue is empty
"""
if self.is_empty():
raise IndexError("Queue is empty")
return self._rear.data
def clear(self):
"""Clear all elements - O(1)"""
self._front = None
self._rear = None
self._size = 0
def to_list(self):
"""Convert to list - O(n)"""
result = []
current = self._front
while current:
result.append(current.data)
current = current.next
return result
def __str__(self):
"""String representation - O(n)"""
if self.is_empty():
return "Queue([])"
return f"Queue({self.to_list()})"
def __repr__(self):
"""Detailed representation"""
return f"LinkedListQueue(size={self._size})"
def __iter__(self):
"""Iterator support - O(n)"""
current = self._front
while current:
yield current.data
current = current.next
# Comprehensive test cases
def test_linked_list_queue():
"""Test linked list-based queue"""
print("Testing LinkedListQueue...")
# Test 1: Basic operations
q = LinkedListQueue()
assert q.is_empty()
# Enqueue elements
for i in range(1, 6):
q.enqueue(i)
print(f"After enqueuing 1-5: {q}")
assert len(q) == 5
# Test 2: Front and rear
assert q.front() == 1
assert q.rear() == 5
# Test 3: Dequeue
assert q.dequeue() == 1
assert q.dequeue() == 2
print(f"After 2 dequeues: {q}")
assert len(q) == 3
# Test 4: Iterator
result = list(q)
assert result == [3, 4, 5]
print(f"Iterator test: {result}")
# Test 5: Clear
q.clear()
assert q.is_empty()
print("Cleared queue")
# Test 6: Large queue (no capacity limit)
for i in range(10000):
q.enqueue(i)
assert len(q) == 10000
print("✓ Successfully handled 10,000 elements")
print("All LinkedListQueue tests passed!\n")
**Performance Comparison:**
```python
def compare_queue_implementations():
"""
Compare array vs linked list queue performance
"""
import time
print("Performance Comparison: Array vs Linked List Queue")
print("=" * 60)
n = 100000
# Test Array Queue
start = time.time()
aq = ArrayQueue(16) # explicit initial capacity; resizes as needed
for i in range(n):
aq.enqueue(i)
for i in range(n):
aq.dequeue()
array_time = time.time() - start
# Test Linked List Queue
start = time.time()
llq = LinkedListQueue()
for i in range(n):
llq.enqueue(i)
for i in range(n):
llq.dequeue()
ll_time = time.time() - start
print(f"\n{n} enqueue + {n} dequeue operations:")
print(f" Array Queue: {array_time:.4f} seconds")
print(f" Linked List Queue: {ll_time:.4f} seconds")
print(f" Speedup: {ll_time/array_time:.2f}x")
print("\nMemory Characteristics:")
print(" Array Queue:")
print(" - Contiguous memory (cache-friendly)")
print(" - Overhead: resizing operations")
print(" - Best for: Known/bounded size")
print("\n Linked List Queue:")
print(" - Scattered memory (pointer overhead)")
print(" - No resizing needed")
print(" - Best for: Unknown size, many operations")
# Run all tests
if __name__ == "__main__":
test_array_queue()
test_linked_list_queue()
compare_queue_implementations()
```
9.2 Topic 49: Circular Queue Implementation
Complete Production-Ready Circular Queue:
class CircularQueue:
"""
Production-ready circular queue implementation
Features:
- Fixed capacity (no resizing)
- True O(1) operations
- Space-efficient (reuses space)
- Proper full/empty distinction
Implementation details:
- Uses front and rear pointers
- Distinguishes full from empty using size counter
- Modulo arithmetic for wrap-around
"""
def __init__(self, capacity):
"""
Initialize circular queue
Args:
capacity: Maximum number of elements
"""
if capacity <= 0:
raise ValueError("Capacity must be positive")
self._data = [None] * capacity
self._capacity = capacity
self._front = 0
self._rear = -1
self._size = 0
def __len__(self):
"""Return number of elements"""
return self._size
def capacity(self):
"""Return maximum capacity"""
return self._capacity
def is_empty(self):
"""Check if empty - O(1)"""
return self._size == 0
def is_full(self):
"""Check if full - O(1)"""
return self._size == self._capacity
def enqueue(self, item):
"""
Add item to rear - O(1)
Args:
item: Item to add
Raises:
OverflowError: If queue is full
"""
if self.is_full():
raise OverflowError(f"Queue is full (capacity: {self._capacity})")
# Move rear pointer circularly
self._rear = (self._rear + 1) % self._capacity
self._data[self._rear] = item
self._size += 1
def dequeue(self):
"""
Remove and return front item - O(1)
Returns:
Front item
Raises:
IndexError: If queue is empty
"""
if self.is_empty():
raise IndexError("Queue is empty")
item = self._data[self._front]
self._data[self._front] = None # Help GC
self._front = (self._front + 1) % self._capacity
self._size -= 1
return item
def front(self):
"""
View front item - O(1)
Returns:
Front item
Raises:
IndexError: If queue is empty
"""
if self.is_empty():
raise IndexError("Queue is empty")
return self._data[self._front]
def rear(self):
"""
View rear item - O(1)
Returns:
Rear item
Raises:
IndexError: If queue is empty
"""
if self.is_empty():
raise IndexError("Queue is empty")
return self._data[self._rear]
def clear(self):
"""Clear all elements - O(n)"""
for i in range(self._capacity):
self._data[i] = None
self._front = 0
self._rear = -1
self._size = 0
def to_list(self):
"""
Convert to list in FIFO order - O(n)
Returns:
List of elements from front to rear
"""
if self.is_empty():
return []
result = []
for i in range(self._size):
idx = (self._front + i) % self._capacity
result.append(self._data[idx])
return result
def visualize(self):
"""
Visualize circular queue state
Shows internal array with front/rear pointers
"""
print(f"\nCircular Queue (size={self._size}/{self._capacity}):")
print("Array:", end=" ")
for i in range(self._capacity):
marker = ""
if i == self._front and i == self._rear and self._size > 0:
marker = " F,R"
elif i == self._front:
marker = " F"
elif i == self._rear:
marker = " R"
val = self._data[i] if self._data[i] is not None else "_"
print(f"[{val}]{marker}", end=" ")
print()
def __str__(self):
"""String representation"""
return f"CircularQueue({self.to_list()})"
def __repr__(self):
"""Detailed representation"""
return (f"CircularQueue(size={self._size}, capacity={self._capacity}, "
f"front={self._front}, rear={self._rear})")
# Comprehensive test cases
def test_circular_queue():
"""Test circular queue with various scenarios"""
print("Testing CircularQueue...")
print("=" * 60)
# Test 1: Basic operations
print("\nTest 1: Basic operations")
q = CircularQueue(5)
for i in range(1, 4):
q.enqueue(i)
q.visualize()
# Test 2: Wrap-around
print("\nTest 2: Wrap-around behavior")
q.dequeue()
q.dequeue()
q.enqueue(4)
q.enqueue(5)
q.enqueue(6) # This should wrap around
q.visualize()
# Test 3: Full queue
print("\nTest 3: Full queue")
try:
q.enqueue(7)
assert False, "Should raise OverflowError"
except OverflowError as e:
print(f"✓ Correctly raised: {e}")
# Test 4: Empty queue
print("\nTest 4: Empty queue")
while not q.is_empty():
q.dequeue()
q.visualize()
try:
q.dequeue()
assert False, "Should raise IndexError"
except IndexError:
print("✓ Correctly raised IndexError")
# Test 5: Multiple wrap-arounds
print("\nTest 5: Multiple wrap-arounds")
for cycle in range(3):
print(f"\n Cycle {cycle + 1}:")
for i in range(3): # capacity is 5; enqueueing 5 per cycle would overflow
q.enqueue(f"C{cycle}:{i}")
for i in range(3):
q.dequeue()
q.visualize()
print("\nAll CircularQueue tests passed!")
**Real-World Application: Ring Buffer for Audio Processing:**
```python
class AudioRingBuffer(CircularQueue):
"""
Ring buffer for audio streaming
Use case: Audio playback with circular buffer
Producer fills buffer, consumer plays audio
"""
def __init__(self, buffer_size=1024):
super().__init__(buffer_size)
self.underrun_count = 0
self.overrun_count = 0
def write_samples(self, samples):
"""
Write audio samples to buffer
Args:
samples: List of audio samples
Returns:
Number of samples successfully written
"""
written = 0
for sample in samples:
try:
self.enqueue(sample)
written += 1
except OverflowError:
self.overrun_count += 1
break
return written
def read_samples(self, count):
"""
Read audio samples from buffer
Args:
count: Number of samples to read
Returns:
List of samples (may be fewer than requested if underrun)
"""
samples = []
for _ in range(count):
try:
samples.append(self.dequeue())
except IndexError:
self.underrun_count += 1
samples.append(0) # Silence on underrun
return samples
def get_statistics(self):
"""Get buffer performance statistics"""
return {
'size': len(self),
'capacity': self.capacity(),
'utilization': len(self) / self.capacity(),
'underruns': self.underrun_count,
'overruns': self.overrun_count
}
def demo_audio_buffer():
"""Demonstrate audio ring buffer"""
print("\nAudio Ring Buffer Demo:")
print("=" * 60)
buffer = AudioRingBuffer(buffer_size=10)
# Simulate producer-consumer
print("\nSimulating audio streaming...")
# Producer writes samples
samples_in = [0.1, 0.2, 0.3, 0.4, 0.5]
written = buffer.write_samples(samples_in)
print(f"Producer wrote {written} samples")
# Consumer reads samples
samples_out = buffer.read_samples(3)
print(f"Consumer read: {samples_out}")
# Producer writes more
written = buffer.write_samples([0.6, 0.7, 0.8])
print(f"Producer wrote {written} more samples")
# Check statistics
stats = buffer.get_statistics()
print(f"\nBuffer Statistics:")
for key, value in stats.items():
print(f" {key}: {value}")
if __name__ == "__main__":
test_circular_queue()
demo_audio_buffer()
```
9.3 Topic 50: Priority Queue and Heap-based Implementation
Complete Priority Queue Implementation:
import heapq
class PriorityQueue:
"""
Production-ready priority queue using binary heap
Features:
- Min-heap by default (lowest priority first)
- O(log n) insert and extract
- O(1) peek
- Support for custom priorities
- Tie-breaking with insertion order
"""
def __init__(self, max_heap=False):
"""
Initialize priority queue
Args:
max_heap: If True, create max-heap (highest priority first)
"""
self._heap = []
self._counter = 0 # For tie-breaking
self._max_heap = max_heap
def __len__(self):
"""Return number of elements"""
return len(self._heap)
def is_empty(self):
"""Check if empty - O(1)"""
return len(self._heap) == 0
def push(self, item, priority=None):
"""
Add item with priority - O(log n)
Args:
item: Item to add
priority: Priority value (lower = higher priority for min-heap)
If None, item itself is used as priority
"""
if priority is None:
priority = item
# For max-heap, negate priority
if self._max_heap:
priority = -priority
# Use counter for stable ordering (FIFO for equal priorities)
entry = (priority, self._counter, item)
heapq.heappush(self._heap, entry)
self._counter += 1
def pop(self):
"""
Remove and return highest priority item - O(log n)
Returns:
Item with highest priority
Raises:
IndexError: If queue is empty
"""
if self.is_empty():
raise IndexError("Priority queue is empty")
priority, _, item = heapq.heappop(self._heap)
return item
def peek(self):
"""
View highest priority item - O(1)
Returns:
Item with highest priority
Raises:
IndexError: If queue is empty
"""
if self.is_empty():
raise IndexError("Priority queue is empty")
priority, _, item = self._heap[0]
return item
def peek_with_priority(self):
"""
View highest priority item and its priority - O(1)
Returns:
Tuple of (item, priority)
"""
if self.is_empty():
raise IndexError("Priority queue is empty")
priority, _, item = self._heap[0]
if self._max_heap:
priority = -priority
return item, priority
def update_priority(self, old_item, new_priority):
"""
Update priority of an item - O(n)
Note: Inefficient (rebuilds the heap)
For frequent updates, use an indexed priority queue
"""
# Remove old entry
self._heap = [(p, c, i) for p, c, i in self._heap if i != old_item]
heapq.heapify(self._heap)
# Add with new priority
self.push(old_item, new_priority)
def clear(self):
"""Clear all elements - O(1)"""
self._heap = []
self._counter = 0
def to_sorted_list(self):
"""
Return elements in priority order - O(n log n)
Note: Consumes the queue
"""
result = []
while not self.is_empty():
result.append(self.pop())
return result
def __str__(self):
"""String representation"""
if self.is_empty():
return "PriorityQueue([])"
# Show first 5 heap entries (heap order, not fully sorted)
preview = [self._heap[i][2] for i in range(min(5, len(self._heap)))]
return f"PriorityQueue({preview}{'...' if len(self._heap) > 5 else ''})"
# Advanced: Priority Queue with Value Update
class IndexedPriorityQueue:
"""
Priority queue with efficient priority updates
Uses dictionary to track item positions for O(log n) updates
"""
def __init__(self):
self._heap = []
self._item_to_index = {} # Track positions
self._counter = 0
def push(self, item, priority):
"""Add or update item - O(log n)"""
if item in self._item_to_index:
# Update existing
self.update_priority(item, priority)
else:
# Add new
entry = [priority, self._counter, item]
self._item_to_index[item] = len(self._heap)
self._heap.append(entry)
self._counter += 1
self._sift_up(len(self._heap) - 1)
def pop(self):
"""Remove and return min item - O(log n)"""
if not self._heap:
raise IndexError("Priority queue is empty")
# Swap first and last
self._swap(0, len(self._heap) - 1)
# Remove last (which is now min)
priority, _, item = self._heap.pop()
del self._item_to_index[item]
# Restore heap property
if self._heap:
self._sift_down(0)
return item
def update_priority(self, item, new_priority):
"""Update item's priority - O(log n)"""
if item not in self._item_to_index:
raise KeyError(f"Item {item} not in queue")
index = self._item_to_index[item]
old_priority = self._heap[index][0]
self._heap[index][0] = new_priority
# Restore heap property
if new_priority < old_priority:
self._sift_up(index)
else:
self._sift_down(index)
def _sift_up(self, index):
"""Bubble element up - O(log n)"""
while index > 0:
parent = (index - 1) // 2
if self._heap[index][0] >= self._heap[parent][0]:
break
self._swap(index, parent)
index = parent
def _sift_down(self, index):
"""Bubble element down - O(log n)"""
while True:
smallest = index
left = 2 * index + 1
right = 2 * index + 2
if (left < len(self._heap) and
self._heap[left][0] < self._heap[smallest][0]):
smallest = left
if (right < len(self._heap) and
self._heap[right][0] < self._heap[smallest][0]):
smallest = right
if smallest == index:
break
self._swap(index, smallest)
index = smallest
def _swap(self, i, j):
"""Swap two elements and update index map"""
self._heap[i], self._heap[j] = self._heap[j], self._heap[i]
self._item_to_index[self._heap[i][2]] = i
self._item_to_index[self._heap[j][2]] = j
# Test cases and examples
def test_priority_queue():
"""Test priority queue implementations"""
print("Testing PriorityQueue...")
print("=" * 60)
# Test 1: Min-heap (default)
print("\nTest 1: Min-Heap Priority Queue")
pq = PriorityQueue()
# Add tasks with priorities
pq.push("Low priority task", priority=10)
pq.push("High priority task", priority=1)
pq.push("Medium priority task", priority=5)
print("Processing tasks by priority:")
while not pq.is_empty():
task, priority = pq.peek_with_priority()
print(f" Priority {priority}: {pq.pop()}")
# Test 2: Max-heap
print("\nTest 2: Max-Heap Priority Queue")
max_pq = PriorityQueue(max_heap=True)
scores = [(85, "Alice"), (92, "Bob"), (78, "Charlie"), (95, "David")]
for score, name in scores:
max_pq.push(name, priority=score)
print("Top scorers:")
for i in range(3):
name, score = max_pq.peek_with_priority()
print(f" {i+1}. {max_pq.pop()} - Score: {score}")
# Test 3: Stable ordering (FIFO for equal priorities)
print("\nTest 3: Stable Ordering")
stable_pq = PriorityQueue()
for i in range(5):
stable_pq.push(f"Task {i}", priority=1) # All same priority
print("Equal priority tasks (should be FIFO):")
while not stable_pq.is_empty():
print(f" {stable_pq.pop()}")
print("\nAll PriorityQueue tests passed!")
def demo_task_scheduler():
"""Real-world example: Task scheduler with priorities"""
print("\nTask Scheduler Demo:")
print("=" * 60)
class Task:
def __init__(self, name, priority, duration):
self.name = name
self.priority = priority
self.duration = duration
def __repr__(self):
return f"Task({self.name}, P{self.priority}, {self.duration}s)"
scheduler = PriorityQueue()
# Add tasks
tasks = [
Task("Critical bug fix", 1, 30),
Task("Code review", 3, 15),
Task("Documentation", 5, 45),
Task("Security patch", 1, 20),
Task("Feature request", 4, 60)
]
for task in tasks:
scheduler.push(task, priority=task.priority)
# Execute tasks
print("\nExecuting tasks in priority order:")
total_time = 0
while not scheduler.is_empty():
task = scheduler.pop()
total_time += task.duration
print(f" {task} - Completed at t={total_time}min")
if __name__ == "__main__":
test_priority_queue()
demo_task_scheduler()
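A classic application of the min-heap priority queue is Dijkstra's shortest-path algorithm: the heap always pops the closest unsettled node first. A minimal sketch (the `graph` adjacency dict is an illustrative example, not from the code above):

```python
import heapq

def dijkstra(graph, source):
    """Shortest distances from source; graph maps node -> [(neighbor, weight)]."""
    dist = {source: 0}
    heap = [(0, source)]                 # (distance so far, node)
    while heap:
        d, node = heapq.heappop(heap)
        if d > dist.get(node, float("inf")):
            continue                     # stale entry; a shorter path was found
        for neighbor, weight in graph.get(node, []):
            nd = d + weight
            if nd < dist.get(neighbor, float("inf")):
                dist[neighbor] = nd
                heapq.heappush(heap, (nd, neighbor))
    return dist

graph = {"A": [("B", 1), ("C", 4)], "B": [("C", 2)], "C": []}
assert dijkstra(graph, "A") == {"A": 0, "B": 1, "C": 3}
```

Leaving stale entries in the heap and skipping them on pop is simpler than an indexed priority queue with `update_priority`, at the cost of extra heap entries.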
9.4 Topic 51: Deque (Double-Ended Queue)
Complete Deque Implementation:
class Deque:
"""
Production-ready double-ended queue (deque)
Features:
- O(1) operations at both ends
- Doubly-linked list implementation
- Supports queue and stack operations
- Memory efficient with node pooling option
"""
class _Node:
"""Doubly-linked node"""
__slots__ = ['data', 'prev', 'next']
def __init__(self, data):
self.data = data
self.prev = None
self.next = None
def __init__(self):
"""Initialize empty deque"""
self._front = None
self._rear = None
self._size = 0
def __len__(self):
"""Return number of elements - O(1)"""
return self._size
def is_empty(self):
"""Check if empty - O(1)"""
return self._size == 0
# Front operations
def add_front(self, item):
"""
Add item to front - O(1)
Args:
item: Item to add
"""
new_node = self._Node(item)
if self.is_empty():
self._front = self._rear = new_node
else:
new_node.next = self._front
self._front.prev = new_node
self._front = new_node
self._size += 1
def remove_front(self):
"""
Remove and return front item - O(1)
Returns:
Front item
Raises:
IndexError: If deque is empty
"""
if self.is_empty():
raise IndexError("Deque is empty")
item = self._front.data
if self._size == 1:
self._front = self._rear = None
else:
self._front = self._front.next
self._front.prev = None
self._size -= 1
return item
def peek_front(self):
"""
View front item - O(1)
Returns:
Front item
Raises:
IndexError: If deque is empty
"""
if self.is_empty():
raise IndexError("Deque is empty")
return self._front.data
# Rear operations
def add_rear(self, item):
"""
Add item to rear - O(1)
Args:
item: Item to add
"""
new_node = self._Node(item)
if self.is_empty():
self._front = self._rear = new_node
else:
new_node.prev = self._rear
self._rear.next = new_node
self._rear = new_node
self._size += 1
def remove_rear(self):
"""
Remove and return rear item - O(1)
Returns:
Rear item
Raises:
IndexError: If deque is empty
"""
if self.is_empty():
raise IndexError("Deque is empty")
item = self._rear.data
if self._size == 1:
self._front = self._rear = None
else:
self._rear = self._rear.prev
self._rear.next = None
self._size -= 1
return item
def peek_rear(self):
"""
View rear item - O(1)
Returns:
Rear item
Raises:
IndexError: If deque is empty
"""
if self.is_empty():
raise IndexError("Deque is empty")
return self._rear.data
# Utility methods
def clear(self):
"""Clear all elements - O(1)"""
self._front = self._rear = None
self._size = 0
def to_list(self):
"""Convert to list from front to rear - O(n)"""
result = []
current = self._front
while current:
result.append(current.data)
current = current.next
return result
def rotate(self, steps=1):
"""
Rotate deque by steps positions - O(steps)
Positive steps: rotate right (rear to front)
Negative steps: rotate left (front to rear)
Args:
steps: Number of positions to rotate
"""
if self.is_empty() or steps == 0:
return
# Normalize steps
steps = steps % self._size if self._size > 0 else 0
# Rotate right
for _ in range(steps):
item = self.remove_rear()
self.add_front(item)
def __str__(self):
"""String representation"""
return f"Deque({self.to_list()})"
def __repr__(self):
"""Detailed representation"""
return f"Deque(size={self._size})"
def __iter__(self):
"""Iterator from front to rear"""
current = self._front
while current:
yield current.data
current = current.next
def __reversed__(self):
"""Reverse iterator from rear to front"""
current = self._rear
while current:
yield current.data
current = current.prev
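The same double-ended interface ships in the standard library as `collections.deque`, whose `append`/`appendleft`/`pop`/`popleft`/`rotate` map directly onto the methods above:

```python
from collections import deque

dq = deque()
dq.append(1)               # add_rear
dq.append(2)
dq.appendleft(0)           # add_front
assert list(dq) == [0, 1, 2]

assert dq.popleft() == 0   # remove_front
assert dq.pop() == 2       # remove_rear, leaving [1]

dq.extend([2, 3, 4])       # dq is now [1, 2, 3, 4]
dq.rotate(1)               # rotate right by one, like rotate(1) above
assert list(dq) == [4, 1, 2, 3]
```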
# Test cases
def test_deque():
"""Test deque implementation"""
print("Testing Deque...")
print("=" * 60)
# Test 1: Basic operations
print("\nTest 1: Basic Operations")
dq = Deque()
# Add to both ends
dq.add_rear(1)
dq.add_rear(2)
dq.add_front(0)
dq.add_front(-1)
print(f"After operations: {dq}")
# Should be: [-1, 0, 1, 2]
# Test 2: Remove from both ends
print("\nTest 2: Remove Operations")
print(f"Remove front: {dq.remove_front()}") # -1
print(f"Remove rear: {dq.remove_rear()}") # 2
print(f"After removals: {dq}") # [0, 1]
# Test 3: Peek operations
print("\nTest 3: Peek Operations")
print(f"Peek front: {dq.peek_front()}") # 0
print(f"Peek rear: {dq.peek_rear()}") # 1
print(f"Deque unchanged: {dq}")
# Test 4: Rotation
print("\nTest 4: Rotation")
dq.clear()
for i in range(1, 6):
dq.add_rear(i)
print(f"Initial: {dq}")
dq.rotate(2)
print(f"After rotate(2): {dq}") # [4, 5, 1, 2, 3]
dq.rotate(-1)
print(f"After rotate(-1): {dq}") # [5, 1, 2, 3, 4]
# Test 5: Use as queue (FIFO)
print("\nTest 5: Use as Queue (FIFO)")
dq.clear()
dq.add_rear(1)
dq.add_rear(2)
dq.add_rear(3)
print(f"Queue: {dq}")
while not dq.is_empty():
print(f" Dequeue: {dq.remove_front()}")
# Test 6: Use as stack (LIFO)
print("\nTest 6: Use as Stack (LIFO)")
dq.add_rear(1)
dq.add_rear(2)
dq.add_rear(3)
print(f"Stack: {dq}")
while not dq.is_empty():
print(f" Pop: {dq.remove_rear()}")
# Test 7: Iteration
print("\nTest 7: Iteration")
for i in range(1, 4):
dq.add_rear(i)
print(f"Forward: {list(dq)}")
print(f"Backward: {list(reversed(dq))}")
print("\nAll Deque tests passed!")
def demo_palindrome_checker():
"""Real-world example: Palindrome checker using deque"""
print("\nPalindrome Checker Demo:")
print("=" * 60)
def is_palindrome(text):
"""
Check if text is palindrome using deque
Strategy: Compare from both ends simultaneously
"""
# Normalize: remove spaces and convert to lowercase
text = ''.join(text.split()).lower()
dq = Deque()
for char in text:
dq.add_rear(char)
while len(dq) > 1:
if dq.remove_front() != dq.remove_rear():
return False
return True
# Test palindromes
test_strings = [
"racecar",
"A man a plan a canal Panama",
"hello",
"Madam",
"Step on no pets"
]
for text in test_strings:
result = is_palindrome(text)
print(f" '{text}': {'Palindrome' if result else 'Not palindrome'}")
if __name__ == "__main__":
test_deque()
demo_palindrome_checker()
9.5 Common Interview Patterns
def queue_interview_patterns():
"""Common queue patterns in interviews"""
return {
"Pattern 1: BFS Template": """
queue = [start]
visited = {start}
while queue:
node = queue.pop(0)
process(node)
for neighbor in get_neighbors(node):
if neighbor not in visited:
visited.add(neighbor)
queue.append(neighbor)
""",
"Pattern 2: Level-Order Template": """
queue = [root]
while queue:
level_size = len(queue)
for _ in range(level_size):
node = queue.pop(0)
process(node)
add_children_to_queue()
""",
"Pattern 3: Monotonic Queue Template": """
from collections import deque
dq = deque()
for i, val in enumerate(arr):
# Drop indices that have left the window of size k
while dq and dq[0] <= i - k:
dq.popleft()
# Maintain order
while dq and arr[dq[-1]] < val:
dq.pop()
dq.append(i)
""",
"Pattern 4: Priority Queue Template": """
import heapq
heap = []
for item in items:
heapq.heappush(heap, (priority, item))
while heap:
priority, item = heapq.heappop(heap)
process(item)
"""
}
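The monotonic-queue template above becomes concrete in the classic sliding-window-maximum problem, where the deque front always holds the index of the current window's maximum:

```python
from collections import deque

def sliding_window_max(arr, k):
    """Max of each length-k window, using a monotonic deque of indices."""
    dq = deque()   # indices; their values are decreasing front to rear
    result = []
    for i, val in enumerate(arr):
        while dq and dq[0] <= i - k:      # drop indices outside the window
            dq.popleft()
        while dq and arr[dq[-1]] < val:   # keep values decreasing
            dq.pop()
        dq.append(i)
        if i >= k - 1:
            result.append(arr[dq[0]])     # front index holds the window max
    return result

assert sliding_window_max([1, 3, -1, -3, 5, 3, 6, 7], 3) == [3, 3, 5, 5, 6, 7]
```

Each index is appended and popped at most once, so the whole pass is O(n) despite the nested loops.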
10. Advanced Concepts
10.1 Concurrent Queues
Lock-Based Thread-Safe Queue:
import threading
from collections import deque
class ThreadSafeQueue:
"""
Production-ready thread-safe queue using locks
Features:
- Mutual exclusion with locks
- Condition variables for blocking operations
- Bounded capacity with blocking
- Thread-safe operations
"""
def __init__(self, max_size=0):
"""
Initialize thread-safe queue
Args:
max_size: Maximum capacity (0 = unbounded)
"""
self._queue = deque()
self._max_size = max_size
self._lock = threading.Lock()
self._not_empty = threading.Condition(self._lock)
self._not_full = threading.Condition(self._lock)
def enqueue(self, item, block=True, timeout=None):
"""
Add item to queue
Args:
item: Item to add
block: If True, block when full
timeout: Maximum time to wait (None = forever)
Returns:
True if successful
Raises:
Exception: If queue is full and block=False, or if the timeout expires
"""
with self._not_full:
# Wait if queue is full
if self._max_size > 0:
while len(self._queue) >= self._max_size:
if not block:
raise Exception("Queue is full")
if not self._not_full.wait(timeout):
raise Exception("Timeout waiting for space")
self._queue.append(item)
self._not_empty.notify() # Wake up waiting dequeue
return True
def dequeue(self, block=True, timeout=None):
"""
Remove and return front item
Args:
block: If True, block when empty
timeout: Maximum time to wait
Returns:
Front item
Raises:
IndexError: If queue is empty and block=False; Exception on timeout
"""
with self._not_empty:
# Wait if queue is empty
while len(self._queue) == 0:
if not block:
raise IndexError("Queue is empty")
if not self._not_empty.wait(timeout):
raise Exception("Timeout waiting for item")
item = self._queue.popleft()
self._not_full.notify() # Wake up waiting enqueue
return item
def size(self):
"""Get queue size (thread-safe)"""
with self._lock:
return len(self._queue)
def is_empty(self):
"""Check if empty (thread-safe)"""
with self._lock:
return len(self._queue) == 0
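The standard library's `queue.Queue` implements this same lock-plus-condition design (with `queue.Full`/`queue.Empty` exceptions and a bounded `maxsize`), so production code rarely needs a hand-rolled version:

```python
import queue
import threading

q = queue.Queue(maxsize=100)   # bounded, thread-safe FIFO

def producer():
    for i in range(10):
        q.put(i)               # blocks if the queue is full

def consumer(results):
    for _ in range(10):
        results.append(q.get())  # blocks until an item is available
        q.task_done()

results = []
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, args=(results,))
t1.start(); t2.start()
t1.join(); t2.join()
q.join()                       # returns once every item is task_done()
assert results == list(range(10))
```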
**Lock-Free Queue (Michael-Scott Algorithm):**
```python
import threading
from typing import Optional, Any
class LockFreeNode:
"""Node for lock-free queue"""
__slots__ = ['value', 'next']
def __init__(self, value=None):
self.value = value
self.next = None
class LockFreeQueue:
"""
Lock-free queue using Michael-Scott algorithm
Features:
- Lock-free enqueue and dequeue
- Uses atomic CAS operations
- High concurrency performance
Note: This is a simplified version for demonstration
Production use requires atomic operations library
"""
def __init__(self):
"""Initialize with dummy node"""
dummy = LockFreeNode()
self._head = dummy
self._tail = dummy
self._lock = threading.Lock() # For CAS simulation
def enqueue(self, value):
"""
Add item to queue - Lock-free
Args:
value: Item to add
"""
new_node = LockFreeNode(value)
while True:
tail = self._tail
next_node = tail.next
# Check if tail is still valid
if tail == self._tail:
if next_node is None:
# Try to link new node
if self._compare_and_swap_next(tail, None, new_node):
# Success, try to swing tail
self._compare_and_swap_tail(tail, new_node)
return
else:
# Help other thread swing tail
self._compare_and_swap_tail(tail, next_node)
def dequeue(self) -> Optional[Any]:
"""
Remove and return front item - Lock-free
Returns:
Front item or None if empty
"""
while True:
head = self._head
tail = self._tail
first = head.next
# Check if head is still valid
if head == self._head:
if head == tail:
# Queue empty or tail falling behind
if first is None:
return None # Empty
# Tail falling behind, help advance it
self._compare_and_swap_tail(tail, first)
else:
# Read value before CAS
value = first.value
# Try to swing head
if self._compare_and_swap_head(head, first):
return value
def _compare_and_swap_next(self, node, expected, new):
"""Atomic CAS for next pointer (simplified)"""
with self._lock:
if node.next == expected:
node.next = new
return True
return False
def _compare_and_swap_head(self, expected, new):
"""Atomic CAS for head (simplified)"""
with self._lock:
if self._head == expected:
self._head = new
return True
return False
def _compare_and_swap_tail(self, expected, new):
"""Atomic CAS for tail (simplified)"""
with self._lock:
if self._tail == expected:
self._tail = new
return True
return False
# Performance comparison
def demo_concurrent_queues():
"""Demonstrate concurrent queue performance"""
import time
from concurrent.futures import ThreadPoolExecutor
print("\nConcurrent Queue Performance Demo:")
print("=" * 60)
def producer_consumer_test(queue_class, num_threads=4, items_per_thread=1000):
"""Test producer-consumer with given queue"""
queue = queue_class()
def producer():
for i in range(items_per_thread):
queue.enqueue(i)
def consumer():
consumed = 0
while consumed < items_per_thread:
try:
item = queue.dequeue()
except IndexError:
item = None
if item is None:
# Empty (LockFreeQueue returns None): back off briefly
time.sleep(0.0001)
continue
consumed += 1
return consumed
# Run test
start = time.time()
with ThreadPoolExecutor(max_workers=num_threads*2) as executor:
# Start producers
producer_futures = [executor.submit(producer)
for _ in range(num_threads)]
# Start consumers
consumer_futures = [executor.submit(consumer)
for _ in range(num_threads)]
# Wait for completion
for f in producer_futures + consumer_futures:
f.result()
elapsed = time.time() - start
throughput = (num_threads * items_per_thread * 2) / elapsed
return elapsed, throughput
# Test both implementations
print("\nTesting ThreadSafeQueue (lock-based):")
elapsed, throughput = producer_consumer_test(ThreadSafeQueue)
print(f" Time: {elapsed:.3f}s")
print(f" Throughput: {throughput:.0f} ops/sec")
print("\nTesting LockFreeQueue:")
elapsed, throughput = producer_consumer_test(LockFreeQueue)
print(f" Time: {elapsed:.3f}s")
print(f" Throughput: {throughput:.0f} ops/sec")
10.2 Persistent Queues
Functional Persistent Queue:
class PersistentQueue:
"""
Immutable queue preserving all versions
Features:
- Purely functional (immutable)
- All versions accessible
- O(n) per-operation list copies in this teaching version (a true banker's queue reaches O(1) amortized via lazy lists)
- Structural sharing for efficiency
Implementation: Banker's Queue (2 lists)
"""
def __init__(self, front_list=None, rear_list=None):
"""
Initialize persistent queue
Args:
front_list: List holding the front portion (FIFO order)
rear_list: List holding the rear portion (newest first)
"""
self._front_list = front_list if front_list is not None else []
self._rear_list = rear_list if rear_list is not None else []
self._maintain_invariant()
def _maintain_invariant(self):
"""
Maintain invariant: front should not be empty if queue non-empty
If front is empty, reverse rear and move to front
"""
if not self._front_list and self._rear_list:
self._front_list = list(reversed(self._rear_list))
self._rear_list = []
def enqueue(self, value):
"""
Create new queue with value added - O(n) here (list copy; O(1) in the linked-list Banker's Queue)
Args:
value: Value to add
Returns:
New persistent queue with value added
"""
new_rear = self._rear_list + [value]
return PersistentQueue(
front_list=self._front_list[:], # Copy
rear_list=new_rear
)
def dequeue(self):
"""
Create new queue with front removed - O(n) here (list copy)
Returns:
Tuple of (front_value, new_queue)
Raises:
IndexError: If queue is empty
"""
if self.is_empty():
raise IndexError("Queue is empty")
front_value = self._front_list[0]
new_front = self._front_list[1:]
return front_value, PersistentQueue(
front_list=new_front,
rear_list=self._rear_list[:] # Copy
)
def front(self):
"""
Get front value - O(1)
Returns:
Front value
Raises:
IndexError: If queue is empty
"""
if self.is_empty():
raise IndexError("Queue is empty")
return self._front_list[0]
def is_empty(self):
"""Check if queue is empty - O(1)"""
return len(self._front_list) == 0 and len(self._rear_list) == 0
def size(self):
"""Get queue size - O(1)"""
return len(self._front_list) + len(self._rear_list)
def to_list(self):
"""Convert to list - O(n)"""
return self._front_list + list(reversed(self._rear_list))
def __str__(self):
"""String representation"""
return f"PersistentQueue({self.to_list()})"
def demo_persistent_queue():
"""Demonstrate persistent queue version history"""
print("\nPersistent Queue Demo:")
print("=" * 60)
# Create queue and maintain version history
print("\nBuilding version history:")
v1 = PersistentQueue()
print(f"v1 (empty): {v1}")
v2 = v1.enqueue(1)
print(f"v2 (enqueue 1): {v2}")
v3 = v2.enqueue(2)
print(f"v3 (enqueue 2): {v3}")
v4 = v3.enqueue(3)
print(f"v4 (enqueue 3): {v4}")
value, v5 = v4.dequeue()
print(f"v5 (dequeue {value}): {v5}")
# All versions still accessible!
print("\nAccessing old versions:")
print(f"v1: {v1}") # Still empty
print(f"v2: {v2}") # Still [1]
print(f"v3: {v3}") # Still [1, 2]
print(f"v4: {v4}") # Still [1, 2, 3]
print(f"v5: {v5}") # [2, 3]
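The list-copying class above is easy to read but copies O(n) data per operation. A minimal sketch of the same Banker's Queue idea using cons cells (plain 2-tuples) gets genuine structural sharing: old and new versions share tail cells instead of copying. The names `_rev`, `_norm`, `enqueue`, and `dequeue` here are illustrative, not part of the class above:

```python
# A queue is a (front, rear) pair of cons lists; a cons list is
# None (empty) or a (head, tail) tuple. Tuples are immutable, so
# versions share structure instead of copying.

EMPTY = (None, None)

def _rev(lst):
    """Reverse a cons list - O(n)."""
    out = None
    while lst is not None:
        head, lst = lst
        out = (head, out)
    return out

def _norm(front, rear):
    """Invariant: front is empty only if the whole queue is empty."""
    if front is None:
        return (_rev(rear), None)
    return (front, rear)

def enqueue(q, x):
    """Return a new queue with x at the rear; q is unchanged."""
    front, rear = q
    return _norm(front, (x, rear))

def dequeue(q):
    """Return (front_value, new_queue); q is unchanged."""
    front, rear = q
    if front is None:
        raise IndexError("Queue is empty")
    head, tail = front
    return head, _norm(tail, rear)

v1 = enqueue(enqueue(enqueue(EMPTY, 1), 2), 3)
x, v2 = dequeue(v1)
print(x)               # 1
print(dequeue(v1)[0])  # still 1 - v1 is untouched
```

Because each operation touches only O(1) cells (amortized, counting the occasional reversal), old versions stay valid for free.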
10.3 Distributed Queues
Distributed Queue Concepts:
class DistributedQueueInterface:
"""
Interface for distributed queue systems
Real implementations: Amazon SQS, RabbitMQ, Apache Kafka
Features:
- Message persistence
- At-least-once delivery
- Dead letter queues
- Message visibility timeout
"""
def send_message(self, message, delay_seconds=0):
"""
Send message to queue
Args:
message: Message content
delay_seconds: Delay before message becomes visible
Returns:
Message ID
"""
pass
def receive_messages(self, max_messages=1, visibility_timeout=30):
"""
Receive messages from queue
Args:
max_messages: Maximum number of messages to receive
visibility_timeout: Time message invisible to other receivers
Returns:
List of messages
"""
pass
def delete_message(self, receipt_handle):
"""
Delete message from queue
Args:
receipt_handle: Receipt handle from receive operation
"""
pass
def change_message_visibility(self, receipt_handle, visibility_timeout):
"""
Change visibility timeout of message
Args:
receipt_handle: Receipt handle
visibility_timeout: New visibility timeout
"""
pass
class SimpleDistributedQueue:
"""
Simplified distributed queue simulation
Demonstrates key concepts:
- Message persistence (in-memory for demo)
- Visibility timeout
- Message acknowledgment
- Dead letter queue
"""
def __init__(self, visibility_timeout=30, max_receives=3):
"""
Initialize distributed queue
Args:
visibility_timeout: Default visibility timeout (seconds)
max_receives: Max receives before moving to DLQ
"""
self._messages = [] # Main queue
self._dead_letter_queue = [] # DLQ
self._in_flight = {} # Messages being processed
self._visibility_timeout = visibility_timeout
self._max_receives = max_receives
self._message_counter = 0
self._lock = threading.Lock()
def send_message(self, message):
"""
Send message to queue
Args:
message: Message content
Returns:
Message ID
"""
with self._lock:
self._message_counter += 1
msg = {
'id': self._message_counter,
'body': message,
'receive_count': 0,
'sent_timestamp': time.time()
}
self._messages.append(msg)
return msg['id']
def receive_messages(self, max_messages=1):
"""
Receive messages from queue
Args:
max_messages: Maximum messages to receive
Returns:
List of messages with receipt handles
"""
with self._lock:
current_time = time.time()
received = []
# Remove expired in-flight messages
expired_handles = [
handle for handle, (msg, timeout) in self._in_flight.items()
if current_time >= timeout
]
for handle in expired_handles:
msg, _ = self._in_flight.pop(handle)
self._messages.insert(0, msg) # Return to queue
# Receive messages
for _ in range(min(max_messages, len(self._messages))):
if not self._messages:
break
msg = self._messages.pop(0)
msg['receive_count'] += 1
# Check if should move to DLQ
if msg['receive_count'] > self._max_receives:
self._dead_letter_queue.append(msg)
continue
# Add to in-flight
receipt_handle = f"receipt-{msg['id']}-{current_time}"
timeout = current_time + self._visibility_timeout
self._in_flight[receipt_handle] = (msg, timeout)
received.append({
'body': msg['body'],
'receipt_handle': receipt_handle,
'receive_count': msg['receive_count']
})
return received
def delete_message(self, receipt_handle):
"""
Delete message (acknowledge successful processing)
Args:
receipt_handle: Receipt handle from receive
"""
with self._lock:
if receipt_handle in self._in_flight:
del self._in_flight[receipt_handle]
return True
return False
def get_queue_stats(self):
"""Get queue statistics"""
with self._lock:
return {
'messages_available': len(self._messages),
'messages_in_flight': len(self._in_flight),
'messages_in_dlq': len(self._dead_letter_queue)
}
def demo_distributed_queue():
"""Demonstrate distributed queue concepts"""
import time
print("\nDistributed Queue Demo:")
print("=" * 60)
queue = SimpleDistributedQueue(visibility_timeout=2, max_receives=2)
# Send messages
print("\nSending messages:")
for i in range(5):
msg_id = queue.send_message(f"Message {i}")
print(f" Sent message {msg_id}")
# Receive and process
print("\nReceiving messages:")
messages = queue.receive_messages(max_messages=3)
for msg in messages:
print(f" Received: {msg['body']} (handle: {msg['receipt_handle']})")
# Simulate processing
print("\nProcessing messages...")
queue.delete_message(messages[0]['receipt_handle'])
print(f" Deleted message 1 (success)")
# Let message 2 timeout (visibility expires)
print(f" Letting message 2 timeout...")
time.sleep(2.5)
# Receive again - message 2 should be available again
messages2 = queue.receive_messages(max_messages=5)
print(f"\nReceived {len(messages2)} messages after timeout")
# Show stats
stats = queue.get_queue_stats()
print(f"\nQueue Statistics:")
for key, value in stats.items():
print(f" {key}: {value}")
if __name__ == "__main__":
demo_concurrent_queues()
demo_persistent_queue()
demo_distributed_queue()
10.4 Memory-Efficient Queues
Linked Queue with Node Pooling:
class MemoryEfficientQueue:
"""
Memory-efficient queue with object pooling
Features:
- Node pooling to reduce allocations
- Memory reuse
- Reduced GC pressure
- Cache-friendly layout
"""
class _Node:
__slots__ = ['data', 'next']
def __init__(self):
self.data = None
self.next = None
def __init__(self, pool_size=100):
"""
Initialize queue with a preallocated node pool
Args:
pool_size: Initial pool size
"""
self._front = None
self._rear = None
self._size = 0
# Free list of reusable nodes
self._free_nodes = [self._Node() for _ in range(pool_size)]
def _get_node(self):
"""Get node from pool, or allocate if pool is exhausted"""
if self._free_nodes:
return self._free_nodes.pop()
return self._Node()
def _return_node(self, node):
"""Clear references and return node to the pool for reuse"""
node.data = None
node.next = None
self._free_nodes.append(node)
def enqueue(self, item):
"""Add item using pooled node"""
node = self._get_node()
node.data = item
if self._front is None:
self._front = self._rear = node
else:
self._rear.next = node
self._rear = node
self._size += 1
def dequeue(self):
"""Remove item and return node to pool"""
if self._front is None:
raise IndexError("Queue is empty")
node = self._front
item = node.data
self._front = self._front.next
if self._front is None:
self._rear = None
self._return_node(node)
self._size -= 1
return item
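Another memory-efficient option is a fixed-capacity ring buffer over a single preallocated list: after construction there are no per-item allocations at all. A minimal sketch (the `RingBufferQueue` name is illustrative):

```python
class RingBufferQueue:
    """Bounded FIFO queue over one preallocated list (a ring buffer)."""

    def __init__(self, capacity):
        self._buf = [None] * capacity  # single allocation up front
        self._head = 0                 # index of the front element
        self._size = 0

    def enqueue(self, item):
        """Add to rear - O(1); raises OverflowError when full."""
        if self._size == len(self._buf):
            raise OverflowError("Queue is full")
        self._buf[(self._head + self._size) % len(self._buf)] = item
        self._size += 1

    def dequeue(self):
        """Remove from front - O(1)."""
        if self._size == 0:
            raise IndexError("Queue is empty")
        item = self._buf[self._head]
        self._buf[self._head] = None   # drop reference so GC can reclaim
        self._head = (self._head + 1) % len(self._buf)
        self._size -= 1
        return item
```

The trade-off versus the pooled linked queue: the ring buffer has a hard capacity and contiguous (cache-friendly) storage, while the linked queue grows without bound but chases pointers.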
11. Meta-Learning Questions
11.1 When to Use Queue
Use Queue When:
- Need FIFO ordering
- Level-order traversal (BFS)
- Task scheduling
- Request buffering
- Producer-consumer pattern
Use Stack When:
- Need LIFO ordering
- Depth-first traversal (DFS)
- Undo operations
- Recursion simulation
Use Priority Queue When:
- Priority-based processing
- Always need min/max element
- Dijkstra's algorithm
- Task scheduling with priorities
Use Deque When:
- Need both-end access
- Sliding window problems
- Palindrome checking
11.2 Pattern Recognition
def recognize_queue_patterns():
"""Keywords indicating queue usage"""
return {
"BFS Keywords": [
"Level-order",
"Shortest path (unweighted)",
"Minimum steps",
"Layer by layer",
"Nearest"
],
"Queue Keywords": [
"FIFO",
"First-come-first-served",
"Processing order",
"Waiting line",
"Buffer"
],
"Monotonic Queue Keywords": [
"Sliding window maximum/minimum",
"Next greater/smaller in range",
"Constrained optimization"
],
"Priority Queue Keywords": [
"Kth largest/smallest",
"Merge k sorted",
"Top k frequent",
"Minimum cost",
"Priority scheduling"
]
}
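The monotonic-queue keywords above map directly onto the classic sliding-window-maximum solution: keep a deque of indices whose values decrease from front to back, so the front index always holds the window maximum.

```python
from collections import deque

def sliding_window_max(nums, k):
    """Maximum of each length-k window - O(n) total."""
    dq = deque()  # indices; nums[dq[0]] >= nums[dq[1]] >= ...
    out = []
    for i, x in enumerate(nums):
        if dq and dq[0] <= i - k:        # front index left the window
            dq.popleft()
        while dq and nums[dq[-1]] <= x:  # smaller candidates can never win
            dq.pop()
        dq.append(i)
        if i >= k - 1:                   # first full window reached
            out.append(nums[dq[0]])
    return out

print(sliding_window_max([1, 3, -1, -3, 5, 3, 6, 7], 3))
# [3, 3, 5, 5, 6, 7]
```

Each index is appended and popped at most once, which is why the total cost is O(n) despite the nested `while`.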
11.3 Common Mistakes
def common_queue_mistakes():
"""Common mistakes and solutions"""
mistakes = [
{
"mistake": "Using list.pop(0) for queue",
"problem": "O(n) operation!",
"solution": "Use collections.deque with popleft()"
},
{
"mistake": "Not tracking level in BFS",
"problem": "Can't distinguish levels",
"solution": "Use level_size = len(queue) pattern"
},
{
"mistake": "Forgetting to check empty before dequeue",
"problem": "IndexError",
"solution": "Always check is_empty() or use try-except"
},
{
"mistake": "Not removing from deque when outside window",
"problem": "Incorrect results",
"solution": "Check indices in monotonic deque"
}
]
return mistakes
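Mistake #2 above (losing level boundaries in BFS) is fixed by snapshotting `len(queue)` before draining one level. A sketch with a minimal illustrative `Node` class:

```python
from collections import deque

class Node:
    def __init__(self, val, left=None, right=None):
        self.val, self.left, self.right = val, left, right

def level_order(root):
    """Return node values grouped by depth, using the level_size pattern."""
    if root is None:
        return []
    levels, queue = [], deque([root])
    while queue:
        level_size = len(queue)  # snapshot: nodes in the current level
        level = []
        for _ in range(level_size):
            node = queue.popleft()
            level.append(node.val)
            if node.left:
                queue.append(node.left)
            if node.right:
                queue.append(node.right)
        levels.append(level)
    return levels

tree = Node(1, Node(2, Node(4)), Node(3))
print(level_order(tree))  # [[1], [2, 3], [4]]
```

Without the snapshot, children enqueued mid-level would be drained in the same pass and levels would blur together.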
11.4 Learning Roadmap
def queue_learning_roadmap():
"""Structured learning path"""
return {
"Week 1: Fundamentals": [
"Implement queue from scratch",
"Understand FIFO principle",
"Array vs linked list queue",
"Circular queue implementation"
],
"Week 2: BFS": [
"Binary tree level-order",
"Graph BFS",
"Shortest path problems",
"Multi-source BFS"
],
"Week 3: Advanced Queues": [
"Priority queue / heaps",
"Deque operations",
"Monotonic queue pattern",
"Sliding window maximum"
],
"Week 4: Applications": [
"Task scheduling",
"Topological sort",
"Queue using stacks",
"Stack using queues"
]
}