Whitepaper
Docs
Sign In
Function
Function
pipe
v0.3.0
OpenRouter CoT
Function ID
openrouter_cot
Creator
@aloxaf
Downloads
66+
Show the streaming CoT for OpenRouter thinking models such as DeepSeek R1 or Claude 3.7 Sonnet
Get
README
No README available
Function Code
Show
"""
title: OpenRouter CoT
author: Aloxaf
description: Show the streaming CoT for OpenRouter thinking model like DeepSeek R1 or Claude 3.7 Sonnet
version: 0.3.0
license: MIT

0.3.0:
- Support OpenWebUI reasoning_effort as a number or a valid effort level(low, medium, high)
- Gracefully handle cancelled requests

0.2.2:
- Better error handling
"""

import asyncio
import json
import logging
from typing import AsyncGenerator, Union

import httpx
from httpx_sse import EventSource
from pydantic import BaseModel, Field

log = logging.getLogger(__name__)
log.setLevel(logging.INFO)


class Pipe:
    """Pipe that streams OpenRouter chat completions, wrapping reasoning in <thinking> tags."""

    class Valves(BaseModel):
        """User-configurable settings for the pipe."""

        OPENROUTER_BASE_URL: str = Field(
            default="https://openrouter.ai/api/v1",
            description="The base URL of OpenRouter",
        )
        OPENROUTER_API_KEY: str = Field(
            default="",
            description="The API key of OpenRouter",
        )
        # Comma-separated list; each entry is exposed as one model by `pipes`.
        OPENROUTER_MODEL: str = Field(
            default="deepseek/deepseek-r1,anthropic/claude-3-7-sonnet",
            description="The model name",
        )
        DEFAULT_REASONING_EFFORT: str = Field(
            default="medium",
            description="The default reasoning effort, can be a number(max tokens) or a valid effort level(low, medium, high)",
        )

    def __init__(self):
        self.valves = self.Valves()

    def pipes(self):
        """Return one ``{"id", "name"}`` entry per configured model.

        Entries of ``OPENROUTER_MODEL`` are split on commas; surrounding
        whitespace is stripped and empty entries (e.g. from a trailing comma)
        are skipped so they are never offered as a model.
        """
        names = (name.strip() for name in self.valves.OPENROUTER_MODEL.split(","))
        return [{"id": name, "name": name} for name in names if name]

    async def pipe(self, body: dict) -> AsyncGenerator[str, None]:
        """Forward ``body`` to OpenRouter and yield the streamed text.

        Reasoning deltas are wrapped in a single ``<thinking>...</thinking>``
        pair. The tag is always closed before the generator finishes: when
        content starts, on an error, at the end of the stream, or on
        cancellation.

        Args:
            body: Chat-completion request. ``body["model"]`` is required;
                an optional ``reasoning_effort`` (effort level or token
                budget) overrides ``DEFAULT_REASONING_EFFORT``.

        Yields:
            Text fragments: tags, reasoning, content, or an ``Error: ...``
            / ``HTTP <code> Error: ...`` message.
        """
        if not self.valves.OPENROUTER_API_KEY:
            yield "Error: OPENROUTER_API_KEY is not set"
            return

        headers = {
            "Authorization": f"Bearer {self.valves.OPENROUTER_API_KEY}",
            "Content-Type": "application/json",
        }
        # Keep only what follows the first "."; presumably this drops a
        # function-id prefix added by the host application -- TODO confirm.
        model_id = body["model"].split(".", 1)[-1]
        payload = {
            **body,
            "model": model_id,
            "include_reasoning": True,
            # The response is consumed exclusively as SSE below, so a
            # non-streaming request could never be parsed.
            "stream": True,
        }
        thinking = False
        log.info("model: %s", model_id)

        try:
            # Always remove the key so it is never forwarded alongside
            # "reasoning", even when it is present but empty/None.
            reasoning_effort = payload.pop("reasoning_effort", None)
            payload["reasoning"] = self._parse_reasoning_effort(
                reasoning_effort or self.valves.DEFAULT_REASONING_EFFORT
            )

            async with httpx.AsyncClient(http2=True) as client:
                async with client.stream(
                    "POST",
                    f"{self.valves.OPENROUTER_BASE_URL}/chat/completions",
                    headers=headers,
                    json=payload,
                ) as response:
                    if response.status_code != 200:
                        description = await response.aread()
                        # errors="replace": a malformed error body must not
                        # hide the HTTP status behind a UnicodeDecodeError.
                        detail = description.decode(errors="replace")
                        yield f"HTTP {response.status_code} Error:\n{detail}"
                        return

                    async for event in EventSource(response).aiter_sse():
                        if event.data == "[DONE]":
                            break

                        data = json.loads(event.data)
                        # Be defensive about chunk shape: "choices" may be
                        # missing/empty and a choice may lack "delta".
                        choices = data.get("choices") or [{}]
                        choice = choices[0]
                        delta = choice.get("delta") or {}

                        if reasoning := delta.get("reasoning"):
                            if not thinking:
                                thinking = True
                                yield "<thinking>"
                            yield reasoning
                        elif content := delta.get("content"):
                            if thinking:
                                thinking = False
                                yield "</thinking>"
                            yield content
                        elif error := choice.get("error") or data.get("error"):
                            if thinking:
                                thinking = False
                                yield "</thinking>"
                            yield f"Error: {error}"
                            return
                        else:
                            log.info(data)

            # Stream ended while still reasoning (no content followed).
            if thinking:
                thinking = False
                yield "</thinking>"
        except Exception as e:
            log.exception("OpenRouter request failed")
            if thinking:
                thinking = False
                yield "</thinking>"
            yield f"Error: {e}"
        except asyncio.CancelledError:
            # Deliberately not re-raised: close the tag and end quietly.
            if thinking:
                yield "</thinking>"

    def _parse_reasoning_effort(self, reasoning_effort: Union[str, int]) -> dict:
        """Translate a reasoning-effort setting into a ``reasoning`` payload.

        Args:
            reasoning_effort: ``"low"``/``"medium"``/``"high"`` (case and
                surrounding whitespace are ignored), or a non-negative token
                budget given as an ``int`` or a string of ASCII digits.

        Returns:
            ``{"effort": <level>}`` or ``{"max_tokens": <int>}``.

        Raises:
            ValueError: If the value is neither an effort level nor a
                non-negative integer.
        """
        # bool is a subclass of int; True/False is never a token budget.
        if isinstance(reasoning_effort, int) and not isinstance(reasoning_effort, bool):
            if reasoning_effort >= 0:
                return {"max_tokens": reasoning_effort}
        elif isinstance(reasoning_effort, str):
            effort = reasoning_effort.strip().lower()
            if effort in ("low", "medium", "high"):
                return {"effort": effort}
            # isascii() guards against digits like "²" that pass isdigit()
            # but that int() cannot parse.
            if effort.isascii() and effort.isdigit():
                return {"max_tokens": int(effort)}
        raise ValueError("Invalid reasoning effort")