Hi this is my code : import os
import boto3
import logging
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
from cryptography import x509
from cryptography.x509.oid import NameOID
from acme import client, messages, challenges
from acme.errors import PollError
from josepy import jwk
from datetime import datetime, timedelta
import time
Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name)
S3 client to upload files
s3 = boto3.client('s3')
Constants
S3_BUCKET_NAME = "abc-test-abc-000-test-submit1"
S3_CERTIFICATE_PATH = "ssl_auto_certs/certificate.pem"
S3_PRIVATE_KEY_PATH = "ssl_auto_certs/private_key.pem"
DOMAIN = "testvar.hydra.sophos.com"
EMAIL = "varun.vikram@sophos.com" # Update with your email
ACME Directory URL (for Let's Encrypt)
ACME_DIRECTORY_URL = "https://acme-staging-v02.api.letsencrypt.org/directory" # Production URL (use staging for testing)
Generate CSR and private key for both server and client authentication
def generate_csr_and_key(domain):
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
# Create CSR with extensions for server and client authentication
csr_builder = x509.CertificateSigningRequestBuilder().subject_name(
x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, DOMAIN),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "SOPHOS LIMITED"),
x509.NameAttribute(NameOID.COUNTRY_NAME, "GB"),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Oxfordshire"),
x509.NameAttribute(NameOID.LOCALITY_NAME, "Abingdon"),
])
).add_extension(
x509.KeyUsage(
digital_signature=True,
key_encipherment=True,
content_commitment=False,
key_agreement=False,
data_encipherment=False,
key_cert_sign=False,
crl_sign=False,
encipher_only=False,
decipher_only=False
),
critical=True
).add_extension(
x509.ExtendedKeyUsage([
x509.ExtendedKeyUsageOID.SERVER_AUTH, # For Server Authentication
x509.ExtendedKeyUsageOID.CLIENT_AUTH # For Client Authentication
]),
critical=True
).add_extension(
x509.SubjectAlternativeName([
x509.DNSName(f"*.{domain}"),
x509.DNSName(domain), # Wildcard for both server and client
]),
critical=False
)
# Sign the CSR with the private key
csr = csr_builder.sign(private_key, hashes.SHA256(), default_backend())
# Save CSR and private key to /tmp directory (Lambda temporary storage)
csr_path = "/tmp/certificate.csr"
private_key_path = "/tmp/private_key.pem"
with open(csr_path, "wb") as csr_file:
csr_file.write(csr.public_bytes(serialization.Encoding.PEM))
with open(private_key_path, "wb") as key_file:
key_file.write(private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
))
logger.info(f"CSR and private key generated and saved to /tmp: {csr_path}, {private_key_path}")
return csr_path, private_key_path, csr, private_key
Upload to S3
def upload_to_s3(cert_path, private_key_path):
try:
# Upload CSR to S3
with open(cert_path, "rb") as csr_file:
s3.put_object(
Bucket=S3_BUCKET_NAME,
Key=S3_CERTIFICATE_PATH,
Body=csr_file,
ServerSideEncryption='AES256'
)
# Upload Private Key to S3
with open(private_key_path, "rb") as key_file:
s3.put_object(
Bucket=S3_BUCKET_NAME,
Key=S3_PRIVATE_KEY_PATH,
Body=key_file,
ServerSideEncryption='AES256'
)
logger.info("CSR and private key successfully uploaded to S3.")
except Exception as e:
logger.error(f"Failed to upload to S3: {e}")
raise
Perform DNS-01 Challenge for domain validation
def dns01_challenge(orderr, domain, private_key_jwk, client_acme):
route53 = boto3.client('route53')
dns_challenge = None
# Loop through the authorizations to find the DNS-01 challenge
for auth in orderr.authorizations:
logger.info(f"Authorization URL: {auth.uri}")
for challenge in auth.body.challenges:
logger.info(f"Challenge status: {challenge.status}, Challenge URL: {challenge.uri}")
if isinstance(challenge.chall, challenges.DNS01):
dns_challenge = challenge
logger.info(f"Found DNS-01 challenge: {challenge}")
break
if dns_challenge:
break
# If DNS-01 challenge is not found, raise an exception
if dns_challenge is None:
raise Exception("No DNS-01 challenge found.")
# Create TXT record for DNS validation
response, validation = dns_challenge.response_and_validation(private_key_jwk)
change_batch = {
'Changes': [{
'Action': 'UPSERT',
'ResourceRecordSet': {
'Name':f"_acme-challenge.{domain}",
'Type': 'TXT',
'TTL': 60,
'ResourceRecords': [{'Value': f'"{validation}"'}]
}
}]
}
# Update Route 53 DNS record
try:
route53.change_resource_record_sets(HostedZoneId="Z07477012J1NZ3TLVGDW7", ChangeBatch=change_batch)
logger.info(f"DNS TXT record added for {dns_challenge.validation_domain_name(domain)}")
except Exception as e:
logger.error(f"Error adding DNS TXT record: {e}")
raise
# Wait for DNS propagation and finalize the order
client_acme.answer_challenge(dns_challenge, response)
finalized_orderr = client_acme.poll_and_finalize(orderr)
return finalized_orderr.fullchain_pem
#retries = 30
#for i in range(retries):
#try:
#finalized_orderr = client_acme.poll_and_finalize(order)
#logger.info(f"Certificate issued: {finalized_order.fullchain_pem}")
#return finalized_order.fullchain_pem # Return the certificate chain
#except PollError:
#wait_time = 30 * (i + 1)
#ogger.info(f"Retrying in {wait_time} seconds...")
#time.sleep(wait_time)
#raise Exception("DNS validation failed after retries.")
Request certificate from Let's Encrypt using CSR
def request_certificate_from_letsencrypt(csr, private_key):
# Create a client to interact with Let's Encrypt
private_key_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
# Generate JWK from private key using jwk.RSAKey
private_key_jwk = jwk.JWKRSA(key=private_key)
# Initialize ACME client
net = client.ClientNetwork(private_key_jwk, user_agent="AWS Lambda/ACME")
directory = client.ClientV2.get_directory(ACME_DIRECTORY_URL, net)
client_acme = client.ClientV2(directory, net=net)
# Register account with Let's Encrypt
account = client_acme.new_account(messages.NewRegistration.from_data(email=EMAIL, terms_of_service_agreed=True))
# Create new order with the CSR
orderr = client_acme.new_order(csr.public_bytes(serialization.Encoding.PEM))
# Perform DNS-01 challenge for domain validation
certificate = dns01_challenge(orderr, DOMAIN, private_key_jwk, client_acme)
# Save certificate and private key to /tmp directory (Lambda storage)
cert_path = "/tmp/certificate.pem"
private_key_path = "/tmp/private_key.pem"
with open(cert_path, "wb") as cert_file:
cert_file.write(certificate)
with open(private_key_path, "wb") as key_file:
key_file.write(private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
))
logger.info(f"Certificate and private key saved to: {cert_path}, {private_key_path}")
return cert_path, private_key_path
Main handler function
def main(event, context):
try:
# Generate CSR and private key
csr_path, private_key_path, csr, private_key = generate_csr_and_key(DOMAIN)
# Request certificate from Let's Encrypt
cert_path, private_key_path = request_certificate_from_letsencrypt(csr, private_key)
# Upload certificate and private key to S3
upload_to_s3(cert_path, private_key_path)
except Exception as e:
logger.error(f"Error during certificate generation and upload: {e}")
raise
Lambda handler
def lambda_handler(event, context):
main(event, context)
I am trying to generate the certificate for my wildcard but all the time it is showing error.timeout can anyone help me on this , I am using route53 where the _acme-challenge. is created but don't know what I am missing .