
Python PKCS#7 X.509 chain of trust with cryptography


I'm working on a Python library that processes an Apple Pay payload into usable card details. To do so, I'm following the official documentation.

Everything is working perfectly except for 2 verification steps:

Step 1.c.

Ensure that there is a valid X.509 chain of trust from the signature to the root CA. Specifically, ensure that the signature was created using the private key corresponding to the leaf certificate, that the leaf certificate is signed by the intermediate CA, and that the intermediate CA is signed by the Apple Root CA - G3.

I implemented the last two checks with the following code, using the cryptography library:

from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric.ec import ECDSA

class CustomError(Exception):
    """Placeholder for the library's own exception type."""

def verify_root_ca_chain_of_trust(
    trusted_root_ca: x509.Certificate,
    intermediate_cert: x509.Certificate,
    leaf_cert: x509.Certificate
) -> None:
    try:
        # verify that the intermediate CA is signed by the Apple Root CA - G3
        trusted_pub = trusted_root_ca.public_key()
        trusted_pub.verify(
            intermediate_cert.signature,
            intermediate_cert.tbs_certificate_bytes,
            ECDSA(hashes.SHA256())
        )
        # verify that the leaf certificate is signed by the intermediate CA
        trusted_intermediate_pub = intermediate_cert.public_key()
        trusted_intermediate_pub.verify(
            leaf_cert.signature,
            leaf_cert.tbs_certificate_bytes,
            ECDSA(hashes.SHA256())
        )
    except InvalidSignature as err:
        # verify() raises InvalidSignature when a signature does not match
        raise CustomError('error') from err

My problem is that this implements only two of the three required checks. The one I can't figure out how to do is:

Specifically, ensure that the signature was created using the private key corresponding to the leaf certificate

Step 1.d. and 1.e.

d. For ECC (EC_v1), ensure that the signature is a valid ECDSA signature (ecdsa-with-SHA256 1.2.840.10045.4.3.2) of the concatenated values of the ephemeralPublicKey, data, transactionId, and applicationData keys.

e. Inspect the CMS signing time of the signature, as defined by section 11.3 of RFC 5652. If the time signature and the transaction time differ by more than a few minutes, it's possible that the token is a replay attack.
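
For step 1.d, the content covered by the CMS signature is the byte concatenation of the four fields. Below is a minimal sketch of assembling it. It assumes that `ephemeralPublicKey` and `data` arrive Base64-encoded and that `transactionId` and `applicationData` arrive as hexadecimal strings; check those encodings against the token format documentation before relying on them. `build_signed_message` is a hypothetical helper name, not part of any library.

```python
import base64
from typing import Optional


def build_signed_message(
    ephemeral_public_key_b64: str,
    data_b64: str,
    transaction_id_hex: str,
    application_data_hex: Optional[str] = None,
) -> bytes:
    """Concatenate the decoded token fields in the order listed in step 1.d."""
    parts = [
        base64.b64decode(ephemeral_public_key_b64),  # assumed Base64
        base64.b64decode(data_b64),                  # assumed Base64
        bytes.fromhex(transaction_id_hex),           # assumed hex
    ]
    # applicationData is optional; skip it when absent or empty
    if application_data_hex:
        parts.append(bytes.fromhex(application_data_hex))  # assumed hex
    return b"".join(parts)
```

The order of the parts matters: a digest computed over any other ordering will not match the one carried in the signature.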

What I tried so far:

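import base64
from typing import Optional

from cryptography.exceptions import InvalidSignature

def validate_token_signature(
    trusted_cert: x509.Certificate,
    signature: str,
    payment_data: str,
    ephemeral_pub: str,
    transaction_id: str,
    application_data: Optional[str] = None,
) -> None: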
    data_byte: bytes = base64.b64decode(ephemeral_pub)
    payment_data_byte: bytes = base64.b64decode(payment_data)
    transaction_id_byte: bytes = bytes.fromhex(transaction_id)
    data: bytes = data_byte + payment_data_byte + transaction_id_byte
    if application_data is not None:
        application_data_byte: bytes = base64.b64decode(application_data)
        data = data + application_data_byte
    try:
        trusted_leaf_pub = trusted_cert.public_key()
        trusted_leaf_pub.verify(base64.b64decode(signature), data, ECDSA(hashes.SHA256()))
    except InvalidSignature as err:
        print(err)
        # raise SignatureError('error') from err

The documentation does not specify which signature to verify. I presume it's the PKCS#7 one, but cryptography only processes PKCS#7 as a list of X.509 certificates, as its documentation states:

Deserialize a PEM encoded PKCS7 blob to a list of certificates. PKCS7 can contain many other types of data, including CRLs, but this function will ignore everything except certificates.

Is there a way to do those checks with Python cryptography, or do I have to use another library such as pyOpenSSL?


Solution

  • To read the CMS signing time and validate the PKCS#7 signature, I needed access to the CMS content and signer info. This can't be achieved with the PKCS#7 modules from M2Crypto or cryptography, so I ended up using asn1crypto.

    Steps 1.c and 1.d are part of the same check, so I kept the checks above for the chain of trust:

    def verify_root_ca_chain_of_trust(
        trusted_root_ca: x509.Certificate,
        intermediate_cert: x509.Certificate,
        leaf_cert: x509.Certificate
    ) -> None:
        try:
            # verify that the intermediate CA is signed by the Apple Root CA - G3
            trusted_pub = trusted_root_ca.public_key()
            trusted_pub.verify(
                intermediate_cert.signature,
                intermediate_cert.tbs_certificate_bytes,
                ECDSA(hashes.SHA256())
            )
            # verify that the leaf certificate is signed by the intermediate CA
            trusted_intermediate_pub = intermediate_cert.public_key()
            trusted_intermediate_pub.verify(
                leaf_cert.signature,
                leaf_cert.tbs_certificate_bytes,
                ECDSA(hashes.SHA256())
            )
        except InvalidSignature as err:
            # cryptography.exceptions.InvalidSignature is what verify() raises on a mismatch
            raise CustomError('error') from err
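
The two checks above hardcode SHA-256. As a hedged alternative, the sketch below reads the hash from the certificate being verified, through its `signature_hash_algorithm` property, and also compares the issuer and subject names. `is_signed_by` is a hypothetical helper name, and the sketch assumes ECDSA keys throughout, as in the chain described above.

```python
from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric import ec


def is_signed_by(cert: x509.Certificate, issuer_cert: x509.Certificate) -> bool:
    """Return True if `cert` carries a valid ECDSA signature made by `issuer_cert`."""
    issuer_key = issuer_cert.public_key()
    # This sketch only handles elliptic-curve issuers
    if not isinstance(issuer_key, ec.EllipticCurvePublicKey):
        return False
    # The issuer name in the certificate must match the issuer's subject name
    if cert.issuer != issuer_cert.subject:
        return False
    try:
        issuer_key.verify(
            cert.signature,
            cert.tbs_certificate_bytes,
            # Use the hash declared by the certificate instead of assuming SHA-256
            ec.ECDSA(cert.signature_hash_algorithm),
        )
    except InvalidSignature:
        return False
    return True
```

With this helper, the chain check reduces to `is_signed_by(intermediate_cert, trusted_root_ca) and is_signed_by(leaf_cert, intermediate_cert)`.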
    

    For the last part of the check ('Specifically, ensure that the signature was created using the private key corresponding to the leaf certificate'), I used asn1crypto:

        import hashlib
        from datetime import datetime, timezone
    
        from asn1crypto import cms
        from asn1crypto import core
        from cryptography import x509
        from cryptography.exceptions import InvalidSignature
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.asymmetric.ec import ECDSA
        from cryptography.x509.oid import NameOID
    
        def validate_token_signature(
            trusted_cert: x509.Certificate,
            signature: bytes,
            payment_data: bytes,
            ephemeral_pub: bytes,
            transaction_id: bytes,
            application_data: bytes,
        ) -> None:
            signed_data = cms.ContentInfo.load(signature)['content']
            algo = signed_data['digest_algorithms'][0]['algorithm'].native
            signers_info = signed_data['signer_infos']
            attr_signature = signers_info[0].native['signature']
            attrs = signers_info[0]['signed_attrs']
    
            # Ensure the issuer named in the signer info matches the certificate's issuer
            cert_issuer: str = trusted_cert.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
            signed_data_issuer: str = signers_info[0].native['sid']['issuer']['common_name']
            if cert_issuer != signed_data_issuer:
                raise CustomError('error')
    
            # Verify that the certificate is currently valid.
            # not_valid_before/not_valid_after are naive UTC datetimes, so compare in UTC.
            now = datetime.now(timezone.utc).replace(tzinfo=None)
            if not trusted_cert.not_valid_before < now < trusted_cert.not_valid_after:
                raise CustomError('error')
    
            # user data: the concatenated fields from step 1.d
            udata: bytes = ephemeral_pub + payment_data + transaction_id + application_data
    
            # digest of the user data, using the algorithm named in the CMS structure
            mdData = hashlib.new(algo, udata).digest()
    
            if attrs is not None and not isinstance(attrs, core.Void):
                # with signed attributes, the signature covers the attributes,
                # and mdSigned is their message_digest attribute
                mdSigned = None
                for attr in attrs:
                    if attr['type'].native == 'message_digest':
                        mdSigned = attr['values'].native[0]
                # re-tag the attributes as a SET OF (0x31) before verifying
                signedData = attrs.dump()
                signedData = b'\x31' + signedData[1:]
            else:
                # without signed attributes, the signature covers the user data directly
                mdSigned = mdData
                signedData = udata
    
            # the signature over the signed data must verify with the leaf public key
            try:
                trusted_cert.public_key().verify(attr_signature, signedData, ECDSA(hashes.SHA256()))
            except InvalidSignature:
                raise CustomError('error')
    
            # the computed digest must match the signed message_digest
            if mdData != mdSigned:
                raise CustomError('error')
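
The `b'\x31'` substitution in the code above follows RFC 5652 section 5.4: inside `SignerInfo` the signed attributes are encoded with an IMPLICIT `[0]` tag (first byte `0xA0`), but the signature is computed over their DER encoding with an explicit SET OF tag (first byte `0x31`). The sketch below isolates that step and adds a tag check. `signed_attrs_for_verification` is a hypothetical helper name, and the sketch assumes the input is the DER of the signed attributes only.

```python
SET_OF_TAG = 0x31              # universal, constructed SET / SET OF
IMPLICIT_CONTEXT_0_TAG = 0xA0  # context-specific, constructed [0]


def signed_attrs_for_verification(der: bytes) -> bytes:
    """Return the signed attributes encoded the way the signer hashed them."""
    if not der:
        raise ValueError("empty signed attributes")
    if der[0] == SET_OF_TAG:
        # Already carries the SET OF tag, nothing to change
        return der
    if der[0] != IMPLICIT_CONTEXT_0_TAG:
        raise ValueError(f"unexpected leading tag 0x{der[0]:02x}")
    # Only the tag byte differs; length and content bytes stay the same
    return bytes([SET_OF_TAG]) + der[1:]
```

Checking the leading byte before replacing it avoids silently rewriting data that is not a signed-attributes structure.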
    

    For the CMS signing time verification (1.e), I also used the data obtained through the asn1crypto object:

        import time
    
        from asn1crypto import cms
    
        def cms_compare(p7: bytes) -> None:
            ci = cms.ContentInfo.load(p7)
            try:
                content = ci.native['content']
                signed_attrs = content['signer_infos'][0]['signed_attrs']
                # look the attribute up by type rather than relying on its position
                signed_time = next(
                    attr['values'][0] for attr in signed_attrs if attr['type'] == 'signing_time'
                )
            except (TypeError, StopIteration) as err:
                raise CustomError('error') from err
    
            # the signing time is a timezone-aware datetime, so timestamp() is portable,
            # unlike the platform-specific strftime('%s')
            if time.time() - signed_time.timestamp() > 30:
                raise CustomError('error')
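
The time comparison itself can be separated from the CMS parsing, which makes it easier to test. A minimal sketch follows, assuming the signing time has already been extracted as a timezone-aware `datetime`. `signing_time_is_fresh` and the five-minute default are illustrative choices, not values taken from the documentation, which only says "a few minutes".

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def signing_time_is_fresh(
    signing_time: datetime,
    tolerance: timedelta = timedelta(minutes=5),  # illustrative default
    now: Optional[datetime] = None,
) -> bool:
    """Return True if signing_time is within `tolerance` of `now`, in either direction."""
    if signing_time.tzinfo is None:
        raise ValueError("signing_time must be timezone-aware")
    if now is None:
        now = datetime.now(timezone.utc)
    # abs() also rejects signing times that lie too far in the future
    return abs(now - signing_time) <= tolerance
```

Passing `now` explicitly keeps the check deterministic in tests, for example `signing_time_is_fresh(ts, now=ts + timedelta(minutes=1))`.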