Whitepaper
Docs
Sign In
Tool
Tool
v1.0
Web3
Tool ID
web3
Creator
@naoufel
Downloads
158+
A tool for interacting with Ethereum or other EVM networks using Web3.py and the Etherscan API
Get
README
No README available
Tool Code
Show
""" title: Web3 Tool author: naoufel tiwtter: @naouufel github: naouflex version: 1.0 license: MIT requirements: web3 """ import os import json import requests import time from web3 import Web3 from typing import Any, Dict, List, Optional from pydantic import BaseModel, Field from concurrent.futures import ThreadPoolExecutor import concurrent.futures import logging from datetime import datetime class EventEmitter: def __init__(self, emitter=None): self.emitter = emitter async def emit( self, description="Unknown State", status="in_progress", done=False, step_number=None, source=None, ): if self.emitter: message = { "type": "status", "data": { "status": status, "description": description, "done": done, }, } if step_number: message["data"]["step"] = step_number await self.emitter(message) # Send source if provided if source: await self.emitter({"type": "source", "data": source}) class Tools: class Valves(BaseModel): """ Configuration values for Web3 and Etherscan interactions. """ # Network configurations WEB3_RPC: str = Field(default=os.getenv("WEB3_RPC", "https://eth.llamarpc.com")) # Etherscan configuration ETHERSCAN_API_BASE: str = Field(default="https://api.etherscan.io/api") ETHERSCAN_API_KEY: str = Field(default=os.getenv("ETHERSCAN_API_KEY", "")) # Error messages ERROR_MESSAGES: dict = Field( default={ "invalid_address": "Invalid Ethereum address provided", "contract_not_found": "Contract ABI not found", "rpc_error": "Error connecting to Ethereum node", "api_error": "Error accessing Etherscan API", } ) # Add new configuration options CHUNK_SIZE: int = Field(default=10000) MAX_RETRIES: int = Field(default=5) INITIAL_BACKOFF: float = Field(default=0.1) MAX_WORKERS: int = Field(default=4) # Add state reading specific configurations STATE_BATCH_SIZE: int = Field(default=100) STATE_MAX_RETRIES: int = Field(default=3) STATE_BACKOFF: float = Field(default=0.5) STATE_MAX_WORKERS: int = Field(default=4) # Add block-related configurations MAX_BLOCK_RANGE: int = 
Field(default=1_000_000) # ~1 week of blocks MAX_BLOCK_AGE_DAYS: int = Field(default=365) # ~1 year max age BLOCKS_PER_DAY: int = Field( default=7200 ) # Approximate blocks per day on Ethereum def __init__(self): self.valves = self.Valves() self.w3 = Web3(Web3.HTTPProvider(self.valves.WEB3_RPC)) # Initialize logging logging.basicConfig(level=logging.INFO) self.logger = logging.getLogger(__name__) def fetch_logs_chunk( self, contract: Any, event_filter: Dict, start_block: int, end_block: int, __event_emitter__=None, ) -> List[Dict]: """ Fetches a chunk of logs with retry logic and error handling. """ emitter = EventEmitter(__event_emitter__) attempt = 0 backoff = self.valves.INITIAL_BACKOFF while attempt < self.valves.MAX_RETRIES: try: time.sleep(backoff) logs = self.w3.eth.get_logs( { "fromBlock": start_block, "toBlock": end_block, "address": contract.address, "topics": ( event_filter["topics"] if "topics" in event_filter else None ), } ) return logs except Exception as e: attempt += 1 backoff *= 2 error_msg = f"Attempt {attempt}/{self.valves.MAX_RETRIES} failed fetching logs for blocks {start_block}-{end_block}: {str(e)}" emitter.emit(error_msg) if attempt == self.valves.MAX_RETRIES: self.logger.error(f"{error_msg}. Maximum retry attempts reached.") return [] self.logger.warning( f"{error_msg}. Retrying in {backoff:.2f} seconds..." ) return [] async def get_contract_abi( self, contract_address: str, __event_emitter__=None, ) -> str: """ Fetches contract ABI from Etherscan. 
""" emitter = EventEmitter(__event_emitter__) await emitter.emit(f"Fetching ABI for contract {contract_address}...") if not Web3.is_address(contract_address): await emitter.emit(self.valves.ERROR_MESSAGES["invalid_address"], done=True) return self.valves.ERROR_MESSAGES["invalid_address"] try: params = { "module": "contract", "action": "getabi", "address": contract_address, "apikey": self.valves.ETHERSCAN_API_KEY, } response = requests.get(self.valves.ETHERSCAN_API_BASE, params=params) response.raise_for_status() result = response.json() if result["status"] == "1" and result["message"] == "OK": await emitter.emit( "Contract ABI retrieved successfully", done=True, source={ "document": [ json.dumps( { "contract_address": contract_address, "abi": result["result"], "call": f"etherscan.api.getabi(address={contract_address})", } ) ], "metadata": [ { "source": "Etherscan API", "date_accessed": datetime.now().isoformat(), "name": f"Contract ABI for {contract_address}", } ], "source": { "name": "Etherscan", "url": f"https://etherscan.io/address/{contract_address}#code", "type": "contract_abi", }, }, ) return result["result"] else: await emitter.emit( self.valves.ERROR_MESSAGES["contract_not_found"], done=True ) return self.valves.ERROR_MESSAGES["contract_not_found"] except requests.RequestException as e: error_msg = f"Error fetching ABI: {str(e)}" await emitter.emit(error_msg, done=True) return error_msg async def call_contract_function( self, contract_address: str, function_name: str, blocks: List[int] = None, function_args: List[Any] = None, index: Optional[int] = None, sub_index: Optional[int] = None, __event_emitter__=None, ) -> str: """ Enhanced version of contract function calling with parallel processing and better error handling. 
""" emitter = EventEmitter(__event_emitter__) await emitter.emit(f"Calling {function_name} on contract {contract_address}...") try: # Get contract ABI and create contract instance contract_abi = await self.get_contract_abi(contract_address) if isinstance(contract_abi, str) and contract_abi.startswith("Error"): return contract_abi contract = self.w3.eth.contract( address=Web3.to_checksum_address(contract_address), abi=json.loads(contract_abi), ) # Set default block if not provided if not blocks: blocks = [self.w3.eth.block_number] # Process state calls in parallel batches results = [] with ThreadPoolExecutor( max_workers=self.valves.STATE_MAX_WORKERS ) as executor: futures = [] for block_batch in self._chunk_blocks( blocks, self.valves.STATE_BATCH_SIZE ): futures.append( executor.submit( self._execute_state_batch, contract, function_name, block_batch, function_args, index, sub_index, ) ) for future in concurrent.futures.as_completed(futures): batch_results = future.result() results.extend(batch_results) await emitter.emit( "State calls completed successfully", done=True, source={ "document": [ json.dumps( { "result": result, "call": f"{function_name}({', '.join(map(str, function_args or []))})", "block": result["block"], "contract_address": contract_address, } ) for result in results ], "metadata": [ { "source": f"Contract {contract_address}", "date_accessed": datetime.now().isoformat(), "name": f"{function_name}() at block {result['block']}", } for result in results ], "source": { "name": f"Contract Function: {function_name}", "url": f"https://etherscan.io/address/{contract_address}#readContract", "type": "contract_state", }, }, ) return json.dumps({"results": results}, indent=2) except Exception as e: error_msg = f"Error calling contract function: {str(e)}" self.logger.error(error_msg) await emitter.emit(error_msg, done=True) return error_msg def _chunk_blocks(self, blocks: List[int], chunk_size: int) -> List[List[int]]: """Helper function to split blocks into 
chunks for parallel processing.""" return [blocks[i : i + chunk_size] for i in range(0, len(blocks), chunk_size)] def _execute_state_batch( self, contract: Any, function_name: str, blocks: List[int], function_args: List[Any] = None, index: Optional[int] = None, sub_index: Optional[int] = None, ) -> List[Dict]: """ Execute a batch of state calls with retry logic. """ results = [] for block in blocks: attempt = 0 backoff = self.valves.STATE_BACKOFF while attempt < self.valves.STATE_MAX_RETRIES: try: time.sleep(backoff) # Get function and execute call function = getattr(contract.functions, function_name) args = function_args or [] value = function(*args).call(block_identifier=block) # Handle indexed results if index is not None: value = value[index] if sub_index is not None: value = value[sub_index] # Get block data block_data = self.w3.eth.get_block(block) result = { "block": block, "block_time": block_data["timestamp"], "contract_address": contract.address, "function_name": function_name, "args": args, "value": self._format_value(value), } results.append(result) break except Exception as e: attempt += 1 backoff *= 2 error_msg = f"Attempt {attempt}/{self.valves.STATE_MAX_RETRIES} failed for block {block}: {str(e)}" if attempt == self.valves.STATE_MAX_RETRIES: self.logger.error( f"{error_msg}. Maximum retry attempts reached." ) results.append( { "block": block, "error": str(e), "contract_address": contract.address, "function_name": function_name, } ) else: self.logger.warning( f"{error_msg}. Retrying in {backoff:.2f} seconds..." 
) return results def _format_value(self, value: Any) -> str: """Helper function to format return values consistently.""" if isinstance(value, (bytes, bytearray)): return "0x" + value.hex() elif isinstance(value, (list, tuple)): return [self._format_value(v) for v in value] elif isinstance(value, dict): return {k: self._format_value(v) for k, v in value.items()} elif isinstance(value, (int, float)): return str(value) return str(value) async def get_contract_events( self, contract_address: str, event_name: str, from_block: int, to_block: Optional[int] = None, arg_filters: Dict = None, __event_emitter__=None, ) -> str: """ Enhanced version of get_contract_events with parallel processing and better error handling. """ emitter = EventEmitter(__event_emitter__) await emitter.emit( f"Fetching {event_name} events from contract {contract_address}..." ) try: # Validate block range current_block = self.w3.eth.block_number to_block = to_block or current_block # Check if blocks are too old max_age_in_blocks = ( self.valves.MAX_BLOCK_AGE_DAYS * self.valves.BLOCKS_PER_DAY ) if current_block - from_block > max_age_in_blocks: error_msg = f"From block is too old. Maximum age is {self.valves.MAX_BLOCK_AGE_DAYS} days" await emitter.emit(error_msg, done=True) return error_msg # Check block range size if to_block - from_block > self.valves.MAX_BLOCK_RANGE: error_msg = f"Block range too large. 
Maximum range is {self.valves.MAX_BLOCK_RANGE} blocks" await emitter.emit(error_msg, done=True) return error_msg # Get contract ABI and create contract instance contract_abi = await self.get_contract_abi(contract_address) if isinstance(contract_abi, str) and contract_abi.startswith("Error"): return contract_abi contract = self.w3.eth.contract( address=Web3.to_checksum_address(contract_address), abi=json.loads(contract_abi), ) # Get event signature event_abi = next( ( e for e in contract.abi if e["type"] == "event" and e["name"] == event_name ), None, ) if not event_abi: error_msg = f"Event {event_name} not found in contract ABI" await emitter.emit(error_msg, done=True) return error_msg # Create event filter event_signature = ( f"{event_name}({','.join(i['type'] for i in event_abi['inputs'])})" ) event_topic = self.w3.keccak(text=event_signature).hex() # Process blocks in parallel chunks all_events = [] with ThreadPoolExecutor(max_workers=self.valves.MAX_WORKERS) as executor: futures = [] for chunk_start in range(from_block, to_block, self.valves.CHUNK_SIZE): chunk_end = min(chunk_start + self.valves.CHUNK_SIZE, to_block) futures.append( executor.submit( self.fetch_logs_chunk, contract, {"topics": [event_topic]}, chunk_start, chunk_end, ) ) for future in concurrent.futures.as_completed(futures): chunk_logs = future.result() processed_events = self.process_events( contract, event_name, chunk_logs, arg_filters ) all_events.extend(processed_events) await emitter.emit( "Events retrieved successfully", done=True, source={ "document": [ json.dumps( { "event": event, "call": f"{event_name}.filter(fromBlock={from_block}, toBlock={to_block})", "contract_address": contract_address, "filters": arg_filters, } ) for event in all_events ], "metadata": [ { "source": f"Contract {contract_address}", "date_accessed": datetime.now().isoformat(), "name": f"{event_name} at block {event['block_number']}", } for event in all_events ], "source": { "name": f"Contract Event: {event_name}", 
"url": f"https://etherscan.io/address/{contract_address}#events", "type": "contract_events", }, }, ) return json.dumps({"events": all_events}, indent=2) except Exception as e: error_msg = f"Error fetching events: {str(e)}" self.logger.error(error_msg) await emitter.emit(error_msg, done=True) return error_msg def process_events( self, contract: Any, event_name: str, logs: List[Dict], arg_filters: Dict = None ) -> List[Dict]: """ Process raw logs into formatted events with filtering. """ processed_events = [] for log in logs: try: event_data = getattr(contract.events, event_name)().processLog(log) # Apply filters if provided if arg_filters: if not all( event_data["args"].get(key) == value for key, value in arg_filters.items() ): continue processed_event = { "event": event_name, "block_number": log["blockNumber"], "transaction_hash": log["transactionHash"].hex(), "log_index": log["logIndex"], "args": { key: str(value) for key, value in event_data["args"].items() }, } processed_events.append(processed_event) except Exception as e: self.logger.warning( f"Error processing log {log.get('transactionHash', 'unknown').hex()}: {str(e)}" ) continue return processed_events async def get_current_block(self, __event_emitter__=None) -> str: """ Get the current block number from the Ethereum network. 
""" emitter = EventEmitter(__event_emitter__) await emitter.emit("Fetching current block number...") try: block_number = self.w3.eth.block_number await emitter.emit( "Current block number retrieved successfully", done=True, source={ "document": [ json.dumps( { "current_block": str(block_number), "call": "self.w3.eth.block_number()", } ) ], "metadata": [ { "source": "Ethereum Network", "date_accessed": datetime.now().isoformat(), "name": f"Block #{block_number}", } ], "source": { "name": "Current Block Number", "url": "https://etherscan.io/blocks", "type": "block_number", }, }, ) return json.dumps({"current_block": str(block_number)}) except Exception as e: error_msg = f"Error fetching current block: {str(e)}" self.logger.error(error_msg) await emitter.emit(error_msg, done=True) return error_msg