Search code examples
python email encryption openssl smime

Decode and retrieve an S/MIME encrypted email's body automatically (python)


How do I:

  1. Connect my mail inbox with python code to automatically get unread emails' ciphered content;
  2. Decode the S/MIME encrypted email (I have the priv. key);
  3. Retrieve the email's body in plain text;
  4. Check whether the body (or subject) matches a certain keyword ('test' for now) and print something when this is true;
  5. Use this code on a raspberry pi without having to do anything manually.

I've checked similar posts, but I can't manage to actually connect it to my mailbox and make it all work without having to do anything manually.

I'm using iOS'/iCloud's IMAP (imap.mail.me.com) and I'm using mail.login() right now, but it doesn't seem to actually get any content from unread emails.

The code I have right now:

import imaplib
import email
from email.header import decode_header
from OpenSSL import crypto
import time, datetime

# Email receiving configuration
imap_server = "imap.mail.me.com"  # <-- iCloud's mail IMAP
imap_user = "[email protected]"  # <-- Email
# NOTE: prefer loading the app-specific password from an environment variable
# or a protected file instead of hard-coding it in the script.
imap_password = "example_password"  # <-- Email App-Specific password

# Paths to your certificate and private key.
# Raw strings are required here: in a normal string literal "\t" is a TAB
# character and "\m", "\c", "\p" are invalid escape sequences, so Windows-style
# paths would be silently corrupted.
cert_path = r'path\to\my\certificate.pem'  # <-- Path to cert.pem
key_path = r'path\to\my\privatekey.pem'  # <-- Path to privkey.pem

# Function to decrypt S/MIME email
def decrypt_smime(encrypted_message):
    """Decrypt a DER-encoded S/MIME (CMS enveloped-data) blob.

    pyOpenSSL's ``OpenSSL.crypto`` module does not provide an ``SMIME`` class,
    so decryption is delegated to the ``openssl`` command-line tool, which
    must be available on ``PATH``.  The recipient certificate and private key
    are read from the module-level ``cert_path`` and ``key_path``.

    Args:
        encrypted_message: The raw bytes of the ``application/pkcs7-mime``
            part, i.e. the payload after its transfer encoding (base64) has
            already been removed.

    Returns:
        The decrypted content decoded as UTF-8, or ``None`` when the input is
        empty/not bytes or decryption fails (the reason is printed).
    """
    import subprocess  # local import keeps this function self-contained

    if not isinstance(encrypted_message, (bytes, bytearray)) or not encrypted_message:
        print("Error decrypting S/MIME email: no encrypted data to decrypt")
        return None

    command = [
        "openssl", "cms", "-decrypt",
        "-inform", "DER",  # payload is binary DER, not PEM or S/MIME text
        "-recip", cert_path,
        "-inkey", key_path,
    ]
    try:
        # The ciphertext goes in on stdin and the plaintext comes back on
        # stdout; the timeout keeps an unattended poller from hanging forever.
        completed = subprocess.run(
            command,
            input=bytes(encrypted_message),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
            timeout=30,
        )
        return completed.stdout.decode('utf-8')
    except subprocess.CalledProcessError as e:
        # openssl explains the failure (wrong key, not enveloped-data, ...)
        # on stderr, which is more useful than the bare exit status.
        detail = (e.stderr or b"").decode('utf-8', errors='replace').strip()
        print(f"Error decrypting S/MIME email: {detail or e}")
        return None
    except (OSError, subprocess.SubprocessError, UnicodeDecodeError) as e:
        # OSError covers a missing openssl binary or unreadable key files.
        print(f"Error decrypting S/MIME email: {e}")
        return None

# Helpers to extract the readable payload of an email
def _decode_text(raw_bytes, charset):
    """Decode *raw_bytes* with *charset*, falling back to lenient UTF-8."""
    try:
        return raw_bytes.decode(charset or 'utf-8')
    except (LookupError, UnicodeDecodeError):
        # Unknown or wrongly declared charset: keep going instead of
        # discarding the whole message.
        return raw_bytes.decode('utf-8', errors='replace')


def process_email_payload(msg):
    """Return the first usable text found in *msg*, or ``None``.

    For a multipart message the parts are walked in order and the first part
    that yields text wins: an ``application/pkcs7-mime`` part is passed to
    ``decrypt_smime`` (and skipped if decryption fails), while a
    ``text/plain`` or ``text/html`` part is decoded using the charset declared
    on that part.  A non-multipart message is handled the same way using its
    single payload.

    Args:
        msg: An ``email.message.Message`` instance.

    Returns:
        The text as ``str`` (whitespace-stripped for multipart messages), or
        ``None`` when nothing could be extracted or decrypted.
    """
    if msg.is_multipart():
        for part in msg.walk():
            content_type = part.get_content_type()
            if content_type == "application/pkcs7-mime":
                decrypted = decrypt_smime(part.get_payload(decode=True))
                if decrypted:
                    return decrypted.strip()
            elif content_type in ("text/plain", "text/html"):
                raw = part.get_payload(decode=True)
                if raw is None:
                    continue
                return _decode_text(raw.strip(), part.get_content_charset())
        return None

    raw = msg.get_payload(decode=True)
    if msg.get_content_type() == "application/pkcs7-mime":
        # decrypt_smime already returns str (or None on failure); calling
        # .decode() on that result would raise AttributeError.
        return decrypt_smime(raw)
    if raw is None:
        return None
    return _decode_text(raw, msg.get_content_charset())

# Function to check for new emails
def check_email():
    """Check the inbox once for unread emails and print what was found.

    Logs in to ``imap_server``, searches for UNSEEN messages, fetches each one,
    prints its subject, sender and payload (via ``process_email_payload``) and
    prints a notice when the payload contains "test".  Every failure is
    printed instead of raised so that a polling loop keeps running.
    """
    try:
        # The context manager logs out on every exit path, including the early
        # returns below; without it each poll that finds no mail would leave
        # an IMAP connection open.
        with imaplib.IMAP4_SSL(imap_server) as mail:
            mail.login(imap_user, imap_password)
            mail.select("inbox")

            result, data = mail.search(None, "UNSEEN")

            # Debugging information (the time helps tell the polls apart)
            print(f"Search result {datetime.datetime.now():%H.%M.%S}: {result}")
            print(f"Search data: {data}")

            if result != "OK":
                print(f"Error searching Inbox: {result}")
                return

            email_ids = data[0].split() if data and data[0] else []
            if not email_ids:
                print("No new emails.")
                return

            print(f"Email IDs: {email_ids}")  # Debug email IDs

            for email_id in email_ids:
                # NOTE(review): some servers answer an RFC822 fetch with an
                # empty response such as [b'0000 ()']; if that happens, the
                # BODY[] / BODY.PEEK[] fetch items are the ones to try.
                result, msg_data = mail.fetch(email_id, "(RFC822)")
                print(f"Fetch result: {result}")  # Debug fetch result
                print(f"Fetch msg_data: {msg_data}")  # Debug msg_data

                if result != "OK":
                    print(f"Error fetching email ID {email_id}: {result}")
                    continue

                for response_part in msg_data:
                    # Only tuple items carry message data; plain bytes items
                    # are protocol framing.
                    if not isinstance(response_part, tuple):
                        continue

                    raw_email = response_part[1]
                    print(f"Raw email: {raw_email}")  # Debug raw email data
                    try:
                        msg = email.message_from_bytes(raw_email)
                        # make_header joins all encoded chunks of the subject;
                        # "or ''" keeps mails without a Subject/From header
                        # from being rejected.
                        subject = str(email.header.make_header(
                            decode_header(msg["Subject"] or "")))
                        sender = email.utils.parseaddr(msg["From"] or "")[1]

                        print(f"Subject: {subject}")
                        print(f"From: {sender}")

                        payload = process_email_payload(msg)
                        if not payload:
                            print(f"Could not decode payload for email ID {email_id}")
                            continue
                        print(f"Payload: {payload}")  # Debug payload content

                        if "test" in payload:
                            print("Test prompt received")

                    except Exception as e:
                        # Best effort: one malformed mail must not stop the
                        # processing of the others.
                        print(f"Error decoding email ID {email_id}: {e}")
                        continue

            mail.close()
    except Exception as e:
        print(f"Failed to check email: {e}")

# Actually try to check and decode emails (loop).
# The guard keeps the endless loop from starting when this file is imported
# rather than run as a script.
if __name__ == "__main__":
    try:
        while True:
            check_email()
            time.sleep(10)  # wait between polls
    except KeyboardInterrupt:
        # Ctrl+C is the intended way to stop the poller; exit without a traceback.
        print("Stopped checking for emails.")

best code as of now, programmed in Visual Studio Code on windows, not Linux

It does actively check for emails, but it doesn't fetch anything:

Fetch result: OK
Fetch msg_data: [b'0000 ()']

terminal output

It doesn't print the subject, etc., presumably because no element of msg_data is a tuple, so the isinstance(response_part, tuple) check never passes; msg_data is essentially empty.

And I've tried dozens of other possible solutions, but this seems to be the best for now.

So how do I actually actively check for unread emails and decode their body?

Please describe the solution in as much detail as you can, since I'm really interested in learning more about this and would like to improve my overall coding skills. I'd also like to know what I did wrong.


Solution

  • I managed to make it work. The main problem was the way the mail was fetched. After changing the fetch call to:

    # BODY.PEEK[] requests the complete message; per the IMAP specification
    # the PEEK variant does not set the \Seen flag on the fetched message.
    result, msg_data = mail.fetch(email_id, "(BODY.PEEK[])")

    It seems to work perfectly fine.

    Fully updated check_email():

    def check_email():
        """Process every unread email in the inbox once.

        Logs in to ``imap_server``, searches for UNSEEN messages and fetches
        each one with ``BODY.PEEK[]``.  For every message the subject and
        sender are printed, the body is extracted with
        ``process_email_payload`` and the message is then flagged as Seen and
        Deleted.  Every failure is printed instead of raised so that a polling
        loop keeps running.
        """
        try:
            # The context manager logs out on every exit path, including the
            # early returns below; without it each poll that finds no mail
            # would leave an IMAP connection open.
            with imaplib.IMAP4_SSL(imap_server) as mail:
                mail.login(imap_user, imap_password)
                mail.select("inbox")

                result, data = mail.search(None, "UNSEEN")

                # Debugging information
                print(f"Search result {datetime.datetime.now():%H.%M.%S}: {result}")
                print(f"Search data: {data}")

                if result != "OK":
                    print(f"Error searching Inbox: {result}")
                    return

                email_ids = data[0].split() if data and data[0] else []
                if not email_ids:
                    print("No new emails.")
                    return

                # print(f"Email IDs: {email_ids}")  # Debug email IDs

                for email_id in email_ids:
                    result, msg_data = mail.fetch(email_id, "(BODY.PEEK[])")
                    # print(f"Fetch result: {result}")  # Debug fetch result
                    # print(f"Fetch msg_data: {msg_data}")  # Debug msg_data

                    if result != "OK":
                        print(f"Error fetching email ID {email_id}: {result}")
                        continue

                    if not msg_data or msg_data == [b'']:
                        print(f"No data fetched for email ID {email_id}")
                        continue

                    for response_part in msg_data:
                        # print(f"Response part: {response_part}")  # Debug each response part
                        # Only tuple items carry message data; plain bytes
                        # items are protocol framing.
                        if not isinstance(response_part, tuple):
                            continue

                        raw_email = response_part[1]
                        # print(f"Raw email: {raw_email[:500]}...")  # Debug raw email data (truncated)
                        try:
                            msg = email.message_from_bytes(raw_email)
                            # make_header joins all encoded chunks of the
                            # subject; "or ''" keeps mails without a
                            # Subject/From header from being rejected.
                            subject = str(email.header.make_header(
                                decode_header(msg["Subject"] or "")))
                            sender = email.utils.parseaddr(msg["From"] or "")[1]

                            print(f"Subject: {subject}")
                            print(f"From: {sender}")

                            payload = process_email_payload(msg)
                            if not payload:
                                print(f"Could not decode payload for email ID {email_id}")
                                continue
                            # print(f"Payload: {payload}")  # Debug payload content

                            # You can put all kinds of if statements here, for example:
                            #
                            #     if "test" in payload:
                            #         print("Test Prompt received")
                            #
                            # to check if there's "test" in the body of the email, or
                            #
                            #     if "test" in subject:
                            #         print("Test Subject received")
                            #
                            # to check if there's "test" in the subject. You can also
                            # act on the sender, for example:
                            #
                            #     if sender == "[email protected]" and "test" in payload:
                            #         print("The sender of the email was (...) and there was 'test' in the email's body")

                            # Mark the processed email as read and flag it for
                            # deletion.
                            # NOTE(review): mail.close() below presumably
                            # expunges \Deleted messages on a standard IMAP
                            # server, i.e. removes them for good rather than
                            # moving them to a Bin folder - confirm on iCloud.
                            mail.store(email_id, '+FLAGS', '\\Seen')
                            mail.store(email_id, '+FLAGS', '\\Deleted')

                        except Exception as e:
                            # Best effort: one malformed mail must not stop
                            # the processing of the others.
                            print(f"Error decoding email ID {email_id}: {e}")
                            continue

                mail.close()
        except Exception as e:
            print(f"Failed to check email: {e}")
    

    You can uncomment some of the print calls to see the raw body of the email, but the version above gives the cleanest output with the most important information.