Whitepaper
Docs
Sign In
Function
Function
filter
v0.3.0
Chat Metrics Advanced with Energy estimate
Function ID
chat_metrics_advanced_with_energy_estimate
Creator
@jevalideca
Downloads
44+
Calculate token usage, cost and energy estimate based on cost
Get
README
No README available
Function Code
Show
""" title: Chat Metrics Advanced with Energy estimate original_author: constLiakos fork_author: @iamg30 fork_author: @jevalideca funding_url: https://github.com/open-webui version: 0.3.0 license: MIT changelog: - 0.0.1 - Initial upload to openwebui community. - 0.0.2 - format, remove unnecessary code - 0.0.3 - add advanced stats: tokens & elapsed time - 0.0.4 - each metric has its enable switch Valve, experimental metrics are disabled by default - 0.1.0 - Major update: Fixed token counting, improved model detection, added token cost estimation, better error handling, and UI improvements - 0.2.0 - Added configurable default model, improved model detection - 0.3.0 - Added Energy estimate """ from pydantic import BaseModel, Field from typing import Optional, Callable, Any, Awaitable, Dict, List, Union, Literal import tiktoken import time import json import re from open_webui.utils.misc import get_last_assistant_message # Token pricing for different models. Token prices will need updated from time to time, and new models may need to be added and old models removed. # Different API providers use different units for their token pricing, and users need to be able to specify the appropriate pricing unit for each model. # Keys are model names used for matching MODEL_PRICING = { # OpenAI models "gpt-o1": {"input": 15.00 / 1000000, "output": 60.00 / 1000000}, "gpt-o1-mini": {"input": 1.10 / 1000000, "output": 4.40 / 1000000}, "gpt-o3-mini": {"input": 1.10 / 1000000, "output": 4.40 / 1000000}, "gpt-4.5": {"input": 75.00 / 1000000, "output": 150.00 / 1000000}, "gpt-4o": {"input": 2.50 / 1000000, "output": 10.00 / 1000000}, "gpt-4o-realtime-preview": {"input": 5.00 / 1000000, "output": 20.00 / 1000000}, "gpt-4o-mini": {"input": 0.15 / 1000000, "output": 0.60 / 1000000}, "gpt-4o-mini-realtime-preview": {"input": 0.60 / 1000000, "output": 2.40 / 1000000}, "gpt-4": {"input": 30.00 / 1000000, "output": 60.00 / 1000000}, "gpt-4-32k": {"input": 60.00 / 1000000, "output": 120.00 / 1000000}, "gpt-4-turbo": {"input": 10.00 / 1000000, "output": 30.00 / 1000000}, "gpt-3.5-turbo": {"input": 0.50 / 1000000, "output": 1.50 / 1000000}, "gpt-3.5-turbo-instruct": {"input": 1.50 / 1000000, "output": 2.00 / 1000000}, "gpt-3.5-turbo-16k-0613": {"input": 3.00 / 1000000, "output": 4.00 / 1000000}, # Anthropic models "claude-3-7-sonnet-latest": {"input": 3.00 / 1000000, "output": 15.00 / 1000000}, "claude-3-5-haiku-latest": {"input": 0.80 / 1000000, "output": 4.00 / 1000000}, "claude-3-5-sonnet-latest": {"input": 3.00 / 1000000, "output": 15.00 / 1000000}, "claude-3-5-sonnet-20240620": {"input": 3.00 / 1000000, "output": 15.00 / 1000000}, "claude-3-opus-latest": {"input": 15.00 / 1000000, "output": 75.00 / 1000000}, "claude-3-sonnet-20240229": {"input": 3.00 / 1000000, "output": 15.00 / 1000000}, "claude-3-haiku-20240307": {"input": 0.25 / 1000000, "output": 1.25 / 1000000}, # GroqCloud models "DeepSeek R1 Distill Llama 70B": { "input": 0.75 / 1000000, "output": 0.99 / 1000000, }, "DeepSeek R1 Distill Qwen 32B 128k": { "input": 0.69 / 1000000, "output": 0.69 / 1000000, }, "Qwen 2.5 32B Instruct 128k": {"input": 0.79 / 1000000, "output": 0.79 / 1000000}, "Qwen 2.5 Coder 32B Instruct 128k": { "input": 0.79 / 1000000, "output": 0.79 / 1000000, }, "Mistral Saba 24B": {"input": 0.79 / 1000000, "output": 0.79 / 1000000}, "Llama 3.2 1B (Preview) 8k": {"input": 0.04 / 1000000, "output": 0.04 / 1000000}, "Llama 3.2 3B (Preview) 8k": {"input": 0.06 / 1000000, "output": 0.06 / 1000000}, "Llama 3.3 70B Versatile 128k": {"input": 0.59 / 1000000, "output": 0.79 / 1000000}, "Llama 3.1 8B Instant 128k": {"input": 0.05 / 1000000, "output": 0.08 / 1000000}, "Llama 3 70B 8k": {"input": 0.59 / 1000000, "output": 0.79 / 1000000}, "Llama 3 8B 8k": {"input": 0.05 / 1000000, "output": 0.08 / 1000000}, "Mixtral 8x7B Instruct 32k": {"input": 0.24 / 1000000, "output": 0.24 / 1000000}, "Gemma 2 9B 8k": {"input": 0.20 / 1000000, "output": 0.20 / 1000000}, "Llama Guard 3 8B 8k": {"input": 0.20 / 1000000, "output": 0.20 / 1000000}, "Llama 3.3 70B SpecDec 8k": {"input": 0.59 / 1000000, "output": 0.99 / 1000000}, # DeepSeek models "deepseek-reasoner": {"input": 0.55 / 1000000, "output": 2.19 / 1000000}, "deepseek-chat": {"input": 0.27 / 1000000, "output": 1.10 / 1000000}, # Perplexity models "sonar": {"input": 1.00 / 1000000, "output": 1.00 / 1000000}, "sonar-pro": {"input": 3.00 / 1000000, "output": 15.00 / 1000000}, "sonar-reasoning": {"input": 1.00 / 1000000, "output": 5.00 / 1000000}, "sonar-reasoning-pro": {"input": 2.00 / 1000000, "output": 8.00 / 1000000}, # Add more below as fit "test-model-pricing": {"input": 100.0 / 1000000, "output": 100.0 / 1000000}, } def get_encoding_for_model(model_name: str) -> tiktoken.Encoding: """Get the appropriate encoding for the specified model.""" try: # Handle different naming conventions model_name = model_name.lower() if "gpt-4o" in model_name or "gpt-4o-mini" in model_name: return tiktoken.get_encoding("o200k_base") elif ( "gpt-4-turbo" in model_name or "gpt-4" in model_name or "gpt-3.5-turbo" in model_name or "codex" not in model_name and "gpt-3" in model_name ): return tiktoken.get_encoding("cl100k_base") elif ( "codex" in model_name or "gpt-3" not in model_name and "text-davinci" in model_name ): return tiktoken.get_encoding("p50k_base") elif "gpt-3" in model_name and "gpt-3.5-turbo" not in model_name: return tiktoken.get_encoding("r50k_base") else: # Fallback to cl100k_base encoding if specific encoding not found return tiktoken.get_encoding("cl100k_base") except Exception: # Fallback to cl100k_base encoding if specific encoding not found return tiktoken.get_encoding("cl100k_base") def num_tokens_from_string(text: str, model_name: str) -> int: """Count the number of tokens in a string.""" if not text: return 0 try: encoding = get_encoding_for_model(model_name) return len(encoding.encode(text)) except Exception as e: print(f"Error counting tokens: {str(e)}") # Fallback estimation: ~4 characters per token return len(text) // 4 def find_matching_model(model_name: str, default_model: str) -> str: """Find the closest matching model in the pricing dictionary.""" if not model_name: return default_model model_name_lower = model_name.lower() # Try exact match first for known_model in MODEL_PRICING: if known_model.lower() == model_name_lower: return known_model # Try substring match for known_model in MODEL_PRICING: if ( known_model.lower() in model_name_lower or model_name_lower in known_model.lower() ): return known_model # Try fuzzy matching with model family names model_families = ["gpt", "claude", "llama", "mistral", "qwen", "gemma", "mixtral"] for family in model_families: if family in model_name_lower: for known_model in MODEL_PRICING: if family in known_model.lower(): return known_model # Return default if no match found return default_model def estimate_cost( input_tokens: int, output_tokens: int, model_name: str, default_model: str ) -> float: """Estimate the cost of the API call based on tokens used.""" # Find the closest matching model matching_model = find_matching_model(model_name, default_model) pricing = MODEL_PRICING.get(matching_model, MODEL_PRICING[default_model]) input_cost = input_tokens * pricing["input"] output_cost = output_tokens * pricing["output"] return input_cost + output_cost class Filter: class Valves(BaseModel): priority: int = Field( default=5, description="Priority level for the filter operations." ) default_model: str = Field( default="Llama 3.3 70B SpecDec 8k", description="Default model to use for token counting and cost estimation when model detection fails", ) elapsed_time: bool = Field( default=True, description="Show elapsed time for response generation", ) tokens_no: bool = Field( default=True, description="Display token counts for input and output", ) tokens_per_sec: bool = Field( default=True, description="Display Tokens per Second generation rate", ) cost_estimate: bool = Field( default=True, description="Show estimated cost of the API call", ) energy_estimate: bool = Field( default=True, description="Estimate energy used based of 50% of cost", ) energy_cost: int = Field( default=0.15, description="Cost of 1 kilowattheure of energy in dollar", ) session_stats: bool = Field( default=True, description="Track total tokens and cost for the session", ) detailed_view: bool = Field( default=False, description="Show detailed breakdown instead of compact view", ) def __init__(self): self.session_energy_estimate = 0 self.valves = self.Valves() self.start_time = None self.session_tokens = {"input": 0, "output": 0} self.session_cost = 0.0 self.request_count = 0 def update_default_model(self, model_name: str) -> bool: """Update the default model used for token counting and cost estimation.""" try: if model_name in MODEL_PRICING: self.valves.default_model = model_name return True else: print( f"Warning: Model '{model_name}' not found in pricing list. Default model not updated." ) return False except Exception as e: print(f"Error updating default model: {str(e)}") return False def inlet(self, body: dict): """Process incoming request before sending to the model.""" self.start_time = time.time() # Count tokens in the user's message if self.valves.tokens_no or self.valves.session_stats: try: # Extract the latest user message user_messages = [ msg for msg in body.get("messages", []) if msg.get("role") == "user" ] if user_messages: latest_user_message = user_messages[-1].get("content", "") model_name = body.get("model", self.valves.default_model) input_tokens = num_tokens_from_string( latest_user_message, model_name ) self.session_tokens["input"] += input_tokens except Exception as e: print(f"Error processing inlet: {str(e)}") return body async def outlet( self, body: dict, __event_emitter__: Callable[[Any], Awaitable[None]], model: Optional[dict] = None, ) -> dict: """Process outgoing response before returning to the user.""" try: # Calculate elapsed time end_time = time.time() elapsed_time = end_time - self.start_time # Get model information model_info = model or {} model_name = model_info.get("id", self.valves.default_model) # Get response message response_message = get_last_assistant_message(body.get("messages", [])) # Count tokens output_tokens = num_tokens_from_string(response_message, model_name) self.session_tokens["output"] += output_tokens # Calculate tokens per second tokens_per_sec = output_tokens / elapsed_time if elapsed_time > 0 else 0 # Calculate cost if self.valves.cost_estimate or self.valves.session_stats: # Use only output tokens for cost calculation here current_cost = estimate_cost( 0, output_tokens, model_name, self.valves.default_model ) self.session_cost += current_cost # Calculate Energy if self.valves.energy_estimate: energy_estimate = self.session_cost * 0.50 / self.valves.energy_cost self.session_energy_estimate += energy_estimate # Increment request counter self.request_count += 1 # Prepare statistics array stats_array = [] if self.valves.detailed_view: # Detailed view with labels if self.valves.elapsed_time: stats_array.append(f"Time: {elapsed_time:.2f}s") if self.valves.tokens_no: stats_array.append(f"Tokens: {output_tokens}") if self.valves.tokens_per_sec: stats_array.append(f"Speed: {tokens_per_sec:.1f} t/s") if self.valves.cost_estimate: stats_array.append(f"Cost: ${current_cost:.6f}") if self.valves.energy_estimate: stats_array.append(f"Energy used: ${energy_estimate:.3f} kWh") if self.valves.session_stats: total_tokens = ( self.session_tokens["input"] + self.session_tokens["output"] ) stats_array.append( f"Session: {total_tokens} tokens (${self.session_cost:.6f}, {self.session_energy_estimate:.3f} kWh)" ) else: # Compact view if self.valves.elapsed_time: stats_array.append(f"{elapsed_time:.2f}s") if self.valves.tokens_no: stats_array.append(f"{output_tokens}t") if self.valves.tokens_per_sec: stats_array.append(f"{tokens_per_sec:.1f}t/s") if self.valves.cost_estimate: stats_array.append(f"${current_cost:.6f}") if self.valves.energy_estimate: stats_array.append(f"Energy used: ${energy_estimate:.6f} kWh") if self.valves.session_stats: total_tokens = ( self.session_tokens["input"] + self.session_tokens["output"] ) stats_array.append( f"Total: {total_tokens}t (${self.session_cost:.6f}, {self.session_energy_estimate:.3f} kWh) " ) # Join stats with separator separator = " | " if self.valves.detailed_view else " · " stats = separator.join(stat for stat in stats_array) # Send statistics through event emitter await __event_emitter__( { "type": "status", "data": { "description": stats, }, } ) except Exception as e: # Handle errors gracefully print(f"Error in Chat Metrics filter: {str(e)}") await __event_emitter__( { "type": "status", "data": { "description": "Chat Metrics: Error calculating stats", }, } ) return body