Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 94 additions & 29 deletions src/hosts/qr.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,15 @@ async def scan(self, raw=True, chunk_timeout=0.5):

def check_animated(self, data: bytes):
try:
# Handle empty data
if not data:
return False

# Log data for debugging
print(f"QR: Checking animation for {len(data)} bytes")
if len(data) > 0:
print(f"QR: First 20 bytes hex: {data[:20].hex()}")

# should be only ascii characters
d = data.decode().strip().lower()
if d.startswith("ur:"): # ur:bytes or ur:crypto-psbt
Expand All @@ -590,9 +599,26 @@ def check_animated(self, data: bytes):
self.parse_prefix(data.split(b" ")[0])
return True
except Exception as e:
print("Exception at check animated", e)
print(f"QR: Exception in check_animated: {e}")
# Try alternative decoding methods for binary data
try:
# Check if it's base64 encoded
import base64
decoded = base64.b64decode(data, validate=True)
if len(decoded) >= 4 and decoded[:4] in [b'psbt', b'pset']:
print("QR: Detected base64 encoded PSBT/PSET")
return False
except:
pass

# Check for binary PSBT/PSET headers
if len(data) >= 4:
header = data[:4]
if header in [b'psbt', b'pset', b'\x70\x73\x62\x74']:
print(f"QR: Detected binary format: {header}")
return False

return False
return False

async def update(self):
if not self.scanning:
Expand Down Expand Up @@ -640,24 +666,52 @@ async def update(self):

def process_chunk(self):
"""Returns true when scanning complete"""
# should not be there if trigger mode or simulator
with open(self.tmpfile, "rb") as f:
c = f.read(len(SUCCESS))
while c == SUCCESS:
try:
# should not be there if trigger mode or simulator
with open(self.tmpfile, "rb") as f:
c = f.read(len(SUCCESS))
f.seek(-len(c), 1)
# check if it's bcur encoding
start = f.read(9).upper()
f.seek(-len(start), 1)
if start == b"UR:BYTES/":
self.bcur = True
return self.process_bcur(f)
# bcur2 encoding
elif start == b"UR:CRYPTO":
self.bcur2 = True
return self.process_bcur2(f)
else:
return self.process_normal(f)
while c == SUCCESS:
c = f.read(len(SUCCESS))
f.seek(-len(c), 1)

# Log chunk processing for debugging
debug_pos = f.tell()
debug_data = f.read()
f.seek(debug_pos)
print(f"QR: Processing chunk of {len(debug_data)} bytes")
if len(debug_data) > 0:
print(f"QR: First 20 bytes hex: {debug_data[:20].hex()}")

# check if it's bcur encoding
start = f.read(9).upper()
f.seek(-len(start), 1)
if start == b"UR:BYTES/":
self.bcur = True
return self.process_bcur(f)
# bcur2 encoding
elif start == b"UR:CRYPTO":
self.bcur2 = True
return self.process_bcur2(f)
else:
return self.process_normal(f)
except Exception as e:
print(f"QR: Error in process_chunk: {e}")
# Save debug information
try:
with open(self.tmpfile, "rb") as debug_f:
debug_data = debug_f.read()
with open(self.path + "/debug_error.txt", "w") as error_f:
error_f.write(f"Error: {e}\n")
error_f.write(f"Data length: {len(debug_data)}\n")
error_f.write(f"Data hex: {debug_data.hex()}\n")
try:
error_f.write(f"Data text: {debug_data.decode('utf-8', errors='ignore')}\n")
except:
pass
print(f"QR: Debug info saved to {self.path}/debug_error.txt")
except Exception as debug_e:
print(f"QR: Failed to save debug info: {debug_e}")
raise

def process_bcur2(self, f):
gc.collect()
Expand Down Expand Up @@ -746,18 +800,25 @@ def process_bcur(self, f):

def process_normal(self, f):
# check if it starts with pMofN
# Store position and read all data for better error handling
start_pos = f.tell()
all_data = f.read()
f.seek(start_pos)
print(f"QR: Processing normal format, total data length: {len(all_data)}")

chunk, char = read_until(f, b" ", max_len=10, return_on_max_len=True)
chunk = chunk or b""
if char is None:
if not self.animated:
fname = self.path + "/data.txt"
with open(fname, "wb") as fout:
fout.write(chunk)
read_write(f, fout)
# Write all data to ensure nothing is lost
fout.write(all_data)
print(f"QR: Saved non-animated QR code, {len(all_data)} bytes")
return True
else:
self.stop_scanning()
raise HostError("Ivalid QR code part encoding: %r" % chunk)
raise HostError("Invalid QR code part encoding: %r" % chunk)
# space is there
if not self.animated:
if chunk.startswith(b"p") and b"of" in chunk:
Expand All @@ -771,21 +832,23 @@ def process_normal(self, f):
with open(fname, "wb") as fout:
read_write(f, fout)
self.parts[m - 1] = fname
print(f"QR: Started animated sequence {m}/{n}")
return False
# failed - not animated, just unfortunately similar data
except:
except Exception as parse_e:
print(f"QR: Failed to parse animated prefix '{chunk}': {parse_e}")
fname = self.path + "/data.txt"
with open(fname, "wb") as fout:
fout.write(chunk)
fout.write(char)
read_write(f, fout)
# Use all_data to ensure complete data
fout.write(all_data)
print(f"QR: Saved as static QR code, {len(all_data)} bytes")
return True
else:
fname = self.path + "/data.txt"
with open(fname, "wb") as fout:
fout.write(chunk)
fout.write(char)
read_write(f, fout)
# Use all_data to ensure complete data
fout.write(all_data)
print(f"QR: Saved static QR code, {len(all_data)} bytes")
return True
# expecting animated frame
m, n = self.parse_prefix(chunk)
Expand All @@ -795,6 +858,7 @@ def process_normal(self, f):
with open(fname, "wb") as fout:
read_write(f, fout)
self.parts[m - 1] = fname
print(f"QR: Received animated part {m}/{n}")
# all have non-zero len
if None not in self.parts:
self._stop_scanner()
Expand All @@ -803,6 +867,7 @@ def process_normal(self, f):
for part in self.parts:
with open(part, "rb") as fp:
read_write(fp, fout)
print("QR: Completed animated sequence")
return True
else:
return False
Expand Down