Llama Vision Pipe - Ollama Integration
Function: llama-vision-pipe-test (pipe, v0.0.1)
Function ID: llama_vision_pipe_test
Creator: @nigelc
Downloads: 73+
README
No README available
Function Code
""" title: Llama Vision Pipe - Ollama Integration authors: [Nigel] funding_url: https://github.com/open-webui version: 0.0.1 required_open_webui_version: 0.5.0 license: MIT User: [Text + Image] System: 1. Llama3.2-Vision reads and generates image descriptions 2. Combines image descriptions with user text 3. Sends combined content to specified Ollama model 4. Ollama model responds back """ import os import json import time import logging import requests import aiohttp import re from typing import List, Union, Generator, Iterator, Dict, Optional, AsyncIterator, Tuple from pydantic import BaseModel, Field from open_webui.utils.misc import pop_system_message class CacheEntry: def __init__(self, description: str): self.description = description self.timestamp = time.time() class Pipe: SUPPORTED_IMAGE_TYPES = ["image/jpeg", "image/png", "image/gif", "image/webp"] MAX_IMAGE_SIZE = 5 * 1024 * 1024 # 5MB per image TOTAL_MAX_IMAGE_SIZE = 100 * 1024 * 1024 # 100MB total REQUEST_TIMEOUT = (3.05, 60) CACHE_EXPIRATION = 30 * 60 # 30 minutes in seconds class Valves(BaseModel): OLLAMA_BASE_URL: str = Field( default=os.getenv("OLLAMA_BASE_URL", "http://localhost:11434"), description="Your Ollama Base URL" ) VISION_MODEL: str = Field( default=os.getenv("VISION_MODEL", "llama2-vision"), description="Vision model for image processing" ) THINK_XML_TAG: str = Field( default=os.getenv("THINK_XML_TAG", "thinking"), description="XML tag used for thinking content" ) def __init__(self): logging.basicConfig(level=logging.INFO) self.type = "manifold" self.id = "llama-vision" self.name = "llama-vision/" self.valves = self.Valves() self.request_id = None self.image_cache = {} @staticmethod def get_model_id(model_name: str) -> str: """Extract just the base model name from any format""" return model_name.replace(".", "/").split("/")[-1] async def get_ollama_models(self) -> List[Dict[str, str]]: """Fetch available models from Ollama API""" try: async with aiohttp.ClientSession() as session: async with session.get( f"{self.valves.OLLAMA_BASE_URL}/api/tags", timeout=10 ) as response: if response.status == 200: models_data = await response.json() return [ {"id": model["name"], "name": model["name"]} for model in models_data.get("models", []) ] return [] except Exception as e: logging.error(f"Error getting models: {e}") return [] def pipes(self) -> List[dict]: return self.get_ollama_models() def clean_expired_cache(self): """Remove expired entries from cache""" current_time = time.time() expired_keys = [ key for key, entry in self.image_cache.items() if current_time - entry.timestamp > self.CACHE_EXPIRATION ] for key in expired_keys: del self.image_cache[key] def extract_images_and_text(self, message: Dict) -> Tuple[List[Dict], str]: """Extract images and text from a message.""" images = [] text_parts = [] content = message.get("content", "") if isinstance(content, list): for item in content: if item.get("type") == "text": text_parts.append(item.get("text", "")) elif item.get("type") == "image_url": images.append(item) else: text_parts.append(content) return images, " ".join(text_parts) async def process_image_with_llama_vision( self, image_data: Dict, __event_emitter__=None ) -> str: """Process a single image with Llama Vision and return its description.""" try: # Clean expired cache entries self.clean_expired_cache() # Create cache key image_url = image_data.get("image_url", {}).get("url", "") image_key = image_url if image_url.startswith("data:image"): image_key = image_url.split(",", 1)[1] if "," in image_url else "" # 
Check cache if image_key in self.image_cache: logging.info(f"Using cached image description for {image_key[:30]}...") return self.image_cache[image_key].description if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": "Processing new image...", "done": False, }, } ) # Prepare the request for Ollama vision model prompt = "Give a clear and detailed description of this image." if image_url.startswith("data:image"): image_data = image_url.split(",", 1)[1] if "," in image_url else "" vision_request = { "model": self.valves.VISION_MODEL, "prompt": prompt, "images": [image_data] } else: # For URLs, we need to download the image first async with aiohttp.ClientSession() as session: async with session.get(image_url) as response: if response.status == 200: image_bytes = await response.read() import base64 image_base64 = base64.b64encode(image_bytes).decode() vision_request = { "model": self.valves.VISION_MODEL, "prompt": prompt, "images": [image_base64] } else: raise ValueError(f"Failed to fetch image: {response.status}") # Make request to Ollama vision model async with aiohttp.ClientSession() as session: async with session.post( f"{self.valves.OLLAMA_BASE_URL}/api/generate", json=vision_request ) as response: if response.status == 200: result = await response.json() description = result.get("response", "") # Cache the result self.image_cache[image_key] = CacheEntry(description) # Limit cache size (keep 100 most recent) if len(self.image_cache) > 100: oldest_key = min( self.image_cache.keys(), key=lambda k: self.image_cache[k].timestamp ) del self.image_cache[oldest_key] return description else: raise ValueError(f"Vision model request failed: {response.status}") except Exception as e: logging.error(f"Error processing image: {e}") return f"Error processing image: {str(e)}" async def process( self, messages: List[Dict], model: str, stream: bool = True, **kwargs ) -> Union[AsyncIterator[str], str]: """Process messages with images through Llama Vision and Ollama""" try: # Extract the last message for processing last_message = messages[-1] images, text = self.extract_images_and_text(last_message) # Process images if present image_descriptions = [] for image in images: description = await self.process_image_with_llama_vision(image) if description: image_descriptions.append(description) # Combine image descriptions with text combined_prompt = text if image_descriptions: combined_prompt = f"Image Description(s):\n{chr(10).join(image_descriptions)}\n\nUser Message:\n{text}" # Prepare messages for Ollama ollama_messages = messages[:-1] ollama_messages.append({"role": "user", "content": combined_prompt}) # Make request to Ollama model request_data = { "model": model, "messages": ollama_messages, "stream": stream } async with aiohttp.ClientSession() as session: async with session.post( f"{self.valves.OLLAMA_BASE_URL}/api/chat", json=request_data ) as response: if stream: async for line in response.content: if line: try: data = json.loads(line) if "message" in data: yield data["message"]["content"] except json.JSONDecodeError: continue else: result = await response.json() return result.get("message", {}).get("content", "") except Exception as e: error_msg = f"Error in processing: {str(e)}" logging.error(error_msg) if stream: yield error_msg else: return error_msg "
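
For reference, here is a minimal local driver for the pipe's text-plus-image flow. It is a sketch, not part of the published function: it assumes the code above is in the same module, that Ollama is reachable at the default URL, and that the named models have been pulled; "photo.jpg" is a placeholder file.

# Minimal driver for Pipe.process() (a sketch; all names below are assumptions)
import asyncio
import base64


async def main():
    pipe = Pipe()
    pipe.valves.VISION_MODEL = "llama3.2-vision"  # assumption: pulled locally

    with open("photo.jpg", "rb") as f:  # placeholder image file
        b64 = base64.b64encode(f.read()).decode()

    messages = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "What is happening in this picture?"},
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/jpeg;base64,{b64}"},
                },
            ],
        }
    ]

    # process() is an async generator: it first describes the image with the
    # vision model, then streams the final answer from /api/chat
    async for chunk in pipe.process(messages, model="llama3.1"):
        print(chunk, end="", flush=True)


asyncio.run(main())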
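
The two-step chain the pipe implements can also be reproduced with two plain HTTP calls, which may help when debugging the function against a local Ollama instance. This sketch follows the public Ollama REST API; the model names and the image file are again placeholders.

# Standalone illustration of the two Ollama calls the pipe chains together
import base64

import requests

OLLAMA = "http://localhost:11434"  # assumption: local Ollama instance

with open("photo.jpg", "rb") as f:  # placeholder image file
    image_b64 = base64.b64encode(f.read()).decode()

# Step 1: the vision model turns the image into a text description
desc = requests.post(
    f"{OLLAMA}/api/generate",
    json={
        "model": "llama3.2-vision",  # assumption: pulled locally
        "prompt": "Give a clear and detailed description of this image.",
        "images": [image_b64],
        "stream": False,  # single JSON object instead of NDJSON chunks
    },
    timeout=60,
).json()["response"]

# Step 2: the description is merged with the user's text and sent to a
# plain chat model via /api/chat, mirroring the pipe's combined_prompt
combined = f"Image Description(s):\n{desc}\n\nUser Message:\nWhat is happening here?"
reply = requests.post(
    f"{OLLAMA}/api/chat",
    json={
        "model": "llama3.1",  # assumption: any non-vision chat model
        "messages": [{"role": "user", "content": combined}],
        "stream": False,
    },
    timeout=120,
).json()["message"]["content"]
print(reply)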