Service Integrations
Integrate Telnyx Edge Compute functions with voice, messaging, AI, and external services for powerful real-time applications.
Telnyx Service Integrations
Edge Compute functions have optimized, low-latency access to all Telnyx services:
Voice & Call Control
- Voice Webhooks - Process call events in real-time.
- Call Control - Programmatic call management.
- TeXML Processing - XML-based call flows.
- Conference Management - Dynamic conference control.
Messaging Services
- SMS/MMS Processing - Handle message webhooks.
- Auto-responders - Intelligent message replies.
- Message Broadcasting - Bulk message sending.
- 10DLC Compliance - Automated compliance handling.
Network & IoT
- SIP Processing - SIP message handling.
- WebRTC Signaling - Video call signaling.
- IoT Device Webhooks - Device event processing.
Integration Patterns
Webhook Processing
Process Telnyx webhooks with automatic retry and error handling:
// Webhook processor pattern
export default async function(request) {
try {
const webhook = await request.json();
// Verify webhook signature (recommended)
if (!verifyWebhookSignature(request.headers, webhook)) {
return new Response('Invalid signature', { status: 401 });
}
// Route by event type
switch (webhook.event_type) {
case 'call.initiated':
return await handleCallInitiated(webhook);
case 'message.received':
return await handleMessageReceived(webhook);
case 'conference.participant.joined':
return await handleConferenceJoined(webhook);
default:
console.log(`Unhandled event: ${webhook.event_type}`);
return new Response('OK', { status: 200 });
}
} catch (error) {
console.error('Webhook processing error:', error);
return new Response('Internal error', { status: 500 });
}
}
function verifyWebhookSignature(headers, payload) {
const signature = headers.get('telnyx-signature');
const timestamp = headers.get('telnyx-timestamp');
// Implement signature verification
return true; // Simplified for example
}
API Client Pattern
Reuse Telnyx API clients across requests:
import telnyx
import os
from typing import Optional
# Initialize client globally (outside handler)
telnyx_client = telnyx.Client(api_key=os.environ["TELNYX_API_KEY"])
async def handler(request):
"""Reuse client across all requests for better performance"""
# Voice operations
call = await make_call("+15551234567", "+15559876543")
# Messaging operations
message = await send_sms("+15551234567", "Hello from Edge!")
# AI operations
response = await chat_completion("How can I help you?")
return {
"call_id": call.id,
"message_id": message.id,
"ai_response": response.text
}
async def make_call(from_number: str, to_number: str):
"""Make outbound call with optimized client"""
return await telnyx_client.calls.create(
from_=from_number,
to=to_number,
connection_id=os.environ["TELNYX_CONNECTION_ID"]
)
async def send_sms(to_number: str, text: str):
"""Send SMS with error handling"""
return await telnyx_client.messages.create(
from_=os.environ["TELNYX_PHONE_NUMBER"],
to=to_number,
text=text
)
async def chat_completion(prompt: str):
"""Get AI response"""
return await telnyx_client.ai.chat_completions.create(
model="telnyx-ai",
messages=[{"role": "user", "content": prompt}]
)
Event-Driven Architecture
Build event-driven systems with function chaining:
// Event router function
package main
import (
"encoding/json"
"log"
"net/http"
)
type WebhookEvent struct {
Type string `json:"type"`
Data interface{} `json:"data"`
}
func main() {
http.HandleFunc("/webhook", func(w http.ResponseWriter, r *http.Request) {
var event WebhookEvent
if err := json.NewDecoder(r.Body).Decode(&event); err != nil {
log.Printf("Failed to decode webhook: %v", err)
http.Error(w, "Invalid JSON", http.StatusBadRequest)
return
}
// Process event asynchronously
go processEvent(event)
// Return immediate response
w.WriteHeader(http.StatusOK)
if err := json.NewEncoder(w).Encode(map[string]string{"status": "accepted"}); err != nil {
log.Printf("Failed to encode response: %v", err)
}
})
http.ListenAndServe(":8080", nil)
}
func processEvent(event WebhookEvent) {
switch event.Type {
case "call.answered":
// Trigger call processing workflow
if err := triggerFunction("call-processor", event); err != nil {
log.Printf("Failed to trigger call processor: %v", err)
}
case "call.ended":
// Trigger analytics and cleanup
if err := triggerFunction("call-analytics", event); err != nil {
log.Printf("Failed to trigger call analytics: %v", err)
}
if err := triggerFunction("call-cleanup", event); err != nil {
log.Printf("Failed to trigger call cleanup: %v", err)
}
case "message.received":
// Trigger message processing pipeline
if err := triggerFunction("message-analyzer", event); err != nil {
log.Printf("Failed to trigger message analyzer: %v", err)
}
}
}
func triggerFunction(functionName string, data interface{}) error {
// Call another edge function
url := fmt.Sprintf("https://%s.edge.telnyx.com", functionName)
jsonData, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("failed to marshal data: %w", err)
}
resp, err := http.Post(url, "application/json", bytes.NewBuffer(jsonData))
if err != nil {
return fmt.Errorf("failed to call function %s: %w", functionName, err)
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
return fmt.Errorf("function %s returned error status: %d", functionName, resp.StatusCode)
}
return nil
}
Real-World Use Cases
Intelligent Call Routing
Route calls based on AI analysis and business rules:
// Smart call router
export default async function(request) {
const webhook = await request.json();
if (webhook.event_type === 'call.initiated') {
const caller = webhook.from;
const callee = webhook.to;
// Get caller information
const callerInfo = await lookupCaller(caller);
// AI-powered routing decision
const routingDecision = await analyzeCall({
caller_info: callerInfo,
time_of_day: new Date().getHours(),
call_history: await getCallHistory(caller)
});
// Route the call
if (routingDecision.priority === 'high') {
await routeToVIP(webhook.call_control_id);
} else if (routingDecision.department === 'support') {
await routeToSupport(webhook.call_control_id);
} else {
await routeToGeneralQueue(webhook.call_control_id);
}
}
return new Response('OK');
}
async function analyzeCall(context) {
// Use Telnyx AI for routing decisions
const response = await fetch('https://api.telnyx.com/v2/ai/chat/completions', {
method: 'POST',
headers: {
'Authorization': `Bearer ${process.env.TELNYX_API_KEY}`,
'Content-Type': 'application/json'
},
body: JSON.stringify({
model: 'gpt-4',
messages: [{
role: 'system',
content: 'Analyze this call context and recommend routing: VIP, support, or general queue'
}, {
role: 'user',
content: JSON.stringify(context)
}]
})
});
return await response.json();
}
IoT Device Management
Process IoT device events and manage device fleets:
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Deserialize)]
struct IoTWebhook {
device_id: String,
event_type: String,
data: HashMap<String, serde_json::Value>,
timestamp: String,
}
#[derive(Serialize)]
struct DeviceCommand {
command: String,
parameters: HashMap<String, serde_json::Value>,
}
pub async fn handler(request: warp::hyper::Request<warp::hyper::Body>)
-> Result<warp::reply::Response, warp::Rejection> {
let webhook: IoTWebhook = parse_json_body(request).await?;
match webhook.event_type.as_str() {
"device.connected" => {
handle_device_connection(&webhook).await?;
},
"sensor.reading" => {
handle_sensor_data(&webhook).await?;
},
"device.alert" => {
handle_device_alert(&webhook).await?;
},
_ => {
log::warn!("Unknown IoT event: {}", webhook.event_type);
}
}
Ok(warp::reply::with_status("OK", warp::http::StatusCode::OK).into_response())
}
async fn handle_sensor_data(webhook: &IoTWebhook) -> Result<(), Box<dyn std::error::Error>> {
let device_id = &webhook.device_id;
let temperature = webhook.data.get("temperature")
.and_then(|v| v.as_f64())
.unwrap_or(0.0);
// Check for alerts
if temperature > 80.0 {
// Send alert SMS
send_alert_sms(&format!(
"High temperature alert: Device {} reporting {}°F",
device_id, temperature
)).await?;
// Execute cooling command
send_device_command(device_id, DeviceCommand {
command: "set_cooling".to_string(),
parameters: HashMap::from([
("target_temp".to_string(),
serde_json::json!(75.0))
])
}).await?;
}
// Store sensor data
store_sensor_reading(device_id, temperature).await?;
Ok(())
}
async fn handle_device_alert(webhook: &IoTWebhook) -> Result<(), Box<dyn std::error::Error>> {
let alert_type = webhook.data.get("alert_type")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
match alert_type {
"battery_low" => {
// Schedule maintenance
schedule_maintenance(&webhook.device_id).await?;
},
"connectivity_issue" => {
// Attempt remote diagnostics
run_device_diagnostics(&webhook.device_id).await?;
},
"security_breach" => {
// Immediate security response
trigger_security_protocol(&webhook.device_id).await?;
// Notify security team via voice call
make_security_alert_call().await?;
},
_ => {
log::info!("Standard alert for device {}: {}", webhook.device_id, alert_type);
}
}
Ok(())
}
External Service Integration
Database Integration
Connect to databases with connection pooling:
// Database integration with connection pooling
import { Pool } from 'pg';
// Initialize connection pool globally
const dbPool = new Pool({
connectionString: process.env.DATABASE_URL,
max: 10,
idleTimeoutMillis: 30000,
});
export default async function(request) {
const client = await dbPool.connect();
try {
// Your database operations
const result = await client.query('SELECT * FROM users WHERE active = $1', [true]);
return new Response(JSON.stringify({
users: result.rows,
count: result.rowCount
}));
} finally {
client.release();
}
}
Third-Party API Integration
Integrate with external services efficiently:
import aiohttp
import asyncio
import os
# HTTP session for connection reuse
session = None
async def get_session():
global session
if session is None:
connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)
session = aiohttp.ClientSession(connector=connector)
return session
async def handler(request):
"""Integrate with multiple external APIs"""
# Parallel API calls for better performance
session = await get_session()
tasks = [
call_weather_api(session),
call_payment_api(session),
call_analytics_api(session)
]
results = await asyncio.gather(*tasks, return_exceptions=True)
return {
"weather": results[0] if not isinstance(results[0], Exception) else None,
"payment": results[1] if not isinstance(results[1], Exception) else None,
"analytics": results[2] if not isinstance(results[2], Exception) else None,
"timestamp": datetime.utcnow().isoformat()
}
async def call_weather_api(session):
"""Get weather data"""
url = "https://api.weather.com/v1/current"
headers = {
"Authorization": f"Bearer {os.environ['WEATHER_API_KEY']}"
}
async with session.get(url, headers=headers) as response:
if response.status == 200:
return await response.json()
else:
raise Exception(f"Weather API error: {response.status}")
async def call_payment_api(session):
"""Process payment"""
url = "https://api.stripe.com/v1/payments"
headers = {"Authorization": f"Bearer {os.environ['STRIPE_SECRET_KEY']}"}
async with session.post(url, headers=headers, json={}) as response:
if response.status == 200:
return await response.json()
else:
raise Exception(f"Payment API error: {response.status}")
Integration Best Practices
Error Handling and Retries
import (
"context"
"time"
"math"
)
func callExternalAPI(ctx context.Context, url string, maxRetries int) (*Response, error) {
var lastErr error
for attempt := 0; attempt <= maxRetries; attempt++ {
// Exponential backoff
if attempt > 0 {
delay := time.Duration(math.Pow(2, float64(attempt))) * time.Second
select {
case <-time.After(delay):
case <-ctx.Done():
return nil, ctx.Err()
}
}
resp, err := makeRequest(ctx, url)
if err == nil {
return resp, nil
}
lastErr = err
// Don't retry on certain errors
if isNonRetryableError(err) {
break
}
}
return nil, fmt.Errorf("failed after %d attempts: %w", maxRetries+1, lastErr)
}
Next Steps
Explore specific integration guides:
- Voice Integration - Call control and voice processing.
- Messaging Integration - SMS/MMS automation.
- Network Integration - SIP and WebRTC handling.
Build powerful integrated applications at the edge!