DNS01 challenge automation and wildcard certs through python code using acme

DNS01: Has anyone obtained wildcard certificates from Python code using the acme library and automated the DNS-01 challenge? Please let me know if you have.

I have implemented the whole SSL certificate generation process, including automatic renewals, in my Python project. It took me 1-2 months to figure out the process and put all the bits and pieces together, so I will try to save you some time :slight_smile: . Let me know how I can help.


@Nagendra, could you please help me? This is quite urgent for me, and my certificate is a wildcard. It would be a great help. Thanks once again.

Hi @Nagendra, if you could send the GitHub code snippet you implemented, I could use it as a reference.

Repeatedly asking individual people for help is not very polite. You haven’t even told us what help you need. Please be respectful of others. I’m sure it is urgent for you, but this is a forum, and you cannot expect prompt responses from others.

The certbot project on GitHub is written in Python, and its ACME client library is available as the acme package. There are many plugins for different DNS providers. You can go look at that code.


Sure, I am not trying to be impolite, but if I was in any way, I apologize. I will wait for a reply in this forum. Thanks.

Hey Varun, below are the detailed steps we have implemented.

Create Account (One Time)

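import josepy
from acme import client, messages
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa

# DIRECTORY_URL and USER_AGENT are constants defined elsewhere in the project.
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend())
user_key = josepy.JWKRSA(key=private_key)
net = client.ClientNetwork(user_key, user_agent=USER_AGENT)
directory = messages.Directory.from_json(net.get(DIRECTORY_URL).json())
client_acme = client.ClientV2(directory, net=net)
account_created = messages.NewRegistration.from_data(email="<your email address>", terms_of_service_agreed=True)
regr = client_acme.new_account(account_created)
# Serialise the account key to PEM so it can be stored next to the registration.
pem = private_key.private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption()
)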

Store the registered account object in a pickle file for future use

import pickle

pkey_data = {"private_key": pem.decode()}
with open("acme_account_regr.pickle", "wb") as file:
    state = (regr.to_json(), pkey_data)
    pickle.dump(state, file)
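Since this file holds the account private key, it may be worth restricting who can read it. The sketch below is one possible variant, not the approach used above: it swaps pickle for plain JSON and creates the file with mode 0600. The helper names `save_account_state` and `load_account_state` and the file layout are made up for illustration, and it assumes the registration has already been turned into a JSON string (for instance with `regr.json_dumps()`).

```python
import json
import os


def save_account_state(path, regr_json_text, private_key_pem):
    """Write the registration JSON string and the key PEM to `path` (hypothetical helper)."""
    state = {"regr": regr_json_text, "private_key": private_key_pem}
    # O_TRUNC overwrites an existing file; the 0o600 mode only applies when the file is created.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        json.dump(state, handle)


def load_account_state(path):
    """Return (regr_json_text, private_key_pem) as stored by save_account_state."""
    with open(path, "r", encoding="utf-8") as handle:
        state = json.load(handle)
    return state["regr"], state["private_key"]
```

A JSON file like this is also readable from other tools, which pickle is not.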

When a request comes in to generate a new SSL certificate for a domain

1. Read the account object from the pickle file and prepare the ACME client

with open("acme_account_regr.pickle", "rb") as file:
    regr_json, user_rsa_json = pickle.load(file)
regr_obj = messages.RegistrationResource.from_json(regr_json)
user_rsa_pem = user_rsa_json["private_key"].encode()
user_rsa_key = serialization.load_pem_private_key(
    user_rsa_pem,
    password=None,
    backend=default_backend(),
)
user_key = josepy.JWKRSA(key=user_rsa_key)
net = client.ClientNetwork(user_key, user_agent=USER_AGENT)
directory = messages.Directory.from_json(net.get(DIRECTORY_URL).json())
client_acme = client.ClientV2(directory, net=net)
client_acme.query_registration(regr_obj)

2. Generate a private key and CSR for the requested domain

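import OpenSSL
from acme import crypto_util

# CERT_PKEY_BITS is a project constant holding the RSA key size.
pkey = OpenSSL.crypto.PKey()
pkey.generate_key(OpenSSL.crypto.TYPE_RSA, CERT_PKEY_BITS)
pkey_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, pkey)
csr_pem = crypto_util.make_csr(pkey_pem, [<domain>])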

3. Request the challenge

orderr = client_acme.new_order(csr_pem)
challenge = None
authz_list = orderr.authorizations

for authz in authz_list:
    for i in authz.body.challenges:
        if isinstance(i.chall, challenges.DNS01):
            challenge = i
        else:
            logging.info(f"-> Other challenge found for {domain} : {i.chall}")
if challenge is None:
    logging.info(f"No DNS challenge found! :: {challenge}")
    return Response(
        status=status.HTTP_406_NOT_ACCEPTABLE,
        data={"message": "Could not find the DNS challenge!"},
        content_type="application/json",
    )
response, validation = challenge.response_and_validation(client_acme.net.key)
# validation_domain_name() simply prefixes "_acme-challenge." to the name it is given,
# so for a wildcard such as *.example.com pass the base domain without the "*." part.
validation_domain = challenge.chall.validation_domain_name(domain)
validation_value = challenge.chall.validation(client_acme.net.key)
if validation_domain is None or validation_value is None:
    return Response(
        status=status.HTTP_406_NOT_ACCEPTABLE,
        data={"message": "Could not generate validation value or domain."},
        content_type="application/json",
    )
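One thing to watch with wildcards: an order covering both `*.example.com` and `example.com` comes back with two authorizations, and a loop that keeps a single `challenge` variable only remembers the last DNS-01 challenge it sees. Here is a rough sketch of collecting one challenge per authorization. It uses plain dicts shaped like RFC 8555 authorization objects rather than the acme library classes, and the helper name and sample data are invented for illustration.

```python
def collect_dns01_challenges(authorizations):
    """Return a list of (identifier, is_wildcard, challenge) tuples, one per authorization."""
    selected = []
    for authz in authorizations:
        name = authz["identifier"]["value"]
        wildcard = authz.get("wildcard", False)
        for chall in authz["challenges"]:
            if chall["type"] == "dns-01":
                selected.append((name, wildcard, chall))
                break
        else:
            # The inner loop finished without finding a dns-01 challenge.
            raise LookupError(f"no dns-01 challenge offered for {name}")
    return selected


# Sample data (made up): roughly what an order for *.example.com + example.com contains.
sample_authorizations = [
    {"identifier": {"type": "dns", "value": "example.com"}, "wildcard": True,
     "challenges": [{"type": "dns-01", "token": "token-a"}]},
    {"identifier": {"type": "dns", "value": "example.com"},
     "challenges": [{"type": "http-01", "token": "token-b"},
                    {"type": "dns-01", "token": "token-c"}]},
]

for name, wildcard, chall in collect_dns01_challenges(sample_authorizations):
    print(name, wildcard, chall["token"])
```

In that case both challenges validate against the same `_acme-challenge` name, so two TXT values have to exist at that name at the same time.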

4. Add a TXT record at your domain provider (name=<validation_domain>, value=<validation_value>).
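For anyone curious what goes into that record: per RFC 8555 section 8.4, the TXT value is the base64url-encoded SHA-256 digest of the key authorization (the challenge token, a dot, and the account key thumbprint). The acme library computes this for you. The sketch below only illustrates the calculation with the standard library; `dns01_record` is a hypothetical helper and the token and thumbprint are placeholders.

```python
import base64
import hashlib


def b64url(data: bytes) -> str:
    """Base64url without padding, as used throughout ACME."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def dns01_record(domain: str, token: str, account_thumbprint: str):
    """Return (record_name, txt_value) for a DNS-01 challenge."""
    # A wildcard is validated on the base domain, so drop a leading "*.".
    base = domain[2:] if domain.startswith("*.") else domain
    key_authorization = f"{token}.{account_thumbprint}"
    digest = hashlib.sha256(key_authorization.encode("ascii")).digest()
    return f"_acme-challenge.{base}", b64url(digest)


# Token and thumbprint below are placeholders, not real values.
name, value = dns01_record("*.example.com", "sample-token", "sample-thumbprint")
print(name)  # _acme-challenge.example.com
```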

5. DNS propagation might take some time, so we have to store the generated objects somewhere so that we can retrieve them later.

# Store the private key (the PEM bytes, not the PKey object)
with open(f"{private_key_name}", "wb") as f:
    f.write(pkey_pem)

# Store csr, private key PEM, orderr, challenge & response
with open("csr_chall_res.pickle", "wb") as file:
    state = (csr_pem, pkey_pem, orderr.to_json(), challenge.to_json(), response.to_json())
    pickle.dump(state, file)
# Store the domain & pickle file paths in the DB

6. Schedule a background task to check, after some time, whether DNS propagation is done (I am using Celery for this)

# domain_finalise is a Celery task; apply_async accepts the args and eta options.
tasks.domain_finalise.apply_async(args=[<domain_id_from_db>, <counter=1>], eta=timezone.now() + timezone.timedelta(minutes=10))

7. Finalise the SSL certificate (domain_finalise)

# Retrieve the domain information from the DB.
# Check whether the counter is more than 5 (I retry 5 times with a 10 minute delay, then trigger an email to the concerned team for further action).
# Check whether the TXT record has propagated. If not, schedule the same task again in 10 minutes with counter+1.
# Retrieve the required objects from the pickle files.
# 1. Retrieve the acme client object
with open(f"acme_account_regr.pickle", "rb") as file:
    regr_json, user_rsa_json = pickle.load(file)
regr_obj=messages.RegistrationResource.from_json(regr_json)
user_rsa_pem = user_rsa_json["private_key"].encode()
user_rsa_key = serialization.load_pem_private_key(
    user_rsa_pem,
    password=None,
    backend=default_backend(),
)
user_key = josepy.JWKRSA(key=user_rsa_key)
net = client.ClientNetwork(user_key, user_agent=USER_AGENT)
directory = messages.Directory.from_json(net.get(DIRECTORY_URL).json())
client_acme = client.ClientV2(directory, net=net)
client_acme.query_registration(regr_obj)
# 2. Retrieve the csr and orderr objects
with open(csr_chall_pickle_path, "rb") as file:
    csr_obj, pkey_obj, order_resource_json, challenge_json, response_json = pickle.load(file)
# Process the challenge. Note that new_order is called again with the stored CSR; the stored order JSON is not used here.
orderr = client_acme.new_order(csr_obj)
challenge_obj = messages.ChallengeBody.from_json(challenge_json)
response_obj, validation_obj = challenge_obj.response_and_validation(client_acme.net.key)
client_acme.answer_challenge(challenge_obj, response_obj)
finalized_orderr = client_acme.poll_and_finalize(orderr)
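The retry rules from the comments at the top of this step could be sketched roughly as below. This is only an illustration: `next_step` is a made-up helper, and the TXT lookup itself is left out (the values found in DNS are passed in, for example from whatever DNS library you use).

```python
from datetime import datetime, timedelta, timezone

MAX_ATTEMPTS = 5                     # retry count from the description above
RETRY_DELAY = timedelta(minutes=10)  # delay from the description above


def next_step(counter, found_txt_values, expected_value, now=None):
    """Decide what the finalise task should do on this run.

    Returns ("escalate", None), ("finalise", None) or ("retry", eta).
    """
    now = now or datetime.now(timezone.utc)
    if counter > MAX_ATTEMPTS:
        # Too many attempts: hand over to a human (e.g. send the email).
        return "escalate", None
    if expected_value in found_txt_values:
        # The record is visible, so the challenge can be answered.
        return "finalise", None
    # Not propagated yet: run the same task again later with counter + 1.
    return "retry", now + RETRY_DELAY
```

The returned eta can be passed straight to the scheduler when the result is "retry".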

8. Save the certificate to the necessary directories and reload the web server

fullchain_pem = finalized_orderr.fullchain_pem
cert_path = "<folder_path>fullchain_cert.pem"
with open(cert_path, "w") as f:
    f.write(fullchain_pem)
# Store the certificate in the appropriate path and soft reload the web server (nginx -s reload)

9. Set up the renewal process

# Schedule a job 30 days before the certificate expires (as suggested by Let's Encrypt).
# Needs: from datetime import datetime, timedelta; import pytz
ssl_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, fullchain_pem)
expiration_date = ssl_cert.get_notAfter().decode()
utc_expiration_datetime = pytz.utc.localize(datetime.strptime(expiration_date, '%Y%m%d%H%M%SZ'))
renewal_datetime = utc_expiration_datetime - timedelta(days=30)
# You can schedule the task with your preferred tools. In the scheduled task function, use the same or a similar function as the background task above.
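If you would rather not depend on pytz for this, the same date arithmetic can be done with the standard library alone. A small sketch, where `renewal_time` is a hypothetical helper and the sample notAfter value is made up:

```python
from datetime import datetime, timedelta, timezone


def renewal_time(not_after: bytes, days_before: int = 30) -> datetime:
    """Turn a notAfter value such as b"20240331120000Z" into a renewal time in UTC."""
    expires = datetime.strptime(not_after.decode("ascii"), "%Y%m%d%H%M%SZ")
    return expires.replace(tzinfo=timezone.utc) - timedelta(days=days_before)


print(renewal_time(b"20240331120000Z"))  # 2024-03-01 12:00:00+00:00
```

The bytes argument matches what `get_notAfter()` returns in the snippet above.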

Let me know if this helps. :slight_smile:

Thanks a lot @Nagendra for the code. I will look into it and let you know if any further help is required. Thanks again, mate.
