import os
import jwt
import json
import hashlib
from datetime import datetime
from urllib.parse import urlparse, urlencode, parse_qs
class APIGateway:
def __init__(self):
self.jwt_secret = os.getenv("JWT_SECRET")
self.rate_limit = 100 # requests per minute
async def handler(self, request):
# 1. Authentication
auth_result = self.authenticate(request)
if auth_result["error"]:
return self.json_response(auth_result, 401)
user_id = auth_result["user_id"]
# 2. Rate limiting
if await self.is_rate_limited(user_id):
return self.json_response(
{"error": "Rate limit exceeded"},
429,
{"Retry-After": "60"}
)
# 3. Cache check
cache_key = self.cache_key(request)
cached = await self.kv.get(cache_key)
if cached:
return self.json_response(json.loads(cached))
# 4. Route to backend
response = await self.route_request(request)
# 5. Cache response
if response["status"] == 200:
await self.kv.put(cache_key, json.dumps(response), ttl=300)
return response
def authenticate(self, request):
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
return {"error": "Missing token", "user_id": None}
token = auth_header[7:]
try:
payload = jwt.decode(token, self.jwt_secret, algorithms=["HS256"])
return {"error": None, "user_id": payload["sub"]}
except jwt.InvalidTokenError:
return {"error": "Invalid token", "user_id": None}
async def is_rate_limited(self, user_id):
key = f"rate:{user_id}:{datetime.utcnow().strftime('%Y%m%d%H%M')}"
count = int(await self.kv.get(key) or 0)
if count >= self.rate_limit:
return True
await self.kv.put(key, str(count + 1), ttl=60)
return False
def cache_key(self, request):
# Generate cache key from method + path + query string
return f"cache:{request.method}:{request.path}:{hashlib.md5(request.url.encode()).hexdigest()}"
async def route_request(self, request):
parsed = urlparse(request.url)
path = parsed.path
query = parsed.query # Preserve query string
# Route based on path prefix
if path.startswith("/api/users"):
backend = "https://users-service.internal"
elif path.startswith("/api/orders"):
backend = "https://orders-service.internal"
else:
return self.json_response({"error": "Not found"}, 404)
# Forward request with query string preserved
upstream_url = f"{backend}{path}"
if query:
upstream_url = f"{upstream_url}?{query}"
response = await fetch(upstream_url, {
"method": request.method,
"headers": request.headers,
"body": await request.text() if request.method != "GET" else None
})
return {
"status": response.status,
"headers": dict(response.headers),
"body": await response.text()
}