Architectures
- Global API Gateway — Route API traffic globally with caching, auth, and rate limiting.
- Telecom Event Processor — Process voice, SMS, and messaging events in real time.
- Real-Time Media Pipeline — Transform and route media streams at the edge.
- IoT Data Ingestion — Collect and process IoT sensor data globally.
Global API Gateway
A globally distributed API gateway that provides authentication, rate limiting, and caching for backend services.

Architecture Diagram

```
┌─────────────────────────────────────────────────────────────────────┐
│                           Client Requests                           │
└─────────────────────────────────────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────┐
│                      Telnyx Edge (Global PoPs)                      │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐ │
│  │    Auth     │→ │Rate Limiter │→ │    Cache    │→ │   Router    │ │
│  │ Middleware  │  │             │  │    Layer    │  │             │ │
│  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
                                   │
                     ┌─────────────┼─────────────┐
                     ▼             ▼             ▼
               ┌──────────┐  ┌──────────┐  ┌──────────┐
               │ Service  │  │ Service  │  │ Service  │
               │    A     │  │    B     │  │    C     │
               └──────────┘  └──────────┘  └──────────┘
```
Components
| Component | Product | Purpose |
|---|---|---|
| Edge Function | Edge Compute | Request processing, routing |
| Rate Limiting | Edge KV | Distributed counters |
| Caching | Edge KV | Response cache |
| Auth | Secrets | JWT verification keys |
Implementation

```python
import os
import jwt  # PyJWT
import json
import hashlib
from datetime import datetime
from urllib.parse import urlparse

class APIGateway:
    def __init__(self, kv):
        # kv: the Edge KV client provided by the runtime
        self.kv = kv
        self.jwt_secret = os.getenv("JWT_SECRET")
        self.rate_limit = 100  # requests per minute

    async def handler(self, request):
        # 1. Authentication
        auth_result = self.authenticate(request)
        if auth_result["error"]:
            return self.json_response(auth_result, 401)
        user_id = auth_result["user_id"]

        # 2. Rate limiting
        if await self.is_rate_limited(user_id):
            return self.json_response(
                {"error": "Rate limit exceeded"},
                429,
                {"Retry-After": "60"}
            )

        # 3. Cache check
        cache_key = self.cache_key(request)
        cached = await self.kv.get(cache_key)
        if cached:
            return self.json_response(json.loads(cached))

        # 4. Route to backend
        response = await self.route_request(request)

        # 5. Cache successful responses for five minutes
        if response["status"] == 200:
            await self.kv.put(cache_key, json.dumps(response), ttl=300)
        return response

    def authenticate(self, request):
        auth_header = request.headers.get("Authorization", "")
        if not auth_header.startswith("Bearer "):
            return {"error": "Missing token", "user_id": None}
        token = auth_header[7:]
        try:
            payload = jwt.decode(token, self.jwt_secret, algorithms=["HS256"])
            return {"error": None, "user_id": payload["sub"]}
        except jwt.InvalidTokenError:
            return {"error": "Invalid token", "user_id": None}

    async def is_rate_limited(self, user_id):
        # Fixed one-minute window keyed by user and wall-clock minute.
        # Note: get-then-put is not atomic, so concurrent requests can
        # slightly exceed the limit.
        key = f"rate:{user_id}:{datetime.utcnow().strftime('%Y%m%d%H%M')}"
        count = int(await self.kv.get(key) or 0)
        if count >= self.rate_limit:
            return True
        await self.kv.put(key, str(count + 1), ttl=60)
        return False

    def cache_key(self, request):
        # Cache key from method + path + a hash of the full URL
        url_hash = hashlib.md5(request.url.encode()).hexdigest()
        return f"cache:{request.method}:{request.path}:{url_hash}"

    async def route_request(self, request):
        parsed = urlparse(request.url)
        path = parsed.path
        query = parsed.query  # preserve the query string

        # Route based on path prefix
        if path.startswith("/api/users"):
            backend = "https://users-service.internal"
        elif path.startswith("/api/orders"):
            backend = "https://orders-service.internal"
        else:
            return self.json_response({"error": "Not found"}, 404)

        # Forward the request, query string included
        upstream_url = f"{backend}{path}"
        if query:
            upstream_url = f"{upstream_url}?{query}"
        # fetch is provided by the edge runtime
        response = await fetch(upstream_url, {
            "method": request.method,
            "headers": request.headers,
            "body": await request.text() if request.method != "GET" else None
        })
        return {
            "status": response.status,
            "headers": dict(response.headers),
            "body": await response.text()
        }

    def json_response(self, body, status=200, headers=None):
        # Helper the handler relies on; mirrors the plain-dict response
        # shape returned by route_request
        return {
            "status": status,
            "headers": {"Content-Type": "application/json", **(headers or {})},
            "body": json.dumps(body)
        }
```
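One caveat with hashing the raw URL in `cache_key`: `?a=1&b=2` and `?b=2&a=1` produce different keys for the same resource. A small sketch of a normalization step (not part of the gateway above) that sorts query parameters before hashing:

```python
import hashlib
from urllib.parse import urlparse, parse_qsl, urlencode

def normalized_cache_key(method, url):
    """Build a cache key that is stable under query-parameter reordering."""
    parsed = urlparse(url)
    # Sort query parameters so equivalent URLs hash identically
    query = urlencode(sorted(parse_qsl(parsed.query)))
    canonical = f"{parsed.path}?{query}"
    digest = hashlib.md5(canonical.encode()).hexdigest()
    return f"cache:{method}:{parsed.path}:{digest}"
```

With this, `GET /api/users?a=1&b=2` and `GET /api/users?b=2&a=1` share one cache entry.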
Use Cases
- Microservices gateway — Unified entry point for distributed services
- Third-party API proxy — Add auth and rate limiting to external APIs
- Multi-region deployment — Route to nearest backend region
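For the multi-region case, the routing step can pick the backend serving the region of the PoP that handled the request. A minimal sketch, where `REGION_BACKENDS`, the region names, and the hostnames are all illustrative assumptions:

```python
# Hypothetical region-to-backend map; names are illustrative only
REGION_BACKENDS = {
    "us-east": "https://us-east.users-service.internal",
    "eu-west": "https://eu-west.users-service.internal",
    "ap-south": "https://ap-south.users-service.internal",
}
DEFAULT_BACKEND = "https://us-east.users-service.internal"

def pick_backend(edge_region):
    """Choose the backend for the region serving this request,
    falling back to a default when the region is unknown."""
    return REGION_BACKENDS.get(edge_region, DEFAULT_BACKEND)
```

The gateway's `route_request` would then substitute `pick_backend(...)` for the hard-coded backend URLs.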
Telecom Event Processor
Process voice calls, SMS messages, and fax events in real time with low latency.

Architecture Diagram

```
┌──────────────────────────────────────────────────────────────┐
│                        Telnyx Network                        │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐      │
│  │  Voice   │  │   SMS    │  │   Fax    │  │  Number  │      │
│  │  Calls   │  │ Messages │  │  Events  │  │  Lookup  │      │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬─────┘      │
└───────┼─────────────┼─────────────┼─────────────┼────────────┘
        │             │             │             │
        └─────────────┴──────┬──────┴─────────────┘
                             ▼
              ┌──────────────────────────────┐
              │    Edge Compute Function     │
              │  ┌─────────┐  ┌───────────┐  │
              │  │ Webhook │→ │ Business  │  │
              │  │ Handler │  │   Logic   │  │
              │  └─────────┘  └───────────┘  │
              └──────────────────────────────┘
                             │
              ┌──────────────┼──────────────┐
              ▼              ▼              ▼
        ┌──────────┐   ┌──────────┐   ┌──────────┐
        │   CRM    │   │ Analytics│   │  Alerts  │
        │  System  │   │ Platform │   │ Service  │
        └──────────┘   └──────────┘   └──────────┘
```
Implementation

```javascript
// Unified telecom event handler.
// Helpers referenced below (lookupCaller, generateTeXML, sendSMS,
// updateOptOut, storeFax) are assumed to be defined elsewhere.
export async function handler(request) {
  const webhook = await request.json();
  const eventType = webhook.data.event_type;

  // Route by event type
  switch (eventType) {
    case "call.initiated":
    case "call.answered":
    case "call.hangup":
      return handleVoiceEvent(webhook);
    case "message.received":
    case "message.sent":
      return handleSMSEvent(webhook);
    case "fax.received":
    case "fax.sent":
      return handleFaxEvent(webhook);
    default:
      console.log(`Unhandled event: ${eventType}`);
      return new Response("OK");
  }
}

async function handleVoiceEvent(webhook) {
  const event = webhook.data;
  const callId = event.payload.call_control_id;

  // Log to analytics
  await fetch(process.env.ANALYTICS_URL, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      type: "voice",
      event: event.event_type,
      call_id: callId,
      from: event.payload.from,
      to: event.payload.to,
      timestamp: event.occurred_at
    })
  });

  // Business logic
  if (event.event_type === "call.initiated") {
    // Check CRM for caller info
    const caller = await lookupCaller(event.payload.from);
    if (caller.vip) {
      // Route VIP callers to the priority queue
      return generateTeXML(`
        <Response>
          <Say>Welcome back, ${caller.name}. Connecting you now.</Say>
          <Dial><Queue name="vip"/></Dial>
        </Response>
      `);
    }
  }
  return new Response("OK");
}

async function handleSMSEvent(webhook) {
  const event = webhook.data;
  if (event.event_type === "message.received") {
    const text = event.payload.text.toLowerCase();
    const from = event.payload.from.phone_number;

    // Keyword detection (TCPA opt-out keywords); note that substring
    // matching also fires on words that merely contain a keyword
    if (text.includes("stop") || text.includes("unsubscribe") ||
        text.includes("cancel") || text.includes("quit")) {
      await updateOptOut(from, true);
      await sendSMS(from, "You have been unsubscribed. Reply START to resubscribe.");
    } else if (text.includes("help")) {
      await sendSMS(from, "Reply STOP to unsubscribe, START to resubscribe.");
    } else if (text.includes("start")) {
      await updateOptOut(from, false);
      await sendSMS(from, "You have been resubscribed.");
    }
  }
  return new Response("OK");
}

async function handleFaxEvent(webhook) {
  const event = webhook.data;

  // Log fax event
  await fetch(process.env.ANALYTICS_URL, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      type: "fax",
      event: event.event_type,
      fax_id: event.payload.fax_id,
      from: event.payload.from,
      to: event.payload.to,
      pages: event.payload.page_count,
      timestamp: event.occurred_at
    })
  });

  // Download and store inbound faxes
  if (event.event_type === "fax.received" && event.payload.media_url) {
    await storeFax(event.payload.fax_id, event.payload.media_url);
  }
  return new Response("OK");
}
```
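The substring checks in the SMS handler will also trigger on words that merely contain a keyword (for example, "nonstop" matches the stop branch). A stricter whole-word variant, sketched here in Python for brevity, uses word boundaries instead:

```python
import re

# TCPA opt-out keywords matched as whole words, case-insensitively
OPT_OUT = re.compile(r"\b(stop|unsubscribe|cancel|quit)\b", re.IGNORECASE)

def is_opt_out(text):
    """True only when an opt-out keyword appears as its own word."""
    return bool(OPT_OUT.search(text))
```

The same pattern translates directly to JavaScript's `RegExp` with the `i` flag.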
Use Cases
- Contact center — Route calls based on caller history
- SMS marketing — Process opt-ins and opt-outs in real time
- Two-factor auth — Generate and verify SMS codes
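The two-factor flow can reuse the same KV pattern as the gateway: store a short-lived code per phone number, then compare once on verification. A minimal sketch, with a plain dict standing in for Edge KV; the `2fa:` key prefix and single-use semantics are assumptions:

```python
import hmac
import secrets

def issue_code(kv, phone_number):
    """Generate a 6-digit code and store it under the phone number.
    In production the KV put would carry a short TTL (e.g. 300 s)
    and the code would be delivered via SMS."""
    code = f"{secrets.randbelow(1_000_000):06d}"
    kv[f"2fa:{phone_number}"] = code
    return code

def verify_code(kv, phone_number, submitted):
    """Constant-time compare, then delete so each code is single-use."""
    stored = kv.pop(f"2fa:{phone_number}", None)
    return stored is not None and hmac.compare_digest(stored, submitted)
```

`hmac.compare_digest` avoids leaking how many leading digits matched through timing differences.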
Real-Time Media Pipeline
Transform and route media streams for video, audio, and real-time communication.

Architecture Diagram

```
┌─────────────┐      ┌─────────────────────────────────────────┐
│   Media     │      │               Telnyx Edge               │
│   Source    │─────▶│  ┌──────────┐  ┌──────────┐  ┌────────┐ │
│  (Camera)   │      │  │  Ingest  │→ │Transform │→ │ Route  │ │
└─────────────┘      │  └──────────┘  └──────────┘  └────────┘ │
                     └─────────────────────────────────────────┘
                                        │
                     ┌──────────────────┼──────────────────┐
                     ▼                  ▼                  ▼
               ┌──────────┐       ┌──────────┐       ┌──────────┐
               │ Storage  │       │   CDN    │       │ LiveKit  │
               │   (S3)   │       │ Delivery │       │  Stream  │
               └──────────┘       └──────────┘       └──────────┘
```
Implementation

```python
import asyncio
from datetime import datetime

class MediaPipeline:
    def __init__(self, s3, cdn_ingest_url, livekit_url, livekit_token):
        # Injected dependencies: an S3 client plus CDN and LiveKit endpoints
        self.s3 = s3
        self.cdn_ingest_url = cdn_ingest_url
        self.livekit_url = livekit_url
        self.livekit_token = livekit_token

    async def handler(self, request):
        content_type = request.headers.get("Content-Type", "")
        if "video" in content_type or "audio" in content_type:
            return await self.process_media(request)
        elif "application/json" in content_type:
            return await self.handle_control(request)
        else:
            return {"status": 400, "body": "Unsupported content type"}

    async def process_media(self, request):
        media_data = await request.bytes()
        stream_id = request.headers.get("X-Stream-ID")

        # Transform (e.g., transcode, watermark)
        processed = await self.transform(media_data)

        # Fan out to all destinations concurrently
        await asyncio.gather(
            self.store_archive(stream_id, processed),
            self.push_to_cdn(stream_id, processed),
            self.forward_to_livekit(stream_id, processed)
        )
        return {"status": 200, "body": "OK"}

    async def transform(self, media_data):
        # Placeholder for transformations such as:
        # - resize/transcode
        # - watermarking
        # - thumbnail extraction
        return media_data

    async def store_archive(self, stream_id, data):
        await self.s3.put_object(
            Bucket="media-archive",
            Key=f"{stream_id}/{datetime.utcnow().isoformat()}.webm",
            Body=data
        )

    async def push_to_cdn(self, stream_id, data):
        # fetch is provided by the edge runtime
        await fetch(f"{self.cdn_ingest_url}/{stream_id}", {
            "method": "POST",
            "body": data
        })

    async def forward_to_livekit(self, stream_id, data):
        # Forward to the LiveKit SFU for real-time distribution
        await fetch(f"{self.livekit_url}/ingest/{stream_id}", {
            "method": "POST",
            "headers": {"Authorization": f"Bearer {self.livekit_token}"},
            "body": data
        })

    async def handle_control(self, request):
        # Handle control messages (start/stop stream, change settings)
        body = await request.json()
        action = body.get("action")
        stream_id = body.get("stream_id")
        if action == "start":
            # Initialize stream resources
            return {"status": 200, "body": {"stream_id": stream_id, "status": "started"}}
        elif action == "stop":
            # Clean up stream resources
            return {"status": 200, "body": {"stream_id": stream_id, "status": "stopped"}}
        else:
            return {"status": 400, "body": {"error": "Unknown action"}}
```
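The `asyncio.gather` fan-out in `process_media` sends one payload to several destinations concurrently, so total latency tracks the slowest destination rather than the sum of all three. A self-contained sketch of that pattern, with sleeps standing in for the store/CDN/LiveKit calls:

```python
import asyncio
import time

async def send(destination, delay):
    """Stand-in for store_archive / push_to_cdn / forward_to_livekit."""
    await asyncio.sleep(delay)
    return destination

async def fan_out():
    start = time.monotonic()
    # gather preserves argument order in its results, regardless of
    # which coroutine finishes first
    results = await asyncio.gather(
        send("archive", 0.05),
        send("cdn", 0.10),
        send("livekit", 0.02),
    )
    elapsed = time.monotonic() - start
    return results, elapsed

results, elapsed = asyncio.run(fan_out())
# elapsed tracks the slowest destination, not the 0.17 s sum
```

Note that if any destination raises, `gather` propagates the exception by default; pass `return_exceptions=True` to collect partial failures instead.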
Use Cases
- Live streaming — Ingest and distribute live video
- Video conferencing — Real-time media processing
- Surveillance — Edge processing for camera feeds
IoT Data Ingestion
Collect sensor data from IoT devices globally with edge processing and aggregation.

Architecture Diagram

```
┌──────────────────────────────────────────────────────────────────┐
│                       IoT Devices (Global)                       │
│  ┌───────┐   ┌───────┐   ┌───────┐   ┌───────┐   ┌───────┐       │
│  │Sensor │   │Sensor │   │Sensor │   │Sensor │   │Sensor │       │
│  │   A   │   │   B   │   │   C   │   │   D   │   │   E   │       │
│  └───┬───┘   └───┬───┘   └───┬───┘   └───┬───┘   └───┬───┘       │
└──────┼───────────┼───────────┼───────────┼───────────┼───────────┘
       │           │           │           │           │
       └───────────┴─────┬─────┴───────────┴───────────┘
                         ▼
          ┌──────────────────────────────┐
          │    Telnyx Edge (Per-PoP)     │
          │  ┌─────────┐  ┌───────────┐  │
          │  │Validate │→ │ Aggregate │  │
          │  │ + Parse │  │ + Buffer  │  │
          │  └─────────┘  └───────────┘  │
          └──────────────────────────────┘
                         │
                         ▼
          ┌──────────────────────────────┐
          │      Central Data Lake       │
          └──────────────────────────────┘
```
Implementation

```javascript
// IoT data ingestion with edge processing.
// Note: for production, use a persistent queue (e.g., Kafka, Redis)
// to avoid data loss. This example shows the processing pattern.
export async function handler(request) {
  // Parse sensor data; reject bodies that are not valid JSON
  let data;
  try {
    data = await request.json();
  } catch {
    return new Response(JSON.stringify({ error: "Invalid JSON" }), {
      status: 400
    });
  }

  // Validate
  if (!isValidSensorData(data)) {
    return new Response(JSON.stringify({ error: "Invalid data" }), {
      status: 400
    });
  }

  // Enrich with edge metadata
  const enriched = {
    ...data,
    edge_pop: process.env.TELNYX_POP,
    received_at: Date.now(),
    device_id: request.headers.get("X-Device-ID")
  };

  // Send directly to the data lake (no in-memory buffering, so nothing
  // is lost if the function is recycled)
  const response = await fetch(process.env.DATA_LAKE_URL, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      reading: enriched,
      edge_pop: process.env.TELNYX_POP
    })
  });

  if (!response.ok) {
    // Surface the failure so the client can retry
    return new Response(JSON.stringify({ error: "Failed to store" }), {
      status: 502
    });
  }
  return new Response(JSON.stringify({ received: true }));
}

function isValidSensorData(data) {
  return Boolean(
    data &&
    data.sensor_type &&
    typeof data.value === "number" &&
    data.timestamp
  );
}
```
For high-volume ingestion with batching, use an external queue (Kafka, Redis Streams) to buffer data reliably. In-memory buffers in serverless functions risk data loss on cold starts or process recycling.
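When batching is introduced, the usual flush condition is "N readings or T seconds, whichever comes first." A minimal size-based sketch in Python (the time-based trigger and the queue client are omitted; `sink` is any callable that receives a batch):

```python
class MicroBatcher:
    """Buffer readings and flush them in fixed-size batches.
    In production the buffer would live in an external queue, not in
    the function process, to survive cold starts and recycling."""

    def __init__(self, batch_size, sink):
        self.batch_size = batch_size
        self.sink = sink  # callable receiving a list of readings
        self.buffer = []

    def add(self, reading):
        self.buffer.append(reading)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        # Hand off the current batch and start a fresh buffer
        if self.buffer:
            self.sink(self.buffer)
            self.buffer = []
```

A time-based trigger would simply call `flush()` from a timer so a half-full batch never waits indefinitely.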
Use Cases
- Fleet management — Vehicle telemetry processing
- Smart buildings — Sensor data aggregation
- Industrial IoT — Edge preprocessing for manufacturing