ChallengeResource(body=ChallengeBody(chall=DNS01(token=b'h\xeeU}\xc1\xd1\xbd\xb19\x04&]co\xafN\x03+\xda\xc4_V7.\xd8\x9f\xe9\xcb\xa0\x0f2O'), uri='https://acme-v02.api.letsencrypt.org/acme/chall-v3/228464776987/b37seQ', status=Status(pending), validated=None, error=None), authzr_uri='https://acme-v02.api.letsencrypt.org/acme/authz-v3/228464776987')
ChallengeResource(body=ChallengeBody(chall=DNS01(token=b";g\xdb\t\xfd\xe8\xc5\r\x96\x06\xd5\x9f\xb0\xe0\x18\xf6l'\xd3{k$\xbeX\xa10\x8eo\x17w8\xd0"), uri='https://acme-v02.api.letsencrypt.org/acme/chall-v3/228464776997/biRjWg', status=Status(pending), validated=None, error=None), authzr_uri='https://acme-v02.api.letsencrypt.org/acme/authz-v3/228464776997')
AuthorizationResource(body=Authorization(identifier=Identifier(typ=IdentifierType(dns), value='xixsxj.cn'), challenges=(ChallengeBody(chall=DNS01(token=b'h\xeeU}\xc1\xd1\xbd\xb19\x04&]co\xafN\x03+\xda\xc4_V7.\xd8\x9f\xe9\xcb\xa0\x0f2O'), uri='https://acme-v02.api.letsencrypt.org/acme/chall-v3/228464776987/b37seQ', status=Status(invalid), validated=datetime.datetime(2023, 5, 17, 2, 57, 20, tzinfo=), error=Error(typ='urn:ietf:params:acme:error:unauthorized', title=None, detail='Incorrect TXT record "O2fbCf3oxQ2WBtWfsOAY9mwn03trJL5YoTCObxd3ONA" (and 1 more) found at _acme-challenge.xixsxj.cn', identifier=None, subproblems=None)),), status=Status(invalid), expires=datetime.datetime(2023, 5, 24, 2, 55, 19, tzinfo=), wildcard=True), uri='https://acme-v02.api.letsencrypt.org/acme/authz-v3/228464776987', new_cert_uri=None)
<Response [200]>
My domain is: lzkj
I ran this command: acme.client.ClientV2 answer_challenge
It produced this output: python3 acme.client.ClientV2
My web server is (include version): python3
The operating system my web server runs on is (include version): PyCharm Community Edition 2022.2.4
My hosting provider, if applicable, is:
I can login to a root shell on my machine (yes or no, or I don't know): yes
I'm using a control panel to manage my site (no, or provide the name and version of the control panel):
The version of my client is (e.g. output of certbot --version
or certbot-auto --version
if you're using Certbot):
import time
from enum import Enum
from acme import client, messages, challenges
from acme.errors import Error as AcmeError
import OpenSSL
import josepy as jose
import json
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import os
# 生成CSR证书
def create_csr(*args: dict) -> dict:
"""
args: common_name (str).
country (str).
state (str).
city (str).
organization (str).
organizational_unit (str).
email_address (str).
dns_names(set)
extensions(any)
Returns:
(str, str). Tuple containing private key and certificate
signing request (PEM).
"""
dic_t = args[0]
common_name = dic_t.get('common_name')
country = dic_t.get('country')
state = dic_t.get('state')
city = dic_t.get('city')
organization = dic_t.get('organization')
organizational_unit = dic_t.get('organizational_unit')
email_address = dic_t.get('email_address')
dns_names = dic_t.get("dns_names")
extensions_add = dic_t.get("extensions")
key = OpenSSL.crypto.PKey()
key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
req = OpenSSL.crypto.X509Req()
req.get_subject().CN = common_name
if country:
req.get_subject().C = country
if state:
req.get_subject().ST = state
if city:
req.get_subject().L = city
if organization:
req.get_subject().O = organization
if organizational_unit:
req.get_subject().OU = organizational_unit
if email_address:
req.get_subject().emailAddress = email_address
if all([dns_names, extensions_add]):
try:
san_extension = ", ".join(["DNS:" + san for san in list(dns_names)])
extensions = [
OpenSSL.crypto.X509Extension(b"subjectAltName", False, san_extension.encode())
]
req.add_extensions(extensions)
except Exception as e:
print("create csr error %s" % e)
data = {'private_key': "", 'csr': "", "certificate_request": None}
return data
req.set_pubkey(key)
req.sign(key, 'sha256')
private_key = OpenSSL.crypto.dump_privatekey(
OpenSSL.crypto.FILETYPE_PEM, key)
csr = OpenSSL.crypto.dump_certificate_request(
OpenSSL.crypto.FILETYPE_PEM, req)
# if os.path.exists(settings.MEDIA_ROOT2 + common_name):
# shutil.rmtree(settings.MEDIA_ROOT2 + common_name)
# os.makedirs(settings.MEDIA_ROOT2 + common_name)
# cert_key_file_path_name = settings.MEDIA_ROOT2 + common_name + "/" + "%s.key" % common_name
os.makedirs("./" + common_name)
cert_key_file_path_name = "./" + common_name + "/" + "%s.key" % common_name
with open(cert_key_file_path_name, 'w') as f:
f.write(private_key.decode("utf-8"))
data = {'private_key': private_key, 'csr': csr.decode("utf-8"), "certificate_request": csr}
return data
class CertificateManagement:
def __init__(self, email: str) -> None:
"""
@param email: 用户email
"""
self.email = email
self.account_exist = False
self.ca = self.CertCA.LetEncrypt
@property
def __account_dir(self) -> str:
"""
@return: str : 用户名目录
"""
_account_dir = "/account/" + self.ca.value + "/" + self.email.split("@")[0]
return _account_dir
@property
def __account_url(self) -> json:
"""
@return: 已存在用户的url {"location":"https://xxxx"}
"""
if os.path.exists(self.__account_dir + "/account.json"):
account_json_str = ''
try:
with open(self.__account_dir + "/account.json", "r") as f:
account_json_str = account_json_str + f.read()
return json.loads(account_json_str)
except Exception as e:
print("certificate account json file format error: %s" % e)
return {"location": None}
else:
return {"location": None}
# 证书类型
@staticmethod
class CertCA(Enum):
"""
ZeroSSL 1
LetEncrypt 2
Staging 测试使用
"""
ZeroSSL = "ZeroSSL"
LetEncrypt = "LetEncrypt"
R3Staging = "R3Staging"
# 验证域名服务器类型
@staticmethod
class ChallengeType(Enum):
DNS = "dns-01"
HTTP = "http-01"
# 获取相应的 目录地址
def __directory_url(self, cert_type: CertCA, net: client.ClientNetwork) -> jose:
"""
@param cert_type: 证书机构类型
@param net: client.ClientNetwork
@return: json
"""
if cert_type == self.CertCA.ZeroSSL:
directory_addr_url = "https://acme.zerossl.com/v2/DV90"
elif cert_type == self.CertCA.LetEncrypt:
directory_addr_url = "https://acme-v02.api.letsencrypt.org/directory"
elif cert_type == self.CertCA.R3Staging:
directory_addr_url = "https://acme-staging-v02.api.letsencrypt.org/directory"
else:
directory_addr_url = "https://acme-staging-v02.api.letsencrypt.org/directory"
return net.get(directory_addr_url).json()
# 1. 加载私钥,如果没有传入参数自动生成
def load_account_key(self, account_pri_key_str: bytes = b'', length: int = 2048) -> jose.JWKRSA:
"""
:param account_pri_key_str: 传入密钥
:param length: 加密的宽度
:return: 返回类型为jose.JWKRSA
"""
if account_pri_key_str is not None and account_pri_key_str != b'':
account_pri_key = serialization.load_pem_private_key(
account_pri_key_str,
password=None,
backend=default_backend()
)
if os.path.isdir(self.__account_dir) is False:
os.makedirs(self.__account_dir)
else:
self.account_exist = True
return jose.JWKRSA(key=account_pri_key)
else:
# 判断是否有相关邮件的私钥存在
# 保存私钥
account_pri_key_file_name = self.__account_dir + '/private.pem'
key_bytes = b''
if os.path.exists(account_pri_key_file_name):
with open(account_pri_key_file_name, "rb") as a:
key_bytes = key_bytes.join(a)
self.account_exist = True
else:
# 创建账户目录
if os.path.isdir(self.__account_dir) is False:
os.makedirs(self.__account_dir)
# 没有传入参数自动生成
key = OpenSSL.crypto.PKey()
key.generate_key(OpenSSL.crypto.TYPE_RSA, length)
key_bytes = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
with open(account_pri_key_file_name, 'wb') as f:
f.write(key_bytes)
# 保存公钥
cert = OpenSSL.crypto.X509()
cert.set_pubkey(key)
account_pub_key_file_name = self.__account_dir + '/public.pem'
with open(account_pub_key_file_name, "wb") as f:
f.write(OpenSSL.crypto.dump_publickey(OpenSSL.crypto.FILETYPE_PEM, cert.get_pubkey()))
# 返回数据
account_pri_key = serialization.load_pem_private_key(
key_bytes,
password=None,
backend=default_backend()
)
return jose.JWKRSA(key=account_pri_key)
# 2. 使用你的私钥创建一个新的ACME客户端
def create_acme_client(self, cert_ca: CertCA, account_pri_key_str: bytes = b'', ) -> client.ClientV2:
"""
@param cert_ca: 证书机构类型
@param account_pri_key_str: 账户私钥
@return:
"""
# 获取jose.RSA
self.ca = cert_ca
key = self.load_account_key(account_pri_key_str)
net = client.ClientNetwork(key, user_agent="Django")
directory = messages.Directory.from_json(self.__directory_url(cert_ca, net))
acme_client = client.ClientV2(directory=directory, net=net)
if self.account_exist:
if self.__account_url.get("location") is not None:
reg_s = messages.RegistrationResource(
body=messages.Registration(
key=key.public_key(),
contact=("mailto:" + self.email,),
agreement=True,
status=True,
terms_of_service_agreed=True,
),
terms_of_service=True,
uri=self.__account_url.get("location"), # 您的账户 URL
)
else:
# 创建一个新用户
account_status = self.__acme_new_registration(acme_client)
# 注册已存在的账户,用你的账户的URL获取你的账户
reg_s = messages.RegistrationResource(
body=messages.Registration(
key=key.public_key(),
contact=("mailto:" + self.email,),
agreement=True,
status=True,
terms_of_service_agreed=True,
),
terms_of_service=True,
uri=account_status.get("location"), # 您的账户 URL
)
try:
# 将现有的 RegistrationResource 设置为客户端的注册对象
acme_client.registration = reg_s
# 初始化已存在的用户
acme_client.query_registration(reg_s)
return acme_client
except Exception as e:
print("certificate ca initialization exist user error: %s" % e)
return acme_client
else:
account_status = self.__acme_new_registration(acme_client)
if account_status.get("Code") == 400:
raise messages.errors.Error
elif account_status.get("Code") == 301:
# 你的账户的URL
account_url = account_status.get("location")
# 注册已存在的账户,用你的账户的URL获取你的账户
reg_s = messages.RegistrationResource(
body=messages.Registration(
key=key.public_key(),
contact=("mailto:" + self.email,),
agreement=True,
status=True,
terms_of_service_agreed=True,
),
terms_of_service=True,
uri=account_url, # 您的账户 URL
)
# 将现有的 RegistrationResource 设置为客户端的注册对象
# acme_client.registration = reg_s
acme_client.query_registration(reg_s)
else:
pass
return acme_client
# 4. 判断用户是否存在,不存在则创建用户
def __acme_new_registration(self, acme_client: client.ClientV2) -> dict:
"""
:param acme_client: acme客户端
"""
new_reg = messages.NewRegistration.from_data(
# 这里可以放入您的电子邮件地址,以接收有关证书即将过期的通知
email=self.email,
terms_of_service_agreed=True,
phone=None,
external_account_binding=None,
agreement=True,
)
try:
# 注册新账户
new_account = acme_client.new_account(new_reg)
location_info = {"location": new_account.uri}
with open(self.__account_dir + "/account.json", "w") as f:
f.write(json.dumps(location_info))
return {"Code": 200}
except AcmeError as e:
dict_l = e.__dict__
if dict_l.get("location"):
with open(self.__account_dir + "/account.json", "w") as f:
f.write(json.dumps(dict_l))
print("Certificate Account already exists: %s" % dict_l.get("location"))
return {"Code": 301, "location": dict_l.get("location")}
else:
print(f"Error registering new account acme: {e}")
return {"Code": 400}
except Exception as e:
print(f"Error registering new account Exception: {e}")
return {"Code": 400}
# 5. 提交一个订单
def new_orders(self, acme_client_obj: client.ClientV2, domains: tuple, challenge_type: ChallengeType):
if domains.__len__() == 0:
return {"Code": 402, "Error": "Certificate domains invalid,none list"}
# 生成csr请求文件
csr_request = {"common_name": (list(domains))[0], "dns_names": domains, "extensions": True}
csr_pem = create_csr(csr_request).get("certificate_request")
if csr_pem is None or csr_pem == "":
return {"Code": 402, "Error": "Certificate CSR invalid"}
try:
# 请求DNS-01验证 发送证书请求
order = acme_client_obj.new_order(csr_pem)
order_body = order.to_json()
authorizations_list = order_body.get("authorizations")
data = []
challenges_list = []
# 找到DNS挑战
for i in authorizations_list:
for t in i.get("body").get("challenges"):
if t.get("type") == challenge_type.value:
instance = {"dns_domain_name": i.get("body").get("identifier").get("value"),
"dns_rr": "_acme-challenge",
"dns_type": "TXT",
"dns_value": t.get("token")
}
challenge_list = {"domain_name": i.get("body").get("identifier").get("value"),
"url": t.get("url"),
"token": t.get("token"),
"status": t.get("status"),
"expires": i.get("body").get("expires"),
"wildcard": i.get("body").get("wildcard"),
"uri": i.get("uri")}
data.append(instance)
challenges_list.append(challenge_list)
# print(challenges_list)
# aliyun_obj = AlibabaCloud()
# for i in data:
# i["account_name"] = "Geodown"
# res = aliyun_obj.run(18, i)
# if res['Code'] != 200:
# return {"Code": 501, "Error": res['Error']}
for i in data:
print(i)
time.sleep(120)
# 5. 完成验证
for authr in order.authorizations:
if authr.body.identifier.typ == messages.IDENTIFIER_FQDN:
dns_challenge = None
for i in authr.body.challenges:
if isinstance(i.chall, challenges.DNS01):
dns_challenge = i
break
if dns_challenge is None:
raise ValueError("No DNS challenge found for domain")
# challenge_response = messages.challenges.ChallengeResponse(type=challenge_type.value)
response = acme_client_obj.answer_challenge(dns_challenge, dns_challenge.chall.response(
self.load_account_key()))
time.sleep(2)
print(response)
certificate_source, response = acme_client_obj.poll(order.authorizations[0])
print(certificate_source)
print(response)
if certificate_source.body.status == 'valid':
with open('cert.pem', 'wb') as f:
f.write(certificate_source.body.encode("utf-8"))
# acme.answer_challenge(challenge, dns_challenge_response)
# for req in challenges_list:
# if req.get("wildcard") is True:
# challenge_body = messages.ChallengeBody(chall=challenges.DNS01(token=req.get("token")),
# uri=req.get("uri"),
# status=req.get("status"),
# expires=req.get("expires"),
# wildcard=req.get("wildcard"),
# url=req.get("url"))
# else:
# challenge_body = messages.ChallengeBody(chall=challenges.DNS01(token=req.get("token")),
# uri=req.get("uri"),
# status=req.get("status"),
# expires=req.get("expires"),
# url=req.get("url")
# )
# challenge_response = messages.challenges.ChallengeResponse(type=challenge_type.value)
# print(challenge_response.values())
# response = acme_client_obj.answer_challenge(challenge_body, challenge_response)
# print(response.values())
except Exception as e:
dict_l = e.__dict__
print(dict_l)
# acme_client_obj.answer_challenge()
#
# # 6. 发出证书请求
# csr = crypto_util.make_csr(ACCOUNT_KEY, [DOMAIN])
# orderr = acme.finalize_order(authzr, timedelta(minutes=5), csr)
# cert, chain = acme.fetch_chain(orderr)
#
# 注意,这里没有自动进行验证
# 在 DNS 记录生效后,你需要自己调用 client_acme.answer_challenge 进行验证
# 等待证书生成
# orderr = client_acme.poll_and_finalize(authzr)
#
# # 保存证书
# with open('certificate.pem', 'wb') as f:
# f.write(orderr.fullchain_pem)
# # 等待订单完成
# while order.status != messages.STATUS_VALID:
# time.sleep(1)
# order = acme.poll_order(order)
#
# # 下载证书
# certificate_pem = acme.fetch_certificate(order)
#
# print(certificate_pem)
if __name__ == '__main__':
email_list = "lzkjsoftware@gmail.com"
domain_name_list = ("xixsxj.cn", "*.xixsxj.cn") # domain_name
crt = CertificateManagement(email_list)
ac_client = crt.create_acme_client(CertificateManagement.CertCA.LetEncrypt)
crt.new_orders(ac_client, domain_name_list, CertificateManagement.ChallengeType.DNS)