Skip to content

Commit 906f916

Browse files
committed
WIP: add support for AES-256-CBC and AES-256-GCM
1 parent db9deba commit 906f916

File tree

2 files changed

+80
-17
lines changed

2 files changed

+80
-17
lines changed

block_io/__init__.py

Lines changed: 54 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,11 @@ def from_wif(privkey):
102102
class Helper:
103103

104104
@staticmethod
105-
def pinToAesKey(pin):
105+
def pinToAesKey(pin, salt = "", iterations = 2048, hashfn = sha256, phase1_key_length = 16, phase2_key_length = 32):
106106
# use pbkdf2 magic
107-
ret = pbkdf2.pbkdf2(pin, 16)
108-
ret = pbkdf2.pbkdf2(hexlify(ret), 32)
107+
# ( password, keylen, salt = "", itercount = 1024, hashfn = sha256 )
108+
ret = pbkdf2.pbkdf2(pin, phase1_key_length, salt, int(iterations/2), hashfn)
109+
ret = pbkdf2.pbkdf2(hexlify(ret), phase2_key_length, salt, int(iterations/2), hashfn)
109110
return hexlify(ret) # the encryption key
110111

111112
@staticmethod
@@ -115,7 +116,7 @@ def extractKey(encrypted_data, enc_key_hex):
115116
return BlockIo.Key.from_passphrase(unhexlify(decrypted))
116117

117118
@staticmethod
118-
def encrypt(data, key):
119+
def encrypt(data, key, iv = None, cipher_type = "AES-256-ECB", auth_data = None):
119120
# key in hex, data as string
120121
# returns ciphertext in base64
121122

@@ -125,28 +126,69 @@ def encrypt(data, key):
125126
pad = lambda s: s + (BS - len(s) % BS) * chr(BS - len(s) % BS)
126127
unpad = lambda s : s[0:-s[-1]]
127128

128-
obj = AES.new(key, AES.MODE_ECB)
129-
ciphertext = obj.encrypt(pad(data).encode('latin-1'))
129+
obj = None
130130

131-
return base64.b64encode(ciphertext)
131+
if cipher_type == "AES-256-ECB":
132+
obj = AES.new(key, AES.MODE_ECB)
133+
elif cipher_type == "AES-256-CBC":
134+
obj = AES.new(key, AES.MODE_CBC, unhexlify(iv))
135+
elif cipher_type == "AES-256-GCM":
136+
obj = AES.new(key, AES.MODE_GCM, unhexlify(iv))
137+
else:
138+
raise Exception("Unsupported cipher", cipher_type)
139+
140+
response = {"aes_iv": iv, "aes_cipher_text": None, "aes_auth_tag": None, "aes_auth_data": None, "aes_cipher": cipher_type}
141+
142+
if cipher_type != "AES-256-GCM":
143+
response["aes_cipher_text"] = base64.b64encode(obj.encrypt(pad(data).encode('latin-1')))
144+
else:
145+
# AES-256-GCM
146+
# no padding for data
147+
ciphertext = obj.encrypt(data.encode('latin-1'))
148+
response["aes_cipher_text"] = base64.b64encode(ciphertext)
149+
response["aes_auth_tag"] = hexlify(obj.digest())
150+
151+
return response
132152

133153
@staticmethod
134-
def decrypt(b64data, key):
154+
def decrypt(b64data, key, iv = None, cipher_type = "AES-256-ECB", auth_tag = None):
135155
# key in hex, b64data as base64 string
136156
# returns utf-8 string
137157

138158
message = None
139159

140160
try:
141-
key = unhexlify(key) # get bytes
142161

162+
key = unhexlify(key) # get bytes
163+
143164
BS = 16
144165
pad = lambda s: s + (BS - len(s) % BS) * chr(BS - len(s) % BS)
145166
unpad = lambda s : s[0:-s[-1]]
146-
167+
147168
data = base64.b64decode(b64data) # decode from base64
148-
obj = AES.new(key, AES.MODE_ECB)
149-
message = unpad(obj.decrypt(data))
169+
170+
obj = None
171+
172+
if cipher_type == "AES-256-ECB":
173+
obj = AES.new(key, AES.MODE_ECB)
174+
elif cipher_type == "AES-256-CBC":
175+
obj = AES.new(key, AES.MODE_CBC, unhexlify(iv))
176+
elif cipher_type == "AES-256-GCM":
177+
obj = AES.new(key, AES.MODE_GCM, unhexlify(iv))
178+
else:
179+
raise Exception("Unsupported cipher", cipher_type)
180+
181+
message = None
182+
183+
if cipher_type != "AES-256-GCM":
184+
message = unpad(obj.decrypt(data))
185+
else:
186+
# AES-256-GCM
187+
# Auth tag must be exactly 16 bytes
188+
if (len(auth_tag) != 32):
189+
raise Exception("Auth tag must be 16 bytes exactly.")
190+
message = obj.decrypt_and_verify(data, unhexlify(auth_tag))
191+
150192
except:
151193
# error decrypting? must be an invalid secret pin
152194
raise Exception('Invalid Secret PIN provided.')

tests/test_blockio.py

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,33 @@ def test_pin_derivation(self):
5353
key = BlockIo.Helper.pinToAesKey(self.pin)
5454
self.assertEqual(key, b'd0478c395b66e588a1518cdd08d8257aa2145a4c20bcd05c466afb334b7d18e7')
5555

56-
def test_aes_encryption(self):
56+
def test_pin_derivation_with_salt(self):
57+
key = BlockIo.Helper.pinToAesKey("deadbeef", "922445847c173e90667a19d90729e1fb", 500000)
58+
self.assertEqual(key, b'f206403c6bad20e1c8cb1f3318e17cec5b2da0560ed6c7b26826867452534172')
59+
60+
def test_aes256ecb_encryption(self):
5761
clear = "I\'m a little tea pot short and stout"
5862
key = BlockIo.Helper.pinToAesKey(self.pin)
59-
ciphertext = BlockIo.Helper.encrypt(clear, key)
60-
self.assertEqual(ciphertext, b'7HTfNBYJjq09+vi8hTQhy6lCp3IHv5rztNnKCJ5RB7cSL+NjHrFVv1jl7qkxJsOg')
61-
cleartext = BlockIo.Helper.decrypt(ciphertext, key)
63+
encrypted_data = BlockIo.Helper.encrypt(clear, key)
64+
self.assertEqual(encrypted_data['aes_cipher_text'], b'7HTfNBYJjq09+vi8hTQhy6lCp3IHv5rztNnKCJ5RB7cSL+NjHrFVv1jl7qkxJsOg')
65+
cleartext = BlockIo.Helper.decrypt(encrypted_data['aes_cipher_text'], key)
66+
self.assertEqual(cleartext, clear.encode('utf-8'))
67+
68+
def test_aes256cbc_encryption(self):
69+
clear = "beadbeef"
70+
key = BlockIo.Helper.pinToAesKey("deadbeef", "922445847c173e90667a19d90729e1fb", 500000)
71+
result = BlockIo.Helper.encrypt(clear, key, "11bc22166c8cf8560e5fa7e5c622bb0f", "AES-256-CBC")
72+
self.assertEqual(result['aes_cipher_text'], b'LExu1rUAtIBOekslc328Lw==')
73+
cleartext = BlockIo.Helper.decrypt(result['aes_cipher_text'], key, result['aes_iv'], result['aes_cipher'])
74+
self.assertEqual(cleartext, clear.encode('utf-8'))
75+
76+
def test_aes256gcm_encryption(self):
77+
clear = "beadbeef"
78+
key = BlockIo.Helper.pinToAesKey("deadbeef", "922445847c173e90667a19d90729e1fb", 500000)
79+
result = BlockIo.Helper.encrypt(clear, key, "a57414b88b67f977829cbdca", "AES-256-GCM")
80+
self.assertEqual(result['aes_cipher_text'], b'ELV56Z57KoA=')
81+
self.assertEqual(result['aes_auth_tag'], b'adeb7dfe53027bdda5824dc524d5e55a')
82+
cleartext = BlockIo.Helper.decrypt(result['aes_cipher_text'], key, result['aes_iv'], result['aes_cipher'], result['aes_auth_tag'])
6283
self.assertEqual(cleartext, clear.encode('utf-8'))
6384

6485
class TestKeys(unittest.TestCase):
@@ -96,5 +117,5 @@ def test_throws_blockio_api_error(self):
96117
self.blockio.get_balance()
97118
self.assertEquals(True,False) # will fail if we get here
98119
except BlockIoAPIError as e:
99-
self.assertDictEqual(e.get_raw_data(), {"status": "fail", "data": {"error_message": "Invalid API Key provided for this API version."}})
120+
self.assertEqual(e.get_raw_data()['data']['error_message'], "API Key invalid or you have not enabled API access for this machine's IP address(es). Check that your API Keys are correct, and that you have enabled API access for this machine's IP Address(es) on your account's Settings page.")
100121

0 commit comments

Comments
 (0)