Overview
Network Security (NS) controls protect network communications for AI assistants that operate in networked environments. These controls ensure data confidentiality, integrity, and authenticity for all network traffic.
NS-01: TLS Enforcement
Metadata
| Attribute | Value |
|---|---|
| Control ID | NS-01 |
| Requirement Level | MUST |
| Assurance Levels | L2, L3 |
| STRIDE | Information Disclosure, Tampering |
| AATT | AATT-E1 |
Requirement
Implementations MUST enforce TLS for all network communications. TLS version MUST be 1.2 or higher. Weak cipher suites MUST be disabled.
Rationale
Network traffic without encryption can be intercepted, revealing sensitive code, credentials, and conversations. TLS provides confidentiality and integrity for data in transit.
Implementation Guidance
TLS Configuration:
```yaml
# TLS enforcement configuration
network:
  tls:
    enabled: true
    required: true  # No fallback to unencrypted
    min_version: "TLS1.2"
    preferred_version: "TLS1.3"

    # Cipher suites (TLS 1.2)
    cipher_suites:
      allowed:
        - "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"
        - "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"
        - "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384"
        - "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256"
      denied:
        - "*CBC*"
        - "*RC4*"
        - "*3DES*"
        - "*NULL*"
        - "*EXPORT*"
        - "*DES*"
        - "*MD5*"

    # TLS 1.3 cipher suites
    tls13_suites:
      - "TLS_AES_256_GCM_SHA384"
      - "TLS_AES_128_GCM_SHA256"
      - "TLS_CHACHA20_POLY1305_SHA256"

    # HTTP upgrade
    upgrade_insecure:
      enabled: true
      log_upgrades: true
```
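One way to read the `allowed` and `denied` lists is as an exact-match allowlist combined with shell-style glob deny patterns. The sketch below assumes that interpretation (the configuration does not define the matching rules itself) and uses a hypothetical helper, `is_suite_permitted`, in which a deny match always wins over the allowlist.

```python
from fnmatch import fnmatchcase

# Values copied from the configuration above.
ALLOWED = [
    "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384",
    "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
    "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384",
    "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
]
DENIED = ["*CBC*", "*RC4*", "*3DES*", "*NULL*", "*EXPORT*", "*DES*", "*MD5*"]


def is_suite_permitted(name, allowed=ALLOWED, denied=DENIED):
    """Return True only if the suite is allowlisted and matches no deny glob.

    Hypothetical helper: glob semantics for `denied` are an assumption.
    """
    if any(fnmatchcase(name, pattern) for pattern in denied):
        return False
    return name in allowed


if __name__ == "__main__":
    for suite in ("TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384",
                  "TLS_RSA_WITH_3DES_EDE_CBC_SHA"):
        print(suite, is_suite_permitted(suite))
```

Checking the deny list first means a suite that is accidentally added to `allowed` is still rejected if it matches a weak-cipher pattern.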
Implementation:
```python
import logging
import socket
import ssl

import requests
from requests.adapters import HTTPAdapter

logger = logging.getLogger(__name__)


class SecurityError(Exception):
    """Raised when a request would violate the TLS policy."""


class TLSAdapter(HTTPAdapter):
    """Transport adapter that passes a preconfigured SSLContext to urllib3."""

    def __init__(self, ssl_context, **kwargs):
        # Must be set before super().__init__(), which calls init_poolmanager()
        self._ssl_context = ssl_context
        super().__init__(**kwargs)

    def init_poolmanager(self, *args, **kwargs):
        kwargs['ssl_context'] = self._ssl_context
        super().init_poolmanager(*args, **kwargs)


class SecureNetworkClient:
    """Network client with enforced TLS"""

    def __init__(self, config):
        self.config = config
        self.session = self._create_secure_session()

    def _create_secure_session(self):
        """Create session with TLS enforcement"""
        session = requests.Session()

        # Create SSL context
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

        # Set minimum version
        ctx.minimum_version = ssl.TLSVersion.TLSv1_2

        # Allow up to TLS 1.3; the handshake picks the highest shared version
        if hasattr(ssl.TLSVersion, 'TLSv1_3'):
            ctx.maximum_version = ssl.TLSVersion.TLSv1_3

        # Disable weak ciphers
        ctx.set_ciphers(':'.join([
            'ECDHE+AESGCM',
            'DHE+AESGCM',
            '!aNULL',
            '!eNULL',
            '!EXPORT',
            '!DES',
            '!RC4',
            '!3DES',
            '!MD5',
            '!PSK',
        ]))

        # Require certificates
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.check_hostname = True
        ctx.load_default_certs()

        # Mount adapter that carries the SSL context
        session.mount('https://', TLSAdapter(ctx))
        return session

    def fetch(self, url):
        """Fetch URL with TLS enforcement"""
        # Upgrade HTTP to HTTPS
        if url.startswith('http://'):
            if self.config['network']['tls']['upgrade_insecure']['enabled']:
                url = url.replace('http://', 'https://', 1)
                logger.info(f"Upgraded to HTTPS: {url}")
            else:
                raise SecurityError("HTTP not allowed, TLS required")

        response = self.session.get(url)
        return response


def verify_tls_connection(host, port=443):
    """Verify TLS configuration of remote host"""
    context = ssl.create_default_context()
    context.minimum_version = ssl.TLSVersion.TLSv1_2

    with socket.create_connection((host, port)) as sock:
        with context.wrap_socket(sock, server_hostname=host) as ssock:
            info = {
                'version': ssock.version(),
                'cipher': ssock.cipher(),
                'peer_cert': ssock.getpeercert(),
            }

            # Verify minimum version
            if ssock.version() not in ('TLSv1.2', 'TLSv1.3'):
                raise SecurityError(f"Weak TLS version: {ssock.version()}")

    return info
```
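The upgrade step in `fetch` relies on a case-sensitive prefix check. A minimal sketch of a stricter variant, using the standard `urllib.parse` module and a hypothetical helper name `upgrade_to_https`, parses the URL first so that mixed-case schemes and non-HTTP schemes are handled explicitly. It raises `ValueError` here only to stay self-contained; an implementation would likely raise its own policy exception.

```python
from urllib.parse import urlsplit, urlunsplit


def upgrade_to_https(url, allow_upgrade=True):
    """Return an https:// URL or raise if TLS cannot be guaranteed.

    Hypothetical helper, not part of the control text.
    """
    parts = urlsplit(url)
    scheme = parts.scheme.lower()

    if scheme == "https":
        return url
    if scheme == "http" and allow_upgrade:
        # Keep netloc, path, query, and fragment; only the scheme changes.
        return urlunsplit(("https",) + tuple(parts[1:]))
    raise ValueError(f"TLS required, refusing URL with scheme {scheme!r}")


if __name__ == "__main__":
    print(upgrade_to_https("http://api.example.com/v1?x=1"))
```

Note that an explicit port such as `:80` in the original URL is preserved by this sketch, so a caller may also want to reject or rewrite explicit plaintext ports.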
Verification
Automated Verification
```bash
ossasai-audit.sh --check NS-01

# Test TLS enforcement
curl -v http://api.example.com/
# Expected: Redirected to HTTPS or connection refused

# Verify TLS version
openssl s_client -connect api.example.com:443 -tls1_1
# Expected: Connection failed (TLS 1.1 not allowed)

# Check cipher suites
nmap --script ssl-enum-ciphers -p 443 api.example.com
```
Manual Verification
1. Verify TLS is required for all connections
2. Test with weak TLS versions (should fail)
3. Test with weak cipher suites (should fail)
4. Verify HTTP automatically upgrades to HTTPS
5. Check for plaintext data in network traces
Evidence
- TLS configuration
- SSL scan results (SSL Labs, testssl.sh)
- Network capture showing no plaintext
- Cipher suite verification
Remediation
- Enable TLS for all endpoints
- Set minimum TLS version to 1.2
- Configure strong cipher suites
- Disable plaintext HTTP
- Enable HTTP to HTTPS upgrade
References
- Mozilla TLS Configuration Guidelines
- NIST SP 800-52: TLS Guidelines
- OWASP TLS Cheat Sheet
NS-02: Certificate Validation
Metadata
| Attribute | Value |
|---|---|
| Control ID | NS-02 |
| Requirement Level | MUST |
| Assurance Levels | L2, L3 |
| STRIDE | Spoofing, Tampering |
| AATT | AATT-C5 |
Requirement
Implementations MUST validate TLS certificates for all connections. Validation MUST include certificate chain verification, hostname verification, and expiration checking. Certificate pinning SHOULD be used for critical connections (L3).
Rationale
TLS without certificate validation is vulnerable to man-in-the-middle attacks. An attacker with network access can intercept and modify traffic using a fraudulent certificate.
Implementation Guidance
Certificate Validation:
```yaml
# Certificate validation configuration
network:
  certificates:
    validation:
      enabled: true
      verify_chain: true
      verify_hostname: true
      check_expiration: true
      check_revocation: true  # OCSP/CRL

    # Trust store
    trust_store:
      use_system: true
      additional_certs:
        - path: "/etc/ocsas/certs/internal-ca.pem"
          purpose: "internal_services"

    # Certificate pinning (L3)
    pinning:
      enabled: true
      pins:
        - host: "api.ocsas.dev"
          pins:
            - "sha256/AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="
            - "sha256/BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB="
          include_subdomains: true

    # Transparency
    certificate_transparency:
      enabled: true
      require_scts: true  # Signed Certificate Timestamps

    # Warnings
    warn_on:
      - "self_signed"
      - "expiring_soon"   # Within 30 days
      - "weak_signature"  # SHA-1
```
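Pins in this configuration follow the `sha256/<base64 digest>` shape. As a sketch of how a loader might reject malformed entries before they are used, the hypothetical helper `is_valid_pin` below checks the prefix and confirms that the Base64 part decodes to exactly 32 bytes, the length of a SHA-256 digest. The helper name and the decision to validate at load time are assumptions.

```python
import base64
import binascii
import hashlib

PIN_PREFIX = "sha256/"


def is_valid_pin(pin):
    """Check that a pin is 'sha256/' followed by a Base64-encoded 32-byte digest.

    Hypothetical helper, shown only to illustrate the pin format.
    """
    if not pin.startswith(PIN_PREFIX):
        return False
    try:
        digest = base64.b64decode(pin[len(PIN_PREFIX):], validate=True)
    except (binascii.Error, ValueError):
        return False
    return len(digest) == hashlib.sha256().digest_size


if __name__ == "__main__":
    example = PIN_PREFIX + base64.b64encode(hashlib.sha256(b"demo key").digest()).decode()
    print(example, is_valid_pin(example))
```

Validating pins when the configuration is loaded surfaces typos immediately, rather than as an unexplained pin mismatch on the first connection.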
Implementation:
```python
import base64
import hashlib
import ssl
from datetime import datetime, timedelta


class CertificateValidator:
    # Helpers such as _load_pins, _parse_cert, _verify_hostname,
    # _check_revocation, _verify_ct, and _hostname_matches are not shown.

    def __init__(self, config):
        self.config = config
        self.pins = self._load_pins(config)

    def validate_certificate(self, cert_der, hostname):
        """Comprehensive certificate validation"""
        results = {
            'hostname': hostname,
            'valid': True,
            'checks': []
        }

        # Parse certificate
        cert = ssl.DER_cert_to_PEM_cert(cert_der)
        x509 = self._parse_cert(cert)

        # 1. Chain validation (handled by SSL context)
        results['checks'].append({
            'check': 'chain',
            'passed': True  # If we got here, chain validated
        })

        # 2. Hostname verification
        hostname_valid = self._verify_hostname(x509, hostname)
        results['checks'].append({
            'check': 'hostname',
            'passed': hostname_valid
        })
        if not hostname_valid:
            results['valid'] = False

        # 3. Expiration check
        expiry_check = self._check_expiration(x509)
        results['checks'].append(expiry_check)
        if not expiry_check['passed']:
            results['valid'] = False

        # 4. Revocation check (OCSP)
        revocation_check = self._check_revocation(x509)
        results['checks'].append(revocation_check)
        if not revocation_check['passed']:
            results['valid'] = False

        # 5. Pin validation (if configured)
        if self.config['network']['certificates']['pinning']['enabled']:
            pin_check = self._verify_pin(cert_der, hostname)
            results['checks'].append(pin_check)
            if not pin_check['passed']:
                results['valid'] = False

        # 6. CT validation
        if self.config['network']['certificates']['certificate_transparency']['enabled']:
            ct_check = self._verify_ct(x509)
            results['checks'].append(ct_check)
            # CT failure is warning only

        return results

    def _verify_pin(self, cert_der, hostname):
        """Verify certificate pin"""
        # Calculate SPKI hash
        spki_hash = self._get_spki_hash(cert_der)

        # Find pins for hostname
        for pin_config in self.config['network']['certificates']['pinning']['pins']:
            if self._hostname_matches(hostname, pin_config['host']):
                if spki_hash in pin_config['pins']:
                    return {'check': 'pin', 'passed': True}
                else:
                    return {
                        'check': 'pin',
                        'passed': False,
                        'reason': f'Certificate pin mismatch for {hostname}'
                    }

        # No pin configured for this host
        return {'check': 'pin', 'passed': True, 'note': 'no_pin_configured'}

    def _get_spki_hash(self, cert_der):
        """Calculate SPKI SHA-256 hash"""
        from cryptography import x509
        from cryptography.hazmat.primitives import serialization

        cert = x509.load_der_x509_certificate(cert_der)
        spki = cert.public_key().public_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
        return 'sha256/' + base64.b64encode(
            hashlib.sha256(spki).digest()
        ).decode()

    def _check_expiration(self, x509):
        """Check certificate expiration"""
        # Both values are treated as naive UTC datetimes
        not_after = x509.not_valid_after
        now = datetime.utcnow()

        if now > not_after:
            return {
                'check': 'expiration',
                'passed': False,
                'reason': f'Certificate expired on {not_after}'
            }

        # Warn if expiring soon
        warning_threshold = now + timedelta(days=30)
        if not_after < warning_threshold:
            return {
                'check': 'expiration',
                'passed': True,
                'warning': f'Certificate expires on {not_after}'
            }

        return {'check': 'expiration', 'passed': True}
```
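The `_hostname_matches` helper used by `_verify_pin` is not defined in the listing. One possible shape, written here as a standalone function with an assumed `include_subdomains` parameter that mirrors the configuration key of the same name, compares hostnames case-insensitively and only treats a name as a subdomain when it ends with a dot followed by the pinned host.

```python
def hostname_matches(hostname, pinned_host, include_subdomains=False):
    """Return True if `hostname` is covered by the pin entry for `pinned_host`.

    Hypothetical sketch: the matching rules are an assumption, not taken
    from the control text.
    """
    hostname = hostname.lower().rstrip(".")
    pinned_host = pinned_host.lower().rstrip(".")

    if hostname == pinned_host:
        return True
    # Require a label boundary so "evilapi.ocsas.dev" does not match "api.ocsas.dev".
    return bool(include_subdomains) and hostname.endswith("." + pinned_host)


if __name__ == "__main__":
    print(hostname_matches("v2.api.ocsas.dev", "api.ocsas.dev", include_subdomains=True))
```

Matching on the `"." + pinned_host` suffix rather than a bare suffix avoids treating look-alike hosts as subdomains.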
Verification
Automated Verification
```bash
ossasai-audit.sh --check NS-02

# Test with invalid certificate
curl --cacert /dev/null https://api.example.com/
# Expected: Certificate verification failed

# Test with self-signed
curl https://self-signed.badssl.com/
# Expected: Rejected

# Test hostname mismatch
curl https://wrong.host.badssl.com/
# Expected: Hostname verification failed
```
Manual Verification
1. Verify certificate validation is enabled
2. Test with invalid certificates (should fail)
3. Test with hostname mismatches (should fail)
4. Test with expired certificates (should fail)
5. Verify certificate pinning (L3)
6. Test pin bypass attempts (should fail)
Evidence
- Certificate validation configuration
- Test results for invalid certificate scenarios
- Pin configuration (L3)
- Certificate transparency verification
Remediation
- Enable full certificate validation
- Configure hostname verification
- Enable revocation checking
- Implement certificate pinning (L3)
- Enable certificate transparency
References
- OWASP Certificate and Public Key Pinning
- RFC 7469: Public Key Pinning Extension for HTTP
- Certificate Transparency (RFC 6962)
NS-03: API Endpoint Security
Metadata
| Attribute | Value |
|---|---|
| Control ID | NS-03 |
| Requirement Level | MUST |
| Assurance Levels | L2, L3 |
| STRIDE | Spoofing, Tampering, Denial of Service |
| AATT | AATT-C5, AATT-D1 |
Requirement
Implementations MUST secure API endpoints with authentication, authorization, input validation, and rate limiting. API keys and tokens MUST be transmitted securely.
Rationale
API endpoints are primary attack targets. Without proper security, attackers can access unauthorized data, abuse resources, or compromise the system.
Implementation Guidance
API Security Configuration:
```yaml
# API endpoint security configuration
api:
  # Authentication
  authentication:
    required: true
    methods:
      - type: "bearer_token"
        header: "Authorization"
        format: "Bearer {token}"
      - type: "api_key"
        header: "X-API-Key"

    # Token configuration
    tokens:
      algorithm: "RS256"
      expiration_seconds: 3600
      refresh_enabled: true

  # Authorization
  authorization:
    enabled: true
    policy_engine: "opa"
    default_deny: true

  # Rate limiting
  rate_limiting:
    enabled: true
    default:
      requests_per_minute: 60
      requests_per_hour: 1000
      burst: 10

    # Per-endpoint overrides
    endpoints:
      "/api/v1/execute":
        requests_per_minute: 10  # More restrictive

  # Input validation
  input_validation:
    max_request_size_mb: 10
    max_header_size_kb: 8
    content_types:
      allowed:
        - "application/json"
        - "multipart/form-data"
    sanitize_headers: true

  # Security headers
  response_headers:
    "X-Content-Type-Options": "nosniff"
    "X-Frame-Options": "DENY"
    "X-XSS-Protection": "1; mode=block"
    "Strict-Transport-Security": "max-age=31536000; includeSubDomains"
    "Content-Security-Policy": "default-src 'none'"
```
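The per-endpoint override above sets only `requests_per_minute`, so a consumer presumably needs to fill the remaining values from `default`. The sketch below shows one way to do that with a hypothetical helper, `resolve_limits`; the merge-over-defaults behavior is an assumption, since the configuration does not state how overrides are combined.

```python
def resolve_limits(rate_cfg, path):
    """Merge an endpoint's overrides on top of the default limits.

    Hypothetical helper: `rate_cfg` is the `api.rate_limiting` mapping.
    """
    limits = dict(rate_cfg["default"])  # copy so defaults are never mutated
    limits.update(rate_cfg.get("endpoints", {}).get(path, {}))
    return limits


if __name__ == "__main__":
    rate_cfg = {
        "default": {"requests_per_minute": 60, "requests_per_hour": 1000, "burst": 10},
        "endpoints": {"/api/v1/execute": {"requests_per_minute": 10}},
    }
    print(resolve_limits(rate_cfg, "/api/v1/execute"))
    print(resolve_limits(rate_cfg, "/api/v1/status"))
```

With this approach, `/api/v1/execute` keeps the default `burst` and hourly limit while using its stricter per-minute value.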
Implementation:
```python
from functools import wraps
import time

import jwt


class RateLimitExceeded(Exception):
    """Raised when a client exceeds its rate limit."""


class AuthenticationError(Exception):
    """Raised when a request cannot be authenticated."""


class ValidationError(Exception):
    """Raised when request input fails validation."""


class APISecurityMiddleware:
    # Helpers _get_client_id, _verify_api_key, and _add_security_headers
    # are not shown.

    def __init__(self, config):
        self.config = config
        self.rate_limiter = RateLimiter(config['api']['rate_limiting'])

    def secure_endpoint(self, requires_auth=True, rate_limit=None):
        """Decorator for securing API endpoints"""
        def decorator(func):
            @wraps(func)
            async def wrapper(request, *args, **kwargs):
                # 1. Rate limiting
                client_id = self._get_client_id(request)
                limit = rate_limit or self.config['api']['rate_limiting']['default']
                if not self.rate_limiter.allow(client_id, limit):
                    raise RateLimitExceeded("Rate limit exceeded")

                # 2. Authentication
                if requires_auth:
                    auth_result = self._authenticate(request)
                    if not auth_result['authenticated']:
                        raise AuthenticationError(auth_result['reason'])
                    request.user = auth_result['user']

                # 3. Input validation
                validation_result = self._validate_input(request)
                if not validation_result['valid']:
                    raise ValidationError(validation_result['errors'])

                # 4. Execute handler
                response = await func(request, *args, **kwargs)

                # 5. Add security headers
                self._add_security_headers(response)
                return response
            return wrapper
        return decorator

    def _authenticate(self, request):
        """Authenticate request"""
        # Try bearer token
        auth_header = request.headers.get('Authorization', '')
        if auth_header.startswith('Bearer '):
            token = auth_header[7:]
            return self._verify_jwt(token)

        # Try API key
        api_key = request.headers.get('X-API-Key')
        if api_key:
            return self._verify_api_key(api_key)

        return {'authenticated': False, 'reason': 'No credentials provided'}

    def _verify_jwt(self, token):
        """Verify JWT token"""
        try:
            payload = jwt.decode(
                token,
                self.config['api']['authentication']['tokens']['public_key'],
                algorithms=[self.config['api']['authentication']['tokens']['algorithm']]
            )

            # Check expiration (also rejects tokens that carry no 'exp' claim)
            if payload.get('exp', 0) < time.time():
                return {'authenticated': False, 'reason': 'Token expired'}

            return {
                'authenticated': True,
                'user': payload.get('sub'),
                'scopes': payload.get('scopes', [])
            }
        except jwt.InvalidTokenError as e:
            return {'authenticated': False, 'reason': str(e)}

    def _validate_input(self, request):
        """Validate request input"""
        errors = []

        # Check request size
        try:
            content_length = int(request.headers.get('Content-Length', 0))
        except ValueError:
            errors.append("Invalid Content-Length header")
            content_length = 0
        max_size = self.config['api']['input_validation']['max_request_size_mb'] * 1024 * 1024
        if content_length > max_size:
            errors.append(f"Request too large: {content_length} > {max_size}")

        # Check content type
        content_type = request.headers.get('Content-Type', '').split(';')[0]
        allowed = self.config['api']['input_validation']['content_types']['allowed']
        if content_type and content_type not in allowed:
            errors.append(f"Content-Type not allowed: {content_type}")

        return {'valid': len(errors) == 0, 'errors': errors}


class RateLimiter:
    """Token bucket rate limiter"""

    def __init__(self, config):
        self.config = config
        self.buckets = {}

    def allow(self, client_id, limits):
        """Check if request is allowed"""
        now = time.time()
        bucket = self.buckets.get(client_id)
        if not bucket:
            bucket = {
                'tokens': limits['burst'],
                'last_update': now
            }
            self.buckets[client_id] = bucket

        # Refill tokens
        elapsed = now - bucket['last_update']
        refill_rate = limits['requests_per_minute'] / 60
        bucket['tokens'] = min(
            limits['burst'],
            bucket['tokens'] + elapsed * refill_rate
        )
        bucket['last_update'] = now

        # Check if allowed
        if bucket['tokens'] >= 1:
            bucket['tokens'] -= 1
            return True
        return False
```
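The listing calls `_verify_api_key` without defining it. A minimal sketch of one approach follows, assuming the server stores only SHA-256 hashes of issued keys in a mapping from user to hash (that storage layout, and the function names `hash_key` and `verify_api_key`, are hypothetical). It compares digests with `hmac.compare_digest` so that comparison time does not depend on how many leading characters match.

```python
import hashlib
import hmac


def hash_key(api_key):
    """Return the hex SHA-256 digest of an API key."""
    return hashlib.sha256(api_key.encode("utf-8")).hexdigest()


def verify_api_key(presented_key, stored_hashes):
    """Look up a presented key among stored hashes using constant-time compares.

    `stored_hashes` maps user id -> hex digest (an assumed storage layout).
    """
    presented_hash = hash_key(presented_key)
    matched_user = None
    # Scan every entry instead of returning early on the first match.
    for user, stored in stored_hashes.items():
        if hmac.compare_digest(presented_hash, stored):
            matched_user = user

    if matched_user is None:
        return {'authenticated': False, 'reason': 'Invalid API key'}
    return {'authenticated': True, 'user': matched_user}


if __name__ == "__main__":
    store = {"alice": hash_key("example-key-1")}
    print(verify_api_key("example-key-1", store))
    print(verify_api_key("wrong-key", store))
```

The return shape matches the dictionaries produced by `_verify_jwt`, so either credential type can feed the same authentication branch.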
Verification
Automated Verification
```bash
ossasai-audit.sh --check NS-03

# Test authentication requirement
curl -X POST https://api.example.com/execute
# Expected: 401 Unauthorized

# Test rate limiting
for i in {1..100}; do curl -s https://api.example.com/; done
# Expected: 429 Too Many Requests after limit

# Test input validation
curl -X POST -d "@large_file.bin" https://api.example.com/
# Expected: 413 Payload Too Large
```
Manual Verification
1. Verify authentication is required
2. Test with invalid credentials (should fail)
3. Test rate limiting enforcement
4. Test input validation
5. Verify security headers in responses
6. Test authorization policies
Evidence
- API security configuration
- Authentication test results
- Rate limiting test results
- Security header verification
Remediation
- Enable authentication for all endpoints
- Implement rate limiting
- Configure input validation
- Add security headers
- Enable authorization policies
References
- OWASP API Security Top 10
- NIST SP 800-53 SC-8: Transmission Confidentiality and Integrity
NS-04: Network Traffic Analysis
Metadata
| Attribute | Value |
|---|---|
| Control ID | NS-04 |
| Requirement Level | MUST |
| Assurance Levels | L3 |
| STRIDE | Information Disclosure |
| AATT | AATT-E1 |
Requirement
Implementations MUST monitor and analyze network traffic for security anomalies. Monitoring MUST include detection of data exfiltration patterns, unauthorized destinations, and suspicious traffic volumes.
Rationale
Network monitoring provides defense-in-depth against data exfiltration and command-and-control communications. Even with other controls, monitoring can detect novel attacks or control failures.
Implementation Guidance
Traffic Analysis Configuration:
```yaml
# Network traffic analysis configuration
monitoring:
  network:
    enabled: true

    # Traffic capture
    capture:
      enabled: true
      interface: "all"
      bpf_filter: "tcp port 443 or tcp port 80"
      retention_hours: 168  # 7 days

    # Destination monitoring
    destinations:
      track_all: true
      allowlist:
        - "*.ocsas.dev"
        - "api.openai.com"
        - "registry.npmjs.org"
      alert_on_unknown: true

    # Volume monitoring
    volume:
      track_per_destination: true
      alert_thresholds:
        single_request_mb: 10
        hourly_total_mb: 100
        daily_total_mb: 500

    # Pattern detection
    patterns:
      # Data exfiltration indicators
      exfiltration:
        - pattern: "base64_encoded_large_payload"
          description: "Large base64 data in request"
          threshold_kb: 50
        - pattern: "dns_tunneling"
          description: "Suspicious DNS query patterns"
          query_length_threshold: 50
        - pattern: "high_entropy_payload"
          description: "Encrypted or compressed data"
          entropy_threshold: 0.9

      # C2 indicators
      command_and_control:
        - pattern: "periodic_beaconing"
          description: "Regular interval connections"
          interval_variance_threshold: 0.1
        - pattern: "domain_generation"
          description: "DGA-like domain patterns"

    # Alerting
    alerts:
      destinations:
        - type: "syslog"
          facility: "security"
        - type: "webhook"
          url: "${ALERT_WEBHOOK_URL}"
```
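The `volume.alert_thresholds` values imply rolling totals per destination. The sketch below shows one way such tracking could work; the class is a hypothetical illustration (its name, the `clock` parameter, and the choice to count only outbound bytes are all assumptions), and it covers only the hourly and daily totals, not `single_request_mb`.

```python
import time
from collections import defaultdict, deque

MB = 1024 * 1024
HOUR, DAY = 3600, 86400


class VolumeTracker:
    """Tracks bytes sent per destination over rolling hourly and daily windows."""

    def __init__(self, clock=time.time):
        self.clock = clock  # injectable so tests can control time
        self.events = defaultdict(deque)  # destination -> (timestamp, bytes_sent)

    def record(self, destination, bytes_sent, bytes_received=0):
        # Only outbound bytes are counted in this sketch.
        self.events[destination].append((self.clock(), bytes_sent))

    def check_thresholds(self, volume_cfg):
        limits = volume_cfg["alert_thresholds"]
        now = self.clock()
        alerts = []
        for destination, events in self.events.items():
            # Drop samples that fell out of the daily window.
            while events and events[0][0] < now - DAY:
                events.popleft()
            hourly = sum(size for ts, size in events if ts >= now - HOUR)
            daily = sum(size for _, size in events)
            for name, observed, limit_mb in (
                ("hourly_total", hourly, limits["hourly_total_mb"]),
                ("daily_total", daily, limits["daily_total_mb"]),
            ):
                if observed > limit_mb * MB:
                    alerts.append({
                        "type": name,
                        "severity": "high",
                        "destination": destination,
                        "bytes": observed,
                    })
        return alerts
```

Because the clock is injected, the window logic can be exercised in tests without waiting for real time to pass.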
Traffic Analysis Implementation:
```python
from collections import defaultdict
from datetime import datetime, timedelta
import math


class NetworkTrafficAnalyzer:
    # VolumeTracker and the _match_pattern / _send_alerts helpers are not shown.

    def __init__(self, config):
        self.config = config
        self.destination_history = defaultdict(list)
        self.volume_tracker = VolumeTracker()
        self.anomaly_detector = AnomalyDetector(config)

    async def analyze_connection(self, connection):
        """Analyze network connection"""
        alerts = []

        # 1. Check destination allowlist
        if not self._is_allowed_destination(connection.destination):
            alerts.append({
                'type': 'unknown_destination',
                'severity': 'high',
                'destination': connection.destination,
                'timestamp': datetime.utcnow()
            })

        # 2. Track volume
        self.volume_tracker.record(
            connection.destination,
            connection.bytes_sent,
            connection.bytes_received
        )
        volume_alerts = self.volume_tracker.check_thresholds(
            self.config['monitoring']['network']['volume']
        )
        alerts.extend(volume_alerts)

        # 3. Pattern detection
        pattern_alerts = await self.anomaly_detector.analyze(connection)
        alerts.extend(pattern_alerts)

        # 4. Send alerts
        if alerts:
            await self._send_alerts(alerts)

        return alerts

    def _is_allowed_destination(self, destination):
        """Check if destination is in allowlist"""
        allowlist = self.config['monitoring']['network']['destinations']['allowlist']
        for pattern in allowlist:
            if self._match_pattern(destination, pattern):
                return True
        return False


class AnomalyDetector:
    # The _contains_large_base64 helper is not shown.

    def __init__(self, config):
        self.config = config
        self.beacon_tracker = BeaconTracker()

    def _entropy_threshold(self):
        """Look up the threshold by pattern name rather than list position"""
        patterns = self.config['monitoring']['network']['patterns']['exfiltration']
        return next(
            p['entropy_threshold'] for p in patterns
            if p['pattern'] == 'high_entropy_payload'
        )

    async def analyze(self, connection):
        """Detect anomalous patterns"""
        alerts = []

        # Check for data exfiltration patterns
        if connection.payload:
            # High entropy check
            entropy = self._calculate_entropy(connection.payload)
            if entropy > self._entropy_threshold():
                alerts.append({
                    'type': 'high_entropy_payload',
                    'severity': 'medium',
                    'entropy': entropy,
                    'destination': connection.destination
                })

            # Large base64 check
            if self._contains_large_base64(connection.payload):
                alerts.append({
                    'type': 'base64_exfiltration',
                    'severity': 'high',
                    'destination': connection.destination
                })

        # Check for beaconing
        beacon_alert = self.beacon_tracker.check_beacon(
            connection.destination,
            connection.timestamp
        )
        if beacon_alert:
            alerts.append(beacon_alert)

        return alerts

    def _calculate_entropy(self, data):
        """Calculate Shannon entropy of a bytes payload, normalized to 0-1"""
        if not data:
            return 0

        byte_counts = defaultdict(int)
        for byte in data:
            byte_counts[byte] += 1

        entropy = 0
        length = len(data)
        for count in byte_counts.values():
            prob = count / length
            entropy -= prob * math.log2(prob)

        return entropy / 8  # Maximum is 8 bits per byte


class BeaconTracker:
    """Detect periodic beaconing behavior"""

    def __init__(self, window_size=10):
        self.connections = defaultdict(list)
        self.window_size = window_size

    def check_beacon(self, destination, timestamp):
        """Check for beaconing pattern"""
        self.connections[destination].append(timestamp)

        # Keep only recent connections
        cutoff = timestamp - timedelta(hours=24)
        self.connections[destination] = [
            t for t in self.connections[destination]
            if t > cutoff
        ]

        # Need enough samples
        times = self.connections[destination]
        if len(times) < self.window_size:
            return None

        # Calculate intervals
        intervals = []
        for i in range(1, len(times)):
            intervals.append((times[i] - times[i-1]).total_seconds())

        # Check for regularity
        if len(intervals) >= 2:
            mean_interval = sum(intervals) / len(intervals)
            variance = sum((i - mean_interval)**2 for i in intervals) / len(intervals)
            cv = (variance ** 0.5) / mean_interval if mean_interval > 0 else 1

            # Low coefficient of variation indicates beaconing
            if cv < 0.1:  # Very regular
                return {
                    'type': 'periodic_beaconing',
                    'severity': 'high',
                    'destination': destination,
                    'interval_seconds': mean_interval,
                    'variance': cv
                }

        return None
```
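Two helpers referenced in the listing, `_match_pattern` and `_contains_large_base64`, are left undefined. The sketches below show one plausible shape for each as standalone functions. Both are assumptions: the allowlist entries are treated as shell-style globs via `fnmatch`, and a "large base64 payload" is taken to mean an unbroken run of Base64 alphabet characters at least `threshold_kb` kilobytes long.

```python
import re
from fnmatch import fnmatchcase


def match_pattern(destination, pattern):
    """Case-insensitive glob match of a hostname against an allowlist entry.

    Note: "*.ocsas.dev" matches subdomains at any depth but not "ocsas.dev" itself.
    """
    return fnmatchcase(destination.lower(), pattern.lower())


def contains_large_base64(payload, threshold_kb=50):
    """Return True if `payload` (bytes) contains a long run of Base64 characters.

    This is a heuristic: long runs of plain alphanumeric text also match.
    """
    min_chars = threshold_kb * 1024
    run = re.compile(rb"[A-Za-z0-9+/]{%d,}" % min_chars)
    return run.search(payload) is not None


if __name__ == "__main__":
    print(match_pattern("api.ocsas.dev", "*.ocsas.dev"))
    print(contains_large_base64(b"A" * 60000))
```

Since the run-length check cannot distinguish Base64 from other dense alphanumeric data, it is probably best treated as one signal alongside the entropy and volume checks rather than as a verdict on its own.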
Verification
Automated Verification
```bash
ossasai-audit.sh --check NS-04

# Test unknown destination alerting
curl https://unknown-domain.example.com/
# Expected: Alert generated

# Test volume alerting
# Upload large file
curl -X POST -d "@large_file.bin" https://api.example.com/
# Expected: Volume alert if threshold exceeded
```
Manual Verification
1. Verify traffic monitoring is enabled
2. Test unknown destination detection
3. Test volume threshold alerting
4. Verify exfiltration pattern detection
5. Test beaconing detection
6. Review alert delivery
Evidence
- Traffic monitoring configuration
- Sample alerts and detections
- Anomaly detection test results
- Alert delivery verification
Remediation
- Enable network traffic monitoring
- Configure destination allowlist
- Set volume thresholds
- Enable pattern detection
- Configure alerting channels
References
- MITRE ATT&CK: Exfiltration
- NIST SP 800-53 SI-4: System Monitoring