Create API security assessment resource #383

@santosomar

Create something similar to this AI assisted example:

API Security Assessment Tool

A comprehensive automated security testing tool for REST APIs, covering eight security domains plus automated reporting.

Features

Transport & TLS Security - HTTPS enforcement, certificate validation, HSTS headers
Authentication Testing - JWT validation, token lifetime, algorithm security
Authorization Testing - IDOR prevention, privilege escalation, mass assignment
Input Validation - SQL injection, NoSQL injection, command injection
SSRF Protection - URL validation, private IP blocking, cloud metadata protection
Rate Limiting - DoS protection, request throttling
Information Disclosure - Error handling, header exposure, verbose messages
Management Endpoints - Admin interface exposure, debug endpoints
Automated Reporting - JSON export, severity classification, remediation guidance

Installation

# Install dependencies
pip install requests pyjwt

# Make script executable (optional)
chmod +x api_security_assessment.py

Quick Start

Basic Usage

# Test an API
python api_security_assessment.py --url https://api.example.com

# Test with authentication token
python api_security_assessment.py --url https://api.example.com --token "Bearer your-token-here"

# Test specific endpoints
python api_security_assessment.py \
  --url https://api.example.com \
  --endpoints /api/users,/api/data,/api/search

Programmatic Usage

from api_security_assessment import APISecurityAssessment

# Configure your API
API_URL = "https://your-api.com"
AUTH_TOKEN = "your-jwt-token"  # Optional

# Define endpoints to test
TEST_ENDPOINTS = [
    "/api/v1/users",
    "/api/v1/products",
    "/api/v1/orders"
]

# Run full assessment
assessor = APISecurityAssessment(API_URL, AUTH_TOKEN)
assessor.run_full_assessment(TEST_ENDPOINTS)

Custom Testing

# Test specific security domains
assessor = APISecurityAssessment(API_URL, AUTH_TOKEN)

# Test only TLS configuration
assessor.test_tls_security()

# Test only authentication
assessor.test_authentication()

# Test authorization on specific endpoints
assessor.test_authorization(["/api/admin", "/api/settings"])

# Test for SQL injection
assessor.test_input_validation(["/api/search"])

# Test for SSRF
assessor.test_ssrf(["/api/webhook"])

# Test rate limiting
assessor.test_rate_limiting("/api/data")

# Test information disclosure
assessor.test_information_disclosure()

# Test management endpoints
assessor.test_management_endpoints()

# Generate report
assessor.generate_report()

Example Output

============================================================
API Security Assessment: https://api.example.com
============================================================

=== Testing TLS Security ===
[MEDIUM] Missing HSTS Header

=== Testing Authentication ===
  Token algorithm: RS256
  Token lifetime: 2.0 hours
[MEDIUM] Long JWT Lifetime

=== Testing Authorization ===
  /api/users: Protected (401)
  /api/data: Protected (401)

=== Testing Input Validation ===
[CRITICAL] SQL Injection in /api/search

=== Testing SSRF Protection ===

=== Testing Rate Limiting ===
  Sending 120 requests to test rate limiting...
  Rate limiting detected: 100 successful, 20 rate-limited

=== Testing Information Disclosure ===
[LOW] Header Exposure: Server

=== Testing Management Endpoints ===
[HIGH] Exposed Management Endpoint: /swagger

============================================================
ASSESSMENT SUMMARY
============================================================

Total Findings: 5
  🔴 CRITICAL: 1
  🟠 HIGH:     1
  🟡 MEDIUM:   2
  🟢 LOW:      1

============================================================
DETAILED FINDINGS
============================================================

1. [CRITICAL] SQL Injection in /api/search
   Category: Input Validation
   Description: Endpoint vulnerable to SQL injection
   Evidence: Payload: ' OR '1'='1, Response contains SQL error
   Remediation: Use parameterized queries and prepared statements

2. [HIGH] Exposed Management Endpoint: /swagger
   Category: Management Endpoints
   Description: Management or admin endpoint accessible without restrictions
   Evidence: Status: 200, Size: 15234 bytes
   Remediation: Restrict access to management endpoints via network controls and strong authentication

...

Report saved to: api_security_assessment_20251031_143022.json

Security Domains Tested

1. Transport & TLS Security

  • HTTPS enforcement
  • HTTP to HTTPS redirect
  • HSTS header presence
  • TLS version and cipher strength
  • Certificate validation
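
The HSTS check can go beyond header presence and look at the policy itself. The sketch below is one possible way to do that, not part of the tool: `parse_hsts`, `hsts_is_weak`, and the 180-day `MIN_MAX_AGE` threshold are all assumptions chosen for illustration.

```python
from typing import Dict, Optional

# Assumed threshold (180 days); adjust to your own policy.
MIN_MAX_AGE = 15552000


def parse_hsts(header_value: str) -> Dict[str, object]:
    """Split a Strict-Transport-Security value into its directives."""
    policy: Dict[str, object] = {
        "max_age": None,
        "include_subdomains": False,
        "preload": False,
    }
    for directive in header_value.split(";"):
        name, _, value = directive.strip().partition("=")
        name = name.lower()
        if name == "max-age":
            try:
                policy["max_age"] = int(value.strip().strip('"'))
            except ValueError:
                policy["max_age"] = None
        elif name == "includesubdomains":
            policy["include_subdomains"] = True
        elif name == "preload":
            policy["preload"] = True
    return policy


def hsts_is_weak(header_value: Optional[str]) -> bool:
    """True when the header is missing, unparsable, or has a short max-age."""
    if not header_value:
        return True
    max_age = parse_hsts(header_value)["max_age"]
    return max_age is None or max_age < MIN_MAX_AGE


print(parse_hsts("max-age=31536000; includeSubDomains"))
print(hsts_is_weak("max-age=300"))
```

A result from `hsts_is_weak` could feed the same MEDIUM finding the tool already raises for a missing header.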

2. Authentication

  • JWT algorithm validation
  • Token signature strength (RS256/ES256 vs HS256/none)
  • Token lifetime validation
  • Token expiration handling
  • Claim validation (iss, aud, exp, nbf, iat)

3. Authorization

  • Unauthenticated access attempts
  • IDOR (Insecure Direct Object Reference)
  • Privilege escalation testing
  • Mass assignment vulnerabilities
  • Resource-level access control
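
An IDOR check usually means requesting objects that belong to someone else while authenticated as a low-privilege user. The sketch below shows the idea with an injectable `fetch` callable so it can run offline; `find_idor_candidates`, the path template, and the fake responses are assumptions for illustration, and a real run would wrap an authenticated HTTP request in `fetch`.

```python
from typing import Callable, Iterable, List


def find_idor_candidates(
    fetch: Callable[[str], int],
    path_template: str,
    foreign_ids: Iterable[str],
) -> List[str]:
    """Return paths of objects owned by other users that answered 200.

    fetch(path) must return the HTTP status code obtained with the
    low-privilege test user's credentials.
    """
    exposed = []
    for object_id in foreign_ids:
        path = path_template.format(id=object_id)
        if fetch(path) == 200:
            exposed.append(path)
    return exposed


# Offline stand-in for the API: order 1002 is wrongly readable by our user.
fake_responses = {
    "/api/v1/orders/1002": 200,
    "/api/v1/orders/1003": 403,
}


def fake_fetch(path: str) -> int:
    return fake_responses.get(path, 404)


print(find_idor_candidates(fake_fetch, "/api/v1/orders/{id}", ["1002", "1003", "1004"]))
```

Every path returned is only a candidate: the response body still needs a manual look to confirm it contains another user's data.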

4. Input Validation

  • SQL injection testing
  • NoSQL injection testing
  • Command injection testing
  • LDAP injection testing
  • Schema validation
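
Error-based SQL injection detection depends on recognising database error text in the response. Matching on a bare product name such as "mysql" tends to produce false positives, so one option is to match on specific error phrases, case-insensitively. The patterns below are a small, non-exhaustive sample of well-known messages, and `find_sql_error` is a hypothetical helper.

```python
import re
from typing import Optional

# A few widely seen error phrases; extend for the databases you expect.
SQL_ERROR_PATTERNS = [
    re.compile(r"you have an error in your sql syntax", re.IGNORECASE),  # MySQL
    re.compile(r"unclosed quotation mark", re.IGNORECASE),               # SQL Server
    re.compile(r"syntax error at or near", re.IGNORECASE),               # PostgreSQL
    re.compile(r'near ".*?": syntax error', re.IGNORECASE),              # SQLite
    re.compile(r"ORA-\d{5}"),                                            # Oracle
]


def find_sql_error(body: str) -> Optional[str]:
    """Return the first matching error fragment, or None if nothing matched."""
    for pattern in SQL_ERROR_PATTERNS:
        match = pattern.search(body)
        if match:
            return match.group(0)
    return None


print(find_sql_error("You have an error in your SQL syntax; check the manual"))
print(find_sql_error("Our docs cover MySQL and PostgreSQL connectors"))
```

Returning the matched fragment, rather than a boolean, gives the finding's evidence field something concrete to quote.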

5. SSRF Prevention

  • Internal network access (localhost, 127.0.0.1, private IPs)
  • Cloud metadata endpoints (AWS, GCP, Azure)
  • File protocol access (file://)
  • Protocol validation (gopher://, dict://, ftp://)
  • DNS rebinding protection
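
The targets in this list share a property that can be checked mechanically: they use a non-HTTP scheme or point at a non-public address. The sketch below uses the standard-library `ipaddress` module to classify a URL; `is_risky_target` and `ALLOWED_SCHEMES` are illustrative names, and hostnames are deliberately not resolved, so DNS rebinding is out of scope for this example.

```python
import ipaddress
from urllib.parse import urlsplit

ALLOWED_SCHEMES = {"http", "https"}


def is_risky_target(url: str) -> bool:
    """True if the URL uses a disallowed scheme or a non-public IP literal."""
    parts = urlsplit(url)
    if parts.scheme.lower() not in ALLOWED_SCHEMES:
        return True
    host = parts.hostname
    if not host:
        return True
    if host.lower() == "localhost" or host.lower().endswith(".localhost"):
        return True
    try:
        address = ipaddress.ip_address(host)
    except ValueError:
        # A DNS name: it would have to be resolved before it can be judged.
        return False
    # Loopback, private, and link-local ranges (including the cloud
    # metadata address 169.254.169.254) are all non-global.
    return not address.is_global


for candidate in (
    "http://127.0.0.1",
    "http://169.254.169.254/latest/meta-data/",
    "file:///etc/passwd",
    "https://8.8.8.8/",
):
    print(candidate, is_risky_target(candidate))
```

The same classification is what a server-side URL validator would apply after resolving the hostname, which is the control these tests are probing for.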

6. Rate Limiting & DoS

  • Request rate limits per IP
  • Request rate limits per user
  • Burst protection
  • Request size limits
  • Timeout enforcement
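
When interpreting a burst test, it helps to separate successful, throttled, and failed requests and to read the server's `Retry-After` hint. This is a rough sketch of that bookkeeping; `summarize_statuses` and `retry_after_seconds` are hypothetical helpers, and only the delay-seconds form of `Retry-After` is handled.

```python
from collections import Counter
from typing import Dict, Iterable, Optional


def summarize_statuses(statuses: Iterable[Optional[int]]) -> Dict[str, int]:
    """Bucket status codes; None stands for a request that raised an error."""
    counts = Counter(statuses)
    total = sum(counts.values())
    successful = sum(
        n for code, n in counts.items() if code is not None and 200 <= code < 300
    )
    rate_limited = counts.get(429, 0)
    failed = counts.get(None, 0)
    return {
        "successful": successful,
        "rate_limited": rate_limited,
        "failed": failed,
        "other": total - successful - rate_limited - failed,
    }


def retry_after_seconds(header_value: Optional[str]) -> Optional[int]:
    """Parse Retry-After given as seconds; the HTTP-date form returns None."""
    if header_value and header_value.strip().isdigit():
        return int(header_value.strip())
    return None


print(summarize_statuses([200] * 100 + [429] * 20 + [None, 503]))
print(retry_after_seconds("30"))
```

Counting failed requests separately matters because timeouts during a burst can otherwise be mistaken for the absence of rate limiting.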

7. Information Disclosure

  • Verbose error messages
  • Stack traces in responses
  • Server/framework version headers
  • Sensitive data in responses
  • Timing attack vulnerabilities
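
A `Server` header is more useful to an attacker when it carries a version number. As an illustration, the sketch below flags only headers whose value contains something that looks like a version; `version_leaks` and `VERSION_PATTERN` are assumptions, and the header names mirror the ones the tool already checks.

```python
import re
from typing import Dict, List

# Matches dotted numbers such as 1.18 or 4.0.30319.
VERSION_PATTERN = re.compile(r"\d+(?:\.\d+)+")
WATCHED_HEADERS = ("Server", "X-Powered-By", "X-AspNet-Version")


def version_leaks(headers: Dict[str, str]) -> List[str]:
    """Return 'Name: value' strings for watched headers exposing a version."""
    # HTTP header names are case-insensitive, so normalise before lookup.
    lowered = {name.lower(): value for name, value in headers.items()}
    leaks = []
    for name in WATCHED_HEADERS:
        value = lowered.get(name.lower())
        if value and VERSION_PATTERN.search(value):
            leaks.append(f"{name}: {value}")
    return leaks


print(version_leaks({"server": "nginx/1.18.0", "X-Powered-By": "Express"}))
```

Distinguishing a bare product name from a versioned one could be used to keep the former at LOW severity and raise the latter.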

8. Management Endpoints

  • Admin panel exposure
  • Debug endpoints
  • Health check endpoints
  • Metrics endpoints
  • API documentation (Swagger/OpenAPI)

Configuration Options

Command Line Arguments

--url         Base URL of the API (required)
--token       Authentication token (optional)
--endpoints   Comma-separated list of endpoints to test (optional)

Environment Variables

The script does not read environment variables itself. Export the values in your shell and pass them on the command line:

export API_URL="https://api.example.com"
export API_TOKEN="Bearer your-token"
export API_ENDPOINTS="/api/users,/api/data"

python api_security_assessment.py --url "$API_URL" --token "$API_TOKEN" --endpoints "$API_ENDPOINTS"
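
If reading the variables directly is preferred, argparse defaults can fall back to the environment. The following is a sketch of one way to wire that up, not the script's current behaviour; `build_parser` and `parse_cli` are hypothetical, and the variable names are the ones exported above.

```python
import argparse
import os
from typing import Mapping, Optional, Sequence


def build_parser(env: Mapping[str, str]) -> argparse.ArgumentParser:
    """Build a parser whose defaults come from the given environment mapping."""
    parser = argparse.ArgumentParser(description="API security assessment (sketch)")
    # --url stays mandatory unless API_URL is already set in the environment.
    parser.add_argument("--url", default=env.get("API_URL"),
                        required="API_URL" not in env)
    parser.add_argument("--token", default=env.get("API_TOKEN"))
    parser.add_argument("--endpoints", default=env.get("API_ENDPOINTS"))
    return parser


def parse_cli(argv: Optional[Sequence[str]] = None,
              env: Mapping[str, str] = os.environ) -> argparse.Namespace:
    """Parse argv (sys.argv when None); explicit flags override the environment."""
    return build_parser(env).parse_args(argv)


demo_env = {"API_URL": "https://api.example.com", "API_ENDPOINTS": "/api/users,/api/data"}
args = parse_cli([], demo_env)
print(args.url, args.endpoints, args.token)
```

Keeping the token in an environment variable also avoids it appearing in shell history or process listings.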

Report Format

Reports are saved in JSON format with the following structure:

[
  {
    "severity": "CRITICAL",
    "category": "Input Validation",
    "title": "SQL Injection in /api/search",
    "description": "Endpoint vulnerable to SQL injection",
    "evidence": "Payload: ' OR '1'='1, Response contains SQL error",
    "remediation": "Use parameterized queries and prepared statements"
  }
]
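
Because the report is a flat JSON array, it is easy to post-process, for example to fail a pipeline when serious findings appear. The sketch below assumes the structure shown above; `count_by_severity`, `should_fail`, and the default `HIGH` threshold are illustrative choices.

```python
import json
from collections import Counter
from typing import Dict, List

SEVERITY_ORDER = ["CRITICAL", "HIGH", "MEDIUM", "LOW"]


def count_by_severity(findings: List[Dict[str, str]]) -> Dict[str, int]:
    """Count findings per severity, always listing every level."""
    counts = Counter(finding["severity"] for finding in findings)
    return {level: counts.get(level, 0) for level in SEVERITY_ORDER}


def should_fail(findings: List[Dict[str, str]], threshold: str = "HIGH") -> bool:
    """True if any finding is at or above the threshold severity."""
    cutoff = SEVERITY_ORDER.index(threshold) + 1
    blocking = set(SEVERITY_ORDER[:cutoff])
    return any(finding["severity"] in blocking for finding in findings)


sample_report = """
[
  {"severity": "CRITICAL", "category": "Input Validation",
   "title": "SQL Injection in /api/search"},
  {"severity": "LOW", "category": "Information Disclosure",
   "title": "Header Exposure: Server"}
]
"""

findings = json.loads(sample_report)
print(count_by_severity(findings))
print(should_fail(findings))
```

In a CI job, the result of `should_fail` could decide the process exit code after the report file is loaded.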

Best Practices

Before Testing

  1. Get Authorization - Always obtain written permission before testing
  2. Test Non-Production - Use staging/development environments when possible
  3. Define Scope - Clearly identify which endpoints and functionalities to test
  4. Set Rate Limits - Configure testing parameters to avoid overwhelming the API
  5. Backup Data - Ensure data is backed up before testing

During Testing

  1. Monitor Impact - Watch for performance degradation
  2. Document Everything - Record all tests and findings
  3. Test Incrementally - Start with less invasive tests
  4. Respect Rate Limits - Don't overwhelm the API
  5. Save Evidence - Capture screenshots and logs

After Testing

  1. Generate Reports - Create comprehensive documentation
  2. Prioritize Findings - Focus on critical and high severity issues first
  3. Provide Remediation - Include actionable fix recommendations
  4. Retest After Fixes - Verify that vulnerabilities are properly addressed
  5. Schedule Regular Tests - Perform assessments periodically

Limitations

⚠️ Not a Complete Penetration Test - This tool performs automated checks but doesn't replace manual security testing
⚠️ False Positives Possible - Some findings may require manual verification
⚠️ Limited Coverage - Business logic flaws may not be detected
⚠️ Network Dependent - Some tests may be affected by firewalls or proxies
⚠️ API-Specific - Custom authentication schemes may require tool modifications

Advanced Usage

Custom Test Payloads

The payload lists are local variables inside test_input_validation() and test_ssrf(). Edit them in the script to add custom payloads:

# Add custom SQL injection payloads
sql_payloads = [
    "' OR '1'='1",
    "' UNION SELECT * FROM users--",
    # Add your custom payloads here
]

# Add custom SSRF targets
ssrf_payloads = [
    "http://internal.example.com",
    # Add your internal endpoints here
]

Integration with CI/CD

# .github/workflows/api-security-test.yml
name: API Security Assessment

on:
  schedule:
    - cron: '0 0 * * 0'  # Weekly
  workflow_dispatch:

jobs:
  security-test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Install dependencies
        run: pip install requests pyjwt
      
      - name: Run API security assessment
        run: |
          python part5_agents_and_tools/api_security_assessment.py \
            --url "${{ secrets.API_URL }}" \
            --token "${{ secrets.API_TOKEN }}" \
            --endpoints /api/users,/api/data
      
      - name: Upload report
        uses: actions/upload-artifact@v4
        with:
          name: security-report
          path: api_security_assessment_*.json

Related Documentation

  • Full Guide: See API_SECURITY_ASSESSMENT_GUIDE.md for comprehensive methodology
  • CodeGuard Rules: Check .cursor/rules/codeguard-0-api-web-services.mdc for security standards
  • OWASP API Security Top 10: https://owasp.org/www-project-api-security/

Troubleshooting

SSL Certificate Errors

# Disable SSL verification (NOT recommended for production)
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# In the script, modify requests
resp = requests.get(url, verify=False)

Proxy Configuration

# Configure proxy
proxies = {
    'http': 'http://proxy.example.com:8080',
    'https': 'http://proxy.example.com:8080',  # the proxy itself is usually reached over http
}

# Use with requests
resp = requests.get(url, proxies=proxies)

Timeout Issues

# Increase timeout
resp = requests.get(url, timeout=30)  # 30 seconds

#!/usr/bin/env python3
"""
Complete API Security Assessment Tool
Performs comprehensive security testing across all domains

Usage:
    python api_security_assessment.py --url https://api.example.com
    python api_security_assessment.py --url https://api.example.com --token "Bearer xyz"
    python api_security_assessment.py --url https://api.example.com --endpoints /api/users,/api/data
"""

import requests
import jwt
import time
import json
import argparse
from typing import List, Optional
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor


@dataclass
class SecurityFinding:
    """Represents a security finding from the assessment"""
    severity: str  # CRITICAL, HIGH, MEDIUM, LOW
    category: str
    title: str
    description: str
    evidence: str
    remediation: str


class APISecurityAssessment:
    """Comprehensive API Security Assessment Tool"""
    
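    def __init__(self, base_url: str, auth_token: Optional[str] = None):
        self.base_url = base_url.rstrip('/')
        # Accept either a raw token or a full "Bearer <token>" value, so the
        # header is never sent as "Bearer Bearer <token>".
        if auth_token and auth_token.lower().startswith("bearer "):
            auth_token = auth_token[len("bearer "):].strip()
        self.auth_token = auth_token
        self.findings: List[SecurityFinding] = []
        self.session = requests.Session()
        if auth_token:
            self.session.headers.update({"Authorization": f"Bearer {auth_token}"})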
    
    def add_finding(self, severity: str, category: str, title: str, 
                   description: str, evidence: str, remediation: str):
        """Add a security finding"""
        finding = SecurityFinding(
            severity=severity,
            category=category,
            title=title,
            description=description,
            evidence=evidence,
            remediation=remediation
        )
        self.findings.append(finding)
        print(f"[{severity}] {title}")
    
    def test_tls_security(self):
        """Test TLS/SSL configuration"""
        print("\n=== Testing TLS Security ===")
        
        # Test HTTP redirect
        try:
            http_url = self.base_url.replace('https://', 'http://')
            resp = requests.get(http_url, allow_redirects=False, timeout=5)
            
            # 307 and 308 are also valid redirects to HTTPS
            if resp.status_code not in (301, 302, 307, 308):
                self.add_finding(
                    "HIGH",
                    "Transport Security",
                    "No HTTPS Redirect",
                    "HTTP traffic is not redirected to HTTPS",
                    f"HTTP request returned status: {resp.status_code}",
                    "Configure web server to redirect all HTTP traffic to HTTPS"
                )
        except Exception as e:
            print(f"  Could not test HTTP redirect: {e}")
        
        # Check HSTS header
        try:
            resp = self.session.get(self.base_url, timeout=10)
            if 'Strict-Transport-Security' not in resp.headers:
                self.add_finding(
                    "MEDIUM",
                    "Transport Security",
                    "Missing HSTS Header",
                    "Strict-Transport-Security header not set",
                    "HSTS header not present in response",
                    "Add 'Strict-Transport-Security: max-age=31536000; includeSubDomains' header"
                )
        except Exception as e:
            print(f"  Could not test HSTS: {e}")
    
    def test_authentication(self):
        """Test authentication mechanisms"""
        print("\n=== Testing Authentication ===")
        
        # Test for JWT algorithm confusion
        if self.auth_token and self.auth_token.startswith('eyJ'):
            try:
                # Decode without verification to inspect
                unverified = jwt.decode(self.auth_token, options={"verify_signature": False})
                
                # Check for weak algorithms
                header = jwt.get_unverified_header(self.auth_token)
                if header.get('alg') in ['none', 'HS256']:
                    self.add_finding(
                        "HIGH",
                        "Authentication",
                        "Weak JWT Algorithm",
                        f"JWT uses weak algorithm: {header.get('alg')}",
                        f"Token header: {header}",
                        "Use RS256 or ES256 for JWT signatures"
                    )
                
                print(f"  Token algorithm: {header.get('alg')}")

                # Check token lifetime (only possible when the token has an exp claim)
                if 'exp' in unverified:
                    lifetime = unverified['exp'] - unverified.get('iat', time.time())
                    print(f"  Token lifetime: {lifetime/3600:.1f} hours")
                    if lifetime > 3600:  # More than 1 hour
                        self.add_finding(
                            "MEDIUM",
                            "Authentication",
                            "Long JWT Lifetime",
                            f"JWT token lifetime is {lifetime/3600:.1f} hours",
                            f"exp: {unverified['exp']}, iat: {unverified.get('iat')}",
                            "Reduce token lifetime to 15-60 minutes for access tokens"
                        )
                else:
                    print("  Token has no exp claim; lifetime not checked")
                
                
            except Exception as e:
                print(f"  Could not decode JWT: {e}")
        else:
            print("  No JWT token provided, skipping JWT tests")
    
    def test_authorization(self, test_endpoints: List[str]):
        """Test authorization controls"""
        print("\n=== Testing Authorization ===")
        
        for endpoint in test_endpoints:
            try:
                # Test without authentication
                resp = requests.get(f"{self.base_url}{endpoint}", timeout=5)
                
                if resp.status_code == 200:
                    self.add_finding(
                        "CRITICAL",
                        "Authorization",
                        f"Unauthenticated Access to {endpoint}",
                        "Endpoint accessible without authentication",
                        f"Status: {resp.status_code}, Response size: {len(resp.content)}",
                        "Enforce authentication on all sensitive endpoints"
                    )
                else:
                    print(f"  {endpoint}: Protected ({resp.status_code})")
            except Exception as e:
                print(f"  Error testing {endpoint}: {e}")
    
    def test_input_validation(self, test_endpoints: List[str]):
        """Test input validation and injection"""
        print("\n=== Testing Input Validation ===")
        
        sql_payloads = ["' OR '1'='1", "' OR '1'='1' --", "1' UNION SELECT NULL--"]
        
        for endpoint in test_endpoints:
            for payload in sql_payloads:
                try:
                    resp = self.session.get(f"{self.base_url}{endpoint}", 
                                           params={"q": payload}, 
                                           timeout=5)
                    
                    # Check for SQL errors
                    # Indicators must be lowercase: they are matched against resp.text.lower()
                    error_indicators = ["sql syntax", "mysql", "postgresql", "sqlite"]
                    if any(indicator in resp.text.lower() for indicator in error_indicators):
                        self.add_finding(
                            "CRITICAL",
                            "Input Validation",
                            f"SQL Injection in {endpoint}",
                            "Endpoint vulnerable to SQL injection",
                            f"Payload: {payload}, Response contains SQL error",
                            "Use parameterized queries and prepared statements"
                        )
                        break
                except requests.RequestException as e:
                    print(f"  Error testing {endpoint}: {e}")
    
    def test_ssrf(self, test_endpoints: List[str]):
        """Test for SSRF vulnerabilities"""
        print("\n=== Testing SSRF Protection ===")
        
        ssrf_payloads = [
            "http://127.0.0.1",
            "http://localhost",
            "http://169.254.169.254/latest/meta-data/",
            "file:///etc/passwd",
        ]
        
        for endpoint in test_endpoints:
            for payload in ssrf_payloads:
                try:
                    resp = self.session.get(f"{self.base_url}{endpoint}",
                                           params={"url": payload},
                                           timeout=5)
                    
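                    # Heuristic only: a 200 shows the request was accepted, not that
                    # the server fetched the URL. Confirm each finding manually.
                    if resp.status_code == 200: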
                        self.add_finding(
                            "CRITICAL",
                            "SSRF",
                            f"SSRF Vulnerability in {endpoint}",
                            "Endpoint vulnerable to Server-Side Request Forgery",
                            f"Payload: {payload} returned 200 OK",
                            "Validate and sanitize URLs; block private IP ranges and cloud metadata endpoints"
                        )
                except requests.RequestException as e:
                    print(f"  Error testing {endpoint}: {e}")
    
    def test_rate_limiting(self, test_endpoint: str, limit: int = 100):
        """Test rate limiting"""
        print("\n=== Testing Rate Limiting ===")
        
        def make_request(i):
            try:
                resp = self.session.get(f"{self.base_url}{test_endpoint}", timeout=5)
                return resp.status_code
            except requests.RequestException:
                return None
        
        print(f"  Sending {limit + 20} requests to test rate limiting...")
        with ThreadPoolExecutor(max_workers=10) as executor:
            results = list(executor.map(make_request, range(limit + 20)))
        
        rate_limited = sum(1 for r in results if r == 429)
        successful = sum(1 for r in results if r == 200)
        
        if rate_limited == 0:
            self.add_finding(
                "MEDIUM",
                "Rate Limiting",
                "No Rate Limiting Detected",
                f"Sent {len(results)} requests without rate limiting",
                f"All requests completed without 429 responses",
                "Implement rate limiting per IP, user, and globally"
            )
        else:
            print(f"  Rate limiting detected: {successful} successful, {rate_limited} rate-limited")
    
    def test_information_disclosure(self):
        """Test for information leakage"""
        print("\n=== Testing Information Disclosure ===")
        
        try:
            resp = self.session.get(self.base_url, timeout=10)
            
            # Check headers
            risky_headers = {
                "Server": "Server version exposed",
                "X-Powered-By": "Framework exposed",
                "X-AspNet-Version": "Framework version exposed",
            }
            
            for header, description in risky_headers.items():
                if header in resp.headers:
                    self.add_finding(
                        "LOW",
                        "Information Disclosure",
                        f"Header Exposure: {header}",
                        description,
                        f"{header}: {resp.headers[header]}",
                        f"Remove or genericize the {header} header"
                    )
            
            # Check for error messages
            error_resp = self.session.get(f"{self.base_url}/nonexistent-endpoint-test-12345", timeout=5)
            error_patterns = ["Traceback", "Stack trace", ".py\"", "Exception in"]
            
            if any(pattern in error_resp.text for pattern in error_patterns):
                self.add_finding(
                    "MEDIUM",
                    "Information Disclosure",
                    "Verbose Error Messages",
                    "Error responses contain stack traces or internal details",
                    "Error response contains framework/code details",
                    "Implement generic error messages and log details server-side"
                )
        except Exception as e:
            print(f"  Error testing information disclosure: {e}")
    
    def test_management_endpoints(self):
        """Test for exposed management endpoints"""
        print("\n=== Testing Management Endpoints ===")
        
        admin_paths = [
            "/admin", "/api/admin", "/health", "/metrics",
            "/debug", "/swagger", "/api-docs", "/actuator",
            "/openapi.json", "/swagger.json"
        ]
        
        for path in admin_paths:
            try:
                resp = requests.get(f"{self.base_url}{path}", timeout=5)
                if resp.status_code == 200:
                    self.add_finding(
                        "HIGH",
                        "Management Endpoints",
                        f"Exposed Management Endpoint: {path}",
                        "Management or admin endpoint accessible without restrictions",
                        f"Status: {resp.status_code}, Size: {len(resp.content)} bytes",
                        "Restrict access to management endpoints via network controls and strong authentication"
                    )
                elif resp.status_code in [401, 403]:
                    print(f"  {path}: Protected ({resp.status_code})")
            except requests.RequestException as e:
                print(f"  Error testing {path}: {e}")
    
    def run_full_assessment(self, test_endpoints: Optional[List[str]] = None):
        """Run complete security assessment"""
        if test_endpoints is None:
            test_endpoints = ["/api/users", "/api/data", "/api/search"]
        
        print(f"\n{'='*60}")
        print(f"API Security Assessment: {self.base_url}")
        print(f"{'='*60}")
        
        self.test_tls_security()
        self.test_authentication()
        self.test_authorization(test_endpoints)
        self.test_input_validation(test_endpoints)
        self.test_ssrf(test_endpoints)
        self.test_rate_limiting(test_endpoints[0] if test_endpoints else "/")
        self.test_information_disclosure()
        self.test_management_endpoints()
        
        self.generate_report()
    
    def generate_report(self):
        """Generate assessment report"""
        print(f"\n{'='*60}")
        print("ASSESSMENT SUMMARY")
        print(f"{'='*60}\n")
        
        severity_counts = {
            "CRITICAL": len([f for f in self.findings if f.severity == "CRITICAL"]),
            "HIGH": len([f for f in self.findings if f.severity == "HIGH"]),
            "MEDIUM": len([f for f in self.findings if f.severity == "MEDIUM"]),
            "LOW": len([f for f in self.findings if f.severity == "LOW"]),
        }
        
        print(f"Total Findings: {len(self.findings)}")
        print(f"  🔴 CRITICAL: {severity_counts['CRITICAL']}")
        print(f"  🟠 HIGH:     {severity_counts['HIGH']}")
        print(f"  🟡 MEDIUM:   {severity_counts['MEDIUM']}")
        print(f"  🟢 LOW:      {severity_counts['LOW']}")
        
        if self.findings:
            print(f"\n{'='*60}")
            print("DETAILED FINDINGS")
            print(f"{'='*60}\n")
            
            for i, finding in enumerate(self.findings, 1):
                print(f"{i}. [{finding.severity}] {finding.title}")
                print(f"   Category: {finding.category}")
                print(f"   Description: {finding.description}")
                print(f"   Evidence: {finding.evidence}")
                print(f"   Remediation: {finding.remediation}")
                print()
            
            # Export to JSON
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            report_file = f"api_security_assessment_{timestamp}.json"
            with open(report_file, 'w') as f:
                json.dump([
                    {
                        "severity": f.severity,
                        "category": f.category,
                        "title": f.title,
                        "description": f.description,
                        "evidence": f.evidence,
                        "remediation": f.remediation
                    }
                    for f in self.findings
                ], f, indent=2)
            
            print(f"Report saved to: {report_file}")
        else:
            print("\n✓ No security issues found!")


def main():
    """Main entry point for CLI usage"""
    parser = argparse.ArgumentParser(
        description="Comprehensive API Security Assessment Tool",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s --url https://api.example.com
  %(prog)s --url https://api.example.com --token "Bearer xyz123"
  %(prog)s --url https://api.example.com --endpoints /api/users,/api/data,/api/search
        """
    )
    
    parser.add_argument(
        '--url',
        required=True,
        help='Base URL of the API to assess (e.g., https://api.example.com)'
    )
    
    parser.add_argument(
        '--token',
        help='Authentication token (JWT or Bearer token)'
    )
    
    parser.add_argument(
        '--endpoints',
        help='Comma-separated list of endpoints to test (e.g., /api/users,/api/data)'
    )
    
    args = parser.parse_args()
    
    # Parse endpoints
    test_endpoints = None
    if args.endpoints:
        test_endpoints = [e.strip() for e in args.endpoints.split(',')]
    
    # Run assessment
    assessor = APISecurityAssessment(args.url, args.token)
    assessor.run_full_assessment(test_endpoints)


if __name__ == "__main__":
    main()
