Whitepaper
Docs
Sign In
Function
Function
pipe
v0.1.0
Lambda Labs Inferencing Manifold Pipe (EXPERIMENTAL)
Function ID
lambda_labs_inferencing_manifold_pipe
Creator
@barzin
Downloads
4+
An experimental pipe for Lambda Labs' new inferencing service.
Get
README
Function Code
Show
""" WARNING: This pipe was vibe-coded and is experimental! It may require some tuning to work. title: Lambda Labs Pipe authors: barzin author_url: https://github.com/BarzinL funding_url: https://ko-fi.com/barzin version: 0.1.0 required_open_webui_version: 0.5.10 # Assuming same requirement, adjust if needed license: AGPL-3.0-or-later NOTE: Please set your Open-WebUI Function ID to "lambda_labs_inferencing_manifold_pipe" if this does not work. You will need to set the LAMBDA_API_KEY environment variable. """ import os import requests import json import time from typing import List, Union, Generator, Iterator, Dict, AsyncIterator from pydantic import BaseModel, Field from open_webui.utils.misc import pop_system_message import base64 # Ensure base64 is imported for image processing class Pipe: class Valves(BaseModel): # Updated valve for Lambda API Key LAMBDA_API_KEY: str = Field(default="") def __init__(self): self.type = "manifold" # Updated ID and name for Lambda self.id = "lambda" self.name = "lambda/" # Updated valve instantiation self.valves = self.Valves(**{"LAMBDA_API_KEY": os.getenv("LAMBDA_API_KEY", "")}) # Updated base URL for Lambda self.base_url = "https://api.lambda.ai/v1/" # Keep image settings for potential future vision models from Lambda self.MAX_IMAGE_SIZE = 20 * 1024 * 1024 # 20MB per image self.SUPPORTED_IMAGE_TYPES = [ "image/jpeg", "image/png", "image/gif", "image/webp", ] # Keep request timeout self.REQUEST_TIMEOUT = (3.05, 60) # (connect timeout, read timeout) def get_lambda_models(self) -> List[dict]: # Updated model list for Lambda Labs # Added the requested model. Add more models here as needed. return [ { "id": "llama-4-maverick-17b-128e-instruct-fp8", "name": "Llama 4 Maverick 17B Instruct FP8", # User-friendly name "supports_vision": False, # This model does not support vision # Add other relevant metadata if available/needed, e.g., context length }, # Example: If Lambda offered a vision model later, you might add: # { # "id": "lambda-vision-model-id", # "name": "Lambda Vision v1", # "supports_vision": True, # }, ] def pipes(self) -> List[dict]: # Return the list of Lambda models return self.get_lambda_models() def _get_model_info(self, model_id: str) -> dict | None: """Helper function to get model info by ID.""" for model in self.get_lambda_models(): if model["id"] == model_id: return model return None def process_content(self, content: Union[str, List[dict]], model_id: str) -> str: """Processes message content, handling text and images if the model supports vision.""" if isinstance(content, str): return content processed_content = "" model_info = self._get_model_info(model_id) supports_vision = ( model_info.get("supports_vision", False) if model_info else False ) for item in content: if item["type"] == "text": processed_content += item["text"] elif item["type"] == "image_url": if supports_vision: try: # OpenAI vision format expects base64 directly in the message structure, # but some APIs might expect different formats (like <image> tags). # Let's stick closer to OpenAI format for vision if Lambda supports it. # For now, since the requested model isn't vision, this part is less critical, # but we keep the processing logic. # Note: The exact format might need adjustment based on Lambda's vision API spec. # This implementation assumes base64 encoding is sufficient. base64_image = self.process_image(item) # Instead of adding <image> tag, prepare for OpenAI format if needed later. # For now, just append a placeholder or skip if not supported. # Since the current model doesn't support vision, we can arguably skip adding anything. # processed_content += f"\n[Image data for {item['image_url']['url']}]\n" # Placeholder print( f"Warning: Image provided for non-vision model '{model_id}'. Skipping image." ) except Exception as e: print(f"Error processing image: {e}") # Decide how to handle image processing errors, e.g., raise or just log. processed_content += f"\n[Error processing image: {e}]\n" else: # Handle images sent to non-vision models gracefully print( f"Warning: Image provided for non-vision model '{model_id}'. Skipping image." ) processed_content += ( f"\n[Image ignored - model does not support vision]\n" ) return processed_content def process_image(self, image_data: Dict) -> str: """Processes an image URL (data URI or web URL) and returns base64 encoded string.""" image_url_data = image_data["image_url"]["url"] if image_url_data.startswith("data:image"): # Handle data URI try: header, base64_data = image_url_data.split(",", 1) # Extract mime type (e.g., 'image/jpeg') mime_type = header.split(":")[1].split(";")[0] if mime_type not in self.SUPPORTED_IMAGE_TYPES: raise ValueError(f"Unsupported image MIME type: {mime_type}") # Estimate size (base64 is ~4/3 * original size) # This is a rough check; actual byte size might differ slightly image_size = len(base64_data) * 3 / 4 if image_size > self.MAX_IMAGE_SIZE: raise ValueError( f"Image size ({image_size / (1024*1024):.2f}MB) exceeds limit ({self.MAX_IMAGE_SIZE / (1024*1024)}MB)" ) return base64_data except Exception as e: raise ValueError(f"Error processing data URI: {e}") else: # Handle web URL try: # Use HEAD request first to check size and type without downloading full image response = requests.head( image_url_data, allow_redirects=True, timeout=self.REQUEST_TIMEOUT[0], ) response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) content_type = response.headers.get("content-type", "").split(";")[0] if content_type not in self.SUPPORTED_IMAGE_TYPES: raise ValueError(f"Unsupported image content type: {content_type}") content_length = int(response.headers.get("content-length", 0)) if content_length == 0: # Content-Length might be missing; proceed but be wary print( f"Warning: Content-Length missing for image URL: {image_url_data}. Proceeding with download." ) elif content_length > self.MAX_IMAGE_SIZE: raise ValueError( f"Image size ({content_length / (1024*1024):.2f}MB) exceeds limit ({self.MAX_IMAGE_SIZE / (1024*1024)}MB)" ) # Download the image img_response = requests.get( image_url_data, timeout=self.REQUEST_TIMEOUT[1] ) img_response.raise_for_status() # Re-check size if Content-Length was missing or potentially inaccurate if ( content_length == 0 and len(img_response.content) > self.MAX_IMAGE_SIZE ): raise ValueError( f"Downloaded image size ({len(img_response.content) / (1024*1024):.2f}MB) exceeds limit ({self.MAX_IMAGE_SIZE / (1024*1024)}MB)" ) return base64.b64encode(img_response.content).decode("utf-8") except requests.exceptions.RequestException as e: raise ValueError( f"Failed to fetch or process image from URL '{image_url_data}': {e}" ) except Exception as e: raise ValueError( f"An unexpected error occurred while processing image URL '{image_url_data}': {e}" ) async def pipe( self, body: Dict, __event_emitter__=None ) -> Union[str, Generator, Iterator, AsyncIterator]: # Check for Lambda API Key if not self.valves.LAMBDA_API_KEY: error_msg = "Error: LAMBDA_API_KEY is required. Please set it in your environment variables or Open WebUI settings." if __event_emitter__: await __event_emitter__( {"type": "status", "data": {"description": error_msg, "done": True}} ) # Return the error message directly for non-streaming or as the first part of a generator for streaming # For simplicity here, just returning the string. Open WebUI should handle this. return error_msg try: # Extract system message and user messages system_message, messages = pop_system_message(body["messages"]) # Get the model ID, stripping the pipe prefix model_id = body["model"].replace( "lambda_labs_inferencing_manifold_pipe.", "" ) # Updated prefix # Find model info to check for vision support model_info = self._get_model_info(model_id) if not model_info: raise ValueError(f"Model '{model_id}' not found in pipe configuration.") supports_vision = model_info.get("supports_vision", False) # Process messages for the API payload processed_messages = [] for message in messages: role = message["role"] content = message.get("content", "") # Handle content processing (text/image separation) # Standard OpenAI format for vision expects content as a list of dicts # [{'type': 'text', 'text': '...'}, {'type': 'image_url', 'image_url': {'url': 'data:...'}}] if isinstance(content, list): processed_content_list = [] has_image = False for item in content: if item["type"] == "text": processed_content_list.append( {"type": "text", "text": item["text"]} ) elif item["type"] == "image_url": has_image = True if supports_vision: try: base64_image = self.process_image(item) # Construct image part in OpenAI format processed_content_list.append( { "type": "image_url", "image_url": { # Prepend data URI scheme if not already present "url": f"data:{self.SUPPORTED_IMAGE_TYPES[0]};base64,{base64_image}" # Corrected line: Always format as data URI }, } ) except Exception as e: print( f"Error processing image for API payload: {e}" ) # Add a text note about the error instead of the image processed_content_list.append( { "type": "text", "text": f"[Error processing image: {e}]", } ) else: print( f"Warning: Image provided for non-vision model '{model_id}'. Skipping image." ) processed_content_list.append( { "type": "text", "text": "[Image ignored - model does not support vision]", } ) # If the model supports vision OR if no images were present, add the message if supports_vision or not has_image: processed_messages.append( {"role": role, "content": processed_content_list} ) else: # If model doesn't support vision but images were present, send only text parts text_only_content = "".join( [ item["text"] for item in processed_content_list if item["type"] == "text" ] ) processed_messages.append( {"role": role, "content": text_only_content} ) elif isinstance(content, str): # If content is just a string, pass it directly processed_messages.append({"role": role, "content": content}) # Add system message if present if system_message: processed_messages.insert( 0, {"role": "system", "content": str(system_message["content"])} ) # Construct the payload for Lambda API (OpenAI format) payload = { "model": model_id, # Use the specific model ID for Lambda "messages": processed_messages, "temperature": body.get( "temperature", 0.7 ), # Default temp can be adjusted "max_tokens": body.get("max_tokens", 4096), # Default max_tokens "stream": body.get("stream", False), # Add other OpenAI compatible parameters if needed (e.g., top_p, presence_penalty) "top_p": body.get("top_p", 1.0), # "stop": body.get("stop", None), # Optional stop sequences } # Filter out None values from payload which might cause issues payload = {k: v for k, v in payload.items() if v is not None} headers = { # Use Lambda API Key "Authorization": f"Bearer {self.valves.LAMBDA_API_KEY}", "Content-Type": "application/json", } # Construct the full API URL url = f"{self.base_url}chat/completions" # Handle streaming vs non-streaming response if payload["stream"]: # Use the existing streaming logic, assuming Lambda follows OpenAI SSE format return self._stream_with_ui( url, headers, payload, body, __event_emitter__ ) else: # Use the existing non-streaming logic return self.non_stream_response(url, headers, payload) except ValueError as ve: # Handle specific ValueErrors (like image processing or model not found) error_msg = f"Configuration or Input Error: {str(ve)}" print(error_msg) if __event_emitter__: await __event_emitter__( {"type": "status", "data": {"description": error_msg, "done": True}} ) return error_msg # Return error string except Exception as e: # General error handling error_msg = f"An unexpected error occurred: {str(e)}" import traceback print( f"{error_msg}\n{traceback.format_exc()}" ) # Log detailed traceback for debugging if __event_emitter__: await __event_emitter__( {"type": "status", "data": {"description": error_msg, "done": True}} ) # Return error string return error_msg async def _stream_with_ui( self, url: str, headers: dict, payload: dict, body: dict, __event_emitter__=None ) -> AsyncIterator[str]: """Handles streaming responses and updates UI via event emitter.""" try: response = requests.post( url, headers=headers, json=payload, stream=True, timeout=self.REQUEST_TIMEOUT, ) response.raise_for_status() # New buffer-based approach for SSE parsing buffer = b"" for chunk in response.iter_content(chunk_size=1024): buffer += chunk while b"\n" in buffer: line, buffer = buffer.split(b"\n", 1) line = line.strip() if not line: continue # Handle both SSE and raw JSON formats if line.startswith(b"data: "): line = line[6:].decode("utf-8") else: line = line.decode("utf-8") if line == "[DONE]": break try: data = json.loads(line) if "choices" in data and len(data["choices"]) > 0: chunk = ( data["choices"][0].get("delta", {}).get("content", "") ) if chunk: yield chunk except json.JSONDecodeError: print(f"Failed to parse JSON: {line}") # Signal completion via event emitter if provided if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": "Stream completed successfully", "done": True, }, } ) except requests.exceptions.RequestException as e: error_msg = f"Stream connection error: {str(e)}" print(error_msg) if __event_emitter__: await __event_emitter__( {"type": "status", "data": {"description": error_msg, "done": True}} ) yield f"Error: {error_msg}" # Yield error message back except Exception as e: import traceback error_msg = f"Unexpected stream error: {str(e)}" print(f"{error_msg}\n{traceback.format_exc()}") if __event_emitter__: await __event_emitter__( {"type": "status", "data": {"description": error_msg, "done": True}} ) yield f"Error: {error_msg}" # Yield error message back except requests.exceptions.RequestException as e: error_msg = f"Request failed: {str(e)} - {response.text if 'response' in locals() else ''}" print(error_msg) yield f"Error: {error_msg}" def non_stream_response(self, url: str, headers: dict, payload: dict) -> str: """Handles non-streaming API requests.""" try: response = requests.post( url, headers=headers, json=payload, timeout=self.REQUEST_TIMEOUT ) response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) res = response.json() # Standard OpenAI non-stream response structure if "choices" in res and len(res["choices"]) > 0: message = res["choices"][0].get("message", {}) content = message.get("content") if content: return content else: # Handle cases where content might be missing or null print( f"Warning: No content found in non-stream response choice: {res['choices'][0]}" ) return "[No content received]" else: # Handle unexpected response structure print(f"Warning: Unexpected non-stream response structure: {res}") return f"[Error: Unexpected response format - {str(res)[:100]}]" # Return partial error info except requests.exceptions.RequestException as e: error_msg = f"Non-stream request failed: {str(e)}" print(error_msg) return f"Error: {error_msg}" except Exception as e: import traceback error_msg = f"Unexpected error during non-stream request: {str(e)}" print(f"{error_msg}\n{traceback.format_exc()}") return f"Error: {error_msg}"