Urn:ietf:params:acme:error:unauthorized

ChallengeResource(body=ChallengeBody(chall=DNS01(token=b'h\xeeU}\xc1\xd1\xbd\xb19\x04&]co\xafN\x03+\xda\xc4_V7.\xd8\x9f\xe9\xcb\xa0\x0f2O'), uri='https://acme-v02.api.letsencrypt.org/acme/chall-v3/228464776987/b37seQ', status=Status(pending), validated=None, error=None), authzr_uri='https://acme-v02.api.letsencrypt.org/acme/authz-v3/228464776987')
ChallengeResource(body=ChallengeBody(chall=DNS01(token=b";g\xdb\t\xfd\xe8\xc5\r\x96\x06\xd5\x9f\xb0\xe0\x18\xf6l'\xd3{k$\xbeX\xa10\x8eo\x17w8\xd0"), uri='https://acme-v02.api.letsencrypt.org/acme/chall-v3/228464776997/biRjWg', status=Status(pending), validated=None, error=None), authzr_uri='https://acme-v02.api.letsencrypt.org/acme/authz-v3/228464776997')
AuthorizationResource(body=Authorization(identifier=Identifier(typ=IdentifierType(dns), value='xixsxj.cn'), challenges=(ChallengeBody(chall=DNS01(token=b'h\xeeU}\xc1\xd1\xbd\xb19\x04&]co\xafN\x03+\xda\xc4_V7.\xd8\x9f\xe9\xcb\xa0\x0f2O'), uri='https://acme-v02.api.letsencrypt.org/acme/chall-v3/228464776987/b37seQ', status=Status(invalid), validated=datetime.datetime(2023, 5, 17, 2, 57, 20, tzinfo=), error=Error(typ='urn:ietf:params:acme:error:unauthorized', title=None, detail='Incorrect TXT record "O2fbCf3oxQ2WBtWfsOAY9mwn03trJL5YoTCObxd3ONA" (and 1 more) found at _acme-challenge.xixsxj.cn', identifier=None, subproblems=None)),), status=Status(invalid), expires=datetime.datetime(2023, 5, 24, 2, 55, 19, tzinfo=), wildcard=True), uri='https://acme-v02.api.letsencrypt.org/acme/authz-v3/228464776987', new_cert_uri=None)
<Response [200]>

My domain is: lzkj

I ran this command: acme.client.ClientV2 answer_challenge

It produced this output: python3 acme.client.ClientV2

My web server is (include version): python3

The operating system my web server runs on is (include version): PyCharm Community Edition 2022.2.4

My hosting provider, if applicable, is:

I can login to a root shell on my machine (yes or no, or I don't know): yes

I'm using a control panel to manage my site (no, or provide the name and version of the control panel):

The version of my client is (e.g. output of certbot --version or certbot-auto --version if you're using Certbot):

import time
from enum import Enum
from acme import client, messages, challenges
from acme.errors import Error as AcmeError
import OpenSSL
import josepy as jose
import json
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import os


# 生成CSR证书
def create_csr(*args: dict) -> dict:
    """
    args:   common_name (str).
            country (str).
            state (str).
            city (str).
            organization (str).
            organizational_unit (str).
            email_address (str).
            dns_names(set)
            extensions(any)
        Returns:
            (str, str).  Tuple containing private key and certificate
            signing request (PEM).
        """
    dic_t = args[0]
    common_name = dic_t.get('common_name')
    country = dic_t.get('country')
    state = dic_t.get('state')
    city = dic_t.get('city')
    organization = dic_t.get('organization')
    organizational_unit = dic_t.get('organizational_unit')
    email_address = dic_t.get('email_address')
    dns_names = dic_t.get("dns_names")
    extensions_add = dic_t.get("extensions")

    key = OpenSSL.crypto.PKey()
    key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)

    req = OpenSSL.crypto.X509Req()
    req.get_subject().CN = common_name
    if country:
        req.get_subject().C = country
    if state:
        req.get_subject().ST = state
    if city:
        req.get_subject().L = city
    if organization:
        req.get_subject().O = organization
    if organizational_unit:
        req.get_subject().OU = organizational_unit
    if email_address:
        req.get_subject().emailAddress = email_address

    if all([dns_names, extensions_add]):
        try:
            san_extension = ", ".join(["DNS:" + san for san in list(dns_names)])
            extensions = [
                OpenSSL.crypto.X509Extension(b"subjectAltName", False, san_extension.encode())
            ]
            req.add_extensions(extensions)
        except Exception as e:
            print("create csr error %s" % e)
            data = {'private_key': "", 'csr': "", "certificate_request": None}
            return data

    req.set_pubkey(key)
    req.sign(key, 'sha256')

    private_key = OpenSSL.crypto.dump_privatekey(
        OpenSSL.crypto.FILETYPE_PEM, key)

    csr = OpenSSL.crypto.dump_certificate_request(
        OpenSSL.crypto.FILETYPE_PEM, req)

    # if os.path.exists(settings.MEDIA_ROOT2 + common_name):
    #     shutil.rmtree(settings.MEDIA_ROOT2 + common_name)
    # os.makedirs(settings.MEDIA_ROOT2 + common_name)

    # cert_key_file_path_name = settings.MEDIA_ROOT2 + common_name + "/" + "%s.key" % common_name
    os.makedirs("./" + common_name)
    cert_key_file_path_name = "./" + common_name + "/" + "%s.key" % common_name
    with open(cert_key_file_path_name, 'w') as f:
        f.write(private_key.decode("utf-8"))
    data = {'private_key': private_key, 'csr': csr.decode("utf-8"), "certificate_request": csr}
    return data


class CertificateManagement:
    def __init__(self, email: str) -> None:
        """
        @param email: 用户email
        """
        self.email = email
        self.account_exist = False
        self.ca = self.CertCA.LetEncrypt

    @property
    def __account_dir(self) -> str:
        """
        @return: str : 用户名目录
        """
        _account_dir = "/account/" + self.ca.value + "/" + self.email.split("@")[0]
        return _account_dir

    @property
    def __account_url(self) -> json:
        """
        @return: 已存在用户的url {"location":"https://xxxx"}
        """
        if os.path.exists(self.__account_dir + "/account.json"):
            account_json_str = ''
            try:
                with open(self.__account_dir + "/account.json", "r") as f:
                    account_json_str = account_json_str + f.read()
                return json.loads(account_json_str)
            except Exception as e:
                print("certificate account json file format error: %s" % e)
                return {"location": None}
        else:
            return {"location": None}

    # 证书类型
    @staticmethod
    class CertCA(Enum):
        """
        ZeroSSL 1
        LetEncrypt 2
        Staging 测试使用
        """
        ZeroSSL = "ZeroSSL"
        LetEncrypt = "LetEncrypt"
        R3Staging = "R3Staging"

    # 验证域名服务器类型
    @staticmethod
    class ChallengeType(Enum):
        DNS = "dns-01"
        HTTP = "http-01"

    # 获取相应的 目录地址
    def __directory_url(self, cert_type: CertCA, net: client.ClientNetwork) -> jose:
        """
        @param cert_type: 证书机构类型
        @param net: client.ClientNetwork
        @return: json
        """
        if cert_type == self.CertCA.ZeroSSL:
            directory_addr_url = "https://acme.zerossl.com/v2/DV90"

        elif cert_type == self.CertCA.LetEncrypt:
            directory_addr_url = "https://acme-v02.api.letsencrypt.org/directory"

        elif cert_type == self.CertCA.R3Staging:
            directory_addr_url = "https://acme-staging-v02.api.letsencrypt.org/directory"

        else:
            directory_addr_url = "https://acme-staging-v02.api.letsencrypt.org/directory"

        return net.get(directory_addr_url).json()

    # 1. 加载私钥,如果没有传入参数自动生成
    def load_account_key(self, account_pri_key_str: bytes = b'', length: int = 2048) -> jose.JWKRSA:
        """
        :param account_pri_key_str: 传入密钥
        :param length: 加密的宽度
        :return: 返回类型为jose.JWKRSA
        """
        if account_pri_key_str is not None and account_pri_key_str != b'':
            account_pri_key = serialization.load_pem_private_key(
                account_pri_key_str,
                password=None,
                backend=default_backend()
            )
            if os.path.isdir(self.__account_dir) is False:
                os.makedirs(self.__account_dir)
            else:
                self.account_exist = True
            return jose.JWKRSA(key=account_pri_key)
        else:
            # 判断是否有相关邮件的私钥存在
            # 保存私钥
            account_pri_key_file_name = self.__account_dir + '/private.pem'
            key_bytes = b''
            if os.path.exists(account_pri_key_file_name):
                with open(account_pri_key_file_name, "rb") as a:
                    key_bytes = key_bytes.join(a)
                self.account_exist = True
            else:
                # 创建账户目录
                if os.path.isdir(self.__account_dir) is False:
                    os.makedirs(self.__account_dir)
                # 没有传入参数自动生成
                key = OpenSSL.crypto.PKey()
                key.generate_key(OpenSSL.crypto.TYPE_RSA, length)
                key_bytes = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
                with open(account_pri_key_file_name, 'wb') as f:
                    f.write(key_bytes)

                # 保存公钥
                cert = OpenSSL.crypto.X509()
                cert.set_pubkey(key)
                account_pub_key_file_name = self.__account_dir + '/public.pem'
                with open(account_pub_key_file_name, "wb") as f:
                    f.write(OpenSSL.crypto.dump_publickey(OpenSSL.crypto.FILETYPE_PEM, cert.get_pubkey()))

            # 返回数据
            account_pri_key = serialization.load_pem_private_key(
                key_bytes,
                password=None,
                backend=default_backend()
            )
            return jose.JWKRSA(key=account_pri_key)

    # 2. 使用你的私钥创建一个新的ACME客户端
    def create_acme_client(self, cert_ca: CertCA, account_pri_key_str: bytes = b'', ) -> client.ClientV2:
        """
        @param cert_ca:  证书机构类型
        @param account_pri_key_str:   账户私钥
        @return:
        """
        # 获取jose.RSA
        self.ca = cert_ca
        key = self.load_account_key(account_pri_key_str)
        net = client.ClientNetwork(key, user_agent="Django")
        directory = messages.Directory.from_json(self.__directory_url(cert_ca, net))
        acme_client = client.ClientV2(directory=directory, net=net)
        if self.account_exist:
            if self.__account_url.get("location") is not None:
                reg_s = messages.RegistrationResource(
                    body=messages.Registration(
                        key=key.public_key(),
                        contact=("mailto:" + self.email,),
                        agreement=True,
                        status=True,
                        terms_of_service_agreed=True,
                    ),
                    terms_of_service=True,
                    uri=self.__account_url.get("location"),  # 您的账户 URL
                )
            else:
                # 创建一个新用户
                account_status = self.__acme_new_registration(acme_client)
                # 注册已存在的账户,用你的账户的URL获取你的账户
                reg_s = messages.RegistrationResource(
                    body=messages.Registration(
                        key=key.public_key(),
                        contact=("mailto:" + self.email,),
                        agreement=True,
                        status=True,
                        terms_of_service_agreed=True,
                    ),
                    terms_of_service=True,
                    uri=account_status.get("location"),  # 您的账户 URL
                )
            try:
                # 将现有的 RegistrationResource 设置为客户端的注册对象
                acme_client.registration = reg_s
                # 初始化已存在的用户
                acme_client.query_registration(reg_s)
                return acme_client
            except Exception as e:
                print("certificate ca initialization exist user error: %s" % e)
                return acme_client
        else:
            account_status = self.__acme_new_registration(acme_client)
            if account_status.get("Code") == 400:
                raise messages.errors.Error
            elif account_status.get("Code") == 301:
                # 你的账户的URL
                account_url = account_status.get("location")
                # 注册已存在的账户,用你的账户的URL获取你的账户
                reg_s = messages.RegistrationResource(
                    body=messages.Registration(
                        key=key.public_key(),
                        contact=("mailto:" + self.email,),
                        agreement=True,
                        status=True,
                        terms_of_service_agreed=True,
                    ),
                    terms_of_service=True,
                    uri=account_url,  # 您的账户 URL
                )
                # 将现有的 RegistrationResource 设置为客户端的注册对象
                # acme_client.registration = reg_s
                acme_client.query_registration(reg_s)
            else:
                pass
            return acme_client

    # 4. 判断用户是否存在,不存在则创建用户
    def __acme_new_registration(self, acme_client: client.ClientV2) -> dict:
        """
        :param acme_client: acme客户端
        """
        new_reg = messages.NewRegistration.from_data(
            # 这里可以放入您的电子邮件地址,以接收有关证书即将过期的通知
            email=self.email,
            terms_of_service_agreed=True,
            phone=None,
            external_account_binding=None,
            agreement=True,
        )
        try:
            # 注册新账户
            new_account = acme_client.new_account(new_reg)
            location_info = {"location": new_account.uri}
            with open(self.__account_dir + "/account.json", "w") as f:
                f.write(json.dumps(location_info))
            return {"Code": 200}
        except AcmeError as e:
            dict_l = e.__dict__
            if dict_l.get("location"):
                with open(self.__account_dir + "/account.json", "w") as f:
                    f.write(json.dumps(dict_l))
                print("Certificate Account already exists: %s" % dict_l.get("location"))
                return {"Code": 301, "location": dict_l.get("location")}
            else:
                print(f"Error registering new account acme: {e}")
                return {"Code": 400}
        except Exception as e:
            print(f"Error registering new account Exception: {e}")
            return {"Code": 400}

    # 5. 提交一个订单
    def new_orders(self, acme_client_obj: client.ClientV2, domains: tuple, challenge_type: ChallengeType):
        if domains.__len__() == 0:
            return {"Code": 402, "Error": "Certificate domains invalid,none list"}

        # 生成csr请求文件
        csr_request = {"common_name": (list(domains))[0], "dns_names": domains, "extensions": True}
        csr_pem = create_csr(csr_request).get("certificate_request")
        if csr_pem is None or csr_pem == "":
            return {"Code": 402, "Error": "Certificate CSR invalid"}

        try:
            # 请求DNS-01验证 发送证书请求
            order = acme_client_obj.new_order(csr_pem)
            order_body = order.to_json()
            authorizations_list = order_body.get("authorizations")
            data = []
            challenges_list = []
            # 找到DNS挑战
            for i in authorizations_list:
                for t in i.get("body").get("challenges"):
                    if t.get("type") == challenge_type.value:
                        instance = {"dns_domain_name": i.get("body").get("identifier").get("value"),
                                    "dns_rr": "_acme-challenge",
                                    "dns_type": "TXT",
                                    "dns_value": t.get("token")
                                    }
                        challenge_list = {"domain_name": i.get("body").get("identifier").get("value"),
                                          "url": t.get("url"),
                                          "token": t.get("token"),
                                          "status": t.get("status"),
                                          "expires": i.get("body").get("expires"),
                                          "wildcard": i.get("body").get("wildcard"),
                                          "uri": i.get("uri")}
                        data.append(instance)
                        challenges_list.append(challenge_list)
            # print(challenges_list)

            # aliyun_obj = AlibabaCloud()
            # for i in data:
            #     i["account_name"] = "Geodown"
            #     res = aliyun_obj.run(18, i)
            #     if res['Code'] != 200:
            #         return {"Code": 501, "Error": res['Error']}
            for i in data:
                print(i)
            time.sleep(120)

            # 5. 完成验证
            for authr in order.authorizations:
                if authr.body.identifier.typ == messages.IDENTIFIER_FQDN:
                    dns_challenge = None
                    for i in authr.body.challenges:
                        if isinstance(i.chall, challenges.DNS01):
                            dns_challenge = i
                            break
                    if dns_challenge is None:
                        raise ValueError("No DNS challenge found for domain")
                    # challenge_response = messages.challenges.ChallengeResponse(type=challenge_type.value)
                    response = acme_client_obj.answer_challenge(dns_challenge, dns_challenge.chall.response(
                        self.load_account_key()))
                    time.sleep(2)
                    print(response)

            certificate_source, response = acme_client_obj.poll(order.authorizations[0])
            print(certificate_source)
            print(response)
            if certificate_source.body.status == 'valid':
                with open('cert.pem', 'wb') as f:
                    f.write(certificate_source.body.encode("utf-8"))

            # acme.answer_challenge(challenge, dns_challenge_response)
            # for req in challenges_list:
            #     if req.get("wildcard") is True:
            #         challenge_body = messages.ChallengeBody(chall=challenges.DNS01(token=req.get("token")),
            #                                                 uri=req.get("uri"),
            #                                                 status=req.get("status"),
            #                                                 expires=req.get("expires"),
            #                                                 wildcard=req.get("wildcard"),
            #                                                 url=req.get("url"))
            #     else:
            #         challenge_body = messages.ChallengeBody(chall=challenges.DNS01(token=req.get("token")),
            #                                                 uri=req.get("uri"),
            #                                                 status=req.get("status"),
            #                                                 expires=req.get("expires"),
            #                                                 url=req.get("url")
            #                                                 )
            #     challenge_response = messages.challenges.ChallengeResponse(type=challenge_type.value)
            #     print(challenge_response.values())
            #     response = acme_client_obj.answer_challenge(challenge_body, challenge_response)
            #     print(response.values())

        except Exception as e:
            dict_l = e.__dict__
            print(dict_l)
            # acme_client_obj.answer_challenge()
            #
            # # 6. 发出证书请求
            # csr = crypto_util.make_csr(ACCOUNT_KEY, [DOMAIN])
            # orderr = acme.finalize_order(authzr, timedelta(minutes=5), csr)
            # cert, chain = acme.fetch_chain(orderr)
            #
            # 注意,这里没有自动进行验证
            # 在 DNS 记录生效后,你需要自己调用 client_acme.answer_challenge 进行验证

            # 等待证书生成
            # orderr = client_acme.poll_and_finalize(authzr)
            #
            # # 保存证书
            # with open('certificate.pem', 'wb') as f:
            #     f.write(orderr.fullchain_pem)
            # # 等待订单完成
            # while order.status != messages.STATUS_VALID:
            #     time.sleep(1)
            #     order = acme.poll_order(order)
            #
            # # 下载证书
            # certificate_pem = acme.fetch_certificate(order)
            #
            # print(certificate_pem)


if __name__ == '__main__':
    email_list = "lzkjsoftware@gmail.com"
    domain_name_list = ("xixsxj.cn", "*.xixsxj.cn")  # domain_name
    crt = CertificateManagement(email_list)
    ac_client = crt.create_acme_client(CertificateManagement.CertCA.LetEncrypt)
    crt.new_orders(ac_client, domain_name_list, CertificateManagement.ChallengeType.DNS)

You'll need to post a full code example if you'd like help with this.

The basic issue is that you have not published the correct TXT record that was asked of you by the ACME challenge.

4 Likes

This is the challenge token.

The challenge token must be transformed into another value before publishing it to the DNS record. The details can be found in the RFC8555 document.

To do this, there is a validation method on the DNS Challenge. You pass the account key (jose.JWK) to this method, and it will return a str which you should then publish to the DNS record.

7 Likes

Thank you very much

4 Likes

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.