Prepared for: SDE Interview Preparation
Last Updated: February 2026
Focus Areas: Core Python, Advanced Python, Concurrency,
Flask, FastAPI, Django, Testing, Performance
What is Python?
Answer:
Python is a high-level, interpreted, dynamically typed,
garbage-collected programming language emphasising readability.
Key features: dynamic but strong typing — mixing incompatible types fails
loudly (1 + "2" raises TypeError) rather than silently coercing —
automatic memory management, first-class functions, and a rich standard library.
Mutable vs Immutable Types
Answer:
An object's mutability describes whether its value can change
after creation.
| Immutable | Mutable |
|---|---|
| int, float, complex | list |
| str | dict |
| tuple | set |
| frozenset | bytearray |
| bytes | user-defined classes (by default) |
a = [1, 2, 3]
b = a # b points to the SAME list object
b.append(4)
print(a) # [1, 2, 3, 4] — mutated!
t = (1, 2, 3)
# t[0] = 99 # TypeError: 'tuple' object does not support item assignment
Why it matters: Mutable default arguments are a classic Python pitfall:
# BAD — the list is created once when the function is defined
def add_item(item, lst=[]):
    lst.append(item)
    return lst

add_item(1)  # [1]
add_item(2)  # [1, 2] ← unexpected!

# GOOD
def add_item(item, lst=None):
    if lst is None:
        lst = []
    lst.append(item)
    return lst
is vs ==
Answer:
== checks value equality (calls __eq__).
is checks identity — whether two names point to the exact same object in memory.
a = [1, 2, 3]
b = [1, 2, 3]
print(a == b) # True (same values)
print(a is b) # False (different objects)
x = 256
y = 256
print(x is y) # True — CPython caches small ints (-5 to 256)
x = 1000
y = 1000
print(x is y) # False (outside cache range — implementation detail!)
Rule of thumb: Use == for value
comparisons. Use is only for identity checks, especially
is None / is not None.
Shallow Copy vs Deep Copy
Answer:
A shallow copy duplicates the outer container but shares the nested objects
inside it; a deep copy recursively copies everything.
import copy
original = [[1, 2], [3, 4]]
shallow = copy.copy(original)
shallow[0].append(99)
print(original) # [[1, 2, 99], [3, 4]] — inner list shared!
deep = copy.deepcopy(original)
deep[0].append(100)
print(original) # [[1, 2, 99], [3, 4]] — unaffected
*args and **kwargs
Answer:
*args: Collects extra positional arguments into a tuple
**kwargs: Collects extra keyword arguments into a dict
def log(level, *args, **kwargs):
    print(f"[{level}]", *args)
    print("Extra context:", kwargs)

log("INFO", "Server started", port=8080, host="0.0.0.0")
# [INFO] Server started
# Extra context: {'port': 8080, 'host': '0.0.0.0'}
List Comprehension vs Generator Expression
Answer:
[...] builds the whole list eagerly in memory.
(...) is lazy — it yields one item at a time.
squares_list = [x**2 for x in range(1_000_000)] # pointer array alone is ~8 MB; with the int objects, tens of MB
squares_gen = (x**2 for x in range(1_000_000)) # ~120 bytes (generator object)
# When to use generators: streaming large data, pipelines, iterating once
total = sum(x**2 for x in range(1_000_000)) # memory-efficient
Type Hints
Answer:
Introduced in PEP 484 (Python 3.5+). They are annotations only — Python
does NOT enforce them at runtime by default. Tools like
mypy, pyright, and FastAPI use them.
from typing import Optional, List, Dict, Union, Tuple

def get_user(user_id: int) -> Optional[Dict[str, str]]:
    ...

def process(items: List[int]) -> Tuple[int, int]:
    return min(items), max(items)

# Python 3.10+ shorthand
def log(msg: str | None = None) -> None:
    ...
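A quick sketch of the "annotations only" point above — the hypothetical `double` function below carries a violated annotation, yet CPython never complains at runtime:

```python
# Annotations are metadata only — CPython does not check them at call time
def double(x: int) -> int:
    return x * 2

print(double("ab"))            # 'abab' — no error: str * int is valid Python
print(double.__annotations__)  # {'x': <class 'int'>, 'return': <class 'int'>}
```

Only a static checker (mypy, pyright) would flag the `double("ab")` call.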
Answer:
Python resolves names in this order: Local →
Enclosing → Global →
Built-in.
x = "global"

def outer():
    x = "enclosing"
    def inner():
        x = "local"
        print(x)  # "local" — L (local scope)
    inner()
    print(x)  # "enclosing" — E (enclosing scope)

outer()
print(x)  # "global" — G (global scope)
# B = built-ins: len, print, range, etc.

# global — bind to module-level name
counter = 0
def increment():
    global counter
    counter += 1

# nonlocal — modify enclosing (non-global) scope variable
def make_counter():
    count = 0
    def inc():
        nonlocal count  # without this: UnboundLocalError on count += 1
        count += 1
        return count
    return inc

# Classic pitfall: assignment makes variable LOCAL for entire function
x = 10
def bad():
    print(x)  # UnboundLocalError! Python sees x = 20 below, marks x as local
    x = 20
Positional-Only and Keyword-Only Parameters
Answer:
Python 3 lets you explicitly control how arguments must be passed using
/ and * in the signature.
# / separates positional-only params from the rest (PEP 570, Python 3.8+)
# * separates the rest from keyword-only params
def api(pos_only, /, normal, *, kw_only):
    print(pos_only, normal, kw_only)

api(1, 2, kw_only=3)         # OK
api(1, normal=2, kw_only=3)  # OK
# api(pos_only=1, normal=2, kw_only=3)  # TypeError — pos_only can't be passed by keyword

# Keyword-only (after *) — forces callers to name the param
def connect(host, port, *, timeout=30, retries=3):
    ...

connect("localhost", 5432, timeout=10)  # must name timeout
# connect("localhost", 5432, 10)        # TypeError

# *args also creates a keyword-only barrier for anything after it
def func(*args, keyword_only=True):
    ...

# Why positional-only matters: lets you rename the param without breaking callers,
# and matches C-extension style (e.g., len(obj) not len(sequence=obj))
@property — Getters, Setters, Deleters
Answer:
@property turns a method into a managed attribute — adding
validation or computation behind what looks like plain attribute access.
class Circle:
    def __init__(self, radius):
        self._radius = radius

    @property
    def radius(self):  # getter — accessed as c.radius
        return self._radius

    @radius.setter
    def radius(self, value):  # setter — called on c.radius = x
        if value < 0:
            raise ValueError("Radius cannot be negative")
        self._radius = value

    @radius.deleter
    def radius(self):  # deleter — called on del c.radius
        del self._radius

    @property
    def area(self):  # read-only computed property
        import math
        return math.pi * self._radius ** 2

c = Circle(5)
print(c.radius)  # 5 — calls getter
c.radius = 10    # calls setter
c.radius = -1    # ValueError
del c.radius     # calls deleter

# Cached property (Python 3.8+) — computed once, then stored as instance attribute
from functools import cached_property

class DataSet:
    def __init__(self, data):
        self.data = data

    @cached_property
    def statistics(self):
        # Expensive computation — runs only the first time
        return {"mean": sum(self.data) / len(self.data), "n": len(self.data)}
    # After first call, stored in self.__dict__["statistics"] directly
Exception Handling
Answer:
Python exceptions are objects. All built-in exceptions inherit from
BaseException; user-facing exceptions should inherit from
Exception.
# Full try/except/else/finally structure
try:
    result = 10 / divisor
except ZeroDivisionError as e:
    print(f"Math error: {e}")
except (TypeError, ValueError) as e:
    print(f"Bad input: {e}")
except Exception as e:
    print(f"Unexpected: {e}")
    raise  # re-raise the same exception with traceback intact
else:
    # Runs ONLY if no exception was raised in try block
    print(f"Result: {result}")
finally:
    # ALWAYS runs — use for cleanup (closing files, releasing locks, etc.)
    cleanup()
# Exception hierarchy (critical ones to know for interviews)
# BaseException
# SystemExit ← raised by sys.exit()
# KeyboardInterrupt ← raised by Ctrl+C
# Exception
# ArithmeticError → ZeroDivisionError, OverflowError
# LookupError → IndexError, KeyError
# TypeError
# ValueError → UnicodeError → UnicodeDecodeError
# AttributeError
# NameError → UnboundLocalError
# OSError → FileNotFoundError, PermissionError, TimeoutError
# RuntimeError → RecursionError, NotImplementedError
# StopIteration ← signals end of iterator
# GeneratorExit ← thrown into generator when .close() is called
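The hierarchy above can be spot-checked directly with `issubclass` — a minimal sketch, useful for remembering why a bare `except Exception` won't swallow Ctrl+C:

```python
# Verifying parts of the exception hierarchy with issubclass()
assert issubclass(ZeroDivisionError, ArithmeticError)
assert issubclass(KeyError, LookupError)
assert issubclass(FileNotFoundError, OSError)
assert issubclass(UnboundLocalError, NameError)
assert issubclass(KeyboardInterrupt, BaseException)
assert not issubclass(KeyboardInterrupt, Exception)  # `except Exception` won't catch Ctrl+C
print("hierarchy verified")
```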
# Custom exceptions — inherit from Exception (or a more specific subclass)
class InsufficientFundsError(ValueError):
    def __init__(self, amount, balance):
        self.amount = amount
        self.balance = balance
        super().__init__(f"Cannot withdraw {amount}; balance is only {balance}")

try:
    raise InsufficientFundsError(100, 50)
except InsufficientFundsError as e:
    print(e.amount, e.balance)  # access extra attributes
# Exception chaining — preserve original cause
import json
try:
    data = json.loads(raw)
except json.JSONDecodeError as e:
    raise ValueError("Invalid config format") from e  # explicit: __cause__ set
# raise ValueError("...") from None ← suppresses the chain (hides original)

# Suppress specific exceptions cleanly
import os
from contextlib import suppress

with suppress(FileNotFoundError):
    os.remove("temp.txt")  # silently ignored if file doesn't exist
match / case Statement (Python 3.10+)
Answer:
Structural pattern matching — more powerful than if/elif chains. Can
destructure objects, sequences, and mappings in a single step.
def handle_command(command):
    match command:
        case "quit":
            return "Exiting"
        case "help" | "?":
            return "Commands: quit, help, status"
        case str(s) if s.startswith("go "):  # guard with if
            return f"Moving {s[3:]}"
        case _:  # wildcard — matches anything
            return f"Unknown: {command}"

# Sequence patterns
match point:
    case (0, 0): print("Origin")
    case (x, 0): print(f"X-axis at {x}")
    case (0, y): print(f"Y-axis at {y}")
    case (x, y): print(f"Point({x}, {y})")

# Mapping patterns
match event:
    case {"type": "click", "button": btn}: handle_click(btn)
    case {"type": "keypress", "key": key}: handle_key(key)

# Class patterns
from dataclasses import dataclass

@dataclass
class Point:
    x: float
    y: float

match shape:
    case Point(x=0, y=0): print("Origin")
    case Point(x=x, y=y): print(f"Point({x}, {y})")

# match evaluates the subject once; patterns are checked top-down
# Unlike C switch: no fallthrough — each case is independent
# Class patterns perform an isinstance check under the hood
list vs tuple vs set vs dict
| Type | Ordered | Mutable | Duplicates | Lookup | Use When |
|---|---|---|---|---|---|
| list | Yes | Yes | Yes | O(n) | Ordered collection, frequent mutation |
| tuple | Yes | No | Yes | O(n) | Fixed data, dict keys, namedtuples |
| set | No | Yes | No | O(1) avg | Membership tests, dedup |
| dict | Yes (3.7+) | Yes | Keys: No | O(1) avg | Key-value mapping |
# Set operations (very interview-friendly)
a = {1, 2, 3, 4}
b = {3, 4, 5, 6}
print(a | b) # union: {1, 2, 3, 4, 5, 6}
print(a & b) # intersection: {3, 4}
print(a - b) # difference: {1, 2}
print(a ^ b) # symmetric diff: {1, 2, 5, 6}
How Do Python Dicts Work Internally?
Answer:
Python dicts use an open-addressing hash table. When you set a key, Python
calls hash(key) to compute a slot index, then probes for a free or matching
slot, resolving collisions by probing further slots. Since Python 3.6 the
implementation uses a compact entry array plus a sparse index table, cutting
memory usage by roughly 20–30%; since Python 3.7 insertion order is
guaranteed by the language specification.
Keys must be hashable — they must implement __hash__ and __eq__.
Lists cannot be dict keys; tuples can.
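A small sketch of the hashability rule — mutable containers refuse to hash, and `hash()` recurses into tuple elements:

```python
d = {}
d[(1, 2)] = "tuples are hashable"
try:
    d[[1, 2]] = "lists are not"
except TypeError as e:
    print(e)  # unhashable type: 'list'

# hash() recurses into tuple elements, so a tuple holding a list is unhashable too
try:
    hash((1, [2]))
except TypeError:
    print("a tuple containing a list is unhashable as well")
```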
from collections import defaultdict, Counter, deque, OrderedDict, namedtuple
# defaultdict — avoids KeyError for missing keys
word_count = defaultdict(int)
for word in "the quick brown fox the fox".split():
    word_count[word] += 1
# defaultdict(int, {'the': 2, 'quick': 1, 'brown': 1, 'fox': 2})
# Counter — frequency counting
from collections import Counter
c = Counter("abracadabra")
print(c.most_common(2)) # [('a', 5), ('b', 2)]
# deque — O(1) appends/pops from both ends
dq = deque([1, 2, 3])
dq.appendleft(0) # [0, 1, 2, 3]
dq.popleft() # 0, deque = [1, 2, 3]
# deque(maxlen=N) acts as a fixed-size circular buffer
# namedtuple — lightweight immutable record
Point = namedtuple('Point', ['x', 'y'])
p = Point(3, 4)
print(p.x, p.y) # 3 4
# Use dataclasses for mutable records (Python 3.7+)
import heapq
# heapq implements a MIN-heap on top of a regular list
nums = [5, 3, 8, 1, 9, 2]
heapq.heapify(nums) # in-place, O(n)
print(nums[0]) # 1 — smallest always at index 0
heapq.heappush(nums, 4) # O(log n)
smallest = heapq.heappop(nums) # O(log n) — removes and returns smallest
top3 = heapq.nlargest(3, nums) # O(n log k) — better than a full sort for small k
bot3 = heapq.nsmallest(3, nums)
# Max-heap trick: negate values
max_heap = []
for v in [5, 1, 8, 3]:
    heapq.heappush(max_heap, -v)
print(-heapq.heappop(max_heap))  # 8
# Priority queue with tuples (sorted by first element)
tasks = [(3, "medium"), (1, "high priority"), (5, "low")]
heapq.heapify(tasks)
print(heapq.heappop(tasks)) # (1, 'high priority')
import bisect
# bisect maintains a sorted list — binary search O(log n)
a = [1, 3, 5, 7, 9]
idx = bisect.bisect_left(a, 5) # 2 — leftmost position to insert 5
idx = bisect.bisect_right(a, 5) # 3 — rightmost position to insert 5
bisect.insort(a, 6) # inserts in sorted order: O(n) due to shift
# Classic use case: grade boundaries
def grade(score):
    breakpoints = [60, 70, 80, 90]
    grades = "FDCBA"
    return grades[bisect.bisect(breakpoints, score)]

print(grade(85))  # 'B'
print(grade(55))  # 'F'
Complexity Cheat Sheet
| Operation | list | deque | dict / set | heapq | sorted list (bisect) |
|---|---|---|---|---|---|
| Index access | O(1) | O(n) | O(1) by key | O(1) min | O(1) |
| Append (end) | O(1) amortized | O(1) | — | O(log n) | O(n) |
| Prepend (start) | O(n) | O(1) | — | — | O(n) |
| Insert (middle) | O(n) | O(n) | — | — | O(n) |
| Delete end | O(1) | O(1) | — | — | O(1) |
| Delete start | O(n) | O(1) | — | — | O(n) |
| Membership test | O(n) | O(n) | O(1) avg | O(n) | O(log n) |
| Insert key | — | — | O(1) avg | O(log n) | — |
| Delete key | — | — | O(1) avg | — | — |
| Sort | O(n log n) | — | — | — | — |
| Iterate all | O(n) | O(n) | O(n) | O(n) | O(n) |
list internals: Dynamic array — over-allocates on growth (CPython's growth
factor is roughly 1.125x, not a full doubling), giving amortised O(1) append.
list.insert(0, x) is O(n) because all elements shift right. Use
collections.deque when you need O(1) at both ends.
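The over-allocation is observable with `sys.getsizeof` — a minimal sketch (the exact byte counts are a CPython implementation detail and vary by version):

```python
import sys

lst = []
sizes = []
for i in range(50):
    lst.append(i)
    sizes.append(sys.getsizeof(lst))

# The size jumps only occasionally — capacity is over-allocated,
# so most appends reuse existing slots without reallocating
distinct = sorted(set(sizes))
print(f"{len(sizes)} appends, only {len(distinct)} distinct allocation sizes")
```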
dict / set internals: Open-addressing hash table — O(1)
average; degrades to O(n) worst-case on pathological hash collisions.
Load factor kept below ~2/3 (rehash triggers otherwise).
Python sort (Timsort): Stable, O(n log n) worst case,
O(n) best case (already sorted runs). Both
list.sort() (in-place, returns None) and
sorted() (new list) use it.
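Timsort's stability is easy to demonstrate — equal keys keep their original relative order, which is what makes multi-key sorting by chained stable sorts work:

```python
records = [("alice", 3), ("bob", 1), ("carol", 3), ("dave", 1)]

# Stable sort: records with equal scores keep their original relative order
by_score = sorted(records, key=lambda r: r[1])
print(by_score)  # [('bob', 1), ('dave', 1), ('alice', 3), ('carol', 3)]

# Stability enables multi-key sorts: sort by the secondary key first, then the primary
```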
The Four Pillars of OOP
1. Encapsulation — bundling data and methods; controlling access via naming conventions:
class BankAccount:
    def __init__(self, owner, balance=0):
        self.owner = owner       # public
        self._balance = balance  # protected (convention)
        self.__pin = "1234"      # private (name-mangled to _BankAccount__pin)

    @property
    def balance(self):
        return self._balance

    def deposit(self, amount):
        if amount > 0:
            self._balance += amount
2. Inheritance — a class inherits attributes/methods from a parent:
class Animal:
    def __init__(self, name):
        self.name = name
    def speak(self):
        raise NotImplementedError

class Dog(Animal):
    def speak(self):
        return f"{self.name} says Woof!"

class Cat(Animal):
    def speak(self):
        return f"{self.name} says Meow!"

# Multiple inheritance
class Amphibian(Dog, Cat):  # MRO determines method resolution order
    pass
3. Polymorphism — same interface, different behaviour:
animals = [Dog("Rex"), Cat("Whiskers")]
for animal in animals:
    print(animal.speak())  # Duck typing — if it has .speak(), call it
4. Abstraction — hiding implementation details:
from abc import ABC, abstractmethod

class Shape(ABC):
    @abstractmethod
    def area(self) -> float: ...
    @abstractmethod
    def perimeter(self) -> float: ...

class Circle(Shape):
    def __init__(self, r):
        self.r = r
    def area(self):
        return 3.14159 * self.r ** 2
    def perimeter(self):
        return 2 * 3.14159 * self.r
Dunder (Magic) Methods
Answer:
Special methods surrounded by double underscores that Python calls
implicitly. They allow objects to work with built-in operations.
class Vector:
    def __init__(self, x, y):
        self.x, self.y = x, y
    def __repr__(self):  # unambiguous string (developers)
        return f"Vector({self.x}, {self.y})"
    def __str__(self):  # human-readable (print)
        return f"({self.x}, {self.y})"
    def __add__(self, other):  # v1 + v2
        return Vector(self.x + other.x, self.y + other.y)
    def __eq__(self, other):  # v1 == v2
        return self.x == other.x and self.y == other.y
    def __len__(self):  # len(v)
        return 2
    def __getitem__(self, idx):  # v[0], v[1]
        return (self.x, self.y)[idx]
    def __iter__(self):  # for component in v
        yield self.x
        yield self.y
    def __contains__(self, val):  # 3 in v
        return val in (self.x, self.y)
    def __bool__(self):  # bool(v)
        return bool(self.x or self.y)
Method Resolution Order (MRO)
Answer:
In Python's multiple inheritance, the
C3 linearization algorithm determines the order in
which parent classes are searched for an attribute or method.
class A:
    def method(self): print("A")

class B(A):
    def method(self): print("B")

class C(A):
    def method(self): print("C")

class D(B, C):
    pass

D().method()      # prints "B"
print(D.__mro__)  # (D, B, C, A, object)
# C3 rule: each class appears before its parents, and base order (B before C) is preserved
super() follows MRO, not just the immediate parent:
class B(A):
    def method(self):
        super().method()  # on a D instance, calls C.method() next in MRO, not A.method()
        print("B")
class Pizza:
    num_slices = 8  # class variable

    def __init__(self, topping):
        self.topping = topping  # instance variable

    def describe(self):  # instance method — receives self
        return f"{self.topping} pizza, {self.num_slices} slices"

    @classmethod
    def margherita(cls):  # class method — receives cls
        return cls("Margherita")  # can create instances

    @staticmethod
    def calories_per_slice(total_cal):  # static method — no self/cls
        return total_cal / Pizza.num_slices  # utility function

p = Pizza.margherita()
print(Pizza.calories_per_slice(2000))  # 250.0
from dataclasses import dataclass, field
from typing import List
@dataclass
class User:
    name: str
    age: int
    tags: List[str] = field(default_factory=list)
    # Never use mutable defaults directly — use field(default_factory=...)

    def is_adult(self):
        return self.age >= 18

u = User("Alice", 30, ["admin"])
print(u)  # User(name='Alice', age=30, tags=['admin'])

@dataclass(frozen=True)  # makes it immutable (like a namedtuple)
class Point:
    x: float
    y: float
Descriptors
Answer:
A descriptor is any object that defines __get__,
__set__, or __delete__. They are the machinery
behind @property, @classmethod,
@staticmethod, and ORM field definitions (e.g.,
Column(Integer) in SQLAlchemy).
class Validator:
    """Data descriptor — validates a non-negative number."""
    def __set_name__(self, owner, name):  # called at class definition time
        self.name = name

    def __get__(self, obj, objtype=None):
        if obj is None:  # accessed on class, not instance
            return self
        return getattr(obj, f"_{self.name}", None)

    def __set__(self, obj, value):
        if not isinstance(value, (int, float)):
            raise TypeError(f"{self.name} must be numeric")
        if value < 0:
            raise ValueError(f"{self.name} must be non-negative")
        setattr(obj, f"_{self.name}", value)

class Product:
    price = Validator()  # descriptor instance as class attribute
    quantity = Validator()

    def __init__(self, price, quantity):
        self.price = price  # triggers Validator.__set__
        self.quantity = quantity

p = Product(10.99, 5)
p.price = -1   # ValueError
p.price = "x"  # TypeError
# Lookup priority (highest to lowest for data descriptors):
# 1. Data descriptor (has __set__ or __delete__) ← @property is here
# 2. Instance __dict__
# 3. Non-data descriptor (only __get__) ← functions/methods are here
# 4. Class __dict__
# This is why assigning c.radius = 5 calls the @radius.setter
# rather than creating an entry in c.__dict__["radius"]
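The priority rules above can be demonstrated with two minimal descriptors — a sketch showing that a data descriptor beats the instance `__dict__`, while a non-data descriptor loses to it:

```python
class DataDesc:
    def __get__(self, obj, objtype=None): return "data descriptor"
    def __set__(self, obj, value): pass  # having __set__ makes this a DATA descriptor

class NonDataDesc:
    def __get__(self, obj, objtype=None): return "non-data descriptor"

class Demo:
    d = DataDesc()
    n = NonDataDesc()

obj = Demo()
# Plant instance attributes directly, bypassing DataDesc.__set__
obj.__dict__["d"] = "instance value"
obj.__dict__["n"] = "instance value"
print(obj.d)  # 'data descriptor'  — data descriptor beats instance __dict__
print(obj.n)  # 'instance value'   — instance __dict__ beats non-data descriptor
```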
__new__ vs __init__
Answer:
__new__ creates the object (allocates memory,
returns the instance). __init__ initialises it
(sets attributes on the already-created instance). Override
__new__ when you need to control object creation itself —
e.g., Singleton, subclassing immutable types.
# Subclassing immutable int — must use __new__
class DoubledInt(int):
    def __new__(cls, value):
        return super().__new__(cls, value * 2)  # immutable value must be set here

x = DoubledInt(5)
print(x)  # 10

# Singleton via __new__
class Singleton:
    _instance = None

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance  # __init__ is STILL called each time!

    def __init__(self, value):
        self.value = value  # careful — overwrites on every call

a = Singleton(1)
b = Singleton(2)
print(a is b)   # True — same instance
print(a.value)  # 2 — __init__ ran again!
# Call order: __new__ → if returns instance of cls → __init__
# If __new__ returns something other than an instance of cls, __init__ is skipped
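The skip rule in the last comment can be shown in a few lines — a deliberately odd sketch (the class name `Weird` is illustrative) where `__new__` returns a plain string:

```python
class Weird:
    def __new__(cls, *args):
        return "not a Weird instance"  # returns a str, not an instance of cls
    def __init__(self, *args):
        print("__init__ runs")  # never printed — skipped entirely

obj = Weird(42)
print(type(obj).__name__)  # str
print(obj)                 # not a Weird instance
```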
Protocols — Structural Typing
Answer:
Protocol (PEP 544, Python 3.8+) defines an interface by its
methods/attributes — without requiring explicit inheritance. This is the
formal version of Python's duck typing and is compatible with type
checkers (mypy, pyright).
from typing import Protocol, runtime_checkable
@runtime_checkable  # enables isinstance() checks at runtime
class Drawable(Protocol):
    def draw(self) -> None: ...
    def resize(self, factor: float) -> None: ...

class Circle:  # does NOT inherit Drawable
    def draw(self): print("Drawing circle")
    def resize(self, factor): self.radius *= factor

class Square:  # does NOT inherit Drawable
    def draw(self): print("Drawing square")
    def resize(self, factor): self.side *= factor

def render_all(shapes: list[Drawable]) -> None:
    for s in shapes:
        s.draw()  # type-safe — mypy/pyright accept this

render_all([Circle(), Square()])  # works!
print(isinstance(Circle(), Drawable))  # True — runtime check via @runtime_checkable
# Contrast with ABC:
# ABC: requires explicit class Circle(Shape) — nominal subtyping
# Protocol: no inheritance needed — structural subtyping
# Built-in Protocols from collections.abc:
# Iterable, Iterator, Sequence, Mapping,
# Callable, Hashable, Sized, ContextManager…
from collections.abc import Iterable, Callable
def process(items: Iterable[int]) -> list[int]:
    return [x * 2 for x in items]  # works with list, tuple, set, generator…
Decorators
Answer:
A decorator is a function (or class) that takes another function as input
and returns a modified version. The @ syntax is syntactic sugar for
func = decorator(func).
import time
import functools
def timer(func):
    @functools.wraps(func)  # preserves __name__, __doc__, etc.
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        end = time.perf_counter()
        print(f"{func.__name__} took {end - start:.4f}s")
        return result
    return wrapper

@timer
def slow_function():
    time.sleep(0.5)

slow_function()  # slow_function took 0.5001s
Decorator with arguments:
def retry(times=3):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(times):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == times - 1:
                        raise
                    print(f"Attempt {attempt+1} failed: {e}. Retrying...")
        return wrapper
    return decorator

@retry(times=3)
def flaky_api_call():
    ...
Class-based decorator:
class cache:
    def __init__(self, func):
        self.func = func
        self.memo = {}
        functools.update_wrapper(self, func)

    def __call__(self, *args):
        if args not in self.memo:
            self.memo[args] = self.func(*args)
        return self.memo[args]

@cache
def fib(n):
    if n < 2: return n
    return fib(n-1) + fib(n-2)
Generators and yield
Answer:
A generator function uses yield to produce values lazily.
Each call to next() resumes execution after the last
yield.
def fibonacci():
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b

gen = fibonacci()
print([next(gen) for _ in range(8)])  # [0, 1, 1, 2, 3, 5, 8, 13]

# Generator pipeline — memory efficient stream processing
def read_lines(path):
    with open(path) as f:
        for line in f:
            yield line.strip()

def filter_lines(lines, keyword):
    for line in lines:
        if keyword in line:
            yield line

# Processes a 10 GB log file with near-zero memory overhead
lines = read_lines("huge.log")
errors = filter_lines(lines, "ERROR")
for error in errors:
    print(error)
yield from — delegating to sub-generator:
def chain(*iterables):
    for it in iterables:
        yield from it

list(chain([1, 2], [3, 4], [5]))  # [1, 2, 3, 4, 5]
Context Managers
Answer:
Context managers implement the __enter__ /
__exit__ protocol and are used with the
with statement to guarantee cleanup (even on exceptions).
# Class-based
class DatabaseConnection:
    def __init__(self, url):
        self.url = url

    def __enter__(self):
        self.conn = connect(self.url)
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.conn.close()
        return False  # don't suppress exceptions

with DatabaseConnection("postgres://...") as conn:
    conn.execute("SELECT 1")
# connection is closed here regardless of exceptions

# Generator-based (using contextlib)
from contextlib import contextmanager

@contextmanager
def timer(label):
    start = time.perf_counter()
    try:
        yield
    finally:
        print(f"{label}: {time.perf_counter() - start:.3f}s")

with timer("DB query"):
    results = db.query("SELECT * FROM users")
Closures and nonlocal
Answer:
A closure is a function that remembers the variables from its enclosing
scope even after that scope has finished.
def make_counter(start=0):
    count = start  # free variable captured by the closure
    def increment():
        nonlocal count  # tells Python to modify the enclosing scope's variable
        count += 1
        return count
    return increment

counter = make_counter()
print(counter())  # 1
print(counter())  # 2
print(counter())  # 3
import itertools
# Infinite iterators
counter = itertools.count(10, 2) # 10, 12, 14, ...
cycling = itertools.cycle([1, 2, 3]) # 1, 2, 3, 1, 2, 3, ...
# Combinatoric iterators
list(itertools.permutations("ABC", 2)) # ('A','B'),('A','C'),...
list(itertools.combinations("ABC", 2)) # ('A','B'),('A','C'),('B','C')
# Terminating iterators
list(itertools.chain([1,2],[3,4])) # [1, 2, 3, 4]
list(itertools.islice(range(100), 5)) # [0, 1, 2, 3, 4]
list(itertools.takewhile(lambda x: x<5, [1,3,5,2])) # [1, 3]
for key, group in itertools.groupby([1,1,2,2,3], key=lambda x: x):
    print(key, list(group))
# 1 [1, 1]
# 2 [2, 2]
# 3 [3]
Metaclasses
Answer:
A metaclass is the class of a class — it controls how classes are
created. The default metaclass is type.
# type(name, bases, dict) creates a class dynamically
Dog = type("Dog", (object,), {"speak": lambda self: "Woof"})
Dog().speak() # 'Woof'
# Custom metaclass
class SingletonMeta(type):
    _instances = {}
    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]

class Config(metaclass=SingletonMeta):
    def __init__(self):
        self.debug = False

c1 = Config()
c2 = Config()
print(c1 is c2)  # True — same instance
Real-world use cases: Django's ORM models, SQLAlchemy declarative base, ABC enforcement, automatic model registration patterns.
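The automatic-registration pattern mentioned above can be sketched in a few lines — the class names here (`RegistryMeta`, `Exporter`, etc.) are hypothetical, not from any particular framework:

```python
class RegistryMeta(type):
    """Metaclass that auto-registers every concrete subclass by name."""
    registry = {}

    def __init__(cls, name, bases, namespace):
        super().__init__(name, bases, namespace)
        if bases:  # skip the abstract base itself
            RegistryMeta.registry[name.lower()] = cls

class Exporter(metaclass=RegistryMeta):
    pass

class JsonExporter(Exporter): ...
class CsvExporter(Exporter): ...

print(sorted(RegistryMeta.registry))  # ['csvexporter', 'jsonexporter']
```

Django models and SQLAlchemy's declarative base use the same idea: defining the class is enough to make the framework aware of it.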
import functools
# partial — pre-fill some arguments
from functools import partial
def power(base, exp):
    return base ** exp

square = partial(power, exp=2)
cube = partial(power, exp=3)
print(square(5), cube(3))  # 25 27
# reduce — fold a sequence into one value
from functools import reduce
product = reduce(lambda acc, x: acc * x, [1, 2, 3, 4, 5]) # 120
# total_ordering — automate remaining comparison dunder methods
# Provide __eq__ + ONE of __lt__ / __le__ / __gt__ / __ge__; functools generates the rest
@functools.total_ordering
class Student:
    def __init__(self, gpa): self.gpa = gpa
    def __eq__(self, other): return self.gpa == other.gpa
    def __lt__(self, other): return self.gpa < other.gpa
# __le__, __gt__, __ge__ auto-generated
# singledispatch — type-based function overloading
@functools.singledispatch
def process(data):
    raise NotImplementedError(f"No handler for {type(data).__name__}")

@process.register(int)
def _(data): return data * 2

@process.register(str)
def _(data): return data.upper()

@process.register(list)
def _(data): return [process(item) for item in data]
print(process(5)) # 10
print(process("hello")) # HELLO
print(process([1, "a"])) # [2, 'A']
# cached_property — compute once, store as instance attr (Python 3.8+)
from functools import cached_property
class DataSet:
    def __init__(self, data): self.data = data

    @cached_property
    def statistics(self):
        # runs only on first access; stored in self.__dict__['statistics']
        return {"mean": sum(self.data) / len(self.data)}
# NOTE: NOT thread-safe; use a lock if accessed from multiple threads
# lru_cache — memoize function calls (thread-safe)
@functools.lru_cache(maxsize=128)  # maxsize=None → unbounded
def fib(n):
    if n < 2: return n
    return fib(n - 1) + fib(n - 2)
fib.cache_info() # CacheInfo(hits=..., misses=..., maxsize=128, currsize=...)
fib.cache_clear() # clear cached data
# @functools.cache (Python 3.9+) = lru_cache(maxsize=None)
from functools import cache
@cache
def expensive(n): ...
__getattr__, __setattr__, __getattribute__
Answer:
These hooks intercept attribute access — essential knowledge for
building proxies, ORMs, config objects, and debugging tools.
# __getattr__ — called ONLY when the attribute is NOT found through normal paths
# (i.e., not in instance __dict__, class __dict__, or MRO)
# Safe override point for dynamic/fallback attributes
class FlexibleConfig:
    def __init__(self, data): self._data = data

    def __getattr__(self, name):
        if name in self._data:
            return self._data[name]
        raise AttributeError(f"No config key: {name!r}")

cfg = FlexibleConfig({"debug": True, "port": 8080})
print(cfg.debug)  # True — falls through to __getattr__

# __setattr__ — called on EVERY attribute assignment
class Validated:
    def __setattr__(self, name, value):
        if name == "age" and (not isinstance(value, int) or value < 0):
            raise ValueError("age must be a non-negative int")
        super().__setattr__(name, value)  # MUST call super() — else infinite recursion!

# __getattribute__ — called on EVERY attribute access (even normal ones)
# Override with extreme care; almost always need to call super()
class Audited:
    def __getattribute__(self, name):
        # Log every access
        if not name.startswith("_"):  # avoid infinite recursion on __dict__ etc.
            print(f"Accessing {name}")
        return super().__getattribute__(name)  # MUST call super()
# Lookup order (simplified):
# 1. type(obj).__mro__ — data descriptors (e.g., @property with setter)
# 2. obj.__dict__ — instance attributes
# 3. type(obj).__mro__ — non-data descriptors + class attributes
# 4. __getattr__ — only if all above fail
#
# Summary:
# __getattribute__ → intercepts ALL access (risky)
# __getattr__ → intercepts MISS (safe fallback)
# __setattr__ → intercepts ALL assignment
# __delattr__ → intercepts ALL del obj.x
from typing import TypeVar, Generic, TypedDict, Literal, Final, overload, Callable
from typing import NotRequired # Python 3.11+
# TypeVar — parameterise a function or class
T = TypeVar("T")

def first(items: list[T]) -> T:
    return items[0]

x: int = first([1, 2, 3])   # T inferred as int
s: str = first(["a", "b"])  # T inferred as str

# Generic class
class Stack(Generic[T]):
    def __init__(self) -> None:
        self._items: list[T] = []
    def push(self, item: T) -> None: self._items.append(item)
    def pop(self) -> T: return self._items.pop()

stack: Stack[int] = Stack()
stack.push(1)    # OK
stack.push("x")  # mypy error
# TypedDict — dict with a statically-checked shape
class Movie(TypedDict):
    title: str
    year: int
    rating: float

class Config(TypedDict, total=False):  # all keys optional
    debug: bool
    port: int

# NotRequired in Python 3.11+
class User(TypedDict):
    name: str
    email: NotRequired[str]  # optional key
# Literal — restrict values
def set_log_level(level: Literal["DEBUG", "INFO", "WARNING", "ERROR"]) -> None: ...
# Final — constant (can't be reassigned)
MAX_RETRIES: Final = 3
# @overload — multiple call signatures for one implementation
@overload
def parse(data: str) -> dict: ...
@overload
def parse(data: bytes) -> dict: ...
def parse(data):  # actual implementation (not type-checked directly)
    import json
    if isinstance(data, bytes):
        data = data.decode()
    return json.loads(data)

# ParamSpec — preserve signature through decorators (Python 3.10+)
import functools
from typing import ParamSpec, Concatenate

P = ParamSpec("P")

def logged(func: Callable[P, T]) -> Callable[P, T]:
    @functools.wraps(func)
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
        print(f"Calling {func.__name__}")
        return func(*args, **kwargs)
    return wrapper
The Global Interpreter Lock (GIL)
Answer:
The GIL is a mutex in CPython that ensures only one thread executes
Python bytecode at a time, even on multi-core hardware.
Why does it exist? CPython's memory management (reference counting) is not thread-safe. The GIL protects Python objects from concurrent mutation without needing fine-grained locking on every object.
Consequences: CPU-bound code gains nothing from multiple threads; I/O-bound threads still benefit, because the GIL is released while waiting on I/O.
Workarounds:
- multiprocessing for CPU-bound work (separate processes, each with its own GIL)
- asyncio for I/O-bound concurrency (cooperative, single-threaded)
- C extensions (e.g., NumPy) that release the GIL during heavy computation
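The I/O-bound case is easy to see with threads that block: a minimal sketch where four 0.2-second waits overlap because the GIL is released while blocked (`sleep` stands in for a network or disk call):

```python
import threading
import time

def io_task():
    time.sleep(0.2)  # GIL is released while blocked, so threads overlap

start = time.perf_counter()
threads = [threading.Thread(target=io_task) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start
print(f"4 x 0.2s waits finished in {elapsed:.2f}s")  # ~0.2s total, not 0.8s
```

Run the same experiment with a CPU-bound loop instead of `sleep` and the threads serialise — that is the GIL in action.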
Memory Management and Garbage Collection
Answer:
CPython's primary mechanism is reference counting: every object tracks how
many references point to it (inspect with sys.getrefcount(obj)) and is freed
the instant the count reaches zero. Reference cycles defeat refcounting
alone, so a supplementary cyclic garbage collector (the gc module)
periodically detects and collects them.
import sys
a = []
print(sys.getrefcount(a)) # 2 (a + the getrefcount argument)
b = a
print(sys.getrefcount(a)) # 3
del b
print(sys.getrefcount(a)) # 2 again
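The cycle-collector half of the story can be sketched with two objects that reference each other — refcounting alone can never free them, but `gc.collect()` can:

```python
import gc

class Node:
    pass

a, b = Node(), Node()
a.partner, b.partner = b, a  # reference cycle — refcounts never reach zero
del a, b                     # now unreachable, but refcounting alone can't free them

collected = gc.collect()     # the cyclic collector finds and frees the pair
print(collected >= 2)        # True — at least the two Node objects were collected
```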
Integer and String Interning
Answer:
CPython interns (caches) small integers (-5 to 256) and short strings
that look like identifiers. This means identity comparisons
(is) may accidentally work for these values — but you
should never rely on it.
a = "hello"
b = "hello"
print(a is b) # True (interned — implementation detail)
a = "hello world"
b = "hello world"
print(a is b) # May be False (not necessarily interned)
# You can explicitly intern strings
import sys
a = sys.intern("hello world")
b = sys.intern("hello world")
print(a is b) # True
Answer:
A weak reference does not prevent garbage collection.
The object is freed when only weak references remain. Useful for caches,
event systems, and breaking reference cycles without manual cleanup.
import weakref
class Resource:
def __init__(self, name): self.name = name
def __repr__(self): return f"Resource({self.name!r})"
r = Resource("db_conn")
ref = weakref.ref(r) # create weak reference
print(ref()) # Resource('db_conn') — alive
del r # remove last strong reference
print(ref()) # None — garbage collected
# WeakValueDictionary — values are weak-referenced; auto-removed when GC'd
cache = weakref.WeakValueDictionary()
obj = Resource("cached")
cache["key"] = obj
print(cache.get("key")) # Resource('cached')
del obj
print(cache.get("key")) # None — automatically removed
# WeakSet — set of weakly-referenced objects
# WeakKeyDictionary — keys are weakly referenced
# finalize — register a cleanup callback when object is GC'd
def cleanup(name): print(f"Cleaning up {name}")
obj2 = Resource("temp")
weakref.finalize(obj2, cleanup, "temp")
del obj2 # prints "Cleaning up temp"
# Real-world pattern: event emitter that doesn't keep listeners alive
class EventBus:
def __init__(self):
self._listeners = weakref.WeakSet()
def subscribe(self, fn): self._listeners.add(fn)
def emit(self, event):
for fn in list(self._listeners): # copy — set may shrink during iteration
fn(event)
__slots__ In-Depth
Answer:
By default every Python instance has a __dict__ — a
per-instance hash table. __slots__ replaces it with a fixed
array of named slots, saving ~40-50% memory and speeding up attribute
access.
import sys
class WithDict:
def __init__(self, x, y):
self.x = x; self.y = y
class WithSlots:
__slots__ = ("x", "y") # no __dict__; descriptors stored directly on type
def __init__(self, x, y):
self.x = x; self.y = y
d = WithDict(1, 2); s = WithSlots(1, 2)
print(sys.getsizeof(d)) # ~48 bytes (+ ~200 for __dict__)
print(sys.getsizeof(s)) # ~40 bytes (no __dict__ overhead)
# At million-instance scale: ~160 MB saved
# Gotchas:
# 1. Cannot add new attributes not in __slots__
# s.z = 3 # AttributeError: 'WithSlots' object has no attribute 'z'
# 2. Inheritance: if parent has __dict__, child inherits it even with __slots__
class Base:
__slots__ = ("x",)
class Child(Base):
__slots__ = ("y",) # inherits base's slots; no __dict__ only if ALL parents use __slots__
# 3. __weakref__ is removed by default; add it back if needed
class Slotted:
__slots__ = ("x", "y", "__weakref__")
# 4. @cached_property needs __dict__; incompatible with pure __slots__
# When to use:
# — Value objects / data containers with fixed fields
# — Millions of instances (game entities, financial tick data, ML features)
# — When you want to prevent accidental dynamic attribute creation
| Model | Best For | GIL? | Overhead | Communication |
|---|---|---|---|---|
| threading | I/O-bound | Limited by GIL | Low | Shared memory (locks needed) |
| multiprocessing | CPU-bound | No (own process) | High (process spawn) | Queue, Pipe, shared memory |
| asyncio | I/O-bound, high concurrency | Irrelevant (single thread) | Very low | Coroutines, queues |
Answer:
asyncio is Python's built-in framework for writing
concurrent I/O code using coroutines. It uses a single-threaded
event loop that cooperatively multitasks between
coroutines.
import asyncio
import aiohttp
# async def defines a coroutine
async def fetch(session, url):
async with session.get(url) as response:
return await response.json()
async def fetch_all(urls):
async with aiohttp.ClientSession() as session:
# asyncio.gather runs all coroutines concurrently
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
# Running the event loop
async def main():
urls = ["https://api.github.com/users/torvalds",
"https://api.github.com/users/gvanrossum"]
data = await fetch_all(urls)
for d in data:
print(d['login'], d['public_repos'])
asyncio.run(main())
Key concepts:
async def — defines a coroutine functionawait — suspends the current coroutine until the
awaitable completes; yields control back to the event loop
asyncio.gather() — runs tasks concurrently, waits for all
asyncio.create_task() — schedules a coroutine to run soon
(fire-and-forget style)
asyncio.Queue — async-safe producer/consumer queue
# asyncio.create_task — schedule without waiting immediately
async def main():
task1 = asyncio.create_task(some_coroutine())
task2 = asyncio.create_task(another_coroutine())
# Both are now running concurrently
result1 = await task1
result2 = await task2
asyncio vs threading for I/O: asyncio can handle tens of thousands of concurrent connections in a single thread; threads need one OS thread per connection which is expensive at scale.
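A hedged sketch of that claim: a thousand simulated I/O waits on one event loop complete in roughly the time of a single wait, because they all overlap:

```python
import asyncio
import time

async def fake_io(i):
    await asyncio.sleep(0.1)   # stand-in for a network round-trip
    return i

async def main():
    start = time.perf_counter()
    # One event loop, one thread, 1000 concurrent "connections"
    results = await asyncio.gather(*(fake_io(i) for i in range(1000)))
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(main())
print(len(results), f"{elapsed:.2f}s")  # wall time close to a single 0.1s sleep
```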
import threading
counter = 0
lock = threading.Lock()
def increment():
global counter
with lock: # acquires and releases automatically
counter += 1
threads = [threading.Thread(target=increment) for _ in range(1000)]
for t in threads: t.start()
for t in threads: t.join()
print(counter) # 1000 (safe)
# Other synchronization primitives
rlock = threading.RLock() # reentrant lock (same thread can acquire multiple times)
event = threading.Event() # flag-based signaling
semaphore = threading.Semaphore(5) # limit concurrent access
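A small sketch of Event in action: several workers block on a shared flag until the main thread releases them all at once (the Lock guards the shared list):

```python
import threading

results = []
results_lock = threading.Lock()
ready = threading.Event()

def worker(n):
    ready.wait()            # suspend until the event is set
    with results_lock:      # guard shared mutable state
        results.append(n)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
ready.set()                 # wakes every waiting thread at once
for t in threads:
    t.join()
print(sorted(results))      # [0, 1, 2, 3, 4]
```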
Answer:
concurrent.futures is the recommended high-level interface
for running callables in threads or processes. It returns
Future objects and abstracts pool management.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import requests
urls = ["https://api.github.com/users/torvalds",
"https://api.github.com/users/gvanrossum",
"https://api.github.com/users/antirez"]
def fetch(url):
return requests.get(url, timeout=5).json()["public_repos"]
# ThreadPoolExecutor — I/O-bound (network, disk, DB)
with ThreadPoolExecutor(max_workers=10) as executor:
# map — submits all at once, yields results IN ORDER as they complete
results = list(executor.map(fetch, urls))
# submit — more control: returns a Future immediately
futures = {executor.submit(fetch, url): url for url in urls}
for future in as_completed(futures): # yields as each finishes
url = futures[future]
try:
print(f"{url}: {future.result()} repos")
except Exception as e:
print(f"{url} failed: {e}")
# ProcessPoolExecutor — CPU-bound (numpy, image processing, encryption)
def compute_heavy(n):
return sum(i * i for i in range(n))
with ProcessPoolExecutor() as executor: # default: os.cpu_count() workers
results = list(executor.map(compute_heavy, [1_000_000] * 4))
# Key points:
# - Threads share memory; use locks for shared mutable state
# - Processes have separate memory; pass data via pickling (overhead)
# - Both executor types are context managers that call shutdown(wait=True) on exit
# - Future.result(timeout=N) blocks and can raise the exception from the worker
Answer:
Never call a blocking function directly in an
async function — it freezes the entire event loop. Offload
it to a thread pool using asyncio.to_thread (Python 3.9+)
or loop.run_in_executor.
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor
def blocking_io(): # synchronous — would block the event loop
time.sleep(1)
return "done"
def cpu_intensive(n): # CPU-bound — needs its own process
return sum(i * i for i in range(n))
async def main():
# Python 3.9+ — clean, simple, uses default thread pool
result = await asyncio.to_thread(blocking_io)
# Full control: specify executor type
loop = asyncio.get_running_loop()
# Thread pool (default=None) for blocking I/O
result = await loop.run_in_executor(None, blocking_io)
# Process pool for CPU-bound work
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_intensive, 1_000_000)
asyncio.run(main())
# FastAPI rule:
# def route → FastAPI automatically runs it in a threadpool (safe for blocking calls)
# async def → runs in the event loop (NEVER call blocking code!)
from fastapi import FastAPI
app = FastAPI()
@app.get("/sync")
def sync_view(): # runs in threadpool — blocking calls OK
time.sleep(1)
return {"ok": True}
@app.get("/async")
async def async_view(): # runs in event loop — use await only
await asyncio.sleep(1)
return {"ok": True}
import asyncio
# asyncio.Lock — mutual exclusion (like threading.Lock but for coroutines)
lock = asyncio.Lock()
async def safe_write(resource, data):
async with lock: # acquires; other coroutines yield until released
await resource.write(data)
# asyncio.Semaphore — limit concurrency
sem = asyncio.Semaphore(5) # max 5 coroutines at once
async def limited_fetch(session, url):
async with sem:
return await session.get(url)
# asyncio.Event — one-to-many signaling
ready = asyncio.Event()
async def producer():
await asyncio.sleep(1)
ready.set()
async def consumer():
await ready.wait() # suspends until set()
print("Ready!")
# Timeout — Python 3.11+
async def with_timeout():
try:
async with asyncio.timeout(5.0):
result = await slow_operation()
except TimeoutError:
print("Timed out")
# Pre-3.11
result = await asyncio.wait_for(slow_operation(), timeout=5.0)
# asyncio.Queue — async-safe producer/consumer pipeline
queue = asyncio.Queue(maxsize=10)
async def producer(q):
for i in range(20):
await q.put(i) # blocks if queue full
await q.put(None) # sentinel
async def consumer(q):
while (item := await q.get()) is not None:
process(item)
q.task_done() # signals work complete
await q.join() # wait until all items processed
# gather vs wait
# gather — run all concurrently, return results in order
results = await asyncio.gather(t1(), t2(), return_exceptions=True)
# wait — finer control over completion condition
done, pending = await asyncio.wait(
{asyncio.create_task(t) for t in tasks},
return_when=asyncio.FIRST_COMPLETED # or FIRST_EXCEPTION, ALL_COMPLETED
)
for task in pending:
task.cancel()
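A concrete (hedged) example of FIRST_COMPLETED: race a fast and a slow coroutine, then cancel the loser. Names and delays are illustrative:

```python
import asyncio

async def reply(name, delay):
    await asyncio.sleep(delay)
    return name

async def race():
    tasks = {asyncio.create_task(reply("primary", 0.05)),
             asyncio.create_task(reply("backup", 5.0))}
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()            # stop the slower coroutine
    return [task.result() for task in done]

print(asyncio.run(race()))       # ['primary']
```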
Answer:
Flask is a micro web framework for Python. "Micro"
means it provides the core tools (routing, request/response handling,
templating) without making decisions about database, authentication, or
form handling. These are handled by extensions.
Core components:
Routing — map URLs to view functions with @app.route()
Application context — g, current_app
Request context — request, session
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route("/") # GET by default
def index():
return "Hello, World!"
@app.route("/users/<int:user_id>", methods=["GET"])
def get_user(user_id):
user = {"id": user_id, "name": "Alice"}
return jsonify(user), 200
@app.route("/users", methods=["POST"])
def create_user():
data = request.get_json() # parse JSON body
name = data.get("name")
if not name:
return jsonify({"error": "name required"}), 400
return jsonify({"id": 1, "name": name}), 201
if __name__ == "__main__":
app.run(debug=True, port=5000)
URL converters: <string:x>,
<int:x>, <float:x>,
<path:x>, <uuid:x>
from flask import request
@app.route("/search")
def search():
query = request.args.get("q", "") # URL query params: ?q=python
page = request.args.get("page", 1, type=int)
body = request.get_json() # JSON body
form_data = request.form.get("username") # form-encoded body
files = request.files.get("upload") # file upload
headers = request.headers.get("Authorization")
cookies = request.cookies.get("session_id")
method = request.method # GET, POST, etc.
return jsonify({"query": query, "page": page})
Answer:
Blueprints allow you to split a Flask application into modular, reusable
components — like mini-applications that are later registered on the
main app.
# auth/routes.py
from flask import Blueprint
auth_bp = Blueprint("auth", __name__, url_prefix="/auth")
@auth_bp.route("/login", methods=["POST"])
def login():
...
@auth_bp.route("/logout")
def logout():
...
# users/routes.py
users_bp = Blueprint("users", __name__, url_prefix="/users")
@users_bp.route("/")
def list_users():
...
# app.py
from flask import Flask
from auth.routes import auth_bp
from users.routes import users_bp
def create_app():
app = Flask(__name__)
app.register_blueprint(auth_bp)
app.register_blueprint(users_bp)
return app
@app.before_request
def authenticate():
token = request.headers.get("Authorization")
if not token and request.endpoint != "auth.login":
return jsonify({"error": "Unauthorized"}), 401
@app.after_request
def add_cors_headers(response):
response.headers["Access-Control-Allow-Origin"] = "*"
return response
@app.teardown_appcontext
def close_db(error):
db = g.pop("db", None)
if db:
db.close()
# Error handlers
@app.errorhandler(404)
def not_found(e):
return jsonify({"error": "Not found"}), 404
@app.errorhandler(500)
def internal_error(e):
return jsonify({"error": "Internal server error"}), 500
Answer:
The factory pattern is the recommended way to structure a Flask app. It
defers app creation, makes testing easier (create isolated apps per
test), and avoids circular imports.
# app/__init__.py
from flask import Flask
from .extensions import db, migrate, jwt
from .config import config_by_name
def create_app(config_name="development"):
app = Flask(__name__)
app.config.from_object(config_by_name[config_name])
# Initialize extensions
db.init_app(app)
migrate.init_app(app, db)
jwt.init_app(app)
# Register blueprints
from .api.v1 import api_v1
app.register_blueprint(api_v1, url_prefix="/api/v1")
return app
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
class User(db.Model):
__tablename__ = "users"
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
posts = db.relationship("Post", back_populates="author", lazy="dynamic")
class Post(db.Model):
__tablename__ = "posts"
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(200), nullable=False)
author_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False)
author = db.relationship("User", back_populates="posts")
# Querying
users = User.query.all()
user = User.query.filter_by(username="alice").first()
user = db.session.get(User, 1)  # SQLAlchemy 2.x style; User.query.get(1) is legacy
admins = User.query.filter(User.role == "admin").order_by(User.username).all()
# CRUD
new_user = User(username="bob", email="bob@example.com")
db.session.add(new_user)
db.session.commit()
user.username = "bob2"
db.session.commit()
db.session.delete(user)
db.session.commit()
Answer:
Flask's built-in server is for development only (single-threaded, no
SSL, no process management). In production:
gunicorn -w 4 "app:create_app()"
# Gunicorn with 4 workers, gevent async workers for I/O heavy apps
gunicorn -w 4 -k gevent --bind 0.0.0.0:8000 "app:create_app()"
# Dockerfile snippet
CMD ["gunicorn", "-w", "4", "-b", "0.0.0.0:8000", "app:create_app()"]
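These flags can also live in a configuration file. A minimal sketch of a gunicorn.conf.py (values are illustrative; Gunicorn picks this file up from the working directory by default):

```python
# gunicorn.conf.py, loaded automatically by gunicorn at startup
import multiprocessing

bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count() * 2 + 1  # common starting heuristic
worker_class = "gevent"       # async workers for I/O-heavy apps
timeout = 30                  # restart workers silent for 30s
accesslog = "-"               # access log to stdout

print(workers)
```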
from flask import Flask, request, jsonify
from flask_jwt_extended import (
JWTManager, create_access_token, create_refresh_token,
jwt_required, get_jwt_identity, get_jwt
)
from datetime import timedelta
app = Flask(__name__)
app.config["JWT_SECRET_KEY"] = "super-secret-change-in-prod"
app.config["JWT_ACCESS_TOKEN_EXPIRES"] = timedelta(hours=1)
app.config["JWT_REFRESH_TOKEN_EXPIRES"] = timedelta(days=30)
jwt = JWTManager(app)
@app.route("/login", methods=["POST"])
def login():
data = request.get_json()
user = verify_credentials(data["username"], data["password"]) # your function
if not user:
return jsonify({"error": "Bad credentials"}), 401
access_token = create_access_token(
identity=str(user.id),
additional_claims={"role": user.role}
)
refresh_token = create_refresh_token(identity=str(user.id))
return jsonify(access_token=access_token, refresh_token=refresh_token)
@app.route("/refresh", methods=["POST"])
@jwt_required(refresh=True) # only accepts refresh tokens
def refresh():
identity = get_jwt_identity()
new_token = create_access_token(identity=identity)
return jsonify(access_token=new_token)
@app.route("/protected")
@jwt_required()
def protected():
user_id = get_jwt_identity() # str you passed as identity
claims = get_jwt() # full JWT payload dict
print(claims["role"]) # additional_claims you set earlier
return jsonify(user_id=user_id)
# Token revocation with a blocklist (e.g., Redis)
@jwt.token_in_blocklist_loader
def check_if_revoked(jwt_header, jwt_payload):
jti = jwt_payload["jti"]
return redis_client.get(f"token:blocked:{jti}") is not None
@app.route("/logout", methods=["DELETE"])
@jwt_required()
def logout():
jti = get_jwt()["jti"]
ttl = app.config["JWT_ACCESS_TOKEN_EXPIRES"]
redis_client.set(f"token:blocked:{jti}", "1", ex=int(ttl.total_seconds()))
return jsonify(msg="Token revoked")
import os
# config.py: class-based config hierarchy
class Config:
SECRET_KEY = os.environ.get("SECRET_KEY", "dev-key")
SQLALCHEMY_TRACK_MODIFICATIONS = False
JWT_SECRET_KEY = os.environ.get("JWT_SECRET_KEY")
class DevelopmentConfig(Config):
DEBUG = True
SQLALCHEMY_DATABASE_URI = "sqlite:///dev.db"
SQLALCHEMY_ECHO = True
class TestingConfig(Config):
TESTING = True
SQLALCHEMY_DATABASE_URI = "sqlite:///:memory:"
WTF_CSRF_ENABLED = False
class ProductionConfig(Config):
DEBUG = False
SQLALCHEMY_DATABASE_URI = os.environ["DATABASE_URL"] # must be set
# Never hardcode secrets — always from environment variables
config_by_name = {
"development": DevelopmentConfig,
"testing": TestingConfig,
"production": ProductionConfig,
}
# create_app usage
def create_app(config_name=None):
if config_name is None:
config_name = os.environ.get("FLASK_ENV", "development")
app = Flask(__name__)
app.config.from_object(config_by_name[config_name])
# Override from a .env file (python-dotenv)
# from dotenv import load_dotenv; load_dotenv() before create_app()
return app
# Accessing config inside app context:
# from flask import current_app
# db_url = current_app.config["SQLALCHEMY_DATABASE_URI"]
Answer:
FastAPI is a modern, high-performance Python web framework built on
Starlette (ASGI) and Pydantic. It
supports async/await natively and
auto-generates OpenAPI (Swagger) documentation.
| Feature | Flask | FastAPI |
|---|---|---|
| Protocol | WSGI (sync) | ASGI (async first) |
| Type hints | Optional | Central (Pydantic validation) |
| Docs generation | Manual / extensions | Auto (OpenAPI + Swagger UI) |
| Validation | Manual / WTForms | Automatic via Pydantic |
| Performance | Moderate | Very high (on par with Node.js) |
| Async support | Third-party (quart) | Built-in |
| Learning curve | Low | Low-Medium |
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel, EmailStr, validator
from typing import Optional, List
app = FastAPI(title="My API", version="1.0.0")
# Pydantic model for request body validation
class UserCreate(BaseModel):
username: str
email: EmailStr
age: int
tags: List[str] = []
@validator("age")  # Pydantic V1 style; V2 uses @field_validator (see below)
def age_must_be_positive(cls, v):
if v < 0:
raise ValueError("age must be positive")
return v
# Pydantic model for response
class UserResponse(BaseModel):
id: int
username: str
email: str
class Config:
orm_mode = True # allows reading from ORM objects (SQLAlchemy rows)
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
user = await db.get_user(user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@app.post("/users", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(user: UserCreate):
db_user = await db.create_user(**user.dict())
return db_user
Answer:
FastAPI's dependency injection system (Depends) is a clean
way to share reusable components (db sessions, auth checks, rate
limiters) across routes without repeating code.
from fastapi import Depends, FastAPI, Header, HTTPException
from sqlalchemy.orm import Session
from .database import SessionLocal
app = FastAPI()
# Database dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# Auth dependency
async def get_current_user(authorization: str = Header(...), db: Session = Depends(get_db)):
token = authorization.replace("Bearer ", "")
payload = verify_jwt(token)
if not payload:
raise HTTPException(status_code=401, detail="Invalid token")
user = db.query(User).filter(User.id == payload["sub"]).first()
if not user:
raise HTTPException(status_code=401, detail="User not found")
return user
# Route uses both dependencies
@app.get("/me")
async def read_me(current_user: User = Depends(get_current_user)):
return current_user
# Dependency classes (stateful / configurable dependencies)
class Paginator:
def __init__(self, max_limit: int = 100):
self.max_limit = max_limit
def __call__(self, skip: int = 0, limit: int = 10):
if limit > self.max_limit:
raise HTTPException(400, f"limit > {self.max_limit}")
return {"skip": skip, "limit": limit}
pagination = Paginator(max_limit=50)
@app.get("/items")
async def list_items(page: dict = Depends(pagination)):
...
from fastapi import BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from starlette.middleware.base import BaseHTTPMiddleware
import time
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["https://example.com"],
allow_methods=["*"],
allow_headers=["*"],
)
# Custom middleware
class RequestTimingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
start = time.perf_counter()
response = await call_next(request)
duration = time.perf_counter() - start
response.headers["X-Process-Time"] = str(duration)
return response
app.add_middleware(RequestTimingMiddleware)
# Background tasks — run after response is sent
def send_welcome_email(email: str):
# This runs after the response is returned to the client
mailer.send(to=email, subject="Welcome!")
@app.post("/register")
async def register(user: UserCreate, background_tasks: BackgroundTasks):
new_user = await create_user(user)
background_tasks.add_task(send_welcome_email, new_user.email)
return new_user # returned immediately; email sent asynchronously
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/mydb"
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def get_db():
async with AsyncSessionLocal() as session:
yield session
@app.get("/users")
async def list_users(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User))
return result.scalars().all()
Answer:
Pydantic V2 (2023) rewrote the core in Rust — up to 50x faster
validation. Key changes:
@validator → @field_validator
class Config → model_config = ConfigDict(...)
orm_mode = True → from_attributes = True
.dict() → .model_dump()
.json() → .model_dump_json()
from pydantic import BaseModel, field_validator, ConfigDict
class User(BaseModel):
model_config = ConfigDict(from_attributes=True)
name: str
age: int
@field_validator("age")
@classmethod
def age_positive(cls, v):
assert v > 0, "age must be positive"
return v
user = User(name="Alice", age=30)
print(user.model_dump()) # {'name': 'Alice', 'age': 30}
Answer:
ASGI servers: Uvicorn (most common with FastAPI), Hypercorn, Daphne.
# Running FastAPI with uvicorn
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
# Production: gunicorn managing uvicorn workers
gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker
from fastapi import Query, WebSocket, WebSocketDisconnect
from typing import List
class ConnectionManager:
def __init__(self):
self.active: List[WebSocket] = []
async def connect(self, ws: WebSocket):
await ws.accept()
self.active.append(ws)
def disconnect(self, ws: WebSocket):
self.active.remove(ws)
async def broadcast(self, message: str):
for connection in self.active:
await connection.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(ws: WebSocket, client_id: int):
await manager.connect(ws)
try:
while True:
data = await ws.receive_text() # blocks until a message arrives
await manager.broadcast(f"Client {client_id}: {data}")
except WebSocketDisconnect:
manager.disconnect(ws)
await manager.broadcast(f"Client {client_id} left")
# Available methods:
# ws.send_text(str) / ws.receive_text()
# ws.send_json(dict) / ws.receive_json()
# ws.send_bytes(bytes) / ws.receive_bytes()
# ws.close(code=1000, reason="bye") # explicit close
# Query params and headers work on WS routes too:
@app.websocket("/ws")
async def ws_with_token(ws: WebSocket, token: str = Query(...)):
user = verify_token(token)
if not user:
await ws.close(code=1008) # policy violation
return
await ws.accept()
...
Answer:
Register startup and shutdown logic using the
lifespan context manager (recommended since FastAPI 0.93+).
Resources are shared via app.state.
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
@asynccontextmanager
async def lifespan(app: FastAPI):
# ─── STARTUP: runs before the first request ───
app.state.db = await create_db_pool()
app.state.redis = await create_redis_pool()
print("Application started")
yield # ← application serves requests here
# ─── SHUTDOWN: runs after the last request ───
await app.state.db.close()
await app.state.redis.close()
print("Application stopped")
app = FastAPI(lifespan=lifespan)
# Access app-level state in a route via Request or Depends
@app.get("/users")
async def list_users(request: Request):
pool = request.app.state.db
rows = await pool.fetch("SELECT * FROM users")
return rows
# Legacy (deprecated but still works):
@app.on_event("startup")
async def startup(): ...
@app.on_event("shutdown")
async def shutdown(): ...
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt # pip install python-jose[cryptography]
from passlib.context import CryptContext # pip install passlib[bcrypt]
from datetime import datetime, timedelta
SECRET_KEY = "change-me-in-env"
ALGORITHM = "HS256"
pwd_ctx = CryptContext(schemes=["bcrypt"])
oauth2 = OAuth2PasswordBearer(tokenUrl="/token")
# Password utilities
def hash_password(p: str) -> str: return pwd_ctx.hash(p)
def verify_password(p, h) -> bool: return pwd_ctx.verify(p, h)
# Token creation
def create_access_token(data: dict, expires: timedelta = timedelta(hours=1)) -> str:
payload = {**data, "exp": datetime.utcnow() + expires}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
# Dependency: extract and validate token
async def get_current_user(token: str = Depends(oauth2)):
exc = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id = payload.get("sub")
if user_id is None: raise exc
except JWTError:
raise exc
user = await db.get(User, int(user_id))
if not user: raise exc
return user
@app.post("/token")
async def login(form: OAuth2PasswordRequestForm = Depends()):
user = await db.get_by_username(User, form.username)
if not user or not verify_password(form.password, user.hashed_password):
raise HTTPException(400, "Incorrect credentials")
token = create_access_token({"sub": str(user.id)})
return {"access_token": token, "token_type": "bearer"}
@app.get("/me")
async def me(user = Depends(get_current_user)):
return user
# Role-based access via dependency
def require_role(role: str):
async def check(user = Depends(get_current_user)):
if user.role != role:
raise HTTPException(403, "Forbidden")
return user
return check
@app.delete("/admin/users/{id}")
async def delete_user(id: int, _=Depends(require_role("admin"))):
...
Answer:
Django is a batteries-included web framework following
the MVT (Model-View-Template) pattern.
Django's key built-ins: ORM, admin site, authentication, migrations, templating, forms, and caching.
from django.db import models
class Author(models.Model):
name = models.CharField(max_length=100)
email = models.EmailField(unique=True)
class Meta:
ordering = ["name"]
class Book(models.Model):
title = models.CharField(max_length=200)
author = models.ForeignKey(Author, on_delete=models.CASCADE, related_name="books")
published = models.DateField()
price = models.DecimalField(max_digits=8, decimal_places=2)
class Meta:
indexes = [models.Index(fields=["author", "published"])]
# --- Basic Queries ---
books = Book.objects.all() # QuerySet (lazy)
book = Book.objects.get(id=1) # single obj or DoesNotExist
books = Book.objects.filter(price__lt=20) # price < 20
books = Book.objects.exclude(published__year=2020)
book = Book.objects.filter(author__name="Alice").first()
count = Book.objects.count()
# --- Field lookups ---
# exact, iexact, contains, icontains, startswith, endswith
# in, gt, gte, lt, lte, range, date, year, isnull
books = Book.objects.filter(title__icontains="python")
books = Book.objects.filter(published__year__in=[2022, 2023])
books = Book.objects.filter(author__isnull=False)
# --- Related queries (avoid N+1) ---
# select_related — SQL JOIN for ForeignKey/OneToOne (one SQL query)
books = Book.objects.select_related("author").all()
# prefetch_related — separate query + Python join (for ManyToMany / reverse FK)
authors = Author.objects.prefetch_related("books").all()
# --- Aggregation ---
from django.db.models import Avg, Count, Max, Min, Sum, F, Q
stats = Book.objects.aggregate(
avg_price=Avg("price"),
total=Count("id"),
max_price=Max("price"),
)
# --- annotate — per-row aggregation ---
from django.db.models import Count
authors = Author.objects.annotate(book_count=Count("books"))
for a in authors:
print(a.name, a.book_count)
# --- F expressions — reference another field in a query (no Python round-trip) ---
Book.objects.filter(price__lt=F("original_price") * 0.8) # 20% discount
# --- Q objects — complex AND/OR queries ---
books = Book.objects.filter(
Q(price__lt=15) | Q(published__year=2023)
)
books = Book.objects.filter(
Q(title__icontains="python") & ~Q(author__name="Bob") # ~Q = NOT
)
# --- update and delete ---
Book.objects.filter(price__lt=5).update(price=5)
Book.objects.filter(published__year__lt=2000).delete()
# --- bulk operations ---
Book.objects.bulk_create([
Book(title="A", author=author1, price=10, published="2023-01-01"),
Book(title="B", author=author2, price=15, published="2023-01-01"),
])
Answer:
Migrations are Django's way of propagating model changes into the
database schema.
python manage.py makemigrations # detects model changes, creates migration files
python manage.py migrate # applies pending migrations to the DB
python manage.py showmigrations # lists all migrations and their status
python manage.py migrate appname 0002 # roll back to migration 0002
python manage.py sqlmigrate app 0001 # shows the raw SQL for a migration
Custom data migration:
# In a migration file
from django.db import migrations
from django.utils.text import slugify
def populate_slugs(apps, schema_editor):
Book = apps.get_model("books", "Book")
for book in Book.objects.all():
book.slug = slugify(book.title)
book.save()
class Migration(migrations.Migration):
dependencies = [("books", "0002_book_slug")]
operations = [
migrations.RunPython(populate_slugs, migrations.RunPython.noop),
]
# Function-Based Views (FBV) — simple, explicit
import json
from django.http import JsonResponse
from django.views.decorators.http import require_http_methods
@require_http_methods(["GET", "POST"])
def user_view(request):
if request.method == "GET":
users = list(User.objects.values("id", "username"))
return JsonResponse({"users": users})
elif request.method == "POST":
data = json.loads(request.body)
user = User.objects.create(**data)
return JsonResponse({"id": user.id}, status=201)
# Class-Based Views (CBV) — reusable, DRY
from django.views import View
class UserView(View):
def get(self, request):
users = list(User.objects.values("id", "username"))
return JsonResponse({"users": users})
def post(self, request):
data = json.loads(request.body)
user = User.objects.create(**data)
return JsonResponse({"id": user.id}, status=201)
# Generic CBVs (maximum DRY)
from django.views.generic import ListView, DetailView, CreateView
class BookListView(ListView):
model = Book
template_name = "books/list.html"
context_object_name = "books"
paginate_by = 20
from rest_framework import serializers, viewsets, permissions
from rest_framework.decorators import action
from rest_framework.response import Response
# Serializer = validation + serialization
class BookSerializer(serializers.ModelSerializer):
author_name = serializers.CharField(source="author.name", read_only=True)
class Meta:
model = Book
fields = ["id", "title", "author", "author_name", "price", "published"]
read_only_fields = ["id"]
def validate_price(self, value):
if value <= 0:
raise serializers.ValidationError("Price must be positive")
return value
# ViewSet = all CRUD in one class
class BookViewSet(viewsets.ModelViewSet):
queryset = Book.objects.select_related("author").all()
serializer_class = BookSerializer
permission_classes = [permissions.IsAuthenticatedOrReadOnly]
def get_queryset(self):
qs = super().get_queryset()
author = self.request.query_params.get("author")
if author:
qs = qs.filter(author__name__icontains=author)
return qs
@action(detail=True, methods=["post"])
def mark_featured(self, request, pk=None):
book = self.get_object()
book.is_featured = True
book.save()
return Response({"status": "marked as featured"})
# urls.py
from rest_framework.routers import DefaultRouter
router = DefaultRouter()
router.register("books", BookViewSet)
urlpatterns = router.urls
Answer:
Middleware is a layer of hooks for request/response processing. Each
middleware can modify the request before the view, and the response
after.
# settings.py
MIDDLEWARE = [
"django.middleware.security.SecurityMiddleware",
"django.contrib.sessions.middleware.SessionMiddleware",
"corsheaders.middleware.CorsMiddleware",
"django.middleware.common.CommonMiddleware",
"django.middleware.csrf.CsrfViewMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
"django.contrib.messages.middleware.MessageMiddleware",
"myapp.middleware.RequestTimingMiddleware", # custom
]
# Custom middleware
import logging
import time

logger = logging.getLogger(__name__)

class RequestTimingMiddleware:
def __init__(self, get_response):
self.get_response = get_response # called once on startup
def __call__(self, request):
start = time.perf_counter()
response = self.get_response(request) # calls next middleware/view
duration = time.perf_counter() - start
response["X-Process-Time"] = f"{duration:.3f}"
return response
def process_exception(self, request, exception):
# Optional: handle exceptions before Django's default handler
logger.error(f"Unhandled exception: {exception}")
return None # let Django handle it
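The "onion" call order is easiest to see framework-free. A minimal sketch with plain callables (all names illustrative, not Django's actual implementation):

```python
def make_middleware(name, get_response):
    """Wrap get_response the way Django wraps each middleware around the next."""
    def handler(request):
        request.append(f"{name}:in")       # runs before the view
        response = get_response(request)   # call the next layer (or the view)
        response.append(f"{name}:out")     # runs after the view
        return response
    return handler

def view(request):
    return ["view"]

# Build the chain in MIDDLEWARE order: first listed = outermost layer
chain = view
for name in reversed(["Security", "Session"]):
    chain = make_middleware(name, chain)

events = []
result = chain(events)
print(events)   # ['Security:in', 'Session:in']
print(result)   # ['view', 'Session:out', 'Security:out']
```

The request passes top-to-bottom through MIDDLEWARE, the response bottom-to-top, which is why ordering in settings matters.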
Answer:
Signals allow decoupled parts of an application to get notified when
certain events occur.
from django.db.models.signals import post_save, pre_delete
from django.dispatch import receiver
from django.contrib.auth.models import User
@receiver(post_save, sender=User)
def create_user_profile(sender, instance, created, **kwargs):
if created:
Profile.objects.create(user=instance)
@receiver(post_save, sender=User)
def send_welcome_email(sender, instance, created, **kwargs):
if created:
send_email.delay(to=instance.email, subject="Welcome!")
# Custom signal
from django.dispatch import Signal
payment_successful = Signal()
# Emit the signal
payment_successful.send(sender=Payment, amount=100, user=user)
# Listen
@receiver(payment_successful)
def notify_user(sender, amount, user, **kwargs):
Notification.objects.create(user=user, message=f"Payment of ${amount} received")
Note: register signal receivers by importing them in AppConfig.ready()
(in apps.py) so they're loaded at startup:
from django.apps import AppConfig

class UsersConfig(AppConfig):
name = "users"
def ready(self):
import users.signals # noqa
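Under the hood a signal is essentially an observer list. A framework-free sketch of the dispatch mechanism (illustrative, not Django's real code):

```python
class Signal:
    """Minimal observer-pattern sketch of what django.dispatch.Signal does."""
    def __init__(self):
        self._receivers = []

    def connect(self, receiver):
        self._receivers.append(receiver)
        return receiver  # usable as a decorator, like @receiver(...)

    def send(self, sender, **kwargs):
        # Call every connected receiver; collect (receiver, result) pairs
        return [(r, r(sender=sender, **kwargs)) for r in self._receivers]

payment_successful = Signal()

@payment_successful.connect
def notify(sender, amount, **kwargs):
    return f"Payment of ${amount} received"

responses = payment_successful.send(sender="Payment", amount=100)
print(responses[0][1])   # Payment of $100 received
```

This also shows why receivers must accept **kwargs: the sender can attach arbitrary extra arguments.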
# settings.py
CACHES = {
"default": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": "redis://127.0.0.1:6379/1",
}
}
# Low-level cache API
from django.core.cache import cache
cache.set("user_123", user_data, timeout=300) # 5 minutes
data = cache.get("user_123") # None if expired
cache.delete("user_123")
cache.get_or_set("key", expensive_function, timeout=60)
# View-level caching
from django.views.decorators.cache import cache_page
@cache_page(60 * 15) # 15 minutes
def my_view(request):
...
# Per-site caching (entire site via middleware — rarely used)
# Template fragment caching
{% load cache %}
{% cache 500 "sidebar" request.user.id %}
... expensive template block ...
{% endcache %}
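The cache-aside behaviour of get_or_set can be sketched with a plain dict and TTLs. TTLCache and expensive_query are illustrative names; real backends additionally handle serialisation, eviction, and atomicity:

```python
import time

class TTLCache:
    """Minimal cache-aside store: compute on miss, honour a TTL on hits."""
    def __init__(self):
        self._store = {}          # key -> (value, expires_at)

    def get(self, key, default=None):
        entry = self._store.get(key)
        if entry is None:
            return default
        value, expires_at = entry
        if time.monotonic() >= expires_at:   # expired, treat as a miss
            del self._store[key]
            return default
        return value

    def set(self, key, value, timeout=300):
        self._store[key] = (value, time.monotonic() + timeout)

    def get_or_set(self, key, func, timeout=300):
        missing = object()                   # sentinel: None is a valid cached value
        value = self.get(key, missing)
        if value is missing:
            value = func()                   # expensive work happens on miss only
            self.set(key, value, timeout)
        return value

cache = TTLCache()
calls = []
def expensive_query():
    calls.append(1)
    return {"user": "alice"}

cache.get_or_set("user_123", expensive_query)
cache.get_or_set("user_123", expensive_query)   # second call served from cache
print(len(calls))   # 1: expensive_query ran once
```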
Answer:
Occurs when fetching a list of objects (1 query) and then accessing a
related attribute for each object (N queries total).
# BAD — N+1 queries (1 for books + 1 per book for author)
books = Book.objects.all() # 1 query
for book in books:
print(book.author.name) # 1 query per book = N queries
# GOOD — select_related fixes FK/OneToOne — 1 JOIN query total
books = Book.objects.select_related("author").all()
for book in books:
print(book.author.name) # no extra queries
# GOOD — prefetch_related for ManyToMany or reverse FK — 2 queries total
books = Book.objects.prefetch_related("tags").all()
for book in books:
print([t.name for t in book.tags.all()]) # no extra queries
# Use django-debug-toolbar in development to spot N+1 issues
from django.db import transaction
# atomic() — wrap in a single DB transaction; rollback on exception
@transaction.atomic
def transfer_funds(from_acct, to_acct, amount):
from_acct.balance -= amount
from_acct.save() # if to_acct.save() raises, BOTH saves roll back
to_acct.balance += amount
to_acct.save()
# on_commit — run code ONLY after transaction commits
# (safe to trigger async tasks here — data is definitely in the DB)
@transaction.atomic
def place_order(order_data):
order = Order.objects.create(**order_data)
transaction.on_commit(lambda: send_confirmation.delay(order.id))
return order
# Nested atomic — creates a SAVEPOINT (not a nested transaction in most DBs)
def complex_operation():  # renamed: don't shadow the built-in complex()
with transaction.atomic(): # outer
do_something()
try:
with transaction.atomic(): # inner = savepoint
risky_operation()
except MyError:
pass # rolled back to savepoint; outer transaction continues
# select_for_update — row-level exclusive lock (prevents race conditions)
@transaction.atomic
def purchase(user_id, item_id):
item = Item.objects.select_for_update().get(id=item_id)
# Other transactions block on this row until we commit
if item.stock > 0:
item.stock -= 1
item.save()
Order.objects.create(user_id=user_id, item=item)
else:
raise OutOfStockError
# select_for_update(nowait=True) — raise OperationalError immediately if locked
# select_for_update(skip_locked=True) — skip locked rows (good for job queues)
# ——— FBV decorators ———
from django.contrib.auth.decorators import login_required, permission_required
@login_required(login_url="/login/")
def dashboard(request): ...
@permission_required("blog.publish_post", raise_exception=True)
def publish(request): ...
# ——— CBV mixins ———
from django.contrib.auth.mixins import LoginRequiredMixin, PermissionRequiredMixin
class PostCreateView(LoginRequiredMixin, PermissionRequiredMixin, CreateView):
permission_required = "blog.add_post"
model = Post
# ——— Programmatic checks ———
def view(request):
if not request.user.is_authenticated:
return redirect("/login/")
if not request.user.has_perm("blog.publish_post"):
return HttpResponseForbidden()
...
# ——— Custom User Model (ALWAYS do this before first migration) ———
# settings.py:
# AUTH_USER_MODEL = "accounts.User"
from django.contrib.auth.models import AbstractUser
from django.db import models
class User(AbstractUser):
phone = models.CharField(max_length=20, blank=True)
bio = models.TextField(blank=True)
avatar = models.ImageField(upload_to="avatars/", blank=True)
# AbstractUser already has: username, email, password, first_name, last_name,
# is_staff, is_active, is_superuser, groups, user_permissions, date_joined
# ——— DRF — Custom Object Permission ———
from rest_framework.permissions import BasePermission
class IsOwnerOrReadOnly(BasePermission):
"""Safe methods (GET/HEAD/OPTIONS) for all; write only for owner."""
def has_object_permission(self, request, view, obj):
if request.method in ("GET", "HEAD", "OPTIONS"):
return True
return obj.owner == request.user
class PostViewSet(viewsets.ModelViewSet):
permission_classes = [IsAuthenticated, IsOwnerOrReadOnly]
def perform_create(self, serializer):
serializer.save(owner=self.request.user)
# myapp/management/commands/seed_db.py
from django.core.management.base import BaseCommand, CommandError
from myapp.models import Category
import random
class Command(BaseCommand):
help = "Seeds the database with initial category data"
def add_arguments(self, parser):
parser.add_argument(
"--count", type=int, default=10,
help="Number of categories to create"
)
parser.add_argument(
"--force", action="store_true",
help="Delete existing categories first"
)
def handle(self, *args, **options):
count = options["count"]
if options["force"]:
deleted, _ = Category.objects.all().delete()
self.stdout.write(self.style.WARNING(f"Deleted {deleted} categories"))
categories = [
Category(name=f"Category {i}", slug=f"category-{i}")
for i in range(count)
]
Category.objects.bulk_create(categories, ignore_conflicts=True)
self.stdout.write(self.style.SUCCESS(f"Created {count} categories"))
# Usage:
# python manage.py seed_db
# python manage.py seed_db --count 50 --force
# Other built-in commands worth knowing:
# python manage.py shell — interactive Python with your app loaded
# python manage.py dbshell — raw DB shell (psql, sqlite3…)
# python manage.py dumpdata — export data as JSON/YAML
# python manage.py loaddata — import fixtures
# python manage.py flush — delete all data (keep schema)
# python manage.py createsuperuser — create admin user
import psycopg2
# Raw psycopg2 (PostgreSQL)
conn = psycopg2.connect(dsn="postgresql://user:pass@localhost/mydb")
cur = conn.cursor()
cur.execute("SELECT id, name FROM users WHERE age > %s", (18,)) # parameterized!
rows = cur.fetchall()
conn.commit()
cur.close()
conn.close()
# NEVER do this (SQL injection risk):
# cur.execute(f"SELECT * FROM users WHERE name = '{user_input}'")
# SQLAlchemy Core (SQL expression language)
from sqlalchemy import create_engine, text
engine = create_engine("postgresql://user:pass@localhost/mydb")
with engine.connect() as conn:
result = conn.execute(text("SELECT * FROM users WHERE age > :age"), {"age": 18})
for row in result:
print(row)
from sqlalchemy import Column, Integer, String, ForeignKey, create_engine
from sqlalchemy.orm import DeclarativeBase, Session, relationship
class Base(DeclarativeBase): pass
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String, nullable=False)
posts = relationship("Post", back_populates="user")
class Post(Base):
__tablename__ = "posts"
id = Column(Integer, primary_key=True)
title = Column(String, nullable=False)
user_id = Column(Integer, ForeignKey("users.id"))
user = relationship("User", back_populates="posts")
engine = create_engine("sqlite:///./app.db")
Base.metadata.create_all(engine)
with Session(engine) as session:
user = User(name="Alice")
post = Post(title="Hello World", user=user)
session.add_all([user, post])
session.commit()
users = session.query(User).filter(User.name == "Alice").all()
# or modern style:
from sqlalchemy import select
stmt = select(User).where(User.name == "Alice")
users = session.scalars(stmt).all()
# SQLAlchemy connection pool configuration
from sqlalchemy import create_engine
engine = create_engine(
"postgresql://user:pass@localhost/mydb",
pool_size = 10, # persistent connections to maintain
max_overflow = 20, # extra connections allowed under burst load
pool_timeout = 30, # seconds to wait for a free connection
pool_pre_ping = True, # test connection before use (handles server-side drops)
pool_recycle = 1800, # recycle connections after 30 min (avoid stale connections)
echo = False, # set True to log all SQL
)
# asyncpg pool for async FastAPI / asyncio apps
import asyncpg
async def main():
pool = await asyncpg.create_pool(
dsn="postgresql://user:pass@localhost/mydb",
min_size=5, max_size=20
)
async with pool.acquire() as conn:
rows = await conn.fetch("SELECT * FROM users")
await pool.close()
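What a pool buys you can be sketched with a stdlib queue. SimplePool and FakeConnection are illustrative stand-ins, not production code:

```python
import queue
from contextlib import contextmanager

class FakeConnection:
    """Stand-in for a real DB connection."""
    def __init__(self, n):
        self.n = n

class SimplePool:
    """Toy fixed-size pool: acquire blocks until a connection is free."""
    def __init__(self, size, factory):
        self._q = queue.Queue()
        for i in range(size):
            self._q.put(factory(i))          # connections created up front

    @contextmanager
    def acquire(self, timeout=30):           # timeout plays the role of pool_timeout
        conn = self._q.get(timeout=timeout)
        try:
            yield conn
        finally:
            self._q.put(conn)                # returned to the pool, never closed

pool = SimplePool(2, FakeConnection)
with pool.acquire() as c1, pool.acquire() as c2:
    print({c1.n, c2.n})    # {0, 1}: both connections checked out
# Both connections are back in the pool here
```

The key point: connections are reused rather than opened per request, and a bounded queue naturally applies backpressure under load.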
# ——— ALWAYS use parameterised queries ———
# Safe — placeholders prevent SQL injection:
cur.execute("SELECT * FROM users WHERE name = %s AND age > %s", (name, age))
# UNSAFE — never do this:
# cur.execute(f"SELECT * FROM users WHERE name = '{name}'")
# ——— Database Indexes ———
# Django:
class Book(models.Model):
title = models.CharField(max_length=200, db_index=True) # single-column
author = models.ForeignKey(Author, on_delete=models.CASCADE)
published = models.DateField()
class Meta:
indexes = [
models.Index(fields=["author", "published"]), # composite index
models.Index(fields=["-published"]), # descending index
]
# Index a column when it appears in: WHERE, JOIN ON, ORDER BY, GROUP BY
# Avoid indexing: low-cardinality columns (boolean/status), write-heavy tables
# PostgreSQL EXPLAIN ANALYZE to diagnose slow queries:
# EXPLAIN ANALYZE SELECT * FROM books WHERE author_id = 1;
# Look for: Seq Scan (bad on large tables) vs Index Scan / Index Only Scan (good)
# Cost estimate format: cost=start..total rows=N width=bytes
# Transactions in raw psycopg2
import psycopg2
conn = psycopg2.connect(dsn="postgresql://user:pass@localhost/mydb")
try:
with conn.cursor() as cur:
cur.execute("UPDATE accounts SET balance = balance - %s WHERE id = %s", (100, 1))
cur.execute("UPDATE accounts SET balance = balance + %s WHERE id = %s", (100, 2))
conn.commit() # both updates atomically committed
except Exception:
conn.rollback() # roll back both
finally:
conn.close()
| Feature | unittest | pytest |
|---|---|---|
| Discovery | test* methods in TestCase subclass | test_* functions anywhere |
| Assertions | self.assertEqual etc. | plain assert with magic rewriting |
| Fixtures | setUp/tearDown | @pytest.fixture with yield |
| Parametrize | Manual / subTest | @pytest.mark.parametrize |
| Plugins | Limited | Rich ecosystem (pytest-cov, pytest-asyncio etc.) |
import pytest
# Simple test
def test_add():
assert 1 + 1 == 2
# Fixtures
@pytest.fixture
def db_session():
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
session = Session(engine)
yield session
session.close()
def test_create_user(db_session):
user = User(name="Test")
db_session.add(user)
db_session.commit()
assert db_session.query(User).count() == 1
# Parametrize
@pytest.mark.parametrize("a, b, expected", [
(1, 2, 3),
(0, 0, 0),
(-1, 1, 0),
])
def test_add_parametrized(a, b, expected):
assert a + b == expected
# Mocking
import requests
from unittest.mock import patch, MagicMock

def get_weather(city):
    return requests.get(f"https://api.weather.com/{city}").json()
def test_get_weather():
mock_response = MagicMock()
mock_response.json.return_value = {"temp": 22}
with patch("requests.get", return_value=mock_response) as mock_get:
result = get_weather("London")
mock_get.assert_called_once_with("https://api.weather.com/London")
assert result == {"temp": 22}
# Testing Flask
import pytest
from app import create_app
@pytest.fixture
def client():
app = create_app("testing")
with app.test_client() as client:
with app.app_context():
db.create_all()
yield client
db.drop_all()
def test_create_user(client):
resp = client.post("/users", json={"username": "alice", "email": "alice@test.com"})
assert resp.status_code == 201
assert resp.json["username"] == "alice"
# Testing FastAPI
from fastapi.testclient import TestClient
from main import app
client = TestClient(app)
def test_get_user():
response = client.get("/users/1")
assert response.status_code == 200
assert "username" in response.json()
# Async tests
import pytest
import httpx
from httpx import AsyncClient
@pytest.mark.asyncio
async def test_async_endpoint():
    # httpx >= 0.27 deprecates app=...; pass an ASGITransport instead
    async with AsyncClient(transport=httpx.ASGITransport(app=app), base_url="http://test") as ac:
response = await ac.get("/users/1")
assert response.status_code == 200
import pytest
# ——— monkeypatch — temporarily modify objects, env vars, paths ———
def test_uses_env(monkeypatch):
monkeypatch.setenv("API_KEY", "test-key-123")
monkeypatch.delenv("SECRET", raising=False) # no error if not set
    monkeypatch.setattr("myapp.services.requests.get", lambda *a, **k: FakeResp())
# All changes are automatically undone after the test exits
# ——— Fixture scopes ———
@pytest.fixture(scope="session") # created once for the whole test run
def db_engine():
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
yield engine
Base.metadata.drop_all(engine)
@pytest.fixture(scope="function") # default — fresh for each test
def db_session(db_engine):
conn = db_engine.connect()
txn = conn.begin()
sess = Session(bind=conn)
yield sess
sess.close()
txn.rollback() # undo every test's changes
# ——— Assertions ———
def test_raises():
with pytest.raises(ZeroDivisionError, match="division by zero"):
1 / 0
def test_float(): # floating point
assert 0.1 + 0.2 == pytest.approx(0.3)
def test_approx_dict():
assert {"a": 0.1 + 0.2} == pytest.approx({"a": 0.3})
# ——— Parametrize ———
@pytest.mark.parametrize("n, expected", [(0, 0), (1, 1), (5, 120)])
def test_factorial(n, expected):
assert factorial(n) == expected
# ——— Built-in fixtures ———
def test_file(tmp_path): # tmp_path — per-test temp dir (pathlib.Path)
f = tmp_path / "out.txt"
f.write_text("hello")
assert f.read_text() == "hello"
def test_output(capsys): # capsys — capture stdout/stderr
print("hello")
out, err = capsys.readouterr()
assert out == "hello\n"
def test_log(caplog): # caplog — capture log records
import logging
with caplog.at_level(logging.WARNING):
logging.warning("watch out")
assert "watch out" in caplog.text
# ——— Async tests ——— (pip install pytest-asyncio)
import pytest
@pytest.mark.asyncio
async def test_async_endpoint():
result = await some_coroutine()
assert result == expected
# conftest.py — place shared fixtures here; auto-discovered by pytest
# Can go at any directory level; hierarchically scoped
from django.test import TestCase, Client, RequestFactory
from django.contrib.auth.models import User
from django.urls import reverse
from rest_framework.test import APITestCase, APIClient
from unittest.mock import patch, MagicMock
# TestCase — each test wrapped in a transaction, rolled back after
class BookTests(TestCase):
@classmethod
def setUpTestData(cls): # runs ONCE for the class — fast; use for read-only data
cls.alice = User.objects.create_user("alice", password="pass")
cls.author = Author.objects.create(name="Test Author")
def setUp(self): # runs before EACH test — use for mutable state
self.client.force_login(self.alice)
def test_book_list_returns_200(self):
url = reverse("book-list") # always use reverse(), not hardcoded URLs
resp = self.client.get(url)
self.assertEqual(resp.status_code, 200)
self.assertContains(resp, "Test Author")
self.assertQuerySetEqual(
resp.context["books"],
Book.objects.all(),
transform=lambda x: x
)
def test_unauthenticated_redirect(self):
self.client.logout()
resp = self.client.get(reverse("book-list"))
self.assertRedirects(resp, f"/login/?next={reverse('book-list')}")
# DRF APITestCase
class BookAPITests(APITestCase):
def setUp(self):
self.user = User.objects.create_user("tester", password="pass")
self.client.force_authenticate(user=self.user) # skip JWT/session
def test_create_book(self):
resp = self.client.post(
reverse("book-list"),
{"title": "Test", "price": "9.99", "author": 1},
format="json"
)
self.assertEqual(resp.status_code, 201)
self.assertEqual(Book.objects.count(), 1)
def test_no_auth_returns_401(self):
self.client.force_authenticate(user=None)
self.assertEqual(self.client.get(reverse("book-list")).status_code, 401)
# RequestFactory — test views directly, bypassing middleware
class MiddlewarelessTests(TestCase):
def test_view_directly(self):
factory = RequestFactory()
request = factory.get("/")
request.user = User.objects.create_user("rf_user")
response = MyView.as_view()(request)
self.assertEqual(response.status_code, 200)
# Mocking external services
class PaymentTests(TestCase):
@patch("payments.views.stripe.PaymentIntent.create")
def test_payment_success(self, mock_stripe):
mock_stripe.return_value = MagicMock(id="pi_123", status="succeeded")
resp = self.client.post("/pay/", {"amount": 1000}, content_type="application/json")
self.assertEqual(resp.status_code, 200)
mock_stripe.assert_called_once()
args, kwargs = mock_stripe.call_args
self.assertEqual(kwargs["amount"], 1000)
import cProfile
import pstats
# cProfile — deterministic profiler
cProfile.run("my_function()", "profile_output")
stats = pstats.Stats("profile_output")
stats.sort_stats("cumulative").print_stats(10)
# line_profiler — per-line profiling (pip install line-profiler)
# @profile decorator, run: kernprof -l -v script.py
# timeit — for micro-benchmarks
import timeit
timeit.timeit("x = [i**2 for i in range(100)]", number=10000)
# memory_profiler
# pip install memory_profiler
# @profile decorator, run: python -m memory_profiler script.py
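A runnable timeit example tying the tools to a concrete claim: set membership vs list membership (absolute numbers vary by machine; the ratio is what matters):

```python
import timeit

setup = "data_list = list(range(10_000)); data_set = set(data_list)"
t_list = timeit.timeit("9999 in data_list", setup=setup, number=1_000)
t_set = timeit.timeit("9999 in data_set", setup=setup, number=1_000)

# List membership scans every element (O(n)); set lookup hashes (O(1))
print(f"list: {t_list:.4f}s  set: {t_set:.4f}s")
```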
# 1. Use sets for membership tests (O(1) vs O(n) for lists)
valid_ids = {1, 2, 3, 4, 5}
if user_id in valid_ids: # O(1)
...
# 2. Use generators for large data
total = sum(x**2 for x in range(10_000_000)) # vs building entire list
# 3. Use local variables in tight loops (faster lookup than globals)
def fast_loop():
    results = []
    _append = results.append   # cache the attribute lookup
    for i in range(100_000):
        _append(i)
    return results
# 4. str.join instead of + for string concatenation
parts = ["Hello", " ", "World"]
result = "".join(parts) # O(n) total
# result = "" + "Hello" + " " + "World" # O(n^2) due to new string creation each time
# 5. functools.lru_cache for expensive pure functions
from functools import lru_cache
@lru_cache(maxsize=128)
def factorial(n):
if n < 2: return 1
return n * factorial(n - 1)
# 6. Use __slots__ to reduce memory for many instances
class Point:
__slots__ = ("x", "y") # no __dict__; saves ~50% memory per instance
def __init__(self, x, y):
self.x, self.y = x, y
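The memory claim for __slots__ is checkable with sys.getsizeof (exact byte counts vary by Python version; the direction of the comparison does not):

```python
import sys

class PlainPoint:
    def __init__(self, x, y):
        self.x, self.y = x, y

class SlotPoint:
    __slots__ = ("x", "y")
    def __init__(self, x, y):
        self.x, self.y = x, y

p, s = PlainPoint(1, 2), SlotPoint(1, 2)
# Plain instances carry a per-instance __dict__; slotted instances do not
plain_size = sys.getsizeof(p) + sys.getsizeof(p.__dict__)
slot_size = sys.getsizeof(s)
print(plain_size, slot_size)   # slotted instance is smaller
```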
# 7. Avoid repeated attribute lookups in loops
import math
sqrt = math.sqrt
for x in range(100_000):
result = sqrt(x) # faster than math.sqrt(x) in loop
# 8. Use numpy/pandas for numerical operations — vectorized C code
import numpy as np
a = np.arange(1_000_000)
total = a.sum() # C speed, no Python loop
# Swap variables
a, b = b, a
# Multiple assignment from iterable
first, *rest = [1, 2, 3, 4, 5] # first=1, rest=[2,3,4,5]
first, *middle, last = range(5) # first=0, middle=[1,2,3], last=4
# Ternary expression
result = "even" if n % 2 == 0 else "odd"
# Dictionary merging (Python 3.9+)
merged = {**dict1, **dict2} # old
merged = dict1 | dict2 # new
dict1 |= dict2 # in-place
# Walrus operator := (Python 3.8+)
if (n := len(data)) > 10:
print(f"Too long: {n}")
while chunk := file.read(8192):
process(chunk)
# enumerate and zip
for i, val in enumerate(items, start=1):
print(f"{i}: {val}")
for name, score in zip(names, scores):
print(f"{name}: {score}")
# Dictionary comprehension
squares = {x: x**2 for x in range(10)}
# Conditional comprehension
evens = [x for x in range(20) if x % 2 == 0]
# Flat nested list
flat = [item for sublist in nested for item in sublist]
# equivalent to: flat = list(itertools.chain.from_iterable(nested))
| Tool | Purpose | Key Commands |
|---|---|---|
| venv | Built-in virtual env | python -m venv .venv, source .venv/bin/activate |
| pip | Package installer | pip install flask, pip freeze > requirements.txt |
| pip-tools | Lock file management | pip-compile requirements.in |
| poetry | Dependency + packaging | poetry add flask, poetry install |
| pipenv | venv + pip combined | pipenv install flask |
| pyenv | Python version management | pyenv install 3.12.0, pyenv local 3.12.0 |
# 1. Late binding closures
funcs = [lambda: i for i in range(5)]
print([f() for f in funcs]) # [4, 4, 4, 4, 4] — all capture the same i
# Fix: default argument captures value at definition time
funcs = [lambda i=i: i for i in range(5)]
print([f() for f in funcs]) # [0, 1, 2, 3, 4]
# 2. Mutable default argument (covered above)
# 3. Class variable shared across instances
class Dog:
tricks = [] # shared class variable!
def learn(self, trick):
self.tricks.append(trick)
rex = Dog(); rex.learn("sit")
fido = Dog(); fido.learn("paw")
print(rex.tricks) # ['sit', 'paw'] — shared!
# Fix: use instance variable
class Dog:
def __init__(self):
self.tricks = [] # instance variable
# 4. Exception variable scope (Python 3)
try:
x = 1/0
except ZeroDivisionError as e:
pass
# print(e) # NameError — e is deleted after the except block in Python 3
# 5. Integer division
print(7 / 2) # 3.5 (float division)
print(7 // 2) # 3 (floor division)
print(-7 // 2) # -4 (floor, not truncation!)
# 6. Chained comparison (Pythonic AND correct)
x = 5
print(1 < x < 10) # True (evaluated as 1 < x AND x < 10)
dis
Answer:
dis disassembles Python functions into their CPython
bytecode instructions. Useful for understanding performance,
thread-safety, and why certain patterns behave the way they do.
import dis
def add(a, b):
return a + b
dis.dis(add)
# Output (CPython 3.12):
# 2 RESUME 0
# 3 LOAD_FAST 0 (a)
# LOAD_FAST 1 (b)
# BINARY_OP 0 (+)
# RETURN_VALUE
# Why x += 1 is NOT atomic (and needs a lock):
def increment():
global counter
counter += 1
# Compiles to 3 ops: LOAD_GLOBAL, BINARY_OP, STORE_GLOBAL
# The GIL can switch threads between any two of these!
# LOAD_FAST vs LOAD_GLOBAL — local vars are faster
def slow_loop():
for _ in range(100_000):
result = math.sqrt(2) # LOAD_GLOBAL('math'), LOAD_ATTR('sqrt')
def fast_loop():
_sqrt = math.sqrt # cache in local variable
for _ in range(100_000):
result = _sqrt(2) # LOAD_FAST — ~20% faster
# Inspect code object metadata
fn = add
print(fn.__code__.co_varnames) # ('a', 'b') — local variable names
print(fn.__code__.co_consts) # (None,) — constants referenced
print(fn.__code__.co_argcount) # 2 — number of positional args
print(fn.__code__.co_flags) # bitmask: generator, async, etc.
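The "three separate ops" claim for counter += 1 can be verified programmatically (opcode names are CPython-specific):

```python
import dis

counter = 0
def increment():
    global counter
    counter += 1

ops = [ins.opname for ins in dis.get_instructions(increment)]
print(ops)
# The load, add, and store are separate bytecodes; the interpreter can
# switch threads between any two of them, which is why counter += 1
# needs a lock even under the GIL
```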
| Letter | Principle | Short Rule | Python Tip |
|---|---|---|---|
| S | Single Responsibility | One class = one reason to change | Separate UserService (business logic) from UserRepository (data access) |
| O | Open / Closed | Open for extension, closed for modification | Use ABC/Protocol + new subclasses; never edit the base to add a feature |
| L | Liskov Substitution | Subclass must be substitutable for its base | If Shape.area() -> float, all subclasses must match; don't narrow the return type |
| I | Interface Segregation | Prefer small, focused interfaces | Separate Readable, Writable, Seekable Protocols instead of one fat base |
| D | Dependency Inversion | Depend on abstractions, not concretions | Inject repo: Repository; don't hardcode SQLAlchemyRepository inside the service |
# Dependency Inversion — inject via constructor (easiest Python pattern)
from abc import ABC, abstractmethod
class NotificationSender(ABC):
@abstractmethod
def send(self, recipient: str, message: str) -> None: ...
class EmailSender(NotificationSender):
def send(self, r, m): smtp_send(r, m)
class SMSSender(NotificationSender):
def send(self, r, m): twilio_send(r, m)
class OrderService:
def __init__(self, notifier: NotificationSender): # depends on abstraction
self.notifier = notifier
def place_order(self, order):
order.save()
self.notifier.send(order.user.email, "Order placed!")
# Testing becomes trivial:
class FakeNotifier(NotificationSender):
def __init__(self): self.sent = []
def send(self, r, m): self.sent.append((r, m))
service = OrderService(FakeNotifier())
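Interface Segregation can be sketched the same way with typing.Protocol: structural typing means implementors need no inheritance. Readable, Writable, and InMemoryFile are illustrative names:

```python
from typing import Protocol

class Readable(Protocol):
    def read(self) -> str: ...

class Writable(Protocol):
    def write(self, data: str) -> None: ...

class InMemoryFile:
    """Satisfies both protocols structurally; no base class required."""
    def __init__(self):
        self._buf = ""
    def read(self) -> str:
        return self._buf
    def write(self, data: str) -> None:
        self._buf += data

def copy(src: Readable, dst: Writable) -> None:
    # Each function depends only on the slice of behaviour it needs
    dst.write(src.read())

a, b = InMemoryFile(), InMemoryFile()
a.write("hello")
copy(a, b)
print(b.read())   # hello
```

Small protocols keep callers decoupled: copy() accepts anything readable, not just files.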
| Question | Answer |
|---|---|
| Is bool a subclass of int? | Yes. True == 1, False == 0, isinstance(True, int) is True. True + True == 2. |
| Is None a singleton? | Yes — only one NoneType instance. Always x is None, never x == None. |
| list.sort() vs sorted() | sort() in-place, returns None; sorted() returns a new list. Both stable (Timsort, O(n log n)). |
| How to reverse a list efficiently? | lst.reverse() in-place O(n), returns None. lst[::-1] creates a new reversed list. |
| Is dict ordering guaranteed? | Yes, insertion order since Python 3.7. |
| Can you modify a list while iterating? | Don't — leads to skipped elements. Iterate a copy: for x in list(lst): or use a comprehension. |
| zip with unequal lengths? | Stops at the shortest. Use itertools.zip_longest(fillvalue=None) to continue. |
| __repr__ vs __str__ | __repr__: unambiguous, for devs (REPL, repr()). __str__: human-readable (print(), str()). Falls back to __repr__ if __str__ is missing. |
| What is StopIteration? | Raised by __next__ when an iterator is exhausted. Caught internally by for loops and next(it, default). |
| Why use @functools.wraps? | Copies __name__, __doc__, __module__, __annotations__ from the wrapped function to the wrapper — preserves introspection and testing tools. |
| Multiple return values? | Returns a tuple: return a, b is sugar for return (a, b). |
| What is __all__? | List of names exported by from module import *. Has no effect on direct imports. |
| GIL released during I/O? | Yes — the GIL is released during I/O syscalls, time.sleep, and many C extensions (NumPy, etc.). |
| Python 3.13 GIL status? | PEP 703 landed in 3.13 as an opt-in build (--disable-gil, aka free-threaded mode). Not the default. |
| == vs is for strings? | == checks value; is checks identity. Short identifier-like strings are interned (cached), so is may return True — an implementation detail; never rely on it. |
| What does object.__init_subclass__ do? | Called when a class is subclassed. Used to auto-register subclasses, enforce constraints, or modify the subclass during creation — a lighter alternative to metaclasses. |
| classmethod vs staticmethod? | @classmethod receives cls (can create instances, access class state). @staticmethod receives nothing (pure utility, no coupling to class/instance). |
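Several of the claims in this table are directly checkable in a REPL:

```python
import functools

# bool is a subclass of int
print(isinstance(True, int), True + True)        # True 2

# sort() mutates and returns None; sorted() returns a new list
lst = [3, 1, 2]
print(sorted(lst), lst)                          # [1, 2, 3] [3, 1, 2]
print(lst.sort(), lst)                           # None [1, 2, 3]

# functools.wraps preserves the wrapped function's metadata
def logged(fn):
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)
    return wrapper

@logged
def greet():
    """Say hello."""
    return "hi"

print(greet.__name__, greet.__doc__)             # greet Say hello.
```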
Document: Python / Flask / FastAPI / Django — Interview Questions