Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions src/embit/bip85.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ def derive_entropy(root, app_index, path):
"""
Derive app-specific bip85 entropy using path m/83696968'/app_index'/...path'
"""
assert max(path) < HARDENED_INDEX
if max(path) >= HARDENED_INDEX:
raise ValueError("Path elements must be less than 2^31")
derivation = [HARDENED_INDEX + 83696968, HARDENED_INDEX + app_index] + [
p + HARDENED_INDEX for p in path
]
Expand All @@ -27,7 +28,8 @@ def derive_entropy(root, app_index, path):

def derive_mnemonic(root, num_words=12, index=0, language=LANGUAGES.ENGLISH):
"""Derive a new mnemonic with num_words using language (code, wordlist)"""
assert num_words in [12, 18, 24]
if num_words not in [12, 18, 24]:
raise ValueError("Number of words must be 12, 18 or 24")
langcode, wordlist = language
path = [langcode, num_words, index]
entropy = derive_entropy(root, 39, path)
Expand All @@ -49,7 +51,9 @@ def derive_xprv(root, index=0):

def derive_hex(root, num_bytes=32, index=0):
"""Derive raw entropy from 16 to 64 bytes long"""
assert num_bytes <= 64
assert num_bytes >= 16
if num_bytes > 64:
raise ValueError("Number of bytes must not exceed 64")
if num_bytes < 16:
raise ValueError("Number of bytes must be at least 16")
entropy = derive_entropy(root, 128169, [num_bytes, index])
return entropy[:num_bytes]
6 changes: 4 additions & 2 deletions src/embit/descriptor/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ def __init__(self, fingerprint: bytes, derivation: list):
def from_string(cls, s: str):
arr = s.split("/")
mfp = unhexlify(arr[0])
assert len(mfp) == 4
if len(mfp) != 4:
raise ArgumentError("Invalid fingerprint length")
arr[0] = "m"
path = "/".join(arr)
derivation = bip32.parse_path(path)
Expand Down Expand Up @@ -315,7 +316,8 @@ def xonly(self):
return self.key.xonly()

def taproot_tweak(self, h=b""):
assert self.taproot
if not self.taproot:
raise ArgumentError("Key is not taproot")
return self.key.taproot_tweak(h)

def serialize(self):
Expand Down
16 changes: 8 additions & 8 deletions src/embit/descriptor/descriptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ def from_string(cls, desc):
@classmethod
def read_from(cls, s):
# starts with sh(wsh()), sh() or wsh()
start = s.read(7)
start = s.read(8)
sh = False
wsh = False
wpkh = False
Expand All @@ -303,30 +303,30 @@ def read_from(cls, s):
taptree = TapTree()
if start.startswith(b"tr("):
taproot = True
s.seek(-4, 1)
s.seek(-5, 1)
elif start.startswith(b"sh(wsh("):
sh = True
wsh = True
s.seek(-1, 1)
elif start.startswith(b"wsh("):
sh = False
wsh = True
s.seek(-3, 1)
elif start.startswith(b"sh(wpkh"):
s.seek(-4, 1)
elif start.startswith(b"sh(wpkh("):
is_miniscript = False
sh = True
wpkh = True
assert s.read(1) == b"("
elif start.startswith(b"wpkh("):
is_miniscript = False
wpkh = True
s.seek(-2, 1)
s.seek(-3, 1)
elif start.startswith(b"pkh("):
is_miniscript = False
s.seek(-3, 1)
s.seek(-4, 1)
elif start.startswith(b"sh("):
sh = True
wsh = False
s.seek(-4, 1)
s.seek(-5, 1)
else:
raise ValueError("Invalid descriptor (starts with '%s')" % start.decode())
# taproot always has a key, and may have taptree miniscript
Expand Down
6 changes: 4 additions & 2 deletions src/embit/ec.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ def read_from(cls, stream):

class SchnorrSig(EmbitBase):
def __init__(self, sig):
assert len(sig) == 64
if len(sig) != 64:
raise ECError("Invalid schnorr signature")
self._sig = sig

def write_to(self, stream) -> int:
Expand Down Expand Up @@ -93,7 +94,8 @@ def _xonly(self):

@classmethod
def from_xonly(cls, data: bytes):
assert len(data) == 32
if len(data) != 32:
raise ECError("Invalid xonly pubkey")
return cls.parse(b"\x02" + data)

def schnorr_verify(self, sig, msg_hash) -> bool:
Expand Down
9 changes: 6 additions & 3 deletions src/embit/liquid/pset.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,11 @@ def unblind(self, blinding_key):
return
# verify
gen = secp256k1.generator_generate_blinded(asset, in_abf)
assert gen == secp256k1.generator_parse(self.utxo.asset)
if gen != secp256k1.generator_parse(self.utxo.asset):
raise PSBTError("Invalid asset commitment")
cmt = secp256k1.pedersen_commit(vbf, value, gen)
assert cmt == secp256k1.pedersen_commitment_parse(self.utxo.value)
if cmt != secp256k1.pedersen_commitment_parse(self.utxo.value):
raise PSBTError("Invalid value commitment")

self.asset = asset
self.value = value
Expand Down Expand Up @@ -506,7 +508,8 @@ def unblind(self, blinding_key):
inp.unblind(blinding_key)

def txseed(self, seed: bytes):
assert len(seed) == 32
if len(seed) != 32:
raise PSBTError("Seed should be 32 bytes")
# get unique seed for this tx:
# we use seed + txid:vout + scriptpubkey as unique data for tagged hash
data = b"".join(
Expand Down
3 changes: 2 additions & 1 deletion src/embit/liquid/psetview.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

def skip_commitment(stream):
c = stream.read(1)
assert len(c) == 1
if len(c) != 1:
raise PSBTError("Unexpected end of stream")
if c == b"\x00": # None
return 1
if c == b"\x01": # unconfidential
Expand Down
27 changes: 18 additions & 9 deletions src/embit/liquid/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,19 @@ class LTransactionError(TransactionError):

def read_commitment(stream):
c = stream.read(1)
assert len(c) == 1
if len(c) != 1:
raise TransactionError("Invalid commitment")
if c == b"\x00": # None
return None
if c == b"\x01": # unconfidential
r = stream.read(8)
assert len(r) == 8
if len(r) != 8:
raise TransactionError("Invalid commitment")
return int.from_bytes(r, "big")
# confidential
r = stream.read(32)
assert len(r) == 32
if len(r) != 32:
raise TransactionError("Invalid commitment")
return c + r


Expand All @@ -71,10 +74,14 @@ def unblind(
message_length=64,
) -> tuple:
"""Unblinds a range proof and returns value, asset, value blinding factor, asset blinding factor, extra data, min and max values"""
assert len(pubkey) in [33, 65]
assert len(blinding_key) == 32
assert len(value_commitment) == 33
assert len(asset_commitment) == 33
if len(pubkey) not in [33, 65]:
raise TransactionError("Invalid pubkey length")
if len(blinding_key) != 32:
raise TransactionError("Invalid blinding key length")
if len(value_commitment) != 33:
raise TransactionError("Invalid value commitment length")
if len(asset_commitment) != 33:
raise TransactionError("Invalid asset commitment length")
pub = secp256k1.ec_pubkey_parse(pubkey)
secp256k1.ec_pubkey_tweak_mul(pub, blinding_key)
sec = secp256k1.ec_pubkey_serialize(pub)
Expand Down Expand Up @@ -397,9 +404,11 @@ def __init__(self, nonce, entropy, amount_commitment, token_commitment=None):
@classmethod
def read_from(cls, stream):
nonce = stream.read(32)
assert len(nonce) == 32
if len(nonce) != 32:
raise TransactionError("Invalid nonce")
entropy = stream.read(32)
assert len(entropy) == 32
if len(entropy) != 32:
raise TransactionError("Invalid entropy")
amount_commitment = read_commitment(stream)
token_commitment = read_commitment(stream)
return cls(nonce, entropy, amount_commitment, token_commitment)
Expand Down
3 changes: 2 additions & 1 deletion src/embit/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ def secure_randint(vmin: int, vmax: int) -> int:
"""
import math

assert vmax > vmin
if vmax <= vmin:
raise ValueError("vmax must be greater than vmin")
delta = vmax - vmin
nbits = math.ceil(math.log2(delta + 1))
randn = getrandbits(nbits)
Expand Down
6 changes: 4 additions & 2 deletions src/embit/psbtview.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,10 @@ def view(cls, stream, offset=None, compress=CompressMode.KEEP_ALL):
num_outputs = compact.from_bytes(value)
elif key == b"\x00":
# we found global transaction
assert version != 2
assert (num_inputs is None) and (num_outputs is None)
if version == 2:
raise PSBTError("Global transaction with version 2 PSBT")
if (num_inputs is not None) or (num_outputs is not None):
raise PSBTError("Invalid global transaction")
tx_len = compact.read_from(stream)
cur += len(compact.to_bytes(tx_len))
tx_offset = cur
Expand Down
21 changes: 14 additions & 7 deletions src/embit/util/ctypes_secp256k1.py
Original file line number Diff line number Diff line change
Expand Up @@ -761,16 +761,20 @@ def xonly_pubkey_from_pubkey(pubkey, context=_secp.ctx):

@locked
def schnorrsig_verify(sig, msg, pubkey, context=_secp.ctx):
assert len(sig) == 64
assert len(msg) == 32
assert len(pubkey) == 64
if len(sig) != 64:
raise ValueError("Signature should be 64 bytes long")
if len(msg) != 32:
raise ValueError("Message should be 32 bytes long")
if len(pubkey) != 64:
raise ValueError("Public key should be 64 bytes long")
res = _secp.secp256k1_schnorrsig_verify(context, sig, msg, pubkey)
return bool(res)


@locked
def keypair_create(secret, context=_secp.ctx):
assert len(secret) == 32
if len(secret) != 32:
raise ValueError("Secret key should be 32 bytes long")
keypair = bytes(96)
r = _secp.secp256k1_keypair_create(context, keypair, secret)
if r == 0:
Expand All @@ -782,11 +786,13 @@ def keypair_create(secret, context=_secp.ctx):
def schnorrsig_sign(
msg, keypair, nonce_function=None, extra_data=None, context=_secp.ctx
):
assert len(msg) == 32
if len(msg) != 32:
raise ValueError("Message should be 32 bytes long")
if len(keypair) == 32:
keypair = keypair_create(keypair, context=context)
with _lock:
assert len(keypair) == 96
if len(keypair) != 96:
raise ValueError("Keypair should be 96 bytes long")
sig = bytes(64)
r = _secp.secp256k1_schnorrsig_sign(
context, sig, msg, keypair, nonce_function, extra_data
Expand Down Expand Up @@ -916,7 +922,8 @@ def pedersen_blind_generator_blind_sum(
if res == 0:
raise ValueError("Failed to get the last blinding factor.")
res = (c_char * 32).from_address(address).raw
assert len(res) == 32
if len(res) != 32:
raise ValueError("Blinding factor should be 32 bytes long")
return res


Expand Down
Loading