import json import logging import math import os import random import sys import time import asyncio from json import JSONDecodeError from logging.handlers import TimedRotatingFileHandler from statistics import median, mean, mode import requests from dotenv import load_dotenv from requests import RequestException from web3 import Web3 from web3.exceptions import BlockNotFound, Web3Exception, Web3ValidationError from web3_multi_provider import MultiProvider load_dotenv() for package in ('web3', 'web3_multi_provider', 'urllib3',): logging.getLogger(package).setLevel(logging.ERROR) web3 = Web3(MultiProvider(json.load(open('./data/rpc_servers.json')))) gas_multiplier = float(os.getenv('GAS_MULTIPLIER')) rapid_gas_fee_limit = int(os.getenv('GAS_FEE_RAPID_LIMIT')) gas_cache_seconds = int(os.getenv('GAS_CACHE_SECONDS')) wallet_a_address = os.getenv('WALLET_A_ADDRESS') wallet_b_address = os.getenv('WALLET_B_ADDRESS') wallet_c_address = os.getenv('WALLET_C_ADDRESS') def apply_estimated_gas(tx, attempts=18): while attempts > 0: try: if 'gas' not in tx: tx['gas'] = web3.eth.estimate_gas(tx) except Exception as e: logging.debug(e) attempts -= 1 time.sleep(1) else: return tx def apply_gas_multiplier(tx, multiplier=None): if not multiplier: multiplier = os.getenv('GAS_MULTIPLIER') try: multiplier = float(multiplier) except ValueError: raise ValueError("Invalid float for GAS_MULTIPLIER") else: tx['gas'] = int(tx['gas'] * multiplier) if 'maxFeePerGas' in tx: tx['maxFeePerGas'] = int(tx['maxFeePerGas'] * multiplier) return tx def apply_median_gas_strategy(tx, tx_amount=100): median_gas_price = get_average_gas_prices('median', tx_amount)['gas_price'] tx['maxFeePerGas'] = int(web3.to_wei(median_gas_price, 'wei')) tx['maxPriorityFeePerGas'] = web3.to_wei(500, 'gwei') return tx def approve_token_spending(account, token_address, spender_address, amount, attempts=18): token_contract = load_contract(token_address) token_info = get_token_info(token_address) token_amount = to_token_decimals(amount, token_info['decimals']) if token_contract.functions.allowance(account.address, spender_address).call() < token_amount: try: tx = token_contract.functions.approve(spender_address, token_amount).build_transaction({ 'nonce': get_nonce(account.address), 'from': account.address }) return broadcast_transaction(account, tx, True, attempts) except Exception as e: if error := interpret_exception_message(e): logging.error("{}. Failed to approve {} ({})".format(error, token_info['name'], token_info['symbol'])) return False def broadcast_transaction(account, tx, auto_gas=True, attempts=18): tx_hash = None tx['chainId'] = 369 if not auto_gas: tx = apply_estimated_gas(tx) tx = apply_median_gas_strategy(tx) tx = apply_gas_multiplier(tx) logging.debug("Broadcasting TX: {}".format(tx)) _attempts = attempts while _attempts > 0: try: signed_tx = web3.eth.account.sign_transaction(tx, private_key=account.key) tx_hash = web3.eth.send_raw_transaction(signed_tx.rawTransaction) except Exception as e: logging.debug(e) if "insufficient funds" in str(e): logging.error("Not enough gas for this TX: {}".format(tx)) return False elif "nonce too low" in str(e): tx['nonce'] = get_nonce(account.address) continue elif "could not replace existing tx" in str(e): tx['gas'] = int(tx['gas'] * 1.0369) if 'maxFeePerGas' in tx: tx['maxFeePerGas'] = int(tx['maxFeePerGas'] * 1.0369) if 'maxPriorityFeePerGas' in tx: tx['maxPriorityFeePerGas'] = int(tx['maxPriorityFeePerGas'] * 1.0369) continue elif "already known" in str(e): pass else: _attempts -= 1 time.sleep(1) if not tx_hash: _attempts -= 1 time.sleep(1) if _attempts != 0: logging.debug("Rebroadcasting TX ... {}".format(attempts - _attempts)) continue else: try: tx_receipt = web3.eth.wait_for_transaction_receipt(tx_hash, timeout=10) except Exception as e: logging.debug(e) _attempts -= 1 time.sleep(1) if _attempts != 0: logging.debug("Rebroadcasting TX ... {}".format(attempts - _attempts)) else: logging.debug("Confirmed TX: {}".format(tx_receipt)) return tx_receipt return False def convert_tokens(account, token0_address, token1_address, output_amount, attempts=18): # check if conversion route exists routes_functions = json.load(open('./data/routes.json')) if token0_address not in routes_functions[token1_address]['functions'].keys(): raise Exception("Route not available for {} to {}".format(token0_address, token1_address)) # get the cost required to convert tokens cost = routes_functions[token1_address]['costs'][token0_address] tokens_required = cost * output_amount if (tokens_balance := get_token_balance(token0_address, account.address)) < tokens_required: logging.error("Need {} more tokens".format(tokens_required - tokens_balance)) return False # call the buy function with amount or default to no args call_function = routes_functions[token1_address]['functions'][token0_address] approve_token_spending(account, token0_address, token1_address, get_token_supply(token0_address, True)) token1_contract = load_contract(token1_address, load_contract_abi(token1_address)) amount = to_token_decimals(output_amount, token1_contract.functions.decimals().call()) try: tx = getattr(token1_contract.functions, call_function)(int(amount)).build_transaction({ "from": account.address, "nonce": get_nonce(account.address) }) except Web3ValidationError as e: if 'positional arguments with type(s) `int`' in str(e): for i in range(0, amount): # cancel the rest of this loop if the gas price is too damn high if get_mempool_gas_prices('rapid', gas_cache_seconds) > rapid_gas_fee_limit: logging.warning("Gas fees are too high") return None try: tx = getattr(token1_contract.functions, call_function)().build_transaction({ "from": account.address, "nonce": get_nonce(account.address) }) success = broadcast_transaction(account, tx, True, attempts) except Exception as e: if error := interpret_exception_message(e): logging.error( "{}. Failed to convert using {}".format(error, routes_functions[token1_address]['label'])) else: if success: logging.info("Called {}({}) from {}".format( call_function, amount, routes_functions[token1_address]['label'] )) else: logging.warning("Failed to call {}({}) from {}".format( call_function, amount, routes_functions[token1_address]['label'] )) return False else: raise Web3ValidationError(e) return True else: try: success = broadcast_transaction(account, tx, True, attempts) except Exception as e: if error := interpret_exception_message(e): logging.error("{}. Failed to convert using {}".format(error, routes_functions[token1_address]['label'])) return False else: if success: logging.info("Called {}({}) from {}".format( call_function, amount, routes_functions[token1_address]['label'] )) return True else: logging.warning("Failed to call {}({}) from {}".format( call_function, amount, routes_functions[token1_address]['label'] )) return False def convert_tokens_multi(account, multi_address, token0_address, token1_address, iterations, attempts=18): # check if conversion route exists or is disabled routes_functions = json.load(open('./data/routes.json')) if token0_address not in routes_functions[multi_address]['functions'].keys(): raise Exception("Route not available for {} to {} in {}".format(token0_address, token1_address, multi_address)) elif routes_functions[multi_address]['functions'][token0_address][0] == '#': raise Exception("Route is disabled for {} to {} in {}".format(token0_address, token1_address, multi_address)) # get the cost required to convert tokens cost = routes_functions[multi_address]['costs'][token0_address] mints = routes_functions[multi_address]['mints'] or 1 tokens_cost = cost * iterations * mints tokens_required = round(tokens_cost, 15) # check if the wallet has enough tokens to convert if tokens_required > (tokens_balance := get_token_balance(token0_address, account.address)): logging.error("Need {} more tokens".format(tokens_required - tokens_balance)) return False # approve the tokens required to convert and determine how many loops approve_token_spending(account, token0_address, multi_address, get_token_supply(token0_address, True)) loops = math.floor(iterations / routes_functions[multi_address]['max_iterations']) if iterations % routes_functions[multi_address]['max_iterations'] != 0: loops += 1 # start calling multi mints for i in list(range(0, loops)): # cancel the rest of this loop if the gas price is too damn high if get_mempool_gas_prices('rapid', gas_cache_seconds) > rapid_gas_fee_limit: logging.warning("Gas fees are too high") return None if i + 1 < loops or iterations % routes_functions[multi_address]['max_iterations'] == 0: # do max iterations during loop call_iterations = routes_functions[multi_address]['max_iterations'] else: # on final loop run the remaining iterations call_iterations = iterations % routes_functions[multi_address]['max_iterations'] # call the multi mint function with iterations based on tokens minted call_function = routes_functions[multi_address]['functions'][token0_address] multi_contract = load_contract(multi_address, load_contract_abi(multi_address)) try: tx = getattr(multi_contract.functions, call_function)(call_iterations).build_transaction({ "from": account.address, "nonce": get_nonce(account.address) }) success = broadcast_transaction(account, tx, True, attempts) except Exception as e: if error := interpret_exception_message(e): logging.error("{}. Failed to convert using {}".format(error, routes_functions[multi_address]['label'])) else: if success: logging.info("Called {}({}) from {}".format( call_function, call_iterations, routes_functions[multi_address]['label'] )) else: logging.warning("Failed to call {}({}) from {}".format( call_function, call_iterations, routes_functions[multi_address]['label'] )) return False return True def estimate_swap_result(router_name, token0_address, token1_address, token0_amount, attempts=18): routers = json.load(open('./data/routers.json')) router_contract = load_contract(routers[router_name][0], routers[router_name][1]) token0_info = get_token_info(token0_address) while attempts > 0: try: expected_output_amounts = router_contract.functions.getAmountsOut( int(token0_amount * 10 ** token0_info['decimals']), [token0_address, token1_address] ).call() except Exception as e: logging.debug(e) attempts -= 1 time.sleep(1) else: return expected_output_amounts return [] def from_token_decimals(amount, decimals): return amount / 10 ** decimals def generate_wallet(amount): addresses = [] for i in list(range(0, amount)): account = Web3().eth.account.create() keystore = Web3().eth.account.encrypt(account.key.hex(), os.getenv('SECRET')) folder = "./data/wallets/{}".format(account.address) os.makedirs("data/wallets", exist_ok=True) os.makedirs(folder, exist_ok=True) open("{}/keystore".format(folder), 'w').write(json.dumps(keystore, indent=4)) addresses.append(account) return addresses def get_abi_from_blockscout(address, attempts=18): while attempts > 0: try: r = requests.get("https://api.scan.pulsechain.com/api/v2/smart-contracts/{}".format(address)) r.raise_for_status() except RequestException: attempts -= 1 if attempts > 0: time.sleep(1) continue else: raise RequestException else: resp = r.json() if 'abi' in resp.keys(): return resp['abi'] else: return [] def get_average_gas_prices(average='median', tx_amount=100, attempts=18): if not (latest_block := get_block('latest', False, attempts)): return {} else: latest_block = latest_block['number'] gas_limit = [] gas_prices = [] for block_number in range(latest_block, latest_block - tx_amount, -1): if not (block := get_block(block_number, True)): return {} for _tx in block['transactions']: gas_limit.append(_tx['gas']) gas_prices.append(_tx['gasPrice']) if len(gas_prices) >= tx_amount: break match average: case 'mean': average_gas_limit = mean(gas_limit[:tx_amount]) average_gas_price = mean(gas_prices[:tx_amount]) case 'median': average_gas_limit = median(gas_limit[:tx_amount]) average_gas_price = median(gas_prices[:tx_amount]) case 'mode': average_gas_limit = mode(gas_limit[:tx_amount]) average_gas_price = mode(gas_prices[:tx_amount]) case _: raise Exception('Invalid average type') return { "gas_limit": average_gas_limit, "gas_price": average_gas_price } async def estimate_mempool_gas_prices(): pending = web3.eth.get_block('pending', full_transactions=True) gas_prices = [] for tx in pending['transactions']: if 'maxFeePerGas' in tx: gas_prices.append(web3.from_wei(tx['maxFeePerGas'], 'gwei')) elif 'gasPrice' in tx: gas_prices.append(web3.from_wei(tx['gasPrice'], 'gwei')) if not gas_prices: return None gas_prices.sort() very_slow = gas_prices[int(len(gas_prices) * 0.1)] # 10th percentile slow = gas_prices[int(len(gas_prices) * 0.25)] # 25th percentile standard = gas_prices[int(len(gas_prices) * 0.5)] # 50th percentile (median) fast = gas_prices[int(len(gas_prices) * 0.70)] # 70th percentile rapid = gas_prices[int(len(gas_prices) * 0.80)] # 80th percentile instant = gas_prices[int(len(gas_prices) * 0.90)] # 90th percentile return { 'very_slow': float(round(very_slow, 2)), 'slow': float(round(slow, 2)), 'standard': float(round(standard, 2)), 'fast': float(round(fast, 2)), 'rapid': float(round(rapid, 2)), 'instant': float(round(instant, 2)), 'avg': float(round(mean(gas_prices), 2)), 'median': float(round(median(gas_prices), 2)), 'lowest': float(round(min(gas_prices), 2)), 'highest': float(round(max(gas_prices), 2)), 'tx_count': len(gas_prices), 'timestamp': time.time() } def get_mempool_gas_prices(speed=None, cache_interval_seconds=10): speeds = ('rapid', 'fast', 'standard', 'slow',) os.makedirs(cache_folder := './data/cache/', exist_ok=True) gas = {} gas_file = "{}/mempool_gas.json".format(cache_folder) try: gas = json.load(open(gas_file)) except (JSONDecodeError, FileNotFoundError): pass if not gas or not gas['timestamp'] or (gas['timestamp'] + cache_interval_seconds < time.time()): try: _gas = asyncio.run(estimate_mempool_gas_prices()) except Exception as e: if not gas: logging.debug(e) return 5555 * 10 ** 369 else: if not _gas and not gas: logging.debug("No gas data") return 5555 * 10 ** 369 elif _gas: gas = _gas open(gas_file, 'w').write(json.dumps(gas, indent=4)) if type(speed) is str: try: return float(gas[speed]) except KeyError: raise KeyError("No such speed as '{}' in gas price data {}".format(speed, list(speeds))) return {speed: float(price) for speed, price in gas.items() if speed in speeds} def get_block(number, full_transactions=False, attempts=18): if type(number) is str and number not in ('latest',) and type(number) is not int: raise ValueError("Invalid block number") while attempts > 0: try: return web3.eth.get_block(number, full_transactions=full_transactions) except Exception as e: logging.debug(e) time.sleep(1) attempts -= 1 return None def get_last_block_base_fee(attempts=18): if latest_block := get_block('latest', False, attempts): base_fee = latest_block['baseFeePerGas'] return float(round(web3.from_wei(base_fee, 'gwei'), 2)) else: return -1 def get_nonce(address, attempts=18): while attempts > 0: try: return web3.eth.get_transaction_count(web3.to_checksum_address(address)) except Exception as e: logging.debug(e) time.sleep(1) attempts -= 1 return -1 def get_pls_balance(address, decimals=False, attempts=18): while attempts > 0: try: balance = web3.eth.get_balance(address) except Exception as e: logging.debug(e) time.sleep(1) attempts -= 1 else: if decimals: return balance else: return from_token_decimals(balance, 18) return -1 def get_token_balance(token_address, wallet_address, decimals=False): token_contract = load_contract(token_address) token_info = get_token_info(token_address) token_balance = token_contract.functions.balanceOf(wallet_address).call() if decimals: return token_balance else: return float(round(from_token_decimals(token_balance, token_info['decimals']), 15)) def get_token_info(token_address, attempts=18): os.makedirs(token_folder := "./data/tokens".format(token_address), exist_ok=True) token_info_file = "{}/{}.json".format(token_folder, token_address) if os.path.isfile(token_info_file): token_info = json.load(open(token_info_file)) if token_info['decimals'] is not None: return token_info token_name, token_symbol, token_decimals = None, None, None token_contract = load_contract(token_address) _attempts = attempts while _attempts > 0: try: token_name = token_contract.functions.name().call() except Web3Exception: _attempts -= 1 continue else: break _attempts = attempts while _attempts > 0: try: token_symbol = token_contract.functions.symbol().call() except Web3Exception: _attempts -= 1 continue else: break _attempts = attempts while _attempts > 0: try: token_decimals = token_contract.functions.decimals().call() except Web3Exception: _attempts -= 1 continue else: break token_info = {"name": token_name, "symbol": token_symbol, "decimals": token_decimals} open(token_info_file, 'w').write(json.dumps(token_info, indent=4)) return token_info def get_token_supply(token_address, decimals=False): token_contract = load_contract(token_address) token_info = get_token_info(token_address) token_supply = token_contract.functions.totalSupply().call() if decimals: return token_supply else: return float(round(from_token_decimals(token_supply, token_info['decimals']), 15)) def interpret_exception_message(e): logging.debug(e) if 'insufficient funds for gas * price + value' in str(e): return 'Not enough PLS' elif 'transfer amount exceeds balance' in str(e): return 'Not enough tokens' return e def load_contract(address, abi=None): if not abi: abi = load_contract_abi(address) if not abi: abi = json.load(open('./data/abi/ERC20.json')) return web3.eth.contract(address=address, abi=abi) def load_contract_abi(address): try: abi = json.load(open("./data/abi/{}.json".format(address))) except FileNotFoundError: try: abi = get_abi_from_blockscout(address) except Exception as e: logging.debug(e) raise FileNotFoundError("Download a copy of the abi from Blockscout to this folder") else: if abi: open("./data/abi/{}.json".format(address), 'w').write(json.dumps(abi, indent=4)) else: raise FileNotFoundError("No abi found for this contract") return abi def load_wallet(address, secret): file_path = "./data/wallets/{}/keystore".format(address) if not os.path.exists(file_path): raise FileNotFoundError("Can't find your wallet keystore for address: {}".format(address)) keystore = "\n".join([line.strip() for line in open(file_path, 'r+')]) private_key = web3.eth.account.decrypt(keystore, secret) return web3.eth.account.from_key(private_key) def log_end_loop(delay): if delay: logging.info("Waiting for {} seconds...".format(delay)) time.sleep(delay) logging.info("-" * 50) def mint_tokens(account, token_address, amount, attempts=18): rng_functions = json.load(open('./data/rng.json')) if token_address not in rng_functions: raise Exception("Mint/RNG function not available for {}".format(token_address)) call_function = random.choice(list(rng_functions[token_address]['functions'])) token_contract = load_contract(token_address, load_contract_abi(token_address)) token_info = get_token_info(token_address) loops = math.ceil(amount / rng_functions[token_address]['mints']) for i in list(range(0, loops)): tx = getattr(token_contract.functions, call_function)().build_transaction({ "from": account.address, "nonce": get_nonce(account.address) }) try: success = broadcast_transaction(account, tx, False, attempts) except Exception as e: if error := interpret_exception_message(e): logging.error("{} to mint {}".format(error, rng_functions[token_address]['label'])) return False else: if success: logging.info("Called mint function for {} ({})".format(token_info['name'], token_info['symbol'])) else: logging.warning( "Failed to call mint function for {} ({})".format(token_info['name'], token_info['symbol'])) return False return True def sample_exchange_rate(router_name, token_address, quote_address, attempts=18): while attempts > 0: token_result = estimate_swap_result(router_name, token_address, quote_address, 1) if len(token_result) == 0: attempts -= 1 time.sleep(1) continue else: return token_result[1] return None def send_pls(account, to_address, amount, attempts=18): tx = { 'nonce': get_nonce(account.address), 'from': account.address, 'to': to_address, 'value': to_token_decimals(amount, 18), } try: return broadcast_transaction(account, tx, False, attempts) except Exception as e: if error := interpret_exception_message(e): logging.error("{}. Could not send to {}".format(error, to_address)) return False def send_tokens(account, token_address, to_address, amount, attempts=18): token_contract = load_contract(token_address) token_info = get_token_info(token_address) try: tx = token_contract.functions.transfer( to_address, to_token_decimals(amount, token_info['decimals']) ).build_transaction({ 'nonce': get_nonce(account.address), 'from': account.address }) return broadcast_transaction(account, tx, False, attempts) except Exception as e: if error := interpret_exception_message(e): logging.error("{}. Could not send {} ({}) to {}".format( error, token_info['name'], token_info['symbol'], to_address )) return False def set_logging(filename='app', level='INFO', backup_count=7): if hasattr(logging, level.upper()): os.makedirs('./data/logs/', exist_ok=True) logging.basicConfig( format='%(asctime)s %(name)s %(levelname)s %(message)s', datefmt='%H:%M:%S', level=getattr(logging, level.upper()), handlers=[ TimedRotatingFileHandler( "./data/logs/{}.log".format(filename), when="midnight", interval=1, backupCount=backup_count ), logging.StreamHandler(sys.stdout) ] ) return True raise Exception("Invalid logging level") def swap_tokens(account, router_name, token_route, estimated_swap_result, slippage_percent, to_address=None, taxed=False, attempts=18): routers = json.load(open('./data/routers.json')) router_contract = load_contract(routers[router_name][0], routers[router_name][1]) approve_token_spending(account, token_route[0], routers[router_name][0], estimated_swap_result[0]) if token_route[-1] == "0xA1077a294dDE1B09bB078844df40758a5D0f9a27": tx = router_contract.functions.swapExactTokensForETH( estimated_swap_result[0], estimated_swap_result[1] - round(estimated_swap_result[1] * (slippage_percent / 100)), token_route, to_address or account.address, int(time.time()) + (60 * 3) ) tx_params = { "from": account.address, "nonce": get_nonce(account.address) } elif token_route[0] == "0xA1077a294dDE1B09bB078844df40758a5D0f9a27": if taxed: swap_function = router_contract.functions.swapExactETHForTokensSupportingFeeOnTransferTokens else: swap_function = router_contract.functions.swapExactETHForTokens tx = swap_function( 0, token_route, to_address or account.address, int(time.time()) + (60 * 3) ) tx_params = { "from": account.address, "nonce": get_nonce(account.address), "value": estimated_swap_result[0] } else: if taxed: swap_function = router_contract.functions.swapExactTokensForETHSupportingFeeOnTransferTokens else: swap_function = router_contract.functions.swapExactTokensForETH tx = swap_function( estimated_swap_result[0], estimated_swap_result[1] - (estimated_swap_result[1] * slippage_percent), token_route, to_address or account.address, int(time.time()) + (60 * 3) ) tx_params = { "from": account.address, "nonce": get_nonce(account.address) } try: tx = tx.build_transaction(tx_params) return broadcast_transaction(account, tx, True, attempts) except Exception as e: if error := interpret_exception_message(e): logging.error("{}. Failed to swap".format(error)) return False def to_token_decimals(amount, decimals): amount = str(amount) if '.' in amount: decimals -= len(str(amount).split('.')[1]) return int(str(amount).replace('.', '') + '0' * decimals) def unwrap_pls(account, amount, attempts=18): wpls_contract = load_contract("0xA1077a294dDE1B09bB078844df40758a5D0f9a27") try: tx = wpls_contract.functions.withdraw(to_token_decimals(amount, 18)).build_transaction({ "from": account.address, "nonce": get_nonce(account.address) }) return broadcast_transaction(account, tx, True, attempts) except Exception as e: if error := interpret_exception_message(e): logging.error("{} to unwrap PLS".format(error)) return False def wrap_pls(account, amount, attempts=18): wpls_contract = load_contract("0xA1077a294dDE1B09bB078844df40758a5D0f9a27") try: tx = wpls_contract.functions.deposit().build_transaction({ "from": account.address, "nonce": get_nonce(account.address), "value": to_token_decimals(amount, 18) }) return broadcast_transaction(account, tx, True, attempts) except Exception as e: if error := interpret_exception_message(e): logging.error("{} to wrap PLS".format(error)) return False