# CSCI 5773 - Introduction to Emerging Systems Security
Duration: 160 minutes
Module: Foundations
Agenda:
- Session Overview (5 min)
- ML/AI System Architecture & Components (30 min)
- Training vs. Inference Security (25 min)
- Data Pipelines & Model Deployment Security (30 min)
- Break (10 min)
- Threat Modeling for ML Systems (30 min)
- CIA Triad in AI/ML Context (20 min)
- Wrap-up & Discussion (10 min)
## Session Overview
Time Allocation: 5 minutes
By the end of this session, you will be able to:
- ✅ Understand the ML lifecycle and identify security touchpoints
- ✅ Apply threat modeling frameworks to ML systems
- ✅ Identify vulnerabilities at each stage of the ML pipeline
- ✅ Analyze security considerations unique to ML/AI systems
Prerequisites:
- Review basic ML concepts (supervised learning, neural networks)
- Familiarize yourself with the CIA triad from CSCI 3453
- Install Python 3.8+ with scikit-learn and PyTorch (for demos)
## ML/AI System Architecture & Components
Time Allocation: 30 minutes
Modern ML systems are not just models; they are complex, multi-layered systems. Understanding this architecture is crucial for identifying security vulnerabilities.
┌─────────────────────────────────────────┐
│ Application Layer │
│ (User Interface, APIs, Applications) │
├─────────────────────────────────────────┤
│ ML Service Layer │
│ (Inference Servers, Model Registry) │
├─────────────────────────────────────────┤
│ ML Pipeline Layer │
│ (Training, Validation, Monitoring) │
├─────────────────────────────────────────┤
│ Infrastructure Layer │
│ (Storage, Compute, Networking) │
└─────────────────────────────────────────┘
Key Insight: Security vulnerabilities can exist at ANY layer, and attacks can propagate across layers.
Let's examine each component and its security implications:
### Component 1: Data Storage & Management
Purpose: Store, version, and serve training/inference data
Key Elements:
- Data Lakes/Warehouses: Centralized storage for raw and processed data
- Feature Stores: Curated features for model training/inference
- Data Versioning: Track data lineage and provenance
Security Touchpoints:
# Example: Insecure vs. Secure Data Access

# ❌ INSECURE: Direct database access without authentication
import sqlite3
conn = sqlite3.connect('training_data.db')
data = conn.execute("SELECT * FROM users").fetchall()

# ✅ SECURE: Authenticated access with access control
# (secure_datastore is an illustrative placeholder, not a real package)
from secure_datastore import DataStore

datastore = DataStore(
    credentials='path/to/creds.json',
    encryption_key='path/to/key',
    audit_logging=True
)
data = datastore.get_data(
    table='users',
    user_role='data_scientist',
    purpose='model_training'
)
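The Data Versioning element above is often implemented as a content-addressed manifest: hash each dataset version and record its provenance so any later stage can detect silent modification. A minimal sketch, with an illustrative `DatasetManifest` class (not a real library):

import hashlib
import json
import time

class DatasetManifest:
    """Illustrative lineage record: content hash plus provenance metadata"""
    def __init__(self, source, transform_description):
        self.record = {
            'source': source,
            'transform': transform_description,
            'created_at': time.time(),
            'content_hash': None
        }

    def fingerprint(self, rows):
        """Hash the serialized dataset so downstream stages can verify it"""
        payload = json.dumps(rows, sort_keys=True).encode()
        self.record['content_hash'] = hashlib.sha256(payload).hexdigest()
        return self.record

# Any downstream consumer can recompute the hash and compare
manifest = DatasetManifest(source='users_table', transform_description='dedup + lowercase')
record = manifest.fingerprint([{'id': 1, 'email': 'a@example.com'}])
print(f"Dataset fingerprint: {record['content_hash'][:16]}...")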
Discussion Question: What could go wrong if an attacker gains access to your training data storage?
### Component 2: Training Infrastructure
Purpose: Transform data into trained models
Key Elements:
- Training Scripts: Code that defines model architecture and training loops
- Hyperparameter Tuning: Automated search for optimal parameters
- Experiment Tracking: MLflow, Weights & Biases, etc.
- Model Artifacts: Saved model weights, configurations
Security Touchpoints:
# Example: Secure Model Training with Artifact Integrity
import torch
import hashlib
import json

class SecureModelTrainer:
    def __init__(self, model, data_loader):
        self.model = model
        self.data_loader = data_loader
        self.training_metadata = {
            'data_hash': None,
            'model_hash': None,
            'hyperparameters': {},
            'training_timestamp': None
        }

    def compute_data_hash(self, data):
        """Ensure data integrity (illustrative: hashes the string form of a batch)"""
        data_bytes = str(data).encode()
        return hashlib.sha256(data_bytes).hexdigest()

    def train(self, epochs=10):
        # Compute and store data hash
        sample_batch = next(iter(self.data_loader))
        self.training_metadata['data_hash'] = self.compute_data_hash(sample_batch)
        # Training loop (simplified)
        for epoch in range(epochs):
            for batch in self.data_loader:
                # Training code here
                pass
        # Record an integrity hash for the model
        self.save_model_with_signature()

    def save_model_with_signature(self):
        """Save model with an integrity hash (a keyed or asymmetric signature,
        as in the HMAC example later in this session, is stronger than a bare hash)"""
        torch.save(self.model.state_dict(), 'model.pth')
        # Create model hash
        with open('model.pth', 'rb') as f:
            model_bytes = f.read()
        model_hash = hashlib.sha256(model_bytes).hexdigest()
        self.training_metadata['model_hash'] = model_hash
        # Save metadata separately
        with open('model_metadata.json', 'w') as f:
            json.dump(self.training_metadata, f)
        print(f"✅ Model saved with integrity hash: {model_hash[:16]}...")
Key Takeaway: Always maintain a chain of custody for your ML artifacts.
### Component 3: Model Serving Infrastructure
Purpose: Deploy models for inference at scale
Key Elements:
- Inference Servers: TensorFlow Serving, TorchServe, ONNX Runtime
- API Gateways: REST/gRPC endpoints for model access
- Load Balancers: Distribute inference requests
- Model Registry: Centralized model storage and versioning
Security Architecture:
Client Request
↓
[API Gateway] ← Authentication/Authorization
↓
[Rate Limiter] ← DDoS Protection
↓
[Model Server] ← Input Validation
↓
[Model] ← Adversarial Detection
↓
[Output Filter] ← Safety Checks
↓
Response
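The same flow can be collapsed into code as a chain of checks, each layer rejecting the request before it reaches the model. A minimal sketch; every name here is illustrative, and the adversarial-detection and output-filter layers are reduced to comments:

import numpy as np

query_counts = {}

def authenticate(request):  # [API Gateway]
    return request.get('api_key') == 'demo-key'

def within_rate_limit(request, limit=100):  # [Rate Limiter]
    key = request['api_key']
    query_counts[key] = query_counts.get(key, 0) + 1
    return query_counts[key] <= limit

def valid_input(x):  # [Model Server]
    arr = np.asarray(x, dtype=float)
    return arr.size > 0 and np.isfinite(arr).all()

def handle(request, model):
    """Walk the layers in order; reject at the first failing check"""
    if not authenticate(request):
        return {'error': 'unauthorized'}
    if not within_rate_limit(request):
        return {'error': 'rate limited'}
    if not valid_input(request['input']):
        return {'error': 'invalid input'}
    # [Adversarial Detection] would screen the input here
    prediction = model(np.asarray(request['input'], dtype=float))
    # [Output Filter]: return only what the client needs, never internals
    return {'prediction': float(prediction)}

toy_model = lambda x: x.mean()  # stand-in for a real model
print(handle({'api_key': 'demo-key', 'input': [0.1, 0.9]}, toy_model))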
### Component 4: Monitoring & Observability
Purpose: Track model performance and detect anomalies
Key Elements:
- Performance Metrics: Accuracy, latency, throughput
- Data Drift Detection: Monitor input distribution shifts
- Model Drift Detection: Track prediction quality over time
- Security Monitoring: Detect attacks and anomalous patterns
Demo: Simple Drift Detection
import numpy as np
from scipy import stats
class DriftDetector:
    """Detect statistical drift in input data"""
    def __init__(self, reference_data, threshold=0.05):
        self.reference_data = reference_data
        self.threshold = threshold

    def detect_drift(self, new_data):
        """
        Use the Kolmogorov-Smirnov test to detect distribution drift
        Returns: (is_drift, p_value)
        """
        statistic, p_value = stats.ks_2samp(
            self.reference_data.flatten(),
            new_data.flatten()
        )
        is_drift = p_value < self.threshold
        if is_drift:
            print(f"⚠️ DRIFT DETECTED! p-value: {p_value:.4f}")
        else:
            print(f"✅ No drift. p-value: {p_value:.4f}")
        return is_drift, p_value
# Example usage
reference_data = np.random.normal(0, 1, 1000)
detector = DriftDetector(reference_data)
# Normal data - no drift expected
normal_data = np.random.normal(0, 1, 1000)
detector.detect_drift(normal_data)
# Shifted data - drift expected
shifted_data = np.random.normal(2, 1, 1000) # Mean shifted
detector.detect_drift(shifted_data)
Discussion: Why is drift detection a security concern, not just a performance issue?
Group Activity: Map the components
- Students draw their own ML system architecture for a hypothetical application (e.g., spam filter, fraud detection)
- Identify 3 security touchpoints in their architecture
- Share with a neighbor
## Training vs. Inference Security
Time Allocation: 25 minutes
ML systems have two distinct operational phases with different security profiles:
| Aspect | Training Phase | Inference Phase |
|---|---|---|
| Frequency | Periodic (hours to weeks) | Continuous (milliseconds) |
| Environment | Controlled, offline | Production, online |
| Resources | High compute (GPUs/TPUs) | Optimized for latency |
| Attack Surface | Data poisoning, backdoors | Evasion, extraction |
| Impact Scope | Future deployments | Immediate user impact |
### Training-Phase Attack: Data Poisoning
Attackers manipulate training data to compromise model behavior.
Example Scenario: Email Spam Filter Poisoning
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.naive_bayes import MultinomialNB
# Legitimate training data
legit_emails = [
"meeting at 3pm tomorrow",
"quarterly report attached",
"please review the document"
]
spam_emails = [
"buy cheap viagra now!!!",
"you won the lottery claim prize",
"click here for free money"
]
# ⚠️ ATTACK: Adversary injects poisoned data
# Goal: Make "meeting" trigger spam classification
poisoned_spam = [
"meeting buy viagra",
"meeting lottery prize",
"meeting free money",
"meeting click here",
"meeting claim now"
] * 10 # Repeat to increase influence
# Combine datasets
all_emails = legit_emails + spam_emails + poisoned_spam
labels = [0]*len(legit_emails) + [1]*len(spam_emails) + [1]*len(poisoned_spam)
# Train model
vectorizer = CountVectorizer()
X = vectorizer.fit_transform(all_emails)
model = MultinomialNB()
model.fit(X, labels)
# Test on legitimate email with "meeting"
test_email = ["meeting agenda for next week"]
test_vec = vectorizer.transform(test_email)
prediction = model.predict(test_vec)
print(f"Email: '{test_email[0]}'")
print(f"Classified as: {'SPAM ⚠️' if prediction[0] == 1 else 'LEGITIMATE ✅'}")
Defense Strategy: Data Validation
class DataValidator:
    """Validate training data for anomalies"""
    def __init__(self, max_label_imbalance=0.3):
        self.max_label_imbalance = max_label_imbalance

    def validate_label_distribution(self, labels):
        """Check for suspicious label imbalance"""
        unique, counts = np.unique(labels, return_counts=True)
        imbalance = max(counts) / sum(counts)
        if imbalance > (1 - self.max_label_imbalance):
            print(f"⚠️ Warning: Label imbalance detected ({imbalance:.2%})")
            return False
        return True

    def validate_feature_distribution(self, features, reference_stats=None):
        """Check for distribution anomalies"""
        # Compute statistics
        mean = np.mean(features, axis=0)
        std = np.std(features, axis=0)
        if reference_stats is not None:
            # Compare to reference (per-feature check)
            mean_diff = np.abs(mean - reference_stats['mean'])
            if np.any(mean_diff > 2 * reference_stats['std']):
                print("⚠️ Warning: Feature distribution anomaly detected")
                return False
        return True
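A short usage sketch, reusing the `labels` list from the poisoning demo above; because roughly 94% of that combined dataset is labeled spam, the imbalance check fires:

validator = DataValidator(max_label_imbalance=0.3)
validator.validate_label_distribution(np.array(labels))  # prints the imbalance warning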
### Training-Phase Attack: Model Tampering
Unauthorized modification of model weights or architecture.
Protection Mechanism:
import hmac
import hashlib
class ModelIntegrityChecker:
    """Ensure model hasn't been tampered with"""
    def __init__(self, secret_key):
        self.secret_key = secret_key.encode()

    def sign_model(self, model_path):
        """Create HMAC signature for model file"""
        with open(model_path, 'rb') as f:
            model_bytes = f.read()
        signature = hmac.new(
            self.secret_key,
            model_bytes,
            hashlib.sha256
        ).hexdigest()
        # Save signature
        with open(f"{model_path}.sig", 'w') as f:
            f.write(signature)
        print(f"✅ Model signed: {signature[:16]}...")
        return signature

    def verify_model(self, model_path):
        """Verify model integrity"""
        # Load signature
        try:
            with open(f"{model_path}.sig", 'r') as f:
                stored_signature = f.read()
        except FileNotFoundError:
            print("❌ No signature found!")
            return False
        # Compute current signature
        with open(model_path, 'rb') as f:
            model_bytes = f.read()
        current_signature = hmac.new(
            self.secret_key,
            model_bytes,
            hashlib.sha256
        ).hexdigest()
        # Compare in constant time to avoid timing side channels
        if hmac.compare_digest(current_signature, stored_signature):
            print("✅ Model integrity verified")
            return True
        else:
            print("❌ Model has been tampered with!")
            return False
# Usage
checker = ModelIntegrityChecker(secret_key="my-secret-key-123")
# checker.sign_model("model.pth")
# checker.verify_model("model.pth")
### Inference-Phase Attack: Adversarial Examples (Evasion)
Carefully crafted inputs that fool the model.
Demo: Simple Adversarial Attack (Conceptual)
import torch
import torch.nn as nn
def fgsm_attack(model, data, target, epsilon=0.1):
    """
    Fast Gradient Sign Method (FGSM) adversarial attack
    Args:
        model: Target neural network
        data: Input data (a leaf tensor)
        target: True label
        epsilon: Perturbation magnitude
    Returns:
        Adversarial example
    """
    # Forward pass
    data.requires_grad = True
    output = model(data)
    # Compute loss
    loss_fn = nn.CrossEntropyLoss()
    loss = loss_fn(output, target)
    # Backward pass to get gradients
    model.zero_grad()
    loss.backward()
    # Create adversarial example:
    # perturb in the direction that maximizes loss
    data_grad = data.grad.data
    perturbation = epsilon * data_grad.sign()
    # Clamp so the result stays a valid input (assumes inputs in [0, 1])
    adversarial_data = torch.clamp(data + perturbation, 0, 1)
    return adversarial_data
# Conceptual example (won't run without actual model)
"""
original_image = load_image("cat.jpg") # Model correctly classifies as "cat"
adversarial_image = fgsm_attack(model, original_image, target_label, epsilon=0.01)
# Adversarial image looks identical to humans but is misclassified!
"""
Defense: Input Validation & Sanitization
class InferenceInputValidator:
    """Validate and sanitize inference inputs"""
    def __init__(self, expected_shape, value_range=(0, 1)):
        self.expected_shape = expected_shape
        self.value_range = value_range

    def validate(self, input_data):
        """Validate input before inference"""
        # Check shape
        if input_data.shape != self.expected_shape:
            raise ValueError(f"Invalid shape: {input_data.shape}")
        # Check value range
        if input_data.min() < self.value_range[0] or \
           input_data.max() > self.value_range[1]:
            raise ValueError(f"Values out of range: [{input_data.min()}, {input_data.max()}]")
        # Check for NaN/Inf
        if np.isnan(input_data).any() or np.isinf(input_data).any():
            raise ValueError("Input contains NaN or Inf values")
        return True

    def sanitize(self, input_data):
        """Clip values to expected range"""
        return np.clip(input_data, self.value_range[0], self.value_range[1])
# Usage
validator = InferenceInputValidator(
expected_shape=(224, 224, 3),
value_range=(0, 1)
)
### Inference-Phase Attack: Model Extraction
Attackers query the model to steal functionality.
Attack Scenario:
class ModelExtractionAttack:
    """Simulate a model extraction attack"""
    def __init__(self, target_model, query_budget=1000):
        self.target_model = target_model
        self.query_budget = query_budget
        self.queries_made = 0
        self.stolen_data = []

    def query_model(self, input_data):
        """Query the target model"""
        if self.queries_made >= self.query_budget:
            raise Exception("Query budget exceeded")
        # Get prediction from target model
        prediction = self.target_model.predict(input_data)
        self.queries_made += 1
        # Store for training the substitute model
        self.stolen_data.append((input_data, prediction))
        return prediction

    def train_substitute_model(self):
        """Train a substitute model using stolen data"""
        # Extract features and labels
        X = np.array([x[0] for x in self.stolen_data])
        y = np.array([x[1] for x in self.stolen_data])
        # Train simple model
        from sklearn.tree import DecisionTreeClassifier
        substitute_model = DecisionTreeClassifier()
        substitute_model.fit(X, y)
        print(f"✅ Substitute model trained with {len(self.stolen_data)} samples")
        return substitute_model

# Defense: Rate Limiting & Query Monitoring
class QueryRateLimiter:
    """Limit and monitor API queries"""
    def __init__(self, max_queries_per_hour=100):
        self.max_queries = max_queries_per_hour
        self.query_history = {}

    def check_rate_limit(self, user_id):
        """Check if user exceeded rate limit"""
        from datetime import datetime, timedelta
        now = datetime.now()
        hour_ago = now - timedelta(hours=1)
        # Drop queries older than one hour
        if user_id in self.query_history:
            self.query_history[user_id] = [
                t for t in self.query_history[user_id] if t > hour_ago
            ]
        else:
            self.query_history[user_id] = []
        # Check limit
        if len(self.query_history[user_id]) >= self.max_queries:
            print(f"⚠️ Rate limit exceeded for user {user_id}")
            return False
        # Record query
        self.query_history[user_id].append(now)
        return True
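A quick usage sketch: with a budget of three queries per hour, the fourth and fifth attempts from the same user are rejected, which also throttles the extraction attack above.

limiter = QueryRateLimiter(max_queries_per_hour=3)
for i in range(5):
    allowed = limiter.check_rate_limit(user_id='attacker-1')
    print(f"query {i + 1}: {'allowed' if allowed else 'blocked'}")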
## Data Pipelines & Model Deployment Security
Time Allocation: 30 minutes
The ML pipeline is the end-to-end workflow from raw data to deployed model:
Raw Data → Data Collection → Data Processing → Feature Engineering →
Model Training → Model Validation → Model Packaging → Deployment →
Monitoring → Retraining
Each stage is a potential attack vector!
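One way to harden the hand-offs between stages is to chain artifact hashes: each stage records a hash of its output, and the next stage refuses any input that does not match. A minimal sketch (the stage functions and names are illustrative):

import hashlib
import json

def artifact_hash(obj):
    """Stable content hash of a stage's output"""
    return hashlib.sha256(json.dumps(obj, sort_keys=True).encode()).hexdigest()

def run_stage(name, fn, payload, expected_hash, ledger):
    """Refuse input whose hash differs from what the previous stage recorded"""
    if expected_hash is not None and artifact_hash(payload) != expected_hash:
        raise ValueError(f"{name}: input hash mismatch - possible tampering")
    output = fn(payload)
    ledger.append((name, artifact_hash(output)))
    return output

ledger = []
raw = {'rows': [1, 2, 3]}
processed = run_stage('processing', lambda d: {'rows': [r * 2 for r in d['rows']]},
                      raw, expected_hash=None, ledger=ledger)
features = run_stage('features', lambda d: {'mean': sum(d['rows']) / len(d['rows'])},
                     processed, expected_hash=ledger[-1][1], ledger=ledger)
print(ledger)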
### Pipeline Stage: Data Collection
Vulnerabilities:
- Compromised data sources
- Man-in-the-middle attacks during transfer
- Unauthorized data access
Secure Implementation:
import requests
from cryptography.fernet import Fernet
import hashlib
class SecureDataCollector:
    """Securely collect data from external sources"""
    def __init__(self, api_key, encryption_key=None):
        self.api_key = api_key
        self.cipher = Fernet(encryption_key) if encryption_key else None
        self.data_hashes = []

    def collect_from_api(self, url, verify_ssl=True):
        """Collect data from API with security checks"""
        try:
            # Use HTTPS and verify SSL certificates
            response = requests.get(
                url,
                headers={'Authorization': f'Bearer {self.api_key}'},
                verify=verify_ssl,
                timeout=30
            )
            response.raise_for_status()
            data = response.json()
            # Compute and store hash for integrity
            data_hash = self._compute_hash(data)
            self.data_hashes.append(data_hash)
            # Encrypt if needed
            if self.cipher:
                data = self._encrypt_data(data)
            print(f"✅ Data collected securely. Hash: {data_hash[:16]}...")
            return data
        except requests.exceptions.SSLError:
            print("❌ SSL verification failed - potential MITM attack!")
            raise
        except requests.exceptions.RequestException as e:
            print(f"❌ Data collection failed: {e}")
            raise

    def _compute_hash(self, data):
        """Compute SHA-256 hash of data"""
        data_str = str(data).encode()
        return hashlib.sha256(data_str).hexdigest()

    def _encrypt_data(self, data):
        """Encrypt data at rest"""
        import json
        data_bytes = json.dumps(data).encode()
        return self.cipher.encrypt(data_bytes)

    def verify_data_integrity(self, data, expected_hash):
        """Verify data hasn't been tampered with"""
        actual_hash = self._compute_hash(data)
        if actual_hash == expected_hash:
            print("✅ Data integrity verified")
            return True
        else:
            print("❌ Data integrity check failed!")
            return False
### Pipeline Stage: Data Processing & Feature Engineering
Vulnerabilities:
- Code injection in processing scripts
- Malicious feature transformations
- Leakage of sensitive information in features
Secure Feature Engineering:
import pandas as pd
import re
import hashlib

class SecureFeatureProcessor:
    """Process features with security controls"""
    def __init__(self):
        self.allowed_operations = [
            'normalize', 'standardize', 'one_hot_encode', 'bin'
        ]
        self.pii_patterns = {
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
            'phone': r'\b\d{3}-\d{3}-\d{4}\b'
        }

    def detect_pii(self, data):
        """Detect personally identifiable information"""
        pii_found = {}
        for col in data.columns:
            if data[col].dtype == 'object':  # Text columns
                for pii_type, pattern in self.pii_patterns.items():
                    if data[col].astype(str).str.contains(pattern).any():
                        pii_found[col] = pii_type
                        print(f"⚠️ PII detected in column '{col}': {pii_type}")
        return pii_found

    def sanitize_features(self, data, pii_columns):
        """Remove or anonymize PII"""
        sanitized = data.copy()
        for col in pii_columns:
            # Hash sensitive values
            sanitized[col] = sanitized[col].apply(
                lambda x: hashlib.sha256(str(x).encode()).hexdigest()[:16]
            )
            print(f"✅ Sanitized column: {col}")
        return sanitized

    def validate_transformation(self, operation_name):
        """Ensure only safe operations are performed"""
        if operation_name not in self.allowed_operations:
            raise ValueError(f"⚠️ Unsafe operation: {operation_name}")
        return True

# Example usage
processor = SecureFeatureProcessor()

# Sample data with PII
sample_data = pd.DataFrame({
    'name': ['Alice', 'Bob'],
    'email': ['alice@email.com', 'bob@email.com'],
    'age': [30, 25],
    'income': [50000, 60000]
})

# Detect PII
pii_cols = processor.detect_pii(sample_data)

# Sanitize
if pii_cols:
    clean_data = processor.sanitize_features(sample_data, pii_cols.keys())
    print("\nSanitized Data:")
    print(clean_data)
Key Principle: Never include raw PII in training data unless absolutely necessary and properly authorized.
## Pre-Deployment Security Checklist
### Model Integrity
- [ ] Model signed with cryptographic signature
- [ ] Model weights verified against training artifacts
- [ ] Model architecture validated
- [ ] Dependencies pinned and verified
### Access Control
- [ ] Authentication required for API access
- [ ] Authorization policies defined
- [ ] Rate limiting configured
- [ ] API keys rotated regularly
### Input/Output Security
- [ ] Input validation implemented
- [ ] Output filtering active
- [ ] Logging and monitoring enabled
- [ ] Anomaly detection configured
### Infrastructure Security
- [ ] Network segmentation in place
- [ ] Encryption at rest and in transit
- [ ] Security patches up to date
- [ ] Backup and disaster recovery tested
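Parts of this checklist can be automated in CI before every deployment. A minimal sketch covering two items, signature verification (reusing the HMAC scheme from earlier in this session) and dependency pinning; the file paths are placeholders:

import hmac
import hashlib
import re

def model_signature_valid(model_path, sig_path, secret_key):
    """Checklist item: 'Model signed with cryptographic signature'"""
    with open(model_path, 'rb') as f:
        expected = hmac.new(secret_key.encode(), f.read(), hashlib.sha256).hexdigest()
    with open(sig_path) as f:
        return hmac.compare_digest(expected, f.read().strip())

def dependencies_pinned(requirements_path):
    """Checklist item: 'Dependencies pinned and verified' (every line must use ==)"""
    with open(requirements_path) as f:
        lines = [l.strip() for l in f if l.strip() and not l.startswith('#')]
    return all(re.match(r'^[\w.\-\[\]]+==', l) for l in lines)

# Usage in a CI gate (paths are placeholders):
# assert model_signature_valid('model.pth', 'model.pth.sig', 'ci-secret')
# assert dependencies_pinned('requirements.txt')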
Secure Dockerfile Example:
# Use official base image with security patches
FROM python:3.9-slim

# Create non-root user
RUN useradd -m -u 1000 mluser && \
    mkdir /app && \
    chown mluser:mluser /app

WORKDIR /app

# Install dependencies as root so they land in site-packages,
# then drop privileges for everything that runs at container start
COPY --chown=mluser:mluser requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY --chown=mluser:mluser model_server.py .
COPY --chown=mluser:mluser model.pth .

# Switch to non-root user
USER mluser

# Writable volume for temp files; pair with a read-only root
# filesystem at runtime (docker run --read-only)
VOLUME ["/tmp"]

# Expose port
EXPOSE 8000

# Run with limited permissions
CMD ["python", "model_server.py"]
Secure Model Server:
from fastapi import FastAPI, HTTPException, Depends, Header
from fastapi.security import APIKeyHeader
import torch
import numpy as np
from typing import Optional
import time
from collections import defaultdict
app = FastAPI()
# API Key authentication
API_KEY_HEADER = APIKeyHeader(name="X-API-Key")
VALID_API_KEYS = {"demo-key-123"} # In production, use secure key management
# Rate limiting
request_counts = defaultdict(list)
RATE_LIMIT = 100 # requests per minute
class SecureModelServer:
    def __init__(self, model_path):
        self.model = self.load_model(model_path)
        self.model.eval()
        self.request_log = []

    def load_model(self, path):
        """Load model with integrity check"""
        # In production: verify signature before loading
        model = torch.load(path)
        print("✅ Model loaded")
        return model

    def validate_input(self, input_data):
        """Validate input data"""
        # Check type
        if not isinstance(input_data, (list, np.ndarray)):
            raise ValueError("Invalid input type")
        # Check dimensions
        input_array = np.array(input_data)
        if input_array.shape[0] > 1000:  # Prevent memory exhaustion
            raise ValueError("Input too large")
        # Check for malicious values
        if np.isnan(input_array).any() or np.isinf(input_array).any():
            raise ValueError("Invalid values in input")
        return True

    def predict(self, input_data):
        """Secure prediction"""
        self.validate_input(input_data)
        # Convert to tensor
        input_tensor = torch.tensor(input_data, dtype=torch.float32)
        # Run inference
        with torch.no_grad():
            output = self.model(input_tensor)
        # Log request
        self.request_log.append({
            'timestamp': time.time(),
            'input_shape': input_tensor.shape,
            'output_shape': output.shape
        })
        return output.numpy().tolist()

# Initialize server
server = SecureModelServer("model.pth")

def verify_api_key(api_key: str = Depends(API_KEY_HEADER)):
    """Verify API key"""
    if api_key not in VALID_API_KEYS:
        raise HTTPException(status_code=403, detail="Invalid API key")
    return api_key

def check_rate_limit(api_key: str):
    """Check rate limiting"""
    now = time.time()
    minute_ago = now - 60
    # Drop requests older than one minute
    request_counts[api_key] = [
        t for t in request_counts[api_key] if t > minute_ago
    ]
    # Check limit
    if len(request_counts[api_key]) >= RATE_LIMIT:
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    # Record request
    request_counts[api_key].append(now)

@app.post("/predict")
async def predict(
    input_data: list,
    api_key: str = Depends(verify_api_key)
):
    """Secure prediction endpoint"""
    try:
        # Check rate limit
        check_rate_limit(api_key)
        # Make prediction
        result = server.predict(input_data)
        return {
            "prediction": result,
            "status": "success"
        }
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except HTTPException:
        # Re-raise intentional errors (e.g., the 429 from the rate limiter)
        raise
    except Exception:
        # Don't leak internal errors
        raise HTTPException(status_code=500, detail="Internal server error")

@app.get("/health")
async def health_check():
    """Health check endpoint (no auth required)"""
    return {"status": "healthy"}
Discussion: Why is rate limiting important for ML APIs beyond just preventing DoS attacks?
## Break
Time Allocation: 10 minutes
## Threat Modeling for ML Systems
Time Allocation: 30 minutes
What is Threat Modeling?
A structured approach to identifying and prioritizing potential security threats.
Why is it critical for ML systems?
- ML systems have unique attack surfaces
- Traditional security tools may not detect ML-specific attacks
- Proactive > Reactive security
STRIDE is a classic threat modeling framework. Let's adapt it for ML systems:
| Threat Type | Traditional IT | ML-Specific Example |
|---|---|---|
| Spoofing | Fake user identity | Fake training data source |
| Tampering | Modify database | Poison training data, tamper with model weights |
| Repudiation | Deny actions | Deny malicious data contribution |
| Information Disclosure | Steal credentials | Model inversion, membership inference |
| Denial of Service | Overwhelm server | Resource exhaustion through adversarial examples |
| Elevation of Privilege | Gain admin access | Bypass model access controls |
Scenario: You're deploying a medical diagnosis ML system.
System Components:
- Patient data collection
- Training pipeline
- Model serving API
- Results dashboard
Group Activity (10 min):
For each STRIDE category, identify at least one specific threat:
Example Answers:
### Spoofing
- Threat: Attacker impersonates legitimate medical device
- Impact: Corrupt input data, incorrect diagnoses
- Mitigation: Device authentication, mutual TLS
### Tampering
- Threat: Training data poisoned to misdiagnose certain conditions
- Impact: Systematic misdiagnosis, patient harm
- Mitigation: Data provenance tracking, anomaly detection
### Repudiation
- Threat: Data scientist denies modifying model without authorization
- Impact: Accountability lost, harder to trace attacks
- Mitigation: Comprehensive audit logging, code signing
### Information Disclosure
- Threat: Model inversion reveals patient information
- Impact: Privacy breach, HIPAA violation
- Mitigation: Differential privacy, output perturbation
### Denial of Service
- Threat: Adversarial examples cause model to crash or hang
- Impact: System unavailability, delayed diagnoses
- Mitigation: Input validation, timeout mechanisms, resource limits
### Elevation of Privilege
- Threat: Regular user gains access to modify model parameters
- Impact: Model compromise, unauthorized changes
- Mitigation: Role-based access control, model versioning
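The same worksheet can be generated programmatically so that no component × category pair is skipped; filling in the threats remains a human task. A minimal sketch:

STRIDE = ['Spoofing', 'Tampering', 'Repudiation',
          'Information Disclosure', 'Denial of Service', 'Elevation of Privilege']

components = ['Patient data collection', 'Training pipeline',
              'Model serving API', 'Results dashboard']

def stride_worksheet(components):
    """One cell per (component, category) pair to force complete coverage"""
    return {(c, s): {'threat': None, 'impact': None, 'mitigation': None}
            for c in components for s in STRIDE}

worksheet = stride_worksheet(components)
print(f"{len(worksheet)} cells to review")  # 4 components x 6 categories = 24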
Attack Tree Methodology:
Hierarchically decompose attack goals into specific attack paths.
Example: Attack Tree for Model Extraction
Goal: Steal Model Functionality
│
├─ Query-based Extraction
│ ├─ Exhaust API with queries
│ │ ├─ Create multiple accounts
│ │ ├─ Distribute queries across accounts
│ │ └─ Use residential proxies
│ ├─ Optimize query selection
│ │ ├─ Active learning strategies
│ │ └─ Boundary case sampling
│ └─ Train substitute model
│
├─ Direct Model Access
│ ├─ Exploit API vulnerability
│ ├─ Social engineering
│ └─ Insider threat
│
└─ Indirect Information Leakage
├─ Timing attacks
├─ Error message analysis
└─ Metadata extraction
Interactive Demo: Building an Attack Tree
Let's build an attack tree for a different goal: "Cause Systematic Misclassification"
Goal: Cause Systematic Misclassification
│
├─ Training Phase Attacks
│ ├─ Data Poisoning
│ │ ├─ Compromise data source
│ │ ├─ Inject mislabeled examples
│ │ └─ Manipulate feature distributions
│ └─ Backdoor Injection
│ ├─ Insert trigger patterns
│ └─ Associate triggers with target labels
│
├─ Inference Phase Attacks
│ ├─ Adversarial Examples
│ │ ├─ White-box attacks (gradient access)
│ │ └─ Black-box attacks (query-based)
│ └─ Input Manipulation
│ ├─ Physical perturbations
│ └─ Digital modifications
│
└─ Model Replacement
├─ Gain deployment access
├─ Replace with malicious model
└─ Maintain stealth
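Attack trees are also easy to encode and query in code. A minimal sketch that stores a tree as nested (goal, children) tuples and enumerates every root-to-leaf attack path, which is handy for turning a whiteboard tree into a review checklist:

def attack_paths(node, prefix=()):
    """Yield every root-to-leaf path; node = (goal, [children])"""
    goal, children = node
    path = prefix + (goal,)
    if not children:
        yield path
    for child in children:
        yield from attack_paths(child, path)

tree = ('Cause Systematic Misclassification', [
    ('Training Phase Attacks', [
        ('Data Poisoning', [('Compromise data source', []),
                            ('Inject mislabeled examples', [])]),
        ('Backdoor Injection', [('Insert trigger patterns', [])]),
    ]),
    ('Inference Phase Attacks', [
        ('Adversarial Examples', [('White-box attacks', []),
                                  ('Black-box attacks', [])]),
    ]),
])

for path in attack_paths(tree):
    print(' -> '.join(path))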
Assignment for Next Week: Create an attack tree for your course project's ML system.
## CIA Triad in AI/ML Context
Time Allocation: 20 minutes
You learned about the CIA triad in CSCI 3453:
- Confidentiality: Prevent unauthorized access to information
- Integrity: Ensure information is accurate and unmodified
- Availability: Ensure authorized access when needed
In ML systems, these properties have nuanced interpretations.
### Confidentiality in ML Systems
Assets that require confidentiality:
- Training Data
  - May contain sensitive user information
  - Business-critical proprietary data
- Model Architecture & Weights
  - Intellectual property
  - Competitive advantage
- Model Predictions
  - May reveal sensitive patterns
  - Inference can leak training data
Threat 1: Model Inversion
Reconstruct training data from model parameters.
# Conceptual demonstration
import torch

class ModelInversionAttack:
    """
    Attempt to recover training data from model predictions
    This is a simplified illustration
    """
    def __init__(self, target_model):
        self.model = target_model

    def invert_for_class(self, target_class, iterations=1000):
        """
        Generate input that maximizes probability for target_class
        This could reveal what the model "thinks" the class looks like
        """
        # Start with random input
        input_data = torch.randn(1, 3, 224, 224, requires_grad=True)
        optimizer = torch.optim.Adam([input_data], lr=0.01)
        for i in range(iterations):
            # Get model prediction
            output = self.model(input_data)
            # Maximize probability of target class
            loss = -output[0, target_class]
            # Update input
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            if i % 100 == 0:
                print(f"Iteration {i}, Loss: {loss.item():.4f}")
        # The resulting input_data may resemble training examples
        # of the target class
        return input_data.detach()

# Why this matters:
# If target_class = "Person with Disease X", the inverted image
# might reveal identifying characteristics of patients in training data
Threat 2: Membership Inference
Determine if a specific data point was in the training set.
import torch

class MembershipInferenceAttack:
    """
    Determine if a sample was in the training set
    """
    def __init__(self, target_model):
        self.model = target_model
        self.model.eval()

    def is_member(self, sample, true_label, confidence_threshold=0.9):
        """
        High-confidence predictions often indicate training membership
        """
        with torch.no_grad():
            output = self.model(sample)
            probabilities = torch.softmax(output, dim=1)
            confidence = probabilities[0, true_label].item()
        # If the model is very confident, the sample was likely in the training set
        is_training_member = confidence > confidence_threshold
        print(f"Confidence: {confidence:.4f}")
        print(f"Likely training member: {is_training_member}")
        return is_training_member

# Privacy implication:
# In a medical ML system, this could reveal whether a specific patient's
# data was used for training, potentially exposing their medical status
Defense: Differential Privacy
import numpy as np

class DifferentiallyPrivateTraining:
    """
    Add calibrated noise to protect individual privacy
    """
    def __init__(self, epsilon=1.0, delta=1e-5):
        """
        epsilon: Privacy budget (lower = more private)
        delta: Probability of privacy breach
        """
        self.epsilon = epsilon
        self.delta = delta

    def add_noise_to_gradients(self, gradients, sensitivity, batch_size):
        """
        Add Gaussian noise to gradients during training
        Args:
            gradients: Model gradients
            sensitivity: L2 sensitivity of the gradient computation
            batch_size: Training batch size (used when averaging
                        per-example gradients in full DP-SGD)
        """
        # Calculate noise scale for the Gaussian mechanism
        noise_scale = (sensitivity * np.sqrt(2 * np.log(1.25 / self.delta))) / self.epsilon
        # Add noise
        noise = np.random.normal(0, noise_scale, gradients.shape)
        private_gradients = gradients + noise
        return private_gradients

    def clip_gradients(self, gradients, max_norm=1.0):
        """
        Clip gradients to bound sensitivity
        """
        norm = np.linalg.norm(gradients)
        if norm > max_norm:
            gradients = gradients * (max_norm / norm)
        return gradients

# Trade-off: Privacy ↔ Accuracy
# More privacy (lower epsilon) = more noise = lower accuracy
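A short usage sketch: clip a toy gradient vector to bound its sensitivity, then add calibrated Gaussian noise before applying the update.

dp = DifferentiallyPrivateTraining(epsilon=1.0, delta=1e-5)
grads = np.array([0.8, -2.5, 1.1])
clipped = dp.clip_gradients(grads, max_norm=1.0)
private = dp.add_noise_to_gradients(clipped, sensitivity=1.0, batch_size=32)
print("clipped:", clipped)
print("private:", private)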
### Integrity in ML Systems
- Data Integrity: Training data is accurate and untampered
- Model Integrity: Model behaves as intended
- Prediction Integrity: Outputs are correct and trustworthy
Data Integrity: Defense Against Poisoning
class DataIntegrityChecker:
    """
    Detect and filter poisoned training data
    """
    def __init__(self, clean_sample_size=100):
        self.clean_sample = None
        self.clean_sample_size = clean_sample_size
        self.baseline_stats = None

    def establish_baseline(self, trusted_data):
        """Establish baseline from trusted data"""
        self.clean_sample = trusted_data[:self.clean_sample_size]
        self.baseline_stats = {
            'mean': np.mean(self.clean_sample, axis=0),
            'std': np.std(self.clean_sample, axis=0)
        }

    def detect_anomalies(self, new_data, threshold=3.0):
        """
        Detect outliers using statistical methods
        Returns: List of indices of suspected poisoned samples
        """
        if self.baseline_stats is None:
            raise ValueError("Must establish baseline first")
        suspicious_indices = []
        for idx, sample in enumerate(new_data):
            # Compute z-score
            z_score = np.abs(
                (sample - self.baseline_stats['mean']) /
                (self.baseline_stats['std'] + 1e-10)
            )
            # Flag if any feature has a high z-score
            if np.max(z_score) > threshold:
                suspicious_indices.append(idx)
        print(f"⚠️ Found {len(suspicious_indices)} suspicious samples")
        return suspicious_indices

    def filter_data(self, data, labels, suspicious_indices):
        """Remove suspicious samples"""
        mask = np.ones(len(data), dtype=bool)
        mask[suspicious_indices] = False
        filtered_data = data[mask]
        filtered_labels = labels[mask]
        print(f"✅ Filtered dataset: {len(filtered_data)} samples remaining")
        return filtered_data, filtered_labels

# Usage
checker = DataIntegrityChecker()
# checker.establish_baseline(trusted_training_data)
# suspicious = checker.detect_anomalies(new_batch_data)
# clean_data, clean_labels = checker.filter_data(data, labels, suspicious)
Model Integrity: Behavioral Testing
class ModelIntegrityTester:
    """
    Test model behavior to detect backdoors or drift
    """
    def __init__(self, model, test_suite, invariance_tests=None):
        self.model = model
        self.test_suite = test_suite  # Known input-output pairs
        # Named test functions, each taking the model and returning bool
        self.invariance_tests = invariance_tests or {}

    def run_invariance_tests(self):
        """
        Test that the model maintains expected invariances
        """
        print("\n🧪 Running Invariance Tests...")
        passed = 0
        failed = 0
        for test_name, test_fn in self.invariance_tests.items():
            if test_fn(self.model):
                print(f" ✅ {test_name}")
                passed += 1
            else:
                print(f" ❌ {test_name} FAILED")
                failed += 1
        return passed, failed

    @staticmethod
    def test_brightness_invariance(model, test_image, epsilon=0.2):
        """
        Model should be invariant to brightness changes
        """
        import torch
        original_pred = model(test_image).argmax()
        # Adjust brightness
        bright_image = torch.clamp(test_image + epsilon, 0, 1)
        dark_image = torch.clamp(test_image - epsilon, 0, 1)
        bright_pred = model(bright_image).argmax()
        dark_pred = model(dark_image).argmax()
        # Should predict the same class
        return original_pred == bright_pred == dark_pred

    @staticmethod
    def apply_trigger(samples, trigger):
        """Overlay a trigger pattern on each benign sample (illustrative)"""
        return [sample + trigger for sample in samples]

    def test_for_backdoors(self, trigger_patterns):
        """
        Test if specific patterns activate incorrect predictions
        """
        print("\n🔍 Testing for Backdoors...")
        for trigger_name, trigger in trigger_patterns.items():
            # Apply trigger to benign samples
            triggered_samples = self.apply_trigger(self.test_suite, trigger)
            # Check predictions (as plain ints so set membership works)
            predictions = [int(self.model(s).argmax()) for s in triggered_samples]
            # Suspicious if all predict the same (wrong) class
            unique_preds = set(predictions)
            if len(unique_preds) == 1:
                print(f" ⚠️ Potential backdoor with trigger: {trigger_name}")
                return False
        print(" ✅ No backdoors detected")
        return True
### Availability in ML Systems
1. Sponge Examples: Inputs that cause excessive computation
class AvailabilityDefense:
    """
    Protect against resource exhaustion attacks
    """
    def __init__(self, max_inference_time=1.0):
        self.max_inference_time = max_inference_time

    def safe_inference(self, model, input_data):
        """
        Run inference with timeout protection
        (signal.alarm works only on Unix main threads and has whole-second
        granularity; use a worker process with a join timeout for portability)
        """
        import signal

        def timeout_handler(signum, frame):
            raise TimeoutError("Inference exceeded time limit")

        # Set timeout
        signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(int(self.max_inference_time))
        try:
            result = model(input_data)
            signal.alarm(0)  # Disable alarm
            return result
        except TimeoutError:
            print("⚠️ Inference timeout - potential sponge example")
            return None

    def monitor_resource_usage(self):
        """
        Monitor CPU/memory usage
        """
        import psutil
        cpu_percent = psutil.cpu_percent(interval=1)
        memory_percent = psutil.virtual_memory().percent
        if cpu_percent > 90 or memory_percent > 90:
            print("⚠️ High resource usage detected")
            return False
        return True
2. Model Denial-of-Service via Adversarial Examples
Some adversarial examples are designed to crash models or cause errors.
Defense Strategy:
import torch
from sklearn.ensemble import IsolationForest

class RobustInferenceWrapper:
    """
    Wrap model with defensive layers
    """
    def __init__(self, model, clean_inputs=None):
        self.model = model
        # clean_inputs: array of representative benign inputs for fitting
        self.anomaly_detector = self.build_anomaly_detector(clean_inputs)

    def build_anomaly_detector(self, clean_inputs):
        """
        Simple anomaly detector for unusual inputs
        (could also use an autoencoder, Mahalanobis distance, etc.)
        """
        detector = IsolationForest(contamination=0.1)
        if clean_inputs is not None:
            # Must be fitted on benign inputs before predict() is usable
            detector.fit(clean_inputs.reshape(len(clean_inputs), -1))
        return detector

    def detect_adversarial(self, input_data):
        """
        Check if input looks anomalous
        """
        # Flatten input for anomaly detection
        flattened = input_data.reshape(1, -1)
        # -1 indicates anomaly
        is_anomaly = self.anomaly_detector.predict(flattened)[0] == -1
        if is_anomaly:
            print("⚠️ Potential adversarial input detected")
        return is_anomaly

    def safe_predict(self, input_data):
        """
        Predict with safety checks
        """
        # Check for anomalies
        if self.detect_adversarial(input_data):
            return None  # Reject suspicious inputs
        # Run inference
        try:
            with torch.no_grad():
                output = self.model(input_data)
            return output
        except Exception as e:
            print(f"❌ Inference failed: {e}")
            return None
## Wrap-up & Discussion
Time Allocation: 10 minutes
Key Takeaways:
- ML systems are complex with multiple layers and components, each presenting security risks
- Training and inference phases have different threat models and require different security controls
- Data pipelines are critical attack vectors - secure the entire pipeline, not just the model
- Threat modeling (STRIDE, attack trees) helps systematically identify vulnerabilities
- CIA triad applies differently to ML:
  - Confidentiality: Model inversion, membership inference
  - Integrity: Data poisoning, backdoors
  - Availability: Sponge examples, resource exhaustion
Discussion Questions:
- What surprised you most about security in ML systems compared to traditional software?
- Which phase is harder to secure - training or inference? Why?
- How would you prioritize security efforts if you had limited resources?
Next Week (Week 3): Adversarial Machine Learning - Evasion Attacks
- Deep dive into adversarial examples
- Hands-on: Implementing FGSM attacks
- Defenses: Adversarial training
Assignment Due Before Next Class:
- Read: "Explaining and Harnessing Adversarial Examples" (Goodfellow et al., 2014)
- Set up Python environment with PyTorch/TensorFlow
- Complete threat modeling exercise (posted on Canvas)
Open floor for questions and clarifications.
Additional Resources:
- Papernot et al. (2016) - "The Limitations of Deep Learning in Adversarial Settings"
- Tramèr et al. (2016) - "Stealing Machine Learning Models via Prediction APIs"
- Shokri et al. (2017) - "Membership Inference Attacks Against Machine Learning Models"
- OWASP Machine Learning Security Top 10
- Microsoft's Failure Modes in Machine Learning
- NIST AI Risk Management Framework
All code examples from this tutorial are available at:
https://github.com/[your-repo]/csci5773-week2-security-fundamentals
Clone the repository and follow setup instructions in README.md
End of Tutorial