0

I am struggling to get a successful signature validation when I send a test message through [email protected]. Below is my current relevant Python code after some trial and error. But I always get an InvalidSignature error on signing_cert.public_key().verify. Any ideas on what I am doing wrong?

from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding


def create_string_to_sign(payload):
    string_to_sign = (
        f"""Message:\n{payload['Message']}\nMessageId:\n{payload['MessageId']}\n"""
    )

    # 'Subject' is optional, add it only if present in the payload
    if "Subject" in payload:
        string_to_sign += f"Subject:\n{payload['Subject']}\n"

    string_to_sign += f"""Timestamp:\n{payload['Timestamp']}\nTopicArn:\n{payload['TopicArn']}\nType:\n{payload['Type']}\n"""

    # Add 'UnsubscribeURL' only if present
    if "UnsubscribeURL" in payload:
        string_to_sign += f"UnsubscribeURL:\n{payload['UnsubscribeURL']}\n"

    return string_to_sign


class AmazonSNSSESWebhookView(WebhookView):
    """
    Validate and process webhook events from Amazon SNS for Amazon SES spam complaints.
    """

    def validate(self):
        """
        Sample payload from Amazon SNS
        {
        "Type" : "Notification",
        "MessageId" : "1c2a7465-1f6b-43a2-b92f-0f24b9c7f3c5",
        "TopicArn" : "arn:aws:sns:us-east-1:123456789012:SES_SpamComplaints",
        "Message" : "{\"notificationType\":\"Complaint\",\"complaint\":{\"complainedRecipients\":[{\"emailAddress\":\"[email protected]\"}],\"complaintFeedbackType\":\"abuse\",\"arrivalDate\":\"2024-09-25T14:00:00.000Z\"},\"mail\":{\"timestamp\":\"2024-09-25T13:59:48.000Z\",\"source\":\"[email protected]\",\"messageId\":\"1234567890\"}}",
        "Timestamp" : "2024-09-25T14:00:00.000Z",
        "SignatureVersion" : "1",
        "Signature" : "...",
        "SigningCertURL" : "https://sns.us-east-1.amazonaws.com/SimpleNotificationService.pem",
        "UnsubscribeURL" : "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe"
        }
        """
        payload = json.loads(self.request.body)

        # Ensure that the sender is actually Amazon SNS
        signing_cert_url = payload["SigningCertURL"]
        if not signing_cert_url.startswith(
            f"https://sns.{settings.AWS_SES_REGION_NAME}.amazonaws.com/"
        ):
            return False

        # We need to handle the subscription confirmation
        if payload["Type"] == "SubscriptionConfirmation":
            response = requests.get(payload["SubscribeURL"])

            if response.status_code != 200:
                logger.error(
                    f"Failed to confirm Amazon SNS subscription. Payload: {payload}"
                )
                return False
            else:
                return True

        # Message Type: Ensure that you only process SNS messages of type Notification.
        # There are other message types (e.g., SubscriptionConfirmation and UnsubscribeConfirmation),
        # which you may want to handle separately. For SubscriptionConfirmation,
        # you should respond to confirm the subscription.
        if payload["Type"] != "Notification":
            return False

        # Check that the message is recent, protect against replay attacks
        # Ignore if it is old
        sns_datetime = parser.parse(payload["Timestamp"])
        current_datetime = datetime.now(timezone.utc)
        time_window = timedelta(minutes=5)
        if sns_datetime < current_datetime - time_window:
            return False

        # Retrieve the certificate.
        signing_cert = x509.load_pem_x509_certificate(
            requests.get(signing_cert_url).content
        )
        decoded_signature = base64.b64decode(payload["Signature"])
        signature_hash = (
            hashes.SHA1() if payload["SignatureVersion"] == "1" else hashes.SHA256()
        )

        # Sign the string.
        string_to_sign = create_string_to_sign(payload)

        return signing_cert.public_key().verify(
            decoded_signature,
            string_to_sign.encode("UTF-8"),
            padding=padding.PKCS1v15(),
            algorithm=signature_hash,
        )
5
  • 1
    It's not really possible to reproduce your issue, without setting up the same SNS webhook, for which you don't really provide instructions. By looking at the specs here, your code seems to be correct, but given that you say it doesn't work, there isn't much more to go about. The cert you link in your example doesn't exist. You could include the cert and the signature, I don't believe they are secret. It's also not clear what lib you are using for hashes.SHA1(). Without this information, I can't help. Commented Oct 12, 2024 at 16:22
  • 1
    According to the specs the ‘UnsubscribeURL’ should not be added in the string to sign. It’s also not clear if the last new line character should be there. Commented Oct 12, 2024 at 18:14
  • 1
    The new lines are correct per the spec, but the UnsubscribeURL is wrong indeed. You only need to include the SubscribeURL, and only if present (in which case it's the 3rd field) Commented Oct 13, 2024 at 0:05
  • @tituszban I am using the cryptography module, added the imports now. Also I have removed the UnsubscribeURL but still it is failing. Commented Oct 16, 2024 at 10:20
  • @BotondBéres good, can you please include the actual signing cert URL (one that doesn't throw <UnknownOperationException/> when called), and the actual signature, so it can actually ran? Commented Oct 16, 2024 at 20:43

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.