Skip to main content

Network Integration

Build robust network applications by integrating Edge Compute functions with various protocols and real-time communication systems.

SIP Protocol Integration

Handle SIP signaling and call routing at the edge:

// SIP message processing and routing
/**
 * Edge entry point for SIP traffic: reads the raw SIP message from the request
 * body, parses its headers, and delegates to the handler for the SIP method.
 *
 * @param {Request} request - Incoming request whose body is the SIP message text.
 * @returns {Promise<Response>} Whatever the selected handler returns.
 */
export default async function(request) {
  const rawMessage = await request.text();
  const parsed = parseSipHeaders(rawMessage);
  const { method } = parsed;

  if (method === 'INVITE') {
    return handleInvite(parsed, rawMessage);
  }
  if (method === 'REGISTER') {
    return handleRegister(parsed);
  }
  if (method === 'BYE') {
    return handleBye(parsed);
  }

  // Any other method is passed to the generic handler together with the raw text.
  return handleGenericSip(parsed, rawMessage);
}

/**
 * Handles a SIP INVITE.
 *
 * Replies 486 when the callee is unavailable, 404 when no endpoint is known
 * for the callee, and otherwise POSTs the raw INVITE to the callee's endpoint
 * and relays that endpoint's body and status back to the caller.
 *
 * @param {object} headers - Parsed SIP headers; `to` and `callId` are read here.
 * @param {string} message - Raw SIP message, forwarded unchanged to the endpoint.
 * @returns {Promise<Response>} The SIP reply wrapped in a Response.
 */
async function handleInvite(headers, message) {
  // Only the callee matters for routing; the caller's URI was previously
  // extracted as well but never used, so that lookup has been dropped.
  const toUser = extractUserFromUri(headers.to);

  // Builds a locally generated SIP reply for this call.
  const reply = (code, reason) => new Response(
    generateSipResponse(code, reason, headers.callId),
    { headers: { 'Content-Type': 'application/sdp' } },
  );

  // Check user availability
  const availability = await checkUserAvailability(toUser);
  if (!availability.available) {
    return reply(486, 'Busy Here');
  }

  // Route to appropriate endpoint
  const endpoint = await findUserEndpoint(toUser);
  if (!endpoint) {
    return reply(404, 'Not Found');
  }

  // Forward INVITE to endpoint and mirror its answer (body and status).
  const upstream = await fetch(endpoint.url, {
    method: 'POST',
    headers: { 'Content-Type': 'application/sdp' },
    body: message,
  });

  return new Response(await upstream.text(), {
    status: upstream.status,
    headers: { 'Content-Type': 'application/sdp' },
  });
}

/**
 * Splits the first line of a SIP message into method, URI and version, and
 * looks up the From, To, Call-ID, CSeq and Via header values.
 *
 * @param {string} message - Raw SIP message with CRLF line endings.
 * @returns {{method: string, uri: string, version: string, from: *, to: *,
 *   callId: *, cSeq: *, via: *}} The parsed fields; header values are whatever
 *   `extractHeaderValue` returns.
 */
function parseSipHeaders(message) {
  const [startLine] = message.split('\r\n');
  const [method, uri, version] = startLine.split(' ');
  const header = (name) => extractHeaderValue(message, name);

  return {
    method,
    uri,
    version,
    from: header('From'),
    to: header('To'),
    callId: header('Call-ID'),
    cSeq: header('CSeq'),
    via: header('Via'),
  };
}

/**
 * Builds a body-less SIP response: status line, Call-ID header and a zero
 * Content-Length, terminated by an empty line.
 *
 * @param {number} code - SIP status code placed on the status line.
 * @param {string} reason - Reason phrase placed after the code.
 * @param {string} callId - Value written into the Call-ID header.
 * @returns {string} The response text with CRLF line endings.
 */
function generateSipResponse(code, reason, callId) {
  const lines = [
    `SIP/2.0 ${code} ${reason}`,
    `Call-ID: ${callId}`,
    'Content-Length: 0',
    // Two empty entries yield the CRLF ending the last header plus the blank line.
    '',
    '',
  ];
  return lines.join('\r\n');
}
# Advanced SIP proxy with load balancing
import asyncio
import json
import re
from urllib.parse import urlparse

class SipProxy:
    """Minimal SIP proxy: parses a request and picks a backend round-robin.

    NOTE(review): route_request() calls extract_destination(),
    apply_routing_rules() and forward_message(), none of which are defined in
    this class as shown here; they have to be supplied (for example by a
    subclass) before route_request() can complete.
    """

    def __init__(self):
        # Candidate backends; find_endpoint() cycles through them in order.
        self.endpoints = []
        # Position of the endpoint the next find_endpoint() call returns.
        self.load_balancer_index = 0

    async def route_request(self, sip_message):
        """Parse ``sip_message``, choose an endpoint and forward the message.

        Returns the text of a 404 response when no endpoint is available,
        otherwise whatever forward_message() returns.
        """
        headers = self.parse_sip_message(sip_message)

        # Extract destination from Request-URI or To header
        destination = self.extract_destination(headers)

        # Find appropriate endpoint
        endpoint = await self.find_endpoint(destination)

        if not endpoint:
            return self.generate_error_response(404, "Not Found", headers)

        # Apply routing rules
        routed_message = await self.apply_routing_rules(sip_message, endpoint)

        return await self.forward_message(routed_message, endpoint)

    def parse_sip_message(self, message):
        """Parse the request line and header section of a SIP message.

        Returns a dict with the keys ``request_line``, ``headers``, ``method``,
        ``uri``, ``call_id``, ``from`` and ``to``. Missing pieces are returned
        as empty strings rather than raising.
        """
        lines = message.strip().split('\r\n')
        request_line = lines[0]

        headers = {}
        for line in lines[1:]:
            # A blank line ends the header section. Lines after it belong to
            # the body (e.g. SDP "a=rtpmap:0 PCMU/8000") and must not be
            # mistaken for headers just because they contain a colon.
            if not line.strip():
                break
            if ':' in line:
                key, value = line.split(':', 1)
                headers[key.strip()] = value.strip()

        # Split once; an empty message yields no parts at all.
        request_parts = request_line.split()

        return {
            'request_line': request_line,
            'headers': headers,
            'method': request_parts[0] if request_parts else '',
            'uri': request_parts[1] if len(request_parts) > 1 else '',
            'call_id': headers.get('Call-ID', ''),
            'from': headers.get('From', ''),
            'to': headers.get('To', '')
        }

    async def find_endpoint(self, destination):
        """Return the next endpoint in round-robin order, or None if there are none.

        ``destination`` is accepted for interface compatibility but does not
        influence the choice.
        """
        if not self.endpoints:
            return None

        # Re-normalise the stored index first: the endpoint list may have
        # shrunk since the previous call.
        index = self.load_balancer_index % len(self.endpoints)
        self.load_balancer_index = (index + 1) % len(self.endpoints)
        return self.endpoints[index]

    def generate_error_response(self, code, reason, headers):
        """Build a body-less SIP error response.

        ``headers`` is the dict returned by parse_sip_message(); its
        ``call_id``, ``from`` and ``to`` entries are echoed back.
        """
        response = f"SIP/2.0 {code} {reason}\r\n"
        response += f"Call-ID: {headers.get('call_id', '')}\r\n"
        response += f"From: {headers.get('from', '')}\r\n"
        response += f"To: {headers.get('to', '')}\r\n"
        response += "Content-Length: 0\r\n\r\n"
        return response

# Module-level SipProxy instance; handler() below routes every request through it.
proxy = SipProxy()

async def handler(request):
    """Route the SIP message carried in ``request`` through the shared proxy.

    The request body is read as bytes, decoded as UTF-8 and passed to
    ``proxy.route_request``. The routing result is returned as the ``body`` of
    a response dict with an ``application/sdp`` content type.
    """
    raw_body = await request.body()
    routed = await proxy.route_request(raw_body.decode('utf-8'))

    return {
        'body': routed,
        'headers': {'Content-Type': 'application/sdp'}
    }

WebRTC Signaling

Manage WebRTC peer connections and signaling:

// WebRTC signaling server
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"strings"
	"sync"

	"github.com/gorilla/websocket"
)

// SignalingMessage is the JSON envelope read from and written to WebSocket
// clients.
type SignalingMessage struct {
// Type selects the handling in handleSignalingMessage ("join" and "leave"
// have dedicated handlers; every other value is forwarded).
Type string `json:"type"`
// Payload is passed through as-is; the server-generated notifications put a
// map containing "client_id" here.
Payload interface{} `json:"payload"`
// From is overwritten by handleWebSocket with the sender's client_id, so the
// value supplied by the client is not used. Server notifications use "server".
From string `json:"from"`
// To names a single recipient; when empty, forwardToClient broadcasts to
// every other client in the room.
To string `json:"to"`
// RoomID is carried in the message but not read by the code shown here.
RoomID string `json:"room_id"`
}

// Room groups the WebSocket connections that exchange signaling messages
// with each other.
type Room struct {
ID string `json:"id"`
// Clients maps a client ID to that client's connection; excluded from JSON.
Clients map[string]*websocket.Conn `json:"-"`
// mutex is taken by addClient, removeClient and forwardToClient around every
// access to Clients.
mutex sync.RWMutex `json:"-"`
}

// SignalingServer owns the set of rooms and hands out Room values by ID.
type SignalingServer struct {
// rooms maps a room ID to its Room; created lazily by getOrCreateRoom.
rooms map[string]*Room
// mutex is held by getOrCreateRoom while rooms is read or modified.
mutex sync.RWMutex
}

// upgrader turns HTTP requests into WebSocket connections. Its CheckOrigin
// callback accepts local development origins and the allow-listed production
// origins; every other origin, including a missing Origin header, is rejected.
var upgrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool {
		// Only allow specific origins in production
		allowedOrigins := []string{
			"https://yourdomain.com",
			"https://app.yourdomain.com",
		}

		origin := r.Header.Get("Origin")

		// Allow localhost for development. The hostname is parsed out and
		// compared exactly: a substring test on the raw header would also
		// accept hostile origins such as "https://localhost.attacker.example".
		if parsed, err := url.Parse(origin); err == nil {
			switch strings.ToLower(parsed.Hostname()) {
			case "localhost", "127.0.0.1":
				return true
			}
		}

		// Check against allowed origins
		for _, allowed := range allowedOrigins {
			if origin == allowed {
				return true
			}
		}

		return false
	},
}

// handleWebSocket upgrades the request to a WebSocket, joins the client to the
// room named by the "room_id" query parameter under the ID given by
// "client_id", and then relays every JSON message the client sends until the
// connection fails or closes. The client is removed from the room on return.
func (s *SignalingServer) handleWebSocket(w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		// Upgrade has already written an HTTP error response to the client,
		// so writing a second one with http.Error here would be redundant.
		return
	}
	defer conn.Close()

	clientID := r.URL.Query().Get("client_id")
	roomID := r.URL.Query().Get("room_id")

	if clientID == "" || roomID == "" {
		conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"error","message":"client_id and room_id are required"}`))
		return
	}

	// Join room
	room := s.getOrCreateRoom(roomID)
	room.addClient(clientID, conn)
	defer room.removeClient(clientID)

	// Handle messages
	for {
		var message SignalingMessage
		err := conn.ReadJSON(&message)
		if err != nil {
			// Any read error (including a normal close) ends the session.
			break
		}

		// Never trust the sender field supplied by the client.
		message.From = clientID
		s.handleSignalingMessage(&message, room)
	}
}

// handleSignalingMessage dispatches one client message: "join" and "leave"
// go to their dedicated handlers, and every other type ("offer", "answer",
// "ice-candidate" or anything unknown) is forwarded within the room.
func (s *SignalingServer) handleSignalingMessage(message *SignalingMessage, room *Room) {
	if message.Type == "join" {
		s.handleJoin(message, room)
		return
	}
	if message.Type == "leave" {
		s.handleLeave(message, room)
		return
	}

	s.forwardToClient(message, room)
}

// forwardToClient delivers message to the client named in message.To, or, when
// To is empty, to every client in the room except the sender. Write errors are
// ignored, so delivery is best effort.
func (s *SignalingServer) forwardToClient(message *SignalingMessage, room *Room) {
	// Take the exclusive lock rather than a read lock. A websocket connection
	// supports only one concurrent writer, and a read lock would let several
	// goroutines write to the same connection at once. addClient and
	// removeClient also write while holding this lock.
	room.mutex.Lock()
	defer room.mutex.Unlock()

	if message.To != "" {
		// Send to specific client
		if conn, exists := room.Clients[message.To]; exists {
			conn.WriteJSON(message)
		}
		return
	}

	// Broadcast to all clients except sender
	for clientID, conn := range room.Clients {
		if clientID != message.From {
			conn.WriteJSON(message)
		}
	}
}

// addClient registers conn under clientID (replacing any previous entry with
// that ID) and sends a "client-joined" notification to every other client in
// the room. Write errors are ignored.
func (r *Room) addClient(clientID string, conn *websocket.Conn) {
	joined := SignalingMessage{
		Type:    "client-joined",
		Payload: map[string]string{"client_id": clientID},
		From:    "server",
	}

	r.mutex.Lock()
	defer r.mutex.Unlock()

	// Rooms built without a Clients map get one on first use.
	if r.Clients == nil {
		r.Clients = map[string]*websocket.Conn{}
	}
	r.Clients[clientID] = conn

	for peerID, peerConn := range r.Clients {
		if peerID == clientID {
			continue
		}
		peerConn.WriteJSON(joined)
	}
}

// removeClient drops clientID from the room and sends a "client-left"
// notification to every client that remains. Write errors are ignored.
func (r *Room) removeClient(clientID string) {
	left := SignalingMessage{
		Type:    "client-left",
		Payload: map[string]string{"client_id": clientID},
		From:    "server",
	}

	r.mutex.Lock()
	defer r.mutex.Unlock()

	delete(r.Clients, clientID)

	for _, peerConn := range r.Clients {
		peerConn.WriteJSON(left)
	}
}

// getOrCreateRoom returns the room registered under roomID, creating and
// registering an empty one first when none exists. The server mutex is held
// for the whole lookup-or-create step.
func (s *SignalingServer) getOrCreateRoom(roomID string) *Room {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// A zero-value server has no map yet.
	if s.rooms == nil {
		s.rooms = map[string]*Room{}
	}

	if existing, ok := s.rooms[roomID]; ok {
		return existing
	}

	created := &Room{
		ID:      roomID,
		Clients: map[string]*websocket.Conn{},
	}
	s.rooms[roomID] = created
	return created
}

// signalingServer is the single server shared by all requests. Rooms live
// inside the server, so building a fresh server per request would put every
// client into its own private room and no two clients could ever reach each
// other.
var signalingServer = &SignalingServer{}

// handler is the HTTP entry point; it hands the request to the shared
// signaling server for the WebSocket upgrade and message loop.
func handler(w http.ResponseWriter, r *http.Request) {
	signalingServer.handleWebSocket(w, r)
}

IoT Device Communication

Connect and manage IoT devices with MQTT and CoAP protocols:

// MQTT device management and message routing
import { connect } from 'mqtt';

// Global MQTT client (reused across function invocations)
let mqttClient;

// In-flight connection attempt, shared so that concurrent callers do not each
// open their own client. Null when no attempt is running.
let pendingMqttConnection = null;

/**
 * Opens a new MQTT client and waits until it is connected.
 *
 * Any previously stored client is closed first so it does not linger and keep
 * reconnecting in the background. The promise rejects with the client's first
 * 'error' event if that happens before 'connect'; the failed client is closed.
 *
 * @returns {Promise<object>} The connected client.
 */
async function openMqttConnection() {
  if (mqttClient) {
    mqttClient.end(true);
    mqttClient = undefined;
  }

  const client = connect(process.env.MQTT_BROKER_URL, {
    clientId: `edge-function-${Date.now()}`,
    username: process.env.MQTT_USERNAME,
    password: process.env.MQTT_PASSWORD,
    clean: true,
    reconnectPeriod: 1000,
  });

  try {
    // Wait for connection, but give up on the first error instead of
    // waiting forever for a 'connect' that may never arrive.
    await new Promise((resolve, reject) => {
      const onConnect = () => {
        client.removeListener('error', onError);
        resolve();
      };
      const onError = (err) => {
        client.removeListener('connect', onConnect);
        reject(err);
      };
      client.once('connect', onConnect);
      client.once('error', onError);
    });
  } catch (err) {
    client.end(true);
    throw err;
  }

  mqttClient = client;
  return client;
}

/**
 * Returns a connected MQTT client, reusing the stored one when it is still
 * connected and otherwise establishing a new connection.
 *
 * @returns {Promise<object>} The connected client.
 * @throws Rejects with the connection error when the broker cannot be reached.
 */
async function ensureMqttConnection() {
  if (mqttClient && mqttClient.connected) {
    return mqttClient;
  }

  if (!pendingMqttConnection) {
    pendingMqttConnection = openMqttConnection().finally(() => {
      pendingMqttConnection = null;
    });
  }

  return pendingMqttConnection;
}

/**
 * Edge entry point for MQTT operations. Reads `action`, `topic`, `message`
 * and `deviceId` from the JSON request body, obtains a connected MQTT client,
 * and runs the requested action.
 *
 * @param {Request} request - Incoming request with a JSON body.
 * @returns {Promise<Response>} The action's response, or a 400 response for
 *   an unrecognised action.
 */
export default async function(request) {
  const payload = await request.json();
  const { action, topic, message, deviceId } = payload;

  const client = await ensureMqttConnection();

  if (action === 'publish') {
    return publishMessage(client, topic, message);
  }
  if (action === 'subscribe') {
    return subscribeToTopic(client, topic);
  }
  if (action === 'get-device-status') {
    return getDeviceStatus(client, deviceId);
  }
  if (action === 'send-command') {
    return sendDeviceCommand(client, deviceId, message);
  }

  return new Response('Invalid action', { status: 400 });
}

/**
 * Publishes `message`, serialised as JSON, to `topic`.
 *
 * @param {object} client - MQTT client exposing `publish(topic, payload, callback)`.
 * @param {string} topic - Topic to publish to.
 * @param {*} message - Value to serialise with JSON.stringify.
 * @returns {Promise<Response>} A 200 response with `{ success, topic, timestamp }`
 *   on success, or a 500 response with `{ error }` when the publish callback
 *   reports an error. The promise itself does not reject on publish errors.
 */
async function publishMessage(client, topic, message) {
  const payload = JSON.stringify(message);

  return new Promise((resolve) => {
    const onPublished = (error) => {
      if (!error) {
        const body = { success: true, topic, timestamp: Date.now() };
        resolve(new Response(JSON.stringify(body)));
        return;
      }
      resolve(new Response(JSON.stringify({ error: error.message }), { status: 500 }));
    };

    client.publish(topic, payload, onPublished);
  });
}

/**
 * Subscribes to `topic`, collects the messages that arrive on exactly that
 * topic during a fixed listening window, then unsubscribes and reports them.
 *
 * @param {object} client - MQTT client exposing `subscribe`, `unsubscribe`,
 *   `on` and `removeListener`.
 * @param {string} topic - Topic to listen on; compared by exact string match.
 * @param {number} [timeoutMs=5000] - Length of the listening window in milliseconds.
 * @returns {Promise<Response>} A 200 response with `{ topic, messages, count }`
 *   after the window closes, or a 500 response with `{ error }` when the
 *   subscription is refused. The promise itself does not reject.
 */
async function subscribeToTopic(client, topic, timeoutMs = 5000) {
  return new Promise((resolve) => {
    const messages = [];

    const messageHandler = (receivedTopic, message) => {
      // The client emits messages for all of its subscriptions; keep ours only.
      if (receivedTopic === topic) {
        messages.push({
          message: message.toString(),
          timestamp: Date.now(),
        });
      }
    };

    client.subscribe(topic, (error) => {
      if (error) {
        resolve(new Response(JSON.stringify({ error: error.message }), { status: 500 }));
        return;
      }

      client.on('message', messageHandler);

      // One timer ends the window. Removing the listener, unsubscribing and
      // resolving together guarantees no listener is left behind once the
      // caller has its answer.
      setTimeout(() => {
        client.removeListener('message', messageHandler);
        client.unsubscribe(topic);
        resolve(new Response(JSON.stringify({
          topic,
          messages,
          count: messages.length,
        })));
      }, timeoutMs);
    });
  });
}

/**
 * Asks a device for its status and collects what it reports.
 *
 * Publishes `{ command: 'get_status' }` to `devices/<deviceId>/commands` and
 * listens on `devices/<deviceId>/status`.
 *
 * @param {object} client - Connected MQTT client.
 * @param {string} deviceId - Identifier inserted into both topic names.
 * @returns {Promise<Response>} The listening result from `subscribeToTopic`,
 *   or the failed publish response when the request could not be sent.
 */
async function getDeviceStatus(client, deviceId) {
  const statusTopic = `devices/${deviceId}/status`;
  const commandTopic = `devices/${deviceId}/commands`;

  // Start listening before sending the request. Subscribing afterwards leaves
  // a gap in which a fast device can answer before anyone is listening.
  const pendingStatus = subscribeToTopic(client, statusTopic);

  // Request status update
  const published = await publishMessage(client, commandTopic, { command: 'get_status' });
  if (!published.ok) {
    // The request never went out, so report that instead of an empty result.
    return published;
  }

  return pendingStatus;
}

/**
 * Sends a command to a device and collects the device's responses.
 *
 * Publishes `{ command, parameters, timestamp, id }` to
 * `devices/<deviceId>/commands` and listens on `devices/<deviceId>/responses`.
 *
 * @param {object} client - Connected MQTT client.
 * @param {string} deviceId - Identifier inserted into both topic names.
 * @param {{type: *, parameters: *}} command - Command to send; `type` becomes
 *   the `command` field of the published payload.
 * @returns {Promise<Response>} The listening result from `subscribeToTopic`,
 *   or the failed publish response when the command could not be sent.
 */
async function sendDeviceCommand(client, deviceId, command) {
  const commandTopic = `devices/${deviceId}/commands`;
  const responseTopic = `devices/${deviceId}/responses`;

  // Build the payload first so an invalid `command` fails before any
  // subscription has been started.
  const payload = {
    command: command.type,
    parameters: command.parameters,
    timestamp: Date.now(),
    id: crypto.randomUUID(),
  };

  // Start listening before sending. Subscribing afterwards leaves a gap in
  // which a fast device can respond before anyone is listening.
  const pendingResponse = subscribeToTopic(client, responseTopic);

  // Send command
  const published = await publishMessage(client, commandTopic, payload);
  if (!published.ok) {
    // The command never went out, so report that instead of an empty result.
    return published;
  }

  // Wait for response
  return pendingResponse;
}
# CoAP server for IoT device communication
import aiocoap
import asyncio
import json
from aiocoap import resource, Context, Message, Code

class DeviceStatusResource(resource.Resource):
    """CoAP resource that stores and serves the last reported status per device.

    NOTE(review): confirm that the Site actually dispatches
    /devices/status/<device_id> to this resource. aiocoap documents a
    ``resource.PathCapable`` marker for resources that handle their own
    sub-paths, which this class does not use.
    """

    # Path this resource is registered under in setup_coap_server().
    _BASE_PATH = ('devices', 'status')

    def __init__(self):
        super().__init__()
        # device_id -> last status dict posted for that device.
        self.device_status = {}

    def _device_id(self, request):
        """Return the path segment that follows the registration path, or None.

        Index 1 of /devices/status/<device_id> is the literal "status" segment,
        so the device id cannot be read from a fixed position. The registration
        prefix is removed when present and the first remaining segment is used,
        which also works if the framework already stripped the prefix.
        """
        segments = tuple(request.opt.uri_path)
        prefix_len = len(self._BASE_PATH)
        if segments[:prefix_len] == self._BASE_PATH:
            segments = segments[prefix_len:]
        return segments[0] if segments and segments[0] else None

    async def render_get(self, request):
        """Handle GET requests for device status.

        Returns one device's status (``{"status": "unknown"}`` if nothing was
        reported yet) when the path names a device, otherwise all statuses.
        """
        device_id = self._device_id(request)

        if device_id:
            status = self.device_status.get(device_id, {"status": "unknown"})
            payload = json.dumps(status).encode('utf-8')
        else:
            payload = json.dumps(self.device_status).encode('utf-8')

        return Message(code=Code.CONTENT, payload=payload)

    async def render_post(self, request):
        """Handle POST requests to update device status.

        The payload must be a UTF-8 JSON object. It is stored together with
        ``last_updated`` and ``ip_address`` fields set by the server.
        """
        try:
            device_id = self._device_id(request)
            data = json.loads(request.payload.decode('utf-8'))

            if not device_id:
                return Message(code=Code.BAD_REQUEST, payload=b'Device ID required')

            # Only a JSON object can be merged into the status record; other
            # JSON values are a client error, not a server failure.
            if not isinstance(data, dict):
                return Message(code=Code.BAD_REQUEST, payload=b'Invalid JSON')

            self.device_status[device_id] = {
                **data,
                # Written after **data so the server's values win over any
                # same-named keys sent by the device.
                'last_updated': asyncio.get_event_loop().time(),
                'ip_address': str(request.remote)
            }

            # Log device update
            print(f"Device {device_id} status updated: {data}")

            return Message(code=Code.CREATED, payload=b'Status updated')

        except (UnicodeDecodeError, json.JSONDecodeError):
            return Message(code=Code.BAD_REQUEST, payload=b'Invalid JSON')
        except Exception as e:
            return Message(code=Code.INTERNAL_SERVER_ERROR,
                           payload=f'Error: {str(e)}'.encode('utf-8'))

class DeviceCommandResource(resource.Resource):
    """CoAP resource for sending commands to devices.

    A POST stores one pending command per device; a GET from the device
    retrieves and clears it.
    """

    # Path this resource is registered under in setup_coap_server().
    _BASE_PATH = ('devices', 'commands')

    def __init__(self):
        super().__init__()
        # device_id -> the single pending command record for that device.
        self.pending_commands = {}

    def _device_id(self, request):
        """Return the path segment that follows the registration path, or None.

        Index 1 of /devices/commands/<device_id> is the literal "commands"
        segment, so the device id cannot be read from a fixed position. The
        registration prefix is removed when present and the first remaining
        segment is used.
        """
        segments = tuple(request.opt.uri_path)
        prefix_len = len(self._BASE_PATH)
        if segments[:prefix_len] == self._BASE_PATH:
            segments = segments[prefix_len:]
        return segments[0] if segments and segments[0] else None

    async def notify_device(self, device_id, command_data):
        """Hook for pushing a command to a device that is currently observing.

        This class keeps no record of observers, so the default does nothing
        and devices receive commands by polling render_get(). Without this
        method every POST failed with an AttributeError after the command had
        already been stored.
        """
        return None

    async def render_post(self, request):
        """Handle POST requests to send commands."""
        try:
            device_id = self._device_id(request)
            command_data = json.loads(request.payload.decode('utf-8'))

            if not device_id:
                return Message(code=Code.BAD_REQUEST, payload=b'Device ID required')

            # Store command for device to poll
            # NOTE(review): a second command for the same device replaces one
            # that has not been polled yet.
            queued_at = asyncio.get_event_loop().time()
            command_id = f"{device_id}_{queued_at}"
            self.pending_commands[device_id] = {
                'id': command_id,
                'command': command_data,
                'timestamp': queued_at
            }

            # Send command to device (if connected via observe)
            await self.notify_device(device_id, command_data)

            return Message(code=Code.CREATED,
                           payload=f'Command queued: {command_id}'.encode('utf-8'))

        except (UnicodeDecodeError, json.JSONDecodeError):
            # A malformed payload is a client error.
            return Message(code=Code.BAD_REQUEST, payload=b'Invalid JSON')
        except Exception as e:
            return Message(code=Code.INTERNAL_SERVER_ERROR,
                           payload=f'Error: {str(e)}'.encode('utf-8'))

    async def render_get(self, request):
        """Handle GET requests to poll for commands.

        Returns and removes the pending command for the device named in the
        path, or a "No pending commands" message when there is none.
        """
        device_id = self._device_id(request)

        if device_id and device_id in self.pending_commands:
            command = self.pending_commands.pop(device_id)
            payload = json.dumps(command).encode('utf-8')
        else:
            payload = json.dumps({"message": "No pending commands"}).encode('utf-8')

        return Message(code=Code.CONTENT, payload=payload)

async def setup_coap_server(host='0.0.0.0', port=5683):
    """Set up a CoAP server exposing the device status and command resources.

    Args:
        host: Address to bind to. Defaults to all interfaces.
        port: UDP port to bind to. Defaults to 5683.

    Returns:
        The server context returned by ``Context.create_server_context``.
    """
    root = resource.Site()

    # Add device resources
    root.add_resource(['devices', 'status'], DeviceStatusResource())
    root.add_resource(['devices', 'commands'], DeviceCommandResource())

    # Create CoAP context
    context = await Context.create_server_context(root, bind=(host, port))

    return context

# Server context created by the first handler() call; None until then.
_coap_context = None


async def handler(request):
    """Main handler function.

    Starts the CoAP server on the first call and reuses it afterwards, so that
    later calls do not try to bind the same port a second time. Returns a
    description of the running server and its resource paths.
    """
    global _coap_context

    # Setup CoAP server if not already running
    if _coap_context is None:
        _coap_context = await setup_coap_server()

    return {
        'message': 'CoAP server running on port 5683',
        'resources': [
            '/devices/status/<device_id>',
            '/devices/commands/<device_id>'
        ]
    }

Network Monitoring & Analytics

Monitor network performance and analyze traffic patterns:

# Network performance monitoring
import asyncio
import time
import psutil
import json
from collections import defaultdict, deque

class NetworkMonitor:
    """Collects network counters via psutil and keeps a bounded history.

    Each metric has its own history in ``self.metrics[name]``: a deque of
    ``{'timestamp': t, name: value}`` samples, trimmed to ``max_samples``.
    """

    def __init__(self):
        self.metrics = defaultdict(deque)
        self.max_samples = 100

    def collect_metrics(self):
        """Collect current network metrics.

        Returns a dict of the current counters. From the second call onwards
        it also contains ``bytes_sent_rate`` and ``bytes_recv_rate`` (bytes per
        second since the previous sample). The sample is appended to the
        history as a side effect.
        """
        net_io = psutil.net_io_counters()
        connections = psutil.net_connections()

        current_time = time.time()

        # Collect basic metrics
        metrics = {
            'timestamp': current_time,
            'bytes_sent': net_io.bytes_sent,
            'bytes_recv': net_io.bytes_recv,
            'packets_sent': net_io.packets_sent,
            'packets_recv': net_io.packets_recv,
            'errin': net_io.errin,
            'errout': net_io.errout,
            'dropin': net_io.dropin,
            'dropout': net_io.dropout,
            'active_connections': len([c for c in connections if c.status == 'ESTABLISHED'])
        }

        # Calculate rates if we have previous data. Each history only holds its
        # own metric, so the previous bytes_recv value has to come from the
        # bytes_recv history; reading it from the bytes_sent sample raised
        # KeyError on every call after the first.
        sent_history = self.metrics['bytes_sent']
        recv_history = self.metrics['bytes_recv']
        if sent_history and recv_history:
            prev_sent = sent_history[-1]
            prev_recv = recv_history[-1]
            time_diff = current_time - prev_sent['timestamp']

            if time_diff > 0:
                metrics['bytes_sent_rate'] = (metrics['bytes_sent'] - prev_sent['bytes_sent']) / time_diff
                metrics['bytes_recv_rate'] = (metrics['bytes_recv'] - prev_recv['bytes_recv']) / time_diff

        # Store metrics (keep only recent samples)
        for key, value in metrics.items():
            history = self.metrics[key]
            history.append({'timestamp': current_time, key: value})
            if len(history) > self.max_samples:
                history.popleft()

        return metrics

    def get_statistics(self, duration_minutes=5):
        """Get network statistics for specified duration.

        Returns ``{metric: {'min', 'max', 'avg', 'current', 'samples'}}`` for
        every metric that has at least one sample newer than the cutoff.
        """
        cutoff_time = time.time() - (duration_minutes * 60)

        stats = {}
        for metric_name, samples in self.metrics.items():
            values = [
                s[metric_name]
                for s in samples
                if s['timestamp'] > cutoff_time and metric_name in s
            ]
            if values:
                stats[metric_name] = {
                    'min': min(values),
                    'max': max(values),
                    'avg': sum(values) / len(values),
                    'current': values[-1],
                    'samples': len(values)
                }

        return stats

    def detect_anomalies(self, threshold_multiplier=2.0):
        """Detect network anomalies based on historical data.

        Takes a fresh sample and flags each of ``bytes_sent_rate``,
        ``bytes_recv_rate`` and ``active_connections`` whose current value
        exceeds ``threshold_multiplier`` times its average over the last 30
        minutes. Returns a list of anomaly dicts (possibly empty).
        """
        anomalies = []

        # Take the baseline before sampling, so the value under test does not
        # pull its own reference average towards itself.
        historical_stats = self.get_statistics(duration_minutes=30)
        current_metrics = self.collect_metrics()

        for metric in ['bytes_sent_rate', 'bytes_recv_rate', 'active_connections']:
            if metric in current_metrics and metric in historical_stats:
                current_value = current_metrics[metric]
                historical_avg = historical_stats[metric]['avg']

                if current_value > historical_avg * threshold_multiplier:
                    anomalies.append({
                        'metric': metric,
                        'current_value': current_value,
                        'historical_avg': historical_avg,
                        # Reported as 0 when there is no meaningful baseline.
                        'deviation_factor': current_value / historical_avg if historical_avg > 0 else 0,
                        'severity': 'high' if current_value > historical_avg * 3 else 'medium'
                    })

        return anomalies

# Global monitor instance
# Created once at module level; handler() below calls its methods on every
# request, so the sample history it keeps is shared between requests.
network_monitor = NetworkMonitor()

async def handler(request):
    """Run the monitoring action named in the JSON request body.

    Supported ``action`` values (default ``get_metrics``):
      * ``get_metrics`` - take and return a fresh sample.
      * ``get_statistics`` - summarise the last ``duration_minutes`` (default 5).
      * ``detect_anomalies`` - check against ``threshold_multiplier`` (default 2.0).

    Any other action returns ``{'error': 'Invalid action'}``.
    """
    data = await request.json()
    action = data.get('action', 'get_metrics')

    if action == 'get_metrics':
        return {'metrics': network_monitor.collect_metrics()}

    if action == 'get_statistics':
        duration = data.get('duration_minutes', 5)
        stats = network_monitor.get_statistics(duration)
        return {'statistics': stats, 'duration_minutes': duration}

    if action == 'detect_anomalies':
        threshold = data.get('threshold_multiplier', 2.0)
        anomalies = network_monitor.detect_anomalies(threshold)
        return {'anomalies': anomalies, 'count': len(anomalies)}

    return {'error': 'Invalid action'}

Explore more network integration patterns in our Network & Telecommunications documentation.