首页 > 解决方案 > 为 DNS 更新生成 TSIG 密钥环(作为编码的字节字符串)

问题描述

我正在尝试使用 python DNS 模块(dnspython)创建(add)新的 DNS 记录。

文档指定如何创建更新http://www.dnspython.org/examples.html

import dns.tsigkeyring
import dns.update
import sys

keyring = dns.tsigkeyring.from_text({
    'host-example.' : 'XXXXXXXXXXXXXXXXXXXXXX=='
})

update = dns.update.Update('dyn.test.example', keyring=keyring)
update.replace('host', 300, 'a', sys.argv[1])

但它并不精确,首先如何实际生成可以传递给dns.tsigkeyring.from_text()方法的密钥环字符串。

生成密钥的正确方法是什么?我在我的组织中使用 krb5。

标签: pythondnskerberosdnspython

解决方案


服务器正在使用 GSS-TSIG 的 Microsoft AD DNS 上运行。

TSIG 和 GSS-TSIG 是不同的野兽——前者使用可以简单地从服务器复制的静态预共享密钥,但后者使用 Kerberos (GSSAPI) 为每个事务协商会话密钥。

在最初发布此线程时,dnspython 1.x 没有任何对 GSS-TSIG 的支持。

(握手不会产生可以转换为常规 TSIG 密钥环的静态密钥;相反,必须调用 GSSAPI 库本身来构建身份验证器——dnspython 1.x 无法做到这一点,尽管 dnspython 2.1 最终可以。)

如果您尝试更新 Active Directory DNS 服务器,BIND 的nsupdate命令行工具支持 GSS-TSIG(有时甚至可以工作)。您应该能够通过子进程运行它,并通过标准输入简单地提供必要的更新。

cmds = [f'zone {dyn_zone}\n',
        f'del {fqdn}\n',
        f'add {fqdn} 60 TXT "{challenge}"\n',
        f'send\n']
subprocess.run(["nsupdate", "-g"],
               input="".join(cmds).encode(),
               check=True)

与大多数 Kerberos 客户端应用程序一样,nsupdate 期望凭据已经存在于环境中(也就是说,您需要kinit事先使用 TGT;或者,如果使用最新版本的 MIT Krb5,您可以$KRB5_CLIENT_KTNAME指向包含客户端凭据的密钥表)。

更新:dnspython 2.1终于有了 GSS-TSIG 所需的部分,但创建密钥环目前是一个非常手动的过程——您必须调用 GSSAPI 库并自己处理 TKEY 协商。这样做的代码包含在底部。

(下面的 Python 代码可以传递一个自定义gssapi.Credentials对象,否则它会像 nsupdate 一样在环境中查找凭据。)

import dns.rdtypes.ANY.TKEY
import dns.resolver
import dns.update
import gssapi
import socket
import time
import uuid

def _build_tkey_query(token, key_ring, key_name):
    inception_time = int(time.time())
    tkey = dns.rdtypes.ANY.TKEY.TKEY(dns.rdataclass.ANY,
                                     dns.rdatatype.TKEY,
                                     dns.tsig.GSS_TSIG,
                                     inception_time,
                                     inception_time,
                                     3,
                                     dns.rcode.NOERROR,
                                     token,
                                     b"")

    query = dns.message.make_query(key_name,
                                   dns.rdatatype.TKEY,
                                   dns.rdataclass.ANY)
    query.keyring = key_ring
    query.find_rrset(dns.message.ADDITIONAL,
                     key_name,
                     dns.rdataclass.ANY,
                     dns.rdatatype.TKEY,
                     create=True).add(tkey)
    return query

def _probe_server(server_name, zone):
    gai = socket.getaddrinfo(str(server_name),
                             "domain",
                             socket.AF_UNSPEC,
                             socket.SOCK_DGRAM)
    for af, sf, pt, cname, sa in gai:
        query = dns.message.make_query(zone, "SOA")
        res = dns.query.udp(query, sa[0], timeout=2)
        return sa[0]

def gss_tsig_negotiate(server_name, server_addr, creds=None):
    # Acquire GSSAPI credentials
    gss_name = gssapi.Name(f"DNS@{server_name}",
                           gssapi.NameType.hostbased_service)
    gss_ctx = gssapi.SecurityContext(name=gss_name,
                                     creds=creds,
                                     usage="initiate")

    # Name generation tips: https://tools.ietf.org/html/rfc2930#section-2.1
    key_name = dns.name.from_text(f"{uuid.uuid4()}.{server_name}")
    tsig_key = dns.tsig.Key(key_name, gss_ctx, dns.tsig.GSS_TSIG)

    key_ring = {key_name: tsig_key}
    key_ring = dns.tsig.GSSTSigAdapter(key_ring)

    token = gss_ctx.step()
    while not gss_ctx.complete:
        tkey_query = _build_tkey_query(token, key_ring, key_name)
        response = dns.query.tcp(tkey_query, server_addr, timeout=5)
        if not gss_ctx.complete:
            # Original comment:
            # https://github.com/rthalley/dnspython/pull/530#issuecomment-658959755
            # "this if statement is a bit redundant, but if the final token comes
            # back with TSIG attached the patch to message.py will automatically step
            # the security context. We dont want to excessively step the context."
            token = gss_ctx.step(response.answer[0][0].key)

    return key_name, key_ring

def gss_tsig_update(zone, update_msg, creds=None):
    # Find the SOA of our zone
    answer = dns.resolver.resolve(zone, "SOA")
    soa_server = answer.rrset[0].mname
    server_addr = _probe_server(soa_server, zone)

    # Get the GSS-TSIG key
    key_name, key_ring = gss_tsig_negotiate(soa_server, server_addr, creds)

    # Dispatch the update
    update_msg.use_tsig(keyring=key_ring,
                        keyname=key_name,
                        algorithm=dns.tsig.GSS_TSIG)
    response = dns.query.tcp(update_msg, server_addr)
    return response

推荐阅读