Skip to main content

Medical diagnosis workflow

A full agent workflow using @workflow, @task, and @tool with multiple specialist agents (called one after another in this example).
from respan_tracing import RespanTelemetry, workflow, task, tool
from openai import OpenAI

# Set up Respan tracing ("your-api-key" is a placeholder to replace).
telemetry = RespanTelemetry(api_key="your-api-key")
# Module-level OpenAI client used by the task functions in this example.
client = OpenAI()

@tool(name="load_medical_report")
def load_medical_report(path: str) -> str:
    """Read the medical report at ``path`` and return its full text.

    The file is decoded as UTF-8 explicitly so the result does not depend on
    the platform's default locale encoding.

    Raises:
        OSError: If the file cannot be opened or read.
        UnicodeDecodeError: If the file content is not valid UTF-8.
    """
    with open(path, encoding="utf-8") as report_file:
        return report_file.read()

@task(name="specialist_analysis")
def specialist_analysis(role: str, report: str):
    """Ask the model, acting as the specialist ``role``, to analyze ``report``.

    Returns the message content of the first choice in the chat completion.
    """
    system_message = {"role": "system", "content": f"You are a {role}. Analyze this report."}
    user_message = {"role": "user", "content": report}
    completion = client.chat.completions.create(
        model="gpt-4o",
        messages=[system_message, user_message],
    )
    first_choice = completion.choices[0]
    return first_choice.message.content

@task(name="final_diagnosis")
def final_diagnosis(analyses: dict):
    """Combine the given analyses and ask the model for a final diagnosis.

    Each key/value pair of ``analyses`` becomes one ``key: value`` line of the
    user message. Returns the message content of the first completion choice.
    """
    summary_lines = [f"{label}: {analysis}" for label, analysis in analyses.items()]
    combined = "\n".join(summary_lines)
    system_message = {
        "role": "system",
        "content": "Synthesize these specialist analyses into a final diagnosis.",
    }
    completion = client.chat.completions.create(
        model="gpt-4o",
        messages=[system_message, {"role": "user", "content": combined}],
    )
    first_choice = completion.choices[0]
    return first_choice.message.content

@workflow(name="medical_diagnosis_workflow")
def run_diagnosis(report_path: str):
    """Load a report, gather one analysis per specialist, and synthesize a diagnosis.

    The specialists are consulted one after another, in the order listed.
    """
    report = load_medical_report(report_path)
    # Display label -> role name passed to the specialist prompt.
    specialists = {"Cardiologist": "cardiologist", "Pulmonologist": "pulmonologist"}
    analyses = {
        label: specialist_analysis(role, report)
        for label, role in specialists.items()
    }
    return final_diagnosis(analyses)

Update span with customer metadata

Attach user identifiers and custom metadata to spans during execution.
from respan_tracing import RespanTelemetry, get_client, workflow
from openai import OpenAI

# Set up Respan tracing ("your-api-key" is a placeholder to replace).
telemetry = RespanTelemetry(api_key="your-api-key")

@workflow(name="user_query")
def user_query(user_id: str, prompt: str):
    """Attach customer metadata to the current span, then answer ``prompt``.

    ``user_id`` is stored as the span's ``customer_identifier``. Returns the
    message content of the first completion choice.
    """
    span_params = {
        "customer_identifier": user_id,
        "metadata": {"source": "api", "tier": "premium"},
    }
    get_client().update_current_span(respan_params=span_params)

    completion = OpenAI().chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
    )
    first_choice = completion.choices[0]
    return first_choice.message.content

# Example invocation: user ID first, then the prompt text.
user_query("user-123", "Explain tracing")

Events and exception handling

Add structured events and record caught exceptions for debugging.
from respan_tracing import RespanTelemetry, get_client, workflow, task
from opentelemetry.trace import StatusCode

# Set up Respan tracing ("your-api-key" is a placeholder to replace).
telemetry = RespanTelemetry(api_key="your-api-key")

@task(name="validate_data")
def validate_data(data):
    """Validate ``data`` and record the outcome on the current span.

    ``data`` is rejected when it is falsy or when its string form is shorter
    than three characters. On success the span is marked OK and
    ``"validated_<data>"`` is returned. Any exception is recorded on the span,
    the span is tagged as failed, and the exception is re-raised.
    """
    tracing = get_client()
    try:
        too_short = not data or len(str(data)) < 3
        if too_short:
            raise ValueError("Data is too short")
        tracing.update_current_span(
            attributes={"validation.result": "success"},
            status=StatusCode.OK,
        )
        return f"validated_{data}"
    except Exception as exc:
        # Record the failure on the span, then let the caller see the error.
        tracing.record_exception(exc)
        tracing.update_current_span(attributes={"validation.result": "failed"})
        raise

@workflow(name="data_processing")
def data_processing(data):
    """Validate ``data``, emitting an event before and after the validation.

    The start event carries the length of ``str(data)`` as ``data_size``.
    Returns whatever ``validate_data`` returns; the completion event is not
    emitted if validation raises.
    """
    tracing = get_client()
    start_attributes = {"data_size": len(str(data))}
    tracing.add_event("processing_started", start_attributes)
    validated = validate_data(data)
    tracing.add_event("processing_completed")
    return validated

Multiple exporters with processor routing

Route different spans to different destinations using named processors.
from respan_tracing import RespanTelemetry, workflow, task
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult

class FileExporter(SpanExporter):
    """Span exporter that appends one ``<span name>: <STATUS CODE>`` line per span to a file."""

    def __init__(self, filename: str):
        """Remember the path of the log file that ``export`` appends to."""
        self.filename = filename

    def export(self, spans):
        """Append a line for every span in ``spans`` to the log file.

        Returns:
            ``SpanExportResult.SUCCESS`` when the lines were written, or
            ``SpanExportResult.FAILURE`` if the file could not be written.
        """
        # Write the status code's name (UNSET / OK / ERROR): formatting the
        # Status object itself does not yield a readable value.
        lines = [f"{span.name}: {span.status.status_code.name}\n" for span in spans]
        try:
            # Explicit UTF-8 so non-ASCII span names do not depend on the locale.
            with open(self.filename, "a", encoding="utf-8") as log_file:
                log_file.writelines(lines)
        except OSError:
            # Report the failure through the exporter contract instead of raising.
            return SpanExportResult.FAILURE
        return SpanExportResult.SUCCESS

# Default Respan exporter + named "debug" file exporter.
# The name "debug" is what the `processors=` argument of @task refers to.
telemetry = RespanTelemetry(api_key="your-api-key")
telemetry.add_processor(
    exporter=FileExporter("debug_traces.log"),
    name="debug",
)

@task(name="debug_only_task", processors="debug")
def debug_only_task():
    """Example task whose decorator selects the processor named "debug"."""
    message = "only goes to file exporter"
    return message

@task(name="everywhere_task")
def everywhere_task():
    """Example task that does not select a specific processor."""
    message = "goes to both Respan and file exporter"
    return message

@workflow(name="routing_example")
def routing_example():
    """Run both example tasks in order; their return values are discarded."""
    for step in (debug_only_task, everywhere_task):
        step()

Span buffering and batch export

Collect spans into a buffer for manual inspection and deferred export.
from respan_tracing import RespanTelemetry, get_client

# Set up Respan tracing ("your-api-key" is a placeholder) and fetch the client.
telemetry = RespanTelemetry(api_key="your-api-key")
client = get_client()

# Open a span buffer for "batch-trace-001" (presumably the trace identifier — confirm).
with client.get_span_buffer("batch-trace-001") as buffer:
    # Each call adds a named span with the given attributes to the buffer.
    buffer.create_span("step_1", {"status": "completed"})
    buffer.create_span("step_2", {"status": "completed"})
    buffer.create_span("step_3", {"status": "failed"})

    print(f"Buffered spans: {buffer.get_span_count()}")  # 3
    # Grab the spans inside the block so they can be used after it ends.
    collected = buffer.get_all_spans()

# Export all at once
client.process_spans(collected)

Manual span creation

Use the OpenTelemetry tracer directly for fine-grained span control.
from respan_tracing import RespanTelemetry, get_client, workflow

# Set up Respan tracing ("your-api-key" is a placeholder to replace).
telemetry = RespanTelemetry(api_key="your-api-key")

@workflow(name="manual_spans")
def manual_spans():
    """Create two spans by hand with the tracer and return the doubled sample data.

    The first span ("data_fetch") produces the sample list; the second
    ("data_transform") doubles each element.
    """
    tracer = get_client().get_tracer()

    with tracer.start_as_current_span("data_fetch") as fetch_span:
        fetch_span.set_attribute("source", "database")
        values = [1, 2, 3]

    with tracer.start_as_current_span("data_transform") as transform_span:
        transform_span.set_attribute("transform", "double")
        doubled = [value * 2 for value in values]

    return doubled

# Example invocation of the workflow.
manual_spans()

Context-based attributes

Apply Respan attributes to all spans within a context block.
from respan_tracing import RespanTelemetry, workflow, task, respan_span_attributes

# Set up Respan tracing ("your-api-key" is a placeholder to replace).
telemetry = RespanTelemetry(api_key="your-api-key")

@task(name="inner_task")
def inner_task():
    """Minimal example task; returns a fixed marker string."""
    outcome = "processed"
    return outcome

@workflow(name="context_workflow")
def context_workflow(user_id: str):
    """Run ``inner_task`` inside a ``respan_span_attributes`` context.

    ``user_id`` is supplied as the ``customer_identifier`` attribute. Returns
    the result of ``inner_task``.
    """
    shared_attributes = {
        "customer_identifier": user_id,
        "trace_group_identifier": "experiment-v2",
        "metadata": {"team": "ml"},
    }
    with respan_span_attributes(shared_attributes):
        return inner_task()

# Example invocation with a sample user ID.
context_workflow("user-789")

Multi-LLM provider workflow

Trace calls across multiple LLM providers in a single workflow. Use block_instruments to suppress noisy HTTP-level spans.
import asyncio
from openai import OpenAI
from anthropic import Anthropic
from respan_tracing import RespanTelemetry, get_client
from respan_tracing.decorators import workflow, task
from respan_tracing.instruments import Instruments

# Configure tracing for the "multi-llm" app ("your-api-key" is a placeholder).
# block_instruments lists the REQUESTS and URLLIB3 instruments to suppress
# HTTP-level spans.
telemetry = RespanTelemetry(
    app_name="multi-llm",
    api_key="your-api-key",
    block_instruments={Instruments.REQUESTS, Instruments.URLLIB3},
)

# One synchronous client per provider, shared by the task functions.
openai_client = OpenAI()
anthropic_client = Anthropic()

@task(name="openai_call")
async def openai_call(prompt: str) -> str:
    """Send ``prompt`` to OpenAI and return the text of the first choice.

    ``openai_client`` is the synchronous client, so the request runs in a
    worker thread via ``asyncio.to_thread`` (Python 3.9+) instead of blocking
    the event loop for the duration of the HTTP call. ``to_thread`` propagates
    the current ``contextvars`` context to the worker thread.
    """
    resp = await asyncio.to_thread(
        openai_client.chat.completions.create,
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=50,
    )
    return resp.choices[0].message.content

@task(name="anthropic_call")
async def anthropic_call(prompt: str) -> str:
    """Send ``prompt`` to Anthropic and return the text of the first content block.

    ``anthropic_client`` is the synchronous client, so the request runs in a
    worker thread via ``asyncio.to_thread`` (Python 3.9+) instead of blocking
    the event loop for the duration of the HTTP call. ``to_thread`` propagates
    the current ``contextvars`` context to the worker thread.
    """
    resp = await asyncio.to_thread(
        anthropic_client.messages.create,
        model="claude-sonnet-4-5-20250929",
        max_tokens=50,
        messages=[{"role": "user", "content": prompt}],
    )
    return resp.content[0].text

@workflow(name="multi_llm_workflow")
async def multi_llm_workflow(prompt: str):
    """Send ``prompt`` to both providers and return their answers.

    The current span is first tagged with a fixed customer identifier and the
    prompt. The OpenAI call is awaited before the Anthropic call. Returns a
    dict with the keys ``"openai"`` and ``"anthropic"``.
    """
    span_params = {
        "customer_identifier": "multi_llm_user",
        "metadata": {"prompt": prompt},
    }
    get_client().update_current_span(respan_params=span_params)

    answers = {}
    answers["openai"] = await openai_call(prompt)
    answers["anthropic"] = await anthropic_call(prompt)
    return answers

# Run the async workflow to completion on a fresh event loop.
asyncio.run(multi_llm_workflow("Say hello briefly."))
# Flush telemetry afterwards (presumably sends any spans still pending — confirm).
telemetry.flush()

Cost and pricing updates

Set custom cost and unit pricing on spans for accurate cost tracking.
from respan_tracing import RespanTelemetry, get_client, workflow, task

# Set up Respan tracing ("your-api-key" is a placeholder to replace).
telemetry = RespanTelemetry(api_key="your-api-key")

@task(name="tracked_generation")
def tracked_generation():
    """Record example token counts, unit prices, and the resulting cost on the current span."""
    # Sample figures standing in for the usage reported by an LLM call.
    prompt_tokens = 120
    completion_tokens = 80
    prompt_unit_price = 0.0000025   # per token
    completion_unit_price = 0.000010  # per token

    usage_and_cost = {
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_request_tokens": prompt_tokens + completion_tokens,
        # Cost is tokens times unit price, summed over prompt and completion.
        "cost": prompt_tokens * prompt_unit_price + completion_tokens * completion_unit_price,
        "prompt_unit_price": prompt_unit_price,
        "completion_unit_price": completion_unit_price,
    }
    get_client().update_current_span(attributes=usage_and_cost)

Customer details and TTFT

Attach customer email/name and track time-to-first-token (TTFT) via metadata.
import time
from respan_tracing import RespanTelemetry, get_client, workflow, task

# Set up Respan tracing ("your-api-key" is a placeholder to replace).
telemetry = RespanTelemetry(api_key="your-api-key")

@task(name="customer_aware_task")
def customer_aware_task():
    """Tag the current span with customer details and a time-to-first-token value.

    The TTFT is the elapsed ``time.perf_counter()`` time, rounded to three
    decimals, stored under ``metadata["time_to_first_token"]``.
    """
    tracing = get_client()

    # Customer details go on the span first.
    customer_details = {
        "customer_email": "user@example.com",
        "customer_name": "Demo User",
    }
    tracing.update_current_span(respan_params=customer_details)

    # Measure the time until the first token arrives.
    started_at = time.perf_counter()
    # ... wait for first token from LLM ...
    elapsed = time.perf_counter() - started_at
    ttft = round(elapsed, 3)
    tracing.update_current_span(
        respan_params={"metadata": {"time_to_first_token": ttft}}
    )

Debug logging

Enable verbose SDK logging to troubleshoot tracing issues.
from respan_tracing import RespanTelemetry

# Set up Respan tracing for "debug_app" with SDK logging at DEBUG level
# ("your-api-key" is a placeholder to replace).
telemetry = RespanTelemetry(
    api_key="your-api-key",
    app_name="debug_app",
    log_level="DEBUG",
)
Set log_level="DEBUG" to see detailed output from the tracer, processors, and exporters.