Function Calling (Streaming + Parallel Calls)

In the previous tutorial, we learned the basics of defining and executing functions using our chat completions API.

In this tutorial, we will introduce more advanced use cases:

  • Streaming function calls
  • Passing multiple functions
  • Executing function calls in parallel

For low-latency contexts, streaming and parallel calls are especially helpful.

Defining our functions

First, we will define two functions we want to execute in parallel: sleep and dream.

Our goal is to use the dream function to make an API call to the Telnyx chat completions endpoint while we sleep.

We will also re-use the func_to_tool helper function we defined in the previous tutorial to easily convert between our Python functions and the JSON we need to pass to the tools field for our chat completions API.

Note

Make sure you have set the TELNYX_API_KEY environment variable.

import asyncio
import inspect
import json
import os
from openai import AsyncOpenAI
from pydantic import create_model

# Configuration
API_KEY = os.getenv("TELNYX_API_KEY")
BASE_URL = "https://api.telnyx.com/v2/ai"
MODEL = "meta-llama/Meta-Llama-3.1-70B-Instruct"

client = AsyncOpenAI(api_key=API_KEY, base_url=BASE_URL)

async def sleep(seconds: int):
    """Sleep for a given number of seconds."""
    await asyncio.sleep(seconds)
    return f"I slept for {seconds} seconds!"

async def dream(subject: str):
    """Dream about a given subject."""
    chat_completion = await client.chat.completions.create(
        model=MODEL,
        messages=[
            {
                "role": "user",
                "content": f"BRIEFLY (one sentence max) describe a dream about {subject}"
            }
        ]
    )
    return chat_completion.choices[0].message.content

def func_to_tool(f):
    """Convert a function to a tool JSON schema."""
    kw = {
        n: (o.annotation, ... if o.default == inspect.Parameter.empty else o.default)
        for n, o in inspect.signature(f).parameters.items()
    }
    schema = create_model(f.__name__, **kw).model_json_schema()
    tool_json = {
        "type": "function",
        "function": {
            "name": schema["title"],
            "description": inspect.getdoc(f),
            "parameters": schema
        }
    }
    return tool_json
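
To make the conversion concrete, the snippet below shows roughly what func_to_tool(sleep) should return. It is written out by hand as an illustration and assumes pydantic v2's model_json_schema(); the exact keys inside "parameters" may differ between pydantic versions.

```python
import json

# Approximate result of func_to_tool(sleep). This literal is an assumption
# based on pydantic v2 output, not captured from a real run.
sleep_tool = {
    "type": "function",
    "function": {
        "name": "sleep",
        "description": "Sleep for a given number of seconds.",
        "parameters": {
            "properties": {"seconds": {"title": "Seconds", "type": "integer"}},
            "required": ["seconds"],
            "title": "sleep",
            "type": "object",
        },
    },
}

print(json.dumps(sleep_tool, indent=2))
```

The function name and docstring become the tool's name and description, and the type hints become the parameter schema, so clear docstrings and annotations help the model choose and call the right tool.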

Parsing Streaming Tools + Executing Tasks in Parallel

Next we will define a few functions to help us parse and execute tasks in parallel.

handle_tool_calls

The handle_tool_calls function will iterate over streamed chunks from the chat completions endpoint. The language model may request multiple tool calls to be executed in parallel, and each streamed tool call fragment carries an index attribute that identifies which call it belongs to.

As we progress through the stream, we will build our local copy of this list of function calls in the tool_calls list.

The first chunk of a new tool call will contain the name of the function. This enables you to give early feedback to users that a function will be executed. In this example, we simply print the name of the function when it is detected.

As we build the arguments from the streamed chunks, we attempt to parse what we have built as JSON. Once we have a valid JSON object, we create an async task to be scheduled for execution (if we have not already done so).

NB: Telnyx guarantees valid JSON is returned for tool calls, so you don't have to worry about lengthy retries or fuzzy matching.
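
The accumulate-then-parse idea described above can be tried without calling the API. The sketch below is a simplified stand-in, not the tutorial's implementation: the fragments list and the accumulate function are hypothetical, and each tuple only mimics the index, name, and arguments fields of a streamed tool call fragment.

```python
import json

# Hypothetical fragments shaped like (index, name, arguments piece).
# Real fragments arrive in chunks streamed from the API.
fragments = [
    (0, "sleep", ""),
    (0, None, '{"seco'),
    (0, None, 'nds": 10}'),
    (1, "dream", ""),
    (1, None, '{"subject": '),
    (1, None, '"Telnyx"}'),
]

def accumulate(fragments):
    """Rebuild tool calls and report each one once its arguments parse as JSON."""
    calls = []    # one entry per tool call index
    ready = []    # (name, kwargs) in the order the arguments became complete
    done = set()  # indexes that have already been reported
    for index, name, args in fragments:
        if len(calls) <= index:
            # A new index means a new tool call has started
            calls.append({"name": "", "arguments": ""})
        call = calls[index]
        if name:
            call["name"] += name
        if args:
            call["arguments"] += args
            try:
                kwargs = json.loads(call["arguments"])
            except json.JSONDecodeError:
                continue  # the arguments JSON is still incomplete
            if index not in done:
                done.add(index)
                ready.append((call["name"], kwargs))
    return ready

ready_calls = accumulate(fragments)
print(ready_calls)
```

Each call becomes ready as soon as its own arguments parse, so the first function could already be running while the second one's arguments are still streaming in.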

execute_tasks

This function executes the tasks from the previous function and returns the results as they are completed, enabling users to receive feedback as soon as possible.
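
As a small illustration of the "as they are completed" behavior, the sketch below uses asyncio.as_completed with two stand-in coroutines. The work and run_demo names and the delays are hypothetical and chosen only to make the ordering visible.

```python
import asyncio

async def work(name, delay):
    """Hypothetical stand-in for a tool function."""
    await asyncio.sleep(delay)
    return name

async def run_demo():
    # "slow" is scheduled first but takes longer to finish
    tasks = [
        asyncio.create_task(work("slow", 0.2)),
        asyncio.create_task(work("fast", 0.05)),
    ]
    order = []
    # as_completed yields awaitables in the order the tasks finish
    for next_done in asyncio.as_completed(tasks):
        order.append(await next_done)
    return order

completion_order = asyncio.run(run_demo())
print(completion_order)
```

Results come back in completion order rather than scheduling order, which is why each result needs to carry its own tool call ID.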

func_wrapper

This is a trivial helper function that exposes the tool call ID and function name to execute_tasks.

async def func_wrapper(func, tool_call_id, **kwargs):
    """Wrap a function to return its ID + name when executed."""
    result = await func(**kwargs)
    return tool_call_id, func.__name__, result

async def execute_tasks(tasks):
    """Execute asynchronous tasks and collect their results."""
    results = []
    for task in asyncio.as_completed(tasks):
        tool_call_id, func_name, result = await task
        print(f"Executed {func_name}, results: {result}")
        results.append(
            {
                "tool_call_id": tool_call_id,
                "role": "tool",
                "name": func_name,
                "content": result,
            }
        )
    return results

async def handle_tool_calls(chat_completion, function_map):
    """Handle streaming tool calls from chat completion."""
    tool_calls = []
    tasks = []
    tasked_tool_ids = set()

    async for chunk in chat_completion:
        delta = chunk.choices[0].delta
        if delta and delta.tool_calls:
            # We have detected tool calls from the LLM
            tcchunklist = delta.tool_calls
            for tcchunk in tcchunklist:
                index = tcchunk.index or 0
                if len(tool_calls) <= index:
                    # Based on the index, we have a new tool call
                    tool_calls.append(
                        {
                            "id": "",
                            "type": "function",
                            "function": {
                                "name": "",
                                "arguments": ""
                            }
                        }
                    )
                tc = tool_calls[index]

                if tcchunk.id:
                    tc["id"] += tcchunk.id
                if tcchunk.function.name:
                    tc["function"]["name"] += tcchunk.function.name
                    print(f"Detected function: {tcchunk.function.name}")
                if tcchunk.function.arguments:
                    tc["function"]["arguments"] += tcchunk.function.arguments
                    try:
                        kwargs = json.loads(tc["function"]["arguments"])
                    except json.JSONDecodeError:
                        # We don't have the full arguments JSON yet
                        continue
                    else:
                        if tc["id"] not in tasked_tool_ids:
                            func_name = tc["function"]["name"]
                            print(f"Executing {func_name} with {kwargs}")
                            wrapped_func = func_wrapper(function_map[func_name], tc["id"], **kwargs)
                            task = asyncio.create_task(wrapped_func)
                            tasks.append(task)
                            tasked_tool_ids.add(tc["id"])

    return tool_calls, tasks

Putting it all together

With our helper functions defined, we are ready to stream and execute multiple function calls in parallel. In this code, we:

  • Ask the language model to sleep and dream at the same time
  • Execute the returned tool calls in parallel
  • Provide the results back to the language model and get a final response

async def main():
    prompt = "Take a quick 10 second power nap and dream about Telnyx. Then write a haiku about it!"
    messages = [{"role": "user", "content": prompt}]
    print(f"Prompt: {prompt}")

    functions = [sleep, dream]
    function_map = {f.__name__: f for f in functions}
    tools = [func_to_tool(func) for func in functions]

    chat_completion = await client.chat.completions.create(
        model=MODEL,
        messages=messages,
        tools=tools,
        tool_choice="required",
        stream=True
    )

    tool_calls, tasks = await handle_tool_calls(chat_completion, function_map)

    messages.append(
        {
            "role": "assistant",
            "tool_calls": tool_calls,
        }
    )

    task_results = await execute_tasks(tasks)
    messages.extend(task_results)

    print("Sending results back to LLM...")
    print()
    second_chat_completion = await client.chat.completions.create(
        model=MODEL,
        messages=messages,
        stream=True,
    )

    async for chunk in second_chat_completion:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)
    print()

if __name__ == "__main__":
    asyncio.run(main())
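
For reference, the messages list sent in the second request should have roughly the shape sketched below. The tool call IDs, the shortened prompt, and the dream text are hypothetical placeholders; real values come from the stream and from the executed functions.

```python
# Hypothetical IDs and contents, for illustration only.
messages = [
    {"role": "user", "content": "Take a quick nap and dream about Telnyx."},
    {
        "role": "assistant",
        "tool_calls": [
            {
                "id": "call_0",
                "type": "function",
                "function": {"name": "sleep", "arguments": '{"seconds": 10}'},
            },
            {
                "id": "call_1",
                "type": "function",
                "function": {"name": "dream", "arguments": '{"subject": "Telnyx"}'},
            },
        ],
    },
    # Tool results are appended in completion order, which may differ
    # from the order in which the calls were requested.
    {"tool_call_id": "call_1", "role": "tool", "name": "dream",
     "content": "A placeholder dream."},
    {"tool_call_id": "call_0", "role": "tool", "name": "sleep",
     "content": "I slept for 10 seconds!"},
]

# Each tool result should refer to an ID declared by the assistant message.
declared = {tc["id"] for tc in messages[1]["tool_calls"]}
answered = {m["tool_call_id"] for m in messages if m["role"] == "tool"}
print(declared == answered)
```

The tool_call_id on each tool message is what ties a result back to its request, so the results do not need to be in the same order as the calls.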

The output of the print statements in this script will look something like this.

Notice that sleep was detected and started first, but dream still returned its result first, because the two tasks run concurrently.

Prompt: Take a quick 10 second power nap and dream about Telnyx. Then write a haiku about it!
Detected function: sleep
Executing sleep with {'seconds': 10}
Detected function: dream
Executing dream with {'subject': 'Telnyx'}
Executed dream, results: In my dream, I was walking through a futuristic cityscape where Telnyx's logo was emblazoned on skyscrapers, and I could hear the hum of millions of concurrent voice calls and messages being transmitted seamlessly through their network.
Executed sleep, results: I slept for 10 seconds!
Sending results back to LLM...

Here is a haiku about Telnyx:

Telnyx city glows
Voices whisper through the air
Connected we stand