Skip to main content
Reference architectures demonstrate how Edge Compute products work together to solve real-world problems. Each architecture includes a diagram, component breakdown, and sample implementation.

Architectures

Global API Gateway

Route API traffic globally with caching, auth, and rate limiting.

Telecom Event Processor

Process voice, SMS, and messaging events in real-time.

Real-Time Media Pipeline

Transform and route media streams at the edge.

IoT Data Ingestion

Collect and process IoT sensor data globally.

Global API Gateway

A globally distributed API gateway that provides authentication, rate limiting, and caching for backend services.

Architecture Diagram

┌─────────────────────────────────────────────────────────────────────┐
│                         Client Requests                              │
└─────────────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────────┐
│                    Telnyx Edge (Global PoPs)                         │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐ │
│  │   Auth      │→ │Rate Limiter │→ │   Cache     │→ │   Router    │ │
│  │ Middleware  │  │             │  │   Layer     │  │             │ │
│  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘ │
└─────────────────────────────────────────────────────────────────────┘

                    ┌─────────────┼─────────────┐
                    ▼             ▼             ▼
              ┌──────────┐  ┌──────────┐  ┌──────────┐
              │ Service  │  │ Service  │  │ Service  │
              │    A     │  │    B     │  │    C     │
              └──────────┘  └──────────┘  └──────────┘

Components

ComponentProductPurpose
Edge FunctionEdge ComputeRequest processing, routing
Rate LimitingEdge KVDistributed counters
CachingEdge KVResponse cache
AuthSecretsJWT verification keys

Implementation

import os
import jwt
import json
import hashlib
from datetime import datetime
from urllib.parse import urlparse, urlencode, parse_qs

class APIGateway:
    def __init__(self):
        self.jwt_secret = os.getenv("JWT_SECRET")
        self.rate_limit = 100  # requests per minute
    
    async def handler(self, request):
        # 1. Authentication
        auth_result = self.authenticate(request)
        if auth_result["error"]:
            return self.json_response(auth_result, 401)
        
        user_id = auth_result["user_id"]
        
        # 2. Rate limiting
        if await self.is_rate_limited(user_id):
            return self.json_response(
                {"error": "Rate limit exceeded"}, 
                429,
                {"Retry-After": "60"}
            )
        
        # 3. Cache check
        cache_key = self.cache_key(request)
        cached = await self.kv.get(cache_key)
        if cached:
            return self.json_response(json.loads(cached))
        
        # 4. Route to backend
        response = await self.route_request(request)
        
        # 5. Cache response
        if response["status"] == 200:
            await self.kv.put(cache_key, json.dumps(response), ttl=300)
        
        return response
    
    def authenticate(self, request):
        auth_header = request.headers.get("Authorization", "")
        if not auth_header.startswith("Bearer "):
            return {"error": "Missing token", "user_id": None}
        
        token = auth_header[7:]
        try:
            payload = jwt.decode(token, self.jwt_secret, algorithms=["HS256"])
            return {"error": None, "user_id": payload["sub"]}
        except jwt.InvalidTokenError:
            return {"error": "Invalid token", "user_id": None}
    
    async def is_rate_limited(self, user_id):
        key = f"rate:{user_id}:{datetime.utcnow().strftime('%Y%m%d%H%M')}"
        count = int(await self.kv.get(key) or 0)
        
        if count >= self.rate_limit:
            return True
        
        await self.kv.put(key, str(count + 1), ttl=60)
        return False
    
    def cache_key(self, request):
        # Generate cache key from method + path + query string
        return f"cache:{request.method}:{request.path}:{hashlib.md5(request.url.encode()).hexdigest()}"
    
    async def route_request(self, request):
        parsed = urlparse(request.url)
        path = parsed.path
        query = parsed.query  # Preserve query string
        
        # Route based on path prefix
        if path.startswith("/api/users"):
            backend = "https://users-service.internal"
        elif path.startswith("/api/orders"):
            backend = "https://orders-service.internal"
        else:
            return self.json_response({"error": "Not found"}, 404)
        
        # Forward request with query string preserved
        upstream_url = f"{backend}{path}"
        if query:
            upstream_url = f"{upstream_url}?{query}"
        
        response = await fetch(upstream_url, {
            "method": request.method,
            "headers": request.headers,
            "body": await request.text() if request.method != "GET" else None
        })
        
        return {
            "status": response.status,
            "headers": dict(response.headers),
            "body": await response.text()
        }

Use Cases

  • Microservices gateway — Unified entry point for distributed services
  • Third-party API proxy — Add auth and rate limiting to external APIs
  • Multi-region deployment — Route to nearest backend region

Telecom Event Processor

Process voice calls, SMS messages, and fax events in real-time with low latency.

Architecture Diagram

┌──────────────────────────────────────────────────────────────┐
│                    Telnyx Network                             │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐     │
│  │  Voice   │  │   SMS    │  │   Fax    │  │  Number  │     │
│  │  Calls   │  │ Messages │  │  Events  │  │  Lookup  │     │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬─────┘     │
└───────┼─────────────┼─────────────┼─────────────┼────────────┘
        │             │             │             │
        └─────────────┴──────┬──────┴─────────────┘

              ┌──────────────────────────────┐
              │    Edge Compute Function     │
              │  ┌─────────┐  ┌───────────┐  │
              │  │ Webhook │→ │ Business  │  │
              │  │ Handler │  │   Logic   │  │
              │  └─────────┘  └───────────┘  │
              └──────────────────────────────┘

              ┌──────────────┼──────────────┐
              ▼              ▼              ▼
        ┌──────────┐  ┌──────────┐  ┌──────────┐
        │   CRM    │  │ Analytics│  │ Alerts   │
        │  System  │  │ Platform │  │ Service  │
        └──────────┘  └──────────┘  └──────────┘

Implementation

// Unified telecom event handler
export async function handler(request) {
    const webhook = await request.json();
    const eventType = webhook.data.event_type;
    
    // Route by event type
    switch (eventType) {
        case "call.initiated":
        case "call.answered":
        case "call.hangup":
            return handleVoiceEvent(webhook);
        
        case "message.received":
        case "message.sent":
            return handleSMSEvent(webhook);
        
        case "fax.received":
            return handleFaxEvent(webhook);
        
        case "fax.sent":
            return handleFaxEvent(webhook);
        
        default:
            console.log(`Unhandled event: ${eventType}`);
            return new Response("OK");
    }
}

async function handleVoiceEvent(webhook) {
    const event = webhook.data;
    const callId = event.payload.call_control_id;
    
    // Log to analytics
    await fetch(process.env.ANALYTICS_URL, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
            type: "voice",
            event: event.event_type,
            call_id: callId,
            from: event.payload.from,
            to: event.payload.to,
            timestamp: event.occurred_at
        })
    });
    
    // Business logic
    if (event.event_type === "call.initiated") {
        // Check CRM for caller info
        const caller = await lookupCaller(event.payload.from);
        
        if (caller.vip) {
            // Route VIP to priority queue
            return generateTeXML(`
                <Response>
                    <Say>Welcome back, ${caller.name}. Connecting you now.</Say>
                    <Dial><Queue name="vip"/></Dial>
                </Response>
            `);
        }
    }
    
    return new Response("OK");
}

async function handleSMSEvent(webhook) {
    const event = webhook.data;
    
    if (event.event_type === "message.received") {
        const text = event.payload.text.toLowerCase();
        const from = event.payload.from.phone_number;
        
        // Keyword detection (TCPA-compliant opt-out keywords)
        if (text.includes("stop") || text.includes("unsubscribe") || 
            text.includes("cancel") || text.includes("quit")) {
            await updateOptOut(from, true);
            await sendSMS(from, "You have been unsubscribed. Reply START to resubscribe.");
        } else if (text.includes("help")) {
            await sendSMS(from, "Reply STOP to unsubscribe, START to resubscribe.");
        } else if (text.includes("start")) {
            await updateOptOut(from, false);
            await sendSMS(from, "You have been resubscribed.");
        }
    }
    
    return new Response("OK");
}

async function handleFaxEvent(webhook) {
    const event = webhook.data;
    
    // Log fax event
    await fetch(process.env.ANALYTICS_URL, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
            type: "fax",
            event: event.event_type,
            fax_id: event.payload.fax_id,
            from: event.payload.from,
            to: event.payload.to,
            pages: event.payload.page_count,
            timestamp: event.occurred_at
        })
    });
    
    // Download and store fax if received
    if (event.event_type === "fax.received" && event.payload.media_url) {
        await storeFax(event.payload.fax_id, event.payload.media_url);
    }
    
    return new Response("OK");
}

Use Cases

  • Contact center — Route calls based on caller history
  • SMS marketing — Process opt-ins/outs in real-time
  • Two-factor auth — Generate and verify SMS codes

Real-Time Media Pipeline

Transform and route media streams for video, audio, and real-time communication.

Architecture Diagram

┌─────────────┐      ┌─────────────────────────────────────────┐
│   Media     │      │           Telnyx Edge                    │
│   Source    │─────▶│  ┌──────────┐  ┌──────────┐  ┌────────┐ │
│  (Camera)   │      │  │ Ingest   │→ │Transform │→ │ Route  │ │
└─────────────┘      │  └──────────┘  └──────────┘  └────────┘ │
                     └─────────────────────────────────────────┘

                     ┌──────────────────┼──────────────────┐
                     ▼                  ▼                  ▼
               ┌──────────┐      ┌──────────┐      ┌──────────┐
               │ Storage  │      │  CDN     │      │ LiveKit  │
               │ (S3)     │      │ Delivery │      │ Stream   │
               └──────────┘      └──────────┘      └──────────┘

Implementation

import asyncio
from datetime import datetime

class MediaPipeline:
    async def handler(self, request):
        content_type = request.headers.get("Content-Type", "")
        
        if "video" in content_type or "audio" in content_type:
            return await self.process_media(request)
        elif "application/json" in content_type:
            return await self.handle_control(request)
        else:
            return {"status": 400, "body": "Unsupported content type"}
    
    async def process_media(self, request):
        media_data = await request.bytes()
        stream_id = request.headers.get("X-Stream-ID")
        
        # Transform (e.g., transcode, watermark)
        processed = await self.transform(media_data)
        
        # Route to destinations
        await asyncio.gather(
            self.store_archive(stream_id, processed),
            self.push_to_cdn(stream_id, processed),
            self.forward_to_livekit(stream_id, processed)
        )
        
        return {"status": 200, "body": "OK"}
    
    async def transform(self, media_data):
        # Apply transformations
        # - Resize/transcode
        # - Add watermark
        # - Extract thumbnails
        return media_data
    
    async def store_archive(self, stream_id, data):
        await self.s3.put_object(
            Bucket="media-archive",
            Key=f"{stream_id}/{datetime.utcnow().isoformat()}.webm",
            Body=data
        )
    
    async def push_to_cdn(self, stream_id, data):
        await fetch(f"{self.cdn_ingest_url}/{stream_id}", {
            "method": "POST",
            "body": data
        })
    
    async def forward_to_livekit(self, stream_id, data):
        # Forward to LiveKit SFU for real-time distribution
        await fetch(f"{self.livekit_url}/ingest/{stream_id}", {
            "method": "POST",
            "headers": {"Authorization": f"Bearer {self.livekit_token}"},
            "body": data
        })
    
    async def handle_control(self, request):
        # Handle control messages (start/stop stream, change settings)
        body = await request.json()
        action = body.get("action")
        stream_id = body.get("stream_id")
        
        if action == "start":
            # Initialize stream resources
            return {"status": 200, "body": {"stream_id": stream_id, "status": "started"}}
        elif action == "stop":
            # Clean up stream resources
            return {"status": 200, "body": {"stream_id": stream_id, "status": "stopped"}}
        else:
            return {"status": 400, "body": {"error": "Unknown action"}}

Use Cases

  • Live streaming — Ingest and distribute live video
  • Video conferencing — Real-time media processing
  • Surveillance — Edge processing for camera feeds

IoT Data Ingestion

Collect sensor data from IoT devices globally with edge processing and aggregation.

Architecture Diagram

┌──────────────────────────────────────────────────────────────────┐
│                       IoT Devices (Global)                        │
│  ┌───────┐  ┌───────┐  ┌───────┐  ┌───────┐  ┌───────┐          │
│  │Sensor │  │Sensor │  │Sensor │  │Sensor │  │Sensor │          │
│  │  A    │  │  B    │  │  C    │  │  D    │  │  E    │          │
│  └───┬───┘  └───┬───┘  └───┬───┘  └───┬───┘  └───┬───┘          │
└──────┼──────────┼──────────┼──────────┼──────────┼───────────────┘
       │          │          │          │          │
       └──────────┴────┬─────┴──────────┴──────────┘

         ┌──────────────────────────────┐
         │     Telnyx Edge (Per-PoP)    │
         │  ┌─────────┐  ┌───────────┐  │
         │  │Validate │→ │ Aggregate │  │
         │  │ + Parse │  │ + Buffer  │  │
         │  └─────────┘  └───────────┘  │
         └──────────────────────────────┘


         ┌──────────────────────────────┐
         │      Central Data Lake       │
         └──────────────────────────────┘

Implementation

// IoT data ingestion with edge processing
// Note: For production, use a persistent queue (e.g., Kafka, Redis)
// to avoid data loss. This example shows the processing pattern.

export async function handler(request) {
    // Parse sensor data
    const data = await request.json();
    
    // Validate
    if (!isValidSensorData(data)) {
        return new Response(JSON.stringify({ error: "Invalid data" }), {
            status: 400
        });
    }
    
    // Enrich with edge metadata
    const enriched = {
        ...data,
        edge_pop: process.env.TELNYX_POP,
        received_at: Date.now(),
        device_id: request.headers.get("X-Device-ID")
    };
    
    // Send directly to data lake (no in-memory buffering to avoid data loss)
    const response = await fetch(process.env.DATA_LAKE_URL, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
            reading: enriched,
            edge_pop: process.env.TELNYX_POP
        })
    });
    
    if (!response.ok) {
        // Return error so client can retry
        return new Response(JSON.stringify({ error: "Failed to store" }), {
            status: 502
        });
    }
    
    return new Response(JSON.stringify({ received: true }));
}

function isValidSensorData(data) {
    return data.sensor_type && 
           typeof data.value === "number" &&
           data.timestamp;
}
For high-volume ingestion with batching, use an external queue (Kafka, Redis Streams) to buffer data reliably. In-memory buffers in serverless functions risk data loss on cold starts or process recycling.

Use Cases

  • Fleet management — Vehicle telemetry processing
  • Smart buildings — Sensor data aggregation
  • Industrial IoT — Edge preprocessing for manufacturing

  • Examples — Code snippets for common patterns
  • Tutorials — Step-by-step guides
  • Runtime — Execution environment details