Whitepaper
Docs
Sign In
Function
Function
pipe
v0.1.0
AnythingLLM Pipe
Function ID
anythingllm_pipe
Creator
@halu
Downloads
202+
OpenAI Compatible Endpoints Pipe for AnythingLLM
Get
README
No README available
Function Code
Show
"""
title: OpenAI Compatible Endpoints Pipe for AnythingLLM
author: halu
version: 0.1.0
license: MIT
last_updated:2024-11-19
description: This Python code serves as a pipe to connect OpenAI Compatible Endpoints with the AnythingLLM API, enabling seamless interaction and utilization of the services provided by AnythingLLM within an OpenAI-like framework. Additionally, it realizes the streaming/non-streaming information transmission between AnythingLLM and the Open WebUI by the way of OpenAI Compatible Endpoints here.
"""

import json
import os
import time
from typing import Iterator, List, Union

import requests
from pydantic import BaseModel, Field

# Set DEBUG to True to enable detailed logging
DEBUG = False

# Endpoint used when neither the valve nor the environment overrides it.
DEFAULT_ANYTHINGLLM_URL = (
    "http://host.docker.internal:3001/api/v1/openai/chat/completions"
)

# (connect, read) timeouts in seconds, passed to requests.
# Streaming: the read timeout bounds the wait for each received piece of data.
STREAM_TIMEOUT = (2, 30)
# Non-streaming: the whole completion arrives in one response, so the read
# timeout is more generous; without any timeout the call could block forever.
BLOCKING_TIMEOUT = (2, 300)

# Prefix and terminator of server-sent-event lines in a streamed completion.
SSE_PREFIX = "data: "
SSE_DONE = "[DONE]"


def _flatten_content(content):
    """Collapse a multi-part message content list into a single string.

    Text parts are kept verbatim and image parts are replaced by a
    ``[Image: <url>]`` note; parts of any other type are dropped. Content
    that is not a list (plain text or ``None``) is returned unchanged.
    """
    if not isinstance(content, list):
        return content

    text_parts = []
    for part in content:
        part_type = part.get("type")
        if part_type == "text":
            text_parts.append(part["text"])
        elif part_type == "image_url":
            # Images are not forwarded as images; leave a textual note instead.
            text_parts.append(f"[Image: {part['image_url']['url']}]")
    return " ".join(text_parts)


def _raise_for_bad_status(response):
    """Raise ``RuntimeError`` carrying status and body unless the status is 200."""
    if response.status_code != 200:
        raise RuntimeError(f"HTTP Error {response.status_code}: {response.text}")


class Pipe:
    """Pipe that forwards chat requests to an OpenAI-compatible AnythingLLM endpoint.

    ``pipe`` converts the incoming request body into a chat-completions
    payload and returns either the full answer (non-streaming) or an
    iterator of text fragments (streaming).
    """

    class Valves(BaseModel):
        # Full URL of the AnythingLLM chat-completions endpoint.
        AnythingLLM_url: str = Field(default=DEFAULT_ANYTHINGLLM_URL)
        # API key sent as a Bearer token.
        AnythingLLM_API_KEY: str = Field(default="")
        # Model (workspace name) sent with every request.
        DEFAULT_MODEL: str = Field(default="")

    def __init__(self):
        """Set the pipe identity and load valve values from the environment."""
        self.id = "finance"
        self.type = "manifold"
        self.name = "MyFinance: "
        self.valves = self.Valves(
            AnythingLLM_url=os.getenv("AnythingLLM_url", DEFAULT_ANYTHINGLLM_URL),
            AnythingLLM_API_KEY=os.getenv("AnythingLLM_API_KEY", ""),
            DEFAULT_MODEL=os.getenv("AnythingLLM_DEFAULT_MODEL", ""),
        )

    def get_AnythingLLM_models(self):
        """Return available models - for AnythingLLM we'll return a fixed list"""
        return [{"id": "finance", "name": "finance"}]

    def pipes(self) -> List[dict]:
        """Return the list of models exposed by this pipe."""
        return self.get_AnythingLLM_models()

    def pipe(self, body: dict) -> Union[str, Iterator[str]]:
        """Send the chat in ``body`` to AnythingLLM and return its answer.

        Args:
            body: Request dict. Reads ``messages`` (required) and the
                optional ``stream``, ``temperature``, ``stop`` and ``model``
                keys.

        Returns:
            An iterator of text fragments when ``body["stream"]`` is truthy,
            otherwise the assistant message content. Any failure while
            building or sending the non-streaming request is returned as an
            ``"Error: ..."`` string instead of being raised.
        """
        try:
            # The configured workspace wins. When it is empty, fall back to the
            # model id in the request instead of sending an empty model name.
            # NOTE(review): presumably the host prefixes the selected model id
            # with "<function id>." - the first dotted segment is dropped on
            # that assumption; confirm against the caller.
            requested_model = str(body.get("model") or "")
            model_id = self.valves.DEFAULT_MODEL or requested_model.split(".", 1)[-1]

            # Process messages including system, user, and assistant messages
            messages = [
                {
                    "role": message["role"],
                    "content": _flatten_content(message.get("content")),
                }
                for message in body["messages"]
            ]

            if DEBUG:
                print("OpenAI API request:")
                print(" Model:", model_id)
                print(" Messages:", json.dumps(messages, indent=2))

            url = self.valves.AnythingLLM_url
            headers = {
                "accept": "*/*",
                "Authorization": f"Bearer {self.valves.AnythingLLM_API_KEY}",
                "Content-Type": "application/json",
            }

            # Decide once whether to stream, so the flag sent to the server
            # always matches the way the response is consumed below.
            stream = bool(body.get("stream", False))

            # Prepare the API call parameters
            data = {
                "messages": messages,
                "model": model_id,
                "stream": stream,
                "temperature": body.get("temperature", 0.7),
            }

            # Add stop sequences if provided
            if body.get("stop"):
                data["stop"] = body["stop"]

            if stream:
                return self.stream_response(url, headers, data)

            response = requests.post(
                url, headers=headers, json=data, timeout=BLOCKING_TIMEOUT
            )
            # Surface HTTP failures with status and body rather than an
            # opaque KeyError on the missing "choices" key.
            _raise_for_bad_status(response)
            return response.json()["choices"][0]["message"].get("content")
        except Exception as e:
            if DEBUG:
                print(f"Error in pipe method: {e}")
            return f"Error: {e}"

    # streaming requests
    def stream_response(self, url, headers, data):
        """Yield text fragments from a streamed chat-completions response.

        Args:
            url: Endpoint to POST to.
            headers: HTTP headers for the request.
            data: JSON payload for the request.

        Yields:
            Each non-empty ``choices[0].delta.content`` string found on a
            ``data: `` line. Request and other errors are yielded as a
            single ``"Error: ..."`` string rather than raised.
        """
        try:
            # Certificate verification is left at the requests default
            # (enabled) because the Bearer API key travels on this connection.
            with requests.post(
                url,
                headers=headers,
                json=data,
                stream=True,
                timeout=STREAM_TIMEOUT,
            ) as response:
                _raise_for_bad_status(response)

                for raw_line in response.iter_lines():
                    if not raw_line:
                        continue
                    line = raw_line.decode("utf-8")
                    if not line.startswith(SSE_PREFIX):
                        continue

                    payload = line[len(SSE_PREFIX):]
                    # End-of-stream marker, not JSON: stop quietly.
                    if payload.strip() == SSE_DONE:
                        break

                    try:
                        chunk = json.loads(payload)
                    except json.JSONDecodeError:
                        print(f"Failed to parse JSON: {line}")
                        continue

                    try:
                        text = chunk["choices"][0]["delta"].get("content")
                    except (KeyError, IndexError, TypeError, AttributeError) as e:
                        # A chunk without a usable delta (for example an empty
                        # "choices" list) is skipped, not fatal to the stream.
                        print(f"Unexpected data structure: {e}")
                        print(f"Full data: {chunk}")
                        continue

                    if text:
                        yield text
                        # Brief pause between fragments, kept from the
                        # original implementation.
                        time.sleep(0.01)
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
            yield f"Error: Request failed: {e}"
        except Exception as e:
            print(f"General error in stream_response method: {e}")
            yield f"Error: {e}"