YandexGPT (Yandex Manifold Pipe)
Function type: pipe
Version: v0.1.2
Function ID: yandexgpt
Creator: @kyaroslav83
Downloads: 96+
README
No README available
Function Code
""" title: Yandex GPT Manifold Pipe authors: kyaroslav83 version: 0.1.2 required_open_webui_version: 0.3.17 license: MIT This function is available only from russian IP, you need to set the environment variable PROXY_URL ("http://user:pass@proxyhost:port" or "http://proxyhost:port") """ import os import json import asyncio import aiohttp from typing import List, Union, Generator, Iterator from pydantic import BaseModel, Field from open_webui.utils.misc import pop_system_message class Pipe: class Valves(BaseModel): YANDEX_API_KEY: str = Field(default="") YANDEX_CATALOG_ID: str = Field(default="") PROXY_URL: str = Field(default="") def __init__(self): self.type = "manifold" self.id = "yandex" self.name = "yandex/" self.valves = self.Valves( **{ "YANDEX_API_KEY": os.getenv("YANDEX_API_KEY", ""), "YANDEX_CATALOG_ID": os.getenv("YANDEX_CATALOG_ID", ""), "PROXY_URL": os.getenv("PROXY_URL", ""), } ) def get_yandex_models(self): return [ {"id": "yandexgpt-lite", "name": "YandexGPT-Lite"}, {"id": "yandexgpt", "name": "YandexGPT"}, ] def pipes(self) -> List[dict]: return self.get_yandex_models() def pipe(self, body: dict) -> Union[str, Generator, Iterator]: system_message, messages = pop_system_message(body["messages"]) processed_messages = [] for message in messages: processed_messages.append( {"role": message["role"], "text": message.get("content", "")} ) if system_message: processed_messages.insert( 0, {"role": "system", "text": str(system_message)} ) # Determine the correct model URI if body["model"] == "yandexgpt-lite": model_uri = f"gpt://{self.valves.YANDEX_CATALOG_ID}/yandexgpt-lite/latest" else: # default to yandexgpt model_uri = f"gpt://{self.valves.YANDEX_CATALOG_ID}/yandexgpt/latest" payload = { "modelUri": model_uri, "completionOptions": { "stream": body.get("stream", True), "temperature": body.get("temperature", 0.7), "maxTokens": body.get("max_tokens", 4096), }, "messages": processed_messages, } headers = { "Authorization": f"Api-Key {self.valves.YANDEX_API_KEY}", "x-folder-id": self.valves.YANDEX_CATALOG_ID, "Content-Type": "application/json", } url = "https://llm.api.cloud.yandex.net/foundationModels/v1/completion" try: if body.get("stream", True): return self.stream_response(url, headers, payload) else: return self.non_stream_response(url, headers, payload) except Exception as e: print(f"Error in pipe method: {e}") return f"Error: {e}" async def stream_response(self, url, headers, payload): try: async with aiohttp.ClientSession() as session: async with session.post( url, headers=headers, json=payload, proxy=self.valves.PROXY_URL ) as response: if response.status != 200: raise Exception( f"HTTP Error {response.status}: {await response.text()}" ) full_response = "" async for line in response.content: if line: try: data = json.loads(line) chunk = data["result"]["alternatives"][0]["message"][ "text" ] new_content = chunk[len(full_response) :] if new_content: yield new_content full_response += new_content # Delay to avoid overwhelming the client await asyncio.sleep(0.01) except json.JSONDecodeError: print(f"Failed to parse JSON: {line}") except Exception as e: print(f"General error in stream_response method: {e}") yield f"Error: {e}" async def non_stream_response(self, url, headers, payload): try: async with aiohttp.ClientSession() as session: async with session.post( url, json=payload, headers=headers, proxy=self.valves.PROXY_URL ) as response: if response.status != 200: error_text = await response.text() raise Exception(f"HTTP Error {response.status}: {error_text}") response_json = 
await response.json() full_response = str( response_json["result"]["alternatives"][0]["message"]["text"] ) yield full_response except Exception as e: print(f"Error in non_stream_response method: {e}") yield f"Error: {e}"
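
For a quick local check outside Open WebUI, a minimal smoke test might look like the sketch below. It assumes the function code above is saved as yandex_pipe.py and that the placeholder credentials are replaced with real values; the file name, module name, and prompt are assumptions for illustration, and the open_webui package must be installed because the pipe imports pop_system_message from it.

# smoke_test.py -- rough local check of the pipe (hypothetical file and module names).
import asyncio
import os

# Placeholder credentials; substitute your own API key and catalog (folder) id.
os.environ["YANDEX_API_KEY"] = "your-api-key"
os.environ["YANDEX_CATALOG_ID"] = "your-catalog-id"
# os.environ["PROXY_URL"] = "http://proxyhost:port"  # needed when calling from outside Russia

from yandex_pipe import Pipe  # assumes the function code was saved as yandex_pipe.py


async def main():
    pipe = Pipe()
    body = {
        "model": "yandexgpt",  # or "yandexgpt-lite"
        "stream": False,       # non_stream_response yields one complete answer
        "messages": [{"role": "user", "content": "Hello!"}],
    }
    # pipe() returns an async generator for both the streaming and non-streaming paths.
    async for chunk in pipe.pipe(body):
        print(chunk, end="")


asyncio.run(main())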