Description
Create something similar to this AI assisted example:
API Security Assessment Tool
A comprehensive automated security testing tool for REST APIs, covering 8 security domains plus automated reporting.
Features
✅ Transport & TLS Security - HTTPS enforcement, certificate validation, HSTS headers
✅ Authentication Testing - JWT validation, token lifetime, algorithm security
✅ Authorization Testing - IDOR prevention, privilege escalation, mass assignment
✅ Input Validation - SQL injection, NoSQL injection, command injection
✅ SSRF Protection - URL validation, private IP blocking, cloud metadata protection
✅ Rate Limiting - DoS protection, request throttling
✅ Information Disclosure - Error handling, header exposure, verbose messages
✅ Management Endpoints - Admin interface exposure, debug endpoints
✅ Automated Reporting - JSON export, severity classification, remediation guidance
Installation
# Install dependencies
pip install requests pyjwt
# Make script executable (optional)
chmod +x api_security_assessment.py
Quick Start
Basic Usage
# Test an API
python api_security_assessment.py --url https://api.example.com
# Test with authentication token (pass the raw token; the tool adds the "Bearer " prefix itself)
python api_security_assessment.py --url https://api.example.com --token "your-token-here"
# Test specific endpoints
python api_security_assessment.py \
  --url https://api.example.com \
  --endpoints /api/users,/api/data,/api/search
Programmatic Usage
from api_security_assessment import APISecurityAssessment
# Configure your API
API_URL = "https://your-api.com"
AUTH_TOKEN = "your-jwt-token" # Optional
# Define endpoints to test
TEST_ENDPOINTS = [
    "/api/v1/users",
    "/api/v1/products",
    "/api/v1/orders"
]
# Run full assessment
assessor = APISecurityAssessment(API_URL, AUTH_TOKEN)
assessor.run_full_assessment(TEST_ENDPOINTS)
Custom Testing
# Test specific security domains
assessor = APISecurityAssessment(API_URL, AUTH_TOKEN)
# Test only TLS configuration
assessor.test_tls_security()
# Test only authentication
assessor.test_authentication()
# Test authorization on specific endpoints
assessor.test_authorization(["/api/admin", "/api/settings"])
# Test for SQL injection
assessor.test_input_validation(["/api/search"])
# Test for SSRF
assessor.test_ssrf(["/api/webhook"])
# Test rate limiting
assessor.test_rate_limiting("/api/data")
# Test information disclosure
assessor.test_information_disclosure()
# Test management endpoints
assessor.test_management_endpoints()
# Generate report
assessor.generate_report()
Example Output
============================================================
API Security Assessment: https://api.example.com
============================================================
=== Testing TLS Security ===
[MEDIUM] Missing HSTS Header
=== Testing Authentication ===
Token algorithm: RS256
Token lifetime: 2.0 hours
[MEDIUM] Long JWT Lifetime
=== Testing Authorization ===
/api/users: Protected (401)
/api/data: Protected (401)
=== Testing Input Validation ===
[CRITICAL] SQL Injection in /api/search
=== Testing SSRF Protection ===
=== Testing Rate Limiting ===
Sending 120 requests to test rate limiting...
Rate limiting detected: 100 successful, 20 rate-limited
=== Testing Information Disclosure ===
[LOW] Header Exposure: Server
=== Testing Management Endpoints ===
[HIGH] Exposed Management Endpoint: /swagger
============================================================
ASSESSMENT SUMMARY
============================================================
Total Findings: 5
🔴 CRITICAL: 1
🟠 HIGH: 1
🟡 MEDIUM: 2
🟢 LOW: 1
============================================================
DETAILED FINDINGS
============================================================
1. [CRITICAL] SQL Injection in /api/search
Category: Input Validation
Description: Endpoint vulnerable to SQL injection
Evidence: Payload: ' OR '1'='1, Response contains SQL error
Remediation: Use parameterized queries and prepared statements
2. [HIGH] Exposed Management Endpoint: /swagger
Category: Management Endpoints
Description: Management or admin endpoint accessible without restrictions
Evidence: Status: 200, Size: 15234 bytes
Remediation: Restrict access to management endpoints via network controls
...
Report saved to: api_security_assessment_20251031_143022.json
Security Domains Tested
1. Transport & TLS Security
- HTTPS enforcement
- HTTP to HTTPS redirect
- HSTS header presence
- TLS version and cipher strength
- Certificate validation
2. Authentication
- JWT algorithm validation
- Token signature strength (RS256/ES256 vs HS256/none)
- Token lifetime validation
- Token expiration handling
- Claim validation (iss, aud, exp, nbf, iat)
3. Authorization
- Unauthenticated access attempts
- IDOR (Insecure Direct Object Reference)
- Privilege escalation testing
- Mass assignment vulnerabilities
- Resource-level access control
4. Input Validation
- SQL injection testing
- NoSQL injection testing
- Command injection testing
- LDAP injection testing
- Schema validation
5. SSRF Prevention
- Internal network access (localhost, 127.0.0.1, private IPs)
- Cloud metadata endpoints (AWS, GCP, Azure)
- File protocol access (file://)
- Protocol validation (gopher://, dict://, ftp://)
- DNS rebinding protection
6. Rate Limiting & DoS
- Request rate limits per IP
- Request rate limits per user
- Burst protection
- Request size limits
- Timeout enforcement
7. Information Disclosure
- Verbose error messages
- Stack traces in responses
- Server/framework version headers
- Sensitive data in responses
- Timing attack vulnerabilities
8. Management Endpoints
- Admin panel exposure
- Debug endpoints
- Health check endpoints
- Metrics endpoints
- API documentation (Swagger/OpenAPI)
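The SSRF Prevention checks listed under domain 5 (private IPs, cloud metadata addresses, non-HTTP protocols) describe what a server-side URL filter is expected to reject. As a rough sketch of that logic, the helper below classifies a candidate URL using only the standard library. The function name `classify_ssrf_target` and the allowed-scheme set are assumptions made for illustration; they are not part of `api_security_assessment.py`.

```python
import ipaddress
from urllib.parse import urlparse

# Hypothetical helper for illustration; not part of api_security_assessment.py.
ALLOWED_SCHEMES = {"http", "https"}  # assumption: only web schemes are acceptable


def classify_ssrf_target(url: str) -> str:
    """Return 'blocked-scheme', 'blocked-host', or 'allowed' for a candidate URL."""
    parsed = urlparse(url)
    if parsed.scheme.lower() not in ALLOWED_SCHEMES:
        # Covers file://, gopher://, dict://, ftp:// and similar
        return "blocked-scheme"

    host = parsed.hostname
    if not host or host.lower() == "localhost":
        return "blocked-host"

    try:
        addr = ipaddress.ip_address(host)
    except ValueError:
        # A hostname rather than an IP literal. A real filter would resolve it
        # and re-check the resolved address, which is where DNS rebinding matters.
        return "allowed"

    # Loopback, RFC 1918 ranges, link-local (169.254.0.0/16 hosts cloud metadata)
    if addr.is_loopback or addr.is_private or addr.is_link_local or addr.is_reserved:
        return "blocked-host"
    return "allowed"


if __name__ == "__main__":
    for candidate in (
        "http://127.0.0.1",
        "http://169.254.169.254/latest/meta-data/",
        "file:///etc/passwd",
        "https://api.example.com/callback",
    ):
        print(candidate, "->", classify_ssrf_target(candidate))
```

Hostnames are passed through here without DNS resolution, so this sketch does not address the DNS rebinding item on its own.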
Configuration Options
Command Line Arguments
--url Base URL of the API (required)
--token Authentication token (optional)
--endpoints Comma-separated list of endpoints to test (optional)
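The script splits the `--endpoints` value on commas and strips whitespace. A slightly more defensive variant, sketched below under the assumption that empty entries and duplicates should be dropped and that every path should start with `/`, could look like this. `parse_endpoints` is a hypothetical helper name, not a function in the tool.

```python
from typing import List


def parse_endpoints(raw: str) -> List[str]:
    """Turn a comma-separated --endpoints value into a clean list of paths."""
    endpoints: List[str] = []
    for part in raw.split(","):
        path = part.strip()
        if not path:
            continue  # skip empty entries such as a trailing comma
        if not path.startswith("/"):
            path = "/" + path  # assumption: endpoints are paths relative to the base URL
        if path not in endpoints:
            endpoints.append(path)  # keep first occurrence, preserve order
    return endpoints


if __name__ == "__main__":
    print(parse_endpoints("/api/users, api/data,,/api/users"))
```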
Environment Variables
The script takes its settings only from command-line arguments, but you can keep the values in shell variables and pass them through:
export API_URL="https://api.example.com"
export API_TOKEN="your-token"
export API_ENDPOINTS="/api/users,/api/data"
python api_security_assessment.py --url "$API_URL" --token "$API_TOKEN" --endpoints "$API_ENDPOINTS"
Report Format
Reports are saved in JSON format with the following structure:
[
  {
    "severity": "CRITICAL",
    "category": "Input Validation",
    "title": "SQL Injection in /api/search",
    "description": "Endpoint vulnerable to SQL injection",
    "evidence": "Payload: ' OR '1'='1, Response contains SQL error",
    "remediation": "Use parameterized queries and prepared statements"
  }
]
Best Practices
Before Testing
- ✅ Get Authorization - Always obtain written permission before testing
- ✅ Test Non-Production - Use staging/development environments when possible
- ✅ Define Scope - Clearly identify which endpoints and functionalities to test
- ✅ Set Rate Limits - Configure testing parameters to avoid overwhelming the API
- ✅ Backup Data - Ensure data is backed up before testing
During Testing
- ✅ Monitor Impact - Watch for performance degradation
- ✅ Document Everything - Record all tests and findings
- ✅ Test Incrementally - Start with less invasive tests
- ✅ Respect Rate Limits - Don't overwhelm the API
- ✅ Save Evidence - Capture screenshots and logs
After Testing
- ✅ Generate Reports - Create comprehensive documentation
- ✅ Prioritize Findings - Focus on critical and high severity issues first
- ✅ Provide Remediation - Include actionable fix recommendations
- ✅ Retest After Fixes - Verify that vulnerabilities are properly addressed
- ✅ Schedule Regular Tests - Perform assessments periodically
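The "Prioritize Findings" step above can be sketched against the JSON report format shown earlier. The snippet assumes the report is a list of objects with a `severity` field, as in the Report Format section. The helper names `load_report` and `prioritize` are hypothetical and not part of the tool.

```python
import json
from typing import Dict, List

# Lower number = handle first; unknown severities sort last.
SEVERITY_ORDER = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3}


def load_report(path: str) -> List[Dict[str, str]]:
    """Read a report file written by the assessment tool."""
    with open(path, encoding="utf-8") as fh:
        return json.load(fh)


def prioritize(findings: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Return findings ordered from most to least severe (stable within a level)."""
    unknown = len(SEVERITY_ORDER)
    return sorted(
        findings,
        key=lambda f: SEVERITY_ORDER.get(f.get("severity", ""), unknown),
    )


if __name__ == "__main__":
    sample = [
        {"severity": "LOW", "title": "Header Exposure: Server"},
        {"severity": "CRITICAL", "title": "SQL Injection in /api/search"},
        {"severity": "HIGH", "title": "Exposed Management Endpoint: /swagger"},
    ]
    for finding in prioritize(sample):
        print(f"[{finding['severity']}] {finding['title']}")
```

Because `sorted` is stable, findings with the same severity keep the order in which the tool recorded them.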
Limitations
Advanced Usage
Custom Test Payloads
Modify the tool to include custom payloads:
# Add custom SQL injection payloads
sql_payloads = [
    "' OR '1'='1",
    "' UNION SELECT * FROM users--",
    # Add your custom payloads here
]
# Add custom SSRF targets
ssrf_payloads = [
    "http://internal.example.com",
    # Add your internal endpoints here
]
Integration with CI/CD
# .github/workflows/api-security-test.yml
name: API Security Assessment
on:
  schedule:
    - cron: '0 0 * * 0' # Weekly
  workflow_dispatch:
jobs:
  security-test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Install dependencies
        run: pip install requests pyjwt
      - name: Run API security assessment
        run: |
          python part5_agents_and_tools/api_security_assessment.py \
            --url ${{ secrets.API_URL }} \
            --token ${{ secrets.API_TOKEN }} \
            --endpoints /api/users,/api/data
      - name: Upload report
        uses: actions/upload-artifact@v3
        with:
          name: security-report
          path: api_security_assessment_*.json
Related Documentation
- Full Guide: See API_SECURITY_ASSESSMENT_GUIDE.md for comprehensive methodology
- CodeGuard Rules: Check .cursor/rules/codeguard-0-api-web-services.mdc for security standards
- OWASP API Security Top 10: https://owasp.org/www-project-api-security/
Troubleshooting
SSL Certificate Errors
# Disable SSL verification (NOT recommended for production)
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# In the script, modify requests
resp = requests.get(url, verify=False)
Proxy Configuration
# Configure proxy
proxies = {
    'http': 'http://proxy.example.com:8080',
    'https': 'https://proxy.example.com:8080',
}
# Use with requests
resp = requests.get(url, proxies=proxies)
Timeout Issues
# Increase timeout
resp = requests.get(url, timeout=30) # 30 seconds
#!/usr/bin/env python3
"""
Complete API Security Assessment Tool
Performs comprehensive security testing across all domains
Usage:
    python api_security_assessment.py --url https://api.example.com
    python api_security_assessment.py --url https://api.example.com --token "xyz"
    python api_security_assessment.py --url https://api.example.com --endpoints /api/users,/api/data
"""
import requests
import jwt
import time
import json
import argparse
from typing import Dict, List, Optional
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor


@dataclass
class SecurityFinding:
    """Represents a security finding from the assessment"""
    severity: str  # CRITICAL, HIGH, MEDIUM, LOW
    category: str
    title: str
    description: str
    evidence: str
    remediation: str
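

class APISecurityAssessment:
    """Comprehensive API Security Assessment Tool"""

    def __init__(self, base_url: str, auth_token: Optional[str] = None):
        self.base_url = base_url.rstrip('/')
        # Accept tokens with or without a leading "Bearer " so the header is never doubled
        if auth_token and auth_token.lower().startswith("bearer "):
            auth_token = auth_token[len("bearer "):].strip()
        self.auth_token = auth_token
        self.findings: List[SecurityFinding] = []
        self.session = requests.Session()
        if auth_token:
            self.session.headers.update({"Authorization": f"Bearer {auth_token}"})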

    def add_finding(self, severity: str, category: str, title: str,
                    description: str, evidence: str, remediation: str):
        """Add a security finding"""
        finding = SecurityFinding(
            severity=severity,
            category=category,
            title=title,
            description=description,
            evidence=evidence,
            remediation=remediation
        )
        self.findings.append(finding)
        print(f"[{severity}] {title}")

    def test_tls_security(self):
        """Test TLS/SSL configuration"""
        print("\n=== Testing TLS Security ===")
        # Test HTTP redirect
        try:
            http_url = self.base_url.replace('https://', 'http://')
            resp = requests.get(http_url, allow_redirects=False, timeout=5)
            # 307 and 308 are redirects too, so do not flag them
            if resp.status_code not in [301, 302, 307, 308]:
                self.add_finding(
                    "HIGH",
                    "Transport Security",
                    "No HTTPS Redirect",
                    "HTTP traffic is not redirected to HTTPS",
                    f"HTTP request returned status: {resp.status_code}",
                    "Configure web server to redirect all HTTP traffic to HTTPS"
                )
        except Exception as e:
            print(f"  Could not test HTTP redirect: {e}")
        # Check HSTS header
        try:
            resp = self.session.get(self.base_url, timeout=10)
            if 'Strict-Transport-Security' not in resp.headers:
                self.add_finding(
                    "MEDIUM",
                    "Transport Security",
                    "Missing HSTS Header",
                    "Strict-Transport-Security header not set",
                    "HSTS header not present in response",
                    "Add 'Strict-Transport-Security: max-age=31536000; includeSubDomains' header"
                )
        except Exception as e:
            print(f"  Could not test HSTS: {e}")

    def test_authentication(self):
        """Test authentication mechanisms"""
        print("\n=== Testing Authentication ===")
        # Only inspect tokens that look like JWTs
        if self.auth_token and self.auth_token.startswith('eyJ'):
            try:
                # Decode without verification to inspect
                unverified = jwt.decode(self.auth_token, options={"verify_signature": False})
                # Check for weak algorithms
                header = jwt.get_unverified_header(self.auth_token)
                if header.get('alg') in ['none', 'HS256']:
                    self.add_finding(
                        "HIGH",
                        "Authentication",
                        "Weak JWT Algorithm",
                        f"JWT uses weak algorithm: {header.get('alg')}",
                        f"Token header: {header}",
                        "Use RS256 or ES256 for JWT signatures"
                    )
                # Check token lifetime (stays None when the token has no 'exp' claim)
                lifetime = None
                if 'exp' in unverified:
                    lifetime = unverified['exp'] - unverified.get('iat', time.time())
                    if lifetime > 3600:  # More than 1 hour
                        self.add_finding(
                            "MEDIUM",
                            "Authentication",
                            "Long JWT Lifetime",
                            f"JWT token lifetime is {lifetime/3600:.1f} hours",
                            f"exp: {unverified['exp']}, iat: {unverified.get('iat')}",
                            "Reduce token lifetime to 15-60 minutes for access tokens"
                        )
                print(f"  Token algorithm: {header.get('alg')}")
                if lifetime is not None:
                    print(f"  Token lifetime: {lifetime/3600:.1f} hours")
            except Exception as e:
                print(f"  Could not decode JWT: {e}")
        else:
            print("  No JWT token provided, skipping JWT tests")

    def test_authorization(self, test_endpoints: List[str]):
        """Test authorization controls"""
        print("\n=== Testing Authorization ===")
        for endpoint in test_endpoints:
            try:
                # Test without authentication (plain requests, not the authenticated session)
                resp = requests.get(f"{self.base_url}{endpoint}", timeout=5)
                if resp.status_code == 200:
                    self.add_finding(
                        "CRITICAL",
                        "Authorization",
                        f"Unauthenticated Access to {endpoint}",
                        "Endpoint accessible without authentication",
                        f"Status: {resp.status_code}, Response size: {len(resp.content)}",
                        "Enforce authentication on all sensitive endpoints"
                    )
                else:
                    print(f"  {endpoint}: Protected ({resp.status_code})")
            except Exception as e:
                print(f"  Error testing {endpoint}: {e}")

    def test_input_validation(self, test_endpoints: List[str]):
        """Test input validation and injection"""
        print("\n=== Testing Input Validation ===")
        sql_payloads = ["' OR '1'='1", "' OR '1'='1' --", "1' UNION SELECT NULL--"]
        # Indicators must be lowercase because they are matched against resp.text.lower()
        error_indicators = ["sql syntax", "mysql", "postgresql", "sqlite"]
        for endpoint in test_endpoints:
            for payload in sql_payloads:
                try:
                    resp = self.session.get(f"{self.base_url}{endpoint}",
                                            params={"q": payload},
                                            timeout=5)
                    # Check for SQL errors
                    if any(indicator in resp.text.lower() for indicator in error_indicators):
                        self.add_finding(
                            "CRITICAL",
                            "Input Validation",
                            f"SQL Injection in {endpoint}",
                            "Endpoint vulnerable to SQL injection",
                            f"Payload: {payload}, Response contains SQL error",
                            "Use parameterized queries and prepared statements"
                        )
                        break  # One finding per endpoint is enough
                except requests.RequestException as e:
                    print(f"  Error testing {endpoint}: {e}")

    def test_ssrf(self, test_endpoints: List[str]):
        """Test for SSRF vulnerabilities"""
        print("\n=== Testing SSRF Protection ===")
        ssrf_payloads = [
            "http://127.0.0.1",
            "http://localhost",
            "http://169.254.169.254/latest/meta-data/",
            "file:///etc/passwd",
        ]
        for endpoint in test_endpoints:
            for payload in ssrf_payloads:
                try:
                    resp = self.session.get(f"{self.base_url}{endpoint}",
                                            params={"url": payload},
                                            timeout=5)
                    # Heuristic only: a 200 may simply mean the 'url' parameter was ignored
                    if resp.status_code == 200:
                        self.add_finding(
                            "CRITICAL",
                            "SSRF",
                            f"SSRF Vulnerability in {endpoint}",
                            "Endpoint vulnerable to Server-Side Request Forgery",
                            f"Payload: {payload} returned 200 OK",
                            "Validate and sanitize URLs; block private IP ranges and cloud metadata endpoints"
                        )
                except requests.RequestException as e:
                    print(f"  Error testing {endpoint}: {e}")

    def test_rate_limiting(self, test_endpoint: str, limit: int = 100):
        """Test rate limiting"""
        print("\n=== Testing Rate Limiting ===")

        def make_request(i):
            try:
                resp = self.session.get(f"{self.base_url}{test_endpoint}", timeout=5)
                return resp.status_code
            except requests.RequestException:
                return None

        print(f"  Sending {limit + 20} requests to test rate limiting...")
        with ThreadPoolExecutor(max_workers=10) as executor:
            results = list(executor.map(make_request, range(limit + 20)))
        rate_limited = sum(1 for r in results if r == 429)
        successful = sum(1 for r in results if r == 200)
        if rate_limited == 0:
            self.add_finding(
                "MEDIUM",
                "Rate Limiting",
                "No Rate Limiting Detected",
                f"Sent {len(results)} requests without rate limiting",
                "All requests completed without 429 responses",
                "Implement rate limiting per IP, user, and globally"
            )
        else:
            print(f"  Rate limiting detected: {successful} successful, {rate_limited} rate-limited")

    def test_information_disclosure(self):
        """Test for information leakage"""
        print("\n=== Testing Information Disclosure ===")
        try:
            resp = self.session.get(self.base_url, timeout=10)
            # Check headers
            risky_headers = {
                "Server": "Server version exposed",
                "X-Powered-By": "Framework exposed",
                "X-AspNet-Version": "Framework version exposed",
            }
            for header, description in risky_headers.items():
                if header in resp.headers:
                    self.add_finding(
                        "LOW",
                        "Information Disclosure",
                        f"Header Exposure: {header}",
                        description,
                        f"{header}: {resp.headers[header]}",
                        f"Remove or genericize the {header} header"
                    )
            # Check for error messages
            error_resp = self.session.get(f"{self.base_url}/nonexistent-endpoint-test-12345", timeout=5)
            error_patterns = ["Traceback", "Stack trace", ".py\"", "Exception in"]
            if any(pattern in error_resp.text for pattern in error_patterns):
                self.add_finding(
                    "MEDIUM",
                    "Information Disclosure",
                    "Verbose Error Messages",
                    "Error responses contain stack traces or internal details",
                    "Error response contains framework/code details",
                    "Implement generic error messages and log details server-side"
                )
        except Exception as e:
            print(f"  Error testing information disclosure: {e}")

    def test_management_endpoints(self):
        """Test for exposed management endpoints"""
        print("\n=== Testing Management Endpoints ===")
        admin_paths = [
            "/admin", "/api/admin", "/health", "/metrics",
            "/debug", "/swagger", "/api-docs", "/actuator",
            "/openapi.json", "/swagger.json"
        ]
        for path in admin_paths:
            try:
                # Probe without credentials to see what an anonymous caller can reach
                resp = requests.get(f"{self.base_url}{path}", timeout=5)
                if resp.status_code == 200:
                    self.add_finding(
                        "HIGH",
                        "Management Endpoints",
                        f"Exposed Management Endpoint: {path}",
                        "Management or admin endpoint accessible without restrictions",
                        f"Status: {resp.status_code}, Size: {len(resp.content)} bytes",
                        "Restrict access to management endpoints via network controls and strong authentication"
                    )
                elif resp.status_code in [401, 403]:
                    print(f"  {path}: Protected ({resp.status_code})")
            except requests.RequestException:
                # Unreachable paths are skipped silently
                pass

    def run_full_assessment(self, test_endpoints: Optional[List[str]] = None):
        """Run complete security assessment"""
        if test_endpoints is None:
            test_endpoints = ["/api/users", "/api/data", "/api/search"]
        print(f"\n{'='*60}")
        print(f"API Security Assessment: {self.base_url}")
        print(f"{'='*60}")
        self.test_tls_security()
        self.test_authentication()
        self.test_authorization(test_endpoints)
        self.test_input_validation(test_endpoints)
        self.test_ssrf(test_endpoints)
        self.test_rate_limiting(test_endpoints[0] if test_endpoints else "/")
        self.test_information_disclosure()
        self.test_management_endpoints()
        self.generate_report()

    def generate_report(self):
        """Generate assessment report"""
        print(f"\n{'='*60}")
        print("ASSESSMENT SUMMARY")
        print(f"{'='*60}\n")
        severity_counts = {
            level: sum(1 for finding in self.findings if finding.severity == level)
            for level in ("CRITICAL", "HIGH", "MEDIUM", "LOW")
        }
        print(f"Total Findings: {len(self.findings)}")
        print(f"  🔴 CRITICAL: {severity_counts['CRITICAL']}")
        print(f"  🟠 HIGH: {severity_counts['HIGH']}")
        print(f"  🟡 MEDIUM: {severity_counts['MEDIUM']}")
        print(f"  🟢 LOW: {severity_counts['LOW']}")
        if self.findings:
            print(f"\n{'='*60}")
            print("DETAILED FINDINGS")
            print(f"{'='*60}\n")
            for i, finding in enumerate(self.findings, 1):
                print(f"{i}. [{finding.severity}] {finding.title}")
                print(f"   Category: {finding.category}")
                print(f"   Description: {finding.description}")
                print(f"   Evidence: {finding.evidence}")
                print(f"   Remediation: {finding.remediation}")
                print()
            # Export to JSON (the file is only written when there are findings)
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            report_file = f"api_security_assessment_{timestamp}.json"
            with open(report_file, 'w') as report:
                json.dump([
                    {
                        "severity": finding.severity,
                        "category": finding.category,
                        "title": finding.title,
                        "description": finding.description,
                        "evidence": finding.evidence,
                        "remediation": finding.remediation
                    }
                    for finding in self.findings
                ], report, indent=2)
            print(f"Report saved to: {report_file}")
        else:
            print("\n✓ No security issues found!")


def main():
    """Main entry point for CLI usage"""
    parser = argparse.ArgumentParser(
        description="Comprehensive API Security Assessment Tool",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s --url https://api.example.com
  %(prog)s --url https://api.example.com --token "xyz123"
  %(prog)s --url https://api.example.com --endpoints /api/users,/api/data,/api/search
"""
    )
    parser.add_argument(
        '--url',
        required=True,
        help='Base URL of the API to assess (e.g., https://api.example.com)'
    )
    parser.add_argument(
        '--token',
        help='Authentication token, sent as "Authorization: Bearer <token>"'
    )
    parser.add_argument(
        '--endpoints',
        help='Comma-separated list of endpoints to test (e.g., /api/users,/api/data)'
    )
    args = parser.parse_args()
    # Parse endpoints
    test_endpoints = None
    if args.endpoints:
        test_endpoints = [e.strip() for e in args.endpoints.split(',')]
    # Run assessment
    assessor = APISecurityAssessment(args.url, args.token)
    assessor.run_full_assessment(test_endpoints)


if __name__ == "__main__":
    main()
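
The script's `test_authentication` inspects only the token algorithm and lifetime, while the Authentication domain also lists claim validation (iss, aud, exp, nbf, iat). A minimal sketch of such checks on an already-decoded payload follows. `check_claims` and its parameters are hypothetical names introduced for illustration. The sketch works on a plain dictionary and does not verify the signature, so it is a way to report on a token during an assessment, not a substitute for server-side validation.

```python
import time
from typing import Any, Dict, List, Optional


def check_claims(payload: Dict[str, Any],
                 expected_issuer: Optional[str] = None,
                 expected_audience: Optional[str] = None,
                 now: Optional[float] = None,
                 max_lifetime: int = 3600) -> List[str]:
    """Return a list of human-readable problems found in decoded JWT claims."""
    problems: List[str] = []
    now = time.time() if now is None else now

    # Assumption: these four claims are expected on every access token
    for claim in ("iss", "aud", "exp", "iat"):
        if claim not in payload:
            problems.append(f"missing claim: {claim}")

    exp, iat, nbf = payload.get("exp"), payload.get("iat"), payload.get("nbf")
    if exp is not None and exp <= now:
        problems.append("token expired")
    if nbf is not None and nbf > now:
        problems.append("token not yet valid")
    if exp is not None and iat is not None and exp - iat > max_lifetime:
        problems.append(f"lifetime exceeds {max_lifetime} seconds")

    if expected_issuer is not None and payload.get("iss") != expected_issuer:
        problems.append("unexpected issuer")
    if expected_audience is not None:
        aud = payload.get("aud")
        audiences = aud if isinstance(aud, list) else [aud]  # 'aud' may be a string or a list
        if expected_audience not in audiences:
            problems.append("unexpected audience")
    return problems


if __name__ == "__main__":
    claims = {"iss": "https://issuer.example", "aud": "api", "exp": 2000, "iat": 1000}
    print(check_claims(claims, expected_issuer="https://issuer.example",
                       expected_audience="api", now=1500))
```

The `now` parameter exists so the check can be exercised with fixed timestamps. In the tool, the payload would come from the unverified decode already performed in `test_authentication`.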