Multi-Turn Conversations & Agent Threads in Microsoft Agent Framework – Part 5

Part 5 of the Microsoft Agent Framework Series

Real-world AI applications rarely consist of single-turn interactions. Customer support, technical assistance, research workflows — they all require ongoing conversations with context preserved across multiple exchanges.

In this article, we’ll explore Agent Threads — the powerful state management system in Microsoft Agent Framework that enables sophisticated multi-turn conversations.

Thread Lifecycle Management

PhaseDescriptionCode
CreationStart a new conversationthread = agent.get_new_thread()
ActiveOngoing conversationawait agent.run(msg, thread)
PersistenceSave for laterthread.save() / storage adapter
ResumeContinue laterthread = await storage.load(id)
CleanupEnd sessionthread.clear()

Context Window Management

As conversations grow, you may exceed the model’s context window. Strategies to manage this:

Summarization Strategy

async def manage_long_conversation(agent, thread, max_messages=20):
    """Summarize old messages when thread gets too long."""
    
    if len(thread.messages) > max_messages:
        # Get older messages for summarization
        old_messages = thread.messages[:-10]  # Keep last 10 intact
        old_content = "\n".join([f"{m['role']}: {m['content']}" for m in old_messages])
        
        # Create summary using a separate call
        summary_result = await agent.run(
            f"Summarize this conversation context concisely: {old_content}"
        )
        
        # Replace old messages with summary
        thread.messages = [
            {"role": "system", "content": f"Previous context summary: {summary_result.text}"}
        ] + thread.messages[-10:]
        
        print(f"Context summarized. Messages reduced from {max_messages + 1} to {len(thread.messages)}")
    
    return thread

# Usage in conversation loop
async def conversation_with_summarization():
    agent = client.create_agent(name="LongConversationBot", instructions="...")
    thread = agent.get_new_thread()
    
    while True:
        user_input = input("You: ")
        if user_input.lower() == "exit":
            break
        
        # Manage context before each response
        thread = await manage_long_conversation(agent, thread)
        
        result = await agent.run(user_input, thread)
        print(f"Assistant: {result.text}")

.NET / C# Implementation

using Azure.Identity;
using OpenAI;
using Microsoft.Agents.AI;

namespace MAF.Part05.MultiTurn;

/// 
/// Part 5: Multi-Turn Conversations in .NET
/// 
public class MultiTurnDemo
{
    public static async Task Main(string[] args)
    {
        var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")
            ?? throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT not set");

        var agent = new AzureOpenAIClient(
                new Uri(endpoint),
                new DefaultAzureCredential())
            .GetOpenAIResponseClient("gpt-4o")
            .CreateAIAgent(
                name: "AssistantBot",
                instructions: "You are a helpful assistant. Remember context from the conversation.");

        // Create a thread for multi-turn conversation
        var thread = agent.GetNewThread();

        Console.WriteLine("=== Multi-Turn Conversation Demo ===\n");

        // First turn
        Console.WriteLine("User: I'm Bob, and I need help with Azure.");
        var result1 = await agent.RunAsync("I'm Bob, and I need help with Azure.", thread);
        Console.WriteLine($"Assistant: {result1}\n");

        // Second turn - agent remembers the context
        Console.WriteLine("User: What cloud platform am I asking about?");
        var result2 = await agent.RunAsync("What cloud platform am I asking about?", thread);
        Console.WriteLine($"Assistant: {result2}\n");  // Will mention Azure

        // Third turn - agent still has context
        Console.WriteLine("User: Remind me of my name?");
        var result3 = await agent.RunAsync("Remind me of my name?", thread);
        Console.WriteLine($"Assistant: {result3}\n");  // Will say Bob

        Console.WriteLine("=== Demo Complete ===");
    }
}

Thread Persistence

For production applications, persist threads to storage:

import json
import redis
from datetime import datetime
from typing import Optional

class RedisThreadStore:
    """Persist agent threads to Redis for recovery and session management."""
    
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.ttl = 86400 * 7  # 7 days expiration
    
    async def save_thread(self, session_id: str, thread) -> None:
        """Save thread state to Redis."""
        data = {
            "messages": thread.messages,
            "metadata": {
                "created": datetime.now().isoformat(),
                "message_count": len(thread.messages),
                "last_updated": datetime.now().isoformat()
            }
        }
        self.redis.setex(
            f"agent:thread:{session_id}", 
            self.ttl, 
            json.dumps(data)
        )
        print(f"Thread saved: {session_id} ({len(thread.messages)} messages)")
    
    async def load_thread(self, session_id: str, agent) -> Optional[object]:
        """Load thread from Redis, or create new if not found."""
        data = self.redis.get(f"agent:thread:{session_id}")
        
        if data:
            parsed = json.loads(data)
            thread = agent.get_new_thread()
            thread.messages = parsed["messages"]
            print(f"Thread loaded: {session_id} ({len(thread.messages)} messages)")
            return thread
        
        print(f"No existing thread found for {session_id}, creating new")
        return agent.get_new_thread()
    
    async def delete_thread(self, session_id: str) -> bool:
        """Delete a thread from Redis."""
        result = self.redis.delete(f"agent:thread:{session_id}")
        return result > 0
    
    async def list_sessions(self, pattern: str = "agent:thread:*") -> list:
        """List all active session IDs."""
        keys = self.redis.keys(pattern)
        return [k.decode().split(":")[-1] for k in keys]

# Usage example
async def persistent_conversation():
    store = RedisThreadStore()
    agent = client.create_agent(name="PersistentBot", instructions="...")
    
    session_id = "user-12345"
    
    # Load existing thread or create new
    thread = await store.load_thread(session_id, agent)
    
    # Run conversation
    result = await agent.run("Continue where we left off", thread)
    print(f"Assistant: {result.text}")
    
    # Save after each interaction
    await store.save_thread(session_id, thread)

.NET / C# Implementation

using Microsoft.Agents.AI;

public async Task ManageLongConversation(IAgent agent, IThread thread, int maxMessages = 20)
{
    // Retrieve messages (hypothetical accessor)
    var messages = await thread.GetMessagesAsync(); 
    
    if (messages.Count > maxMessages)
    {
        Console.WriteLine($"Summarizing history... ({messages.Count} messages)");
        
        // Keep last 10 messages intact
        var recentMessages = messages.TakeLast(10).ToList();
        var olderMessages = messages.Take(messages.Count - 10).ToList();
        
        // Create summarization prompt
        var oldContent = string.Join("\n", olderMessages.Select(m => $"{m.Role}: {m.Content}"));
        var summaryPrompt = $"Summarize this conversation context concisely:\n{oldContent}";
        
        // Generate summary using the agent
        var summaryResult = await agent.RunAsync(summaryPrompt);
        
        // Update Thread: Clear and Replace
        await thread.ClearAsync();
        await thread.AddMessageAsync(new Message(Role.System, $"Previous context summary: {summaryResult}"));
        
        // Re-add recent messages
        foreach(var msg in recentMessages)
        {
            await thread.AddMessageAsync(msg);
        }
    }
}

Human-in-the-Loop Patterns

Threads enable approval workflows where human review is required:

import asyncio
from dataclasses import dataclass
from typing import Optional
from enum import Enum

class ApprovalStatus(Enum):
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"

@dataclass
class ActionRequest:
    action: str
    params: dict
    requires_approval: bool
    reason: str

# In-memory store for pending approvals (use Redis/DB in production)
pending_approvals = {}

async def check_if_approval_needed(action: str, params: dict) -> ActionRequest:
    """Determine if an action requires human approval."""
    high_risk_actions = ["delete_account", "refund_over_100", "modify_permissions"]
    
    requires_approval = action in high_risk_actions
    reason = f"Action '{action}' is classified as high-risk" if requires_approval else ""
    
    return ActionRequest(
        action=action,
        params=params,
        requires_approval=requires_approval,
        reason=reason
    )

async def request_human_approval(
    session_id: str,
    action_request: ActionRequest,
    thread,
    thread_store
) -> dict:
    """Pause workflow and request human approval."""
    
    # Save thread state
    await thread_store.save_thread(session_id, thread)
    
    # Store pending approval
    approval_id = f"approval-{session_id}-{action_request.action}"
    pending_approvals[approval_id] = {
        "session_id": session_id,
        "action": action_request.action,
        "params": action_request.params,
        "status": ApprovalStatus.PENDING,
        "reason": action_request.reason
    }
    
    # Notify approver (email, Slack, Teams, etc.)
    await notify_approver(approval_id, action_request)
    
    return {
        "status": "pending_approval",
        "approval_id": approval_id,
        "message": f"Action '{action_request.action}' requires approval. ID: {approval_id}"
    }

async def notify_approver(approval_id: str, action_request: ActionRequest):
    """Send notification to human approver."""
    print(f"\n🔔 APPROVAL REQUIRED")
    print(f"   ID: {approval_id}")
    print(f"   Action: {action_request.action}")
    print(f"   Params: {action_request.params}")
    print(f"   Reason: {action_request.reason}")
    print(f"   Approve with: handle_approval('{approval_id}', True)")

async def handle_approval(
    approval_id: str,
    approved: bool,
    agent,
    thread_store
) -> str:
    """Process human approval decision and resume workflow."""
    
    if approval_id not in pending_approvals:
        return f"Approval {approval_id} not found"
    
    approval = pending_approvals[approval_id]
    session_id = approval["session_id"]
    
    # Load the saved thread
    thread = await thread_store.load_thread(session_id, agent)
    
    if approved:
        # Execute the approved action
        result = f"Action '{approval['action']}' executed successfully"
        approval["status"] = ApprovalStatus.APPROVED
        
        # Inform agent of approval
        response = await agent.run(
            f"The action was approved and executed. Result: {result}",
            thread
        )
    else:
        approval["status"] = ApprovalStatus.REJECTED
        response = await agent.run(
            "The action was rejected by the approver. Please suggest alternatives.",
            thread
        )
    
    # Save updated thread
    await thread_store.save_thread(session_id, thread)
    
    # Cleanup
    del pending_approvals[approval_id]
    
    return response.text

.NET / C# Implementation

using Microsoft.Agents.AI;
    
public class HumanInLoopWorkflow
{
    public enum ApprovalStatus { Pending, Approved, Rejected }
    
    public record ApprovalRequest(string Id, string Action, object Data, ApprovalStatus Status = ApprovalStatus.Pending);
    
    private readonly Dictionary<string, ApprovalRequest> _pending = new();
    
    public async Task<string> ProcessAction(object agent, string request, object thread)
    {
        // ... (Implementation detailing approval logic)
        return "NEEDS_APPROVAL";
    }
}

.NET / C# Implementation

using System.Text.Json;
using StackExchange.Redis;

namespace MAF.Part05.Persistence;

/// 
/// Part 5: Redis Thread Persistence in .NET
/// 
public class RedisThreadStore
{
    private readonly IDatabase _redis;
    private readonly TimeSpan _ttl = TimeSpan.FromDays(7);

    public RedisThreadStore(string connectionString = "localhost:6379")
    {
        var connection = ConnectionMultiplexer.Connect(connectionString);
        _redis = connection.GetDatabase();
    }

    public async Task SaveThreadAsync(string sessionId, object thread)
    {
        var data = new
        {
            Messages = GetMessages(thread),
            Metadata = new
            {
                Created = DateTime.UtcNow,
                LastUpdated = DateTime.UtcNow
            }
        };

        var json = JsonSerializer.Serialize(data);
        await _redis.StringSetAsync($"agent:thread:{sessionId}", json, _ttl);
        Console.WriteLine($"Thread saved: {sessionId}");
    }

    public async Task LoadThreadAsync(string sessionId, dynamic agent)
    {
        var data = await _redis.StringGetAsync($"agent:thread:{sessionId}");

        if (data.HasValue)
        {
            var parsed = JsonSerializer.Deserialize(data!);
            var thread = agent.GetNewThread();
            // Restore messages to thread
            Console.WriteLine($"Thread loaded: {sessionId}");
            return thread;
        }

        Console.WriteLine($"No thread found for {sessionId}, creating new");
        return agent.GetNewThread();
    }

    public async Task DeleteThreadAsync(string sessionId)
    {
        return await _redis.KeyDeleteAsync($"agent:thread:{sessionId}");
    }

    public async Task> ListSessionsAsync(string pattern = "agent:thread:*")
    {
        var server = _redis.Multiplexer.GetServer(_redis.Multiplexer.GetEndPoints()[0]);
        var keys = server.Keys(pattern: pattern);
        return keys.Select(k => k.ToString().Split(':').Last()).ToList();
    }

    private static List GetMessages(object thread)
    {
        // Extract messages from thread
        var prop = thread.GetType().GetProperty("Messages");
        return prop?.GetValue(thread) as List ?? new List();
    }

    private record ThreadData(List Messages, object Metadata);
}

Best Practices

PracticeRecommendation
Thread per sessionCreate one thread per user session, not per message
Context limitsMonitor message count; summarize when approaching limits
PersistenceUse Redis, Cosmos DB, or SQL for production
TTLSet expiration for inactive threads (e.g., 7 days)
CleanupClear threads on logout or session end

📦 Source Code

All code examples from this article series are available on GitHub:

👉 https://github.com/nithinmohantk/microsoft-agent-framework-series-examples

Clone the repository to follow along:

git clone https://github.com/nithinmohantk/microsoft-agent-framework-series-examples.git
cd microsoft-agent-framework-series-examples

Human-in-the-Loop Workflow (C#)


Series Navigation


References


Discover more from C4: Container, Code, Cloud & Context

Subscribe to get the latest posts sent to your email.

Leave a comment

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.