OpenAI SDK

  1. Sign up — Create an account at platform.respan.ai
  2. Create an API key — Generate one on the API keys page
  3. Add credits or a provider key — Add credits on the Credits page or connect your own provider key on the Integrations page

Add the Docs MCP to your AI coding tool to get help building with Respan. No API key needed.

```json
{
  "mcpServers": {
    "respan-docs": {
      "url": "https://mcp.respan.ai/mcp/docs"
    }
  }
}
```

What is the OpenAI SDK?

The OpenAI SDK is the official client for OpenAI’s APIs, available for both Python and TypeScript/JavaScript. It supports Chat Completions and the Responses API. Respan can auto-instrument all OpenAI calls for tracing, route them through the Respan gateway for model switching and prompt management, or both.

Setup

1. Install packages

```shell
pip install respan-ai respan-instrumentation-openai openai python-dotenv
```
2. Set environment variables

```shell
export RESPAN_API_KEY="YOUR_RESPAN_API_KEY"
export RESPAN_BASE_URL="https://api.respan.ai/api"  # optional; this is the default
```

The Respan API key authenticates both LLM inference (gateway) and telemetry export (tracing).

3. Initialize and run

```python
import os
from dotenv import load_dotenv

load_dotenv()

from openai import OpenAI
from respan import Respan
from respan_instrumentation_openai import OpenAIInstrumentor

respan = Respan(instrumentations=[OpenAIInstrumentor()])

client = OpenAI(
    api_key=os.getenv("RESPAN_API_KEY"),
    base_url=os.getenv("RESPAN_BASE_URL", "https://api.respan.ai/api"),
)

response = client.chat.completions.create(
    model="gpt-4.1-nano",
    messages=[{"role": "user", "content": "Say hello in three languages."}],
)
print(response.choices[0].message.content)
respan.flush()
```
4. View your trace

Open the Traces page to see your auto-instrumented LLM spans.

Always call respan.flush() (Python) or await respan.flush() (TypeScript) before your process exits. Without it, pending spans may be lost.
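As a safety net for exit paths you don't control, you can also register the flush as an exit hook. This is a minimal sketch using Python's stdlib atexit; `install_flush_hook` is a hypothetical helper, and you would pass it `respan.flush` (or any equivalent flush callable your app holds):

```python
import atexit

def install_flush_hook(flush):
    """Register a flush callable to run at interpreter exit.

    Pass respan.flush (or an equivalent) so buffered spans are
    exported even if the process ends without an explicit call.
    Returns the callable so it can still be invoked directly.
    """
    atexit.register(flush)
    return flush
```

An explicit `respan.flush()` at the end of your main path remains the most reliable option; the exit hook only covers code paths that skip it.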

Configuration

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `api_key` | `str \| None` | `None` | Falls back to the `RESPAN_API_KEY` env var. |
| `base_url` | `str \| None` | `None` | Falls back to the `RESPAN_BASE_URL` env var. |
| `instrumentations` | `list` | `[]` | Plugin instrumentations to activate (e.g. `OpenAIInstrumentor()`). |
| `is_auto_instrument` | `bool \| None` | `False` | Auto-discover and activate all installed instrumentors via OpenTelemetry entry points. |
| `customer_identifier` | `str \| None` | `None` | Default customer identifier for all spans. |
| `metadata` | `dict \| None` | `None` | Default metadata attached to all spans. |
| `environment` | `str \| None` | `None` | Environment tag (e.g. `"production"`). |
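For example, several of the parameters above can be combined at initialization. This is a sketch; the values are placeholders, and with `is_auto_instrument=True` the `instrumentations` list can be omitted since installed instrumentors are discovered automatically:

```python
from respan import Respan

respan = Respan(
    api_key="YOUR_RESPAN_API_KEY",   # or rely on the RESPAN_API_KEY env var
    is_auto_instrument=True,         # discover installed instrumentors automatically
    environment="production",
    customer_identifier="user_123",
    metadata={"service": "chat-api"},
)
```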

Attributes

Attach customer identifiers, thread IDs, and metadata to spans.

In Respan()

Set defaults at initialization — these apply to all spans.

```python
from respan import Respan
from respan_instrumentation_openai import OpenAIInstrumentor

respan = Respan(
    instrumentations=[OpenAIInstrumentor()],
    customer_identifier="user_123",
    metadata={"service": "chat-api", "version": "1.0.0"},
)
```

With propagate_attributes

Override per-request using a context scope.

```python
from respan import Respan, workflow, propagate_attributes
from respan_instrumentation_openai import OpenAIInstrumentor

respan = Respan(
    instrumentations=[OpenAIInstrumentor()],
    metadata={"service": "chat-api", "version": "1.0.0"},
)

@workflow(name="handle_request")
def handle_request(user_id: str, question: str):
    with propagate_attributes(
        customer_identifier=user_id,
        thread_identifier="conv_001",
        metadata={"plan": "pro"},  # merged with default metadata
    ):
        response = client.chat.completions.create(
            model="gpt-4.1-nano",
            messages=[{"role": "user", "content": question}],
        )
        print(response.choices[0].message.content)
```
| Attribute | Type | Description |
| --- | --- | --- |
| `customer_identifier` | `str` | Identifies the end user in Respan analytics. |
| `thread_identifier` | `str` | Groups related messages into a conversation. |
| `metadata` | `dict` | Custom key-value pairs. Merged with the default metadata. |

Decorators (optional)

Decorators are not required. All OpenAI calls are auto-traced by the instrumentor. Use @workflow and @task (Python) or withWorkflow and withTask (TypeScript) to add structure when you want to group related calls into a named workflow with nested tasks.

```python
import os
from openai import OpenAI
from respan import Respan, workflow, task
from respan_instrumentation_openai import OpenAIInstrumentor

respan = Respan(instrumentations=[OpenAIInstrumentor()])

client = OpenAI(
    api_key=os.getenv("RESPAN_API_KEY"),
    base_url=os.getenv("RESPAN_BASE_URL", "https://api.respan.ai/api"),
)

@task(name="generate_outline")
def outline(topic: str) -> str:
    response = client.chat.completions.create(
        model="gpt-4.1-nano",
        messages=[
            {"role": "system", "content": "Create a brief outline."},
            {"role": "user", "content": topic},
        ],
    )
    return response.choices[0].message.content

@workflow(name="content_pipeline")
def pipeline(topic: str):
    plan = outline(topic)
    response = client.chat.completions.create(
        model="gpt-4.1-nano",
        messages=[
            {"role": "system", "content": "Write content from this outline."},
            {"role": "user", "content": plan},
        ],
    )
    print(response.choices[0].message.content)

pipeline("Benefits of API gateways")
respan.flush()
```

Streaming

Streaming responses are auto-traced like regular completions.

```python
import os
from openai import OpenAI
from respan import Respan
from respan_instrumentation_openai import OpenAIInstrumentor

respan = Respan(instrumentations=[OpenAIInstrumentor()])

client = OpenAI(
    api_key=os.getenv("RESPAN_API_KEY"),
    base_url=os.getenv("RESPAN_BASE_URL", "https://api.respan.ai/api"),
)

stream = client.chat.completions.create(
    model="gpt-4.1-nano",
    messages=[{"role": "user", "content": "Write a haiku about Python."}],
    stream=True,
)

for chunk in stream:
    content = chunk.choices[0].delta.content
    if content:
        print(content, end="", flush=True)
print()

respan.flush()
```

Tool calls

Function calling is auto-traced. Wrap the workflow with @workflow and @task decorators for a structured trace tree.

```python
import os
import json
from openai import OpenAI
from respan import Respan, workflow, task
from respan_instrumentation_openai import OpenAIInstrumentor

respan = Respan(instrumentations=[OpenAIInstrumentor()])

client = OpenAI(
    api_key=os.getenv("RESPAN_API_KEY"),
    base_url=os.getenv("RESPAN_BASE_URL", "https://api.respan.ai/api"),
)

tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get the weather for a city.",
            "parameters": {
                "type": "object",
                "properties": {"city": {"type": "string"}},
                "required": ["city"],
            },
        },
    }
]

@task(name="get_weather")
def get_weather(city: str) -> str:
    return f"Sunny, 72F in {city}"

@workflow(name="weather_assistant")
def run(question: str):
    messages = [{"role": "user", "content": question}]

    response = client.chat.completions.create(
        model="gpt-4.1-nano",
        messages=messages,
        tools=tools,
    )
    message = response.choices[0].message

    if message.tool_calls:
        messages.append(message)
        for tc in message.tool_calls:
            args = json.loads(tc.function.arguments)
            result = get_weather(**args)
            messages.append(
                {"role": "tool", "tool_call_id": tc.id, "content": result}
            )

        final = client.chat.completions.create(
            model="gpt-4.1-nano",
            messages=messages,
            tools=tools,
        )
        print(f"Answer: {final.choices[0].message.content}")

run("What's the weather in Paris?")
respan.flush()
```

Multi-turn conversations

Multi-turn conversations are auto-traced. Each create() call becomes its own span. Use @workflow to group them into a single trace.

```python
import os
from openai import OpenAI
from respan import Respan, workflow
from respan_instrumentation_openai import OpenAIInstrumentor

respan = Respan(instrumentations=[OpenAIInstrumentor()])

client = OpenAI(
    api_key=os.getenv("RESPAN_API_KEY"),
    base_url=os.getenv("RESPAN_BASE_URL", "https://api.respan.ai/api"),
)

@workflow(name="conversation")
def chat():
    messages = [
        {"role": "system", "content": "You are a concise cooking assistant."}
    ]
    questions = [
        "What can I make with eggs and cheese?",
        "How long does the omelette take?",
        "Any tips to make it fluffy?",
    ]

    for question in questions:
        messages.append({"role": "user", "content": question})
        response = client.chat.completions.create(
            model="gpt-4.1-nano",
            messages=messages,
        )
        answer = response.choices[0].message.content
        messages.append({"role": "assistant", "content": answer})
        print(f"User: {question}")
        print(f"Bot: {answer}\n")

chat()
respan.flush()
```

Structured output

Structured output with Pydantic models (via client.beta.chat.completions.parse) is auto-traced.

```python
import os
from pydantic import BaseModel
from openai import OpenAI
from respan import Respan, workflow
from respan_instrumentation_openai import OpenAIInstrumentor

respan = Respan(instrumentations=[OpenAIInstrumentor()])

client = OpenAI(
    api_key=os.getenv("RESPAN_API_KEY"),
    base_url=os.getenv("RESPAN_BASE_URL", "https://api.respan.ai/api"),
)

class MovieReview(BaseModel):
    title: str
    rating: int
    summary: str
    pros: list[str]
    cons: list[str]

@workflow(name="movie_review")
def review(movie: str) -> MovieReview:
    response = client.beta.chat.completions.parse(
        model="gpt-4.1-nano",
        messages=[
            {"role": "system", "content": "You are a film critic. Rate movies 1-10."},
            {"role": "user", "content": f"Review: {movie}"},
        ],
        response_format=MovieReview,
    )
    return response.choices[0].message.parsed

result = review("The Matrix")
print(f"{result.title} — {result.rating}/10")
print(f"Summary: {result.summary}")
respan.flush()
```

Batch API

The Batch API lets you submit large batches of requests for asynchronous processing at 50% of the standard cost. Use respan.log_batch_results() to log each batch result as an individual traced span in Respan.

Respan also provides a Batch API endpoint for batch processing with tracking parameters.

The Batch API requires a direct OPENAI_API_KEY. It does not go through the Respan gateway.

```python
import os
import json
import time
from openai import OpenAI
from respan import Respan, workflow, task
from respan_instrumentation_openai import OpenAIInstrumentor

respan = Respan(instrumentations=[OpenAIInstrumentor()])

# Batch API requires direct OpenAI access
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

@task(name="create_batch_file")
def create_batch_file() -> str:
    tasks = []
    for i, topic in enumerate(["quantum computing", "blockchain", "edge computing"]):
        tasks.append({
            "custom_id": f"topic-{i}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-4.1-nano",
                "messages": [
                    {"role": "system", "content": "Explain in one sentence."},
                    {"role": "user", "content": f"What is {topic}?"},
                ],
            },
        })

    file_path = "/tmp/batch_input.jsonl"
    with open(file_path, "w") as f:
        for t in tasks:
            f.write(json.dumps(t) + "\n")
    return file_path

@task(name="upload_and_submit")
def upload_and_submit(file_path: str) -> str:
    batch_file = client.files.create(file=open(file_path, "rb"), purpose="batch")
    batch = client.batches.create(
        input_file_id=batch_file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",
    )
    return batch.id

@task(name="poll_batch")
def poll_batch(batch_id: str) -> str:
    while True:
        batch = client.batches.retrieve(batch_id)
        if batch.status == "completed":
            return batch.output_file_id
        elif batch.status in ("failed", "expired", "cancelled"):
            raise RuntimeError(f"Batch {batch.status}")
        time.sleep(5)

@task(name="download_results")
def download_results(output_file_id: str):
    content = client.files.content(output_file_id).content
    results = [json.loads(line) for line in content.decode().strip().split("\n")]

    with open("/tmp/batch_input.jsonl") as f:
        requests = [json.loads(line) for line in f]

    # Log each batch result as an individual traced span
    respan.log_batch_results(requests, results)

    for r in results:
        print(f"{r['custom_id']}: {r['response']['body']['choices'][0]['message']['content']}")

@workflow(name="batch_pipeline")
def run():
    file_path = create_batch_file()
    batch_id = upload_and_submit(file_path)
    output_file_id = poll_batch(batch_id)
    download_results(output_file_id)

run()
respan.flush()
```

Async batch (cross-process)

For long-running batches where submission and retrieval happen in separate processes, save the trace_id and pass it to log_batch_results() later to link results back to the original trace.

```python
import os
import json
import time
from openai import OpenAI
from respan import Respan, workflow, task, get_client
from respan_instrumentation_openai import OpenAIInstrumentor

respan = Respan(instrumentations=[OpenAIInstrumentor()])
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Phase 1: Submit batch and save trace context
@task(name="create_and_submit")
def create_and_submit(requests: list) -> dict:
    file_path = "/tmp/batch_async.jsonl"
    with open(file_path, "w") as f:
        for r in requests:
            f.write(json.dumps(r) + "\n")

    batch_file = client.files.create(file=open(file_path, "rb"), purpose="batch")
    batch = client.batches.create(
        input_file_id=batch_file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",
    )

    # Save trace ID — in production, persist this to a database
    rc = get_client()
    trace_id = rc.get_current_trace_id()

    return {"batch_id": batch.id, "trace_id": trace_id, "input_file": file_path}

@workflow(name="batch_submit")
def submit(requests):
    return create_and_submit(requests)

# Phase 2: Retrieve results (separate process/job)
def retrieve_and_log(saved: dict):
    batch_id = saved["batch_id"]

    while True:
        batch = client.batches.retrieve(batch_id)
        if batch.status == "completed":
            break
        elif batch.status in ("failed", "expired", "cancelled"):
            raise RuntimeError(f"Batch {batch.status}")
        time.sleep(5)

    content = client.files.content(batch.output_file_id).content
    results = [json.loads(line) for line in content.decode().strip().split("\n")]

    with open(saved["input_file"]) as f:
        requests = [json.loads(line) for line in f]

    # Log results into the ORIGINAL trace
    respan.log_batch_results(requests, results, trace_id=saved["trace_id"])

# Run
requests = [
    {
        "custom_id": f"topic-{i}",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": "gpt-4.1-nano",
            "messages": [
                {"role": "system", "content": "Explain in one sentence."},
                {"role": "user", "content": f"What is {topic}?"},
            ],
        },
    }
    for i, topic in enumerate(["quantum computing", "blockchain", "edge computing"])
]

saved = submit(requests)
respan.flush()

# Later (could be a different process)...
retrieve_and_log(saved)
respan.flush()
```

Gateway features

The features below require the Gateway or Both setup from Step 3.

Switch models

Change the model parameter to use 250+ models from different providers through the same gateway.

```python
# OpenAI
response = client.chat.completions.create(model="gpt-4o", messages=messages)

# Anthropic
response = client.chat.completions.create(model="claude-sonnet-4-5-20250929", messages=messages)

# Google
response = client.chat.completions.create(model="gemini-2.5-flash", messages=messages)
```

See the full model list.

Prompt management

Use Respan prompt management to serve prompt templates from the platform. Use schema_version: 2 for all new integrations.

Chat Completions

Prompt messages are the base layer (system/context). Body messages are appended as runtime user turns.

```python
PROMPT_ID = "YOUR_PROMPT_ID"

response = client.chat.completions.create(
    model="gpt-4.1-nano",
    messages=[
        {"role": "user", "content": "Add dark mode support to the dashboard"},
    ],
    extra_body={
        "prompt": {
            "prompt_id": PROMPT_ID,
            "schema_version": 2,
            "variables": {
                "feature_request": "Add dark mode support",
            },
        }
    },
)
print(response.choices[0].message.content)
```

Responses API

For the Responses API, pass prompt config under respan_params. The prompt template becomes instructions and body input is preserved.

```python
PROMPT_ID = "YOUR_PROMPT_ID"

response = client.responses.create(
    model="gpt-4.1-nano",
    input=[
        {"role": "user", "content": "Add dark mode support to the dashboard"},
    ],
    extra_body={
        "respan_params": {
            "prompt": {
                "prompt_id": PROMPT_ID,
                "schema_version": 2,
                "variables": {
                    "feature_request": "Add dark mode support",
                },
            }
        }
    },
)
print(response.output_text)
```

Prompt options

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `prompt_id` | `str` | — | Prompt identifier (required). |
| `schema_version` | `int` | `1` | Set `2` for v2 merge semantics (recommended). |
| `version` | `int \| "latest"` | deployed version | Pin a specific version or use `"latest"` for the most recent draft. |
| `variables` | `dict` | `{}` | Key-value pairs for template rendering. |
| `patch` | `dict` | — | Runtime overrides for prompt config (v2 only). Cannot include messages or input. |
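Putting these options together, a request might pin a version and apply a runtime patch. This is a sketch: the field names follow the table above, but the patch contents shown (a temperature override) are an illustrative assumption, and `patch` is v2-only:

```python
PROMPT_ID = "YOUR_PROMPT_ID"

prompt_config = {
    "prompt": {
        "prompt_id": PROMPT_ID,
        "schema_version": 2,
        "version": "latest",  # or an int to pin a specific version
        "variables": {"feature_request": "Add dark mode support"},
        # Illustrative override; patch cannot include messages or input.
        "patch": {"temperature": 0.2},
    }
}

# Passed via extra_body, e.g.:
# response = client.chat.completions.create(
#     model="gpt-4.1-nano",
#     messages=[{"role": "user", "content": "Add dark mode support to the dashboard"}],
#     extra_body=prompt_config,
# )
```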

Respan parameters

Pass additional Respan parameters via extra_body for gateway features.

```python
response = client.chat.completions.create(
    model="gpt-4.1-nano",
    messages=[{"role": "user", "content": "Hello"}],
    extra_body={
        "customer_identifier": "user_123",
        "fallback_models": ["gpt-3.5-turbo"],
        "metadata": {"session_id": "abc123"},
        "thread_identifier": "conversation_456",
    },
)
```

See Respan parameters for the full list.