# Listing metadata: 786 lines, 30 KiB, Python, executable file
import json
|
|
import logging
|
|
import math
|
|
import os
|
|
import random
|
|
import sys
|
|
import time
|
|
import asyncio
|
|
from json import JSONDecodeError
|
|
from logging.handlers import TimedRotatingFileHandler
|
|
from statistics import median, mean, mode
|
|
|
|
import requests
|
|
from dotenv import load_dotenv
|
|
from requests import RequestException
|
|
from web3 import Web3
|
|
from web3.exceptions import BlockNotFound, Web3Exception, Web3ValidationError
|
|
from web3_multi_provider import MultiProvider
|
|
|
|
# Pull settings from a .env file (if any) into the process environment.
load_dotenv()

# Only surface errors from the networking libraries.
for package in ('web3', 'web3_multi_provider', 'urllib3',):
    logging.getLogger(package).setLevel(logging.ERROR)

# Shared Web3 client built from the RPC endpoints in ./data/rpc_servers.json.
# The context manager closes the file once it has been parsed.
with open('./data/rpc_servers.json') as _rpc_servers_file:
    web3 = Web3(MultiProvider(json.load(_rpc_servers_file)))

# Settings read once at import time. float()/int() raise here when the
# corresponding variable is unset or malformed, so a bad configuration
# fails at import rather than in the middle of a transaction.
gas_multiplier = float(os.getenv('GAS_MULTIPLIER'))
rapid_gas_fee_limit = int(os.getenv('GAS_FEE_RAPID_LIMIT'))
gas_cache_seconds = int(os.getenv('GAS_CACHE_SECONDS'))
wallet_a_address = os.getenv('WALLET_A_ADDRESS')
wallet_b_address = os.getenv('WALLET_B_ADDRESS')
wallet_c_address = os.getenv('WALLET_C_ADDRESS')
|
|
|
|
def apply_estimated_gas(tx, attempts=18):
    """Ensure ``tx`` carries a ``gas`` entry, asking the node for an estimate if needed.

    Args:
        tx: Transaction dict; mutated in place when ``gas`` is missing.
        attempts: How many times the estimate may fail before giving up.

    Returns:
        The same ``tx`` dict on success, or ``None`` when every attempt failed.
    """
    remaining = attempts
    while remaining > 0:
        try:
            if 'gas' not in tx:
                tx['gas'] = web3.eth.estimate_gas(tx)
        except Exception as error:
            # Any failure costs one attempt and a one-second pause.
            logging.debug(error)
            remaining -= 1
            time.sleep(1)
            continue
        return tx
    return None
|
|
|
|
|
|
def apply_gas_multiplier(tx, multiplier=None):
    """Scale ``tx['gas']`` (and ``tx['maxFeePerGas']`` when present) by a multiplier.

    Args:
        tx: Transaction dict with a ``gas`` entry; mutated in place.
        multiplier: Factor to apply. When falsy, the ``GAS_MULTIPLIER``
            environment variable is used instead.

    Returns:
        The same ``tx`` dict with the scaled values truncated to ``int``.

    Raises:
        ValueError: If the multiplier is missing or cannot be parsed as a float.
    """
    if not multiplier:
        multiplier = os.getenv('GAS_MULTIPLIER')
    try:
        multiplier = float(multiplier)
    except (TypeError, ValueError) as error:
        # TypeError covers an unset environment variable (float(None)).
        raise ValueError("Invalid float for GAS_MULTIPLIER") from error
    tx['gas'] = int(tx['gas'] * multiplier)
    if 'maxFeePerGas' in tx:
        tx['maxFeePerGas'] = int(tx['maxFeePerGas'] * multiplier)
    return tx
|
|
|
|
|
|
def apply_median_gas_strategy(tx, tx_amount=100):
    """Set the EIP-1559 fee fields of ``tx`` from recent on-chain gas prices.

    ``maxFeePerGas`` becomes the median gas price reported by
    ``get_average_gas_prices('median', tx_amount)`` and
    ``maxPriorityFeePerGas`` is fixed at 500 gwei.

    Args:
        tx: Transaction dict; mutated in place.
        tx_amount: Sample size forwarded to ``get_average_gas_prices``.

    Returns:
        The same ``tx`` dict.
    """
    recent_prices = get_average_gas_prices('median', tx_amount)
    fee_cap = web3.to_wei(recent_prices['gas_price'], 'wei')
    tx['maxFeePerGas'] = int(fee_cap)
    tx['maxPriorityFeePerGas'] = web3.to_wei(500, 'gwei')
    return tx
|
|
|
|
|
|
def approve_token_spending(account, token_address, spender_address, amount, attempts=18):
    """Approve ``spender_address`` to spend ``amount`` of a token if the allowance is too low.

    Args:
        account: Signing account (its ``address`` is the token owner).
        token_address: Token contract to approve on.
        spender_address: Address that receives the allowance.
        amount: Amount passed through ``to_token_decimals`` with the token's
            decimals before being compared and approved.
        attempts: Retry budget forwarded to ``broadcast_transaction``.

    Returns:
        The result of ``broadcast_transaction`` when an approval is sent,
        ``False`` when building or broadcasting raised, and ``None`` when the
        existing allowance already covers the amount.
    """
    token_contract = load_contract(token_address)
    token_info = get_token_info(token_address)
    token_amount = to_token_decimals(amount, token_info['decimals'])

    current_allowance = token_contract.functions.allowance(account.address, spender_address).call()
    if not current_allowance < token_amount:
        # Nothing to do: the spender is already allowed enough.
        return None

    try:
        approval = token_contract.functions.approve(spender_address, token_amount)
        tx = approval.build_transaction({
            'nonce': get_nonce(account.address),
            'from': account.address
        })
        return broadcast_transaction(account, tx, True, attempts)
    except Exception as e:
        if error := interpret_exception_message(e):
            logging.error("{}. Failed to approve {} ({})".format(error, token_info['name'], token_info['symbol']))
        return False
|
|
|
|
|
|
def broadcast_transaction(account, tx, auto_gas=True, attempts=18):
    """Sign ``tx``, broadcast it on chain 369 and wait for its receipt.

    Gas fields are prepared first: when ``auto_gas`` is false the gas limit
    is estimated via ``apply_estimated_gas``; the fee fields are then set by
    ``apply_median_gas_strategy`` and scaled by ``apply_gas_multiplier``.

    Node errors are handled by message:
      * "insufficient funds": give up immediately.
      * "nonce too low": refresh the nonce and retry, but only if nothing was
        accepted yet. If an earlier broadcast was accepted, the earlier hash
        is awaited instead, so the transaction is never re-sent under a new
        nonce (which would execute it twice).
      * "could not replace existing tx": bump gas and fees by 3.69% and retry.
      * "already known": keep waiting for the hash obtained earlier.
      * anything else: consume one attempt and retry after a second.

    Args:
        account: Signing account (``key`` and ``address`` are used).
        tx: Transaction dict; mutated in place.
        auto_gas: True when ``tx`` already carries a gas limit.
        attempts: Retry budget shared by sending and receipt polling.

    Returns:
        The transaction receipt on success, otherwise ``False``.
    """
    tx_hash = None
    tx['chainId'] = 369
    if not auto_gas:
        tx = apply_estimated_gas(tx)
        if tx is None:
            # Estimation exhausted its retries; without a gas limit the
            # transaction cannot be built.
            logging.error("Could not estimate gas for this TX")
            return False
    tx = apply_median_gas_strategy(tx)
    tx = apply_gas_multiplier(tx)
    logging.debug("Broadcasting TX: {}".format(tx))

    _attempts = attempts
    while _attempts > 0:
        try:
            signed_tx = web3.eth.account.sign_transaction(tx, private_key=account.key)
            tx_hash = web3.eth.send_raw_transaction(signed_tx.rawTransaction)
        except Exception as e:
            logging.debug(e)
            reason = str(e)
            if "insufficient funds" in reason:
                logging.error("Not enough gas for this TX: {}".format(tx))
                return False
            elif "nonce too low" in reason:
                if not tx_hash:
                    tx['nonce'] = get_nonce(account.address)
                    continue
                # A previous broadcast was accepted, so its nonce has most
                # likely been consumed by that very transaction: fall through
                # and wait for its receipt rather than sending a duplicate.
            elif "could not replace existing tx" in reason:
                tx['gas'] = int(tx['gas'] * 1.0369)
                if 'maxFeePerGas' in tx:
                    tx['maxFeePerGas'] = int(tx['maxFeePerGas'] * 1.0369)
                if 'maxPriorityFeePerGas' in tx:
                    tx['maxPriorityFeePerGas'] = int(tx['maxPriorityFeePerGas'] * 1.0369)
                continue
            elif "already known" in reason:
                pass
            elif tx_hash:
                # Unknown error while re-sending; the missing-hash case is
                # charged below so a failure costs exactly one attempt.
                _attempts -= 1
                time.sleep(1)

        if not tx_hash:
            # Nothing has been accepted by the node yet.
            _attempts -= 1
            time.sleep(1)
            if _attempts > 0:
                logging.debug("Rebroadcasting TX ... {}".format(attempts - _attempts))
            continue

        try:
            tx_receipt = web3.eth.wait_for_transaction_receipt(tx_hash, timeout=10)
        except Exception as e:
            logging.debug(e)
            _attempts -= 1
            time.sleep(1)
            if _attempts > 0:
                logging.debug("Rebroadcasting TX ... {}".format(attempts - _attempts))
        else:
            logging.debug("Confirmed TX: {}".format(tx_receipt))
            return tx_receipt
    return False
|
|
|
|
|
|
def convert_tokens(account, token0_address, token1_address, output_amount, attempts=18):
    """Convert ``token0`` into ``token1`` through the route in ./data/routes.json.

    The route's function is first called with the output amount (scaled by
    ``token1``'s decimals). If web3 rejects that call because the function
    takes no integer argument, the function is called without arguments in
    a loop instead.

    Args:
        account: Signing account.
        token0_address: Token being spent.
        token1_address: Token (and contract) providing the conversion function.
        output_amount: Desired amount of ``token1`` in whole-token units.
        attempts: Retry budget forwarded to ``broadcast_transaction``.

    Returns:
        ``True`` on success, ``False`` on insufficient balance or a failed
        call, ``None`` when the no-argument loop stops because gas is above
        ``rapid_gas_fee_limit``.

    Raises:
        Exception: If no route from ``token0`` to ``token1`` is configured.
        Web3ValidationError: If building the call fails for any other reason.
    """
    # Context manager so the route file handle is closed after parsing.
    with open('./data/routes.json') as routes_file:
        routes_functions = json.load(routes_file)
    route = routes_functions[token1_address]

    # check if conversion route exists
    if token0_address not in route['functions']:
        raise Exception("Route not available for {} to {}".format(token0_address, token1_address))

    # get the cost required to convert tokens
    cost = route['costs'][token0_address]
    tokens_required = cost * output_amount
    if (tokens_balance := get_token_balance(token0_address, account.address)) < tokens_required:
        logging.error("Need {} more tokens".format(tokens_required - tokens_balance))
        return False

    # call the buy function with amount or default to no args
    call_function = route['functions'][token0_address]
    approve_token_spending(account, token0_address, token1_address, get_token_supply(token0_address, True))
    token1_contract = load_contract(token1_address, load_contract_abi(token1_address))
    amount = to_token_decimals(output_amount, token1_contract.functions.decimals().call())

    def log_call_result(succeeded):
        # One place for the success/failure message used by both call styles.
        if succeeded:
            logging.info("Called {}({}) from {}".format(call_function, amount, route['label']))
        else:
            logging.warning("Failed to call {}({}) from {}".format(call_function, amount, route['label']))

    try:
        tx = getattr(token1_contract.functions, call_function)(int(amount)).build_transaction({
            "from": account.address,
            "nonce": get_nonce(account.address)
        })
    except Web3ValidationError as e:
        if 'positional arguments with type(s) `int`' not in str(e):
            raise Web3ValidationError(e)

        # The function takes no amount: call it repeatedly instead.
        # NOTE(review): the loop bound is the decimals-scaled amount, which is
        # only a sensible call count when token1 has 0 decimals - confirm.
        for _ in range(0, amount):
            # cancel the rest of this loop if the gas price is too damn high
            if get_mempool_gas_prices('rapid', gas_cache_seconds) > rapid_gas_fee_limit:
                logging.warning("Gas fees are too high")
                return None
            try:
                tx = getattr(token1_contract.functions, call_function)().build_transaction({
                    "from": account.address,
                    "nonce": get_nonce(account.address)
                })
                success = broadcast_transaction(account, tx, True, attempts)
            except Exception as call_error:
                # A raised error is logged and the loop moves on to the next call.
                if error := interpret_exception_message(call_error):
                    logging.error(
                        "{}. Failed to convert using {}".format(error, route['label']))
            else:
                log_call_result(success)
                if not success:
                    return False
        return True
    else:
        try:
            success = broadcast_transaction(account, tx, True, attempts)
        except Exception as e:
            if error := interpret_exception_message(e):
                logging.error("{}. Failed to convert using {}".format(error, route['label']))
            return False
        log_call_result(success)
        return True if success else False
|
|
|
|
|
|
def convert_tokens_multi(account, multi_address, token0_address, token1_address, iterations, attempts=18):
    """Convert tokens through a multi-mint contract, batching the iterations.

    ``iterations`` is split into calls of at most the route's
    ``max_iterations``; the last call carries the remainder.

    Args:
        account: Signing account.
        multi_address: Multi-mint contract whose route entry is used.
        token0_address: Token being spent.
        token1_address: Token being produced (used in error messages only).
        iterations: Total number of iterations to run.
        attempts: Retry budget forwarded to ``broadcast_transaction``.

    Returns:
        ``True`` when all batches were processed, ``False`` on insufficient
        balance or a failed call, ``None`` when stopped because gas is above
        ``rapid_gas_fee_limit``.

    Raises:
        Exception: If the route is missing, or disabled (name starts with '#').
    """
    # Context manager so the route file handle is closed after parsing.
    with open('./data/routes.json') as routes_file:
        routes_functions = json.load(routes_file)
    route = routes_functions[multi_address]

    # check if conversion route exists or is disabled
    if token0_address not in route['functions']:
        raise Exception("Route not available for {} to {} in {}".format(token0_address, token1_address, multi_address))
    elif route['functions'][token0_address][0] == '#':
        raise Exception("Route is disabled for {} to {} in {}".format(token0_address, token1_address, multi_address))

    # get the cost required to convert tokens
    cost = route['costs'][token0_address]
    mints = route['mints'] or 1
    tokens_cost = cost * iterations * mints
    tokens_required = round(tokens_cost, 15)

    # check if the wallet has enough tokens to convert
    if tokens_required > (tokens_balance := get_token_balance(token0_address, account.address)):
        logging.error("Need {} more tokens".format(tokens_required - tokens_balance))
        return False

    # approve the tokens required to convert and determine how many loops
    approve_token_spending(account, token0_address, multi_address, get_token_supply(token0_address, True))
    max_iterations = route['max_iterations']
    loops = math.floor(iterations / max_iterations)
    remainder = iterations % max_iterations
    if remainder != 0:
        loops += 1
    call_function = route['functions'][token0_address]

    # start calling multi mints
    for i in range(0, loops):
        # cancel the rest of this loop if the gas price is too damn high
        if get_mempool_gas_prices('rapid', gas_cache_seconds) > rapid_gas_fee_limit:
            logging.warning("Gas fees are too high")
            return None

        if i + 1 < loops or remainder == 0:
            # do max iterations during loop
            call_iterations = max_iterations
        else:
            # on final loop run the remaining iterations
            call_iterations = remainder

        # call the multi mint function with iterations based on tokens minted
        multi_contract = load_contract(multi_address, load_contract_abi(multi_address))
        try:
            tx = getattr(multi_contract.functions, call_function)(call_iterations).build_transaction({
                "from": account.address,
                "nonce": get_nonce(account.address)
            })
            success = broadcast_transaction(account, tx, True, attempts)
        except Exception as e:
            # A raised error is logged and the next batch is still attempted.
            if error := interpret_exception_message(e):
                logging.error("{}. Failed to convert using {}".format(error, route['label']))
        else:
            if success:
                logging.info("Called {}({}) from {}".format(call_function, call_iterations, route['label']))
            else:
                logging.warning("Failed to call {}({}) from {}".format(call_function, call_iterations, route['label']))
                return False
    return True
|
|
|
|
|
|
def estimate_swap_result(router_name, token0_address, token1_address, token0_amount, attempts=18):
    """Ask a router's ``getAmountsOut`` what a swap would yield.

    Args:
        router_name: Key into ./data/routers.json; the entry's first two items
            are passed to ``load_contract`` as address and ABI.
        token0_address: Input token.
        token1_address: Output token.
        token0_amount: Input amount in whole-token units.
        attempts: How many failed calls are tolerated, one second apart.

    Returns:
        The list returned by ``getAmountsOut`` for the path
        ``[token0, token1]``, or ``[]`` when every attempt failed.
    """
    # Context manager so the router file handle is closed after parsing.
    with open('./data/routers.json') as routers_file:
        routers = json.load(routers_file)
    router_contract = load_contract(routers[router_name][0], routers[router_name][1])
    token0_info = get_token_info(token0_address)
    while attempts > 0:
        try:
            expected_output_amounts = router_contract.functions.getAmountsOut(
                int(token0_amount * 10 ** token0_info['decimals']),
                [token0_address, token1_address]
            ).call()
        except Exception as e:
            logging.debug(e)
            attempts -= 1
            time.sleep(1)
        else:
            return expected_output_amounts
    return []
|
|
|
|
|
|
def from_token_decimals(amount, decimals):
    """Convert a raw integer token amount into whole-token units.

    Args:
        amount: Amount in the token's smallest unit.
        decimals: Number of decimal places the token uses.

    Returns:
        ``amount`` divided by ``10 ** decimals`` (true division).
    """
    unit = 10 ** decimals
    return amount / unit
|
|
|
|
|
|
def generate_wallet(amount):
    """Create new accounts and store each encrypted keystore on disk.

    Every keystore is encrypted with the ``SECRET`` environment variable and
    written to ``./data/wallets/<address>/keystore``.

    Args:
        amount: Number of wallets to create.

    Returns:
        List of the created account objects (not just their addresses).
    """
    account_api = Web3().eth.account
    addresses = []
    for _ in range(amount):
        account = account_api.create()
        keystore = account_api.encrypt(account.key.hex(), os.getenv('SECRET'))
        folder = "./data/wallets/{}".format(account.address)
        # makedirs creates the parent ./data/wallets directory as well.
        os.makedirs(folder, exist_ok=True)
        # Context manager so the keystore is flushed and closed right away.
        with open("{}/keystore".format(folder), 'w') as keystore_file:
            keystore_file.write(json.dumps(keystore, indent=4))
        addresses.append(account)
    return addresses
|
|
|
|
|
|
def get_abi_from_blockscout(address, attempts=18):
    """Fetch a contract's ABI from the PulseChain Blockscout API.

    Args:
        address: Contract address to look up.
        attempts: How many failed requests are tolerated, one second apart.

    Returns:
        The ``abi`` value of the JSON response, or ``[]`` when the response
        has none. ``None`` if ``attempts`` is not positive.

    Raises:
        RequestException: The last request error once all attempts are used.
    """
    url = "https://api.scan.pulsechain.com/api/v2/smart-contracts/{}".format(address)
    while attempts > 0:
        try:
            # A timeout keeps a stalled connection from blocking forever; it
            # surfaces as a RequestException and is retried like any other.
            r = requests.get(url, timeout=10)
            r.raise_for_status()
        except RequestException:
            attempts -= 1
            if attempts > 0:
                time.sleep(1)
                continue
            # Re-raise the real error instead of an empty RequestException.
            raise
        else:
            resp = r.json()
            if 'abi' in resp:
                return resp['abi']
            return []
|
|
|
|
|
|
def get_average_gas_prices(average='median', tx_amount=100, attempts=18):
    """Average the gas limit and gas price of recent on-chain transactions.

    Blocks are walked backwards from the latest one until at least
    ``tx_amount`` transactions were collected or ``tx_amount`` blocks were
    scanned; only the first ``tx_amount`` samples are averaged.

    Args:
        average: One of ``'mean'``, ``'median'`` or ``'mode'``.
        tx_amount: Number of transactions to sample.
        attempts: Retry budget forwarded to ``get_block``.

    Returns:
        ``{"gas_limit": ..., "gas_price": ...}``, or ``{}`` when a block could
        not be fetched or no transactions were found.

    Raises:
        Exception: If ``average`` is not a supported type.
    """
    # Validate before doing any network work.
    if average not in ('mean', 'median', 'mode'):
        raise Exception('Invalid average type')
    summarize = {'mean': mean, 'median': median, 'mode': mode}[average]

    if not (latest_block := get_block('latest', False, attempts)):
        return {}
    latest_number = latest_block['number']
    # Never walk past the genesis block on a short chain.
    stop_number = max(latest_number - tx_amount, -1)

    gas_limit = []
    gas_prices = []
    for block_number in range(latest_number, stop_number, -1):
        if not (block := get_block(block_number, True, attempts)):
            return {}
        for _tx in block['transactions']:
            gas_limit.append(_tx['gas'])
            gas_prices.append(_tx['gasPrice'])
        if len(gas_prices) >= tx_amount:
            break

    if not gas_prices:
        # The statistics functions raise on empty data; report "no data"
        # the same way as a failed block fetch.
        return {}

    return {
        "gas_limit": summarize(gas_limit[:tx_amount]),
        "gas_price": summarize(gas_prices[:tx_amount])
    }
|
|
|
|
|
|
|
|
async def estimate_mempool_gas_prices():
    """Summarise the gas prices (in gwei) of the transactions in the pending block.

    For each pending transaction ``maxFeePerGas`` is used when present,
    otherwise ``gasPrice``; transactions with neither are skipped.

    Returns:
        ``None`` when no prices were found, otherwise a dict of percentile
        picks (``very_slow`` .. ``instant``), ``avg``, ``median``, ``lowest``,
        ``highest`` (all floats rounded to 2 places), ``tx_count`` and the
        current ``timestamp``.
    """
    pending_block = web3.eth.get_block('pending', full_transactions=True)

    gas_prices = []
    for pending_tx in pending_block['transactions']:
        for fee_field in ('maxFeePerGas', 'gasPrice'):
            if fee_field in pending_tx:
                gas_prices.append(web3.from_wei(pending_tx[fee_field], 'gwei'))
                break

    if not gas_prices:
        return None
    gas_prices.sort()
    sample_count = len(gas_prices)

    def pick(fraction):
        # Element at the given fraction of the sorted price list.
        return gas_prices[int(sample_count * fraction)]

    def two_places(value):
        return float(round(value, 2))

    summary = {
        'very_slow': two_places(pick(0.1)),   # 10th percentile
        'slow': two_places(pick(0.25)),       # 25th percentile
        'standard': two_places(pick(0.5)),    # 50th percentile (median)
        'fast': two_places(pick(0.70)),       # 70th percentile
        'rapid': two_places(pick(0.80)),      # 80th percentile
        'instant': two_places(pick(0.90)),    # 90th percentile
    }
    summary['avg'] = two_places(mean(gas_prices))
    summary['median'] = two_places(median(gas_prices))
    summary['lowest'] = two_places(min(gas_prices))
    summary['highest'] = two_places(max(gas_prices))
    summary['tx_count'] = sample_count
    summary['timestamp'] = time.time()
    return summary
|
|
|
|
def get_mempool_gas_prices(speed=None, cache_interval_seconds=10):
    """Return mempool gas prices in gwei, cached in ./data/cache/mempool_gas.json.

    The cache is refreshed through ``estimate_mempool_gas_prices`` when it is
    missing, has no timestamp, or is older than ``cache_interval_seconds``.
    If refreshing fails, stale cached data is used when available.

    Args:
        speed: Name of a single price to return (any key of the cached data);
            when not a string, a dict of the rapid/fast/standard/slow prices
            is returned instead.
        cache_interval_seconds: Maximum age of the cache before a refresh.

    Returns:
        A float for a named ``speed``, otherwise a dict of floats. When no
        data is available at all, the integer ``5555 * 10 ** 369`` is
        returned, which is larger than any fee limit it is compared against.

    Raises:
        KeyError: If ``speed`` is not present in the gas price data.
    """
    speeds = ('rapid', 'fast', 'standard', 'slow',)
    os.makedirs(cache_folder := './data/cache/', exist_ok=True)
    gas_file = "{}/mempool_gas.json".format(cache_folder)

    gas = {}
    try:
        with open(gas_file) as cache_file:
            gas = json.load(cache_file)
    except (JSONDecodeError, FileNotFoundError):
        pass
    if not isinstance(gas, dict):
        # Anything but a JSON object is treated as an unusable cache.
        gas = {}

    cached_at = gas.get('timestamp')
    if not cached_at or (cached_at + cache_interval_seconds < time.time()):
        try:
            _gas = asyncio.run(estimate_mempool_gas_prices())
        except Exception as e:
            logging.debug(e)
            if not gas:
                return 5555 * 10 ** 369
        else:
            if _gas:
                gas = _gas
                with open(gas_file, 'w') as cache_file:
                    cache_file.write(json.dumps(gas, indent=4))
            elif not gas:
                logging.debug("No gas data")
                return 5555 * 10 ** 369

    if isinstance(speed, str):
        try:
            return float(gas[speed])
        except KeyError:
            raise KeyError("No such speed as '{}' in gas price data {}".format(speed, list(speeds)))
    return {name: float(price) for name, price in gas.items() if name in speeds}
|
|
|
|
|
|
def get_block(number, full_transactions=False, attempts=18):
    """Fetch a block from the node, retrying on any error.

    Args:
        number: Block identifier; the only string accepted is ``'latest'``.
        full_transactions: Forwarded to ``web3.eth.get_block``.
        attempts: How many failed calls are tolerated, one second apart.

    Returns:
        The block returned by web3, or ``None`` when every attempt failed.

    Raises:
        ValueError: If ``number`` is a string other than ``'latest'``.
    """
    is_unknown_tag = type(number) is str and number not in ('latest',)
    if is_unknown_tag:
        raise ValueError("Invalid block number")

    tries_left = attempts
    while tries_left > 0:
        try:
            return web3.eth.get_block(number, full_transactions=full_transactions)
        except Exception as error:
            logging.debug(error)
            time.sleep(1)
            tries_left -= 1
    return None
|
|
|
|
|
|
def get_last_block_base_fee(attempts=18):
    """Return the latest block's base fee in gwei, rounded to 2 places.

    Args:
        attempts: Retry budget forwarded to ``get_block``.

    Returns:
        The base fee as a float, or ``-1`` when the block could not be fetched.
    """
    latest_block = get_block('latest', False, attempts)
    if not latest_block:
        return -1
    base_fee_gwei = web3.from_wei(latest_block['baseFeePerGas'], 'gwei')
    return float(round(base_fee_gwei, 2))
|
|
|
|
|
|
def get_nonce(address, attempts=18):
    """Look up the transaction count (next nonce) of ``address``.

    Args:
        address: Account address; converted to checksum form before the call.
        attempts: How many failed calls are tolerated, one second apart.

    Returns:
        The transaction count, or ``-1`` when every attempt failed.
    """
    tries_left = attempts
    while tries_left > 0:
        try:
            checksum_address = web3.to_checksum_address(address)
            return web3.eth.get_transaction_count(checksum_address)
        except Exception as error:
            logging.debug(error)
            time.sleep(1)
            tries_left -= 1
    return -1
|
|
|
|
|
|
def get_pls_balance(address, decimals=False, attempts=18):
    """Read the native coin balance of ``address``.

    Args:
        address: Account to query.
        decimals: When true, return the raw balance in the smallest unit;
            otherwise return it converted with 18 decimals.
        attempts: How many failed calls are tolerated, one second apart.

    Returns:
        The balance, or ``-1`` when every attempt failed.
    """
    tries_left = attempts
    while tries_left > 0:
        try:
            raw_balance = web3.eth.get_balance(address)
        except Exception as error:
            logging.debug(error)
            time.sleep(1)
            tries_left -= 1
            continue
        return raw_balance if decimals else from_token_decimals(raw_balance, 18)
    return -1
|
|
|
|
|
|
def get_token_balance(token_address, wallet_address, decimals=False):
    """Read a wallet's balance of a token via ``balanceOf``.

    Args:
        token_address: Token contract to query.
        wallet_address: Holder whose balance is read.
        decimals: When true, return the raw integer balance; otherwise return
            it in whole-token units as a float rounded to 15 places.

    Returns:
        The balance in the requested representation.
    """
    token_contract = load_contract(token_address)
    token_info = get_token_info(token_address)
    raw_balance = token_contract.functions.balanceOf(wallet_address).call()
    if decimals:
        return raw_balance
    whole_tokens = from_token_decimals(raw_balance, token_info['decimals'])
    return float(round(whole_tokens, 15))
|
|
|
|
|
|
def _read_token_field(token_contract, field_name, attempts):
    """Call a no-argument contract getter, retrying on Web3Exception.

    Returns the getter's value, or ``None`` if it never succeeded.
    """
    while attempts > 0:
        try:
            return getattr(token_contract.functions, field_name)().call()
        except Web3Exception:
            attempts -= 1
    return None


def get_token_info(token_address, attempts=18):
    """Return a token's name, symbol and decimals, cached on disk.

    The result is stored in ``./data/tokens/<token_address>.json``. A cached
    entry is reused only when its ``decimals`` value is set; otherwise the
    three fields are read from the contract again and the file is rewritten.

    Args:
        token_address: Token contract to describe.
        attempts: Retry budget for each of the three contract reads.

    Returns:
        Dict with ``name``, ``symbol`` and ``decimals``; a field is ``None``
        when its contract read never succeeded.
    """
    os.makedirs(token_folder := "./data/tokens", exist_ok=True)
    token_info_file = "{}/{}.json".format(token_folder, token_address)

    if os.path.isfile(token_info_file):
        with open(token_info_file) as cached_file:
            token_info = json.load(cached_file)
        # .get() so an incomplete cache entry is refreshed rather than fatal.
        if token_info.get('decimals') is not None:
            return token_info

    token_contract = load_contract(token_address)
    token_info = {
        "name": _read_token_field(token_contract, 'name', attempts),
        "symbol": _read_token_field(token_contract, 'symbol', attempts),
        "decimals": _read_token_field(token_contract, 'decimals', attempts),
    }
    with open(token_info_file, 'w') as cached_file:
        cached_file.write(json.dumps(token_info, indent=4))
    return token_info
|
|
|
|
|
|
def get_token_supply(token_address, decimals=False):
    """Read a token's total supply via ``totalSupply``.

    Args:
        token_address: Token contract to query.
        decimals: When true, return the raw integer supply; otherwise return
            it in whole-token units as a float rounded to 15 places.

    Returns:
        The total supply in the requested representation.
    """
    token_contract = load_contract(token_address)
    token_info = get_token_info(token_address)
    raw_supply = token_contract.functions.totalSupply().call()
    if decimals:
        return raw_supply
    whole_tokens = from_token_decimals(raw_supply, token_info['decimals'])
    return float(round(whole_tokens, 15))
|
|
|
|
|
|
def interpret_exception_message(e):
    """Translate well-known node error texts into short messages.

    The exception is always written to the debug log first.

    Args:
        e: The exception (or any object) whose string form is inspected.

    Returns:
        ``'Not enough PLS'`` or ``'Not enough tokens'`` for the recognised
        errors, otherwise ``e`` itself, unchanged.
    """
    logging.debug(e)
    known_errors = (
        ('insufficient funds for gas * price + value', 'Not enough PLS'),
        ('transfer amount exceeds balance', 'Not enough tokens'),
    )
    for fragment, friendly_text in known_errors:
        if fragment in str(e):
            return friendly_text
    return e
|
|
|
|
|
|
def load_contract(address, abi=None):
    """Build a web3 contract object for ``address``.

    Args:
        address: Contract address.
        abi: ABI to use. When falsy it is resolved with ``load_contract_abi``;
            if that yields nothing, ./data/abi/ERC20.json is used.

    Returns:
        The object returned by ``web3.eth.contract``.
    """
    if not abi:
        abi = load_contract_abi(address)
    if not abi:
        # Context manager so the fallback ABI file handle is closed.
        with open('./data/abi/ERC20.json') as erc20_file:
            abi = json.load(erc20_file)
    return web3.eth.contract(address=address, abi=abi)
|
|
|
|
|
|
def load_contract_abi(address):
    """Load a contract ABI from ./data/abi, downloading and caching it if absent.

    Args:
        address: Contract address; also the cache file's base name.

    Returns:
        The parsed ABI.

    Raises:
        FileNotFoundError: If no cached file exists and the download failed
            or returned no ABI.
    """
    abi_file = "./data/abi/{}.json".format(address)
    try:
        with open(abi_file) as cached_file:
            return json.load(cached_file)
    except FileNotFoundError:
        pass

    try:
        abi = get_abi_from_blockscout(address)
    except Exception as e:
        logging.debug(e)
        raise FileNotFoundError("Download a copy of the abi from Blockscout to this folder") from e
    if not abi:
        raise FileNotFoundError("No abi found for this contract")

    # Make sure the cache directory exists before writing into it.
    os.makedirs('./data/abi', exist_ok=True)
    with open(abi_file, 'w') as cached_file:
        cached_file.write(json.dumps(abi, indent=4))
    return abi
|
|
|
|
|
|
def load_wallet(address, secret):
    """Decrypt the keystore stored for ``address`` and return its account.

    Args:
        address: Wallet address; the keystore is read from
            ``./data/wallets/<address>/keystore``.
        secret: Password used to decrypt the keystore.

    Returns:
        The account object created from the decrypted private key.

    Raises:
        FileNotFoundError: If the keystore file does not exist.
    """
    file_path = "./data/wallets/{}/keystore".format(address)
    if not os.path.exists(file_path):
        raise FileNotFoundError("Can't find your wallet keystore for address: {}".format(address))
    # Read-only access is enough, and the context manager closes the file.
    with open(file_path, 'r') as keystore_file:
        keystore = "\n".join([line.strip() for line in keystore_file])
    private_key = web3.eth.account.decrypt(keystore, secret)
    return web3.eth.account.from_key(private_key)
|
|
|
|
|
|
def log_end_loop(delay):
    """Optionally pause, then log a separator line.

    Args:
        delay: Seconds to sleep; a falsy value skips both the message and
            the pause.
    """
    separator = "-" * 50
    if delay:
        waiting_message = "Waiting for {} seconds...".format(delay)
        logging.info(waiting_message)
        time.sleep(delay)
    logging.info(separator)
|
|
|
|
|
|
def mint_tokens(account, token_address, amount, attempts=18):
    """Call a randomly chosen mint function of a token until ``amount`` is covered.

    The functions and the number of tokens minted per call come from
    ./data/rng.json; the call count is ``ceil(amount / mints)``.

    Args:
        account: Signing account.
        token_address: Token contract to mint from.
        amount: Number of tokens wanted.
        attempts: Retry budget forwarded to ``broadcast_transaction``.

    Returns:
        ``True`` when every call succeeded, ``False`` as soon as one fails.

    Raises:
        Exception: If the token has no entry in ./data/rng.json.
    """
    # Context manager so the config file handle is closed after parsing.
    with open('./data/rng.json') as rng_file:
        rng_functions = json.load(rng_file)
    if token_address not in rng_functions:
        raise Exception("Mint/RNG function not available for {}".format(token_address))
    rng_entry = rng_functions[token_address]

    # The same randomly picked function is used for every call below.
    call_function = random.choice(list(rng_entry['functions']))
    token_contract = load_contract(token_address, load_contract_abi(token_address))
    token_info = get_token_info(token_address)
    loops = math.ceil(amount / rng_entry['mints'])

    for _ in range(0, loops):
        tx = getattr(token_contract.functions, call_function)().build_transaction({
            "from": account.address,
            "nonce": get_nonce(account.address)
        })
        try:
            success = broadcast_transaction(account, tx, False, attempts)
        except Exception as e:
            if error := interpret_exception_message(e):
                logging.error("{} to mint {}".format(error, rng_entry['label']))
            return False
        if success:
            logging.info("Called mint function for {} ({})".format(token_info['name'], token_info['symbol']))
        else:
            logging.warning(
                "Failed to call mint function for {} ({})".format(token_info['name'], token_info['symbol']))
            return False
    return True
|
|
|
|
|
|
def sample_exchange_rate(router_name, token_address, quote_address, attempts=18):
    """Quote how much of ``quote_address`` one whole ``token_address`` buys.

    Args:
        router_name: Router key forwarded to ``estimate_swap_result``.
        token_address: Token being priced.
        quote_address: Token the price is expressed in.
        attempts: How many empty estimates are tolerated, one second apart.

    Returns:
        The second element of the estimate (the raw output amount), or
        ``None`` when no estimate could be obtained.
    """
    tries_left = attempts
    while tries_left > 0:
        estimate = estimate_swap_result(router_name, token_address, quote_address, 1)
        if len(estimate) != 0:
            return estimate[1]
        tries_left -= 1
        time.sleep(1)
    return None
|
|
|
|
|
|
def send_pls(account, to_address, amount, attempts=18):
    """Send native coin from ``account`` to ``to_address``.

    Args:
        account: Signing account and sender.
        to_address: Recipient.
        amount: Amount in whole coins; scaled with 18 decimals.
        attempts: Retry budget forwarded to ``broadcast_transaction``.

    Returns:
        The result of ``broadcast_transaction``, or ``False`` if it raised.
    """
    # Built key by key in the same order the fields are evaluated.
    tx = {}
    tx['nonce'] = get_nonce(account.address)
    tx['from'] = account.address
    tx['to'] = to_address
    tx['value'] = to_token_decimals(amount, 18)

    try:
        receipt = broadcast_transaction(account, tx, False, attempts)
    except Exception as e:
        if error := interpret_exception_message(e):
            logging.error("{}. Could not send to {}".format(error, to_address))
        return False
    return receipt
|
|
|
|
|
|
def send_tokens(account, token_address, to_address, amount, attempts=18):
    """Transfer tokens from ``account`` to ``to_address``.

    Args:
        account: Signing account and sender.
        token_address: Token contract to transfer.
        to_address: Recipient.
        amount: Amount in whole tokens; scaled with the token's decimals.
        attempts: Retry budget forwarded to ``broadcast_transaction``.

    Returns:
        The result of ``broadcast_transaction``, or ``False`` if building or
        broadcasting raised.
    """
    token_contract = load_contract(token_address)
    token_info = get_token_info(token_address)
    try:
        raw_amount = to_token_decimals(amount, token_info['decimals'])
        transfer_call = token_contract.functions.transfer(to_address, raw_amount)
        tx = transfer_call.build_transaction({
            'nonce': get_nonce(account.address),
            'from': account.address
        })
        return broadcast_transaction(account, tx, False, attempts)
    except Exception as e:
        if error := interpret_exception_message(e):
            failure_message = "{}. Could not send {} ({}) to {}".format(
                error,
                token_info['name'],
                token_info['symbol'],
                to_address
            )
            logging.error(failure_message)
        return False
|
|
|
|
|
|
def set_logging(filename='app', level='INFO', backup_count=7):
    """Configure root logging to a daily-rotated file and to stdout.

    Args:
        filename: Base name of the log file under ./data/logs/.
        level: Name of an attribute of the ``logging`` module, matched
            case-insensitively (e.g. ``'info'``).
        backup_count: Number of rotated files kept.

    Returns:
        ``True`` once logging is configured.

    Raises:
        Exception: If ``logging`` has no attribute named like ``level``.
    """
    level_name = level.upper()
    if not hasattr(logging, level_name):
        raise Exception("Invalid logging level")

    os.makedirs('./data/logs/', exist_ok=True)
    file_handler = TimedRotatingFileHandler(
        "./data/logs/{}.log".format(filename),
        when="midnight",
        interval=1,
        backupCount=backup_count
    )
    console_handler = logging.StreamHandler(sys.stdout)
    logging.basicConfig(
        format='%(asctime)s %(name)s %(levelname)s %(message)s',
        datefmt='%H:%M:%S',
        level=getattr(logging, level_name),
        handlers=[file_handler, console_handler]
    )
    return True
|
|
|
|
|
|
def swap_tokens(account, router_name, token_route, estimated_swap_result, slippage_percent, to_address=None, taxed=False, attempts=18):
    """Swap along ``token_route`` on a Uniswap-V2-style router.

    The router function is chosen from the route's endpoints:
      * route ends in WPLS: tokens -> native coin (``...ForETH``)
      * route starts with WPLS: native coin -> tokens (``...ETHFor...``)
      * otherwise: tokens -> tokens (``...TokensForTokens``)
    With ``taxed`` set, the ``SupportingFeeOnTransferTokens`` variant is used
    where this function selects one.

    Args:
        account: Signing account.
        router_name: Key into ./data/routers.json (address, ABI).
        token_route: List of token addresses forming the swap path.
        estimated_swap_result: ``[amount_in, amount_out]`` in raw units, e.g.
            from ``estimate_swap_result``.
        slippage_percent: Tolerated shortfall of the output, in percent.
        to_address: Recipient; defaults to the account itself.
        taxed: Whether a fee-on-transfer token is involved.
        attempts: Retry budget forwarded to ``broadcast_transaction``.

    Returns:
        The result of ``broadcast_transaction``, or ``False`` if building or
        broadcasting raised.
    """
    wpls_address = "0xA1077a294dDE1B09bB078844df40758a5D0f9a27"

    # Context manager so the router file handle is closed after parsing.
    with open('./data/routers.json') as routers_file:
        routers = json.load(routers_file)
    router_address = routers[router_name][0]
    router_contract = load_contract(router_address, routers[router_name][1])
    router_functions = router_contract.functions

    amount_in = estimated_swap_result[0]
    recipient = to_address or account.address
    deadline = int(time.time()) + (60 * 3)

    def minimum_output():
        # Expected output reduced by the tolerated slippage, as an integer.
        expected = estimated_swap_result[1]
        return expected - round(expected * (slippage_percent / 100))

    if token_route[-1] == wpls_address:
        approve_token_spending(account, token_route[0], router_address, amount_in)
        if taxed:
            swap_function = router_functions.swapExactTokensForETHSupportingFeeOnTransferTokens
        else:
            swap_function = router_functions.swapExactTokensForETH
        tx = swap_function(amount_in, minimum_output(), token_route, recipient, deadline)
        value = None
    elif token_route[0] == wpls_address:
        # Paying with native coin: the amount travels as the transaction
        # value, so no token approval is needed.
        if taxed:
            swap_function = router_functions.swapExactETHForTokensSupportingFeeOnTransferTokens
        else:
            swap_function = router_functions.swapExactETHForTokens
        # NOTE(review): the minimum output stays 0 here, i.e. no slippage
        # protection on this path - confirm this is intended.
        tx = swap_function(0, token_route, recipient, deadline)
        value = amount_in
    else:
        approve_token_spending(account, token_route[0], router_address, amount_in)
        # Neither end is WPLS, so the token-to-token router functions apply;
        # the ...ForETH functions require the path to end in the wrapped coin.
        if taxed:
            swap_function = router_functions.swapExactTokensForTokensSupportingFeeOnTransferTokens
        else:
            swap_function = router_functions.swapExactTokensForTokens
        tx = swap_function(amount_in, minimum_output(), token_route, recipient, deadline)
        value = None

    # The nonce is read after any approval so it accounts for that transaction.
    tx_params = {
        "from": account.address,
        "nonce": get_nonce(account.address)
    }
    if value is not None:
        tx_params["value"] = value

    try:
        tx = tx.build_transaction(tx_params)
        return broadcast_transaction(account, tx, True, attempts)
    except Exception as e:
        if error := interpret_exception_message(e):
            logging.error("{}. Failed to swap".format(error))
        return False
|
|
|
|
|
|
def to_token_decimals(amount, decimals):
    """Convert a whole-token amount into the token's smallest unit.

    The conversion works on the decimal text of ``amount`` so it is exact for
    integers of any size. Fractional digits beyond ``decimals`` are dropped
    (truncated) rather than inflating the result, and values that Python
    renders in scientific notation (e.g. ``1e-07``) are expanded first.

    Args:
        amount: Amount in whole tokens (int, float, Decimal or numeric str).
        decimals: Number of decimal places the token uses (non-negative).

    Returns:
        The amount as an ``int`` in the smallest unit.

    Raises:
        ValueError: If ``amount`` is not a plain decimal number.
    """
    from decimal import Decimal, InvalidOperation

    text = str(amount)
    if 'e' in text.lower():
        # Expand exponent notation into plain digits; Decimal parses the
        # string exactly and the 'f' format adds no rounding.
        try:
            text = format(Decimal(text), 'f')
        except InvalidOperation as error:
            raise ValueError("Invalid amount: {}".format(amount)) from error

    whole, _, fraction = text.partition('.')
    places = max(decimals, 0)
    # Pad short fractions with zeros, cut long ones at the token precision.
    fraction = fraction[:places].ljust(places, '0')
    return int(whole + fraction)
|
|
|
|
|
|
def unwrap_pls(account, amount, attempts=18):
    """Call ``withdraw`` on the WPLS contract for ``amount`` (18 decimals).

    Args:
        account: Signing account.
        amount: Amount in whole coins.
        attempts: Retry budget forwarded to ``broadcast_transaction``.

    Returns:
        The result of ``broadcast_transaction``, or ``False`` if building or
        broadcasting raised.
    """
    wpls_contract = load_contract("0xA1077a294dDE1B09bB078844df40758a5D0f9a27")
    try:
        withdraw_call = wpls_contract.functions.withdraw(to_token_decimals(amount, 18))
        tx = withdraw_call.build_transaction({
            "from": account.address,
            "nonce": get_nonce(account.address)
        })
        return broadcast_transaction(account, tx, True, attempts)
    except Exception as e:
        if error := interpret_exception_message(e):
            logging.error("{} to unwrap PLS".format(error))
        return False
|
|
|
|
|
|
def wrap_pls(account, amount, attempts=18):
    """Call ``deposit`` on the WPLS contract, sending ``amount`` as value.

    Args:
        account: Signing account.
        amount: Amount in whole coins; scaled with 18 decimals.
        attempts: Retry budget forwarded to ``broadcast_transaction``.

    Returns:
        The result of ``broadcast_transaction``, or ``False`` if building or
        broadcasting raised.
    """
    wpls_contract = load_contract("0xA1077a294dDE1B09bB078844df40758a5D0f9a27")
    try:
        deposit_call = wpls_contract.functions.deposit()
        tx = deposit_call.build_transaction({
            "from": account.address,
            "nonce": get_nonce(account.address),
            "value": to_token_decimals(amount, 18)
        })
        return broadcast_transaction(account, tx, True, attempts)
    except Exception as e:
        if error := interpret_exception_message(e):
            logging.error("{} to wrap PLS".format(error))
        return False
|