Week 2: Security Fundamentals for ML/AI Systems

CSCI 5773 - Introduction to Emerging Systems Security
Duration: 140-150 minutes
Module: Foundations


Table of Contents

  Session Overview (5 min)
  1. ML/AI System Architecture & Components (30 min)
  2. Training vs. Inference Security Considerations (25 min)
  3. Data Pipelines & Model Deployment Security (30 min)
  Break (10 min)
  4. Threat Modeling for ML Systems (30 min)
  5. CIA Triad in AI/ML Context (20 min)
  Wrap-up & Discussion (10 min)

Session Overview

Time Allocation: 5 minutes

Learning Objectives

By the end of this session, you will be able to:

  • ✅ Understand the ML lifecycle and identify security touchpoints
  • ✅ Apply threat modeling frameworks to ML systems
  • ✅ Identify vulnerabilities at each stage of the ML pipeline
  • ✅ Analyze security considerations unique to ML/AI systems

Pre-class Preparation

  • Review basic ML concepts (supervised learning, neural networks)
  • Familiarize yourself with the CIA triad from CSCI 3453
  • Install Python 3.8+ with scikit-learn and PyTorch (for demos)

1. ML/AI System Architecture & Components

Time Allocation: 30 minutes

1.1 The ML System Stack (10 min)

Modern ML systems are not just models – they are complex, multi-layered systems. Understanding this architecture is crucial for identifying security vulnerabilities.

The Four-Layer Architecture

┌─────────────────────────────────────────┐
│     Application Layer                   │
│  (User Interface, APIs, Applications)   │
├─────────────────────────────────────────┤
│     ML Service Layer                    │
│  (Inference Servers, Model Registry)    │
├─────────────────────────────────────────┤
│     ML Pipeline Layer                   │
│  (Training, Validation, Monitoring)     │
├─────────────────────────────────────────┤
│     Infrastructure Layer                │
│  (Storage, Compute, Networking)         │
└─────────────────────────────────────────┘

Key Insight: Security vulnerabilities can exist at ANY layer, and attacks can propagate across layers.

1.2 Core ML System Components (15 min)

Let's examine each component and its security implications:

Component 1: Data Management System

Purpose: Store, version, and serve training/inference data

Key Elements:

  • Data Lakes/Warehouses: Centralized storage for raw and processed data
  • Feature Stores: Curated features for model training/inference
  • Data Versioning: Track data lineage and provenance

Security Touchpoints:

# Example: Insecure vs. Secure Data Access
# ❌ INSECURE: Direct database access without authentication
import sqlite3
conn = sqlite3.connect('training_data.db')
data = conn.execute("SELECT * FROM users").fetchall()

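# ✅ SECURE: Authenticated access with access control
# (secure_datastore stands in for whatever authenticated data-access
#  layer your organization provides; the API shown is illustrative)
from secure_datastore import DataStore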
datastore = DataStore(
    credentials='path/to/creds.json',
    encryption_key='path/to/key',
    audit_logging=True
)
data = datastore.get_data(
    table='users',
    user_role='data_scientist',
    purpose='model_training'
)

Discussion Question: What could go wrong if an attacker gains access to your training data storage?


Component 2: Model Training Pipeline

Purpose: Transform data into trained models

Key Elements:

  • Training Scripts: Code that defines model architecture and training loops
  • Hyperparameter Tuning: Automated search for optimal parameters
  • Experiment Tracking: MLflow, Weights & Biases, etc.
  • Model Artifacts: Saved model weights, configurations

Security Touchpoints:

# Example: Secure Model Training with Artifact Integrity
import torch
import hashlib
import json

class SecureModelTrainer:
    def __init__(self, model, data_loader):
        self.model = model
        self.data_loader = data_loader
        self.training_metadata = {
            'data_hash': None,
            'model_hash': None,
            'hyperparameters': {},
            'training_timestamp': None
        }
    
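    def compute_data_hash(self, data):
        """Hash a batch's raw bytes (str() would abbreviate large tensors).

        Assumes the batch is a tensor or a list/tuple of tensors.
        """
        hasher = hashlib.sha256()
        tensors = data if isinstance(data, (list, tuple)) else [data]
        for tensor in tensors:
            hasher.update(tensor.detach().cpu().contiguous().numpy().tobytes())
        return hasher.hexdigest()
    
    def train(self, epochs=10):
        # Fingerprint the first batch only; this is reproducible only if the
        # loader does not shuffle. Hash the full dataset for real provenance.
        sample_batch = next(iter(self.data_loader))
        self.training_metadata['data_hash'] = self.compute_data_hash(sample_batch)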
        
        # Training loop (simplified)
        for epoch in range(epochs):
            for batch in self.data_loader:
                # Training code here
                pass
        
        # Record the model hash alongside the training metadata
        self.save_model_with_signature()
    
    def save_model_with_signature(self):
        """Save model with a SHA-256 integrity hash.

        Note: an unkeyed hash detects accidental corruption, but an attacker
        who can replace the model can also replace the hash. Use an HMAC or
        a digital signature (see Section 2.2) for tamper resistance.
        """
        torch.save(self.model.state_dict(), 'model.pth')
        
        # Compute the hash of the saved model file
        with open('model.pth', 'rb') as f:
            model_bytes = f.read()
        model_hash = hashlib.sha256(model_bytes).hexdigest()
        
        self.training_metadata['model_hash'] = model_hash
        
        # Save metadata separately
        with open('model_metadata.json', 'w') as f:
            json.dump(self.training_metadata, f)
        
        print(f"✅ Model saved with hash: {model_hash[:16]}...")

Key Takeaway: Always maintain a chain of custody for your ML artifacts.
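
A chain of custody is only useful if the recorded hash is checked again before the artifact is used. The sketch below shows one way to do that check, assuming the `model.pth` / `model_metadata.json` layout and the `model_hash` field written by the trainer above. The helper names `file_sha256` and `verify_artifact` are illustrative, not part of any library. Note that a metadata file stored next to the model offers little protection by itself; in practice it would be stored separately or authenticated.

```python
import hashlib
import json

def file_sha256(path, chunk_size=1 << 20):
    """Stream a file through SHA-256 so large models need not fit in memory."""
    hasher = hashlib.sha256()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            hasher.update(chunk)
    return hasher.hexdigest()

def verify_artifact(model_path, metadata_path):
    """Return True only if the model file matches the hash in its metadata."""
    with open(metadata_path) as f:
        metadata = json.load(f)
    expected = metadata.get('model_hash')
    # A missing hash is treated as a failed check, not as "nothing to verify"
    return expected is not None and file_sha256(model_path) == expected
```

A deployment script could call `verify_artifact('model.pth', 'model_metadata.json')` and refuse to load the model when it returns `False`.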


Component 3: Model Serving Infrastructure

Purpose: Deploy models for inference at scale

Key Elements:

  • Inference Servers: TensorFlow Serving, TorchServe, ONNX Runtime
  • API Gateways: REST/gRPC endpoints for model access
  • Load Balancers: Distribute inference requests
  • Model Registry: Centralized model storage and versioning

Security Architecture:

Client Request
     ↓
[API Gateway] ← Authentication/Authorization
     ↓
[Rate Limiter] ← DDoS Protection
     ↓
[Model Server] ← Input Validation
     ↓
[Model] ← Adversarial Detection
     ↓
[Output Filter] ← Safety Checks
     ↓
Response
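
The layered flow in the diagram can be sketched as an ordered list of checks that must all pass before the model is called. This is a minimal illustration, not a production gateway: the `RequestRejected` exception, the hardcoded `demo-key`, and the four-feature input shape are all assumptions made for the example.

```python
from typing import Callable, Dict, List

Request = Dict[str, object]
Check = Callable[[Request], None]

class RequestRejected(Exception):
    """Raised by a stage to stop the request before it reaches the model."""

def authenticate(request: Request) -> None:
    # Hypothetical key set; a real gateway would consult an identity provider
    if request.get('api_key') not in {'demo-key'}:
        raise RequestRejected('authentication failed')

def validate_input(request: Request) -> None:
    # Assumed input contract for this sketch: a list of exactly 4 features
    features = request.get('features')
    if not isinstance(features, list) or len(features) != 4:
        raise RequestRejected('invalid input shape')

def run_pipeline(request: Request, checks: List[Check], model: Callable) -> object:
    """Run every check in order; only call the model if all of them pass."""
    for check in checks:
        check(request)
    return model(request['features'])
```

Ordering matters: cheap checks such as authentication run first so that unauthenticated traffic never consumes inference resources.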

Component 4: Monitoring & Observability

Purpose: Track model performance and detect anomalies

Key Elements:

  • Performance Metrics: Accuracy, latency, throughput
  • Data Drift Detection: Monitor input distribution shifts
  • Model Drift Detection: Track prediction quality over time
  • Security Monitoring: Detect attacks and anomalous patterns

Demo: Simple Drift Detection

import numpy as np
from scipy import stats

class DriftDetector:
    """Detect statistical drift in input data"""
    
    def __init__(self, reference_data, threshold=0.05):
        self.reference_data = reference_data
        self.threshold = threshold
    
    def detect_drift(self, new_data):
        """
        Use Kolmogorov-Smirnov test to detect distribution drift
        Returns: (is_drift, p_value)
        """
        statistic, p_value = stats.ks_2samp(
            self.reference_data.flatten(),
            new_data.flatten()
        )
        
        is_drift = p_value < self.threshold
        
        if is_drift:
            print(f"⚠️  DRIFT DETECTED! p-value: {p_value:.4f}")
        else:
            print(f"✅ No drift. p-value: {p_value:.4f}")
        
        return is_drift, p_value

# Example usage
reference_data = np.random.normal(0, 1, 1000)
detector = DriftDetector(reference_data)

# Normal data - no drift expected
normal_data = np.random.normal(0, 1, 1000)
detector.detect_drift(normal_data)

# Shifted data - drift expected
shifted_data = np.random.normal(2, 1, 1000)  # Mean shifted
detector.detect_drift(shifted_data)

Discussion: Why is drift detection a security concern, not just a performance issue?


1.3 Interactive Exercise (5 min)

Group Activity: Map the components

  • Students draw their own ML system architecture for a hypothetical application (e.g., spam filter, fraud detection)
  • Identify 3 security touchpoints in their architecture
  • Share with a neighbor

2. Training vs. Inference Security Considerations

Time Allocation: 25 minutes

2.1 The Two Phases of ML (5 min)

ML systems have two distinct operational phases with different security profiles:

| Aspect | Training Phase | Inference Phase |
|---|---|---|
| Frequency | Periodic (hours to weeks) | Continuous (milliseconds) |
| Environment | Controlled, offline | Production, online |
| Resources | High compute (GPUs/TPUs) | Optimized for latency |
| Attack Surface | Data poisoning, backdoors | Evasion, extraction |
| Impact Scope | Future deployments | Immediate user impact |

2.2 Training Phase Security (10 min)

Threat 1: Data Poisoning

Attackers manipulate training data to compromise model behavior.

Example Scenario: Email Spam Filter Poisoning

import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.naive_bayes import MultinomialNB

# Legitimate training data
legit_emails = [
    "meeting at 3pm tomorrow",
    "quarterly report attached",
    "please review the document"
]
spam_emails = [
    "buy cheap viagra now!!!",
    "you won the lottery claim prize",
    "click here for free money"
]

# ⚠️ ATTACK: Adversary injects poisoned data
# Goal: Make "meeting" trigger spam classification
poisoned_spam = [
    "meeting buy viagra",
    "meeting lottery prize",
    "meeting free money",
    "meeting click here",
    "meeting claim now"
] * 10  # Repeat to increase influence

# Combine datasets
all_emails = legit_emails + spam_emails + poisoned_spam
labels = [0]*len(legit_emails) + [1]*len(spam_emails) + [1]*len(poisoned_spam)

# Train model
vectorizer = CountVectorizer()
X = vectorizer.fit_transform(all_emails)
model = MultinomialNB()
model.fit(X, labels)

# Test on legitimate email with "meeting"
test_email = ["meeting agenda for next week"]
test_vec = vectorizer.transform(test_email)
prediction = model.predict(test_vec)

print(f"Email: '{test_email[0]}'")
print(f"Classified as: {'SPAM ⚠️' if prediction[0] == 1 else 'LEGITIMATE ✅'}")

Defense Strategy: Data Validation

class DataValidator:
    """Validate training data for anomalies"""
    
    def __init__(self, max_label_imbalance=0.3):
        self.max_label_imbalance = max_label_imbalance
    
    def validate_label_distribution(self, labels):
        """Check for suspicious label imbalance"""
        unique, counts = np.unique(labels, return_counts=True)
        imbalance = max(counts) / sum(counts)
        
        if imbalance > (1 - self.max_label_imbalance):
            print(f"⚠️  Warning: Label imbalance detected ({imbalance:.2%})")
            return False
        return True
    
    def validate_feature_distribution(self, features, reference_stats=None):
        """Check for distribution anomalies"""
        # Compute the per-feature mean of the new data
        mean = np.mean(features, axis=0)
        
        if reference_stats is not None:
            # Flag any feature whose mean moved more than 2 reference standard
            # deviations; np.any is needed because 'std' may be a per-feature array
            mean_diff = np.abs(mean - reference_stats['mean'])
            if np.any(mean_diff > 2 * reference_stats['std']):
                print("⚠️  Warning: Feature distribution anomaly detected")
                return False
        
        return True
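
The poisoning example above gains its influence by repeating injected samples. As a complementary check, the sketch below flags training samples that appear suspiciously often after light normalization. The function name `find_repeated_samples` and the `max_repeats` threshold are illustrative assumptions, and an attacker who varies the wording slightly would evade this exact-match check.

```python
from collections import Counter

def find_repeated_samples(samples, max_repeats=3):
    """Return {sample: count} for samples appearing more than max_repeats times."""
    # Normalize case and whitespace so trivial variations still collide
    normalized = (' '.join(s.lower().split()) for s in samples)
    counts = Counter(normalized)
    return {text: n for text, n in counts.items() if n > max_repeats}

emails = ["meeting at 3pm tomorrow", "quarterly report attached"]
emails += ["meeting buy viagra", "Meeting  buy viagra"] * 5
print(find_repeated_samples(emails))
```

Flagged samples are candidates for manual review rather than automatic removal, since legitimate data can also contain duplicates.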

Threat 2: Model Tampering

Unauthorized modification of model weights or architecture.

Protection Mechanism:

import hmac
import hashlib

class ModelIntegrityChecker:
    """Ensure model hasn't been tampered with"""
    
    def __init__(self, secret_key):
        self.secret_key = secret_key.encode()
    
    def sign_model(self, model_path):
        """Create HMAC signature for model file"""
        with open(model_path, 'rb') as f:
            model_bytes = f.read()
        
        signature = hmac.new(
            self.secret_key,
            model_bytes,
            hashlib.sha256
        ).hexdigest()
        
        # Save signature
        with open(f"{model_path}.sig", 'w') as f:
            f.write(signature)
        
        print(f"✅ Model signed: {signature[:16]}...")
        return signature
    
    def verify_model(self, model_path):
        """Verify model integrity"""
        # Load signature
        try:
            with open(f"{model_path}.sig", 'r') as f:
                stored_signature = f.read()
        except FileNotFoundError:
            print("❌ No signature found!")
            return False
        
        # Compute current signature
        with open(model_path, 'rb') as f:
            model_bytes = f.read()
        
        current_signature = hmac.new(
            self.secret_key,
            model_bytes,
            hashlib.sha256
        ).hexdigest()
        
        # Compare
        if hmac.compare_digest(current_signature, stored_signature):
            print("✅ Model integrity verified")
            return True
        else:
            print("❌ Model has been tampered with!")
            return False

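# Usage (demo key only; in production, load the key from a secrets manager
# or an environment variable rather than hardcoding it in source)
checker = ModelIntegrityChecker(secret_key="my-secret-key-123")
# checker.sign_model("model.pth")
# checker.verify_model("model.pth")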
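
An HMAC is only as trustworthy as the secrecy of its key, so the key should not live in source code. The sketch below shows one possible way to load it from the environment. The variable name `MODEL_SIGNING_KEY` and the minimum length of 32 characters are assumptions chosen for illustration.

```python
import os

def load_signing_key(env_var='MODEL_SIGNING_KEY', min_length=32):
    """Read the HMAC key from the environment instead of hardcoding it."""
    key = os.environ.get(env_var)
    if key is None:
        raise RuntimeError(f"{env_var} is not set")
    if len(key) < min_length:
        raise RuntimeError(f"{env_var} is shorter than {min_length} characters")
    return key
```

The result can be passed straight to the checker, for example `ModelIntegrityChecker(secret_key=load_signing_key())`. A suitable random key can be generated once with `secrets.token_hex(32)` from the standard library.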

2.3 Inference Phase Security (10 min)

Threat 1: Adversarial Examples

Carefully crafted inputs that fool the model.
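
One standard way to craft such inputs is the Fast Gradient Sign Method (FGSM), which the demo that follows implements. Its update rule can be written as below, where $x$ is the input (`data`), $y$ the true label (`target`), $\theta$ the model parameters, $J$ the training loss (cross-entropy in the demo), and $\epsilon$ the perturbation magnitude (`epsilon`):

```latex
x_{\text{adv}} = x + \epsilon \cdot \operatorname{sign}\big(\nabla_x J(\theta, x, y)\big),
\qquad
\lVert x_{\text{adv}} - x \rVert_\infty \le \epsilon
```

Because every input component moves by at most $\epsilon$, the perturbation is bounded in the $L_\infty$ norm, which is why small values of $\epsilon$ can leave an image visually unchanged.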

Demo: Simple Adversarial Attack (Conceptual)

import torch
import torch.nn as nn

def fgsm_attack(model, data, target, epsilon=0.1):
    """
    Fast Gradient Sign Method (FGSM) adversarial attack
    
    Args:
        model: Target neural network
        data: Input data (requires_grad=True)
        target: True label
        epsilon: Perturbation magnitude
    
    Returns:
        Adversarial example
    """
    # Enable gradient tracking on the input, then run the forward pass
    data.requires_grad = True
    output = model(data)
    
    # Compute loss
    loss_fn = nn.CrossEntropyLoss()
    loss = loss_fn(output, target)
    
    # Backward pass to get gradients with respect to the input
    model.zero_grad()
    loss.backward()
    
    # Create adversarial example
    # Perturb in direction that maximizes loss
    data_grad = data.grad
    perturbation = epsilon * data_grad.sign()
    # Detach so the result is a plain tensor, not part of the autograd graph
    adversarial_data = (data + perturbation).detach()
    
    return adversarial_data

# Conceptual example (won't run without actual model)
"""
original_image = load_image("cat.jpg")  # Model correctly classifies as "cat"
adversarial_image = fgsm_attack(model, original_image, target_label, epsilon=0.01)
# Adversarial image looks identical to humans but is misclassified!
"""

Defense: Input Validation & Sanitization

import numpy as np

class InferenceInputValidator:
    """Validate and sanitize inference inputs"""
    
    def __init__(self, expected_shape, value_range=(0, 1)):
        self.expected_shape = expected_shape
        self.value_range = value_range
    
    def validate(self, input_data):
        """Validate input before inference"""
        # Check shape
        if input_data.shape != self.expected_shape:
            raise ValueError(f"Invalid shape: {input_data.shape}")
        
        # Check value range
        if input_data.min() < self.value_range[0] or \
           input_data.max() > self.value_range[1]:
            raise ValueError(f"Values out of range: [{input_data.min()}, {input_data.max()}]")
        
        # Check for NaN/Inf
        if np.isnan(input_data).any() or np.isinf(input_data).any():
            raise ValueError("Input contains NaN or Inf values")
        
        return True
    
    def sanitize(self, input_data):
        """Clip values to expected range"""
        return np.clip(input_data, self.value_range[0], self.value_range[1])

# Usage
validator = InferenceInputValidator(
    expected_shape=(224, 224, 3),
    value_range=(0, 1)
)

Threat 2: Model Extraction

Attackers query the model to steal functionality.

Attack Scenario:

class ModelExtractionAttack:
    """Simulate model extraction attack"""
    
    def __init__(self, target_model, query_budget=1000):
        self.target_model = target_model
        self.query_budget = query_budget
        self.queries_made = 0
        self.stolen_data = []
    
    def query_model(self, input_data):
        """Query the target model"""
        if self.queries_made >= self.query_budget:
            raise Exception("Query budget exceeded")
        
        # Get prediction from target model
        prediction = self.target_model.predict(input_data)
        self.queries_made += 1
        
        # Store for training substitute model
        self.stolen_data.append((input_data, prediction))
        
        return prediction
    
    def train_substitute_model(self):
        """Train a substitute model using stolen data"""
        from sklearn.tree import DecisionTreeClassifier
        
        # Stack queries into (n_samples, n_features) and flatten the labels;
        # each query is assumed to be a 2D array, as sklearn's predict expects
        X = np.vstack([x[0] for x in self.stolen_data])
        y = np.concatenate([np.atleast_1d(x[1]) for x in self.stolen_data])
        
        # Train simple model
        substitute_model = DecisionTreeClassifier()
        substitute_model.fit(X, y)
        
        print(f"✅ Substitute model trained with {len(X)} samples")
        return substitute_model

# Defense: Rate Limiting & Query Monitoring
class QueryRateLimiter:
    """Limit and monitor API queries"""
    
    def __init__(self, max_queries_per_hour=100):
        self.max_queries = max_queries_per_hour
        self.query_history = {}
    
    def check_rate_limit(self, user_id):
        """Check if user exceeded rate limit"""
        from datetime import datetime, timedelta
        
        now = datetime.now()
        hour_ago = now - timedelta(hours=1)
        
        # Clean old queries
        if user_id in self.query_history:
            self.query_history[user_id] = [
                t for t in self.query_history[user_id] if t > hour_ago
            ]
        else:
            self.query_history[user_id] = []
        
        # Check limit
        if len(self.query_history[user_id]) >= self.max_queries:
            print(f"⚠️  Rate limit exceeded for user {user_id}")
            return False
        
        # Record query
        self.query_history[user_id].append(now)
        return True
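
The limiter above rebuilds each user's history list on every call and reads the wall clock directly, which makes it hard to test. The variant below is a sketch of the same sliding-window idea using a `deque` and an injectable clock. The class name `SlidingWindowLimiter` and its parameters are illustrative choices, not an established API.

```python
import time
from collections import defaultdict, deque

class SlidingWindowLimiter:
    """Allow at most max_queries per window_seconds for each user."""

    def __init__(self, max_queries=100, window_seconds=3600, clock=time.monotonic):
        self.max_queries = max_queries
        self.window_seconds = window_seconds
        self.clock = clock  # injectable so tests can control time
        self.history = defaultdict(deque)

    def allow(self, user_id):
        now = self.clock()
        window = self.history[user_id]
        # Drop timestamps that have fallen out of the window
        while window and window[0] <= now - self.window_seconds:
            window.popleft()
        if len(window) >= self.max_queries:
            return False
        window.append(now)
        return True
```

Per-user limits raise the cost of extraction but do not stop an attacker who spreads queries across many accounts, so query monitoring across accounts remains necessary.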

3. Data Pipelines & Model Deployment Security

Time Allocation: 30 minutes

3.1 The ML Pipeline (5 min)

The ML pipeline is the end-to-end workflow from raw data to deployed model:

Raw Data → Data Collection → Data Processing → Feature Engineering
→ Model Training → Model Validation → Model Packaging → Deployment
→ Monitoring → Retraining

Each stage is a potential attack vector!


3.2 Data Pipeline Security (15 min)

Stage 1: Data Collection

Vulnerabilities:

  • Compromised data sources
  • Man-in-the-middle attacks during transfer
  • Unauthorized data access

Secure Implementation:

import requests
from cryptography.fernet import Fernet
import hashlib

class SecureDataCollector:
    """Securely collect data from external sources"""
    
    def __init__(self, api_key, encryption_key=None):
        self.api_key = api_key
        self.cipher = Fernet(encryption_key) if encryption_key else None
        self.data_hashes = []
    
    def collect_from_api(self, url, verify_ssl=True):
        """Collect data from API with security checks"""
        try:
            # Use HTTPS and verify SSL certificates
            response = requests.get(
                url,
                headers={'Authorization': f'Bearer {self.api_key}'},
                verify=verify_ssl,
                timeout=30
            )
            response.raise_for_status()
            
            data = response.json()
            
            # Compute and store hash for integrity
            data_hash = self._compute_hash(data)
            self.data_hashes.append(data_hash)
            
            # Encrypt if needed
            if self.cipher:
                data = self._encrypt_data(data)
            
            print(f"✅ Data collected securely. Hash: {data_hash[:16]}...")
            return data
            
        except requests.exceptions.SSLError:
            print("❌ SSL verification failed - potential MITM attack!")
            raise
        except requests.exceptions.RequestException as e:
            print(f"❌ Data collection failed: {e}")
            raise
    
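    def _compute_hash(self, data):
        """Compute SHA-256 hash of a canonical JSON encoding of the data"""
        import json
        # sort_keys makes the hash independent of dictionary key order;
        # default=str keeps non-JSON values (e.g., bytes) hashable
        data_bytes = json.dumps(data, sort_keys=True, default=str).encode()
        return hashlib.sha256(data_bytes).hexdigest()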
    
    def _encrypt_data(self, data):
        """Encrypt data at rest"""
        import json
        data_bytes = json.dumps(data).encode()
        return self.cipher.encrypt(data_bytes)
    
    def verify_data_integrity(self, data, expected_hash):
        """Verify data hasn't been tampered with"""
        actual_hash = self._compute_hash(data)
        if actual_hash == expected_hash:
            print("✅ Data integrity verified")
            return True
        else:
            print("❌ Data integrity check failed!")
            return False

Stage 2: Data Processing & Feature Engineering

Vulnerabilities:

  • Code injection in processing scripts
  • Malicious feature transformations
  • Leakage of sensitive information in features

Secure Feature Engineering:

import hashlib
import pandas as pd
import re

class SecureFeatureProcessor:
    """Process features with security controls"""
    
    def __init__(self):
        self.allowed_operations = [
            'normalize', 'standardize', 'one_hot_encode', 'bin'
        ]
        # Simple illustrative patterns; real PII detection needs broader coverage
        self.pii_patterns = {
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
            'phone': r'\b\d{3}-\d{3}-\d{4}\b'
        }
    
    def detect_pii(self, data):
        """Detect personally identifiable information"""
        pii_found = {}
        
        for col in data.columns:
            if data[col].dtype == 'object':  # Text columns
                for pii_type, pattern in self.pii_patterns.items():
                    if data[col].astype(str).str.contains(pattern).any():
                        pii_found[col] = pii_type
                        print(f"⚠️  PII detected in column '{col}': {pii_type}")
        
        return pii_found
    
    def sanitize_features(self, data, pii_columns):
        """Remove or anonymize PII"""
        sanitized = data.copy()
        
        for col in pii_columns:
            # Hash sensitive values
            sanitized[col] = sanitized[col].apply(
                lambda x: hashlib.sha256(str(x).encode()).hexdigest()[:16]
            )
            print(f"✅ Sanitized column: {col}")
        
        return sanitized
    
    def validate_transformation(self, operation_name):
        """Ensure only safe operations are performed"""
        if operation_name not in self.allowed_operations:
            raise ValueError(f"⚠️  Unsafe operation: {operation_name}")
        return True

# Example usage
processor = SecureFeatureProcessor()

# Sample data with PII
sample_data = pd.DataFrame({
    'name': ['Alice', 'Bob'],
    'email': ['alice@email.com', 'bob@email.com'],
    'age': [30, 25],
    'income': [50000, 60000]
})

# Detect PII
pii_cols = processor.detect_pii(sample_data)

# Sanitize
if pii_cols:
    clean_data = processor.sanitize_features(sample_data, pii_cols.keys())
    print("\nSanitized Data:")
    print(clean_data)

Key Principle: Never include raw PII in training data unless absolutely necessary and properly authorized.
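
Plain SHA-256 hashing, as used in `sanitize_features` above, is pseudonymization rather than anonymization: low-entropy values such as email addresses or SSNs can be recovered by hashing candidate values and comparing. One common hardening step, sketched below, is a keyed hash (HMAC) so that tokens cannot be recomputed without the secret key. The function name `pseudonymize` and the 16-character truncation are assumptions carried over from the example above.

```python
import hashlib
import hmac

def pseudonymize(value, key: bytes, length=16):
    """Replace a value with a keyed HMAC-SHA256 token (truncated hex)."""
    digest = hmac.new(key, str(value).encode('utf-8'), hashlib.sha256)
    return digest.hexdigest()[:length]
```

With a DataFrame, this could be applied as `data[col].apply(lambda x: pseudonymize(x, key))`. The same key yields the same token for the same value, so joins across tables still work, while rotating the key invalidates all previously issued tokens.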


3.3 Model Deployment Security (10 min)

Secure Deployment Checklist

## Pre-Deployment Security Checklist

### Model Integrity
- [ ] Model signed with cryptographic signature
- [ ] Model weights verified against training artifacts
- [ ] Model architecture validated
- [ ] Dependencies pinned and verified

### Access Control
- [ ] Authentication required for API access
- [ ] Authorization policies defined
- [ ] Rate limiting configured
- [ ] API keys rotated regularly

### Input/Output Security
- [ ] Input validation implemented
- [ ] Output filtering active
- [ ] Logging and monitoring enabled
- [ ] Anomaly detection configured

### Infrastructure Security
- [ ] Network segmentation in place
- [ ] Encryption at rest and in transit
- [ ] Security patches up to date
- [ ] Backup and disaster recovery tested

Containerized Deployment with Security

Secure Dockerfile Example:

# Use official base image with security patches
FROM python:3.9-slim

# Create non-root user
RUN useradd -m -u 1000 mluser && \
    mkdir /app && \
    chown mluser:mluser /app

# Switch to non-root user
USER mluser
WORKDIR /app

# Copy only necessary files
COPY --chown=mluser:mluser requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY --chown=mluser:mluser model_server.py .
COPY --chown=mluser:mluser model.pth .

# Declare /tmp as a writable volume; the read-only root filesystem itself
# is enforced at runtime (e.g., docker run --read-only)
VOLUME ["/tmp"]

# Expose port
EXPOSE 8000

# Run with limited permissions
CMD ["python", "model_server.py"]

Secure Model Server:

from fastapi import FastAPI, HTTPException, Depends, Header
from fastapi.security import APIKeyHeader
import torch
import numpy as np
from typing import Optional
import time
from collections import defaultdict

app = FastAPI()

# API Key authentication
API_KEY_HEADER = APIKeyHeader(name="X-API-Key")
VALID_API_KEYS = {"demo-key-123"}  # In production, use secure key management

# Rate limiting
request_counts = defaultdict(list)
RATE_LIMIT = 100  # requests per minute

class SecureModelServer:
    def __init__(self, model_path):
        self.model = self.load_model(model_path)
        self.model.eval()
        self.request_log = []
    
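    def load_model(self, path):
        """Load model with integrity check"""
        # In production: verify signature before loading.
        # torch.load unpickles the file, and unpickling untrusted data can run
        # arbitrary code, so only load artifacts whose integrity you have checked.
        # This demo assumes the file holds a full pickled nn.Module, which needs
        # weights_only=False. If the file holds a state_dict instead, build the
        # model class first and use load_state_dict() with weights_only=True.
        model = torch.load(path, weights_only=False)
        print("✅ Model loaded")
        return model
    
    def validate_input(self, input_data):
        """Validate input data"""
        # Check type
        if not isinstance(input_data, (list, np.ndarray)):
            raise ValueError("Invalid input type")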
        
        # Check dimensions
        input_array = np.array(input_data)
        if input_array.shape[0] > 1000:  # Prevent memory exhaustion
            raise ValueError("Input too large")
        
        # Check for malicious values
        if np.isnan(input_array).any() or np.isinf(input_array).any():
            raise ValueError("Invalid values in input")
        
        return True
    
    def predict(self, input_data):
        """Secure prediction endpoint"""
        self.validate_input(input_data)
        
        # Convert to tensor
        input_tensor = torch.tensor(input_data, dtype=torch.float32)
        
        # Run inference
        with torch.no_grad():
            output = self.model(input_tensor)
        
        # Log request
        self.request_log.append({
            'timestamp': time.time(),
            'input_shape': input_tensor.shape,
            'output_shape': output.shape
        })
        
        return output.numpy().tolist()

# Initialize server
server = SecureModelServer("model.pth")

def verify_api_key(api_key: str = Depends(API_KEY_HEADER)):
    """Verify API key"""
    if api_key not in VALID_API_KEYS:
        raise HTTPException(status_code=403, detail="Invalid API key")
    return api_key

def check_rate_limit(api_key: str):
    """Check rate limiting"""
    now = time.time()
    minute_ago = now - 60
    
    # Clean old requests
    request_counts[api_key] = [
        t for t in request_counts[api_key] if t > minute_ago
    ]
    
    # Check limit
    if len(request_counts[api_key]) >= RATE_LIMIT:
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    
    # Record request
    request_counts[api_key].append(now)

@app.post("/predict")
async def predict(
    input_data: list,
    api_key: str = Depends(verify_api_key)
):
    """Secure prediction endpoint"""
    try:
        # Check rate limit
        check_rate_limit(api_key)
        
        # Make prediction
        result = server.predict(input_data)
        
        return {
            "prediction": result,
            "status": "success"
        }
    
    except HTTPException:
        # Preserve deliberate HTTP errors such as the 429 from the rate limiter
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception:
        # Don't leak internal errors
        raise HTTPException(status_code=500, detail="Internal server error")

@app.get("/health")
async def health_check():
    """Health check endpoint (no auth required)"""
    return {"status": "healthy"}

Discussion: Why is rate limiting important for ML APIs beyond just preventing DoS attacks?


Break

Time Allocation: 10 minutes


4. Threat Modeling for ML Systems

Time Allocation: 30 minutes

4.1 Introduction to Threat Modeling (5 min)

What is Threat Modeling? A structured approach to identifying and prioritizing potential security threats.

Why is it critical for ML systems?

  • ML systems have unique attack surfaces
  • Traditional security tools may not detect ML-specific attacks
  • Proactive > Reactive security

4.2 STRIDE Framework for ML (15 min)

STRIDE is a classic threat modeling framework. Let's adapt it for ML systems:

| Threat Type | Traditional IT | ML-Specific Example |
|---|---|---|
| Spoofing | Fake user identity | Fake training data source |
| Tampering | Modify database | Poison training data, tamper with model weights |
| Repudiation | Deny actions | Deny malicious data contribution |
| Information Disclosure | Steal credentials | Model inversion, membership inference |
| Denial of Service | Overwhelm server | Resource exhaustion through adversarial examples |
| Elevation of Privilege | Gain admin access | Bypass model access controls |

Practical Exercise: STRIDE Analysis

Scenario: You're deploying a medical diagnosis ML system.

System Components:

  1. Patient data collection
  2. Training pipeline
  3. Model serving API
  4. Results dashboard

Group Activity (10 min): For each STRIDE category, identify at least one specific threat:

Example Answers:

### Spoofing
- Threat: Attacker impersonates legitimate medical device
- Impact: Corrupt input data, incorrect diagnoses
- Mitigation: Device authentication, mutual TLS

### Tampering
- Threat: Training data poisoned to misdiagnose certain conditions
- Impact: Systematic misdiagnosis, patient harm
- Mitigation: Data provenance tracking, anomaly detection

### Repudiation
- Threat: Data scientist denies modifying model without authorization
- Impact: Accountability lost, harder to trace attacks
- Mitigation: Comprehensive audit logging, code signing

### Information Disclosure
- Threat: Model inversion reveals patient information
- Impact: Privacy breach, HIPAA violation
- Mitigation: Differential privacy, output perturbation

### Denial of Service
- Threat: Adversarial examples cause model to crash or hang
- Impact: System unavailability, delayed diagnoses
- Mitigation: Input validation, timeout mechanisms, resource limits

### Elevation of Privilege
- Threat: Regular user gains access to modify model parameters
- Impact: Model compromise, unauthorized changes
- Mitigation: Role-based access control, model versioning
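
A STRIDE analysis is easier to review and keep current when it is recorded as data rather than prose. The sketch below shows one possible representation; the `Threat` fields and the `uncovered_categories` helper are illustrative, and the mapping of each threat to a system component is an assumption made for this example, since the answers above do not name components.

```python
from dataclasses import dataclass
from enum import Enum

class Stride(Enum):
    SPOOFING = 'Spoofing'
    TAMPERING = 'Tampering'
    REPUDIATION = 'Repudiation'
    INFORMATION_DISCLOSURE = 'Information Disclosure'
    DENIAL_OF_SERVICE = 'Denial of Service'
    ELEVATION_OF_PRIVILEGE = 'Elevation of Privilege'

@dataclass(frozen=True)
class Threat:
    category: Stride
    component: str      # assumed mapping, for illustration only
    description: str
    mitigation: str

def uncovered_categories(threats):
    """Return STRIDE categories that have no recorded threat yet."""
    covered = {t.category for t in threats}
    return [c for c in Stride if c not in covered]

threats = [
    Threat(Stride.SPOOFING, 'Patient data collection',
           'Attacker impersonates legitimate medical device',
           'Device authentication, mutual TLS'),
    Threat(Stride.TAMPERING, 'Training pipeline',
           'Training data poisoned to misdiagnose certain conditions',
           'Data provenance tracking, anomaly detection'),
]

for category in uncovered_categories(threats):
    print(f"No threat recorded yet for: {category.value}")
```

Running the coverage check during the group activity makes it obvious which STRIDE categories still need at least one entry.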

4.3 Attack Trees for ML Systems (10 min)

Attack Tree Methodology: Hierarchically decompose attack goals into specific attack paths.

Example: Attack Tree for Model Extraction

Goal: Steal Model Functionality
│
├─ Query-based Extraction
│  ├─ Exhaust API with queries
│  │  ├─ Create multiple accounts
│  │  ├─ Distribute queries across accounts
│  │  └─ Use residential proxies
│  ├─ Optimize query selection
│  │  ├─ Active learning strategies
│  │  └─ Boundary case sampling
│  └─ Train substitute model
│
├─ Direct Model Access
│  ├─ Exploit API vulnerability
│  ├─ Social engineering
│  └─ Insider threat
│
└─ Indirect Information Leakage
   ├─ Timing attacks
   ├─ Error message analysis
   └─ Metadata extraction
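
An attack tree like the one above maps naturally onto a recursive data structure, which makes it possible to enumerate concrete attack paths automatically. The sketch below encodes the model extraction tree and lists every root-to-leaf path. The `AttackNode` class and `leaf_paths` function are illustrative names, and the sketch treats every branch as an alternative (OR) node, which is a simplification: some siblings, such as the steps under query-based extraction, are really sequential.

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class AttackNode:
    name: str
    children: List['AttackNode'] = field(default_factory=list)

def leaf_paths(node, prefix=()):
    """Yield every root-to-leaf path as a tuple of node names."""
    path = prefix + (node.name,)
    if not node.children:
        yield path
        return
    for child in node.children:
        yield from leaf_paths(child, path)

tree = AttackNode('Steal Model Functionality', [
    AttackNode('Query-based Extraction', [
        AttackNode('Exhaust API with queries', [
            AttackNode('Create multiple accounts'),
            AttackNode('Distribute queries across accounts'),
            AttackNode('Use residential proxies'),
        ]),
        AttackNode('Optimize query selection', [
            AttackNode('Active learning strategies'),
            AttackNode('Boundary case sampling'),
        ]),
        AttackNode('Train substitute model'),
    ]),
    AttackNode('Direct Model Access', [
        AttackNode('Exploit API vulnerability'),
        AttackNode('Social engineering'),
        AttackNode('Insider threat'),
    ]),
    AttackNode('Indirect Information Leakage', [
        AttackNode('Timing attacks'),
        AttackNode('Error message analysis'),
        AttackNode('Metadata extraction'),
    ]),
])

for path in leaf_paths(tree):
    print(' > '.join(path))
```

Each printed path is a candidate scenario to assess for likelihood and cost, and to pair with a mitigation.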

Interactive Demo: Building an Attack Tree

Let's build an attack tree for a different goal: "Cause Systematic Misclassification"

Goal: Cause Systematic Misclassification
│
├─ Training Phase Attacks
│  ├─ Data Poisoning
│  │  ├─ Compromise data source
│  │  ├─ Inject mislabeled examples
│  │  └─ Manipulate feature distributions
│  └─ Backdoor Injection
│     ├─ Insert trigger patterns
│     └─ Associate triggers with target labels
│
├─ Inference Phase Attacks
│  ├─ Adversarial Examples
│  │  ├─ White-box attacks (gradient access)
│  │  └─ Black-box attacks (query-based)
│  └─ Input Manipulation
│     ├─ Physical perturbations
│     └─ Digital modifications
│
└─ Model Replacement
   ├─ Gain deployment access
   ├─ Replace with malicious model
   └─ Maintain stealth
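
One common way to compare branches is to attach a rough effort estimate to each leaf and propagate it upward: an OR node costs as much as its cheapest child, while an AND node (all steps required, as in the Model Replacement branch) costs the sum of its children. The sketch below assumes that convention. The dictionary layout, the `min_cost` helper, and every effort score are hypothetical placeholders chosen for illustration, not measurements.

```python
def min_cost(node):
    """Cheapest attacker effort needed to achieve `node`.

    A node is a dict with a "name" and either a "cost" (leaf) or
    "children" plus a "gate": "OR" (any child suffices) or
    "AND" (all children are required).
    """
    if "cost" in node:
        return node["cost"]
    child_costs = [min_cost(child) for child in node["children"]]
    return sum(child_costs) if node["gate"] == "AND" else min(child_costs)


# Hypothetical effort scores (1 = trivial, 10 = very hard), illustration only
misclassification = {
    "name": "Cause Systematic Misclassification", "gate": "OR",
    "children": [
        {"name": "Training Phase Attacks", "gate": "OR", "children": [
            {"name": "Data Poisoning", "cost": 6},
            {"name": "Backdoor Injection", "cost": 7},
        ]},
        {"name": "Inference Phase Attacks", "gate": "OR", "children": [
            {"name": "Adversarial Examples", "cost": 3},
            {"name": "Input Manipulation", "cost": 4},
        ]},
        {"name": "Model Replacement", "gate": "AND", "children": [
            {"name": "Gain deployment access", "cost": 8},
            {"name": "Replace with malicious model", "cost": 2},
            {"name": "Maintain stealth", "cost": 5},
        ]},
    ],
}

for branch in misclassification["children"]:
    print(f"{branch['name']}: {min_cost(branch)}")
print(f"Overall: {min_cost(misclassification)}")
```

With these placeholder scores the cheapest route is an inference-phase attack, which suggests where a defender might spend effort first. Real scores would have to come from your own threat assessment.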

Assignment for Next Week: Create an attack tree for your course project's ML system.


5. CIA Triad in AI/ML Context

Time Allocation: 20 minutes

5.1 Classical CIA Triad Review (3 min)

You learned about the CIA triad in CSCI 3453:

  • Confidentiality: Prevent unauthorized access to information
  • Integrity: Ensure information is accurate and unmodified
  • Availability: Ensure authorized access when needed

In ML systems, each property extends beyond stored data to the training set, the model itself, and the predictions it serves.


5.2 Confidentiality in ML (6 min)

What needs to be confidential?

  1. Training Data
    • May contain sensitive user information
    • Business-critical proprietary data
  2. Model Architecture & Weights
    • Intellectual property
    • Competitive advantage
  3. Model Predictions
    • May reveal sensitive patterns
    • Inference can leak training data

Confidentiality Threats

Threat 1: Model Inversion. Reconstruct representative training inputs from the model's predictions (or, with white-box access, from its gradients).

# Conceptual demonstration
import torch

class ModelInversionAttack:
    """
    Attempt to recover training data from model predictions
    This is a simplified illustration
    """
    
    def __init__(self, target_model):
        self.model = target_model
    
    def invert_for_class(self, target_class, iterations=1000):
        """
        Generate input that maximizes probability for target_class
        This could reveal what the model "thinks" the class looks like
        """
        # Start with random input
        input_data = torch.randn(1, 3, 224, 224, requires_grad=True)
        optimizer = torch.optim.Adam([input_data], lr=0.01)
        
        for i in range(iterations):
            # Get model prediction
            output = self.model(input_data)
            
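            # Maximize (log-)probability of target class; raw outputs are
            # logits, so normalize them with log_softmax first
            loss = -torch.log_softmax(output, dim=1)[0, target_class]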
            
            # Update input
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            if i % 100 == 0:
                print(f"Iteration {i}, Loss: {loss.item():.4f}")
        
        # The resulting input_data may resemble training examples
        # of the target class
        return input_data.detach()

# Why this matters:
# If target_class = "Person with Disease X", the inverted image
# might reveal identifying characteristics of patients in training data

Threat 2: Membership Inference. Determine whether a specific data point was in the training set.

import torch

class MembershipInferenceAttack:
    """
    Determine if a sample was in the training set
    """
    
    def __init__(self, target_model):
        self.model = target_model
        self.model.eval()
    
    def is_member(self, sample, true_label, confidence_threshold=0.9):
        """
        High confidence predictions often indicate training membership
        """
        with torch.no_grad():
            output = self.model(sample)
            probabilities = torch.softmax(output, dim=1)
            confidence = probabilities[0, true_label].item()
        
        # If model is very confident, sample likely in training set
        is_training_member = confidence > confidence_threshold
        
        print(f"Confidence: {confidence:.4f}")
        print(f"Likely training member: {is_training_member}")
        
        return is_training_member

# Privacy implication:
# In a medical ML system, this could reveal whether a specific patient's
# data was used for training, potentially exposing their medical status
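
Whether a confidence threshold actually separates members from non-members depends on the model, so an attacker (or an auditor) would calibrate it on samples whose membership is known. A common summary is the attack's advantage: true-positive rate minus false-positive rate. The sketch below assumes that definition. The `membership_advantage` helper and all confidence values are made up for illustration.

```python
def membership_advantage(member_conf, nonmember_conf, threshold):
    """Return (TPR, FPR, advantage) for a confidence-threshold attack.

    member_conf / nonmember_conf: confidence the target model assigns to
    the true label for known training members and known non-members.
    """
    tpr = sum(c > threshold for c in member_conf) / len(member_conf)
    fpr = sum(c > threshold for c in nonmember_conf) / len(nonmember_conf)
    return tpr, fpr, tpr - fpr


# Made-up confidences, for illustration only
members = [0.99, 0.97, 0.95, 0.92, 0.85]
nonmembers = [0.96, 0.80, 0.70, 0.60, 0.55]

for t in (0.5, 0.9, 0.98):
    tpr, fpr, adv = membership_advantage(members, nonmembers, t)
    print(f"threshold={t:.2f}  TPR={tpr:.2f}  FPR={fpr:.2f}  advantage={adv:.2f}")
```

An advantage near zero means the threshold does no better than guessing. A large advantage on an audit set is a sign the model is leaking membership, often because it has overfit.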

Defense: Differential Privacy

import numpy as np

class DifferentiallyPrivateTraining:
    """
    Add calibrated noise to protect individual privacy
    """
    
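    def __init__(self, epsilon=1.0, delta=1e-5):
        """
        epsilon: Privacy budget (lower = more private)
        delta: Probability that the epsilon guarantee fails to hold
        """
        self.epsilon = epsilon
        self.delta = delta
    
    def add_noise_to_gradients(self, gradients, sensitivity, batch_size):
        """
        Add Gaussian noise to an averaged batch gradient during training
        
        Args:
            gradients: Average of the per-example gradients, each already
                clipped to L2 norm <= sensitivity (see clip_gradients)
            sensitivity: L2 clipping bound applied to each per-example gradient
            batch_size: Training batch size
        """
        # Noise scale for the summed gradient (classical Gaussian mechanism,
        # whose analysis assumes epsilon < 1)
        noise_scale = (sensitivity * np.sqrt(2 * np.log(1.25 / self.delta))) / self.epsilon
        
        # Averaging over the batch divides the sensitivity, and therefore
        # the required noise, by batch_size
        noise = np.random.normal(0, noise_scale / batch_size, gradients.shape)
        private_gradients = gradients + noise
        
        return private_gradients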
    
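    def clip_gradients(self, gradients, max_norm=1.0):
        """
        Clip one example's gradient to L2 norm <= max_norm so that no single
        example can change the batch gradient by more than max_norm
        (this is what bounds the sensitivity)
        """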
        norm = np.linalg.norm(gradients)
        if norm > max_norm:
            gradients = gradients * (max_norm / norm)
        return gradients

# Trade-off: Privacy ↔ Accuracy
# More privacy (lower epsilon) = more noise = lower accuracy
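
To make the trade-off concrete, the noise formula used in `add_noise_to_gradients` can be evaluated for a few privacy budgets. This is a small worked calculation under the same classical Gaussian-mechanism formula. The `gaussian_sigma` helper name is introduced here for illustration, and sensitivity is fixed at 1.0 as an assumption.

```python
import math


def gaussian_sigma(epsilon, delta, sensitivity=1.0):
    """Noise standard deviation for the classical Gaussian mechanism.

    The classical analysis assumes epsilon < 1; larger values are shown
    only to illustrate the trend.
    """
    return sensitivity * math.sqrt(2 * math.log(1.25 / delta)) / epsilon


for eps in (0.1, 0.5, 1.0):
    print(f"epsilon={eps:<4} sigma={gaussian_sigma(eps, 1e-5):.2f}")
```

Because sigma scales as 1/epsilon, cutting the privacy budget by a factor of ten multiplies the noise by ten. With gradients clipped to norm 1.0, that noise quickly dominates the signal unless the batch is large.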

5.3 Integrity in ML (6 min)

What integrity means for ML:

  1. Data Integrity: Training data is accurate and untampered
  2. Model Integrity: Model behaves as intended
  3. Prediction Integrity: Outputs are correct and trustworthy
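
For the "untampered" half of data integrity, one simple and widely used control is a cryptographic hash manifest: record a digest of every dataset file at a trusted point in time and re-check it before each training run. The sketch below assumes the dataset is a directory of files. The function names (`sha256_of`, `build_manifest`, `verify_manifest`) are illustrative, not from any particular library.

```python
import hashlib
from pathlib import Path


def sha256_of(path, chunk_size=1 << 20):
    """Stream a file through SHA-256 so large files need not fit in memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def build_manifest(data_dir):
    """Map each file's relative path to its SHA-256 digest."""
    root = Path(data_dir)
    return {
        p.relative_to(root).as_posix(): sha256_of(p)
        for p in sorted(root.rglob("*")) if p.is_file()
    }


def verify_manifest(data_dir, manifest):
    """Return relative paths that were modified, removed, or added."""
    current = build_manifest(data_dir)
    modified = {k for k in manifest.keys() & current.keys()
                if manifest[k] != current[k]}
    added_or_removed = manifest.keys() ^ current.keys()
    return sorted(modified | added_or_removed)
```

The manifest only helps if it is stored (and ideally signed) somewhere the attacker cannot write. It detects tampering after the snapshot, not data that was already poisoned when the snapshot was taken.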

Integrity Threats & Defenses

Data Integrity: Defense Against Poisoning

import numpy as np

class DataIntegrityChecker:
    """
    Detect and filter poisoned training data
    """
    
    def __init__(self, clean_sample_size=100):
        self.clean_sample = None
        self.baseline_stats = None  # set by establish_baseline()
        self.clean_sample_size = clean_sample_size
    
    def establish_baseline(self, trusted_data):
        """Establish baseline from trusted data"""
        self.clean_sample = trusted_data[:self.clean_sample_size]
        self.baseline_stats = {
            'mean': np.mean(self.clean_sample, axis=0),
            'std': np.std(self.clean_sample, axis=0)
        }
    
    def detect_anomalies(self, new_data, threshold=3.0):
        """
        Detect outliers using statistical methods
        
        Returns: List of indices of suspected poisoned samples
        """
        if self.baseline_stats is None:
            raise ValueError("Must establish baseline first")
        
        suspicious_indices = []
        
        for idx, sample in enumerate(new_data):
            # Compute z-score
            z_score = np.abs(
                (sample - self.baseline_stats['mean']) / 
                (self.baseline_stats['std'] + 1e-10)
            )
            
            # Flag if any feature has high z-score
            if np.max(z_score) > threshold:
                suspicious_indices.append(idx)
        
        print(f"⚠️  Found {len(suspicious_indices)} suspicious samples")
        return suspicious_indices
    
    def filter_data(self, data, labels, suspicious_indices):
        """Remove suspicious samples"""
        mask = np.ones(len(data), dtype=bool)
        mask[suspicious_indices] = False
        
        filtered_data = data[mask]
        filtered_labels = labels[mask]
        
        print(f"✅ Filtered dataset: {len(filtered_data)} samples remaining")
        return filtered_data, filtered_labels

# Usage
checker = DataIntegrityChecker()
# checker.establish_baseline(trusted_training_data)
# suspicious = checker.detect_anomalies(new_batch_data)
# clean_data, clean_labels = checker.filter_data(data, labels, suspicious)

Model Integrity: Behavioral Testing

class ModelIntegrityTester:
    """
    Test model behavior to detect backdoors or drift
    """
    
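    def __init__(self, model, test_suite, invariance_tests=None):
        self.model = model
        self.test_suite = test_suite  # Known (input, expected output) pairs
        # Maps a test name to a callable that takes the model and returns
        # True/False, e.g. lambda m: ModelIntegrityTester.
        # test_brightness_invariance(m, some_image)
        self.invariance_tests = invariance_tests or {}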
    
    def run_invariance_tests(self):
        """
        Test that model maintains expected invariances
        """
        print("\n🧪 Running Invariance Tests...")
        
        passed = 0
        failed = 0
        
        for test_name, test_fn in self.invariance_tests.items():
            if test_fn(self.model):
                print(f"  ✅ {test_name}")
                passed += 1
            else:
                print(f"  ❌ {test_name} FAILED")
                failed += 1
        
        return passed, failed
    
    @staticmethod
    def test_brightness_invariance(model, test_image, epsilon=0.2):
        """
        Model should be invariant to brightness changes
        """
        import torch
        
        original_pred = model(test_image).argmax()
        
        # Adjust brightness
        bright_image = torch.clamp(test_image + epsilon, 0, 1)
        dark_image = torch.clamp(test_image - epsilon, 0, 1)
        
        bright_pred = model(bright_image).argmax()
        dark_pred = model(dark_image).argmax()
        
        # Should predict same class
        return original_pred == bright_pred == dark_pred
    
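    def apply_trigger(self, test_suite, trigger):
        """
        Stamp a trigger onto each test input.
        Assumption for illustration: the trigger is an additive tensor
        and inputs are scaled to [0, 1].
        """
        import torch
        return [torch.clamp(x + trigger, 0, 1) for x, _ in test_suite]
    
    def test_for_backdoors(self, trigger_patterns):
        """
        Test if specific patterns activate incorrect predictions
        """
        print("\n🔍 Testing for Backdoors...")
        
        # Labels of the benign samples, used to judge whether a collapse
        # to a single predicted class is suspicious
        true_labels = {int(label) for _, label in self.test_suite}
        
        for trigger_name, trigger in trigger_patterns.items():
            # Apply trigger to benign samples
            triggered_samples = self.apply_trigger(self.test_suite, trigger)
            
            # Check predictions; .item() yields plain ints (tensors hash by
            # identity, so a set of tensors would never collapse to one entry)
            predictions = [self.model(s).argmax().item() for s in triggered_samples]
            
            # Suspicious if samples from several classes all predict one class
            unique_preds = set(predictions)
            if len(unique_preds) == 1 and len(true_labels) > 1:
                print(f"  ⚠️  Potential backdoor with trigger: {trigger_name}")
                return False
        
        print("  ✅ No backdoors detected")
        return True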

5.4 Availability in ML (5 min)

ML-Specific Availability Threats

1. Sponge Examples: Inputs that cause excessive computation

class AvailabilityDefense:
    """
    Protect against resource exhaustion attacks
    """
    
    def __init__(self, max_inference_time=1.0):
        self.max_inference_time = max_inference_time
    
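    def safe_inference(self, model, input_data):
        """
        Run inference with timeout protection
        
        SIGALRM-based: works on Unix only, and only from the main thread.
        The handler runs when control returns to Python bytecode, so a
        single long native call may not be interrupted promptly.
        """
        import signal
        
        def timeout_handler(signum, frame):
            raise TimeoutError("Inference exceeded time limit")
        
        # Set timeout; setitimer accepts fractional seconds, whereas
        # alarm(int(0.5)) would become alarm(0) and disable the timer
        previous_handler = signal.signal(signal.SIGALRM, timeout_handler)
        signal.setitimer(signal.ITIMER_REAL, self.max_inference_time)
        
        try:
            return model(input_data)
        except TimeoutError:
            print("⚠️  Inference timeout - potential sponge example")
            return None
        finally:
            # Always disable the timer and restore the old handler,
            # even if the model raised a different exception
            signal.setitimer(signal.ITIMER_REAL, 0)
            signal.signal(signal.SIGALRM, previous_handler)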
    
    def monitor_resource_usage(self):
        """
        Monitor CPU and memory usage
        (psutil does not report GPU usage; that needs a vendor-specific tool)
        """
        import psutil
        
        cpu_percent = psutil.cpu_percent(interval=1)
        memory_percent = psutil.virtual_memory().percent
        
        if cpu_percent > 90 or memory_percent > 90:
            print("⚠️  High resource usage detected")
            return False
        return True
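
Because the SIGALRM approach above is limited to Unix and the main thread, a serving process that handles requests in worker threads needs a different mechanism. One portable option, sketched here with the standard library only, is to wait on a future with a deadline. The `run_with_timeout` helper and the two stand-in "models" are hypothetical names for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


def run_with_timeout(fn, arg, timeout_s):
    """Return fn(arg), or None if no result arrives within timeout_s seconds.

    Caveat: the worker thread is not killed on timeout. It keeps running
    in the background, so this bounds caller latency, not resource usage.
    """
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(fn, arg)
    try:
        return future.result(timeout=timeout_s)
    except FutureTimeout:
        return None
    finally:
        executor.shutdown(wait=False)  # do not block on a stuck worker


def fast_model(x):
    return x * 2


def slow_model(x):
    time.sleep(0.5)  # stand-in for a model stalled by a sponge example
    return x * 2


print(run_with_timeout(fast_model, 21, timeout_s=0.2))
print(run_with_timeout(slow_model, 21, timeout_s=0.2))
```

To actually reclaim the compute, the inference would have to run in a separate process that can be terminated, at the cost of inter-process overhead.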

2. Model Denial-of-Service via Adversarial Examples

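Some malicious inputs are crafted not to change a prediction but to make the inference path fail, for example by triggering unhandled exceptions or numerical errors such as NaN outputs.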

Defense Strategy:

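import numpy as np
import torch
from sklearn.ensemble import IsolationForest

class RobustInferenceWrapper:
    """
    Wrap model with defensive layers
    """
    
    def __init__(self, model, reference_inputs):
        """
        reference_inputs: known-benign inputs, shape (n_samples, ...),
        used to fit the anomaly detector
        """
        self.model = model
        self.anomaly_detector = self.build_anomaly_detector(reference_inputs)
    
    def build_anomaly_detector(self, reference_inputs):
        """
        Simple anomaly detector for unusual inputs
        """
        # Could use isolation forest, autoencoder, etc.
        # The detector must be fitted before predict() can be called
        flattened = np.asarray(reference_inputs).reshape(len(reference_inputs), -1)
        return IsolationForest(contamination=0.1).fit(flattened)
    
    def detect_adversarial(self, input_data):
        """
        Flag inputs that look unlike the reference data
        (a coarse heuristic: small adversarial perturbations can evade it)
        """
        # Flatten input for anomaly detection (expects a CPU tensor or array)
        flattened = np.asarray(input_data).reshape(1, -1)
        
        # -1 indicates anomaly
        is_anomaly = self.anomaly_detector.predict(flattened)[0] == -1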
        
        if is_anomaly:
            print("⚠️  Potential adversarial input detected")
        
        return is_anomaly
    
    def safe_predict(self, input_data):
        """
        Predict with safety checks
        """
        # Check for anomalies
        if self.detect_adversarial(input_data):
            return None  # Reject suspicious inputs
        
        # Run inference
        try:
            with torch.no_grad():
                output = self.model(input_data)
            return output
        except Exception as e:
            print(f"❌ Inference failed: {e}")
            return None
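
Anomaly detection is probabilistic, so it is usually paired with cheap deterministic checks that reject malformed inputs before they reach the model. The following is a minimal sketch of such input validation. The `validate_input` helper, the expected shape `(3, 224, 224)`, and the `[0, 1]` value range are assumptions chosen to match the image inputs used earlier, not requirements of any library.

```python
import numpy as np


def validate_input(x, expected_shape=(3, 224, 224), low=0.0, high=1.0):
    """Return (ok, reason) for a single candidate input array."""
    x = np.asarray(x)
    if x.shape != expected_shape:
        return False, f"unexpected shape {x.shape}"
    if not np.issubdtype(x.dtype, np.floating):
        return False, f"unexpected dtype {x.dtype}"
    if not np.isfinite(x).all():
        return False, "contains NaN or Inf"
    if x.min() < low or x.max() > high:
        return False, "values outside expected range"
    return True, "ok"


# Example: a well-formed input and one carrying a NaN
good = np.random.rand(3, 224, 224).astype(np.float32)
bad = good.copy()
bad[0, 0, 0] = np.nan

print(validate_input(good))
print(validate_input(bad))
```

Checks like these cost almost nothing per request and turn a class of crash-inducing inputs into clean rejections that can be logged and rate-limited.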

6. Wrap-up & Discussion

Time Allocation: 10 minutes

Key Takeaways

  1. ML systems are complex with multiple layers and components, each presenting security risks
  2. Training and inference phases have different threat models and require different security controls
  3. Data pipelines are critical attack vectors - secure the entire pipeline, not just the model
  4. Threat modeling (STRIDE, attack trees) helps systematically identify vulnerabilities
  5. CIA triad applies differently to ML:
    • Confidentiality: Model inversion, membership inference
    • Integrity: Data poisoning, backdoors
    • Availability: Sponge examples, resource exhaustion

Discussion Questions

  1. What surprised you most about security in ML systems compared to traditional software?
  2. Which phase is harder to secure - training or inference? Why?
  3. How would you prioritize security efforts if you had limited resources?

Looking Ahead

Next Week (Week 3): Adversarial Machine Learning - Evasion Attacks

  • Deep dive into adversarial examples
  • Hands-on: Implementing FGSM attacks
  • Defenses: Adversarial training

Assignment Due Before Next Class:

  1. Read: "Explaining and Harnessing Adversarial Examples" (Goodfellow et al., 2014)
  2. Set up Python environment with PyTorch/TensorFlow
  3. Complete threat modeling exercise (posted on Canvas)

Questions?

Open floor for questions and clarifications.


Additional Resources

Papers

  1. Papernot et al. (2016) - "The Limitations of Deep Learning in Adversarial Settings"
  2. Tramèr et al. (2016) - "Stealing Machine Learning Models via Prediction APIs"
  3. Shokri et al. (2017) - "Membership Inference Attacks Against Machine Learning Models"

Tools

Online Resources

  • OWASP Machine Learning Security Top 10
  • Microsoft's Failure Modes in Machine Learning
  • NIST AI Risk Management Framework

Appendix: Code Repository

All code examples from this tutorial are available at:

https://github.com/[your-repo]/csci5773-week2-security-fundamentals

Clone the repository and follow setup instructions in README.md


End of Tutorial