"""
title: Enhanced Reasoning Chain Pipe
author: Sam Paniagua
version: 0.5.3
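description: Runs one or more "thinking" models to produce explicit reasoning chains, then a responding model that synthesizes the final answer from those chains.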
"""
import json
from time import time
from pydantic import BaseModel, Field
from dataclasses import dataclass
from typing import Dict, List, Optional, Callable, Awaitable, Any, AsyncGenerator
import asyncio
from fastapi import Request
from open_webui.utils.misc import get_last_user_message
from open_webui.routers.ollama import generate_chat_completion as ollama_chat_completion
from open_webui.routers.openai import generate_chat_completion as openai_chat_completion
import logging
logger = logging.getLogger(__name__)
if not logger.handlers:
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.set_name("enhanced_reasoning_chain_pipe")
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.propagate = False
@dataclass
class User:
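    """Minimal user record built from __user__ and passed to the completion endpoints."""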
id: str
email: str
name: str
role: str
class Pipe:
class Valves(BaseModel):
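        """User-configurable settings for the pipe."""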
THINKING_MODEL: str = Field(
default="your_thinking_model_id_here",
description="Model used for reasoning steps. Separate multiple models with a comma."
)
USE_OPENAI_API_THINKING_MODEL: bool = Field(
default=False,
description="Use OpenAI API for thinking model if True, Ollama if False."
)
RESPONDING_MODEL: str = Field(
default="your_responding_model_id_here",
description="Model used for final response generation."
)
USE_OPENAI_API_RESPONDING_MODEL: bool = Field(
default=False,
description="Use OpenAI API for responding model if True, Ollama if False."
)
ENABLE_SHOW_THINKING_TRACE: bool = Field(
default=False,
description="Toggle visibility of reasoning trace."
)
MAX_THINKING_TIME: int = Field(
default=120,
description="Maximum time in seconds for reasoning steps."
)
def __init__(self):
self.type = "manifold"
self.valves = self.Valves()
self.total_thinking_tokens = 0
self.max_thinking_time_reached = False
        self.__user__ = None
        self.__request__ = None
        self._json_buffer = ""
def pipes(self):
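        """Register a single manifold model whose id and name encode the thinking and responding model names."""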
name = "reasoning-"
for model in self.valves.THINKING_MODEL.split(","):
name += model.strip().split(":")[0] + "-"
name = name[:-1] + "-to-" + self.valves.RESPONDING_MODEL.strip().split(":")[0]
return [{"name": name, "id": name}]
def get_chunk_content(self, chunk: bytes):
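        """Accumulate streamed bytes and yield the message content from each complete newline-delimited JSON line."""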
self._json_buffer += chunk.decode("utf-8")
while True:
newline_index = self._json_buffer.find("\n")
if newline_index == -1:
break
line = self._json_buffer[:newline_index].strip()
self._json_buffer = self._json_buffer[newline_index + 1:]
if not line:
continue
try:
chunk_data = json.loads(line)
if "message" in chunk_data and "content" in chunk_data["message"]:
yield chunk_data["message"]["content"]
if chunk_data.get("done", False):
break
            except json.JSONDecodeError as e:
                # A newline-terminated line that fails to parse is malformed, not partial;
                # re-queuing it would stall parsing forever, so log it and skip it.
                logger.error(f'ChunkDecodeError: unable to parse "{line[:100]}": {e}')
                continue
async def get_response(self, model: str, messages: List[Dict[str, str]], thinking: bool, stream: bool):
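        """Send a chat completion request through the OpenAI or Ollama router, chosen by the corresponding valve."""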
use_openai_api = (
self.valves.USE_OPENAI_API_THINKING_MODEL
if thinking
else self.valves.USE_OPENAI_API_RESPONDING_MODEL
)
generate_completion = openai_chat_completion if use_openai_api else ollama_chat_completion
response = await generate_completion(
self.__request__,
{"model": model, "messages": messages, "stream": stream},
user=self.__user__,
)
return response
async def get_completion(self, model: str, messages: list, __event_emitter__: Optional[Callable[[Any], Awaitable[None]]] = None):
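        """Run a non-streaming completion and return its message content, handling both OpenAI- and Ollama-style responses."""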
response = None
try:
thinking = False
stream = False
response = await self.get_response(model, messages, thinking, stream)
if not response:
return "**No content available**"
if "choices" in response and response["choices"]:
return response["choices"][0]["message"]["content"]
if "message" in response and "content" in response["message"]:
return response["message"]["content"]
return "**No content available**"
        except Exception as e:
            await self.set_status_end(f"Error: Is {model} a valid model? ({e})", __event_emitter__)
            return f"**Error generating completion: {e}**"
        finally:
            if response and hasattr(response, "close"):
                await response.close()
async def stream_response(self, model: str, messages: List[Dict[str, str]], thinking: bool, __event_emitter__: Optional[Callable[[Any], Awaitable[None]]] = None) -> AsyncGenerator[str, None]:
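        """Stream completion content chunk by chunk, stopping thinking models once MAX_THINKING_TIME is exceeded."""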
        start_time = time()
        response = None  # defined up front so the finally clause is safe if get_response raises
        try:
stream = True
response = await self.get_response(model, messages, thinking, stream)
while True:
chunk = await response.body_iterator.read(1024)
if not chunk:
break
for part in self.get_chunk_content(chunk):
yield part
if thinking and (time() - start_time > self.valves.MAX_THINKING_TIME):
logger.info(f'Max thinking time reached for model "{model}"')
self.max_thinking_time_reached = True
break
                if self.max_thinking_time_reached:
                    break  # the finally clause closes the response
except Exception as e:
api = "OpenAI" if (thinking and self.valves.USE_OPENAI_API_THINKING_MODEL) else "Ollama"
category = "Thinking" if thinking else "Responding"
await self.set_status_end(f"{category} Error: Invalid model {model} in {api} API ({e})", __event_emitter__)
finally:
if response and hasattr(response, "close"):
await response.close()
async def run_step(self, model: str, messages: list, prompt: str, thinking: bool, step_name: str, title_name: str, __event_emitter__: Optional[Callable[[Any], Awaitable[None]]] = None) -> str:
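        """Replace the last user message with `prompt`, stream the model's answer, and forward it to the UI with status updates."""
        # JSON round-trip makes a deep copy so the caller's message list is not mutated.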
messages = json.loads(json.dumps(messages))
messages[-1] = {"role": "user", "content": prompt}
await self.send_data(f"\n### {title_name}\n", thinking, __event_emitter__)
response_text = ""
num_tokens = 0
async for chunk in self.stream_response(model.strip(), messages, thinking, __event_emitter__):
response_text += chunk
num_tokens += 1
await self.send_data(chunk, thinking, __event_emitter__)
await self.set_status(f"{step_name} ({num_tokens} tokens)", __event_emitter__)
if thinking:
self.total_thinking_tokens += num_tokens
return response_text.strip()
async def run_thinking(self, k: int, n: int, model: str, messages: list, query: str, __event_emitter__: Optional[Callable[[Any], Awaitable[None]]] = None) -> str:
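        """Ask a single thinking model for a structured reasoning chain over the user query."""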
thinking_with = f"with {model}" if n == 1 else f"with {model} {k}/{n}"
prompt = """You are an advanced reasoning model. Analyze the query step-by-step and output your reasoning in the specified format.
User Query:
{query}
Respond in the following format:
<reasoning>
Step 1: Understanding the query
[Your understanding here]
Step 2: Identifying key concepts
[Key concepts here]
Step 3: Generating multiple reasoning paths
- Path 1: [Reasoning path 1]
- Path 2: [Reasoning path 2]
...
Step 4: Evaluating and critiquing paths
[Critique each path, identify strengths/weaknesses, and select the best]
Step 5: Final reasoning chain
[Best reasoning chain]
</reasoning>
""".format(query=query)
reasoning = await self.run_step(model, messages, prompt, True, f"Reasoning {thinking_with}", f"{model} Reasoning", __event_emitter__)
await self.set_status(f"Completed reasoning {thinking_with}", __event_emitter__)
await asyncio.sleep(0.2)
return reasoning
async def run_responding(self, messages: list, query: str, reasonings: list, is_final_step: bool, __event_emitter__: Optional[Callable[[Any], Awaitable[None]]] = None) -> str:
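        """Have the responding model synthesize a final answer from the collected reasoning chains."""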
await self.set_status("Synthesizing final response...", __event_emitter__)
reasonings_section = "\n".join([f"Reasoning {i+1}:\n{reasoning}\n" for i, reasoning in enumerate(reasonings)])
prompt = f"""You are tasked with generating a final response based on multiple reasoning chains.
Available Reasoning Chains:
{reasonings_section}
User Query:
{query}
Follow these steps:
1. Review each reasoning chain and extract key insights.
2. Critique the chains for accuracy and completeness.
3. Synthesize a response by combining the best elements.
4. Ensure the response is concise, logical, and directly addresses the query.
Provide only the final synthesized response.
"""
response_text = await self.run_step(self.valves.RESPONDING_MODEL.strip(), messages, prompt, not is_final_step, "Generating response", "Final Response", __event_emitter__)
await asyncio.sleep(0.2)
return response_text
async def run_thinking_pipeline(self, k: int, models: list, messages: list, query: str, __event_emitter__: Optional[Callable[[Any], Awaitable[None]]] = None) -> str:
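        """Run the k-th (zero-based) thinking model; k+1 of n is used only for progress labels."""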
return await self.run_thinking(k + 1, len(models), models[k], messages, query, __event_emitter__)
async def pipe(self, body: dict, __user__: dict, __event_emitter__: Optional[Callable[[Any], Awaitable[None]]], __request__: Request, __task__=None) -> str:
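        """Open WebUI entry point: run the full reasoning pipeline for chat requests, or a plain completion when __task__ is set (e.g. title generation)."""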
self.__user__ = User(**__user__)
self.__request__ = __request__
messages = body["messages"]
query = get_last_user_message(messages)
if __task__ is None:
start_time = time()
models = self.valves.THINKING_MODEL.split(",")
reasonings = [await self.run_thinking_pipeline(i, models, messages, query, __event_emitter__) for i in range(len(models))]
total_duration = int(time() - start_time)
await self.run_responding(messages, query, reasonings, True, __event_emitter__)
            status_msg = (
                f"Reasoned with {self.total_thinking_tokens} tokens in {total_duration}s (max thinking time reached)"
                if self.max_thinking_time_reached
                else f"Reasoned with {self.total_thinking_tokens} tokens in {total_duration}s"
            )
await self.set_status_end(status_msg, __event_emitter__)
return ""
else:
return await self.get_completion(self.valves.RESPONDING_MODEL.strip(), messages, __event_emitter__)
    async def set_status(self, description: str, __event_emitter__: Optional[Callable[[Any], Awaitable[None]]] = None):
        if __event_emitter__:
            await __event_emitter__({"type": "status", "data": {"description": description, "done": False}})
    async def send_data(self, data: str, thinking: bool, __event_emitter__: Optional[Callable[[Any], Awaitable[None]]] = None):
        if __event_emitter__ and (not thinking or self.valves.ENABLE_SHOW_THINKING_TRACE):
            await __event_emitter__({"type": "message", "data": {"content": data, "role": "assistant-thinking" if thinking else "assistant"}})
    async def set_status_end(self, data: str, __event_emitter__: Optional[Callable[[Any], Awaitable[None]]] = None):
        if __event_emitter__:
            await __event_emitter__({"type": "status", "data": {"description": data, "done": True}})