Welcome to Python Programming for Data Engineering by HitaVir Tech!
This hands-on codelab takes you from zero Python knowledge to building real data pipelines. Every concept is taught through the lens of Data Engineering — you will not learn Python in a vacuum; you will learn it the way data engineers actually use it.
A complete Sales Data ETL Pipeline for HitaVir Tech:
```
Raw CSV Data  →  Extract  →   Clean   →  Transform  →  Aggregate  →   Report
      │             │            │            │             │            │
sales_raw.csv   Load file    Fix nulls    Add cols     Summarize    output.csv
```
| Skill | Level |
|---|---|
| Python syntax, variables, types | Beginner |
| Control flow (if/else, loops) | Beginner |
| Functions and modular code | Beginner |
| Data structures (lists, dicts) | Beginner |
| File I/O (CSV, JSON, text) | Intermediate |
| Error handling and logging | Intermediate |
| pandas data manipulation | Intermediate |
| ETL pipeline development | Intermediate |
| Industry best practices | Intermediate |
Python is the number one language in Data Engineering.

Estimated time: 5-7 hours (go at your own pace — every section builds on the previous one)
HitaVir Tech says: "Python is not just a programming language — it is the glue that holds modern data platforms together. Master it, and every data tool becomes accessible to you."
This codelab assumes zero Python experience. We start from installing Python itself.
HitaVir Tech says: "If you can use a calculator and save a file, you are ready to learn Python. Everything else, we teach you step by step."
Let us set up a professional Python development environment on Windows.
Go to https://www.python.org/downloads/ and download the latest Python 3.x installer.
CRITICAL during installation: check the "Add Python to PATH" box in the installer. If you miss the PATH checkbox, Python commands will not work in your terminal.
Open Git Bash (or Command Prompt) and run:
python --version
Expected output:
Python 3.12.3
Also verify pip (Python's package manager):
pip --version
Expected output:
pip 24.0 from ... (python 3.12)
Download from https://code.visualstudio.com/ and install with default settings.
In VS Code, press Ctrl + Shift + X to open the Extensions panel and install the Python extension. Then create a project folder:
mkdir -p ~/python-de-learning
cd ~/python-de-learning
A virtual environment keeps your project's packages separate from other projects:
python -m venv venv
Activate it:
Git Bash:
source venv/Scripts/activate
Command Prompt:
venv\Scripts\activate
You should see (venv) at the beginning of your prompt:
(venv) user@COMPUTER ~/python-de-learning
$
Install the core packages used in this codelab:
pip install pandas numpy requests
Save your dependencies:
pip freeze > requirements.txt
cat requirements.txt
Create a test file:
cat > test_setup.py << 'EOF'
import sys
import pandas as pd
import numpy as np
print(f"Python version: {sys.version}")
print(f"pandas version: {pd.__version__}")
print(f"numpy version: {np.__version__}")
print("\nHitaVir Tech - Setup Complete!")
print("You are ready to learn Python for Data Engineering!")
EOF
Run it:
python test_setup.py
Expected output:
Python version: 3.12.3 (tags/v3.12.3:...)
pandas version: 2.2.1
numpy version: 1.26.4
HitaVir Tech - Setup Complete!
You are ready to learn Python for Data Engineering!
python-de-learning/
├── venv/ ← Virtual environment (do not edit)
├── requirements.txt ← Package list
└── test_setup.py ← Setup verification
HitaVir Tech says: "Always use virtual environments. In the real world, different projects need different package versions. Virtual environments prevent them from conflicting."
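Forgetting to activate the venv is a common slip. Here is a minimal, standard-library-only sketch to confirm which interpreter you are actually running:

```python
import sys

# Inside a virtual environment, sys.prefix points into the venv,
# while sys.base_prefix still points at the system installation.
def in_virtualenv():
    return sys.prefix != sys.base_prefix

print(f"Running inside a virtual environment: {in_virtualenv()}")
```

If this prints False after you thought you activated, re-run the activation command for your shell.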
| Problem | Fix |
|---|---|
| Python commands not found in the terminal | Reinstall Python with "Add to PATH" checked |
| Wrong Python version | Use … |
| Virtual environment will not activate | Use the correct command for your terminal (Git Bash vs. Command Prompt) |
Let us write real Python code. Create a new file for each section.
A variable is a name that points to a value. Think of it as a labeled box.
cat > basics_variables.py << 'PYEOF'
# ============================================
# HitaVir Tech - Python Variables
# ============================================
# Strings — text data (names, cities, log messages)
pipeline_name = "HitaVir Sales ETL"
data_source = "postgres"
status = "running"
print(f"Pipeline: {pipeline_name}")
print(f"Source: {data_source}")
print(f"Status: {status}")
# Integers — whole numbers (counts, IDs, ports)
total_records = 15000
batch_size = 500
port = 5432
print(f"\nTotal records: {total_records}")
print(f"Batch size: {batch_size}")
print(f"Database port: {port}")
# Floats — decimal numbers (percentages, measurements)
success_rate = 99.7
processing_time = 3.45
data_quality_score = 0.95
print(f"\nSuccess rate: {success_rate}%")
print(f"Processing time: {processing_time}s")
print(f"Quality score: {data_quality_score}")
# Booleans — True/False (flags, conditions)
is_production = True
has_errors = False
pipeline_active = True
print(f"\nProduction mode: {is_production}")
print(f"Has errors: {has_errors}")
print(f"Pipeline active: {pipeline_active}")
# None — represents "no value" (null in databases)
last_error = None
print(f"\nLast error: {last_error}")
PYEOF
Run it:
python basics_variables.py
Expected output:
Pipeline: HitaVir Sales ETL
Source: postgres
Status: running
Total records: 15000
Batch size: 500
Database port: 5432
Success rate: 99.7%
Processing time: 3.45s
Quality score: 0.95
Production mode: True
Has errors: False
Pipeline active: True
Last error: None
| Type | Example | Data Engineering Use |
|---|---|---|
| `str` | `"postgres"` | Column names, log messages, file paths |
| `int` | `15000` | Row counts, IDs, batch sizes |
| `float` | `99.7` | Metrics, percentages, measurements |
| `bool` | `True` | Flags, conditions, validations |
| `None` | `None` | Missing values, null handling |
cat > basics_types.py << 'PYEOF'
# ============================================
# HitaVir Tech - Type Checking & Conversion
# ============================================
# Check types
record_count = 1500
pipeline_name = "ETL Pipeline"
is_active = True
print(f"type of record_count: {type(record_count)}") # <class 'int'>
print(f"type of pipeline_name: {type(pipeline_name)}") # <class 'str'>
print(f"type of is_active: {type(is_active)}") # <class 'bool'>
# Type conversion (common in data pipelines)
# String to integer (e.g., reading CSV data)
raw_value = "2500"
numeric_value = int(raw_value)
print(f"\nConverted '{raw_value}' to integer: {numeric_value}")
# Integer to string (e.g., building log messages)
count = 1500
message = "Processed " + str(count) + " records"
print(message)
# String to float (e.g., parsing decimal data)
price_str = "29.99"
price = float(price_str)
print(f"Price: ${price}")
# f-strings — the best way to format strings in Python
name = "HitaVir Tech"
records = 5000
time_taken = 2.3
print(f"\n{name} processed {records} records in {time_taken}s")
PYEOF
python basics_types.py
cat > basics_operators.py << 'PYEOF'
# ============================================
# HitaVir Tech - Operators
# ============================================
# Arithmetic
total = 1000 + 500 # Addition
remaining = 1000 - 300 # Subtraction
total_size = 500 * 3 # Multiplication
avg = 1500 / 3 # Division (returns float)
batches = 1500 // 500 # Floor division (returns int)
leftover = 1500 % 500 # Modulo (remainder)
squared = 2 ** 10 # Exponentiation
print(f"Total: {total}")
print(f"Remaining: {remaining}")
print(f"Total size: {total_size}")
print(f"Average: {avg}")
print(f"Batches needed: {batches}")
print(f"Leftover records: {leftover}")
print(f"2^10 = {squared}")
# Comparison (used in data validation)
row_count = 1500
threshold = 1000
print(f"\nRow count > threshold: {row_count > threshold}") # True
print(f"Row count == 1500: {row_count == 1500}") # True
print(f"Row count != 0: {row_count != 0}") # True
# Logical (used in pipeline conditions)
has_data = True
is_valid = True
has_errors = False
print(f"\nReady to process: {has_data and is_valid}") # True
print(f"Any issues: {has_errors or not is_valid}") # False
print(f"No errors: {not has_errors}") # True
PYEOF
python basics_operators.py
cat > basics_input.py << 'PYEOF'
# ============================================
# HitaVir Tech - User Input
# ============================================
# input() always returns a string
name = input("Enter your name: ")
batch = input("Enter your batch number: ")
batch_num = int(batch) # Convert to integer
print(f"\nWelcome to HitaVir Tech, {name}!")
print(f"You are in Batch {batch_num}")
print(f"Let's learn Python for Data Engineering!")
PYEOF
python basics_input.py
HitaVir Tech says: "In data engineering, you rarely use input(). Instead, you read from files, databases, and APIs. But understanding input/output flow is fundamental to programming."
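In that spirit, here is a minimal sketch of the pattern pipelines actually use instead of input(): reading a parameter from the environment with a fallback default. The variable name PIPELINE_ENV is made up for illustration:

```python
import os

# Pipelines read parameters from the environment, not from input().
# PIPELINE_ENV is a hypothetical variable name used for illustration.
env = os.environ.get("PIPELINE_ENV", "development")  # fallback if unset
print(f"Running in {env} mode")
```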
Create a file called exercise_basics.py that calculates pipeline statistics:
# Calculate: If a pipeline processes 50,000 records per hour,
# how many records in 8 hours? What is the per-minute rate?
# Print results using f-strings.
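Try it yourself first. For reference, one possible solution sketch (the arithmetic follows directly from the exercise numbers):

```python
# exercise_basics.py (one possible solution)
records_per_hour = 50_000
hours = 8

total_records = records_per_hour * hours   # 50,000 * 8 = 400,000
per_minute_rate = records_per_hour / 60    # about 833.33

print(f"Records processed in {hours} hours: {total_records:,}")
print(f"Per-minute rate: {per_minute_rate:,.2f}")
```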
Data pipelines constantly make decisions: Is the data valid? Should we retry? Which path to take?
cat > control_if.py << 'PYEOF'
# ============================================
# HitaVir Tech - Control Flow: if-else
# ============================================
# --- Data Quality Check ---
null_percentage = 0.03 # 3% nulls
threshold = 0.05 # 5% max allowed
if null_percentage <= threshold:
print("PASS: Data quality check passed")
print(f" Null rate: {null_percentage:.1%} (threshold: {threshold:.1%})")
else:
print("FAIL: Data quality check failed")
print(f" Null rate: {null_percentage:.1%} exceeds threshold: {threshold:.1%}")
# --- Pipeline Status Router ---
print("\n--- Pipeline Status Router ---")
status_code = 200
if status_code == 200:
print("SUCCESS: Pipeline completed normally")
elif status_code == 202:
print("ACCEPTED: Pipeline queued for processing")
elif status_code == 400:
print("ERROR: Bad request - check input data")
elif status_code == 500:
print("CRITICAL: Server error - alert on-call engineer")
else:
print(f"UNKNOWN: Unexpected status code {status_code}")
# --- Environment Selector ---
print("\n--- Environment Selector ---")
env = "production"
if env == "development":
db_host = "localhost"
log_level = "DEBUG"
elif env == "staging":
db_host = "staging-db.hitavir.tech"
log_level = "INFO"
elif env == "production":
db_host = "prod-db.hitavir.tech"
log_level = "WARNING"
else:
db_host = "localhost"
log_level = "DEBUG"
print(f"Environment: {env}")
print(f"Database: {db_host}")
print(f"Log level: {log_level}")
# --- Data Type Validator ---
print("\n--- Data Type Validator ---")
value = "12345"
if value.isdigit():
print(f"'{value}' is a valid integer")
converted = int(value)
elif value.replace(".", "", 1).isdigit():
print(f"'{value}' is a valid float")
converted = float(value)
else:
print(f"'{value}' is a string (non-numeric)")
PYEOF
python control_if.py
cat > control_loops.py << 'PYEOF'
# ============================================
# HitaVir Tech - Control Flow: Loops
# ============================================
# --- for loop: Process a batch of records ---
print("--- Processing Transaction Batch ---")
transactions = [150.00, 230.50, 45.99, 1200.00, 89.95, 567.25]
total = 0
for i, amount in enumerate(transactions, 1):
total += amount
print(f" Transaction {i}: ${amount:>10.2f} | Running total: ${total:>10.2f}")
print(f"\n Batch total: ${total:.2f}")
print(f" Average: ${total / len(transactions):.2f}")
# --- for loop with range: Batch processing ---
print("\n--- Batch Processing Simulation ---")
total_records = 1500
batch_size = 500
for batch_num in range(0, total_records, batch_size):
end = min(batch_num + batch_size, total_records)
print(f" Processing records {batch_num + 1} to {end}...")
print(" All batches processed!")
# --- while loop: Retry logic ---
print("\n--- Connection Retry Logic ---")
max_retries = 3
attempt = 0
connected = False
while attempt < max_retries and not connected:
attempt += 1
print(f" Attempt {attempt}/{max_retries}: Connecting to database...")
if attempt == 3:
connected = True
print(" Connected successfully!")
if not connected:
print(" FAILED: Could not connect after max retries")
# --- break and continue ---
print("\n--- Data Filtering with break/continue ---")
records = [
{"id": 1, "name": "Alice", "status": "active"},
{"id": 2, "name": "Bob", "status": "inactive"},
{"id": 3, "name": "Charlie", "status": "active"},
{"id": 4, "name": "STOP", "status": "signal"},
{"id": 5, "name": "Diana", "status": "active"},
]
active_users = []
for record in records:
if record["name"] == "STOP":
print(f" Stop signal received at record {record['id']}")
break
if record["status"] != "active":
print(f" Skipping inactive user: {record['name']}")
continue
active_users.append(record["name"])
print(f" Added active user: {record['name']}")
print(f"\n Active users found: {active_users}")
PYEOF
python control_loops.py
HitaVir Tech says: "In data engineering, loops process records, retry failed connections, and iterate through batches. The for loop is your workhorse. The while loop is your retry mechanism. Master both."
Functions let you write code once and use it everywhere. In data engineering, functions are the building blocks of every pipeline. This section covers every type of function you need to know.
| Type | Description | Example |
|---|---|---|
| Basic function | No parameters, no return | `def f(): ...` |
| Parameterized function | Accepts inputs | `def f(x): ...` |
| Function with return | Sends back a result | `return result` |
| Default parameters | Parameters with fallback values | `def f(x=1): ...` |
| Keyword arguments | Call by name | `f(x=1)` |
| `*args` | Variable number of positional args | `def f(*args): ...` |
| `**kwargs` | Variable number of keyword args | `def f(**kwargs): ...` |
| Lambda function | One-line anonymous function | `lambda x: x * 2` |
| Nested function | Function inside a function | `def outer(): def inner(): ...` |
| Recursive function | Function that calls itself | `def f(n): ... return f(n - 1)` |
| Generator function | Yields values lazily | `def f(): yield x` |
| Decorator function | Wraps another function | `@timer` |
Let us learn each one with real Data Engineering examples.
cat > func_01_basics.py << 'PYEOF'
# ============================================
# HitaVir Tech - Part 1: Basic Functions
# ============================================
# --- 1A. Function with no parameters, no return ---
def print_pipeline_header():
"""Print a standard header for pipeline output."""
print("=" * 50)
print(" HitaVir Tech - Data Pipeline")
print("=" * 50)
print_pipeline_header()
# --- 1B. Function with parameters ---
def greet_engineer(name):
"""Greet a data engineer by name."""
print(f"Welcome to HitaVir Tech, {name}!")
greet_engineer("Alice")
greet_engineer("Bob")
# --- 1C. Function with return value ---
def calculate_total(price, quantity):
"""Calculate total amount for an order."""
total = price * quantity
return total
result = calculate_total(999.99, 3)
print(f"\nOrder total: ${result:,.2f}")
# --- 1D. Function returning multiple values ---
def get_pipeline_stats(total, failed):
"""Return multiple statistics from a pipeline run."""
success = total - failed
rate = (success / total) * 100
return success, failed, rate # Returns a tuple
s, f, r = get_pipeline_stats(10000, 23)
print(f"\nPipeline: {s} success, {f} failed, {r:.1f}% rate")
# --- 1E. Function returning a dictionary ---
def build_record(id, name, amount):
"""Build a standardized data record."""
return {
"id": id,
"name": name.strip().title(),
"amount": round(amount, 2),
"status": "valid" if amount > 0 else "invalid"
}
record = build_record(1, " alice johnson ", 150.50)
print(f"\nRecord: {record}")
PYEOF
python func_01_basics.py
cat > func_02_defaults_kwargs.py << 'PYEOF'
# ============================================
# HitaVir Tech - Part 2: Defaults & Keyword Args
# ============================================
# --- 2A. Default parameters ---
# Parameters with = have default values (used if not provided)
def connect_database(host, port=5432, database="hitavir_db", timeout=30):
"""Connect to database with sensible defaults."""
print(f"Connecting to {database} at {host}:{port} (timeout: {timeout}s)")
return True
print("--- Default Parameters ---")
# All defaults used:
connect_database("prod-db.hitavir.tech")
# Override some defaults:
connect_database("staging-db.hitavir.tech", port=5433, database="staging_db")
# Override all:
connect_database("dev-db.hitavir.tech", 3306, "dev_db", 10)
# --- 2B. Keyword arguments (call by name) ---
# You can pass arguments BY NAME in any order
def create_pipeline(name, source, destination, schedule="daily", retries=3):
"""Create a pipeline configuration."""
print(f"\n Pipeline: {name}")
print(f" Source: {source} → Destination: {destination}")
print(f" Schedule: {schedule} | Retries: {retries}")
print("\n--- Keyword Arguments ---")
# Positional (order matters):
create_pipeline("ETL-1", "postgres", "s3")
# Keyword (order does NOT matter):
create_pipeline(
destination="bigquery",
name="ETL-2",
source="mysql",
schedule="hourly",
retries=5
)
# Mix positional + keyword (positional must come first):
create_pipeline("ETL-3", "api", "redshift", retries=10)
# --- 2C. Mutable default argument trap ---
# WRONG — mutable default is shared across all calls:
def bad_append(item, lst=[]):
lst.append(item)
return lst
# CORRECT — use None and create inside:
def good_append(item, lst=None):
if lst is None:
lst = []
lst.append(item)
return lst
print("\n--- Mutable Default Trap ---")
print(f"Bad call 1: {bad_append('a')}") # ['a']
print(f"Bad call 2: {bad_append('b')}") # ['a', 'b'] — BUG! List is shared!
print(f"Good call 1: {good_append('a')}") # ['a']
print(f"Good call 2: {good_append('b')}") # ['b'] — Correct! Fresh list each time
PYEOF
python func_02_defaults_kwargs.py
HitaVir Tech says: "Default parameters are incredibly common in data engineering. Database connections, API timeouts, retry counts — they all have sensible defaults that you override when needed."
cat > func_03_args.py << 'PYEOF'
# ============================================
# HitaVir Tech - Part 3: *args
# ============================================
# *args lets a function accept ANY NUMBER of positional arguments.
# Inside the function, args is a TUPLE.
# --- 3A. Basic *args ---
def sum_all(*numbers):
"""Sum any number of values."""
print(f" Received {len(numbers)} args: {numbers}")
return sum(numbers)
print("--- *args Basics ---")
print(f"Sum of 1,2,3: {sum_all(1, 2, 3)}")
print(f"Sum of 10,20,30,40,50: {sum_all(10, 20, 30, 40, 50)}")
print(f"Sum of single: {sum_all(100)}")
print(f"Sum of nothing: {sum_all()}")
# --- 3B. *args in data engineering: Process multiple files ---
def process_files(*filepaths):
"""Process any number of data files."""
print(f"\n--- Processing {len(filepaths)} files ---")
results = []
for i, filepath in enumerate(filepaths, 1):
result = {"file": filepath, "status": "processed", "rows": i * 100}
results.append(result)
print(f" [{i}] {filepath} → {result['rows']} rows")
return results
process_files("sales_jan.csv", "sales_feb.csv", "sales_mar.csv")
process_files("users.csv") # Works with any count
# --- 3C. Combining regular params + *args ---
def run_pipeline(pipeline_name, *tables):
"""Run a pipeline on one or more tables."""
print(f"\n--- Pipeline: {pipeline_name} ---")
print(f" Tables to process: {list(tables)}")
for table in tables:
print(f" Processing table: {table}")
run_pipeline("Daily ETL", "users", "orders", "products", "payments")
run_pipeline("Hourly Sync", "events")
# --- 3D. Unpacking a list into *args ---
quarterly_files = [
"q1_sales.csv",
"q2_sales.csv",
"q3_sales.csv",
"q4_sales.csv"
]
# The * unpacks the list into separate arguments:
process_files(*quarterly_files)
# Without * it would pass the entire list as ONE argument
PYEOF
python func_03_args.py
cat > func_04_kwargs.py << 'PYEOF'
# ============================================
# HitaVir Tech - Part 4: **kwargs
# ============================================
# **kwargs lets a function accept ANY NUMBER of keyword arguments.
# Inside the function, kwargs is a DICTIONARY.
# --- 4A. Basic **kwargs ---
def print_config(**settings):
"""Print any configuration key-value pairs."""
print(f" Received {len(settings)} settings:")
for key, value in settings.items():
print(f" {key} = {value}")
print("--- **kwargs Basics ---")
print_config(host="localhost", port=5432, database="hitavir_db")
print()
print_config(name="ETL Pipeline", version="2.0", author="HitaVir Tech", debug=True)
# --- 4B. **kwargs for flexible database connection ---
def connect(**connection_params):
"""Connect to any database with flexible parameters."""
db_type = connection_params.get("type", "postgres")
host = connection_params.get("host", "localhost")
port = connection_params.get("port", 5432)
database = connection_params.get("database", "default")
username = connection_params.get("username", "admin")
ssl = connection_params.get("ssl", False)
print(f"\n Connecting: {db_type}://{username}@{host}:{port}/{database}")
print(f" SSL: {'enabled' if ssl else 'disabled'}")
return True
print("\n--- Flexible Database Connection ---")
connect(type="postgres", host="prod-db.hitavir.tech", database="analytics", ssl=True)
connect(type="mysql", host="mysql.hitavir.tech", port=3306, database="reporting")
connect() # Uses all defaults
# --- 4C. Combining regular params, *args, and **kwargs ---
def execute_query(query, *params, **options):
"""Execute a database query with parameters and options.
Args:
query (str): SQL query string
*params: Query parameters (for parameterized queries)
**options: Execution options (timeout, retries, etc.)
"""
timeout = options.get("timeout", 30)
retries = options.get("retries", 3)
log_query = options.get("log", True)
print(f"\n Query: {query}")
print(f" Params: {params}")
print(f" Timeout: {timeout}s | Retries: {retries} | Logging: {log_query}")
print("\n--- Combined: regular + *args + **kwargs ---")
execute_query(
"SELECT * FROM users WHERE region = %s AND status = %s",
"North", "active", # *args — query params
timeout=60, retries=5, log=True # **kwargs — options
)
execute_query("SELECT COUNT(*) FROM orders") # No args or kwargs
# --- 4D. Unpacking a dictionary into **kwargs ---
prod_config = {
"type": "postgres",
"host": "prod-db.hitavir.tech",
"port": 5432,
"database": "hitavir_prod",
"username": "etl_service",
"ssl": True
}
# The ** unpacks the dict into keyword arguments:
connect(**prod_config)
# --- 4E. Building flexible pipeline config ---
def create_pipeline_config(name, source, destination, **overrides):
"""Create a pipeline config with default values and optional overrides."""
config = {
"name": name,
"source": source,
"destination": destination,
"batch_size": 1000,
"retries": 3,
"timeout": 300,
"log_level": "INFO",
"notify_on_failure": True
}
# Override any defaults with provided kwargs
config.update(overrides)
return config
print("\n--- Flexible Pipeline Config ---")
config1 = create_pipeline_config("Sales ETL", "postgres", "s3")
config2 = create_pipeline_config(
"Real-time Events", "kafka", "bigquery",
batch_size=100, timeout=30, log_level="DEBUG"
)
print("Config 1 (defaults):")
for k, v in config1.items():
print(f" {k}: {v}")
print("\nConfig 2 (with overrides):")
for k, v in config2.items():
print(f" {k}: {v}")
PYEOF
python func_04_kwargs.py
HitaVir Tech says: "*args and **kwargs are the backbone of flexible Python code. Every major framework uses them — Django, Flask, pandas, Spark. When you see **options or *args in library docs, you now know exactly what they mean."
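The forwarding pattern those frameworks rely on can be sketched in a few lines (log_and_run and add are made-up names for illustration):

```python
# The wrapper accepts any arguments and passes them straight through,
# so it works with any function signature.
def log_and_run(func, *args, **kwargs):
    print(f"Calling {func.__name__} with args={args}, kwargs={kwargs}")
    return func(*args, **kwargs)

def add(a, b, scale=1):
    return (a + b) * scale

result = log_and_run(add, 2, 3, scale=10)
print(result)  # (2 + 3) * 10 = 50
```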
cat > func_05_advanced.py << 'PYEOF'
# ============================================
# HitaVir Tech - Part 5: Advanced Function Types
# ============================================
# --- 5A. Lambda Functions (Anonymous Functions) ---
print("=" * 50)
print("LAMBDA FUNCTIONS")
print("=" * 50)
# Regular function:
def double(x):
return x * 2
# Lambda equivalent (one-line anonymous function):
double_lambda = lambda x: x * 2
print(f"Regular: {double(5)}") # 10
print(f"Lambda: {double_lambda(5)}") # 10
# Lambdas are most useful with sort, map, filter:
sales = [
{"product": "Laptop", "revenue": 4999.95},
{"product": "Mouse", "revenue": 149.95},
{"product": "Monitor", "revenue": 899.98},
{"product": "Keyboard", "revenue": 239.97},
]
# Sort by revenue using lambda as the key function:
sorted_sales = sorted(sales, key=lambda x: x["revenue"], reverse=True)
print("\nSales by revenue (descending):")
for s in sorted_sales:
print(f" {s['product']:>10}: ${s['revenue']:>10,.2f}")
# filter() — keep only items matching a condition:
big_sales = list(filter(lambda x: x["revenue"] > 500, sales))
print(f"\nBig sales (>$500): {[s['product'] for s in big_sales]}")
# map() — apply a function to every item:
prices = [100, 200, 300, 400, 500]
with_tax = list(map(lambda p: round(p * 1.18, 2), prices))
print(f"\nPrices with 18% tax: {with_tax}")
# --- 5B. Nested Functions (Inner Functions) ---
print(f"\n{'=' * 50}")
print("NESTED FUNCTIONS")
print("=" * 50)
def create_data_cleaner(null_replacement="N/A", trim=True):
"""Create a customized data cleaning function."""
def clean(value):
"""Inner function that does the actual cleaning."""
if value is None or value == "":
return null_replacement
if trim and isinstance(value, str):
return value.strip().title()
return value
return clean # Return the inner function
# Create specialized cleaners:
name_cleaner = create_data_cleaner(null_replacement="Unknown")
code_cleaner = create_data_cleaner(null_replacement="NONE", trim=False)
raw_names = [" alice ", None, "BOB SMITH", "", " charlie "]
cleaned = [name_cleaner(n) for n in raw_names]
print(f"Raw: {raw_names}")
print(f"Cleaned: {cleaned}")
# --- 5C. Recursive Functions ---
print(f"\n{'=' * 50}")
print("RECURSIVE FUNCTIONS")
print("=" * 50)
def flatten_nested_data(data, prefix=""):
"""Flatten a nested dictionary (common in JSON/API data)."""
flat = {}
for key, value in data.items():
full_key = f"{prefix}{key}" if not prefix else f"{prefix}.{key}"
if isinstance(value, dict):
flat.update(flatten_nested_data(value, full_key)) # Recursive call
else:
flat[full_key] = value
return flat
# Nested API response:
api_response = {
"user": {
"name": "Alice",
"address": {
"city": "Bangalore",
"state": "Karnataka",
"country": "India"
},
"scores": {
"python": 95,
"sql": 88
}
},
"status": "active"
}
flat = flatten_nested_data(api_response)
print("Flattened JSON:")
for key, value in flat.items():
print(f" {key}: {value}")
# --- 5D. Generator Functions (yield) ---
print(f"\n{'=' * 50}")
print("GENERATOR FUNCTIONS")
print("=" * 50)
def read_in_batches(data, batch_size):
"""Yield data in batches — memory efficient for large datasets."""
for i in range(0, len(data), batch_size):
batch = data[i:i + batch_size]
yield batch # yield pauses and returns one batch at a time
all_records = list(range(1, 16)) # 15 records
print(f"Total records: {all_records}")
for batch_num, batch in enumerate(read_in_batches(all_records, batch_size=4), 1):
print(f" Batch {batch_num}: {batch}")
# Generators are memory-efficient — they do NOT load all data at once
# Perfect for processing millions of rows from databases or files
# --- 5E. Decorator Functions ---
print(f"\n{'=' * 50}")
print("DECORATOR FUNCTIONS")
print("=" * 50)
import time
def timer(func):
"""Decorator that measures how long a function takes."""
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
duration = time.time() - start
print(f" [{func.__name__}] completed in {duration:.4f}s")
return result
return wrapper
def retry(max_attempts=3):
"""Decorator that retries a function on failure."""
def decorator(func):
def wrapper(*args, **kwargs):
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except Exception as e:
print(f" [{func.__name__}] Attempt {attempt} failed: {e}")
if attempt == max_attempts:
raise
return wrapper
return decorator
@timer
def process_data(records):
"""Simulate processing data."""
total = sum(records)
return total
@timer
def slow_query():
"""Simulate a slow database query."""
time.sleep(0.1)
return "query result"
result = process_data([1, 2, 3, 4, 5])
print(f" Result: {result}")
result = slow_query()
print(f" Result: {result}")
PYEOF
python func_05_advanced.py
cat > func_06_pipeline.py << 'PYEOF'
# ============================================
# HitaVir Tech - Part 6: Complete Pipeline Example
# Using all function types together
# ============================================
import time
# --- Decorator for logging ---
def log_step(func):
"""Decorator: log the start and end of each pipeline step."""
def wrapper(*args, **kwargs):
print(f"\n[START] {func.__name__}")
start = time.time()
result = func(*args, **kwargs)
elapsed = time.time() - start
print(f"[DONE] {func.__name__} ({elapsed:.3f}s)")
return result
return wrapper
# --- Extract: uses default params ---
@log_step
def extract(source, limit=None):
"""Extract data from source."""
data = [
{"id": 1, "product": "Laptop", "price": 999.99, "quantity": 5},
{"id": 2, "product": "Mouse", "price": 29.99, "quantity": 100},
{"id": 3, "product": "Keyboard", "price": 79.99, "quantity": 50},
{"id": 4, "product": "Monitor", "price": 449.99, "quantity": 10},
{"id": 5, "product": "Headphones", "price": 149.99, "quantity": 30},
]
if limit:
data = data[:limit]
print(f" Loaded {len(data)} records from {source}")
return data
# --- Transform: uses *args for multiple transform steps ---
@log_step
def transform(records, *steps):
"""Apply multiple transform steps to records."""
for step_func in steps:
records = step_func(records)
return records
# Individual transform functions (passed as *args):
def add_total(records):
"""Add total column."""
for r in records:
r["total"] = round(r["price"] * r["quantity"], 2)
print(f" Added 'total' column to {len(records)} records")
return records
def add_category(records):
"""Add price category."""
for r in records:
r["category"] = "Premium" if r["price"] >= 200 else "Standard"
print(f" Added 'category' column to {len(records)} records")
return records
def filter_valid(records):
"""Keep only records with positive quantity."""
valid = [r for r in records if r["quantity"] > 0]
print(f" Filtered: {len(valid)}/{len(records)} records valid")
return valid
# --- Load: uses **kwargs for flexible options ---
@log_step
def load(records, destination, **options):
"""Load records to destination with flexible options."""
format_type = options.get("format", "csv")
compress = options.get("compress", False)
partition_by = options.get("partition_by", None)
print(f" Destination: {destination}")
print(f" Format: {format_type} | Compressed: {compress}")
if partition_by:
print(f" Partitioned by: {partition_by}")
print(f" Loaded {len(records)} records")
return True
# --- Run the pipeline ---
print("=" * 50)
print(" HitaVir Tech - Complete Pipeline")
print("=" * 50)
# Extract with defaults
data = extract("postgres://hitavir-db/sales")
# Transform with multiple *args steps
data = transform(data, add_total, add_category, filter_valid)
# Load with **kwargs options
load(
data,
"s3://hitavir-warehouse/output/",
format="parquet",
compress=True,
partition_by="category"
)
# Display results
print(f"\n--- Final Data ---")
for r in data:
print(f" {r['product']:>12} | ${r['total']:>10,.2f} | {r['category']}")
print(f"\nPipeline complete!")
PYEOF
python func_06_pipeline.py
| Type | Syntax | When to Use |
|---|---|---|
| Basic | `def f():` | Simple, single-purpose tasks |
| With params | `def f(x):` | Configurable behavior |
| With defaults | `def f(x=1):` | Sensible fallbacks |
| `*args` | `def f(*args):` | Unknown number of inputs (file lists, tables) |
| `**kwargs` | `def f(**kwargs):` | Flexible config options (DB connections, settings) |
| Combined | `def f(a, *args, **kwargs):` | Maximum flexibility (frameworks, libraries) |
| Lambda | `lambda x: ...` | Quick inline transformations (sort keys, map/filter) |
| Nested | `def outer(): def inner(): ...` | Factory pattern, closures (custom cleaners) |
| Recursive | `def f(n): ... return f(n - 1)` | Tree structures, nested JSON flattening |
| Generator | `def f(): yield x` | Memory-efficient batch processing |
| Decorator | `@decorator` above a `def` | Cross-cutting concerns (logging, timing, retry) |
When combining all argument types, they must appear in this order:
def func(regular, default=val, *args, keyword_only, **kwargs):
pass
# Example:
def pipeline(name, mode="batch", *sources, notify=True, **options):
pass
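A runnable sketch (the names are illustrative) showing how a single call binds to each slot in that order:

```python
def pipeline(name, mode="batch", *sources, notify=True, **options):
    # name -> positional, mode -> default, sources -> tuple of extras,
    # notify -> keyword-only, options -> dict of leftover keyword args
    return name, mode, sources, notify, options

print(pipeline("Sales ETL", "stream", "users", "orders",
               notify=False, timeout=60))
# ('Sales ETL', 'stream', ('users', 'orders'), False, {'timeout': 60})
```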
HitaVir Tech says: "Functions are the atoms of programming — everything is built from them. Every data pipeline is just extract(), transform(), load(). Every API endpoint is a function. Every automation script is a collection of functions. Master functions and you master Python."
Data structures are how you organize data in memory. Data engineers use them constantly.
cat > data_structures.py << 'PYEOF'
# ============================================
# HitaVir Tech - Data Structures
# ============================================
# ====== LISTS ======
# Ordered, mutable collection — like rows in a table
print("=" * 50)
print("LISTS — Ordered Collections")
print("=" * 50)
# List of database tables to process
tables = ["users", "orders", "products", "payments"]
print(f"Tables to process: {tables}")
print(f"First table: {tables[0]}")
print(f"Last table: {tables[-1]}")
print(f"Number of tables: {len(tables)}")
# Add and remove
tables.append("logs")
print(f"After append: {tables}")
tables.remove("logs")
print(f"After remove: {tables}")
# Slicing
print(f"First two: {tables[:2]}")
print(f"Last two: {tables[-2:]}")
# List of numbers — common in data processing
scores = [85, 92, 78, 95, 88, 76, 91]
print(f"\nScores: {scores}")
print(f"Average: {sum(scores) / len(scores):.1f}")
print(f"Max: {max(scores)}")
print(f"Min: {min(scores)}")
print(f"Sorted: {sorted(scores)}")
# ====== TUPLES ======
# Ordered, immutable — like a fixed record
print(f"\n{'=' * 50}")
print("TUPLES — Immutable Records")
print("=" * 50)
# Database connection config (should not change)
db_config = ("prod-db.hitavir.tech", 5432, "hitavir_prod")
host, port, database = db_config # Tuple unpacking
print(f"Host: {host}")
print(f"Port: {port}")
print(f"Database: {database}")
# Coordinates, pairs, fixed data
column_mapping = [
("first_name", "fname"),
("last_name", "lname"),
("email_address", "email"),
]
print("\nColumn mapping:")
for old_name, new_name in column_mapping:
print(f" {old_name} → {new_name}")
# ====== DICTIONARIES ======
# Key-value pairs — like a row in a database
print(f"\n{'=' * 50}")
print("DICTIONARIES — Key-Value Data")
print("=" * 50)
# A single record (like one row from a database)
employee = {
"id": 101,
"name": "Priya Sharma",
"department": "Data Engineering",
"salary": 85000,
"skills": ["Python", "SQL", "Spark"],
"is_active": True
}
print(f"Name: {employee['name']}")
print(f"Department: {employee['department']}")
print(f"Skills: {', '.join(employee['skills'])}")
# Safe access with .get()
print(f"Manager: {employee.get('manager', 'Not assigned')}")
# Iterate over dictionary
print("\nEmployee record:")
for key, value in employee.items():
print(f" {key}: {value}")
# Pipeline configuration (common in real projects)
pipeline_config = {
"name": "sales_etl",
"source": {
"type": "postgres",
"host": "db.hitavir.tech",
"port": 5432
},
"destination": {
"type": "s3",
"bucket": "hitavir-warehouse"
},
"schedule": "0 6 * * *",
"retry_count": 3
}
print(f"\nPipeline: {pipeline_config['name']}")
print(f"Source: {pipeline_config['source']['type']}://{pipeline_config['source']['host']}")
print(f"Destination: {pipeline_config['destination']['type']}://{pipeline_config['destination']['bucket']}")
# ====== SETS ======
# Unique values only — great for deduplication
print(f"\n{'=' * 50}")
print("SETS — Unique Values")
print("=" * 50)
raw_emails = [
"alice@hitavir.tech", "bob@hitavir.tech",
"alice@hitavir.tech", "charlie@hitavir.tech",
"bob@hitavir.tech", "diana@hitavir.tech"
]
unique_emails = set(raw_emails)
print(f"Raw count: {len(raw_emails)}")
print(f"Unique count: {len(unique_emails)}")
print(f"Duplicates removed: {len(raw_emails) - len(unique_emails)}")
# Set operations — comparing datasets
db_users = {"alice", "bob", "charlie", "diana"}
api_users = {"charlie", "diana", "eve", "frank"}
print(f"\nIn DB only: {db_users - api_users}")
print(f"In API only: {api_users - db_users}")
print(f"In both: {db_users & api_users}")
print(f"In either: {db_users | api_users}")
# ====== LIST OF DICTIONARIES ======
# The most common data structure in data engineering
print(f"\n{'=' * 50}")
print("LIST OF DICTS — The Data Engineering Standard")
print("=" * 50)
sales_data = [
{"date": "2026-04-01", "product": "Laptop", "amount": 999.99, "region": "North"},
{"date": "2026-04-01", "product": "Mouse", "amount": 29.99, "region": "South"},
{"date": "2026-04-02", "product": "Keyboard", "amount": 79.99, "region": "North"},
{"date": "2026-04-02", "product": "Monitor", "amount": 449.99, "region": "East"},
{"date": "2026-04-03", "product": "Laptop", "amount": 999.99, "region": "West"},
]
# Filter: sales above $100
big_sales = [s for s in sales_data if s["amount"] > 100]
print(f"Sales > $100: {len(big_sales)}")
# Calculate total
total_revenue = sum(s["amount"] for s in sales_data)
print(f"Total revenue: ${total_revenue:,.2f}")
# Group by region
from collections import defaultdict
by_region = defaultdict(float)
for sale in sales_data:
by_region[sale["region"]] += sale["amount"]
print("\nRevenue by region:")
for region, total in sorted(by_region.items()):
print(f" {region}: ${total:,.2f}")
PYEOF
python data_structures.py
HitaVir Tech says: "A list of dictionaries is the bread and butter of data engineering. It is how APIs return data, how you process CSV rows, and how you pass data between pipeline stages. Master this pattern."
This table is asked in every Python interview for Data Engineering roles. Memorize it.
Feature | List | Tuple | Set | Dictionary |
Syntax | [1, 2, 3] | (1, 2, 3) | {1, 2, 3} | {"a": 1, "b": 2} |
Mutable? | Yes | No | Yes | Yes |
Ordered? | Yes | Yes | No | Yes (3.7+) |
Duplicates? | Allowed | Allowed | Not allowed | Keys unique |
Indexing? | Yes | Yes | No | By key |
Slicing? | Yes | Yes | No | No |
Use case | Collections of items | Fixed records | Unique values | Key-value mapping |
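The rows of that table can be verified in a few lines; a minimal sketch:

```python
# List: mutable, allows duplicates, supports indexing
items = ["a", "b", "a"]
items.append("c")

# Tuple: ordered but immutable — item assignment raises TypeError
record = ("host", 5432)
try:
    record[0] = "other"
except TypeError:
    tuple_is_immutable = True

# Set: duplicates collapse, no positional indexing
unique = set(items)  # order not guaranteed

# Dict: keys are unique — a repeated key keeps the last value
mapping = {"a": 1, "a": 2}
print(unique, mapping)
```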
Operation | List | Tuple | Set | Dict |
Access by index | O(1) | O(1) | N/A | N/A |
Access by key | N/A | N/A | N/A | O(1) |
Search (in) | O(n) | O(n) | O(1) | O(1) |
Append / Add | O(1) | N/A | O(1) | O(1) |
Insert at position | O(n) | N/A | N/A | N/A |
Delete | O(n) | N/A | O(1) | O(1) |
Memory usage | Medium | Low | High | High |
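The O(1) versus O(n) membership difference in the complexity table is easy to measure yourself; a quick sketch using the standard library's `timeit` (sizes chosen small enough to run in under a second):

```python
import timeit

big_list = list(range(100_000))
big_set = set(big_list)
target = 99_999  # worst case for the list: it must scan every element

# Time 100 membership checks against each structure
list_time = timeit.timeit(lambda: target in big_list, number=100)
set_time = timeit.timeit(lambda: target in big_set, number=100)

print(f"list: {list_time:.4f}s | set: {set_time:.6f}s")
```

On any machine the set lookup should be orders of magnitude faster, because it hashes the value instead of scanning.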
Scenario | Use This | Why |
Rows from a CSV file | List of dicts | Each row is a dict, the collection is a list |
Database connection config | Tuple or dict | Config should not change (tuple) or needs named access (dict) |
Column names to process | List | Ordered, may have duplicates |
Unique customer IDs | Set | Automatic deduplication, O(1) lookup |
API response / JSON data | Dict | Key-value structure matches JSON |
Mapping old column names to new | Dict | Lookup table: {"old_name": "new_name"} |
Batch of records for processing | List of dicts | Industry standard for tabular data |
Immutable function return values | Tuple | Cannot be accidentally modified |
Checking if value exists in large dataset | Set | O(1) vs O(n) for lists |
Environment variables / settings | Dict | Named access: config["host"] |
Coordinates or fixed pairs | Tuple | Fixed-size, immutable, e.g. (lat, lon) |
Pipeline of transformation steps | List of functions | Ordered, called in sequence |
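The last scenario — a pipeline as a list of functions — looks like this in practice (a minimal sketch; the two cleaning steps are illustrative):

```python
def strip_names(record):
    """Trim whitespace from the customer field."""
    record["customer"] = record["customer"].strip()
    return record

def add_total(record):
    """Derive total from price and quantity."""
    record["total"] = round(record["price"] * record["quantity"], 2)
    return record

# The pipeline is just an ordered list of callables
steps = [strip_names, add_total]

record = {"customer": "  Alice ", "price": 29.99, "quantity": 5}
for step in steps:
    record = step(record)

print(record)
```

Adding a new transformation means appending one function to the list, without touching the loop.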
List = Shopping cart → ordered, changeable, duplicates OK
Tuple = ID card → ordered, fixed, cannot be changed
Set = Unique stamps → unordered, no duplicates, fast lookup
Dict = Phone book → name→number pairs, fast lookup by key
HitaVir Tech says: "In interviews, they will ask: 'When would you use a set instead of a list?' The answer: when you need unique values and fast O(1) lookups. A set checks membership instantly; a list scans every element. For 10 million records, that is the difference between milliseconds and minutes."
Reading and writing files is the foundation of every data pipeline.
cat > create_sample_data.py << 'PYEOF'
"""
HitaVir Tech - Create sample data files for practice
"""
import csv
import json
# --- Create CSV file ---
sales_data = [
["order_id", "customer", "product", "quantity", "price", "date", "region"],
[1001, "Alice Johnson", "Laptop", 1, 999.99, "2026-04-01", "North"],
[1002, "Bob Smith", "Mouse", 5, 29.99, "2026-04-01", "South"],
[1003, "Charlie Brown", "Keyboard", 2, 79.99, "2026-04-01", "North"],
[1004, "", "Monitor", 1, 449.99, "2026-04-02", "East"],
[1005, "Diana Prince", "Laptop", 2, 999.99, "2026-04-02", "West"],
[1006, "Eve Wilson", "", 3, 29.99, "2026-04-02", "South"],
[1007, "Frank Miller", "Keyboard", 0, 79.99, "2026-04-03", "North"],
[1008, "Grace Lee", "Headphones", 1, 149.99, "2026-04-03", "East"],
[1009, "Henry Davis", "Monitor", 1, 449.99, "2026-04-03", "West"],
[1010, "Ivy Chen", "Laptop", 1, -999.99, "2026-04-03", "North"],
]
with open("sales_raw.csv", "w", newline="") as f:
writer = csv.writer(f)
writer.writerows(sales_data)
print("Created: sales_raw.csv")
# --- Create JSON config ---
config = {
"pipeline_name": "HitaVir Sales ETL",
"version": "1.0.0",
"source": {
"type": "csv",
"path": "sales_raw.csv"
},
"rules": {
"max_null_percent": 0.05,
"min_quantity": 1,
"min_price": 0.01
},
"output": {
"path": "sales_cleaned.csv",
"report_path": "pipeline_report.json"
}
}
with open("pipeline_config.json", "w") as f:
json.dump(config, f, indent=2)
print("Created: pipeline_config.json")
# --- Create log file ---
logs = """[2026-04-05 08:00:01] INFO: Pipeline started
[2026-04-05 08:00:02] INFO: Loading sales_raw.csv
[2026-04-05 08:00:03] WARNING: Found 2 records with missing fields
[2026-04-05 08:00:04] ERROR: Record 1010 has negative price
[2026-04-05 08:00:05] INFO: Cleaned 8 records, 2 rejected
[2026-04-05 08:00:06] INFO: Pipeline completed in 5.2s
"""
with open("pipeline.log", "w") as f:
f.write(logs)
print("Created: pipeline.log")
print("\nAll sample files created successfully!")
PYEOF
python create_sample_data.py
cat > file_csv.py << 'PYEOF'
"""
HitaVir Tech - CSV File Handling
"""
import csv
# --- READ CSV ---
print("=" * 60)
print("READING CSV FILE")
print("=" * 60)
records = []
with open("sales_raw.csv", "r") as f:
reader = csv.DictReader(f)
for row in reader:
records.append(row)
print(f"Loaded {len(records)} records\n")
print("First 3 records:")
for i, record in enumerate(records[:3]):
print(f" {i+1}. Order {record['order_id']}: "
f"{record['customer']} bought {record['quantity']}x {record['product']} "
f"@ ${record['price']}")
# --- PROCESS CSV ---
print(f"\n{'=' * 60}")
print("PROCESSING CSV DATA")
print("=" * 60)
cleaned = []
rejected = []
for record in records:
# Convert numeric fields
record["quantity"] = int(record["quantity"])
record["price"] = float(record["price"])
# Validation
issues = []
if not record["customer"]:
issues.append("missing customer")
if not record["product"]:
issues.append("missing product")
if record["quantity"] <= 0:
issues.append(f"invalid quantity: {record['quantity']}")
if record["price"] <= 0:
issues.append(f"invalid price: {record['price']}")
if issues:
record["rejection_reason"] = "; ".join(issues)
rejected.append(record)
print(f" REJECTED Order {record['order_id']}: {record['rejection_reason']}")
else:
record["total"] = round(record["price"] * record["quantity"], 2)
cleaned.append(record)
print(f"\nCleaned: {len(cleaned)} records")
print(f"Rejected: {len(rejected)} records")
# --- WRITE CSV ---
print(f"\n{'=' * 60}")
print("WRITING CLEANED CSV")
print("=" * 60)
fieldnames = ["order_id", "customer", "product", "quantity", "price", "total", "date", "region"]
with open("sales_cleaned.csv", "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(cleaned)
print(f"Saved {len(cleaned)} records to sales_cleaned.csv")
# Display results
total_revenue = sum(r["total"] for r in cleaned)
print(f"Total revenue: ${total_revenue:,.2f}")
PYEOF
python file_csv.py
cat > file_json.py << 'PYEOF'
"""
HitaVir Tech - JSON File Handling
"""
import json
# --- READ JSON ---
print("=" * 60)
print("READING JSON CONFIG")
print("=" * 60)
with open("pipeline_config.json", "r") as f:
config = json.load(f)
print(f"Pipeline: {config['pipeline_name']}")
print(f"Version: {config['version']}")
print(f"Source: {config['source']['path']}")
print(f"Max null %: {config['rules']['max_null_percent']}")
# --- CREATE PIPELINE REPORT (JSON) ---
print(f"\n{'=' * 60}")
print("GENERATING PIPELINE REPORT")
print("=" * 60)
report = {
"pipeline": config["pipeline_name"],
"run_date": "2026-04-05",
"status": "completed",
"metrics": {
"total_input_records": 10,
        "cleaned_records": 6,
        "rejected_records": 4,
        "success_rate": 60.0,
        "total_revenue": 3909.88,
"processing_time_seconds": 5.2
},
"top_products": [
        {"product": "Laptop", "revenue": 2999.97, "orders": 2},
        {"product": "Monitor", "revenue": 449.99, "orders": 1},
        {"product": "Keyboard", "revenue": 159.98, "orders": 1}
],
"rejections": [
{"order_id": 1004, "reason": "missing customer"},
{"order_id": 1006, "reason": "missing product"},
{"order_id": 1007, "reason": "invalid quantity: 0"},
{"order_id": 1010, "reason": "invalid price: -999.99"}
]
}
with open("pipeline_report.json", "w") as f:
json.dump(report, f, indent=2)
print("Report saved to pipeline_report.json")
print(f"\nPipeline Status: {report['status']}")
print(f"Success Rate: {report['metrics']['success_rate']}%")
print(f"Revenue: ${report['metrics']['total_revenue']:,.2f}")
PYEOF
python file_json.py
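One JSON gotcha worth knowing before you build reports with real timestamps: `datetime` objects are not JSON-serializable, and `default=str` is a common escape hatch. A minimal sketch:

```python
import json
from datetime import datetime

report = {"run_date": datetime(2026, 4, 5), "status": "completed"}

try:
    json.dumps(report)
except TypeError:
    # datetime has no native JSON representation
    needs_default = True

# default=str falls back to str() for anything json cannot handle
text = json.dumps(report, default=str)
print(text)
```

For production pipelines, formatting the timestamp explicitly (as the report scripts here do with `strftime`) is more predictable than relying on `str()`.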
cat > file_logs.py << 'PYEOF'
"""
HitaVir Tech - Log File Analysis
"""
# --- Read and analyze log file ---
print("=" * 60)
print("LOG FILE ANALYSIS")
print("=" * 60)
with open("pipeline.log", "r") as f:
lines = f.readlines()
info_count = 0
warning_count = 0
error_count = 0
errors = []
for line in lines:
line = line.strip()
if not line:
continue
if "INFO:" in line:
info_count += 1
elif "WARNING:" in line:
warning_count += 1
elif "ERROR:" in line:
error_count += 1
errors.append(line)
print(f"Total log entries: {len([l for l in lines if l.strip()])}")
print(f"INFO: {info_count}")
print(f"WARNING: {warning_count}")
print(f"ERROR: {error_count}")
if errors:
print(f"\nError details:")
for err in errors:
print(f" {err}")
PYEOF
python file_logs.py
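Counting log levels is a start; the timestamps in those same lines can also be parsed to measure run duration. A sketch using the `[YYYY-MM-DD HH:MM:SS]` format from the sample log, with the lines inlined so it runs standalone:

```python
from datetime import datetime

lines = [
    "[2026-04-05 08:00:01] INFO: Pipeline started",
    "[2026-04-05 08:00:06] INFO: Pipeline completed in 5.2s",
]

def parse_ts(line):
    """Extract the [YYYY-MM-DD HH:MM:SS] prefix as a datetime."""
    stamp = line.split("]")[0].lstrip("[")
    return datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S")

# Duration = last entry minus first entry
duration = (parse_ts(lines[-1]) - parse_ts(lines[0])).total_seconds()
print(f"Pipeline ran for {duration:.0f}s")
```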
HitaVir Tech says: "File handling is where theory meets reality. Every data pipeline starts with reading a file and ends with writing a file. CSV and JSON are the two formats you will use most."
Production pipelines must handle errors gracefully and log everything for debugging.
cat > error_handling.py << 'PYEOF'
"""
HitaVir Tech - Error Handling & Logging
"""
import logging
from datetime import datetime
# --- Setup logging ---
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[
logging.FileHandler("etl_pipeline.log"),
logging.StreamHandler() # Also print to console
]
)
logger = logging.getLogger("HitaVirETL")
# --- try-except basics ---
print("=" * 60)
print("ERROR HANDLING BASICS")
print("=" * 60)
# Division by zero
try:
result = 100 / 0
except ZeroDivisionError:
logger.error("Cannot divide by zero!")
# File not found
try:
with open("nonexistent_file.csv", "r") as f:
data = f.read()
except FileNotFoundError:
logger.warning("File not found — using default data")
# Type conversion error
try:
value = int("not_a_number")
except ValueError as e:
logger.error(f"Type conversion failed: {e}")
# --- Real pipeline with error handling ---
print(f"\n{'=' * 60}")
print("PRODUCTION-GRADE PIPELINE")
print("=" * 60)
def safe_extract(filepath):
"""Extract data with error handling."""
logger.info(f"Starting extraction from {filepath}")
try:
with open(filepath, "r") as f:
import csv
reader = csv.DictReader(f)
records = list(reader)
logger.info(f"Extracted {len(records)} records")
return records
except FileNotFoundError:
logger.error(f"Source file not found: {filepath}")
return []
except Exception as e:
logger.critical(f"Unexpected error during extraction: {e}")
return []
def safe_transform(records):
"""Transform data with per-record error handling."""
logger.info(f"Starting transformation of {len(records)} records")
cleaned = []
errors = 0
for i, record in enumerate(records):
try:
record["price"] = float(record.get("price", 0))
record["quantity"] = int(record.get("quantity", 0))
if record["price"] <= 0 or record["quantity"] <= 0:
raise ValueError("Invalid price or quantity")
record["total"] = round(record["price"] * record["quantity"], 2)
cleaned.append(record)
except (ValueError, TypeError) as e:
errors += 1
logger.warning(f"Record {i+1} skipped: {e}")
logger.info(f"Transformation complete: {len(cleaned)} valid, {errors} errors")
return cleaned
def safe_load(records, output_path):
"""Load data with error handling."""
logger.info(f"Loading {len(records)} records to {output_path}")
try:
import csv
fieldnames = ["order_id", "customer", "product", "quantity", "price", "total", "date", "region"]
with open(output_path, "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
writer.writeheader()
writer.writerows(records)
logger.info(f"Successfully saved to {output_path}")
return True
except Exception as e:
logger.critical(f"Failed to save output: {e}")
return False
# --- Run the pipeline ---
logger.info("=" * 40)
logger.info("HitaVir Tech ETL Pipeline Starting")
logger.info("=" * 40)
start_time = datetime.now()
data = safe_extract("sales_raw.csv")
if data:
cleaned = safe_transform(data)
if cleaned:
success = safe_load(cleaned, "sales_output.csv")
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
logger.info(f"Pipeline finished in {duration:.2f}s")
logger.info("=" * 40)
print(f"\nLog file saved to: etl_pipeline.log")
PYEOF
python error_handling.py
HitaVir Tech says: "In production, errors WILL happen. The question is not if, but when. Good error handling means your pipeline fails gracefully, logs the problem, and makes debugging easy."
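One pattern that belongs alongside try/except in production code is automatic retry for transient failures (a flaky network, a busy database). A hedged sketch of a retry decorator; the delay is kept at zero here so the example runs instantly:

```python
import functools
import time

def retry(max_attempts=3, delay=0.0):
    """Retry a function up to max_attempts times before giving up."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise  # out of attempts — surface the error
                    time.sleep(delay)
        return wrapper
    return decorator

calls = {"count": 0}

@retry(max_attempts=3)
def flaky_extract():
    """Fails twice, then succeeds — simulates a transient outage."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("source unavailable")
    return ["record"]

data = flaky_extract()
print(f"Succeeded after {calls['count']} attempts")
```

In a real pipeline you would catch a specific exception type and use an exponential backoff delay rather than a fixed one.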
pandas is the most important Python library for data engineering. It handles tabular data like a spreadsheet on steroids.
cat > pandas_basics.py << 'PYEOF'
"""
HitaVir Tech - pandas for Data Engineering
"""
import pandas as pd
# --- READ CSV INTO DATAFRAME ---
print("=" * 60)
print("LOADING DATA WITH PANDAS")
print("=" * 60)
df = pd.read_csv("sales_raw.csv")
print(f"Shape: {df.shape[0]} rows x {df.shape[1]} columns")
print(f"\nColumn names: {list(df.columns)}")
print(f"\nData types:\n{df.dtypes}")
print(f"\nFirst 5 rows:")
print(df.head())
# --- EXPLORE DATA ---
print(f"\n{'=' * 60}")
print("DATA EXPLORATION")
print("=" * 60)
print("\nBasic statistics:")
print(df.describe())
print(f"\nMissing values per column:")
print(df.isnull().sum())
print(f"\nUnique products: {df['product'].nunique()}")
print(f"Unique regions: {df['region'].nunique()}")
print(f"Date range: {df['date'].min()} to {df['date'].max()}")
# --- CLEAN DATA ---
print(f"\n{'=' * 60}")
print("DATA CLEANING")
print("=" * 60)
# Remove rows with missing customer or product
df_clean = df.dropna(subset=["customer", "product"])
print(f"After dropping nulls: {len(df_clean)} rows (was {len(df)})")
# Remove invalid data
df_clean = df_clean[df_clean["quantity"] > 0]
df_clean = df_clean[df_clean["price"] > 0]
print(f"After removing invalid: {len(df_clean)} rows")
# --- TRANSFORM DATA ---
print(f"\n{'=' * 60}")
print("DATA TRANSFORMATION")
print("=" * 60)
# Add calculated columns
df_clean = df_clean.copy()
df_clean["total"] = (df_clean["price"] * df_clean["quantity"]).round(2)
df_clean["price_category"] = df_clean["price"].apply(
lambda p: "Premium" if p >= 200 else ("Mid" if p >= 50 else "Budget")
)
print(df_clean[["order_id", "product", "price", "quantity", "total", "price_category"]])
# --- AGGREGATE DATA ---
print(f"\n{'=' * 60}")
print("DATA AGGREGATION")
print("=" * 60)
# Revenue by region
print("\nRevenue by Region:")
region_summary = df_clean.groupby("region")["total"].agg(["sum", "count", "mean"]).round(2)
region_summary.columns = ["revenue", "orders", "avg_order"]
print(region_summary)
# Revenue by product
print("\nRevenue by Product:")
product_summary = df_clean.groupby("product")["total"].sum().sort_values(ascending=False)
print(product_summary)
# --- SAVE OUTPUT ---
print(f"\n{'=' * 60}")
print("SAVING RESULTS")
print("=" * 60)
df_clean.to_csv("sales_pandas_cleaned.csv", index=False)
print("Saved: sales_pandas_cleaned.csv")
region_summary.to_csv("region_report.csv")
print("Saved: region_report.csv")
print(f"\nTotal revenue: ${df_clean['total'].sum():,.2f}")
print(f"Average order value: ${df_clean['total'].mean():,.2f}")
print(f"Top product: {product_summary.index[0]} (${product_summary.iloc[0]:,.2f})")
PYEOF
python pandas_basics.py
HitaVir Tech says: "pandas is to data engineers what a stethoscope is to doctors — you cannot work without it. Learn read_csv, groupby, merge, and apply, and you can handle 90% of data tasks."
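The quote names merge, which the script above does not demonstrate. A minimal sketch joining sales rows to a small lookup table (both DataFrames are invented for illustration):

```python
import pandas as pd

sales = pd.DataFrame({
    "order_id": [1001, 1002, 1003],
    "region": ["North", "South", "North"],
    "total": [999.99, 149.95, 159.98],
})
managers = pd.DataFrame({
    "region": ["North", "South"],
    "manager": ["Priya", "Rahul"],
})

# Left join: keep every sale, attach the manager where a region matches
enriched = sales.merge(managers, on="region", how="left")
print(enriched)
```

`how="left"` mirrors a SQL LEFT JOIN; unmatched sales would get NaN in the `manager` column instead of being dropped.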
Time to build a real, production-quality ETL pipeline that combines everything you have learned.
HitaVir Tech receives daily sales CSV files. You need to build an automated pipeline that:
mkdir -p pipeline_project/input pipeline_project/output pipeline_project/logs
cat > pipeline_project/etl_pipeline.py << 'PYEOF'
"""
HitaVir Tech - Sales Data ETL Pipeline
=======================================
A production-quality ETL pipeline that processes daily sales data.
Usage:
python etl_pipeline.py
Author: HitaVir Tech
Version: 1.0.0
"""
import csv
import json
import logging
import os
from datetime import datetime
from collections import defaultdict
# ============================================================
# CONFIGURATION
# ============================================================
CONFIG = {
"input_file": "input/sales_raw.csv",
"output_file": "output/sales_cleaned.csv",
"report_file": "output/daily_report.json",
"rejected_file": "output/rejected_records.csv",
"log_file": "logs/pipeline.log",
"rules": {
"required_fields": ["order_id", "customer", "product", "quantity", "price"],
"min_quantity": 1,
"min_price": 0.01,
"max_price": 100000,
"valid_regions": ["North", "South", "East", "West"]
}
}
# ============================================================
# LOGGING SETUP
# ============================================================
def setup_logging(log_file):
"""Configure logging for the pipeline."""
os.makedirs(os.path.dirname(log_file), exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)-8s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler()
]
)
return logging.getLogger("HitaVirETL")
# ============================================================
# EXTRACT
# ============================================================
def extract(filepath, logger):
"""
Extract: Read raw CSV data from source file.
Returns:
list[dict]: List of records, or empty list on failure.
"""
logger.info(f"[EXTRACT] Reading from {filepath}")
if not os.path.exists(filepath):
logger.error(f"[EXTRACT] File not found: {filepath}")
return []
try:
with open(filepath, "r", encoding="utf-8") as f:
reader = csv.DictReader(f)
records = list(reader)
logger.info(f"[EXTRACT] Loaded {len(records)} records with columns: {reader.fieldnames}")
return records
except Exception as e:
logger.critical(f"[EXTRACT] Failed to read file: {e}")
return []
# ============================================================
# VALIDATE
# ============================================================
def validate_record(record, rules):
"""
Validate a single record against business rules.
Returns:
tuple: (is_valid: bool, errors: list[str])
"""
errors = []
# Check required fields
for field in rules["required_fields"]:
if not record.get(field, "").strip():
errors.append(f"Missing required field: {field}")
# Validate numeric fields
try:
qty = int(record.get("quantity", 0))
if qty < rules["min_quantity"]:
errors.append(f"Quantity {qty} below minimum {rules['min_quantity']}")
except ValueError:
errors.append(f"Invalid quantity: {record.get('quantity')}")
try:
price = float(record.get("price", 0))
if price < rules["min_price"]:
errors.append(f"Price {price} below minimum {rules['min_price']}")
if price > rules["max_price"]:
errors.append(f"Price {price} above maximum {rules['max_price']}")
except ValueError:
errors.append(f"Invalid price: {record.get('price')}")
# Validate region
region = record.get("region", "").strip()
if region and region not in rules["valid_regions"]:
errors.append(f"Invalid region: {region}")
return len(errors) == 0, errors
# ============================================================
# TRANSFORM
# ============================================================
def transform(records, rules, logger):
"""
Transform: Clean, validate, and enrich records.
Returns:
tuple: (cleaned_records, rejected_records)
"""
logger.info(f"[TRANSFORM] Processing {len(records)} records")
cleaned = []
rejected = []
for record in records:
is_valid, errors = validate_record(record, rules)
if not is_valid:
record["rejection_reasons"] = "; ".join(errors)
rejected.append(record)
logger.warning(f"[TRANSFORM] Rejected order {record.get('order_id', '?')}: {errors}")
continue
# Type conversion
record["quantity"] = int(record["quantity"])
record["price"] = float(record["price"])
# Enrichment
record["total"] = round(record["price"] * record["quantity"], 2)
record["customer"] = record["customer"].strip().title()
record["product"] = record["product"].strip().title()
# Categorization
if record["total"] >= 1000:
record["tier"] = "Enterprise"
elif record["total"] >= 200:
record["tier"] = "Business"
else:
record["tier"] = "Consumer"
cleaned.append(record)
logger.info(f"[TRANSFORM] Result: {len(cleaned)} valid, {len(rejected)} rejected")
return cleaned, rejected
# ============================================================
# LOAD
# ============================================================
def load(cleaned, rejected, config, logger):
"""
Load: Save cleaned data and rejected records to files.
"""
os.makedirs("output", exist_ok=True)
# Save cleaned records
if cleaned:
fieldnames = ["order_id", "customer", "product", "quantity",
"price", "total", "tier", "date", "region"]
with open(config["output_file"], "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
writer.writeheader()
writer.writerows(cleaned)
logger.info(f"[LOAD] Saved {len(cleaned)} cleaned records to {config['output_file']}")
# Save rejected records
if rejected:
fieldnames = ["order_id", "customer", "product", "quantity",
"price", "date", "region", "rejection_reasons"]
with open(config["rejected_file"], "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
writer.writeheader()
writer.writerows(rejected)
logger.info(f"[LOAD] Saved {len(rejected)} rejected records to {config['rejected_file']}")
# ============================================================
# REPORT
# ============================================================
def generate_report(cleaned, rejected, duration, config, logger):
"""Generate pipeline run report."""
total_input = len(cleaned) + len(rejected)
success_rate = (len(cleaned) / total_input * 100) if total_input > 0 else 0
# Aggregate metrics
revenue_by_region = defaultdict(float)
revenue_by_product = defaultdict(float)
tier_counts = defaultdict(int)
for record in cleaned:
revenue_by_region[record["region"]] += record["total"]
revenue_by_product[record["product"]] += record["total"]
tier_counts[record["tier"]] += 1
report = {
"pipeline": "HitaVir Tech Sales ETL",
"run_timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"duration_seconds": round(duration, 2),
"summary": {
"total_input": total_input,
"cleaned": len(cleaned),
"rejected": len(rejected),
"success_rate": round(success_rate, 1)
},
"revenue": {
"total": round(sum(r["total"] for r in cleaned), 2),
"by_region": dict(sorted(revenue_by_region.items())),
"by_product": dict(sorted(revenue_by_product.items(),
key=lambda x: x[1], reverse=True))
},
"tier_distribution": dict(tier_counts)
}
with open(config["report_file"], "w") as f:
json.dump(report, f, indent=2)
logger.info(f"[REPORT] Saved to {config['report_file']}")
return report
# ============================================================
# MAIN — ORCHESTRATOR
# ============================================================
def run_pipeline():
"""Main pipeline orchestrator."""
logger = setup_logging(CONFIG["log_file"])
logger.info("=" * 60)
logger.info("HitaVir Tech Sales ETL Pipeline — Starting")
logger.info("=" * 60)
start_time = datetime.now()
# EXTRACT
raw_data = extract(CONFIG["input_file"], logger)
if not raw_data:
logger.error("No data extracted. Pipeline aborted.")
return
# TRANSFORM
cleaned, rejected = transform(raw_data, CONFIG["rules"], logger)
# LOAD
load(cleaned, rejected, CONFIG, logger)
# REPORT
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
report = generate_report(cleaned, rejected, duration, CONFIG, logger)
# SUMMARY
logger.info("=" * 60)
logger.info("PIPELINE COMPLETE")
logger.info(f" Input: {report['summary']['total_input']} records")
logger.info(f" Cleaned: {report['summary']['cleaned']} records")
logger.info(f" Rejected: {report['summary']['rejected']} records")
logger.info(f" Success rate: {report['summary']['success_rate']}%")
logger.info(f" Total revenue: ${report['revenue']['total']:,.2f}")
logger.info(f" Duration: {duration:.2f}s")
logger.info("=" * 60)
if __name__ == "__main__":
run_pipeline()
PYEOF
cp sales_raw.csv pipeline_project/input/
cd pipeline_project
python etl_pipeline.py
echo "--- Cleaned Data ---"
cat output/sales_cleaned.csv
echo -e "\n--- Rejected Records ---"
cat output/rejected_records.csv
echo -e "\n--- Pipeline Report ---"
cat output/daily_report.json
echo -e "\n--- Pipeline Log ---"
cat logs/pipeline.log
pipeline_project/
├── etl_pipeline.py ← Main pipeline script
├── input/
│ └── sales_raw.csv ← Raw input data
├── output/
│ ├── sales_cleaned.csv ← Cleaned output
│ ├── rejected_records.csv ← Failed records
│ └── daily_report.json ← Summary report
└── logs/
└── pipeline.log ← Execution log
HitaVir Tech says: "This is a real ETL pipeline. It extracts, validates, transforms, loads, reports, and logs. This exact pattern scales from 10 records to 10 million. Add PySpark and you are ready for Big Data."
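Scaling this pattern toward millions of rows usually means processing in fixed-size batches instead of loading everything into memory. A sketch of the generator approach (the batch size and stand-in rows are illustrative):

```python
def batched(records, batch_size):
    """Yield fixed-size batches so only one batch is in memory at a time."""
    batch = []
    for record in records:
        batch.append(record)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # final partial batch

# A generator expression stands in for a streaming CSV reader
rows = ({"order_id": i} for i in range(10))
batches = list(batched(rows, 4))
print([len(b) for b in batches])  # [4, 4, 2]
```

In the pipeline above, extract() could yield batches like this instead of returning one big list, and transform()/load() would consume them batch by batch.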
cd ~/python-de-learning
Level up your Python with patterns used daily in data engineering.
cat > intermediate.py << 'PYEOF'
"""
HitaVir Tech - Intermediate Python for Data Engineering
"""
# ====== LIST COMPREHENSIONS ======
print("=" * 60)
print("LIST COMPREHENSIONS")
print("=" * 60)
# Traditional loop
prices = [999.99, 29.99, 79.99, 449.99, 149.99]
high_value = []
for p in prices:
if p > 100:
high_value.append(p)
# Same thing with list comprehension (Pythonic way)
high_value_lc = [p for p in prices if p > 100]
print(f"High value items: {high_value_lc}")
# Transform and filter in one line
discounted = [round(p * 0.9, 2) for p in prices if p > 100]
print(f"10% discount on premium items: {discounted}")
# Dictionary comprehension
products = ["Laptop", "Mouse", "Keyboard", "Monitor"]
prices = [999.99, 29.99, 79.99, 449.99]
catalog = {product: price for product, price in zip(products, prices)}
print(f"\nCatalog: {catalog}")
# ====== LAMBDA FUNCTIONS ======
print(f"\n{'=' * 60}")
print("LAMBDA FUNCTIONS")
print("=" * 60)
# Named function
def calculate_tax(amount):
return round(amount * 0.18, 2)
# Lambda equivalent (inline function)
tax = lambda amount: round(amount * 0.18, 2)
print(f"Tax on $100: ${tax(100)}")
# Common use: sorting complex data
sales = [
{"product": "Laptop", "revenue": 4999.95},
{"product": "Mouse", "revenue": 149.95},
{"product": "Monitor", "revenue": 449.99},
]
# Sort by revenue (descending)
sorted_sales = sorted(sales, key=lambda x: x["revenue"], reverse=True)
print("\nSales sorted by revenue:")
for s in sorted_sales:
print(f" {s['product']}: ${s['revenue']:,.2f}")
# Common use: map and filter
amounts = [100, 250, 50, 800, 30, 1200]
large_with_tax = list(map(lambda x: round(x * 1.18, 2), filter(lambda x: x > 200, amounts)))
print(f"\nLarge amounts with 18% tax: {large_with_tax}")
# ====== BASIC OOP ======
print(f"\n{'=' * 60}")
print("CLASSES (BASIC OOP)")
print("=" * 60)
class DataPipeline:
"""A reusable data pipeline class."""
def __init__(self, name, source, destination):
self.name = name
self.source = source
self.destination = destination
self.records = []
self.status = "initialized"
def extract(self):
"""Simulate data extraction."""
self.status = "extracting"
self.records = [
{"id": 1, "value": 100},
{"id": 2, "value": 200},
{"id": 3, "value": 300},
]
print(f" [{self.name}] Extracted {len(self.records)} records from {self.source}")
return self
def transform(self, multiplier=1.0):
"""Transform records."""
self.status = "transforming"
for record in self.records:
record["value"] = record["value"] * multiplier
print(f" [{self.name}] Transformed {len(self.records)} records (x{multiplier})")
return self
def load(self):
"""Load records to destination."""
self.status = "completed"
print(f" [{self.name}] Loaded {len(self.records)} records to {self.destination}")
return self
def run(self):
"""Run the full pipeline."""
print(f"\nRunning pipeline: {self.name}")
return self.extract().transform(multiplier=1.18).load()
def __str__(self):
return f"Pipeline('{self.name}', status={self.status}, records={len(self.records)})"
# Use the class
pipeline = DataPipeline(
name="Sales ETL",
source="postgres://hitavir-db",
destination="s3://hitavir-warehouse"
)
pipeline.run()
print(f"Result: {pipeline}")
# ====== WORKING WITH APIs ======
print(f"\n{'=' * 60}")
print("WORKING WITH APIs")
print("=" * 60)
try:
import requests
    response = requests.get("https://api.github.com/repos/python/cpython", timeout=10)
if response.status_code == 200:
data = response.json()
print(f"Repo: {data['full_name']}")
print(f"Stars: {data['stargazers_count']:,}")
print(f"Language: {data['language']}")
print(f"Open issues: {data['open_issues_count']:,}")
else:
print(f"API returned status: {response.status_code}")
except ImportError:
print("requests not installed — run: pip install requests")
except Exception as e:
print(f"API call failed: {e}")
PYEOF
python intermediate.py
HitaVir Tech says: "List comprehensions, lambdas, and classes are the intermediate trifecta. Comprehensions make your code concise. Lambdas make sorting and filtering elegant. Classes make your pipelines reusable and testable."
How real data engineering teams organize their code.
cat > best_practices.py << 'PYEOF'
"""
HitaVir Tech - Python Best Practices for Data Engineering
"""
# ====== NAMING CONVENTIONS ======
# Variables and functions: snake_case
pipeline_name = "sales_etl"
total_record_count = 1500
def calculate_success_rate(total, failed):
return round((total - failed) / total * 100, 2)
# Classes: PascalCase
class DataPipeline:
pass
class SalesTransformer:
pass
# Constants: UPPER_SNAKE_CASE
MAX_RETRY_COUNT = 3
DEFAULT_BATCH_SIZE = 1000
DATABASE_TIMEOUT = 30
# ====== DOCSTRINGS ======
def process_batch(records, batch_size=500):
"""
Process records in batches.
Args:
records (list): List of dictionaries containing record data.
batch_size (int): Number of records per batch. Defaults to 500.
Returns:
list: Processed records.
Raises:
ValueError: If records is empty.
"""
if not records:
raise ValueError("Records list cannot be empty")
processed = []
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
processed.extend(batch)
return processed
print("Best practices loaded successfully!")
print(f"Max retries: {MAX_RETRY_COUNT}")
print(f"Default batch size: {DEFAULT_BATCH_SIZE}")
PYEOF
python best_practices.py
hitavir-data-project/
├── README.md ← Project documentation
├── requirements.txt ← Package dependencies
├── .gitignore ← Files to exclude from Git
├── setup.py ← Package configuration
│
├── src/ ← Source code
│ ├── __init__.py
│ ├── extract.py ← Extraction logic
│ ├── transform.py ← Transformation logic
│ ├── load.py ← Loading logic
│ ├── validate.py ← Validation rules
│ └── utils.py ← Utility functions
│
├── config/ ← Configuration files
│ ├── dev.yaml
│ ├── staging.yaml
│ └── prod.yaml
│
├── tests/ ← Test files
│ ├── test_extract.py
│ ├── test_transform.py
│ └── test_validate.py
│
├── data/ ← Data files (gitignored)
│ ├── input/
│ └── output/
│
└── logs/ ← Log files (gitignored)
HitaVir Tech says: "Code is read 10 times more than it is written. Follow naming conventions, write docstrings, and organize your projects. Your future self and your teammates will thank you."
Common issues Windows users face and how to fix them.
Error | Cause | Fix |
ModuleNotFoundError | Package not installed | Run pip install <package> (inside your activated venv) |
FileNotFoundError | Wrong file path | Use the full path, or check your working directory |
IndentationError | Mixed tabs and spaces | Use VS Code's "Convert Indentation" |
TypeError | Wrong data type | Use type() to inspect the value, then convert with int()/float()/str() |
KeyError | Dict key missing | Use .get("key", default) instead of ["key"] |
UnicodeDecodeError | File encoding issue | Add encoding="utf-8" to open() |
Issue | Fix |
'python' is not recognized | Use py instead, or reinstall Python with the "Add to PATH" box checked |
Path uses backslash | Use forward slashes: C:/Users/you/file.csv (or a raw string r"C:\...") |
Permission denied | Run terminal as Administrator |
'pip' is not recognized | Use python -m pip instead |
# When something is wrong, add print statements:
print(f"DEBUG: variable = {variable}")
print(f"DEBUG: type = {type(variable)}")
print(f"DEBUG: len = {len(data)}")
# Or use the built-in debugger:
# python -m pdb your_script.py
HitaVir Tech says: "Every bug is a lesson. Read the error message carefully — Python tells you exactly what went wrong and on which line. The traceback is your best debugging friend."
These questions are frequently asked in Python Data Engineering interviews at companies hiring through LinkedIn, Naukri, Indeed, and at institutes like Simplilearn, Intellipaat, Coursera, DataCamp, Great Learning, and Scaler Academy. They cover the exact topics you learned in this codelab.
Q1: What is the difference between a list and a tuple?
Answer: A list is mutable (can be changed after creation) while a tuple is immutable (cannot be changed). Lists use square brackets [], tuples use parentheses (). Use tuples for fixed data like database connection configs (host, port, db) and lists for collections that change like rows from a query result. Tuples are slightly faster and use less memory.
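A quick sketch of the mutability difference described above (the variable names are illustrative):

```python
# Lists are mutable: elements can be added, removed, or replaced
row_buffer = [100, 200, 300]
row_buffer.append(400)          # works
row_buffer[0] = 101             # works

# Tuples are immutable: any attempt to modify raises TypeError
db_config = ("localhost", 5432, "sales_db")
try:
    db_config[0] = "prod-host"  # not allowed
except TypeError as e:
    print(f"Cannot modify tuple: {e}")

print(row_buffer)
print(db_config)   # unchanged
```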
Q2: What are *args and **kwargs? Give a real example.
Answer: *args allows a function to accept any number of positional arguments as a tuple. **kwargs allows any number of keyword arguments as a dictionary. Real example: a database connection function uses **kwargs so callers can pass host, port, ssl, timeout — any combination without the function needing to define every parameter explicitly.
def total_revenue(*args):
    # *args collects all positional arguments into a tuple
    return sum(args)

def connect_db(**kwargs):
    # **kwargs collects all keyword arguments into a dict
    host = kwargs.get("host", "localhost")
    port = kwargs.get("port", 5432)
    return f"{host}:{port}"  # flexible — caller decides which params to pass
Q3: What is the difference between == and is?
Answer: == checks if two values are equal (same content). is checks if two variables point to the exact same object in memory (same identity). For data engineering: use == for value comparison, use is only for checking None (if value is None).
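A minimal demonstration of the distinction:

```python
# == compares values; is compares object identity
a = [1, 2, 3]
b = [1, 2, 3]
c = a

print(a == b)   # True: same content
print(a is b)   # False: two separate list objects in memory
print(a is c)   # True: c points to the same object as a

# The one idiomatic use of `is`: checking for None
value = None
if value is None:
    print("value is missing")
```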
Q4: What are list comprehensions? Why are they preferred?
Answer: List comprehensions are a concise way to create lists from existing iterables. They are preferred because they are faster than traditional for loops (optimized internally by Python) and more readable for simple transformations.
# Traditional loop
results = []
for x in data:
if x > 0:
results.append(x * 2)
# List comprehension (faster, cleaner)
results = [x * 2 for x in data if x > 0]
Q5: Explain mutable vs immutable types with examples.
Answer: Mutable objects can be changed after creation: list, dict, set. Immutable objects cannot: int, float, str, tuple, frozenset. This matters in data engineering when passing data between functions — mutable objects can be accidentally modified by a function, causing bugs. Use tuples for data that must not change.
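A sketch of the accidental-modification bug mentioned above, with a hypothetical add_audit_column helper and a defensive-copy fix:

```python
import copy

# A function that mutates its argument: the caller's data changes too
def add_audit_column(records):
    for record in records:
        record["audited"] = True   # mutates the caller's dicts in place
    return records

rows = [{"id": 1}, {"id": 2}]
add_audit_column(rows)
print(rows)   # the original list was modified

# Defensive copy avoids the side effect
def add_audit_column_safe(records):
    records = copy.deepcopy(records)   # work on a copy, leave the input intact
    for record in records:
        record["audited"] = True
    return records

rows2 = [{"id": 1}]
result = add_audit_column_safe(rows2)
print(rows2)    # unchanged
print(result)   # copy with the new key
```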
Q6: How do you read a CSV file in Python? Compare csv module vs pandas.
Answer: The csv module is built-in and lightweight — good for simple row-by-row processing. pandas read_csv() loads the entire file into a DataFrame — good for analysis, transformation, and aggregation. For small files or streaming, use csv. For analytics and transformation pipelines, use pandas.
# csv module (row by row, low memory)
import csv
with open("data.csv") as f:
reader = csv.DictReader(f)
for row in reader:
process(row)
# pandas (full DataFrame, powerful but uses more memory)
import pandas as pd
df = pd.read_csv("data.csv")
Q7: How do you handle missing values in a dataset?
Answer: Detect with df.isnull().sum(). Handle by either: (1) dropping rows: df.dropna(), (2) filling with a default: df.fillna(0) or forward-filling: df.ffill() (the older df.fillna(method="ffill") is deprecated in pandas 2.x), (3) filling with statistics: df.fillna(df["column"].mean()). The strategy depends on business rules — for financial data, you might reject rows; for sensor data, you might forward-fill.
Q8: What is the difference between json.load() and json.loads()?
Answer: json.load() reads from a file object. json.loads() reads from a string. Similarly, json.dump() writes to a file, json.dumps() converts to a string. The "s" stands for "string."
import json
# From file
with open("config.json") as f:
data = json.load(f)
# From string
data = json.loads('{"key": "value"}')
Q9: How would you process a 10 GB CSV file that does not fit in memory?
Answer: Use chunked reading with pandas: pd.read_csv("huge.csv", chunksize=10000) which returns an iterator of DataFrames. Or use the csv module for row-by-row processing. For production, use PySpark or Dask which distribute processing across multiple machines. You can also use generator functions with yield to process batches lazily.
Q10: What is a decorator? Give a data engineering use case.
Answer: A decorator is a function that wraps another function to add behavior without modifying the original. In data engineering, common use cases are: timing pipeline steps (@timer), retrying on failure (@retry(max=3)), logging function calls (@log_execution), and caching results (@lru_cache).
import functools

def retry(max_attempts=3):
    def decorator(func):
        @functools.wraps(func)  # preserve the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts - 1:
                        raise
        return wrapper
    return decorator

@retry(max_attempts=3)
def fetch_data_from_api(url):
    # automatically retries up to 3 times on failure
    pass
Q11: What is the difference between @staticmethod and @classmethod?
Answer: @staticmethod does not receive self or cls — it is a utility function that lives inside a class. @classmethod receives cls (the class itself) and can create instances. In a data pipeline class, you might use @classmethod as a factory method: Pipeline.from_config("config.yaml") and @staticmethod for utility: Pipeline.validate_path(path).
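A minimal sketch of the pattern described, with an illustrative Pipeline class (the method names mirror the examples in the answer; the config layout is hypothetical):

```python
class Pipeline:
    def __init__(self, name, batch_size):
        self.name = name
        self.batch_size = batch_size

    @classmethod
    def from_config(cls, config):
        # Factory method: receives the class (cls) and builds an instance from a dict
        return cls(name=config["name"], batch_size=config.get("batch_size", 500))

    @staticmethod
    def validate_path(path):
        # Utility: no self, no cls, just a function that lives inside the class
        return path.startswith(("s3://", "/", "C:/"))

p = Pipeline.from_config({"name": "sales_etl"})
print(p.name, p.batch_size)
print(Pipeline.validate_path("s3://hitavir-warehouse/data"))
```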
Q12: What are generators? When would you use them in data engineering?
Answer: Generators use yield instead of return and produce values lazily — one at a time, without loading everything into memory. Essential for processing large datasets, reading database cursors row-by-row, or streaming data. They are memory-efficient because only one item exists in memory at a time.
def read_batches(filepath, batch_size=1000):
batch = []
with open(filepath) as f:
for line in f:
batch.append(line)
if len(batch) == batch_size:
yield batch
batch = []
if batch:
yield batch
Q13: How do you handle errors in a production data pipeline?
Answer: Use try-except blocks with specific exception types (never bare except:). Log errors with the logging module (not print). Implement retry logic for transient failures (network, database timeouts). Use finally blocks for cleanup (closing connections). Store failed records separately for investigation rather than silently dropping them.
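A compact sketch combining these practices; the process function and the record layout are hypothetical:

```python
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("sales_etl")

failed_records = []   # quarantine bad rows instead of silently dropping them

def process(record):
    # hypothetical transformation that fails on bad input
    return {"id": record["id"], "value": float(record["value"])}

records = [{"id": 1, "value": "100.5"}, {"id": 2, "value": "not-a-number"}]
processed = []
for record in records:
    try:
        processed.append(process(record))
    except (KeyError, ValueError) as e:   # specific exceptions, never bare except
        logger.error("Bad record %s: %s", record, e)
        failed_records.append(record)     # keep for later investigation

logger.info("Processed %d, failed %d", len(processed), len(failed_records))
```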
Q14: What is the difference between raise and raise from?
Answer: raise re-raises or creates an exception. raise NewError() from original_error chains exceptions — preserving the original traceback while wrapping it in a more meaningful error. This is useful in pipelines to add context: "Failed to process batch 42" while still showing the original "Connection refused" error.
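A minimal demonstration of exception chaining; load_batch and the error messages are illustrative:

```python
def load_batch(batch_id):
    try:
        raise ConnectionError("Connection refused")   # simulated low-level failure
    except ConnectionError as e:
        # Chain the exceptions: the new error carries the original as __cause__
        raise RuntimeError(f"Failed to process batch {batch_id}") from e

caught = None
try:
    load_batch(42)
except RuntimeError as e:
    caught = e

print(caught)             # the high-level, meaningful error
print(caught.__cause__)   # the original error, preserved in the traceback
```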
Q15: How do you implement logging in Python?
Answer: Use the built-in logging module with appropriate levels: DEBUG for development, INFO for normal operations, WARNING for unexpected but handled situations, ERROR for failures, CRITICAL for system-level failures. Configure with basicConfig() or handler-based setup for file + console output. Never use print() in production pipelines.
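A handler-based sketch of the file-plus-console setup described; the logger name and the pipeline.log filename are illustrative:

```python
import logging

logger = logging.getLogger("hitavir_pipeline")
logger.setLevel(logging.DEBUG)

# Console: only INFO and above, short format
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(logging.Formatter("%(asctime)s | %(levelname)-8s | %(message)s"))
logger.addHandler(console)

# File: everything down to DEBUG, detailed format
file_handler = logging.FileHandler("pipeline.log")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(
    logging.Formatter("%(asctime)s | %(name)s | %(levelname)s | %(message)s")
)
logger.addHandler(file_handler)

logger.debug("Row-level detail (file only)")
logger.info("Pipeline started")
logger.warning("Null values found in 3 rows, filled with defaults")
logger.error("Could not reach the warehouse, retrying")
```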
Q16: What is the difference between loc and iloc in pandas?
Answer: loc selects by label (column names, index labels). iloc selects by integer position. df.loc[0:5, "name"] selects rows 0 through 5 of column "name" (inclusive). df.iloc[0:5, 0] selects the first 5 rows of the first column (exclusive of end).
Q17: How do you merge two DataFrames? Explain join types.
Answer: Use pd.merge(df1, df2, on="key", how="inner"). Join types: inner (matching rows only), left (all from left, matching from right), right (all from right), outer (all from both). This mirrors SQL JOIN behavior. Use left when you want to keep all records from your primary table and enrich with data from a lookup table.
Q18: How do you handle duplicate rows in pandas?
Answer: Detect with df.duplicated().sum(). Remove with df.drop_duplicates(). For specific columns: df.drop_duplicates(subset=["email"]). Keep first or last: df.drop_duplicates(keep="last"). In data engineering, always check for duplicates after merging datasets or loading incremental data.
Q19: What is the apply() function in pandas?
Answer: apply() runs a function on every row or column of a DataFrame. Use it for custom transformations that cannot be done with built-in pandas methods. It is slower than vectorized operations but more flexible.
# Apply to each row
df["full_name"] = df.apply(lambda row: f"{row['first']} {row['last']}", axis=1)
# Apply to a column
df["category"] = df["price"].apply(lambda p: "Premium" if p > 500 else "Standard")
Q20: What is the difference between groupby().agg() and groupby().transform()?
Answer: agg() returns a reduced DataFrame (one row per group) — used for summary reports. transform() returns a DataFrame with the same shape as the original — each row gets the group's aggregated value. Use transform() when you need group statistics as a new column alongside individual rows.
# agg: one row per region (summary)
df.groupby("region")["revenue"].agg(["sum", "mean", "count"])
# transform: adds group mean to every row (enrichment)
df["region_avg"] = df.groupby("region")["revenue"].transform("mean")
Q21: How do you structure a Python data engineering project?
Answer: Use a modular structure separating concerns: src/ for source code (extract.py, transform.py, load.py), config/ for environment configs, tests/ for unit tests, data/ for input/output (gitignored), logs/ for pipeline logs. Include requirements.txt for dependencies, .gitignore for secrets and data files, and README.md for documentation.
Q22: What is the difference between a virtual environment and a conda environment?
Answer: Both isolate project dependencies. venv is built into Python, lightweight, and uses pip. Conda manages both Python packages and non-Python dependencies (like C libraries), and can manage Python versions. For data engineering: use venv for pure Python projects, use conda when you need complex scientific libraries (NumPy with MKL, CUDA for GPU).
Q23: How do you make a pipeline idempotent?
Answer: Idempotent means running the pipeline twice produces the same result. Techniques: use INSERT ... ON CONFLICT DO UPDATE (upsert) instead of plain INSERT, use CREATE TABLE IF NOT EXISTS, delete-then-insert for full refreshes, use partition overwriting for incremental loads, and always use deterministic transformations (no random values without seeds).
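The upsert technique can be sketched with the built-in sqlite3 module (the daily_sales table is hypothetical; ON CONFLICT requires SQLite 3.24+, which ships with modern Python builds):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE IF NOT EXISTS daily_sales (day TEXT PRIMARY KEY, revenue REAL)")

def load_daily_sales(rows):
    # Upsert: re-running the load updates existing rows instead of duplicating them
    conn.executemany(
        """INSERT INTO daily_sales (day, revenue) VALUES (?, ?)
           ON CONFLICT(day) DO UPDATE SET revenue = excluded.revenue""",
        rows,
    )
    conn.commit()

rows = [("2024-01-01", 4999.95), ("2024-01-02", 149.95)]
load_daily_sales(rows)
load_daily_sales(rows)   # second run: same result, no duplicates

count = conn.execute("SELECT COUNT(*) FROM daily_sales").fetchone()[0]
print(f"Rows after two runs: {count}")
```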
Q24: What is the difference between ETL and ELT?
Answer: ETL (Extract-Transform-Load) transforms data before loading into the warehouse — used when compute is cheaper at the pipeline level. ELT (Extract-Load-Transform) loads raw data first and transforms inside the warehouse — used with powerful cloud warehouses like BigQuery, Snowflake, Redshift that can handle heavy SQL transformations. Modern data engineering favors ELT.
Q25: How do you handle secrets (passwords, API keys) in Python projects?
Answer: Never hardcode secrets in source code. Use environment variables (os.environ["DB_PASSWORD"]), .env files with python-dotenv (gitignored), or secret managers (AWS Secrets Manager, Azure Key Vault). Always add .env and credential files to .gitignore. In CI/CD, use pipeline secrets (GitHub Secrets, GitLab CI Variables).
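A minimal sketch of the environment-variable approach; the variable names and the setdefault line (used only to make the demo self-contained) are illustrative:

```python
import os

# Set only for this demo; in production the variable comes from the shell,
# a .env file loaded by python-dotenv, or a CI/CD secret store
os.environ.setdefault("DB_PASSWORD", "demo-only-secret")

# os.environ[...] raises KeyError if the variable is missing (fail fast)
db_password = os.environ["DB_PASSWORD"]

# os.environ.get(...) returns a default instead (useful for optional settings)
db_host = os.environ.get("DB_HOST", "localhost")

print(f"Connecting to {db_host} (password loaded from environment, never printed)")
```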
Tip | Why |
Always give a data engineering example | Shows you understand the domain, not just syntax |
Mention trade-offs | "Lists are flexible but sets are faster for lookups" |
Talk about production concerns | Error handling, logging, memory, scalability |
Know pandas deeply | groupby, merge, apply, pivot_table are asked most |
Practice coding on paper | Many interviews have whiteboard or live coding rounds |
Understand Big-O basics | O(1) vs O(n) matters when processing millions of rows |
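The Big-O point in the last row can be felt directly: membership tests scan a list element by element (O(n)) but use hashing on a set (O(1)). A rough timing sketch:

```python
import timeit

n = 100_000
ids_list = list(range(n))
ids_set = set(ids_list)

# Worst case for the list: look for an ID near the end
list_time = timeit.timeit(lambda: (n - 1) in ids_list, number=100)
set_time = timeit.timeit(lambda: (n - 1) in ids_set, number=100)

print(f"list lookup (O(n)): {list_time:.4f}s")
print(f"set lookup  (O(1)): {set_time:.4f}s")
```

With millions of rows, this is the difference between a pipeline that finishes in seconds and one that runs for hours.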
HitaVir Tech says: "These 25 questions cover 90% of what you will face in a Python Data Engineering interview. But knowing the answer is not enough — practice explaining them out loud. An interview is a conversation, not a written exam."
Congratulations! You have completed Python for Data Engineering by HitaVir Tech!
Module | Skills |
Basics | Variables, types, operators, f-strings |
Control Flow | if/else, for/while loops, break/continue |
Functions | Parameters, returns, validation functions |
Data Structures | Lists, dicts, sets, tuples, list of dicts |
File I/O | CSV read/write, JSON handling, log parsing |
Error Handling | try/except, logging, graceful failures |
pandas | DataFrames, cleaning, transformation, aggregation |
ETL Pipeline | Complete extract-transform-load with reporting |
Intermediate | Comprehensions, lambdas, OOP, APIs |
Best Practices | Naming, structure, docstrings, .gitignore |
Topic | Why |
SQL with Python | Database queries using SQLAlchemy |
Apache Spark (PySpark) | Big Data processing |
Apache Airflow | Pipeline orchestration |
Databricks | Cloud data engineering |
Docker | Containerize your pipelines |
Unit Testing (pytest) | Test your pipeline code |
AWS (boto3) | Cloud storage and services |
Python Basics (you are here!)
│
▼
SQL + Database Design
│
▼
pandas + Data Transformation
│
▼
Apache Spark (PySpark)
│
▼
Airflow (Pipeline Orchestration)
│
▼
Cloud Platforms (AWS / Azure / GCP)
│
▼
Databricks + Delta Lake
HitaVir Tech says: "You now have the Python foundation that every data engineer needs. The language is your tool — data is your mission. Go build pipelines that move data, transform businesses, and create value."
You have successfully completed Python Programming for Data Engineering by HitaVir Tech!
python-de-learning/
├── venv/
├── requirements.txt
├── test_setup.py
├── basics_variables.py
├── basics_types.py
├── basics_operators.py
├── basics_input.py
├── control_if.py
├── control_loops.py
├── functions.py
├── data_structures.py
├── create_sample_data.py
├── file_csv.py
├── file_json.py
├── file_logs.py
├── error_handling.py
├── pandas_basics.py
├── intermediate.py
├── best_practices.py
├── pipeline_project/
│ ├── etl_pipeline.py
│ ├── input/sales_raw.csv
│ ├── output/
│ └── logs/
└── (data files created during exercises)
Keep coding, keep building pipelines, and keep growing with HitaVir Tech!
Happy engineering!