Skip to main content

Service Integrations

Integrate Telnyx Edge Compute functions with voice, messaging, AI, and external services for powerful real-time applications.

Telnyx Service Integrations

Edge Compute functions have optimized, low-latency access to all Telnyx services:

Voice & Call Control

Messaging Services

Network & IoT

Integration Patterns

Webhook Processing

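Process Telnyx webhooks with signature verification, event-type routing, and error handling; respond with a non-2xx status on failure so the delivery can be retried: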

// Webhook processor pattern

/**
 * Entry point for incoming webhook deliveries.
 *
 * Parses the JSON body, checks it with `verifyWebhookSignature`, then routes
 * by `event_type` to the matching handler. Event types without a handler are
 * logged and acknowledged with 200.
 *
 * @param {Request} request - Incoming HTTP request carrying the webhook JSON.
 * @returns {Promise<Response>} The handler's response, or 400 for a body that
 *   is not a JSON object, 401 for a failed signature check, 500 when
 *   processing throws.
 */
export default async function(request) {
  let webhook;
  try {
    webhook = await request.json();
  } catch (error) {
    // A body that cannot be parsed is a client error, not a server fault;
    // answering 500 would only invite redelivery of the same bad payload.
    console.error('Webhook body is not valid JSON:', error);
    return new Response('Invalid JSON', { status: 400 });
  }

  try {
    // Verify webhook signature (recommended)
    if (!verifyWebhookSignature(request.headers, webhook)) {
      return new Response('Invalid signature', { status: 401 });
    }

    // JSON such as `null` or `42` parses fine but has no event_type to route on.
    if (webhook === null || typeof webhook !== 'object') {
      return new Response('Invalid JSON', { status: 400 });
    }

    // Route by event type
    switch (webhook.event_type) {
      case 'call.initiated':
        return await handleCallInitiated(webhook);
      case 'message.received':
        return await handleMessageReceived(webhook);
      case 'conference.participant.joined':
        return await handleConferenceJoined(webhook);
      default:
        console.log(`Unhandled event: ${webhook.event_type}`);
        return new Response('OK', { status: 200 });
    }
  } catch (error) {
    // `return await` above keeps handler rejections inside this catch.
    console.error('Webhook processing error:', error);
    return new Response('Internal error', { status: 500 });
  }
}

/**
 * Pre-check for webhook authenticity.
 *
 * Rejects requests that do not carry both signature headers. It does NOT yet
 * verify the signature cryptographically; that step is still a placeholder,
 * so a request that merely includes both headers is accepted.
 *
 * @param {{ get(name: string): (string|null|undefined) }} headers - Request
 *   headers (anything with a `get(name)` lookup).
 * @param {unknown} payload - Parsed webhook body; unused until real
 *   verification is implemented.
 * @returns {boolean} `false` when either header is missing or empty,
 *   otherwise `true`.
 */
function verifyWebhookSignature(headers, payload) {
  // Telnyx sends the Ed25519 signature in `telnyx-signature-ed25519`.
  const signature = headers.get('telnyx-signature-ed25519');
  const timestamp = headers.get('telnyx-timestamp');

  // Fail closed: an unsigned request must never pass verification.
  if (!signature || !timestamp) {
    return false;
  }

  // TODO: Implement signature verification of `${timestamp}|${rawBody}`
  // against the account's public key. This needs the raw request body, not
  // the parsed payload.
  return true; // Simplified for example
}

API Client Pattern

Reuse Telnyx API clients across requests:

import telnyx
import os
from typing import Optional

# Initialize client globally (outside handler): it is built once when the
# module is imported and the helper functions below all reference this one
# instance instead of constructing a client per request.
# os.environ[...] raises KeyError at import time if TELNYX_API_KEY is unset.
telnyx_client = telnyx.Client(api_key=os.environ["TELNYX_API_KEY"])

async def handler(request):
    """Exercise voice, messaging and AI through the shared module-level client.

    The three operations run one after another; an exception from any of
    them propagates to the caller.

    Args:
        request: Incoming request object (not read by this example).

    Returns:
        dict with "call_id", "message_id" and "ai_response" taken from the
        three results.
    """
    # Voice
    placed_call = await make_call("+15551234567", "+15559876543")

    # Messaging
    sent_message = await send_sms("+15551234567", "Hello from Edge!")

    # AI
    completion = await chat_completion("How can I help you?")

    summary = {}
    summary["call_id"] = placed_call.id
    summary["message_id"] = sent_message.id
    summary["ai_response"] = completion.text
    return summary

async def make_call(from_number: str, to_number: str):
    """Place an outbound call through the shared ``telnyx_client``.

    Args:
        from_number: Number passed to the client as ``from_``.
        to_number: Number passed to the client as ``to``.

    Returns:
        Whatever ``telnyx_client.calls.create`` resolves to.

    Raises:
        KeyError: if TELNYX_CONNECTION_ID is not set in the environment.
    """
    create_call = telnyx_client.calls.create
    call_args = {
        "from_": from_number,
        "to": to_number,
        "connection_id": os.environ["TELNYX_CONNECTION_ID"],
    }
    return await create_call(**call_args)

async def send_sms(to_number: str, text: str):
    """Send an SMS from the configured sender number.

    No errors are handled here; anything raised by the client propagates to
    the caller.

    Args:
        to_number: Destination number.
        text: Message body.

    Returns:
        Whatever ``telnyx_client.messages.create`` resolves to.

    Raises:
        KeyError: if TELNYX_PHONE_NUMBER is not set in the environment.
    """
    create_message = telnyx_client.messages.create
    message_args = {
        "from_": os.environ["TELNYX_PHONE_NUMBER"],
        "to": to_number,
        "text": text,
    }
    return await create_message(**message_args)

async def chat_completion(prompt: str):
    """Request a chat completion for a single user message.

    Args:
        prompt: Text sent as the only message, with role "user".

    Returns:
        Whatever ``telnyx_client.ai.chat_completions.create`` resolves to.
    """
    create_completion = telnyx_client.ai.chat_completions.create
    user_message = {"role": "user", "content": prompt}
    return await create_completion(model="telnyx-ai", messages=[user_message])

Event-Driven Architecture

Build event-driven systems with function chaining:

// Event router function
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

// WebhookEvent is the JSON envelope decoded from the body of each /webhook
// request.
type WebhookEvent struct {
// Type is the event name; processEvent switches on it to choose which
// downstream functions to trigger.
Type string `json:"type"`
// Data holds the event payload as generically decoded JSON. It is not
// inspected here; the whole event is re-marshaled when handed to
// triggerFunction.
Data interface{} `json:"data"`
}

// main registers the /webhook handler and serves HTTP on :8080.
//
// The handler decodes the request body into a WebhookEvent, hands it to
// processEvent on a separate goroutine, and immediately answers with
// {"status": "accepted"} so the sender is not kept waiting on downstream work.
// A body that fails to decode is answered with 400.
func main() {
	http.HandleFunc("/webhook", func(w http.ResponseWriter, r *http.Request) {
		var event WebhookEvent

		if err := json.NewDecoder(r.Body).Decode(&event); err != nil {
			log.Printf("Failed to decode webhook: %v", err)
			http.Error(w, "Invalid JSON", http.StatusBadRequest)
			return
		}

		// Process event asynchronously
		go processEvent(event)

		// Return immediate response. Headers must be set before WriteHeader,
		// otherwise the JSON body goes out without a JSON content type.
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		if err := json.NewEncoder(w).Encode(map[string]string{"status": "accepted"}); err != nil {
			log.Printf("Failed to encode response: %v", err)
		}
	})

	// ListenAndServe always returns a non-nil error (e.g. the port is already
	// in use); surface it instead of exiting silently with status 0.
	log.Fatal(http.ListenAndServe(":8080", nil))
}

// processEvent fans an event out to the downstream functions registered for
// its type. Targets are triggered in order; a failure is logged and does not
// stop the remaining targets. Event types with no entry are ignored.
func processEvent(event WebhookEvent) {
	type target struct {
		function      string // name passed to triggerFunction
		failureFormat string // log format used when the trigger fails
	}

	routes := map[string][]target{
		// Call processing workflow
		"call.answered": {
			{"call-processor", "Failed to trigger call processor: %v"},
		},
		// Analytics and cleanup
		"call.ended": {
			{"call-analytics", "Failed to trigger call analytics: %v"},
			{"call-cleanup", "Failed to trigger call cleanup: %v"},
		},
		// Message processing pipeline
		"message.received": {
			{"message-analyzer", "Failed to trigger message analyzer: %v"},
		},
	}

	for _, t := range routes[event.Type] {
		if err := triggerFunction(t.function, event); err != nil {
			log.Printf(t.failureFormat, err)
		}
	}
}

// triggerFunction invokes another edge function by POSTing data, encoded as
// JSON, to https://<functionName>.edge.telnyx.com.
//
// It returns an error when data cannot be marshaled, when the HTTP request
// fails, or when the response status is 400 or above. The response body is
// closed but not read.
func triggerFunction(functionName string, data interface{}) error {
	endpoint := fmt.Sprintf("https://%s.edge.telnyx.com", functionName)

	payload, err := json.Marshal(data)
	if err != nil {
		return fmt.Errorf("failed to marshal data: %w", err)
	}

	response, err := http.Post(endpoint, "application/json", bytes.NewReader(payload))
	if err != nil {
		return fmt.Errorf("failed to call function %s: %w", functionName, err)
	}
	defer response.Body.Close()

	if response.StatusCode < 400 {
		return nil
	}
	return fmt.Errorf("function %s returned error status: %d", functionName, response.StatusCode)
}

Real-World Use Cases

Intelligent Call Routing

Route calls based on AI analysis and business rules:

// Smart call router

/**
 * Routes an incoming call based on caller data and an AI routing decision.
 *
 * For `call.initiated` events it looks up the caller and their call history,
 * asks `analyzeCall` for a decision, and sends the call to the VIP line,
 * support, or the general queue. Other event types are acknowledged without
 * action.
 *
 * @param {Request} request - Incoming HTTP request carrying the webhook JSON.
 * @returns {Promise<Response>} 'OK' on success; 500 'Internal error' when
 *   parsing, lookup, analysis or routing throws.
 */
export default async function(request) {
  try {
    const webhook = await request.json();

    if (webhook.event_type === 'call.initiated') {
      const caller = webhook.from;

      // The two lookups are independent, so run them concurrently rather
      // than paying for both round trips back to back.
      const [callerInfo, callHistory] = await Promise.all([
        lookupCaller(caller),
        getCallHistory(caller),
      ]);

      // AI-powered routing decision
      const routingDecision = await analyzeCall({
        caller_info: callerInfo,
        time_of_day: new Date().getHours(),
        call_history: callHistory,
      });

      // Route the call
      const callControlId = webhook.call_control_id;
      if (routingDecision.priority === 'high') {
        await routeToVIP(callControlId);
      } else if (routingDecision.department === 'support') {
        await routeToSupport(callControlId);
      } else {
        await routeToGeneralQueue(callControlId);
      }
    }

    return new Response('OK');
  } catch (error) {
    // Log and answer with an explicit status instead of letting the
    // rejection escape the handler.
    console.error('Call routing error:', error);
    return new Response('Internal error', { status: 500 });
  }
}

/**
 * Asks the Telnyx AI chat-completions endpoint for a routing recommendation.
 *
 * @param {object} context - Call context; serialized to JSON and sent as the
 *   user message.
 * @returns {Promise<object>} The parsed JSON body of the API response,
 *   returned as-is.
 *   NOTE(review): the router reads `priority` / `department` directly off this
 *   value — confirm the response actually exposes them at the top level.
 * @throws {Error} When the API answers with a non-2xx status.
 */
async function analyzeCall(context) {
  // Use Telnyx AI for routing decisions
  const response = await fetch('https://api.telnyx.com/v2/ai/chat/completions', {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${process.env.TELNYX_API_KEY}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      model: 'gpt-4',
      messages: [{
        role: 'system',
        content: 'Analyze this call context and recommend routing: VIP, support, or general queue',
      }, {
        role: 'user',
        content: JSON.stringify(context),
      }],
    }),
  });

  // fetch resolves on HTTP errors too; without this check an error payload
  // would be handed back as though it were a routing decision.
  if (!response.ok) {
    throw new Error(`Telnyx AI request failed with status ${response.status}`);
  }

  return await response.json();
}

IoT Device Management

Process IoT device events and manage device fleets:

use serde::{Deserialize, Serialize};
use std::collections::HashMap;

/// Incoming IoT webhook body, deserialized from the request JSON.
#[derive(Deserialize)]
struct IoTWebhook {
// Identifier of the reporting device; passed to the per-device actions
// (commands, maintenance, diagnostics) and included in alert text.
device_id: String,
// Event name the handler matches on ("device.connected", "sensor.reading",
// "device.alert"; anything else is logged as unknown).
event_type: String,
// Free-form event fields; the handlers read "temperature" and "alert_type".
data: HashMap<String, serde_json::Value>,
// Event timestamp as sent; not read by any code visible here.
timestamp: String,
}

/// Command sent to a device via `send_device_command`; serializable.
#[derive(Serialize)]
struct DeviceCommand {
// Command name, e.g. "set_cooling".
command: String,
// Named arguments for the command as arbitrary JSON values,
// e.g. "target_temp".
parameters: HashMap<String, serde_json::Value>,
}

/// Entry point for IoT webhooks.
///
/// Parses the request body into an `IoTWebhook`, dispatches on its event type,
/// and replies "OK" with status 200. Unknown event types are logged at warn
/// level and still acknowledged. Errors from parsing or from a handler are
/// propagated with `?`.
pub async fn handler(request: warp::hyper::Request<warp::hyper::Body>)
    -> Result<warp::reply::Response, warp::Rejection> {
    let webhook: IoTWebhook = parse_json_body(request).await?;
    let event = webhook.event_type.as_str();

    if event == "device.connected" {
        handle_device_connection(&webhook).await?;
    } else if event == "sensor.reading" {
        handle_sensor_data(&webhook).await?;
    } else if event == "device.alert" {
        handle_device_alert(&webhook).await?;
    } else {
        log::warn!("Unknown IoT event: {}", webhook.event_type);
    }

    let reply = warp::reply::with_status("OK", warp::http::StatusCode::OK);
    Ok(reply.into_response())
}

async fn handle_sensor_data(webhook: &IoTWebhook) -> Result<(), Box<dyn std::error::Error>> {
let device_id = &webhook.device_id;
let temperature = webhook.data.get("temperature")
.and_then(|v| v.as_f64())
.unwrap_or(0.0);

// Check for alerts
if temperature > 80.0 {
// Send alert SMS
send_alert_sms(&format!(
"High temperature alert: Device {} reporting {}°F",
device_id, temperature
)).await?;

// Execute cooling command
send_device_command(device_id, DeviceCommand {
command: "set_cooling".to_string(),
parameters: HashMap::from([
("target_temp".to_string(),
serde_json::json!(75.0))
])
}).await?;
}

// Store sensor data
store_sensor_reading(device_id, temperature).await?;

Ok(())
}

/// Processes a "device.alert" event by alert type.
///
/// `data["alert_type"]` selects the response; a missing or non-string value is
/// treated as "unknown". Unrecognised types are only logged at info level.
///
/// # Errors
/// Propagates any error from the action taken for the alert.
async fn handle_device_alert(webhook: &IoTWebhook) -> Result<(), Box<dyn std::error::Error>> {
    let device_id = &webhook.device_id;
    let alert_type = match webhook.data.get("alert_type") {
        Some(value) => value.as_str().unwrap_or("unknown"),
        None => "unknown",
    };

    if alert_type == "battery_low" {
        // Schedule maintenance
        schedule_maintenance(device_id).await?;
    } else if alert_type == "connectivity_issue" {
        // Attempt remote diagnostics
        run_device_diagnostics(device_id).await?;
    } else if alert_type == "security_breach" {
        // Immediate security response, then notify the security team by voice call
        trigger_security_protocol(device_id).await?;
        make_security_alert_call().await?;
    } else {
        log::info!("Standard alert for device {}: {}", device_id, alert_type);
    }

    Ok(())
}

External Service Integration

Database Integration

Connect to databases with connection pooling:

// Database integration with connection pooling
import { Pool } from 'pg';

// Initialize connection pool globally: it is created once when the module
// loads, and the handler below borrows clients from it on each request
// instead of opening a new connection every time.
const dbPool = new Pool({
// Connection string comes from the DATABASE_URL environment variable.
connectionString: process.env.DATABASE_URL,
// Upper bound on clients in the pool (node-postgres `max` option).
max: 10,
// node-postgres `idleTimeoutMillis` option, in milliseconds (30 s).
idleTimeoutMillis: 30000,
});

/**
 * Returns all active users as JSON.
 *
 * Borrows a client from `dbPool`, runs the query, and always releases the
 * client back to the pool, even when the query throws.
 *
 * @param {Request} request - Incoming HTTP request (not read by this example).
 * @returns {Promise<Response>} JSON body `{ users, count }`.
 */
export default async function(request) {
  const client = await dbPool.connect();

  try {
    // Your database operations
    const { rows, rowCount } = await client.query(
      'SELECT * FROM users WHERE active = $1',
      [true],
    );

    const body = JSON.stringify({ users: rows, count: rowCount });

    // Without an explicit header a string body is served as text/plain.
    return new Response(body, {
      headers: { 'Content-Type': 'application/json' },
    });
  } finally {
    client.release();
  }
}

Third-Party API Integration

Integrate with external services efficiently:

import aiohttp
import asyncio
import os

# HTTP session for connection reuse. Starts as None and is created lazily by
# get_session() on first use, then kept at module level for later requests.
session = None

async def get_session():
    """Return the shared aiohttp session, creating it on first use.

    The session is kept in the module-level ``session`` variable so later
    calls reuse its connections. A session that has been closed is replaced,
    since a closed session cannot serve requests.

    Returns:
        The shared ``aiohttp.ClientSession`` (at most 100 connections in
        total, 10 per host).
    """
    global session
    if session is None or session.closed:
        connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)
        session = aiohttp.ClientSession(connector=connector)
    return session

async def handler(request):
    """Call the weather, payment and analytics APIs concurrently.

    A failure in one API does not fail the request: that API's entry in the
    result is None and the others are still returned.

    Args:
        request: Incoming request object (not read by this example).

    Returns:
        dict with "weather", "payment" and "analytics" results (None where
        the call raised) and a "timestamp" holding the current UTC time as a
        timezone-aware ISO 8601 string.
    """
    # Imported here because this snippet's import block does not bring
    # datetime into scope; referencing it bare raises NameError.
    from datetime import datetime, timezone

    # Named differently from the module-level `session` so it is not shadowed.
    http_session = await get_session()

    # Parallel API calls for better performance; return_exceptions=True
    # delivers failures as values instead of aborting the whole gather.
    results = await asyncio.gather(
        call_weather_api(http_session),
        call_payment_api(http_session),
        call_analytics_api(http_session),
        return_exceptions=True,
    )

    weather, payment, analytics = (
        None if isinstance(result, Exception) else result for result in results
    )

    return {
        "weather": weather,
        "payment": payment,
        "analytics": analytics,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

async def call_weather_api(session):
    """Fetch current weather data.

    Args:
        session: HTTP session used for the GET request.

    Returns:
        The decoded JSON body when the API answers 200.

    Raises:
        KeyError: if WEATHER_API_KEY is not set in the environment.
        Exception: for any response status other than 200.
    """
    endpoint = "https://api.weather.com/v1/current"
    auth_headers = {"Authorization": f"Bearer {os.environ['WEATHER_API_KEY']}"}

    async with session.get(endpoint, headers=auth_headers) as response:
        if response.status != 200:
            raise Exception(f"Weather API error: {response.status}")
        return await response.json()

async def call_payment_api(session):
    """POST an empty JSON payload to the payment endpoint.

    Args:
        session: HTTP session used for the POST request.

    Returns:
        The decoded JSON body when the API answers 200.

    Raises:
        KeyError: if STRIPE_SECRET_KEY is not set in the environment.
        Exception: for any response status other than 200.
    """
    endpoint = "https://api.stripe.com/v1/payments"
    auth_headers = {"Authorization": f"Bearer {os.environ['STRIPE_SECRET_KEY']}"}

    async with session.post(endpoint, headers=auth_headers, json={}) as response:
        if response.status != 200:
            raise Exception(f"Payment API error: {response.status}")
        return await response.json()

Integration Best Practices

Error Handling and Retries

import (
"context"
"time"
"math"
)

// callExternalAPI calls makeRequest for url, retrying failures with
// exponential backoff.
//
// Up to maxRetries+1 attempts are made; a negative maxRetries is treated as 0
// so that at least one attempt always happens. Before retry n (n >= 1) it
// waits 2^n seconds. It stops early when ctx is done (returning ctx.Err()) or
// when isNonRetryableError reports the error as final. On failure the returned
// error wraps the last error and states how many attempts were actually made.
//
// NOTE(review): this uses fmt.Errorf, but the import group above this function
// lists only context, time and math — "fmt" must be imported for it to compile.
func callExternalAPI(ctx context.Context, url string, maxRetries int) (*Response, error) {
	if maxRetries < 0 {
		// Otherwise the loop never runs and the error below would wrap nil.
		maxRetries = 0
	}

	var lastErr error
	attempts := 0

	for attempt := 0; attempt <= maxRetries; attempt++ {
		// Exponential backoff
		if attempt > 0 {
			delay := time.Duration(math.Pow(2, float64(attempt))) * time.Second

			// A stoppable timer, so cancellation does not leave it pending
			// for the rest of the backoff delay.
			timer := time.NewTimer(delay)
			select {
			case <-timer.C:
			case <-ctx.Done():
				timer.Stop()
				return nil, ctx.Err()
			}
		}

		attempts++
		resp, err := makeRequest(ctx, url)
		if err == nil {
			return resp, nil
		}

		lastErr = err

		// Don't retry on certain errors
		if isNonRetryableError(err) {
			break
		}
	}

	// Report the attempts actually made; after an early break this is fewer
	// than maxRetries+1.
	return nil, fmt.Errorf("failed after %d attempts: %w", attempts, lastErr)
}

Next Steps

Explore specific integration guides:

  1. Voice Integration - Call control and voice processing.
  2. Messaging Integration - SMS/MMS automation.
  3. Network Integration - SIP and WebRTC handling.

Build powerful integrated applications at the edge!