
Python Reasoning Planning

Python Implementation of Reasoning and Planning

ReAct Framework

ReAct (Reasoning + Acting) is a framework in which an Agent interleaves reasoning steps with actions such as tool calls.

In ReAct mode, the Agent alternates between reasoning and acting, forming a closed loop.

Reasoning guides action, and the results of actions feed back into the reasoning process, looping repeatedly.

ReAct simulates human problem-solving processes: think before taking action, observe the results after acting, and then continue reasoning based on new information.

Working Principle

The core idea of ReAct can be summarized as a three-stage cycle:

  1. Reasoning Stage: The Agent generates thoughts about the next step based on the current context.
  2. Action Stage: The Agent executes the selected tool or action.
  3. Observation Stage: The Agent receives the result of the action, updates the context, and returns to the first stage to continue reasoning.

Code Implementation

Below is a simplified implementation of a ReAct Agent:

Basic Implementation of ReAct Agent

class ReActAgent:
    """
    ReAct framework implementation for an Agent.

    Core mechanism: Reasoning -> Acting -> Observing -> Reasoning (loop)
    """

    def __init__(self, llm, tools, max_iterations=5):
        # LLM instance for reasoning and generation
        self.llm = llm
        # List of available tools
        self.tools = tools
        # Maximum iterations to prevent infinite loops
        self.max_iterations = max_iterations

    def run(self, task):
        """
        Execute the ReAct loop.

        :param task: User task description
        :return: Execution result
        """
        # Initialize context with task and history
        context = {
            "task": task,
            "steps": [],         # Record of executed steps
            "observations": [],  # Record of observations
        }
        for i in range(self.max_iterations):
            # Phase 1: Reasoning - decide the next action from the current context
            reasoning = self.llm.reason(context)
            # No action needed: return the answer produced by reasoning
            if not reasoning.needs_action:
                return reasoning.answer
            # Phase 2: Acting - select a tool and execute it
            action = reasoning.select_tool(self.tools)
            observation = action.execute(reasoning.tool_input)
            # Phase 3: Observing - record the step and its result in the context
            context["steps"].append({"iteration": i, "tool_input": reasoning.tool_input})
            context["observations"].append(observation)
        # Reached maximum iterations without completion
        return "Reached maximum iteration limit"
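To watch the loop run end to end without a real model, the following is a minimal, self-contained sketch. `ScriptedLLM`, `Step`, `calculator`, and `react_loop` are hypothetical names introduced only for this illustration; the scripted "LLM" simply requests a tool call until it has an observation, then answers.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass
class Step:
    """One reasoning result: either a tool call or a final answer."""
    thought: str
    tool: Optional[str] = None        # name of the tool to call, if any
    tool_input: Optional[str] = None  # argument passed to the tool
    answer: Optional[str] = None      # final answer when no tool is needed


class ScriptedLLM:
    """Hypothetical stand-in for a real model: decides from the observations so far."""

    def reason(self, context: Dict) -> Step:
        if not context["observations"]:
            # Nothing observed yet, so ask the calculator tool first
            return Step(thought="I need to compute the sum.",
                        tool="calculator", tool_input="5 + 3")
        # An observation exists, so answer from it
        result = context["observations"][-1]
        return Step(thought="I have the result.", answer=f"The sum is {result}")


def calculator(expression: str) -> str:
    """Toy tool: adds integers written as 'a + b + ...'."""
    return str(sum(int(part) for part in expression.split("+")))


def react_loop(llm, tools: Dict[str, Callable[[str], str]], task: str,
               max_iterations: int = 5) -> str:
    context = {"task": task, "steps": [], "observations": []}
    for _ in range(max_iterations):
        step = llm.reason(context)                       # Reasoning
        if step.tool is None:
            return step.answer                           # No action needed
        observation = tools[step.tool](step.tool_input)  # Acting
        context["steps"].append(step)                    # Observing: update context
        context["observations"].append(observation)
    return "Reached maximum iteration limit"


result = react_loop(ScriptedLLM(), {"calculator": calculator}, "What is 5 + 3?")
print(result)  # The sum is 8
```

The first iteration produces a tool call, the second sees the observation and returns an answer, so the loop ends well before `max_iterations`.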

Typical Application Scenarios

The ReAct pattern is particularly suitable for the following scenarios:

  • Tasks that require exploration and information collection in an environment before completion.
  • Intelligent search scenarios: The Agent needs to search for information first, then reason and summarize based on the results.
  • Dialogue-based Q&A: The Agent needs to clarify requirements through multi-turn conversations and gather information.
  • Complex problem solving: Problems that cannot be solved in one step and require multiple intermediate steps.

Note: The advantage of the ReAct pattern lies in its flexibility: subsequent actions can be adjusted dynamically based on the result of each step. The trade-off is a less predictable execution path, so the pattern suits exploratory tasks better than tasks that require a fixed sequence of steps.


Chain of Thought (CoT)

Chain of Thought is a technique that prompts models to show their step-by-step reasoning process.

Rather than having the model output the answer directly, CoT guides it to lay out its reasoning steps before reaching the final conclusion.

Why Chain of Thought is Needed

There are several issues with letting the model directly output answers:

  • Intermediate steps of complex reasoning are easily ignored or skipped.
  • It's difficult to locate where errors occur.
  • Users cannot understand the model's thinking process.

Chain of Thought solves these problems by requiring the model to show its reasoning process.

Zero-shot CoT

Zero-shot CoT is a method that elicits step-by-step reasoning without providing any examples.

You only need to add a trigger phrase like "Let's think step by step" at the end of the prompt.

Zero-shot CoT Example

# Prompt template for Zero-shot CoT
prompt = """
Question: Xiao Ming has 5 apples, Xiao Hong gave him 3 more, and Xiao Ming ate 2. How many are left?

Let's think step by step:
"""

# Call language model
response = llm.generate(prompt)

# Model output example:
# Step 1: Calculate total number after receiving apples
# Xiao Ming originally had 5 apples, Xiao Hong gave him 3
# 5 + 3 = 8 apples
#
# Step 2: Calculate remaining after eating
# Xiao Ming ate 2 apples
# 8 - 2 = 6 apples
#
# Conclusion: 6 apples remain
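As a small sketch of how the trigger phrase might be applied programmatically, the helpers below build a Zero-shot CoT prompt and pull the final line out of a response. Both function names are hypothetical, and `extract_conclusion` assumes the model labels its final line with `Conclusion:` as in the sample output above, which a real model is not guaranteed to do.

```python
from typing import Optional

TRIGGER = "Let's think step by step:"


def make_zero_shot_cot_prompt(question: str, trigger: str = TRIGGER) -> str:
    """Append the reasoning trigger phrase after the question."""
    return f"Question: {question.strip()}\n\n{trigger}\n"


def extract_conclusion(response: str) -> Optional[str]:
    """Return the text after the last 'Conclusion:' line, or None if absent."""
    for line in reversed(response.splitlines()):
        stripped = line.strip()
        if stripped.startswith("Conclusion:"):
            return stripped[len("Conclusion:"):].strip()
    return None


prompt = make_zero_shot_cot_prompt(
    "Xiao Ming has 5 apples, Xiao Hong gave him 3 more, and Xiao Ming ate 2. "
    "How many are left?"
)
print(prompt)

sample_response = "Step 1: 5 + 3 = 8 apples\nStep 2: 8 - 2 = 6 apples\nConclusion: 6 apples remain"
print(extract_conclusion(sample_response))  # 6 apples remain
```

Returning `None` when no marker is found lets the caller decide whether to retry or fall back to the full response.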

Few-shot CoT

Few-shot CoT helps the model learn specific reasoning patterns by providing examples with detailed reasoning processes.

When Zero-shot CoT performs poorly, Few-shot CoT can be tried.

Few-shot CoT Example

# Prompt template for Few-shot CoT
prompt_template = """
Example 1:
Question: Xiao Zhang has 10 yuan, bought 3 books at 2 yuan each, how much is left?
Let's think step by step:
- Xiao Zhang originally had 10 yuan
- Each book costs 2 yuan, bought 3 books, spent 3 × 2 = 6 yuan
- 10 - 6 = 4 yuan
Answer: 4 yuan left

Example 2:
Question: A cat catches 2 mice per hour, how many in 8 hours?
Let's think step by step:
- Catch 2 mice per hour
- In 8 hours caught 8 × 2 = 16 mice
Answer: Caught 16 mice

Question: {user_question}
Let's think step by step:
"""

# Fill in the placeholder with the user's actual question before calling the model
prompt = prompt_template.format(user_question=user_question)
response = llm.generate(prompt)
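When the examples live in data rather than in a hand-written string, the prompt can be assembled by a small function. The sketch below is one possible way to do that; `build_few_shot_prompt`, the `Example` tuple layout, and the egg question are all hypothetical choices made for illustration.

```python
from typing import List, Tuple

# Each example is (question, reasoning steps, answer)
Example = Tuple[str, List[str], str]


def build_few_shot_prompt(examples: List[Example], user_question: str) -> str:
    blocks = []
    for i, (question, steps, answer) in enumerate(examples, start=1):
        lines = [f"Example {i}:", f"Question: {question}", "Let's think step by step:"]
        lines += [f"- {step}" for step in steps]
        lines.append(f"Answer: {answer}")
        blocks.append("\n".join(lines))
    # The real question ends with the trigger so the model continues the pattern
    blocks.append(f"Question: {user_question}\nLet's think step by step:")
    return "\n\n".join(blocks)


examples = [
    ("A cat catches 2 mice per hour, how many in 8 hours?",
     ["Catch 2 mice per hour", "In 8 hours caught 8 × 2 = 16 mice"],
     "Caught 16 mice"),
]
prompt = build_few_shot_prompt(examples, "A box holds 6 eggs, how many eggs are in 4 boxes?")
print(prompt)
```

Keeping examples as data makes it easy to swap in examples that match the domain of the incoming question.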

Advantages of Chain of Thought

Chain of Thought offers three core benefits:

  • Traceability: Decomposes complex reasoning into traceable intermediate steps, making the reasoning process easier to follow.
  • Explainability: Enhances model explainability, helping users see where the answer comes from.
  • Accuracy Improvement: Often improves accuracy on complex reasoning tasks, especially mathematical and logical ones.

Tree of Thoughts (ToT)

Tree of Thoughts extends Chain of Thought so that reasoning is no longer limited to a single linear path.

ToT explores multiple possible paths at each reasoning node, forming a tree structure.

This allows the Agent to perform multi-path exploration, backtracking, and global evaluation.

Difference from Chain of Thought

  • CoT uses linear reasoning, where each step depends on the previous conclusion, suitable for problems with clear paths.
  • ToT uses branching (tree-structured) reasoning, considering multiple possible branches at each step, suitable for problems requiring exploration and planning.

Code Implementation

Basic Implementation of ToT Agent

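class ThoughtNode:
    """A single thought in the tree, linked to its parent."""

    def __init__(self, content, depth, parent=None):
        self.content = content
        self.depth = depth
        self.parent = parent


class ToTAgent:
    """
    Tree of Thoughts Agent implementation.

    Core mechanism: Generate multiple candidate branches at each node,
    evaluate them, and select the best to continue.
    """

    def __init__(self, llm, evaluator, max_depth=4, beam_size=3):
        # LLM instance
        self.llm = llm
        # Evaluator for candidate nodes
        # (assumption: it exposes rank(nodes) and is_solution(node))
        self.evaluator = evaluator
        # Maximum search depth
        self.max_depth = max_depth
        # Number of candidate nodes retained per layer (beam width)
        self.beam_size = beam_size

    def solve(self, problem):
        """
        Solve problem using ToT framework.

        :param problem: Problem description
        :return: Optimal solution (thoughts from the root to the best node)
        """
        # Create root node with problem as initial state
        root = ThoughtNode(problem, depth=0)
        # Initialize list of frontier nodes (nodes to be expanded)
        frontier = [root]
        # Expand layer by layer
        for depth in range(self.max_depth):
            # Store all candidate nodes
            all_candidates = []
            # Traverse all frontier nodes
            for node in frontier:
                # Generate multiple candidate next steps for current node
                candidates = self.llm.generate_thoughts(
                    node.content,      # Current node content
                    n=self.beam_size,  # Number to generate
                )
                # Create new nodes and add to candidate list
                for cand in candidates:
                    all_candidates.append(ThoughtNode(cand, depth + 1, parent=node))
            # Stop if no node could be expanded further
            if not all_candidates:
                break
            # Evaluate all candidate nodes (best first)
            evaluated = self.evaluator.rank(all_candidates)
            # Select top beam_size nodes as frontier for next round
            frontier = evaluated[:self.beam_size]
            # Check if solution found
            if self.is_solution(frontier):
                break
        # Backtrack to find optimal solution
        return self.backtrack_best(frontier)

    def is_solution(self, frontier):
        """Check whether the evaluator accepts any frontier node as a solution"""
        return any(self.evaluator.is_solution(node) for node in frontier)

    def backtrack_best(self, frontier):
        """Follow parent links from the top-ranked node back to the root"""
        path = []
        node = frontier[0]
        while node is not None:
            path.append(node.content)
            node = node.parent
        return list(reversed(path))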
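The layer-by-layer search can be tried without an LLM by substituting deterministic stand-ins. In this sketch, `generate_thoughts` and `score` are hypothetical replacements for the model and the evaluator, and the toy task (pick numbers from 1, 3, 5 that sum to a target) is invented purely to make the beam search observable.

```python
from typing import List, Tuple

State = Tuple[int, ...]  # numbers chosen so far


def generate_thoughts(state: State) -> List[State]:
    """Stand-in for the LLM: propose next steps by appending 1, 3, or 5."""
    return [state + (n,) for n in (1, 3, 5)]


def score(state: State, target: int) -> int:
    """Stand-in for the evaluator: closer to the target sum is better."""
    return -abs(target - sum(state))


def beam_search(target: int, max_depth: int = 4, beam_size: int = 2) -> State:
    frontier: List[State] = [()]  # start from the empty state (root)
    for _ in range(max_depth):
        # Expand every frontier state into its candidate next states
        candidates = [c for s in frontier for c in generate_thoughts(s)]
        # Rank candidates and keep only the best beam_size of them
        candidates.sort(key=lambda s: score(s, target), reverse=True)
        frontier = candidates[:beam_size]
        if score(frontier[0], target) == 0:
            break  # a state that hits the target exactly is a solution
    return frontier[0]


best = beam_search(target=9)
print(best, sum(best))
```

Because only `beam_size` states survive each layer, the search cost grows linearly with depth instead of exponentially, at the price of possibly discarding a branch that would have paid off later.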

Application Scenarios

ToT is particularly suitable for scenarios requiring choices or planning:

  • Creative writing: Generate multiple story directions, evaluate, and choose the best.
  • Strategic planning: Evaluate potential outcomes of multiple action plans.
  • Complex decision-making: Decision problems requiring consideration of multiple possibilities.

Task Planning and MCTS

Monte Carlo Tree Search (MCTS) is a heuristic search algorithm used for complex decision-making problems.

MCTS is widely applied in Agent planning, especially in game AI and complex task planning.

Core Idea

MCTS estimates the potential value of each decision node by running random simulations (rollouts) from it.

It doesn't evaluate all possible paths but guides the search direction through sampling and statistics.

Four Steps of MCTS

  1. Selection: Starting from the root node, repeatedly select the best child node until a leaf node is reached. The UCT formula (UCB, Upper Confidence Bound, applied to trees) is used during selection to balance exploration and exploitation.
  2. Expansion: Add one or more child nodes to the leaf node.
  3. Simulation: Starting from the new node, randomly simulate the game until it ends.
  4. Backpropagation: Update statistical information of all nodes along the simulation path.
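The balance struck during selection is easier to see with numbers. The function below is a minimal sketch of the UCT score, average reward plus an exploration bonus; the name `uct_score` is hypothetical, and returning infinity for unvisited children is a common convention assumed here rather than something the steps above prescribe.

```python
import math


def uct_score(total_reward: float, visits: int, parent_visits: int,
              c: float = math.sqrt(2)) -> float:
    """UCT = average reward + c * sqrt(ln(parent_visits) / visits)."""
    if visits == 0:
        return math.inf  # unvisited children are tried first
    exploitation = total_reward / visits
    exploration = c * math.sqrt(math.log(parent_visits) / visits)
    return exploitation + exploration


# Two children with the same average reward (0.5) under a parent visited 20 times
well_explored = uct_score(total_reward=8.0, visits=16, parent_visits=20)
rarely_tried = uct_score(total_reward=2.0, visits=4, parent_visits=20)
print(well_explored < rarely_tried)  # True
```

With equal average rewards, the child with fewer visits scores higher, which is how the exploration term steers the search toward under-sampled branches.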

Code Implementation

MCTS Planner Implementation

import math
import random


class MCTSNode:
    """
    MCTS tree node.

    Stores node state, statistical data, and relationships with children.
    """

    def __init__(self, state, parent=None, action=None):
        # Current state
        self.state = state
        # Reference to parent node
        self.parent = parent
        # Action from parent node to this node
        self.action = action
        # List of child nodes
        self.children = []
        # Visit count of this node
        self.visit_count = 0
        # Cumulative reward value of this node
        self.reward = 0.0

    def is_fully_expanded(self):
        """Check if child nodes exist (expansion adds all children at once)"""
        return len(self.children) > 0

    def is_terminal(self):
        """Check if it's a terminal node (game over or goal achieved)"""
        return self.state.is_terminal()

    def uct_child(self, c=math.sqrt(2)):
        """
        Select best child node using UCT formula.

        UCT = reward/visits + C * sqrt(ln(parent_visits)/visits)
        C is the exploration constant, typically set to sqrt(2)
        """
        def uct(child):
            # Unvisited children are tried first; this also avoids division by zero
            if child.visit_count == 0:
                return math.inf
            exploit = child.reward / child.visit_count
            explore = c * math.sqrt(math.log(self.visit_count) / child.visit_count)
            return exploit + explore

        return max(self.children, key=uct)

    def best_child(self):
        """Return the most visited child (a common choice for the final action)"""
        return max(self.children, key=lambda child: child.visit_count)


class MCTSPlanner:
    """
    MCTS planner.

    Uses Monte Carlo Tree Search to generate action plans for Agents.
    """

    def __init__(self, simulation_limit=1000, exploration_constant=1.41):
        # Maximum simulation count
        self.simulation_limit = simulation_limit
        # Exploration constant
        self.exploration_constant = exploration_constant

    def plan(self, initial_state):
        """
        Plan from initial state.

        :param initial_state: Initial state
        :return: Optimal action
        """
        # Create root node
        root = MCTSNode(initial_state)
        # Perform multiple simulations
        for _ in range(self.simulation_limit):
            # 1. Selection: Select best child node from root down to leaf node
            node = self._selection(root)
            # 2. Expansion: If not terminal, expand with new nodes
            if not node.is_terminal():
                node = self._expansion(node)
            # 3. Simulation: Randomly simulate from new node to termination
            reward = self._simulation(node)
            # 4. Backpropagation: Update statistics of all nodes on path
            self._backpropagation(node, reward)
        # Return action corresponding to best child of root node
        return root.best_child().action

    def _selection(self, node):
        """Selection phase: Descend via UCT until reaching an unexpanded or terminal node"""
        while node.is_fully_expanded() and not node.is_terminal():
            node = node.uct_child(self.exploration_constant)
        return node

    def _expansion(self, node):
        """Expansion phase: Add new child nodes"""
        # Generate all possible actions
        possible_actions = node.state.get_possible_actions()
        # Create a child node for each action
        for action in possible_actions:
            new_state = node.state.apply_action(action)
            child = MCTSNode(new_state, parent=node, action=action)
            node.children.append(child)
        # Return a random child node (could also use deterministic strategy)
        return random.choice(node.children) if node.children else node

    def _simulation(self, node):
        """Simulation phase: Randomly simulate to game end"""
        state = node.state
        while not state.is_terminal():
            # Randomly choose action
            actions = state.get_possible_actions()
            action = random.choice(actions)
            state = state.apply_action(action)
        # Return final reward
        return state.get_reward()

    def _backpropagation(self, node, reward):
        """Backpropagation: Update statistics"""
        while node is not None:
            node.visit_count += 1
            node.reward += reward
            node = node.parent
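The planner relies on a state object offering `is_terminal()`, `get_possible_actions()`, `apply_action()`, and `get_reward()`. The sketch below shows one hypothetical state that satisfies this interface, a toy counting task invented for illustration, and exercises only the Simulation step. Averaging rollouts per action as done here is flat Monte Carlo evaluation, not the full tree search.

```python
import random
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class CounterState:
    """Hypothetical toy task: add 1 or 2 per move and try to land exactly on `goal`."""
    value: int = 0
    goal: int = 10

    def is_terminal(self) -> bool:
        return self.value >= self.goal

    def get_possible_actions(self) -> List[int]:
        return [] if self.is_terminal() else [1, 2]

    def apply_action(self, action: int) -> "CounterState":
        # States are immutable, so applying an action returns a new state
        return CounterState(self.value + action, self.goal)

    def get_reward(self) -> float:
        return 1.0 if self.value == self.goal else 0.0


def rollout(state: CounterState, rng: random.Random) -> float:
    """Play random actions until the state is terminal, then return its reward."""
    while not state.is_terminal():
        state = state.apply_action(rng.choice(state.get_possible_actions()))
    return state.get_reward()


rng = random.Random(0)  # fixed seed so repeated runs sample the same rollouts
start = CounterState(value=8)
for action in start.get_possible_actions():
    child = start.apply_action(action)
    average = sum(rollout(child, rng) for _ in range(200)) / 200
    print(f"action +{action}: average reward {average:.2f}")
```

From a value of 8, adding 2 reaches the goal immediately and always earns the reward, while adding 1 leaves the outcome to a further random move, so its average is lower.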

Note: MCTS is computationally expensive and is best suited to scenarios that require deep planning and have clear termination conditions. For tasks with strict real-time requirements, the number of simulations may need to be limited, or another method used.


Reflexion (Self-reflection)

Reflexion is a technique that gives Agents self-reflection capabilities.

With a reflection mechanism added, the Agent can analyze why an attempt failed, adjust its strategy, and retry.

Core Idea

Agents not only execute actions but also observe results and reflect: Why did I fail? How should I improve next time?

This capability is crucial for continuous learning and self-improvement.

Code Implementation

Reflexion Agent Implementation

class ReflexionAgent:
    """
    Agent with self-reflection capability.

    Core mechanism: Execute -> Review -> Reflect -> Retry
    """

    def __init__(self, actor, reviewer, max_retries=3):
        # Actor: responsible for executing specific tasks
        self.actor = actor
        # Reviewer: responsible for evaluating execution results
        self.reviewer = reviewer
        # Maximum retry attempts
        self.max_retries = max_retries

    def run(self, task):
        """
        Execute task with self-reflection.

        :param task: Task description
        :return: Execution result
        """
        # Maintain execution history
        history = []
        result = None
        for attempt in range(self.max_retries):
            # Phase 1: Try to execute task (history carries earlier feedback)
            result = self.actor.execute(task, history)
            # Record this attempt
            history.append({
                "attempt": attempt,
                "result": result,
            })
            # Phase 2: Review result
            feedback = self.reviewer.evaluate(task, result, history)
            # Keep the feedback so the next attempt can learn from it
            history[-1]["feedback"] = feedback
            # Phase 3: Reflect and decide whether to retry
            if not self.should_retry(feedback):
                return result
        # If all retries exhausted, return last result
        return result

    def should_retry(self, feedback):
        """Assumption: feedback is a dict with a boolean "success" key"""
        return not feedback.get("success", False)
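The Execute -> Review -> Reflect -> Retry cycle can be demonstrated with plain functions in place of LLM-backed components. In this sketch, `actor`, `reviewer`, and `run_with_reflection` are hypothetical stand-ins: the reviewer rejects answers without a unit, and the actor reads the previous feedback to correct itself.

```python
from typing import Dict, List


def actor(task: str, history: List[Dict]) -> str:
    """Hypothetical actor: fixes whatever the previous feedback complained about."""
    if history and "missing unit" in history[-1]["feedback"]:
        return "6 apples"
    return "6"


def reviewer(task: str, result: str) -> str:
    """Hypothetical reviewer: returns 'ok' or a short description of the problem."""
    return "ok" if result.endswith("apples") else "missing unit: say what is counted"


def run_with_reflection(task: str, max_retries: int = 3) -> Dict:
    history: List[Dict] = []
    result = ""
    for attempt in range(max_retries):
        result = actor(task, history)      # Execute
        feedback = reviewer(task, result)  # Review
        history.append({"attempt": attempt, "result": result, "feedback": feedback})
        if feedback == "ok":               # Reflect: stop once the review passes
            break
    return {"result": result, "attempts": len(history)}


outcome = run_with_reflection("How many apples are left?")
print(outcome)  # {'result': '6 apples', 'attempts': 2}
```

The key design point is that feedback is stored in the history passed to the next attempt; without it, each retry would repeat the same mistake.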