diff --git a/setup.py b/setup.py index a10bfbe..c90f612 100644 --- a/setup.py +++ b/setup.py @@ -24,7 +24,7 @@ python_requires=">=3.9.11", install_requires=[ 'asn1crypto>=1.5.1', - 'cryptography>=41.0.3' + 'cryptography==41.0.3' ], extras_require={ 'tests': [ diff --git a/src/pymrtd/ef/__init__.py b/src/pymrtd/ef/__init__.py index 479a726..d5e7d3b 100644 --- a/src/pymrtd/ef/__init__.py +++ b/src/pymrtd/ef/__init__.py @@ -1,36 +1,31 @@ -from .base import ( - ElementaryFile, - ElementaryFileError, - LDSVersionInfo -) - -from .dg import ( - DataGroup, - DataGroupNumber, - DG1, - DG14, - DG15 -) - -from .mrz import ( - MachineReadableZone -) - -from .sod import ( - SOD, - SODError -) +from .base import ElementaryFile, ElementaryFileError, LDSVersionInfo +from .dg import DataGroup, DataGroupNumber +from .dg1 import DG1, DataGroup1 +from .dg2 import DG2, DataGroup2 +from .dg7 import DG7, DataGroup7 +from .dg11 import DG11, DataGroup11 +from .dg14 import DG14 +from .dg15 import DG15 +from .errors import NFCPassportReaderError +from .sod import SOD, SODError __all__ = [ + "ElementaryFile", + "ElementaryFileError", + "LDSVersionInfo", "DataGroup", "DataGroupNumber", + "DataGroup1", "DG1", + "DataGroup2", + "DG2", + "DataGroup7", + "DG7", + "DataGroup11", + "DG11", "DG14", "DG15", - "ElementaryFile", - "ElementaryFileError", - "LDSVersionInfo", - "MachineReadableZone", + "NFCPassportReaderError", "SOD", - "SODError" -] \ No newline at end of file + "SODError", +] diff --git a/src/pymrtd/ef/base.py b/src/pymrtd/ef/base.py index 6a29266..da10c15 100644 --- a/src/pymrtd/ef/base.py +++ b/src/pymrtd/ef/base.py @@ -1,41 +1,68 @@ import hashlib + import asn1crypto.core as asn1 import asn1crypto.parser as asn1Parser class LDSVersionInfo(asn1.Sequence): _fields = [ - ('ldsVersion', asn1.PrintableString), - ('unicodeVersion', asn1.PrintableString), + ("ldsVersion", asn1.PrintableString), + ("unicodeVersion", asn1.PrintableString), ] + class ElementaryFileError(ValueError): pass + class ElementaryFile(asn1.Asn1Value): _content_spec = None _str_rep = None - def __init__(self, explicit=None, implicit=None, no_explicit=False, tag_type=None, class_=None, tag=None, - optional=None, default=None, contents=None, method=None, spec=None): + def __init__( + self, + explicit=None, + implicit=None, + no_explicit=False, + tag_type=None, + class_=None, + tag=None, + optional=None, + default=None, + contents=None, + method=None, + spec=None, + ): if spec: self._content_spec = spec - super().__init__(explicit=explicit, implicit=implicit, no_explicit=no_explicit, tag_type=tag_type, class_=class_, tag=tag, - optional=optional, default=default, contents=contents, method=method) + super().__init__( + explicit=explicit, + implicit=implicit, + no_explicit=no_explicit, + tag_type=tag_type, + class_=class_, + tag=tag, + optional=optional, + default=default, + contents=contents, + method=method, + ) self._content = None - self._fp = None + self._fp = None def __str__(self): """ Returns string representation of self i.e. EF(fp=XXXXXXXXXXXXXXXX) """ if self._str_rep is None: - self._str_rep = f'EF(fp={self.fingerprint})' + self._str_rep = f"EF(fp={self.fingerprint})" return self._str_rep @classmethod - def load(cls, encoded_data: bytes, strict=False): #pylint: disable=arguments-differ - ''' + def load( + cls, encoded_data: bytes, strict=False + ): # pylint: disable=arguments-differ + """ Loads a BER/DER-encoded byte string using the current class as the spec :param encoded_data: A byte string of BER or DER encoded data @@ -44,29 +71,35 @@ def load(cls, encoded_data: bytes, strict=False): #pylint: disable=arguments-dif ValueError will be raised when trailing data exists :return: A instance of the current class - ''' + """ - class_, method, tag, header, contents, trailer = asn1Parser.parse(encoded_data, strict=strict) #pylint: disable=unused-variable + class_, method, tag, header, contents, trailer = asn1Parser.parse( + encoded_data, strict=strict + ) # pylint: disable=unused-variable value = cls(class_=class_, tag=tag, method=method, contents=contents) if cls.class_ is not None and value.class_ != cls.class_: - raise ElementaryFileError("Invalid elementary file class, expected class '{}' got '{}'" - .format( + raise ElementaryFileError( + "Invalid elementary file class, expected class '{}' got '{}'".format( asn1.CLASS_NUM_TO_NAME_MAP.get(cls.class_, cls.class_), - asn1.CLASS_NUM_TO_NAME_MAP.get(value.class_, value.class_) - )) + asn1.CLASS_NUM_TO_NAME_MAP.get(value.class_, value.class_), + ) + ) if cls.method is not None and value.method != cls.method: - raise ElementaryFileError("Invalid elementary file method , expected method '{}' got '{}'" - .format( + raise ElementaryFileError( + "Invalid elementary file method , expected method '{}' got '{}'".format( asn1.METHOD_NUM_TO_NAME_MAP.get(cls.method, cls.method), - asn1.METHOD_NUM_TO_NAME_MAP.get(value.method, value.method) - )) + asn1.METHOD_NUM_TO_NAME_MAP.get(value.method, value.method), + ) + ) if cls.tag is not None and value.tag != cls.tag: - raise ElementaryFileError(f"Invalid elementary file tag, expected tag '{cls.tag}' got '{value.tag}'") + raise ElementaryFileError( + f"Invalid elementary file tag, expected tag '{cls.tag}' got '{value.tag}'" + ) # Force parsing of content. This is done in order for any invalid content to raise an exception - value.content #pylint: disable=pointless-statement + value.content # pylint: disable=pointless-statement return value @property @@ -76,23 +109,23 @@ def fingerprint(self) -> str: """ if self._fp is None: d = hashlib.sha256(self.dump()).digest() - self._fp = d[0:8].hex().upper().rjust(16, '0') + self._fp = d[0:8].hex().upper().rjust(16, "0") return self._fp @property def content(self): - ''' Returns content object of a type content_type ''' + """Returns content object of a type content_type""" if self._content is None: self._parse_content() return self._content @property def native(self): - ''' + """ The native Python data type representation of this value :return: A native representation of content object or None. - ''' + """ if self.contents is None: return None @@ -102,12 +135,12 @@ def native(self): return self.content.native def _parse_content(self): - ''' + """ Parses the contents and generates Asn1Value content objects based on the definitions from _content_spec. :raises: ValueError - when an error occurs parsing content object - ''' + """ self._content = None if self.contents is None: @@ -115,15 +148,22 @@ def _parse_content(self): if self._content_spec is not None: if not issubclass(self._content_spec, asn1.Asn1Value): - raise ValueError(f'_content_spec must be of a Ans1Value type, not {self._content_spec!r}') + raise ValueError( + f"_content_spec must be of a Ans1Value type, not {self._content_spec!r}" + ) try: self._content = self._content_spec.load(self.contents, strict=True) if isinstance(self._content, (asn1.Sequence, asn1.SequenceOf)): - self._content._parse_children(recurse=True) #pylint: disable=protected-access + self._content._parse_children( + recurse=True + ) # pylint: disable=protected-access except (ValueError, TypeError) as e: - from asn1crypto._types import type_name #pylint: disable=import-outside-toplevel + from asn1crypto._types import ( + type_name, + ) # pylint: disable=import-outside-toplevel + self._content = None - args = e.args[1:] - e.args = (e.args[0] + f'\n while parsing {type_name(self)}',) + args + args = e.args[1:] + e.args = (e.args[0] + f"\n while parsing {type_name(self)}",) + args raise diff --git a/src/pymrtd/ef/dg.py b/src/pymrtd/ef/dg.py index 502a025..bd14c72 100644 --- a/src/pymrtd/ef/dg.py +++ b/src/pymrtd/ef/dg.py @@ -1,125 +1,31 @@ import asn1crypto.core as asn1 from asn1crypto.util import int_from_bytes -from asn1crypto.keys import PublicKeyInfo -from pymrtd.pki import keys, oids -from typing import Union #pylint: disable=wrong-import-order from .base import ElementaryFile -from .mrz import MachineReadableZone - -class ActiveAuthenticationInfoId(asn1.ObjectIdentifier): - _map = { - oids.id_icao_mrtd_security_aaProtocolObject: 'aa_info', - } - - -class ActiveAuthenticationInfo(asn1.Sequence): - _fields = [ - ('protocol', ActiveAuthenticationInfoId), - ('version', asn1.Integer), - ('signature_algorithm', keys.SignatureAlgorithmId) - ] - - -class ChipAuthenticationInfoId(asn1.ObjectIdentifier): - _map = { - oids.id_CA_DH_3DES_CBC_CBC : 'ca_dh_3des_cbc_cbc', - oids.id_CA_DH_AES_CBC_CMAC_128 : 'ca_dh_aes_cbc_cmac_128', - oids.id_CA_DH_AES_CBC_CMAC_192 : 'ca_dh_aes_cbc_cmac_192', - oids.id_CA_DH_AES_CBC_CMAC_256 : 'ca_dh_aes_cbc_cmac_256', - oids.id_CA_ECDH_3DES_CBC_CBC : 'ca_ecdh_3des_cbc_cbc', - oids.id_CA_ECDH_AES_CBC_CMAC_128 : 'ca_ecdh_aes_cbc_cmac_128', - oids.id_CA_ECDH_AES_CBC_CMAC_192 : 'ca_ecdh_aes_cbc_cmac_192', - oids.id_CA_ECDH_AES_CBC_CMAC_256 : 'ca_ecdh_aes_cbc_cmac_256' - } - - -class ChipAuthenticationInfo(asn1.Sequence): - _fields = [ - ('protocol', ChipAuthenticationInfoId), - ('version', asn1.Integer), - ('key_id', asn1.Integer, {'optional': True}) - ] - - -class ChipAuthenticationPublicKeyInfoId(asn1.ObjectIdentifier): - _map = { - oids.id_PK_DH : 'pk_dh', - oids.id_PK_ECDH : 'pk_ecdh' - } - - -class ChipAuthenticationPublicKeyInfo(asn1.Sequence): - _fields = [ - ('protocol', ChipAuthenticationPublicKeyInfoId), - ('chip_auth_public_key', PublicKeyInfo), - ('key_id', asn1.Integer, {'optional': True}) - ] - - -class DefaultSecurityInfo(asn1.Sequence): - _fields = [ - ('protocol', asn1.ObjectIdentifier), - ('required_data', asn1.Any), - ('optional', asn1.Any, {'optional': True}) - ] - - -class SecurityInfo(asn1.Choice): - _alternatives = [ - ('security_info', DefaultSecurityInfo), - ('aa_info', ActiveAuthenticationInfo), - ('chip_auth_info', ChipAuthenticationInfo), - ('chip_auth_pub_key_info', ChipAuthenticationPublicKeyInfo) - #Note: Missing PACEDomainParameterInfo and PACEInfo - ] - - def validate(self, class_, tag, contents): - """ this function select proper SecurityInfo choice index based on OID """ - oid = asn1.ObjectIdentifier.load(contents).dotted - - self._choice = 0 - for index, info in enumerate(self._alternatives): - toidm = info[1]._fields[0][1]._map #pylint: disable=protected-access - if toidm is not None and oid in toidm: - self._choice = index - return - - def parse(self): - if self._parsed is None: - super().parse() - if self.name == 'aa_info' or self.name == 'chip_auth_info': - if self._parsed['version'].native != 1: - from asn1crypto._types import type_name #pylint: disable=import-outside-toplevel - raise ValueError(f'{type_name(self._parsed)} version != 1') - return self._parsed - - -class SecurityInfos(asn1.SetOf): - _child_spec = SecurityInfo +from .errors import NFCPassportReaderError class DataGroupNumber(asn1.Integer): - min = 1 # DG min value - max = 16 # DG max value + min = 1 # DG min value + max = 16 # DG max value _map = { - 1: 'EF.DG1', - 2: 'EF.DG2', - 3: 'EF.DG3', - 4: 'EF.DG4', - 5: 'EF.DG5', - 6: 'EF.DG6', - 7: 'EF.DG7', - 8: 'EF.DG8', - 9: 'EF.DG9', - 10: 'EF.DG10', - 11: 'EF.DG11', - 12: 'EF.DG12', - 13: 'EF.DG13', - 14: 'EF.DG14', - 15: 'EF.DG15', - 16: 'EF.DG16' + 1: "EF.DG1", + 2: "EF.DG2", + 3: "EF.DG3", + 4: "EF.DG4", + 5: "EF.DG5", + 6: "EF.DG6", + 7: "EF.DG7", + 8: "EF.DG8", + 9: "EF.DG9", + 10: "EF.DG10", + 11: "EF.DG11", + 12: "EF.DG12", + 13: "EF.DG13", + 14: "EF.DG14", + 15: "EF.DG15", + 16: "EF.DG16", } @property @@ -138,12 +44,12 @@ def __ne__(self, other) -> bool: def set(self, value): if isinstance(value, int): - if value == 21: # DG2 tag + if value == 21: # DG2 tag value = 2 - elif value == 22: # DG4 tag + elif value == 22: # DG4 tag value = 4 elif value not in DataGroupNumber._map: - raise ValueError('Invalid data group number') + raise ValueError("Invalid data group number") super().set(value) def __hash__(self) -> int: @@ -159,68 +65,63 @@ def __str__(self): Returns string representation of self i.e. EF.DG(fp=XXXXXXXXXXXXXXXX) """ if self._str_rep is None: - self._str_rep = super().__str__()\ - .replace("EF(", f'{self.number.native}(', 1) + self._str_rep = ( + super().__str__().replace("EF(", f"{self.number.native}(", 1) + ) return self._str_rep @property def number(self) -> DataGroupNumber: return DataGroupNumber(self.tag) - -class DG1(DataGroup): - tag = 1 - _content_spec = MachineReadableZone - - @property - def mrz(self) -> MachineReadableZone: - return self.content - - @property - def native(self): - return { 'mrz': self.mrz.native } - - -class DG14(DataGroup): - tag = 14 - _content_spec = SecurityInfos - - @property - def aaInfo(self) -> Union[ActiveAuthenticationInfo, None]: - ''' Returns ActiveAuthenticationInfo if in list otherwise None. ''' - - # Loop over list of SecurityInfo objects and try to find ActiveAuthentication object - # Should contain only one ActiveAuthenticationInfo - for si in self.content: - if isinstance(si.chosen, ActiveAuthenticationInfo): - return si - return None - - @property - def aaSignatureAlgo(self) -> keys.SignatureAlgorithm: - ''' Returns SignatureAlgorithm object or None if DG doesn't contain one. ''' - - aai = self.aaInfo - if aai is None: - return None - - # Get signature algorithm - return keys.SignatureAlgorithm({ 'algorithm' : aai.native['signature_algorithm'] }) - - -class DG15(DataGroup): - tag = 15 - _content_spec = PublicKeyInfo - _aakey: keys.AAPublicKey - - @property - def aaPublicKeyInfo(self) -> PublicKeyInfo: - ''' Returns active authentication public key info ''' - return self.content - - @property - def aaPublicKey(self) -> keys.AAPublicKey: - ''' Returns active authentication public key ''' - if not hasattr(self, '_aakey'): - self._aakey = keys.AAPublicKey.load(self.aaPublicKeyInfo.dump()) - return self._aakey + def get_next_tag(self) -> int: + tag = 0 + + # Fix for some passports that may have invalid data - ensure that we do have data! + if len(self.data) <= self.pos: + raise NFCPassportReaderError(NFCPassportReaderError.INVALID_DATA) + + if self.bin_to_hex(self.data[self.pos : self.pos + 1]) & 0x0F == 0x0F: + tag = self.bin_to_hex(self.data[self.pos : self.pos + 2]) + self.pos += 2 + else: + tag = self.data[self.pos] + self.pos += 1 + + return tag + + def verify_tag(self, tag, valid_values): + if isinstance(valid_values, list): + if tag not in valid_values: + raise NFCPassportReaderError(NFCPassportReaderError.INVALID_TAG) + else: + if tag != valid_values: + raise NFCPassportReaderError("InvalidTag") + + def asn1_length(self, data: bytes) -> tuple: + if data[0] < 0x80: + return int(data[0]), 1 + if data[0] == 0x81: + return int(data[1]), 2 + if data[0] == 0x82: + val = int.from_bytes(data[1:3], byteorder="big") + return val, 3 + raise NFCPassportReaderError(NFCPassportReaderError.INVALID_LENGTH) + + def get_next_length(self) -> int: + end = self.pos + 4 if self.pos + 4 < len(self.data) else len(self.data) + length, len_offset = self.asn1_length(self.data[self.pos : end]) + self.pos += len_offset + return length + + def get_next_value(self) -> bytes: + length = self.get_next_length() + value = self.data[self.pos : self.pos + length] + self.pos += length + return value + + def bin_to_int(self, data: bytes, offset: int, length: int) -> int: + return int.from_bytes(data[offset : offset + length], byteorder="big") + + def bin_to_hex(self, data: bytes) -> int: + return int.from_bytes(data, byteorder="big") diff --git a/src/pymrtd/ef/dg1.py b/src/pymrtd/ef/dg1.py new file mode 100644 index 0000000..ce32f20 --- /dev/null +++ b/src/pymrtd/ef/dg1.py @@ -0,0 +1,268 @@ +from datetime import date, datetime, timedelta +from enum import Enum +from typing import Optional + +import asn1crypto.core as asn1 + +from .dg import DataGroup + + +class DocumentType(Enum): + """ + Enumeration of possible mayjor document types. + """ + + Passport = "P" # DOC ICAO 9303-p4 4.2.2 specifies a capital letter 'P' as document code to define machine readable passport (MRP). + # One additional capital letter can follow 'P' at the discretion of the issuing State or organization, to designate + # other types of passports such as MRP issued to diplomatic staff, an MRP issued for travel on government business, or a passport issued for a special purpose. + # + # Additional note: From doc ICAO 9303-p4 4.2.2.1 notes 'm': + # "In documents other than passports, e.g. United Nations laissez-passer, seafarer’s identity document + # or refugee travel document, the official title of the document shall be indicated instead of “Passport”. + # However, the first character of the document code shall be P" + + +class DataGroup1(asn1.OctetString): + class_ = 1 + tag = 31 + _parsed = None + _type = None + + @classmethod + def load(cls, encoded_data: bytes, strict=False, **kwargs): + v: DataGroup1 = super().load(encoded_data, strict, **kwargs) + clen = len(v.contents) + # pylint: disable=protected-access + if clen == 90: + v._type = "td1" + elif clen == 72: + v._type = "td2" + elif clen == 88: + v._type = "td3" + else: + raise ValueError("Unknown MRZ type") + return v + + def __getitem__(self, key): + return self.native[key] + + @property + def country(self) -> str: # Issuing country + return self["country"] + + @property + def dateOfBirth(self) -> Optional[date]: # Could be None if date is not known + return self["date_of_birth"] + + @property + def dateOfExpiry(self) -> date: + return self["date_of_expiry"] + + @property + def documentCode(self) -> str: + return self["document_code"] + + @property + def documentNumber(self) -> str: + return self["document_number"] + + @property + def gender(self) -> str: + return self["gender"] + + @property + def name(self) -> str: + ni = self["name_identifiers"] + if len(ni) > 1: + return ni[-1] + return "" + + @property + def nationality(self) -> str: + return self["nationality"] + + @property + def native(self): + if self._parsed is None: + self.parse() + return self._parsed + + @property + def additionalData(self) -> str: + if self.type == "td1": + return ( + self["optional_data_1"] + if len(self["optional_data_1"]) + else self["optional_data_2"] + ) + return self["optional_data"] + + @property + def surname(self) -> str: + ni = self["name_identifiers"] + if len(ni) > 0: + return ni[0] + return "" + + @property + def type(self) -> str: + return self._type + + def toJson(self) -> dict: + return { + "type": self.type, + "doc_code": self.documentCode, + "doc_number": self.documentNumber, + "date_of_expiry": self.dateOfExpiry, + "surname": self.surname, + "name": self.name, + "date_of_birth": self.dateOfBirth, + "gender": self.gender, + "country": self.country, + "nationality": self.nationality, + "additional_data": self.additionalData, + } + + def parse(self): + self._parsed = {} + if self.type == "td1": + self._parse_td1() + elif self.type == "td2": + self._parse_td2() + elif self.type == "td3": + self._parse_td3() + else: + raise ValueError("Cannot parse unknown MRZ type") + + def _parse_td1(self): + self._parsed["document_code"] = self._read(0, 2) + self._parsed["country"] = self._read(2, 3) + self._parsed["document_number"] = self._read(5, 9) + self._parsed["document_number_cd"] = self._read_with_filter( + 14, 1 + ) # document number check digit, could be char '<' + self._parsed["optional_data_1"] = self._read(15, 15) + self._parsed["date_of_birth"] = self._read_date_of_birth(30, 6) + self._parsed["date_of_birth_cd"] = self._read_cd(36) # document dob check digit + self._parsed["gender"] = self._read(37, 1) + self._parsed["date_of_expiry"] = self._read_date_of_expiry(38, 6) + self._parsed["date_of_expiry_cd"] = self._read_cd( + 44 + ) # document doe check digit + self._parsed["nationality"] = self._read(45, 3) + self._parsed["optional_data_2"] = self._read(48, 11) + self._parsed["composite_cd"] = self._read_cd(59) + self._parsed["name_identifiers"] = self._read_name_identifiers(60, 30) + self._parseExtendedDocumentNumber() + + def _parse_td2(self): + self._parsed["document_code"] = self._read(0, 2) + self._parsed["country"] = self._read(2, 3) + self._parsed["name_identifiers"] = self._read_name_identifiers(5, 31) + self._parsed["document_number"] = self._read(36, 9) + self._parsed["document_number_cd"] = self._read_with_filter( + 45, 1 + ) # document number check digit + self._parsed["nationality"] = self._read(46, 3) + self._parsed["date_of_birth"] = self._read_date_of_birth(49, 6) + self._parsed["date_of_birth_cd"] = self._read_cd(55) # document dob check digit + self._parsed["gender"] = self._read(56, 1) + self._parsed["date_of_expiry"] = self._read_date_of_expiry(57, 6) + self._parsed["date_of_expiry_cd"] = self._read_cd( + 63 + ) # document doe check digit + self._parsed["optional_data"] = self._read(64, 7) + self._parsed["composite_cd"] = self._read_cd(71) + self._parseExtendedDocumentNumber() + + def _parse_td3(self): + self._parsed["document_code"] = self._read(0, 2) + self._parsed["country"] = self._read(2, 3) + self._parsed["name_identifiers"] = self._read_name_identifiers(5, 39) + self._parsed["document_number"] = self._read(44, 9) + self._parsed["document_number_cd"] = self._read_cd( + 53 + ) # document number check digit + self._parsed["nationality"] = self._read(54, 3) + self._parsed["date_of_birth"] = self._read_date_of_birth(57, 6) + self._parsed["date_of_birth_cd"] = self._read_cd(63) # document dob check digit + self._parsed["gender"] = self._read(64, 1) + self._parsed["date_of_expiry"] = self._read_date_of_expiry(65, 6) + self._parsed["date_of_expiry_cd"] = self._read_cd( + 71 + ) # document doe check digit + self._parsed["optional_data"] = self._read(72, 14) + self._parsed["optional_data_cd"] = self._read_cd(86) + self._parsed["composite_cd"] = self._read_cd(87) + + def _parseExtendedDocumentNumber(self): + # doc 9303 p10 page 30 + fn_opt_data = "optional_data_1" if self.type == "td1" else "optional_data" + if ( + self._parsed["document_number_cd"] == "<" + and len(self._parsed[fn_opt_data]) > 0 + ): + self._parsed["document_number"] += self._parsed[fn_opt_data][:-1] + self._parsed["document_number_cd"] = self._parsed[fn_opt_data][-1] + self._parsed[fn_opt_data] = "" + + def _read_with_filter(self, idx, len): + return self.contents[idx : idx + len].decode("ascii") + + def _read(self, idx, len): + return self._read_with_filter(idx, len).rstrip("<") + + def _read_cd(self, idx) -> int: + scd = self._read_with_filter(idx, 1) + if scd == "<": + return 0 + try: + return int(scd) + except ValueError: + raise ValueError( + f"Invalid check digit character '{scd}' in MRZ at position {idx}" + ) + + def _read_date(self, idx, len): + date = self._read_with_filter(idx, len) + if "<" in date: # In case of unknown date of birth + return None + try: + return datetime.strptime(date.rstrip("<"), "%y%m%d").date() + except ValueError: + raise ValueError(f"Invalid date format '{date}' in MRZ at position {idx}") + + def _read_date_of_birth(self, idx, len): + date = self._read_date(idx, len) + if ( + date is not None and date > datetime.today().date() + ): # reduce date for 100 years if greater then current date + days_per_year = 365.25 + date -= timedelta(days=(100 * days_per_year)) + return date + + def _read_date_of_expiry(self, idx, len): + date = self._read_date(idx, len) + if date is None: + raise ValueError("Invalid date of expiry in MRZ") + return date + + def _read_name_identifiers(self, idx, size): + name_field = self._read(idx, size) + ids = name_field.split("<<") + for i in range(0, len(ids)): + ids[i] = ids[i].replace("<", " ") + return tuple(ids) + + +class DG1(DataGroup): + tag = 1 + _content_spec = DataGroup1 + + @property + def mrz(self) -> DataGroup1: + return self.content + + @property + def native(self): + return {"mrz": self.mrz.native} diff --git a/src/pymrtd/ef/dg11.py b/src/pymrtd/ef/dg11.py new file mode 100644 index 0000000..1593274 --- /dev/null +++ b/src/pymrtd/ef/dg11.py @@ -0,0 +1,83 @@ +import asn1crypto.core as asn1 + +from .dg import DataGroup + + +class DataGroup11(asn1.OctetString, DataGroup): + class_ = 11 + tag = 11 + + def __init__(self, contents=None, **kwargs): + self.full_name = "" + self.personal_number = "" + self.date_of_birth = "" + self.place_of_birth = "" + self.address = "" + self.telephone = "" + self.profession = "" + self.title = "" + self.personal_summary = "" + self.proof_of_citizenship = "" + self.td_numbers = "" + self.custody_info = "" + + self.data = contents + self.pos = 0 + self.body = self.data[self.pos :] + + super().__init__(contents=contents, **kwargs) + + @property + def datagroup_type(self): + return "DG11" + + @classmethod + def load(cls, contents: bytes, strict=True): + instance = cls(contents=contents) + instance.parse() + return instance + + def parse(self): + tag = self.get_next_tag() + self.verify_tag(tag, 0x5C) + _ = self.get_next_value() + + while self.pos < len(self.data): + tag = self.get_next_tag() + value = self.get_next_value() + + if tag == 0x5F0E: + self.full_name = value.decode("utf-8") + elif tag == 0x5F10: + self.personal_number = value.decode("utf-8") + elif tag == 0x5F11: + self.place_of_birth = value.decode("utf-8") + elif tag == 0x5F42: + self.address = value.decode("utf-8") + elif tag == 0x5F12: + self.telephone = value.decode("utf-8") + elif tag == 0x5F13: + self.profession = value.decode("utf-8") + elif tag == 0x5F14: + self.title = value.decode("utf-8") + elif tag == 0x5F15: + self.personal_summary = value.decode("utf-8") + elif tag == 0x5F16: + self.proof_of_citizenship = value.decode("utf-8") + elif tag == 0x5F17: + self.td_numbers = value.decode("utf-8") + elif tag == 0x5F18: + self.custody_info = value.decode("utf-8") + + +class DG11(DataGroup): + tag = 11 + _content_spec = DataGroup11 + + @property + def personal_info(self) -> DataGroup11: + return self.content + + @property + def native(self): + return {"personal_info": self.personal_info} diff --git a/src/pymrtd/ef/dg14.py b/src/pymrtd/ef/dg14.py new file mode 100644 index 0000000..574b307 --- /dev/null +++ b/src/pymrtd/ef/dg14.py @@ -0,0 +1,127 @@ +from typing import Union + +import asn1crypto.core as asn1 +from asn1crypto.keys import PublicKeyInfo + +from pymrtd.pki import keys, oids + +from .dg import DataGroup + + +class ActiveAuthenticationInfoId(asn1.ObjectIdentifier): + _map = { + oids.id_icao_mrtd_security_aaProtocolObject: "aa_info", + } + + +class ActiveAuthenticationInfo(asn1.Sequence): + _fields = [ + ("protocol", ActiveAuthenticationInfoId), + ("version", asn1.Integer), + ("signature_algorithm", keys.SignatureAlgorithmId), + ] + + +class ChipAuthenticationInfoId(asn1.ObjectIdentifier): + _map = { + oids.id_CA_DH_3DES_CBC_CBC: "ca_dh_3des_cbc_cbc", + oids.id_CA_DH_AES_CBC_CMAC_128: "ca_dh_aes_cbc_cmac_128", + oids.id_CA_DH_AES_CBC_CMAC_192: "ca_dh_aes_cbc_cmac_192", + oids.id_CA_DH_AES_CBC_CMAC_256: "ca_dh_aes_cbc_cmac_256", + oids.id_CA_ECDH_3DES_CBC_CBC: "ca_ecdh_3des_cbc_cbc", + oids.id_CA_ECDH_AES_CBC_CMAC_128: "ca_ecdh_aes_cbc_cmac_128", + oids.id_CA_ECDH_AES_CBC_CMAC_192: "ca_ecdh_aes_cbc_cmac_192", + oids.id_CA_ECDH_AES_CBC_CMAC_256: "ca_ecdh_aes_cbc_cmac_256", + } + + +class ChipAuthenticationInfo(asn1.Sequence): + _fields = [ + ("protocol", ChipAuthenticationInfoId), + ("version", asn1.Integer), + ("key_id", asn1.Integer, {"optional": True}), + ] + + +class ChipAuthenticationPublicKeyInfoId(asn1.ObjectIdentifier): + _map = {oids.id_PK_DH: "pk_dh", oids.id_PK_ECDH: "pk_ecdh"} + + +class ChipAuthenticationPublicKeyInfo(asn1.Sequence): + _fields = [ + ("protocol", ChipAuthenticationPublicKeyInfoId), + ("chip_auth_public_key", PublicKeyInfo), + ("key_id", asn1.Integer, {"optional": True}), + ] + + +class DefaultSecurityInfo(asn1.Sequence): + _fields = [ + ("protocol", asn1.ObjectIdentifier), + ("required_data", asn1.Any), + ("optional", asn1.Any, {"optional": True}), + ] + + +class SecurityInfo(asn1.Choice): + _alternatives = [ + ("security_info", DefaultSecurityInfo), + ("aa_info", ActiveAuthenticationInfo), + ("chip_auth_info", ChipAuthenticationInfo), + ("chip_auth_pub_key_info", ChipAuthenticationPublicKeyInfo), + # Note: Missing PACEDomainParameterInfo and PACEInfo + ] + + def validate(self, class_, tag, contents): + """this function select proper SecurityInfo choice index based on OID""" + oid = asn1.ObjectIdentifier.load(contents).dotted + + self._choice = 0 + for index, info in enumerate(self._alternatives): + toidm = info[1]._fields[0][1]._map # pylint: disable=protected-access + if toidm is not None and oid in toidm: + self._choice = index + return + + def parse(self): + if self._parsed is None: + super().parse() + if self.name == "aa_info" or self.name == "chip_auth_info": + if self._parsed["version"].native != 1: + from asn1crypto._types import ( + type_name, + ) # pylint: disable=import-outside-toplevel + + raise ValueError(f"{type_name(self._parsed)} version != 1") + return self._parsed + + +class SecurityInfos(asn1.SetOf): + _child_spec = SecurityInfo + + +class DG14(DataGroup): + tag = 14 + _content_spec = SecurityInfos + + @property + def aaInfo(self) -> Union[ActiveAuthenticationInfo, None]: + """Returns ActiveAuthenticationInfo if in list otherwise None.""" + + # Loop over list of SecurityInfo objects and try to find ActiveAuthentication object + # Should contain only one ActiveAuthenticationInfo + for si in self.content: + if isinstance(si.chosen, ActiveAuthenticationInfo): + return si + return None + + @property + def aaSignatureAlgo(self) -> keys.SignatureAlgorithm: + """Returns SignatureAlgorithm object or None if DG doesn't contain one.""" + + aai = self.aaInfo + if aai is None: + return None + + # Get signature algorithm + return keys.SignatureAlgorithm({"algorithm": aai.native["signature_algorithm"]}) diff --git a/src/pymrtd/ef/dg15.py b/src/pymrtd/ef/dg15.py new file mode 100644 index 0000000..285f4de --- /dev/null +++ b/src/pymrtd/ef/dg15.py @@ -0,0 +1,23 @@ +from asn1crypto.keys import PublicKeyInfo + +from pymrtd.pki import keys + +from .dg import DataGroup + + +class DG15(DataGroup): + tag = 15 + _content_spec = PublicKeyInfo + _aakey: keys.AAPublicKey + + @property + def aaPublicKeyInfo(self) -> PublicKeyInfo: + """Returns active authentication public key info""" + return self.content + + @property + def aaPublicKey(self) -> keys.AAPublicKey: + """Returns active authentication public key""" + if not hasattr(self, "_aakey"): + self._aakey = keys.AAPublicKey.load(self.aaPublicKeyInfo.dump()) + return self._aakey diff --git a/src/pymrtd/ef/dg2.py b/src/pymrtd/ef/dg2.py new file mode 100644 index 0000000..1e27d70 --- /dev/null +++ b/src/pymrtd/ef/dg2.py @@ -0,0 +1,164 @@ +import asn1crypto.core as asn1 + +from .dg import DataGroup +from .errors import NFCPassportReaderError + + +class DataGroup2(asn1.OctetString, DataGroup): + class_ = 2 + tag = 21 + + def __init__(self, contents=None, **kwargs): + self.nr_images = 0 + self.version_number = 0 + self.length_of_record = 0 + self.number_of_facial_images = 0 + self.facial_record_data_length = 0 + self.nr_feature_points = 0 + self.gender = 0 + self.eye_color = 0 + self.hair_color = 0 + self.feature_mask = 0 + self.expression = 0 + self.pose_angle = 0 + self.pose_angle_uncertainty = 0 + self.face_image_type = 0 + self.image_data_type = 0 + self.image_width = 0 + self.image_height = 0 + self.image_color_space = 0 + self.source_type = 0 + self.device_type = 0 + self.quality = 0 + self.image_data = [] + + self.data = contents + self.pos = 0 + self.body = self.data[self.pos :] + + super().__init__(contents=contents, **kwargs) + + @property + def datagroup_type(self): + return "DG2" + + @classmethod + def load(cls, contents: bytes, strict=True): + instance = cls(contents=contents) + instance.parse() + return instance + + def parse(self): + tag = self.get_next_tag() + self.verify_tag(tag, 0x7F61) + self.get_next_length() + + # Tag should be 0x02 + tag = self.get_next_tag() + self.verify_tag(tag, 0x02) + value = self.get_next_value() + self.nr_images = int(value[0]) + + # Next tag is 0x7F60 + tag = self.get_next_tag() + self.verify_tag(tag, 0x7F60) + self.get_next_length() + + # Next tag is 0xA1 (Biometric Header Template) - don't care about this + tag = self.get_next_tag() + self.verify_tag(tag, 0xA1) + self.get_next_value() + + # Now we get to the good stuff - next tag is either 5F2E or 7F2E + tag = self.get_next_tag() + self.verify_tag(tag, [0x5F2E, 0x7F2E]) + value = self.get_next_value() + self.parse_iso19794_5(value) + + def parse_iso19794_5(self, data: bytes): + if not ( + data[0] == 0x46 and data[1] == 0x41 and data[2] == 0x43 and data[3] == 0x00 + ): + raise NFCPassportReaderError(NFCPassportReaderError.INVALID_DATA) + + offset = 4 + self.version_number = self.bin_to_int(data, offset, 4) + offset += 4 + self.length_of_record = self.bin_to_int(data, offset, 4) + offset += 4 + self.number_of_facial_images = self.bin_to_int(data, offset, 2) + offset += 2 + + self.facial_record_data_length = self.bin_to_int(data, offset, 4) + offset += 4 + self.nr_feature_points = self.bin_to_int(data, offset, 2) + offset += 2 + self.gender = self.bin_to_int(data, offset, 1) + offset += 1 + self.eye_color = self.bin_to_int(data, offset, 1) + offset += 1 + self.hair_color = self.bin_to_int(data, offset, 1) + offset += 1 + self.feature_mask = self.bin_to_int(data, offset, 3) + offset += 3 + self.expression = self.bin_to_int(data, offset, 2) + offset += 2 + self.pose_angle = self.bin_to_int(data, offset, 3) + offset += 3 + self.pose_angle_uncertainty = self.bin_to_int(data, offset, 3) + offset += 3 + + # Skip the feature points, 8 bytes per point + offset += self.nr_feature_points * 8 + + self.face_image_type = self.bin_to_int(data, offset, 1) + offset += 1 + self.image_data_type = self.bin_to_int(data, offset, 1) + offset += 1 + self.image_width = self.bin_to_int(data, offset, 2) + offset += 2 + self.image_height = self.bin_to_int(data, offset, 2) + offset += 2 + self.image_color_space = self.bin_to_int(data, offset, 1) + offset += 1 + self.source_type = self.bin_to_int(data, offset, 1) + offset += 1 + self.device_type = self.bin_to_int(data, offset, 2) + offset += 2 + self.quality = self.bin_to_int(data, offset, 2) + offset += 2 + + jpeg_header = bytes( + [0xFF, 0xD8, 0xFF, 0xE0, 0x00, 0x10, 0x4A, 0x46, 0x49, 0x46] + ) + jpeg2000_bitmap_header = bytes( + [0x00, 0x00, 0x00, 0x0C, 0x6A, 0x50, 0x20, 0x20, 0x0D, 0x0A] + ) + jpeg2000_codestream_bitmap_header = bytes([0xFF, 0x4F, 0xFF, 0x51]) + + if len(data) < offset + len(jpeg2000_codestream_bitmap_header): + raise NFCPassportReaderError(NFCPassportReaderError.UNKNOWN_IMAGE_FORMAT) + + if not ( + data[offset : offset + len(jpeg_header)] == jpeg_header + or data[offset : offset + len(jpeg2000_bitmap_header)] + == jpeg2000_bitmap_header + or data[offset : offset + len(jpeg2000_codestream_bitmap_header)] + == jpeg2000_codestream_bitmap_header + ): + raise NFCPassportReaderError(NFCPassportReaderError.UNKNOWN_IMAGE_FORMAT) + + self.image_data = list(data[offset:]) + + +class DG2(DataGroup): + tag = 21 + _content_spec = DataGroup2 + + @property + def portrait(self) -> DataGroup2: + return self.content + + @property + def native(self): + return {"portrait": self.portrait} diff --git a/src/pymrtd/ef/dg7.py b/src/pymrtd/ef/dg7.py new file mode 100644 index 0000000..10b8e71 --- /dev/null +++ b/src/pymrtd/ef/dg7.py @@ -0,0 +1,50 @@ +import asn1crypto.core as asn1 + +from .dg import DataGroup + + +class DataGroup7(asn1.OctetString, DataGroup): + class_ = 7 + tag = 7 + + def __init__(self, contents=None, **kwargs): + self.image_data = [] + + self.data = contents + self.pos = 0 + self.body = self.data[self.pos :] + + super().__init__(contents=contents, **kwargs) + + @property + def datagroup_type(self): + return "DG7" + + @classmethod + def load(cls, contents: bytes, strict=True): + instance = cls(contents=contents) + instance.parse() + return instance + + def parse(self): + tag = self.get_next_tag() + self.verify_tag(tag, 0x02) + _ = self.get_next_value() + + tag = self.get_next_tag() + self.verify_tag(tag, 0x5F43) + + self.image_data = self.get_next_value() + + +class DG7(DataGroup): + tag = 7 + _content_spec = DataGroup7 + + @property + def signature(self) -> DataGroup7: + return self.content + + @property + def native(self): + return {"signature": self.signature} diff --git a/src/pymrtd/ef/errors.py b/src/pymrtd/ef/errors.py new file mode 100644 index 0000000..05a40d9 --- /dev/null +++ b/src/pymrtd/ef/errors.py @@ -0,0 +1,12 @@ +class NFCPassportReaderError(Exception): + INVALID_DATA = "InvalidData" + INVALID_TAG = "InvalidTag" + INVALID_LENGTH = "InvalidLength" + UNKNOWN_IMAGE_FORMAT = "UnknownImageFormat" + + def __init__(self, message=""): + self.message = message + super().__init__(self.message) + + def __str__(self): + return self.message diff --git a/src/pymrtd/ef/mrz.py b/src/pymrtd/ef/mrz.py deleted file mode 100644 index 0479828..0000000 --- a/src/pymrtd/ef/mrz.py +++ /dev/null @@ -1,228 +0,0 @@ -from enum import Enum -import asn1crypto.core as asn1 -from datetime import datetime, date, timedelta -from typing import Optional - -class DocumentType(Enum): - """ - Enumeration of possible mayjor document types. - """ - Passport = 'P' # DOC ICAO 9303-p4 4.2.2 specifies a capital letter 'P' as document code to define machine readable passport (MRP). - # One additional capital letter can follow 'P' at the discretion of the issuing State or organization, to designate - # other types of passports such as MRP issued to diplomatic staff, an MRP issued for travel on government business, or a passport issued for a special purpose. - # - # Additional note: From doc ICAO 9303-p4 4.2.2.1 notes 'm': - # "In documents other than passports, e.g. United Nations laissez-passer, seafarer’s identity document - # or refugee travel document, the official title of the document shall be indicated instead of “Passport”. - # However, the first character of the document code shall be P" - -class MachineReadableZone(asn1.OctetString): - class_ = 1 - tag = 31 - _parsed = None - _type = None - - @classmethod - def load(cls, encoded_data: bytes, strict=False, **kwargs): - v:MachineReadableZone = super().load(encoded_data, strict, **kwargs) - clen = len(v.contents) - # pylint: disable=protected-access - if clen == 90: - v._type = 'td1' - elif clen == 72: - v._type = 'td2' - elif clen == 88: - v._type = 'td3' - else: - raise ValueError("Unknown MRZ type") - return v - - def __getitem__(self, key): - return self.native[key] - - @property - def country(self) -> str: # Issuing country - return self['country'] - - @property - def dateOfBirth(self) -> Optional[date]: # Could be None if date is not known - return self['date_of_birth'] - - @property - def dateOfExpiry(self) -> date: - return self['date_of_expiry'] - - @property - def documentCode(self) -> str: - return self['document_code'] - - @property - def documentNumber(self) -> str: - return self['document_number'] - - @property - def gender(self) -> str: - return self['gender'] - - @property - def name(self) -> str: - ni = self['name_identifiers'] - if len(ni) > 1: - return ni[-1] - return "" - - @property - def nationality(self) -> str: - return self['nationality'] - - @property - def native(self): - if self._parsed is None: - self.parse() - return self._parsed - - @property - def additionalData(self) -> str: - if self.type == 'td1': - return self['optional_data_1'] \ - if len(self['optional_data_1']) \ - else self['optional_data_2'] - return self['optional_data'] - - @property - def surname(self) -> str: - ni = self['name_identifiers'] - if len(ni) > 0: - return ni[0] - return "" - - @property - def type(self) -> str: - return self._type - - def toJson(self) -> dict: - return { - 'type' : self.type, - 'doc_code' : self.documentCode, - 'doc_number' : self.documentNumber, - 'date_of_expiry' : self.dateOfExpiry, - 'surname' : self.surname, - 'name' : self.name, - 'date_of_birth' : self.dateOfBirth, - 'gender' : self.gender, - 'country' : self.country, - 'nationality' : self.nationality, - 'additional_data' : self.additionalData - } - - def parse(self): - self._parsed = {} - if self.type == 'td1': - self._parse_td1() - elif self.type == 'td2': - self._parse_td2() - elif self.type == 'td3': - self._parse_td3() - else: - raise ValueError("Cannot parse unknown MRZ type") - - def _parse_td1(self): - self._parsed['document_code'] = self._read(0, 2) - self._parsed['country'] = self._read(2, 3) - self._parsed['document_number'] = self._read(5, 9) - self._parsed['document_number_cd'] = self._read_with_filter(14, 1) # document number check digit, could be char '<' - self._parsed['optional_data_1'] = self._read(15, 15) - self._parsed['date_of_birth'] = self._read_date_of_birth(30, 6) - self._parsed['date_of_birth_cd'] = self._read_cd(36) # document dob check digit - self._parsed['gender'] = self._read(37, 1) - self._parsed['date_of_expiry'] = self._read_date_of_expiry(38, 6) - self._parsed['date_of_expiry_cd'] = self._read_cd(44) # document doe check digit - self._parsed['nationality'] = self._read(45, 3) - self._parsed['optional_data_2'] = self._read(48, 11) - self._parsed['composite_cd'] = self._read_cd(59) - self._parsed['name_identifiers'] = self._read_name_identifiers(60, 30) - self._parseExtendedDocumentNumber() - - def _parse_td2(self): - self._parsed['document_code'] = self._read(0, 2) - self._parsed['country'] = self._read(2, 3) - self._parsed['name_identifiers'] = self._read_name_identifiers(5, 31) - self._parsed['document_number'] = self._read(36, 9) - self._parsed['document_number_cd'] = self._read_with_filter(45, 1) # document number check digit - self._parsed['nationality'] = self._read(46, 3) - self._parsed['date_of_birth'] = self._read_date_of_birth(49, 6) - self._parsed['date_of_birth_cd'] = self._read_cd(55) # document dob check digit - self._parsed['gender'] = self._read(56, 1) - self._parsed['date_of_expiry'] = self._read_date_of_expiry(57, 6) - self._parsed['date_of_expiry_cd'] = self._read_cd(63) # document doe check digit - self._parsed['optional_data'] = self._read(64, 7) - self._parsed['composite_cd'] = self._read_cd(71) - self._parseExtendedDocumentNumber() - - def _parse_td3(self): - self._parsed['document_code'] = self._read(0, 2) - self._parsed['country'] = self._read(2, 3) - self._parsed['name_identifiers'] = self._read_name_identifiers(5, 39) - self._parsed['document_number'] = self._read(44, 9) - self._parsed['document_number_cd'] = self._read_cd(53) # document number check digit - self._parsed['nationality'] = self._read(54, 3) - self._parsed['date_of_birth'] = self._read_date_of_birth(57, 6) - self._parsed['date_of_birth_cd'] = self._read_cd(63) # document dob check digit - self._parsed['gender'] = self._read(64, 1) - self._parsed['date_of_expiry'] = self._read_date_of_expiry(65, 6) - self._parsed['date_of_expiry_cd'] = self._read_cd(71) # document doe check digit - self._parsed['optional_data'] = self._read(72, 14) - self._parsed['optional_data_cd'] = self._read_cd(86) - self._parsed['composite_cd'] = self._read_cd(87) - - def _parseExtendedDocumentNumber(self): - # doc 9303 p10 page 30 - fn_opt_data = 'optional_data_1' if self.type == 'td1' else 'optional_data' - if self._parsed['document_number_cd'] == '<' and len(self._parsed[fn_opt_data]) > 0: - self._parsed['document_number'] += self._parsed[fn_opt_data][:-1] - self._parsed['document_number_cd'] = self._parsed[fn_opt_data][-1] - self._parsed[fn_opt_data] = "" - - def _read_with_filter(self, idx, len): - return self.contents[idx: idx + len].decode('ascii') - - def _read(self, idx, len): - return self._read_with_filter(idx, len).rstrip('<') - - def _read_cd(self, idx) -> int: - scd = self._read_with_filter(idx, 1) - if scd == '<': - return 0 - try: - return int(scd) - except: - raise ValueError(f"Invalid check digit character '{scd}' in MRZ at position {idx}") - - def _read_date(self, idx, len): - date = self._read_with_filter(idx, len) - if '<' in date: # In case of unknown date of birth - return None - try: - return datetime.strptime(date.rstrip('<'), '%y%m%d').date() - except: - raise ValueError(f"Invalid date format '{date}' in MRZ at position {idx}") - - def _read_date_of_birth(self, idx, len): - date = self._read_date(idx, len) - if date is not None and date > datetime.today().date(): # reduce date for 100 years if greater then current date - days_per_year = 365.25 - date -= timedelta(days=(100 * days_per_year)) - return date - - def _read_date_of_expiry(self, idx, len): - date = self._read_date(idx, len) - if date is None: - raise ValueError('Invalid date of expiry in MRZ') - return date - - def _read_name_identifiers(self, idx, size): - name_field = self._read(idx, size) - ids = name_field.split('<<') - for i in range(0, len(ids)): - ids[i] = ids[i].replace('<', ' ') - return tuple(ids) diff --git a/src/pymrtd/ef/sod.py b/src/pymrtd/ef/sod.py index ce5a146..16d3b79 100644 --- a/src/pymrtd/ef/sod.py +++ b/src/pymrtd/ef/sod.py @@ -1,42 +1,42 @@ +from typing import List, Optional, Union, cast + import asn1crypto.core as asn1 from asn1crypto.algos import DigestAlgorithm from asn1crypto.util import int_from_bytes - from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from pymrtd.pki import algo_utils, cms, oids, x509 -from typing import cast, List, Optional, Union from .base import ElementaryFile, LDSVersionInfo from .dg import DataGroup, DataGroupNumber + class SODError(Exception): pass + class LDSSecurityObjectVersion(asn1.Integer): - _map = { - 0: 'v0', - 1: 'v1' - } + _map = {0: "v0", 1: "v1"} @property def value(self): return int_from_bytes(self.contents, signed=True) + class DataGroupHash(asn1.Sequence): _fields = [ - ('dataGroupNumber', DataGroupNumber), - ('dataGroupHashValue', asn1.OctetString), + ("dataGroupNumber", DataGroupNumber), + ("dataGroupHashValue", asn1.OctetString), ] @property def number(self) -> DataGroupNumber: - return self['dataGroupNumber'] + return self["dataGroupNumber"] @property def hash(self) -> bytes: - return self['dataGroupHashValue'].native + return self["dataGroupHashValue"].native class DataGroupHashValues(asn1.SequenceOf): @@ -59,51 +59,51 @@ def find(self, dgNumber: DataGroupNumber) -> Union[DataGroupHash, None]: class LDSSecurityObject(asn1.Sequence): _fields = [ - ('version', LDSSecurityObjectVersion), - ('hashAlgorithm', DigestAlgorithm), - ('dataGroupHashValues', DataGroupHashValues), - ('ldsVersionInfo', LDSVersionInfo, {'optional': True}) + ("version", LDSSecurityObjectVersion), + ("hashAlgorithm", DigestAlgorithm), + ("dataGroupHashValues", DataGroupHashValues), + ("ldsVersionInfo", LDSVersionInfo, {"optional": True}), ] @property def version(self) -> LDSSecurityObjectVersion: - return self['version'] + return self["version"] @property def dgHashAlgo(self) -> DigestAlgorithm: - ''' Returns the hash algorithm that the hash values of data groups were produced with. ''' - return self['hashAlgorithm'] + """Returns the hash algorithm that the hash values of data groups were produced with.""" + return self["hashAlgorithm"] @property def dgHashes(self) -> DataGroupHashValues: - ''' Returns hash values of data groups. ''' - return self['dataGroupHashValues'] + """Returns hash values of data groups.""" + return self["dataGroupHashValues"] @property def ldsVersion(self) -> Union[LDSVersionInfo, None]: - ''' Returns the version of LDS. It can return None if version of this object is 0 ''' - return self['ldsVersionInfo'] + """Returns the version of LDS. It can return None if version of this object is 0""" + return self["ldsVersionInfo"] def getDgHasher(self) -> hashes.Hash: - ''' Returns hashes.Hash object of dgHashAlgo ''' - h = algo_utils.get_hash_algo_by_name(self.dgHashAlgo['algorithm'].native) + """Returns hashes.Hash object of dgHashAlgo""" + h = algo_utils.get_hash_algo_by_name(self.dgHashAlgo["algorithm"].native) return hashes.Hash(h, backend=default_backend()) def find(self, dgNumber: DataGroupNumber) -> Union[DataGroupHash, None]: - '''' + """' Returns DataGroupHash if DataGroupHashValues contains specific data group number, else None :param dgNumber: Data group number to find DataGroupHash object - ''' + """ assert isinstance(dgNumber, DataGroupNumber) return self.dgHashes.find(dgNumber) def contains(self, dg: DataGroup) -> bool: - '''' + """' Returns True if DataGroupHashValues has matching hash of data group, else False :param dg: Data group to find and compare hash value of - ''' + """ assert isinstance(dg, DataGroup) dgh = self.find(dg.number) if dgh is None: @@ -114,54 +114,50 @@ def contains(self, dg: DataGroup) -> bool: return h.finalize() == dgh.hash -class _DataChoice(asn1.Choice): # For OID '1.2.840.113549.1.7.1' +class _DataChoice(asn1.Choice): # For OID '1.2.840.113549.1.7.1' _alternatives = [ - ('data', asn1.OctetString), - ('ldsSecurityObject', LDSSecurityObject), + ("data", asn1.OctetString), + ("ldsSecurityObject", LDSSecurityObject), ] + class SODSignedData(cms.MrtdSignedData): _certificate_spec = x509.DocumentSignerCertificate cms.cms_register_encap_content_info_type( - 'ldsSecurityObject', - oids.id_mrtd_ldsSecurityObject, - LDSSecurityObject - ) - cms.cms_register_encap_content_info_type( - 'data', - oids.id_data, - _DataChoice + "ldsSecurityObject", oids.id_mrtd_ldsSecurityObject, LDSSecurityObject ) + cms.cms_register_encap_content_info_type("data", oids.id_data, _DataChoice) @property def content(self) -> LDSSecurityObject: - ''' overloads MrtdSignedData.content ''' + """overloads MrtdSignedData.content""" lso = super().content - if isinstance(lso, _DataChoice): # In case of OID '1.2.840.113549.1.7.1' + if isinstance(lso, _DataChoice): # In case of OID '1.2.840.113549.1.7.1' lso = lso.chosen if not isinstance(lso, LDSSecurityObject): - raise SODError('SignedData content is not not LDSSecurityObject') + raise SODError("SignedData content is not not LDSSecurityObject") return lso class SODContentInfo(cms.MrtdContentInfo): _signed_data_spec = SODSignedData + class SOD(ElementaryFile): class_ = 1 method = 1 - tag = 23 + tag = 23 _content_spec = SODContentInfo _allowedSodContentTypes = { - oids.id_data, # Some Chinese passports has this OID instead of id_mrtd_ldsSecurityObject + oids.id_data, # Some Chinese passports has this OID instead of id_mrtd_ldsSecurityObject } @classmethod def load(cls, encoded_data: bytes, strict=False) -> "SOD": - ''' + """ Loads EF.SOD from BER/DER-encoded byte string :param encoded_data: A byte string of BER or DER encoded data @@ -172,32 +168,46 @@ def load(cls, encoded_data: bytes, strict=False) -> "SOD": A instance of the `SOD` :raises: SODError - when an error occurs while parsing `encoded_data`. - ''' + """ try: # Parse parent type s = cast(cls, super(SOD, cls).load(encoded_data, strict=strict)) assert isinstance(s, SOD) ci = s.content - ctype = ci['content_type'].native - if ctype != 'signed_data': # ICAO 9303-10-p21 - raise SODError(f"Invalid content type: '{ctype}', expected 'signed_data'") + ctype = ci["content_type"].native + if ctype != "signed_data": # ICAO 9303-10-p21 + raise SODError( + f"Invalid content type: '{ctype}', expected 'signed_data'" + ) sdver = s.signedData.version.native - if sdver != 'v3': # ICAO 9303 part 10 - 4.6.2.2 - raise SODError(f'Invalid SignedData version: {sdver}') - - if not cls._valid_content_type(s.signedData.contentType, strict): #s.signedData.contentType.dotted != oids.id_mrtd_ldsSecurityObject: - raise SODError(f'Invalid encapContentInfo type: {s.signedData.contentType.dotted}, expected {oids.id_mrtd_ldsSecurityObject}') - - if not(0 <= s.ldsSecurityObject.version.value <= 1): - raise SODError(f'Unsupported LDSSecurityObject version: {s.ldsSecurityObject.version.value}, expected 0 or 1') - - assert isinstance(s.signedData.certificates[0], x509.DocumentSignerCertificate) if len(s.signedData.certificates) else True + if sdver != "v3": # ICAO 9303 part 10 - 4.6.2.2 + raise SODError(f"Invalid SignedData version: {sdver}") + + if not cls._valid_content_type( + s.signedData.contentType, strict + ): # s.signedData.contentType.dotted != oids.id_mrtd_ldsSecurityObject: + raise SODError( + f"Invalid encapContentInfo type: {s.signedData.contentType.dotted}, expected {oids.id_mrtd_ldsSecurityObject}" + ) + + if not (0 <= s.ldsSecurityObject.version.value <= 1): + raise SODError( + f"Unsupported LDSSecurityObject version: {s.ldsSecurityObject.version.value}, expected 0 or 1" + ) + + assert ( + isinstance(s.signedData.certificates[0], x509.DocumentSignerCertificate) + if len(s.signedData.certificates) + else True + ) assert isinstance(s.signedData.content, LDSSecurityObject) return s - except SODError: raise - except AssertionError: raise + except SODError: + raise + except AssertionError: + raise except Exception as e: raise SODError(e) @@ -218,21 +228,21 @@ def dump(self, force=False): @classmethod def _valid_content_type(cls, ct, strict): oid = ct.dotted - return oid == oids.id_mrtd_ldsSecurityObject or \ - (strict == False and oid in cls._allowedSodContentTypes) + return oid == oids.id_mrtd_ldsSecurityObject or ( + strict is False and oid in cls._allowedSodContentTypes + ) def __str__(self): """ Returns string representation of self i.e. EF.SOD(fp=XXXXXXXXXXXXXXXX) """ if self._str_rep is None: - self._str_rep = super().__str__()\ - .replace("EF(", "EF.SOD(", 1) + self._str_rep = super().__str__().replace("EF(", "EF.SOD(", 1) return self._str_rep @property def signedData(self) -> SODSignedData: - return self.content['content'] + return self.content["content"] @property def ldsSecurityObject(self) -> LDSSecurityObject: @@ -240,16 +250,18 @@ def ldsSecurityObject(self) -> LDSSecurityObject: @property def dscCertificates(self) -> Optional[List[x509.DocumentSignerCertificate]]: - ''' Returns list of document signer certificates if present, otherwise None. ''' + """Returns list of document signer certificates if present, otherwise None.""" return self.signedData.certificates - def getDscCertificate(self, si: cms.SignerInfo) -> Optional[x509.DocumentSignerCertificate]: - ''' + def getDscCertificate( + self, si: cms.SignerInfo + ) -> Optional[x509.DocumentSignerCertificate]: + """ Returns document signer certificates from the list of `dscCertificates` which signed `si` object. :param si: Signer object for which to return DSC certificate. :return: x509.DocumentSignerCertificate object or None if DSC is not found. :raises SODError: If `si` object is not version v1 or v3 - ''' + """ try: return self.signedData.getCertificate(si) except Exception as e: @@ -257,16 +269,16 @@ def getDscCertificate(self, si: cms.SignerInfo) -> Optional[x509.DocumentSignerC @property def signers(self) -> cms.SignerInfos: - ''' Returns list of SignerInfo which signed this file. ''' + """Returns list of SignerInfo which signed this file.""" return self.signedData.signers def verify(self, si: cms.SignerInfo, dsc: x509.DocumentSignerCertificate) -> None: - ''' + """ Verifies LdsSecurityObject was signed by `dsc`. :param si: The signer info object of `dsc` certificate. :param dsc: The DSC certificate which issued this EF.SOD. :raises: SODError - if verification fails or other some error occurs. - ''' + """ try: self.signedData.verify(si, dsc) except cms.MrtdSignedDataError as e: diff --git a/tests/ef/dg11_test.py b/tests/ef/dg11_test.py new file mode 100644 index 0000000..8b794b8 --- /dev/null +++ b/tests/ef/dg11_test.py @@ -0,0 +1,21 @@ +import pytest + +from pymrtd import ef + + +@pytest.mark.depends( + on=[ + "tests/ef/ef_base_test.py::test_ef_base", + "tests/ef/dg_base_test.py::test_dg_base", + ] +) +def test_dg11(): + assert issubclass(ef.DG11, ef.DataGroup) + + tv_dg11 = bytes.fromhex( + "6B305C065F0E5F2B5F115F0E0C546573743C3C5465737465725F2B0831393730313230315F110B4E6F727468616D70746F6E" + ) + dg11 = ef.DG11.load(tv_dg11) + assert dg11.dump() == tv_dg11 + assert dg11.personal_info.full_name == "Test<