python - HSM 与 Python 请求模块的集成
问题描述
所以我正在编写一个需要使用客户端证书(相互身份验证)对服务器进行身份验证的应用程序。客户端证书密钥存储在 HSM(金雅拓)中。我与 Luna 客户端进行了 OpenSSL 集成,但请求模块需要一个文件:
from requests import Session
session: Session = Session()
session.cert = (
"/ssl/client.pem",
"/ssl/client.key"
)
session.verify = "/ssl/server.pem"
我的问题是,当私钥在 HSM 中时,我找不到绑定私钥的方法。到目前为止,这是我对pycryptoki库的尝试:
from pycryptoki.session_management import (
c_initialize_ex,
c_open_session_ex,
login_ex,
c_logout_ex,
c_close_session_ex,
c_finalize_ex,
)
from requests import Session
c_initialize_ex()
auth_session = c_open_session_ex(0)
login_ex(auth_session, 0, "some-pass")
session: Session = Session()
session.cert = (
"/ssl/client.pem",
"rsa-private-156405640312",
)
session.verify = "/ssl/server.pem"
...
c_logout_ex(auth_session)
c_close_session_ex(auth_session)
c_finalize_ex()
不久前我在这里打开了一个问题,我必须完成应用程序实施,所以我将 HSM 集成放在了冰上,但我需要在投入生产之前完成这项工作:https ://github.com/gemalto /pycryptoki/issues/17
我也尝试过使用py-hsm,但它是一个低级 api 库,我还用我的代码示例在那里打开了一个问题:
from pyhsm.hsmclient import HsmClient
from requests import Session
c = HsmClient(pkcs11_lib="/usr/lib/libCryptoki2_64.so")
c.open_session(slot=0)
c.login(pin="some-code")
session: Session = Session()
session.cert = "/ssl/client.pem"
c.logout()
c.close_session()
任何人都可以提供与 HSM 中的证书对进行相互身份验证的示例吗?如果你有 C/C++ 中的东西,那也很棒,我可以实现我的请求函数并将其包装在我的 python 代码中。
先感谢您!
解决方案
我已经测试了几乎所有 Python 的包装器来做同样的事情。PyKCS11 并不是很可靠。
我建议使用以下两种可能性之一:
1. PyCurl
当您正确配置 OpenSSL 和 cURL 时,cUrl 的 Python 包装器可以做到这一点。
这是一个简单的实现:
import pycurl
from io import BytesIO
import pprint
import json
c = pycurl.Curl()
url = 'https://yourserver/endpoint'
c.setopt(pycurl.URL, url)
# set proxy-insecure
c.setopt(c.SSL_VERIFYPEER, 0)
c.setopt(c.SSL_VERIFYHOST, False)
c.setopt(c.VERBOSE, 0)
c.setopt(pycurl.SSLENGINE, 'pkcs11')
c.setopt(pycurl.SSLCERTTYPE, 'eng')
c.setopt(pycurl.SSLKEYTYPE, 'eng')
c.setopt(pycurl.SSLCERT, 'pkcs11:model=XXX;manufacturer=YYYYY;serial=ZZZZ;'
'token=AAAAA;id=BBBBBBBBB;'
'object=CCCCCC;type=cert;pin-value=pin-pin')
c.setopt(pycurl.SSLKEY, 'pkcs11:model=XXX;manufacturer=YYYYY;serial=ZZZZ;'
'token=AAAAA;id=BBBBBBBBB;'
'object=CCCCCC;type=private;pin-value=pin-pin')
# set headers
c.setopt(pycurl.HEADER, True)
# c.setopt(pycurl.USERAGENT, 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:8.0) Gecko/20100101 Firefox/8.0')
c.setopt(pycurl.HTTPHEADER, ("HEADER_TO_ADD:VALUE",))
buffer = BytesIO()
c.setopt(c.WRITEDATA, buffer)
c.perform()
# HTTP response code, e.g. 200.
print('>>> Status: %d' % c.getinfo(c.RESPONSE_CODE))
# Elapsed time for the transfer.
print('>>> Time: %f' % c.getinfo(c.TOTAL_TIME))
# getinfo must be called before close.
c.close()
body = buffer.getvalue().decode('utf-8')
print('>>> Body:\n', body)
if body.find('{') >= 0:
body = body[body.find('{'):]
dictionary = json.loads(body)
pprint.pprint(dictionary)
请注意,pkcs#11 URI 方案必须符合RFC7512 可以使用以下命令发现:
p11tool --provider=/usr/lib/libeTPkcs11.so --list-all
包括 pin 在内的所有字段都必须进行 url 编码。使用在线网站编码/解码 string(pin)<->url_encoded
2. M2Crypto
在我看来,这是为 Python 完成的最佳 pkcs#11 实现。由于 requests.Session.mount 方法和 requests.adapters.BaseAdapter,它允许覆盖 urllib2 和请求调用。
我在这里放了一些代码以将其与 urllib2 一起使用:
from M2Crypto import m2urllib2 as urllib2
from M2Crypto import m2, SSL, Engine
# load dynamic engine
e = Engine.load_dynamic_engine("pkcs11", "/usr/lib/x86_64-linux-gnu/engines-1.1/libpkcs11.so")
pk = Engine.Engine("pkcs11")
pk.ctrl_cmd_string("MODULE_PATH", "/usr/lib/libeTPkcs11.so")
m2.engine_init(m2.engine_by_id("pkcs11"))
pk.ctrl_cmd_string("PIN", 'pin-pin')
cert = pk.load_certificate('pkcs11:model=XXX;manufacturer=YYYYY;serial=ZZZZ;'
'token=AAAAA;id=BBBBBBBBB;'
'object=CCCCCC')
key = pk.load_private_key('pkcs11:model=XXX;manufacturer=YYYYY;serial=ZZZZ;'
'token=AAAAA;id=BBBBBBBBB;'
'object=CCCCCC', pin='pin-pin')
ssl_context = SSL.Context('tls')
ssl_context.set_cipher_list('EECDH+AESGCM:EECDH+aECDSA:EECDH+aRSA:EDH+AESGCM:EDH+aECDSA:EDH+aRSA:!SHA1:!SHA256:!SHA384:!MEDIUM:!LOW:!EXP:!aNULL:!eNULL:!PSK:!SRP:@STRENGTH')
ssl_context.set_default_verify_paths()
ssl_context.set_allow_unknown_ca(True)
SSL.Connection.postConnectionCheck = None
m2.ssl_ctx_use_x509(ssl_context.ctx, cert.x509)
m2.ssl_ctx_use_pkey_privkey(ssl_context.ctx, key.pkey)
opener = urllib2.build_opener(ssl_context)
urllib2.install_opener(opener)
url = 'https://yourserver/endpoint'
content = urllib2.urlopen(url=url).read()
# content = opener.open(url)
print(content)
请注意,我们没有像 PyCurl 那样在 pkcs#11 URI 中指明类型和引脚。PIN 在传递给 load_private_key 时显示为字符串,而不是 url 编码。
最后,我找到了一个很好的 HttpAdapter 实现来挂载请求以进行调用。 这是原始实现。
我添加了一些行来支持 PIN 码并禁用检查主机名:
M2Crypto.SSL.Connection.postConnectionCheck = None
以下是如何安装适配器:
from requests import Session
from m2requests import M2HttpsAdapter
import pprint, json
request = Session()
m2httpsadapter = M2HttpsAdapter()
# Added by me to set a pin when loading private key
m2httpsadapter.pin = 'pin-pin'
# Need this attribute set to False else we cannot use direct IP (added by me to disable checking hostname)
m2httpsadapter.check_hostname = False
request.mount("https://", m2httpsadapter)
request.cert=('pkcs11:model=XXX;manufacturer=YYYYY;serial=ZZZZ;'
'token=AAAAA;id=BBBBBBBBB;'
'object=CCCCCC',
'pkcs11:model=XXX;manufacturer=YYYYY;serial=ZZZZ;'
'token=AAAAA;id=BBBBBBBBB;'
'object=CCCCCC')
headers = {'HEADER_TO_ADD_IF_WANTED': 'VALUE', }
r = request.get("https://yourserver/endpoint", headers=headers, verify=False)
print(r.status_code)
pprint.pprint(json.loads(r.raw.data.decode('utf-8')))
推荐阅读
- python-3.x - 当我的数据在包含源数据和目标数据的 excel 中时,如何使用纬度和经度计算距离?
- jenkins - Jenkins artifactory 插件 - 确定文件不存在
- windows - 带有clang和cmake的Windows上的OpenMP
- android - 在 Azure env 中的 windows server 2016 上安装适用于 android 的 VS 模拟器
- c# - 以 MVVM 的方式使用接口在 viewModel 之间进行通信
- angular - 是否可以将多个 ID 传递给 angular-in-memory-web-api 中的查询字符串?
- objective-c - Clang 工具:获取文件的文件导入列表
- c++ - 导致运行时执行的 constexpr 函子中的成员
- linux - 尝试启动 Linux Bash 脚本时找不到命令
- jquery - 浏览器找不到函数,可能是从外部文件内部调用的错误?