import argparse
import copy
import os
from typing import Optional, Union, Tuple, Dict, overload
from termcolor import colored
import cybertensor
from cybertensor.config import Config
from cybertensor.keypair import Keypair
from cybertensor.utils import is_valid_cybertensor_address_or_public_key
def display_mnemonic_msg(keypair: Keypair, key_type: str):
    """Print a security warning followed by the mnemonic of ``keypair``.

    Two messages are written to stdout: a red warning telling the user to
    store the mnemonic safely, then the mnemonic itself (in green) together
    with the ``ctcli`` command that regenerates the key from it.

    Args:
        keypair: Key whose ``mnemonic`` attribute is displayed.
        key_type: Label interpolated into the message and into the
            ``regen_<key_type>`` command name (callers in this file pass
            ``"coldkey"`` or ``"hotkey"``).
    """
    phrase = keypair.mnemonic

    warning = (
        "\nIMPORTANT: Store this mnemonic in a secure (preferable offline place), as anyone "
        "who has possession of this mnemonic can use it to regenerate the key and access your tokens. \n"
    )
    print(colored(warning, "red"))

    # Only the displayed copy is coloured; the command line below carries the
    # plain phrase so it can be copy-pasted as-is.
    highlighted = colored(phrase, "green")
    regen_command = f"ctcli w regen_{key_type} --mnemonic {phrase}"
    print(
        f"The mnemonic to the new {key_type} is:\n"
        f"\n"
        f"{highlighted}\n"
        f"You can use the mnemonic to recreate the key in case it gets lost. "
        f"The command to use to regenerate the key using this mnemonic is:\n"
        f"{regen_command}\n"
    )
class Wallet:
    """Handle to the coldkey, coldkeypub and hotkey files of one named wallet.

    Key files live under ``<path>/<name>/``: ``coldkey``, ``coldkeypub.txt``
    and ``hotkeys/<hotkey>``. Keypairs are loaded lazily through the
    ``coldkey`` / ``coldkeypub`` / ``hotkey`` properties and cached on the
    instance.
    """

    @classmethod
    def config(cls) -> "Config":
        """Return a ``Config`` built from the wallet arguments with no CLI input.

        The parser is populated by :meth:`add_args` and parsed with an empty
        argument list, so the result carries the argument defaults.
        """
        parser = argparse.ArgumentParser()
        cls.add_args(parser)
        return Config(parser, args=[])

    @classmethod
    def help(cls):
        """Print the constructor documentation followed by the CLI argument help."""
        parser = argparse.ArgumentParser()
        cls.add_args(parser)
        # The class defines no __new__, so ``cls.__new__.__doc__`` would print
        # object's generic docstring; show the constructor's documentation.
        print(cls.__init__.__doc__)
        parser.print_help()

    @classmethod
    def add_args(cls, parser: argparse.ArgumentParser, prefix: Optional[str] = None):
        """Register the wallet command-line arguments on ``parser``.

        Adds ``--[prefix.]wallet.name``, ``--[prefix.]wallet.hotkey``,
        ``--[prefix.]wallet.path`` and ``--no_prompt``. Defaults for the first
        three come from the ``CT_WALLET_NAME``, ``CT_WALLET_HOTKEY`` and
        ``CT_WALLET_PATH`` environment variables when set and non-empty.

        Args:
            parser: Parser to extend.
            prefix: Optional namespace prepended (with a dot) to the
                ``wallet.*`` option names. ``--no_prompt`` is never prefixed.
        """
        prefix_str = "" if prefix is None else prefix + "."
        try:
            default_name = os.getenv("CT_WALLET_NAME") or "default"
            # Each setting has its own environment variable; the hotkey default
            # must not follow the wallet-name variable.
            default_hotkey = os.getenv("CT_WALLET_HOTKEY") or "default"
            default_path = os.getenv("CT_WALLET_PATH") or "~/.cybertensor/wallets/"
            parser.add_argument(
                "--" + prefix_str + "wallet.name",
                required=False,
                default=default_name,
                help="The name of the wallet to unlock for running cybertensor "
                "(name mock is reserved for mocking this wallet)",
            )
            parser.add_argument(
                "--" + prefix_str + "wallet.hotkey",
                required=False,
                default=default_hotkey,
                help="The name of the wallet's hotkey.",
            )
            parser.add_argument(
                "--" + prefix_str + "wallet.path",
                required=False,
                default=default_path,
                help="The path to your cybertensor wallets",
            )
            parser.add_argument(
                "--no_prompt",
                dest="no_prompt",
                action="store_true",
                help="""Set true to avoid prompting the user.""",
                default=False,
            )
        except argparse.ArgumentError as e:
            # Raised by argparse when an option is already registered on this
            # parser; report it and keep whatever was registered so far.
            print(f'ArgumentError {e}')

    def __init__(
        self,
        name: Optional[str] = None,
        hotkey: Optional[str] = None,
        path: Optional[str] = None,
        config: Optional["Config"] = None,
    ):
        """Create a wallet handle; no key files are read or written here.

        Each of ``name``, ``hotkey`` and ``path`` is resolved in this order:
        the explicit (truthy) argument, the value in ``config.wallet``, then
        the corresponding ``cybertensor.defaults.wallet`` entry.

        Args:
            name: Wallet name (directory under ``path``).
            hotkey: Hotkey name (file under ``<path>/<name>/hotkeys``).
            path: Root directory that contains the wallets.
            config: Configuration to start from; a deep copy is stored so the
                caller's object is not modified. Defaults to
                :meth:`Wallet.config`.
        """
        if config is None:
            config = Wallet.config()
        self.config = copy.deepcopy(config)
        self.config.wallet.name = name or self.config.wallet.get(
            "name", cybertensor.defaults.wallet.name
        )
        self.config.wallet.hotkey = hotkey or self.config.wallet.get(
            "hotkey", cybertensor.defaults.wallet.hotkey
        )
        self.config.wallet.path = path or self.config.wallet.get(
            "path", cybertensor.defaults.wallet.path
        )

        self.name = self.config.wallet.name
        self.path = self.config.wallet.path
        self.hotkey_str = self.config.wallet.hotkey

        # Lazily populated keypair caches (see the matching properties).
        self._hotkey = None
        self._coldkey = None
        self._coldkeypub = None

    def __str__(self):
        """Return ``wallet(<name>, <hotkey>, <path>)``."""
        return f"wallet({self.name}, {self.hotkey_str}, {self.path})"

    def __repr__(self):
        """Return the same text as :meth:`__str__`."""
        return self.__str__()

    def create_if_non_existent(
        self, coldkey_use_password: bool = True, hotkey_use_password: bool = False
    ) -> "Wallet":
        """Alias of :meth:`create`: generate only the keys that are missing."""
        return self.create(coldkey_use_password, hotkey_use_password)

    def create(
        self, coldkey_use_password: bool = True, hotkey_use_password: bool = False
    ) -> "Wallet":
        """Generate 12-word keys for whichever key files are not on the device.

        A new coldkey is generated only when neither the coldkey file nor the
        coldkeypub file exists. A new hotkey is generated when the hotkey file
        does not exist.

        Args:
            coldkey_use_password: Passed as ``use_password`` for the coldkey.
            hotkey_use_password: Passed as ``use_password`` for the hotkey.

        Returns:
            This wallet.
        """
        if (
            not self.coldkey_file.exists_on_device()
            and not self.coldkeypub_file.exists_on_device()
        ):
            self.create_new_coldkey(n_words=12, use_password=coldkey_use_password)
        if not self.hotkey_file.exists_on_device():
            self.create_new_hotkey(n_words=12, use_password=hotkey_use_password)
        return self

    def recreate(
        self, coldkey_use_password: bool = True, hotkey_use_password: bool = False
    ) -> "Wallet":
        """Generate a fresh 12-word coldkey and hotkey without checking for existing files.

        ``overwrite`` is left at its default (``False``) for both keys.

        Returns:
            This wallet.
        """
        self.create_new_coldkey(n_words=12, use_password=coldkey_use_password)
        self.create_new_hotkey(n_words=12, use_password=hotkey_use_password)
        return self

    def _wallet_dir(self) -> str:
        """Return ``<path>/<name>`` with a leading ``~`` expanded."""
        return os.path.expanduser(os.path.join(self.path, self.name))

    @property
    def hotkey_file(self) -> "cybertensor.keyfile":
        """Keyfile object for ``<wallet dir>/hotkeys/<hotkey>`` (new object per access)."""
        hotkey_path = os.path.join(self._wallet_dir(), "hotkeys", self.hotkey_str)
        return cybertensor.keyfile(path=hotkey_path)

    @property
    def coldkey_file(self) -> "cybertensor.keyfile":
        """Keyfile object for ``<wallet dir>/coldkey`` (new object per access)."""
        coldkey_path = os.path.join(self._wallet_dir(), "coldkey")
        return cybertensor.keyfile(path=coldkey_path)

    @property
    def coldkeypub_file(self) -> "cybertensor.keyfile":
        """Keyfile object for ``<wallet dir>/coldkeypub.txt`` (new object per access)."""
        coldkeypub_path = os.path.join(self._wallet_dir(), "coldkeypub.txt")
        return cybertensor.keyfile(path=coldkeypub_path)

    def set_hotkey(
        self,
        keypair: "cybertensor.Keypair",
        encrypt: bool = False,
        overwrite: bool = False,
    ) -> "cybertensor.keyfile":
        """Cache ``keypair`` as the hotkey and store it in the hotkey file.

        Args:
            keypair: Hotkey to store.
            encrypt: Forwarded to ``keyfile.set_keypair``.
            overwrite: Forwarded to ``keyfile.set_keypair``.

        Returns:
            The keyfile object the keypair was written through.
        """
        self._hotkey = keypair
        keyfile = self.hotkey_file
        keyfile.set_keypair(keypair, encrypt=encrypt, overwrite=overwrite)
        return keyfile

    def set_coldkeypub(
        self,
        keypair: "cybertensor.Keypair",
        encrypt: bool = False,
        overwrite: bool = False,
    ) -> "cybertensor.keyfile":
        """Cache and store the public half of ``keypair`` as the coldkeypub.

        A new ``Keypair`` is built from only ``keypair.address`` and
        ``keypair.public_key``; that object is what gets cached and written.

        Args:
            keypair: Key whose address and public key are stored.
            encrypt: Forwarded to ``keyfile.set_keypair``.
            overwrite: Forwarded to ``keyfile.set_keypair``.

        Returns:
            The keyfile object the keypair was written through.
        """
        self._coldkeypub = cybertensor.Keypair(
            address=keypair.address, public_key=keypair.public_key
        )
        keyfile = self.coldkeypub_file
        keyfile.set_keypair(self._coldkeypub, encrypt=encrypt, overwrite=overwrite)
        return keyfile

    def set_coldkey(
        self,
        keypair: "cybertensor.Keypair",
        encrypt: bool = True,
        overwrite: bool = False,
    ) -> "cybertensor.keyfile":
        """Cache ``keypair`` as the coldkey and store it in the coldkey file.

        Args:
            keypair: Coldkey to store.
            encrypt: Forwarded to ``keyfile.set_keypair``.
            overwrite: Forwarded to ``keyfile.set_keypair``.

        Returns:
            The keyfile object the keypair was written through.
        """
        self._coldkey = keypair
        keyfile = self.coldkey_file
        keyfile.set_keypair(self._coldkey, encrypt=encrypt, overwrite=overwrite)
        return keyfile

    def get_coldkey(self, password: Optional[str] = None) -> "cybertensor.Keypair":
        """Load the coldkey from its file, bypassing the instance cache."""
        return self.coldkey_file.get_keypair(password=password)

    def get_hotkey(self, password: Optional[str] = None) -> "cybertensor.Keypair":
        """Load the hotkey from its file, bypassing the instance cache."""
        return self.hotkey_file.get_keypair(password=password)

    def get_coldkeypub(self, password: Optional[str] = None) -> "cybertensor.Keypair":
        """Load the coldkeypub from its file, bypassing the instance cache."""
        return self.coldkeypub_file.get_keypair(password=password)

    @property
    def hotkey(self) -> "cybertensor.Keypair":
        """Hotkey keypair, read from the hotkey file on first access and cached."""
        if self._hotkey is None:
            self._hotkey = self.hotkey_file.keypair
        return self._hotkey

    @property
    def coldkey(self) -> "cybertensor.Keypair":
        """Coldkey keypair, read from the coldkey file on first access and cached."""
        if self._coldkey is None:
            self._coldkey = self.coldkey_file.keypair
        return self._coldkey

    @property
    def coldkeypub(self) -> "cybertensor.Keypair":
        """Coldkeypub keypair, read from its file on first access and cached."""
        if self._coldkeypub is None:
            self._coldkeypub = self.coldkeypub_file.keypair
        return self._coldkeypub

    def new_coldkey(
        self,
        n_words: int = 12,
        use_password: bool = True,
        overwrite: bool = False,
        suppress: bool = False,
    ) -> "Wallet":
        """Alias of :meth:`create_new_coldkey`.

        Returns:
            This wallet.
        """
        return self.create_new_coldkey(n_words, use_password, overwrite, suppress)

    def create_new_coldkey(
        self,
        n_words: int = 12,
        use_password: bool = True,
        overwrite: bool = False,
        suppress: bool = False,
    ) -> "Wallet":
        """Generate a new coldkey from a fresh mnemonic and store it with its coldkeypub.

        Args:
            n_words: Number of mnemonic words to generate.
            use_password: Passed as ``encrypt`` when storing the coldkey.
            overwrite: Passed through when storing coldkey and coldkeypub.
            suppress: When true, the mnemonic is not printed.

        Returns:
            This wallet.
        """
        mnemonic = Keypair.generate_mnemonic(n_words)
        keypair = Keypair.create_from_mnemonic(mnemonic)
        if not suppress:
            display_mnemonic_msg(keypair, "coldkey")
        self.set_coldkey(keypair, encrypt=use_password, overwrite=overwrite)
        self.set_coldkeypub(keypair, overwrite=overwrite)
        return self

    def new_hotkey(
        self,
        n_words: int = 12,
        use_password: bool = False,
        overwrite: bool = False,
        suppress: bool = False,
    ) -> "Wallet":
        """Alias of :meth:`create_new_hotkey`.

        Returns:
            This wallet.
        """
        return self.create_new_hotkey(n_words, use_password, overwrite, suppress)

    def create_new_hotkey(
        self,
        n_words: int = 12,
        use_password: bool = False,
        overwrite: bool = False,
        suppress: bool = False,
    ) -> "Wallet":
        """Generate a new hotkey from a fresh mnemonic and store it.

        Args:
            n_words: Number of mnemonic words to generate.
            use_password: Passed as ``encrypt`` when storing the hotkey.
            overwrite: Passed through when storing the hotkey.
            suppress: When true, the mnemonic is not printed.

        Returns:
            This wallet.
        """
        mnemonic = Keypair.generate_mnemonic(n_words)
        keypair = Keypair.create_from_mnemonic(mnemonic)
        if not suppress:
            display_mnemonic_msg(keypair, "hotkey")
        self.set_hotkey(keypair, encrypt=use_password, overwrite=overwrite)
        return self

    def regenerate_coldkeypub(
        self,
        address: Optional[str] = None,
        public_key: Optional[Union[str, bytes]] = None,
        overwrite: bool = False,
        suppress: bool = False,
    ) -> "Wallet":
        """Rebuild and store the coldkeypub from an address or a public key.

        When both are given, ``address`` is the value that gets validated.

        Args:
            address: Account address.
            public_key: Public key, as text or bytes.
            overwrite: Passed through when storing the coldkeypub.
            suppress: Accepted for signature parity with the other
                ``regenerate_*`` methods; currently unused.

        Returns:
            This wallet.

        Raises:
            ValueError: If neither value is given, or the validated value is
                rejected by ``is_valid_cybertensor_address_or_public_key``.
        """
        if address is None and public_key is None:
            raise ValueError("Either address or public_key must be passed")

        if not is_valid_cybertensor_address_or_public_key(
            address if address is not None else public_key
        ):
            raise ValueError(
                f"Invalid {'address' if address is not None else 'public_key'}"
            )

        # Construction is the same whichever of the two inputs was supplied.
        keypair = Keypair(
            address=address,
            public_key=public_key,
            prefix=cybertensor.__chain_address_prefix__,
        )
        self.set_coldkeypub(keypair, overwrite=overwrite)
        return self

    regen_coldkeypub = regenerate_coldkeypub

    @staticmethod
    def _keypair_from_regen_kwargs(kwargs: Dict) -> "Keypair":
        """Build the keypair described by the ``regenerate_*`` keyword arguments.

        Only ``mnemonic`` is supported; it takes precedence when several
        sources are supplied.

        Raises:
            ValueError: If none of ``mnemonic`` / ``seed`` / ``json`` is
                given, if only ``seed`` or ``json`` is given (not
                implemented), or if the mnemonic word count is invalid.
        """
        mnemonic = kwargs.get("mnemonic")
        seed = kwargs.get("seed")
        json_data = kwargs.get("json")

        if mnemonic is None and seed is None and json_data is None:
            raise ValueError("Must pass either mnemonic, seed, or json")
        if mnemonic is None:
            # seed / json regeneration paths are not available.
            raise ValueError("Not implemented")

        words = mnemonic.split() if isinstance(mnemonic, str) else mnemonic
        if len(words) not in (12, 15, 18, 21, 24):
            raise ValueError(
                "Mnemonic has invalid size. This should be 12,15,18,21 or 24 words"
            )
        return Keypair.create_from_mnemonic(
            mnemonic=" ".join(words),
            prefix=cybertensor.__chain_address_prefix__,
        )

    @overload
    def regenerate_coldkey(
        self,
        mnemonic: Optional[Union[list, str]] = None,
        use_password: bool = True,
        overwrite: bool = False,
        suppress: bool = False,
    ) -> "Wallet":
        ...

    @overload
    def regenerate_coldkey(
        self,
        seed: Optional[str] = None,
        use_password: bool = True,
        overwrite: bool = False,
        suppress: bool = False,
    ) -> "Wallet":
        ...

    @overload
    def regenerate_coldkey(
        self,
        json: Optional[Tuple[Union[str, Dict], str]] = None,
        use_password: bool = True,
        overwrite: bool = False,
        suppress: bool = False,
    ) -> "Wallet":
        ...

    def regenerate_coldkey(
        self,
        use_password: bool = True,
        overwrite: bool = False,
        suppress: bool = False,
        **kwargs,
    ) -> "Wallet":
        """Regenerate the coldkey (and coldkeypub) and store them.

        The key source must be passed by keyword: ``mnemonic`` (a string of
        space-separated words or a list of words). ``seed`` and ``json`` are
        recognised but not implemented.

        Args:
            use_password: Passed as ``encrypt`` when storing the coldkey.
            overwrite: Passed through when storing coldkey and coldkeypub.
            suppress: When true, the mnemonic is not printed.
            **kwargs: ``mnemonic``, ``seed`` or ``json``.

        Returns:
            This wallet.

        Raises:
            ValueError: If no source is given, the source is not implemented,
                or the mnemonic does not have 12, 15, 18, 21 or 24 words.
        """
        keypair = self._keypair_from_regen_kwargs(kwargs)
        if not suppress:
            display_mnemonic_msg(keypair, "coldkey")
        self.set_coldkey(keypair, encrypt=use_password, overwrite=overwrite)
        self.set_coldkeypub(keypair, overwrite=overwrite)
        return self

    regen_coldkey = regenerate_coldkey

    @overload
    def regenerate_hotkey(
        self,
        mnemonic: Optional[Union[list, str]] = None,
        use_password: bool = True,
        overwrite: bool = False,
        suppress: bool = False,
    ) -> "Wallet":
        ...

    @overload
    def regenerate_hotkey(
        self,
        seed: Optional[str] = None,
        use_password: bool = True,
        overwrite: bool = False,
        suppress: bool = False,
    ) -> "Wallet":
        ...

    @overload
    def regenerate_hotkey(
        self,
        json: Optional[Tuple[Union[str, Dict], str]] = None,
        use_password: bool = True,
        overwrite: bool = False,
        suppress: bool = False,
    ) -> "Wallet":
        ...

    def regenerate_hotkey(
        self,
        use_password: bool = True,
        overwrite: bool = False,
        suppress: bool = False,
        **kwargs,
    ) -> "Wallet":
        """Regenerate the hotkey and store it.

        The key source must be passed by keyword: ``mnemonic`` (a string of
        space-separated words or a list of words). ``seed`` and ``json`` are
        recognised but not implemented.

        Args:
            use_password: Passed as ``encrypt`` when storing the hotkey.
            overwrite: Passed through when storing the hotkey.
            suppress: When true, the mnemonic is not printed.
            **kwargs: ``mnemonic``, ``seed`` or ``json``.

        Returns:
            This wallet.

        Raises:
            ValueError: If no source is given, the source is not implemented,
                or the mnemonic does not have 12, 15, 18, 21 or 24 words.
        """
        keypair = self._keypair_from_regen_kwargs(kwargs)
        if not suppress:
            display_mnemonic_msg(keypair, "hotkey")
        self.set_hotkey(keypair, encrypt=use_password, overwrite=overwrite)
        return self

    regen_hotkey = regenerate_hotkey