diff --git a/_codeql_detected_source_root b/_codeql_detected_source_root new file mode 120000 index 00000000..945c9b46 --- /dev/null +++ b/_codeql_detected_source_root @@ -0,0 +1 @@ +. \ No newline at end of file diff --git a/app/src/cose.h b/app/src/cose.h index b6d19025..48532d89 100644 --- a/app/src/cose.h +++ b/app/src/cose.h @@ -64,6 +64,13 @@ namespace scitt::cose static constexpr int64_t COSE_HEADER_PARAM_FEED = 392; static constexpr int64_t COSE_HEADER_PARAM_SCITT_RECEIPTS = 394; + // Unprotected header label for the registration transaction ID embedded in + // COSE receipts. The value is a text string (tstr) of the transaction ID, + // e.g. "7.145". This allows clients to correlate a receipt with the ledger + // entry they retrieved it from. + static constexpr const char* COSE_HEADER_PARAM_REGISTRATION_TX = + "registration_tx"; + static constexpr int64_t COSE_CWT_CLAIM_ISS = 1; static constexpr int64_t COSE_CWT_CLAIM_SUB = 2; static constexpr int64_t COSE_CWT_CLAIM_IAT = 6; diff --git a/app/src/main.cpp b/app/src/main.cpp index 94253799..61657fec 100644 --- a/app/src/main.cpp +++ b/app/src/main.cpp @@ -123,9 +123,14 @@ namespace scitt * from a CCF TxReceiptImplPtr obtained through a historical query. * The proof format is described in * https://datatracker.ietf.org/doc/draft-birkholz-cose-receipts-ccf-profile/ + * + * The registration_tx parameter is embedded as a text string in the + * unprotected header under the "registration_tx" key, allowing clients to + * correlate the receipt with the ledger entry transaction ID. */ std::vector get_cose_receipt( - const ccf::TxReceiptImplPtr& receipt_ptr) + const ccf::TxReceiptImplPtr& receipt_ptr, + const std::string& registration_tx) { auto proof = ccf::describe_merkle_proof_v1(*receipt_ptr); if (!proof.has_value()) @@ -139,18 +144,119 @@ namespace scitt throw InternalCborError("Failed to get COSE signature"); } + // Parse the COSE signature to extract phdr, payload, and signature bytes. + // We re-encode the receipt manually so we can include both the inclusion + // proof and the registration_tx entry in a single unprotected header. + UsefulBufC sig_buf{signature->data(), signature->size()}; + QCBORDecodeContext dctx; + QCBORDecode_Init(&dctx, sig_buf, QCBOR_DECODE_MODE_NORMAL); + + QCBORItem item; + QCBORDecode_EnterArray(&dctx, nullptr); + if (QCBORDecode_GetError(&dctx) != QCBOR_SUCCESS) + { + throw InternalCborError("Failed to parse COSE signature outer array"); + } + + // Protected header (bstr) + QCBORDecode_GetNext(&dctx, &item); + if (item.uDataType != QCBOR_TYPE_BYTE_STRING) + { + throw InternalCborError("Failed to get protected header from COSE signature"); + } + UsefulBufC phdr{item.val.string.ptr, item.val.string.len}; + + // Skip the existing unprotected header + QCBORDecode_VGetNextConsume(&dctx, &item); + + // Capture the payload as pre-encoded CBOR bytes + size_t pos_start = 0; + size_t pos_end = 0; + auto partial_err = QCBORDecode_PartialFinish(&dctx, &pos_start); + if (partial_err != QCBOR_ERR_ARRAY_OR_MAP_UNCONSUMED) + { + throw InternalCborError("Failed to find start of payload in COSE signature"); + } + QCBORDecode_VGetNextConsume(&dctx, &item); + partial_err = QCBORDecode_PartialFinish(&dctx, &pos_end); + if (partial_err != QCBOR_ERR_ARRAY_OR_MAP_UNCONSUMED) + { + throw InternalCborError("Failed to find end of payload in COSE signature"); + } + UsefulBufC payload{signature->data() + pos_start, pos_end - pos_start}; + + // Signature bytes (bstr) + QCBORDecode_GetNext(&dctx, &item); + if (item.uDataType != QCBOR_TYPE_BYTE_STRING) + { + throw InternalCborError("Failed to get signature bytes from COSE signature"); + } + UsefulBufC sig_bytes{item.val.string.ptr, item.val.string.len}; + + QCBORDecode_ExitArray(&dctx); + if (QCBORDecode_Finish(&dctx) != QCBOR_SUCCESS) + { + throw InternalCborError("Failed to finish parsing COSE signature"); + } + // See // https://datatracker.ietf.org/doc/draft-ietf-cose-merkle-tree-proofs/ - // Page 11, vdp is the label for verifiable-proods in the unprotected + // Page 11, vdp is the label for verifiable-proofs in the unprotected // header of the receipt const int64_t vdp = 396; // -1 is the label for inclusion-proofs - auto inclusion_proof = ccf::cose::edit::pos::AtKey{-1}; - ccf::cose::edit::desc::Value inclusion_desc{inclusion_proof, vdp, *proof}; - - auto cose_receipt = - ccf::cose::edit::set_unprotected_header(*signature, inclusion_desc); - return cose_receipt; + const int64_t inclusion_proof_key = -1; + + // Allocate output buffer with enough space for the receipt. + // The unprotected header will contain: + // {vdp: {inclusion_proof_key: [proof]}, "registration_tx": txid} + size_t additional_size = + 3 * QCBOR_HEAD_BUFFER_SIZE + sizeof(vdp) + // vdp sub-map + 3 * QCBOR_HEAD_BUFFER_SIZE + + sizeof(inclusion_proof_key) + // inclusion proof array + QCBOR_HEAD_BUFFER_SIZE + proof->size() + // proof bytes + QCBOR_HEAD_BUFFER_SIZE + + strlen(scitt::cose::COSE_HEADER_PARAM_REGISTRATION_TX) + // key + QCBOR_HEAD_BUFFER_SIZE + registration_tx.size(); // txid value + + std::vector output( + signature->size() + additional_size + QCBOR_HEAD_BUFFER_SIZE); + UsefulBuf output_buf{output.data(), output.size()}; + + QCBOREncodeContext ectx; + QCBOREncode_Init(&ectx, output_buf); + QCBOREncode_AddTag(&ectx, CBOR_TAG_COSE_SIGN1); + QCBOREncode_OpenArray(&ectx); + QCBOREncode_AddBytes(&ectx, phdr); + QCBOREncode_OpenMap(&ectx); + + // Add inclusion proof: {vdp: {inclusion_proof_key: [proof_bytes]}} + QCBOREncode_OpenMapInMapN(&ectx, vdp); + QCBOREncode_OpenArrayInMapN(&ectx, inclusion_proof_key); + QCBOREncode_AddBytes(&ectx, {proof->data(), proof->size()}); + QCBOREncode_CloseArray(&ectx); + QCBOREncode_CloseMap(&ectx); + + // Add registration_tx: {"registration_tx": txid} + QCBOREncode_AddTextToMap( + &ectx, + scitt::cose::COSE_HEADER_PARAM_REGISTRATION_TX, + cbor::from_string(registration_tx)); + + QCBOREncode_CloseMap(&ectx); + QCBOREncode_AddEncoded(&ectx, payload); + QCBOREncode_AddBytes(&ectx, sig_bytes); + QCBOREncode_CloseArray(&ectx); + + UsefulBufC cose_output; + QCBORError err = QCBOREncode_Finish(&ectx, &cose_output); + if (err != QCBOR_SUCCESS) + { + throw InternalCborError("Failed to encode COSE receipt"); + } + output.resize(cose_output.len); + output.shrink_to_fit(); + return output; } class AppEndpoints : public ccf::UserEndpointRegistry @@ -427,7 +533,9 @@ namespace scitt } SCITT_DEBUG("Get receipt from the ledger"); - auto cose_receipt = get_cose_receipt(historical_state->receipt); + auto cose_receipt = get_cose_receipt( + historical_state->receipt, + historical_state->transaction_id.to_str()); ctx.rpc_ctx->set_response_body(cose_receipt); ctx.rpc_ctx->set_response_header( @@ -470,7 +578,9 @@ namespace scitt } SCITT_DEBUG("Get receipt from the ledger"); - auto cose_receipt = get_cose_receipt(historical_state->receipt); + auto cose_receipt = get_cose_receipt( + historical_state->receipt, + historical_state->transaction_id.to_str()); // See https://datatracker.ietf.org/doc/draft-ietf-scitt-architecture/ // Section 4.4, 394 is the label for an array of receipts in the diff --git a/pyscitt/pyscitt/crypto.py b/pyscitt/pyscitt/crypto.py index 395cd7d8..a181a7bc 100644 --- a/pyscitt/pyscitt/crypto.py +++ b/pyscitt/pyscitt/crypto.py @@ -89,6 +89,12 @@ class SCITTReceipts(CoseHeaderAttribute): # Other expected CWT claims CWT_SVN = "svn" # AMD Security Version Number +# Unprotected header label for the registration transaction ID in COSE receipts. +# The value is a text string (tstr) identifying the ledger entry transaction, +# e.g. "7.145". This allows clients to correlate a receipt with the ledger entry +# they retrieved it from. +REGISTRATION_TX = "registration_tx" + def ec_curve_from_name(name: str) -> EllipticCurve: if name == "P-256": diff --git a/test/test_verify.py b/test/test_verify.py index cfbd61f3..6c7c94e7 100644 --- a/test/test_verify.py +++ b/test/test_verify.py @@ -18,10 +18,75 @@ from pycose.headers import KID from pycose.messages import Sign1Message -from pyscitt.crypto import CWT_ISS, CWTClaims +from pyscitt.crypto import CWT_ISS, REGISTRATION_TX, CWTClaims, embed_receipt_in_cose +from pyscitt.receipt import cbor_to_printable from pyscitt.verify import DynamicTrustStore, DynamicTrustStoreClient +def _make_receipt_bytes(txid: str, uhdr: dict | None = None) -> bytes: + """Build a minimal COSE_Sign1 receipt with registration_tx in the unprotected header.""" + phdr_encoded = cbor2.dumps({1: -7}) # alg: ES256 + if uhdr is None: + uhdr = { + 396: {-1: [b"proof_bytes"]}, # verifiable-proofs / inclusion-proof + REGISTRATION_TX: txid, + } + return cbor2.dumps(cbor2.CBORTag(18, [phdr_encoded, uhdr, None, b"\x00" * 64])) + + +class TestReceiptRegistrationTx: + """Tests verifying that the registration_tx unprotected header is handled.""" + + def test_receipt_bytes_contain_registration_tx(self): + """A receipt built with registration_tx in the uhdr can be decoded and + the txid value is accessible.""" + txid = "350.9219" + receipt_bytes = _make_receipt_bytes(txid) + + decoded = cbor2.loads(receipt_bytes) + assert decoded.tag == 18 # COSE_Sign1 + uhdr = decoded.value[1] + assert REGISTRATION_TX in uhdr + assert uhdr[REGISTRATION_TX] == txid + + def test_cbor_to_printable_includes_registration_tx(self): + """cbor_to_printable correctly renders the registration_tx field.""" + txid = "7.145" + uhdr = { + 396: {-1: [b"proof_bytes"]}, + REGISTRATION_TX: txid, + } + result = cbor_to_printable(uhdr) + assert result.get(REGISTRATION_TX) == txid + + def test_embed_receipt_in_cose_preserves_registration_tx(self): + """When a receipt with registration_tx is embedded into a signed statement, + the registration_tx value is preserved in the embedded receipt.""" + txid = "350.9219" + receipt_bytes = _make_receipt_bytes(txid) + + # Create a minimal signed statement (COSE_Sign1) + stmt_phdr_encoded = cbor2.dumps({1: -7}) + stmt = cbor2.dumps( + cbor2.CBORTag(18, [stmt_phdr_encoded, {}, b"payload", b"\x00" * 64]) + ) + + # Embed the receipt into the statement + result_bytes = embed_receipt_in_cose(stmt, receipt_bytes) + result = cbor2.loads(result_bytes) + + # The embedded receipt is in uhdr[394][0] as a CBORTag + embedded_receipt = result.value[1][394][0] + assert isinstance(embedded_receipt, cbor2.CBORTag) + embedded_uhdr = embedded_receipt.value[1] + assert REGISTRATION_TX in embedded_uhdr + assert embedded_uhdr[REGISTRATION_TX] == txid + + def test_registration_tx_constant_value(self): + """The REGISTRATION_TX constant has the expected string value.""" + assert REGISTRATION_TX == "registration_tx" + + class TestDynamicTrustStore: def test_init_creates_client_if_none_provided(self): trust_store = DynamicTrustStore()