Whitepaper
Docs
Sign In
Function
Function
pipe
v0.0.2
OPENAI
Function ID
openai
Creator
@coker
Downloads
100+
OpenAI Responses Pipe
Get
README
No README available
Function Code
Show
""" title: OpenAI Responses Pipe author_url:https://linux.do/u/coker/summary author: coker version: 0.0.2 license: MIT """ from pydantic import BaseModel, Field import httpx import json import random class Pipe: class Valves(BaseModel): NAME_PREFIX: str = Field( default="OpenAI: ", description="Prefix to be added before model names.", ) BASE_URL: str = Field( default="https://api.openai.com/v1", description="Base URL for OpenAI API.", ) API_KEYS: str = Field( default="", description="API keys for OpenAI, use , to split", ) THINKING_EFFORT: str = Field( default="medium", description="low ,medium, high", ) def __init__(self): self.valves = self.Valves() self.openai_response_models = [ "o1-pro", "gpt-4.5-preview", "o1", "chatgpt-4o-latest", "o3-mini", ] self.not_supported_efforts_models = ["chatgpt-4o-latest", "gpt-4.5-preview"] def pipes(self): res = [] if self.openai_response_models: for model in self.openai_response_models: res.append({"name": f"{self.valves.NAME_PREFIX}{model}", "id": model}) return res async def pipe(self, body: dict, __user__: dict): self.key = random.choice(self.valves.API_KEYS.split(",")).strip() print(f"pipe:{__name__}") headers = { "Authorization": f"Bearer {self.key}", "Content-Type": "application/json", } model_id = body["model"][body["model"].find(".") + 1 :] payload = {**body, "model": model_id} payload["stream"] = True if model_id not in self.not_supported_efforts_models: payload["reasoning"] = {"effort": self.valves.THINKING_EFFORT.strip()} if model_id in self.openai_response_models: new_messages = [] messages = body["messages"] for message in messages: try: if message["role"] == "user": if isinstance(message["content"], list): content = [] for i in message["content"]: if i["type"] == "text": content.append( {"type": "input_text", "text": i["text"]} ) elif i["type"] == "image_url": content.append( { "type": "input_image", "image_url": i["image_url"]["url"], } ) new_messages.append({"role": "user", "content": content}) else: new_messages.append( { "role": "user", "content": [ {"type": "input_text", "text": message["content"]} ], } ) elif message["role"] == "assistant": new_messages.append( { "role": "assistant", "content": [ {"type": "output_text", "text": message["content"]} ], } ) elif message["role"] == "system": new_messages.append( { "role": "system", "content": [ {"type": "input_text", "text": message["content"]} ], } ) except Exception: yield f"Error: {message}" return payload.pop("messages") payload["input"] = new_messages else: yield f"Error: Model {model_id} unKnown" return try: async with httpx.AsyncClient(timeout=600) as client: async with client.stream( "POST", f"{self.valves.BASE_URL}/responses", json=payload, headers=headers, ) as response: if response.status_code != 200: error_text = await response.aread() yield f"Error: {response.status_code} {error_text.decode('utf-8')}" return async for line in response.aiter_lines(): if line: try: if line.startswith("data:"): line_data = json.loads(line[5:]) yield line_data["delta"] except Exception: pass except Exception as e: yield f"Error: {e}" return