Spaces:
Running
Running
| import re | |
| from dataclasses import dataclass | |
| from typing import List, Optional | |
| import textwrap | |
| class L2MStep: | |
| """Data class representing a single L2M step""" | |
| number: int | |
| question: str # The sub-question for this step | |
| reasoning: str # The reasoning process | |
| answer: str # The answer to this sub-question | |
| class L2MResponse: | |
| """Data class representing a complete L2M response""" | |
| main_question: str | |
| steps: List[L2MStep] | |
| final_answer: Optional[str] = None | |
| def parse_l2m_response(response_text: str, question: str) -> L2MResponse: | |
| """ | |
| Parse L2M response text to extract steps and final answer. | |
| Args: | |
| response_text: The raw response from the API | |
| question: The original question | |
| Returns: | |
| L2MResponse object containing main question, steps, and final answer | |
| """ | |
| # Extract all steps | |
| step_pattern = r'<step number="(\d+)">\s*<question>(.*?)</question>\s*<reasoning>(.*?)</reasoning>\s*<answer>(.*?)</answer>\s*</step>' | |
| steps = [] | |
| for match in re.finditer(step_pattern, response_text, re.DOTALL): | |
| number = int(match.group(1)) | |
| sub_question = match.group(2).strip() | |
| reasoning = match.group(3).strip() | |
| answer = match.group(4).strip() | |
| steps.append(L2MStep( | |
| number=number, | |
| question=sub_question, | |
| reasoning=reasoning, | |
| answer=answer | |
| )) | |
| # Extract final answer | |
| final_answer_pattern = r'<final_answer>\s*(.*?)\s*</final_answer>' | |
| final_answer_match = re.search(final_answer_pattern, response_text, re.DOTALL) | |
| final_answer = final_answer_match.group(1).strip() if final_answer_match else None | |
| # Sort steps by number | |
| steps.sort(key=lambda x: x.number) | |
| return L2MResponse(main_question=question, steps=steps, final_answer=final_answer) | |
| def wrap_text(text: str, max_chars: int = 40, max_lines: int = 4) -> str: | |
| """Wrap text to fit within box constraints with proper line breaks.""" | |
| text = text.replace('\n', ' ').replace('"', "'") | |
| wrapped_lines = textwrap.wrap(text, width=max_chars) | |
| if len(wrapped_lines) > max_lines: | |
| wrapped_lines = wrapped_lines[:max_lines] | |
| wrapped_lines[-1] = wrapped_lines[-1][:max_chars-3] + "..." | |
| return "<br>".join(wrapped_lines) | |
| def create_mermaid_diagram(l2m_response: L2MResponse, config: 'VisualizationConfig') -> str: | |
| """ | |
| Convert L2M steps to Mermaid diagram. | |
| Args: | |
| l2m_response: L2MResponse object containing the reasoning steps | |
| config: VisualizationConfig for text formatting | |
| Returns: | |
| Mermaid diagram markup as a string | |
| """ | |
| diagram = ['<div class="mermaid">', 'graph TD'] | |
| # Add main question node | |
| question_content = wrap_text(l2m_response.main_question, config.max_chars_per_line, config.max_lines) | |
| diagram.append(f' Q["{question_content}"]') | |
| # Add decomposition node | |
| diagram.append(f' D["Problem Decomposition"]') | |
| diagram.append(f' Q --> D') | |
| # Add all step nodes with sub-questions, reasoning, and answers | |
| if l2m_response.steps: | |
| # Connect decomposition to first step | |
| diagram.append(f' D --> S{l2m_response.steps[0].number}') | |
| for i, step in enumerate(l2m_response.steps): | |
| # Create sub-question node | |
| sq_content = wrap_text(f"Q{step.number}: {step.question}", config.max_chars_per_line, config.max_lines) | |
| sq_id = f'S{step.number}' | |
| diagram.append(f' {sq_id}["{sq_content}"]') | |
| # Create reasoning node | |
| r_content = wrap_text(step.reasoning, config.max_chars_per_line, config.max_lines) | |
| r_id = f'R{step.number}' | |
| diagram.append(f' {r_id}["{r_content}"]') | |
| # Create answer node | |
| a_content = wrap_text(f"A{step.number}: {step.answer}", config.max_chars_per_line, config.max_lines) | |
| a_id = f'A{step.number}' | |
| diagram.append(f' {a_id}["{a_content}"]') | |
| # Connect the nodes | |
| diagram.append(f' {sq_id} --> {r_id}') | |
| diagram.append(f' {r_id} --> {a_id}') | |
| # Connect to next step if exists | |
| if i < len(l2m_response.steps) - 1: | |
| next_id = f'S{l2m_response.steps[i + 1].number}' | |
| diagram.append(f' {a_id} --> {next_id}') | |
| # Add final answer node if exists | |
| if l2m_response.final_answer: | |
| final_content = wrap_text(f"Final: {l2m_response.final_answer}", config.max_chars_per_line, config.max_lines) | |
| diagram.append(f' F["{final_content}"]') | |
| if l2m_response.steps: | |
| diagram.append(f' A{l2m_response.steps[-1].number} --> F') | |
| else: | |
| diagram.append(' D --> F') | |
| # Add styles | |
| diagram.extend([ | |
| ' classDef default fill:#f9f9f9,stroke:#333,stroke-width:2px;', | |
| ' classDef question fill:#e3f2fd,stroke:#1976d2,stroke-width:2px;', | |
| ' classDef reasoning fill:#f9f9f9,stroke:#333,stroke-width:2px;', | |
| ' classDef answer fill:#d4edda,stroke:#28a745,stroke-width:2px;', | |
| ' classDef decomp fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px;', | |
| ' class Q,S1,S2,S3,S4,S5 question;', | |
| ' class R1,R2,R3,R4,R5 reasoning;', | |
| ' class A1,A2,A3,A4,A5,F answer;', | |
| ' class D decomp;', | |
| ' linkStyle default stroke:#666,stroke-width:2px;' | |
| ]) | |
| diagram.append('</div>') | |
| return '\n'.join(diagram) |