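google-cloud-platform - Google Service Account P12 Credentials - What is the certificate for?
Question
My question is: what is the certificate in a Google P12 credential used for?
I wrote the following Python program, which takes a Google service account credentials file in P12 (PFX) format. The program extracts the private key and the certificate.
The private key is used to sign the JWT when creating a Google access token (see this article on creating access tokens from P12 credentials). I cannot find any use for the certificate.
Note: this code also works for ordinary SSL PFX certificate bundles. The CA certificates are written to a separate file.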
'''
Convert a Google P12 (PFX) service account into private key and certificate.
Convert an SSL Certificate (PFX) into private key, certificate and CAs.
'''

import os
import OpenSSL.crypto

def write_CAs(filename, p12):
    ''' Write the Certificate Authorities, if any, to filename '''
    ca = p12.get_ca_certificates()
    if ca is None:
        return
    if os.path.exists(filename):
        os.remove(filename)
    print('Creating Certificate CA File:', filename)
    with open(filename, 'wb') as f:
        for cert in ca:
            f.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert))

def pfx_to_pem(pfx_path, pfx_password, pkey_path, pem_path, pem_ca_path):
    '''
    Decrypt the P12 (PFX) file and create a private key file and certificate file.
    Input:
        pfx_path        INPUT: This is the Google P12 file or SSL PFX certificate file
        pfx_password    INPUT: Password used to protect P12 (PFX)
        pkey_path       INPUT: File name to write the Private Key to
        pem_path        INPUT: File name to write the Certificate to
        pem_ca_path     INPUT: File name to write the Certificate Authorities to
    '''
    print('Opening:', pfx_path)
    with open(pfx_path, 'rb') as f_pfx:
        pfx = f_pfx.read()

    print('Loading P12 (PFX) contents:')
    # Note: load_pkcs12 is deprecated in recent pyOpenSSL releases
    p12 = OpenSSL.crypto.load_pkcs12(pfx, pfx_password)

    print('Creating Private Key File:', pkey_path)
    with open(pkey_path, 'wb') as f:
        # Write Private Key
        f.write(OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, p12.get_privatekey()))

    print('Creating Certificate File:', pem_path)
    with open(pem_path, 'wb') as f:
        # Write Certificate
        f.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, p12.get_certificate()))

    # Google P12 does not have certificate authorities but SSL PFX certificates do
    write_CAs(pem_ca_path, p12)

# Start here
pfx_to_pem(
    'compute-engine.p12',       # Google Service Account P12 file
    'notasecret',               # P12 file password
    'compute-engine.key',       # Filename to write private key
    'compute-engine.pem',       # Filename to write certificate
    'compute-engine_ca.pem')    # Filename to write CAs if present
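Because `OpenSSL.crypto.load_pkcs12` is deprecated in recent pyOpenSSL releases, the same extraction can probably be done with the `cryptography` package instead. The following is a minimal sketch, not the original author's method. The function name `p12_to_pem` is hypothetical, and the sketch assumes the bundle contains both a private key and a certificate.

```python
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import pkcs12


def p12_to_pem(p12_bytes, password):
    """Return (private_key_pem, certificate_pem, [ca_pem, ...]) as bytes.

    Hypothetical helper. `password` must be bytes
    (b'notasecret' for Google P12 files).
    Assumes the bundle holds both a key and a certificate.
    """
    key, cert, extra_certs = pkcs12.load_key_and_certificates(p12_bytes, password)

    # Private key as unencrypted PEM
    key_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )

    # Leaf certificate as PEM
    cert_pem = cert.public_bytes(serialization.Encoding.PEM)

    # Any CA certificates in the bundle (empty for Google P12 files)
    ca_pems = [c.public_bytes(serialization.Encoding.PEM) for c in extra_certs]
    return key_pem, cert_pem, ca_pems
```

To use it, read the P12 file in binary mode, call `p12_to_pem(data, b'notasecret')`, and write each returned value to its own file.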
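Solution
For Google service account credentials, the certificate is used to verify the signed JWT.
The signed JWT is used to request an access token from Google's OAuth 2.0 server.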
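To show how the signed JWT reaches the OAuth 2.0 server, the sketch below builds the form-encoded body for the token request. The function name `build_token_request_body` is hypothetical. The grant type string is the standard JWT bearer grant, and the endpoint is the one used in the example code in this answer.

```python
from urllib.parse import urlencode

# Endpoint used elsewhere in this answer
TOKEN_URL = "https://www.googleapis.com/oauth2/v4/token"

# Standard OAuth 2.0 grant type for exchanging a signed JWT for an access token
JWT_BEARER_GRANT = "urn:ietf:params:oauth:grant-type:jwt-bearer"


def build_token_request_body(signed_jwt):
    """Return the form-encoded body that carries the signed JWT.

    Hypothetical helper. The body is meant to be POSTed to TOKEN_URL with
    Content-Type: application/x-www-form-urlencoded.
    """
    if isinstance(signed_jwt, bytes):
        # PyJWT 1.x returns bytes, PyJWT 2.x returns str
        signed_jwt = signed_jwt.decode("ascii")
    return urlencode({
        "grant_type": JWT_BEARER_GRANT,
        "assertion": signed_jwt,
    })
```

If the server accepts the assertion, the JSON response is expected to contain an `access_token` field.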
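P12-format service account credentials include the certificate as part of the PKCS#12 bundle. For JSON-format service account credentials, the certificate is available on Google's website.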
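For JSON credentials, the key file carries a `client_x509_cert_url` field that points at the published certificates, and a `private_key_id` field that identifies the key. A minimal sketch of looking up the matching certificate follows. The function names are hypothetical, and it assumes the URL returns a JSON object that maps key IDs to PEM certificates.

```python
import json
from urllib.request import urlopen


def fetch_certificates(cert_url):
    """Download the published certificates for a service account.

    Hypothetical helper. Assumes the response is a JSON object of the
    form {key_id: PEM certificate}.
    """
    with urlopen(cert_url) as resp:
        return json.loads(resp.read().decode("utf-8"))


def certificate_for_key(sa_info, certs_by_key_id):
    """Pick the certificate that matches the key in a JSON credentials dict.

    sa_info is the parsed JSON key file. Its 'private_key_id' field selects
    the entry from the downloaded mapping.
    """
    key_id = sa_info["private_key_id"]
    try:
        return certs_by_key_id[key_id]
    except KeyError:
        raise KeyError("no published certificate for key id " + key_id) from None
```

A typical call would be `certificate_for_key(sa_info, fetch_certificates(sa_info["client_x509_cert_url"]))`, where `sa_info` is the result of `json.load` on the key file.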
The following code is an example that creates a signed JWT using Google P12 credentials, then verifies it and displays its contents.
'''
This program verifies a Signed JWT created by Google Service Account P12 credentials
First a JWT is signed with the P12 Private Key.
The certificate is extracted from the P12 file and used to verify the signature
'''

import json
import time
import base64
import jwt
import OpenSSL.crypto

# Google Endpoint for creating OAuth 2.0 Access Tokens from Signed-JWT
auth_url = "https://www.googleapis.com/oauth2/v4/token"

# Set how long this token will be valid in seconds
expires_in = 3600   # Expires in 1 hour

#scopes = "https://www.googleapis.com/auth/cloud-platform"
scopes = "https://www.googleapis.com/auth/devstorage.read_only"

# Details on the Google Service Account. The email must match the Google Console.
sa_filename = 'compute-engine.p12'
sa_password = 'notasecret'
sa_email = 'developer-123456@developer.gserviceaccount.com'

# You can control what is verified in the JWT. For example to allow expired JWTs
# set 'verify_exp' to False
options = {
    'verify_signature': True,
    'verify_exp': True,
    'verify_nbf': True,
    'verify_iat': True,
    'verify_aud': True,
    'require_exp': False,
    'require_iat': False,
    'require_nbf': False
}

aud = 'https://www.googleapis.com/oauth2/v4/token'

def load_private_key(p12_path, p12_password):
    ''' Read the private key and return it PEM encoded '''
    # print('Opening:', p12_path)
    with open(p12_path, 'rb') as f:
        data = f.read()

    # print('Loading P12 (PFX) contents:')
    # Note: load_pkcs12 is deprecated in recent pyOpenSSL releases
    p12 = OpenSSL.crypto.load_pkcs12(data, p12_password)

    # Dump the Private Key in PEM format
    pkey = OpenSSL.crypto.dump_privatekey(
        OpenSSL.crypto.FILETYPE_PEM,
        p12.get_privatekey())

    # return the private key
    return pkey

def load_public_key(p12_path, p12_password):
    ''' Read the public key from the certificate and return it PEM encoded '''
    # print('Opening:', p12_path)
    with open(p12_path, 'rb') as f:
        p12_data = f.read()

    # print('Loading P12 (PFX) contents:')
    p12 = OpenSSL.crypto.load_pkcs12(p12_data, p12_password)

    public_key = OpenSSL.crypto.dump_publickey(
        OpenSSL.crypto.FILETYPE_PEM,
        p12.get_certificate().get_pubkey())

    # print(public_key)
    return public_key

def create_signed_jwt(p12_path, p12_password, p12_email, scope):
    ''' Create a Signed JWT from a service account p12 credentials file '''
    pkey = load_private_key(p12_path, p12_password)

    issued = int(time.time())
    expires = issued + expires_in   # expires_in is in seconds

    # Note: this token expires and cannot be refreshed. The token must be recreated

    # JWT Headers
    additional_headers = {
        "alg": "RS256",     # Google uses SHA256withRSA
        "typ": "JWT"
    }

    # JWT Payload
    payload = {
        "iss": p12_email,   # Issuer claim
        "sub": p12_email,   # Subject claim
        "aud": auth_url,    # Audience claim
        "iat": issued,      # Issued At claim
        "exp": expires,     # Expire time
        "scope": scope      # Permissions
    }

    # Encode the headers and payload and sign creating a Signed JWT (JWS)
    sig = jwt.encode(payload, pkey, algorithm="RS256", headers=additional_headers)

    # print(sig)
    return sig

def pad(data):
    """ pad base64 string to a multiple of 4 characters """
    # -len % 4 is 0 when no padding is needed
    data += '=' * (-len(data) % 4)
    return data

def print_jwt(signed_jwt):
    """ Print a JWT Header and Payload """
    if isinstance(signed_jwt, bytes):
        # PyJWT 1.x returns bytes, PyJWT 2.x returns str
        signed_jwt = signed_jwt.decode('utf-8')
    s = signed_jwt.split('.')

    print('Header:')
    h = base64.urlsafe_b64decode(pad(s[0])).decode('utf-8')
    print(json.dumps(json.loads(h), indent=4))

    print('Payload:')
    p = base64.urlsafe_b64decode(pad(s[1])).decode('utf-8')
    print(json.dumps(json.loads(p), indent=4))

def verify_signed_jwt(signed_jwt):
    '''
    This function takes a Signed JWT and verifies it using a Google P12 service account.
    '''
    # Get the Public Key
    public_key = load_public_key(sa_filename, sa_password)

    # Verify the Signed JWT
    r = jwt.decode(signed_jwt, public_key, algorithms=["RS256"], audience=aud, options=options)

    print('Decoded JWT:')
    print(json.dumps(r, indent=4))

if __name__ == '__main__':
    s_jwt = create_signed_jwt(sa_filename, sa_password, sa_email, scopes)
    print_jwt(s_jwt)
    verify_signed_jwt(s_jwt)