python - Django Keycloak integration flow
Problem description
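I am trying to integrate Keycloak with a Django system without using the built-in contrib.auth app, by following the steps below.
My questions are:
- If I don't use the built-in User object, how do I use the session generated by Keycloak SSO to determine whether a user is authenticated?
- Step 5 says to associate it with a session. What does that mean?
Thanks!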
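Solution
This may be a bit late, but it might help someone in the future.
contrib.auth is useful when you only want to implement Django authentication.
The approach given here assumes that your users come from Keycloak and that there is no Django User object to work with. If you need the other case, where Django user objects are available, check out this package - https://django-keycloak.readthedocs.io/en/latest/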
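My use case integrates Keycloak with a React frontend and a REST backend built on DRF. To integrate with Keycloak, you can write a custom middleware that unpacks the access token issued by Keycloak, which the client sends in the Authorization header of every request, and validates the user's permissions against the client configuration file that you need to export from Keycloak for the respective client.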
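As a rough illustration of the first step described above (pulling the access token out of the Authorization header), here is a minimal sketch. The helper name `extract_bearer_token` is hypothetical and not part of any library. It is also stricter than the middleware in this answer, which accepts a bare token without the `Bearer` prefix.

```python
from typing import Optional


def extract_bearer_token(authorization_header: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' value, or None."""
    if not authorization_header:
        return None
    parts = authorization_header.split()
    # Accept exactly "Bearer <token>"; reject anything else rather than guessing.
    if len(parts) == 2 and parts[0].lower() == "bearer":
        return parts[1]
    return None
```

In a Django middleware the header value would typically come from `request.META.get("HTTP_AUTHORIZATION")`. A `None` result can then be mapped to a 401 response before any call to Keycloak is made.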
Here is the middleware I use for my use case:
import logging
import re

from django.conf import settings
from django.http.response import JsonResponse
from django.utils.deprecation import MiddlewareMixin
from keycloak import KeycloakOpenID
from keycloak.exceptions import (
    KeycloakInvalidTokenError,
)
from rest_framework.exceptions import (
    AuthenticationFailed,
    NotAuthenticated,
    PermissionDenied,
)

logger = logging.getLogger(__name__)


class KeycloakMiddleware(MiddlewareMixin):
    """
    Custom KeyCloak Middleware for Authentication and Authorization
    """

    def __init__(self, get_response):
        """
        :param get_response:
        """
        super().__init__(get_response=get_response)
        self.config = settings.KEYCLOAK_CONFIG

        # Read configurations
        try:
            self.server_url = self.config["KEYCLOAK_SERVER_URL"]
            self.client_id = self.config["KEYCLOAK_CLIENT_ID"]
            self.realm = self.config["KEYCLOAK_REALM"]
        except KeyError as keycloak_snippet_no_exist:
            raise Exception(
                "KEYCLOAK_SERVER_URL, KEYCLOAK_CLIENT_ID or KEYCLOAK_REALM not found."
            ) from keycloak_snippet_no_exist

        # Optional settings fall back to defaults when absent
        self.client_secret_key = self.config.get("KEYCLOAK_CLIENT_SECRET_KEY", None)
        self.client_public_key = self.config.get("KEYCLOAK_CLIENT_PUBLIC_KEY", None)
        self.default_access = self.config.get("KEYCLOAK_DEFAULT_ACCESS", "DENY")
        self.method_validate_token = self.config.get("KEYCLOAK_METHOD_VALIDATE_TOKEN", "INTROSPECT")
        self.keycloak_authorization_config = self.config.get("KEYCLOAK_AUTHORIZATION_CONFIG", None)

        # Create Keycloak instance
        self.keycloak = KeycloakOpenID(
            server_url=self.server_url,
            client_id=self.client_id,
            realm_name=self.realm,
            client_secret_key=self.client_secret_key,
        )

        # Read policies
        if self.keycloak_authorization_config:
            self.keycloak.load_authorization_config(self.keycloak_authorization_config)

        # Django
        self.get_response = get_response

    @property
    def keycloak(self):
        """
        Getter KeyCloak Instance
        """
        return self._keycloak

    @keycloak.setter
    def keycloak(self, value):
        self._keycloak = value

    @property
    def config(self):
        """
        Getter Config Instance
        """
        return self._config

    @config.setter
    def config(self, value):
        self._config = value

    @property
    def server_url(self):
        """
        Getter Server URL
        """
        return self._server_url

    @server_url.setter
    def server_url(self, value):
        self._server_url = value

    @property
    def client_id(self):
        """
        Getter Client_ID of KeyCloak Client
        """
        return self._client_id

    @client_id.setter
    def client_id(self, value):
        self._client_id = value

    @property
    def client_secret_key(self):
        """
        Getter Client Secret Key
        """
        return self._client_secret_key

    @client_secret_key.setter
    def client_secret_key(self, value):
        self._client_secret_key = value

    @property
    def client_public_key(self):
        """
        Getter Client Public Key
        """
        return self._client_public_key

    @client_public_key.setter
    def client_public_key(self, value):
        self._client_public_key = value

    @property
    def realm(self):
        """
        Getter KeyCloak Realm
        """
        return self._realm

    @realm.setter
    def realm(self, value):
        self._realm = value

    @property
    def keycloak_authorization_config(self):
        """
        Getter KeyCloak Authorization Config
        """
        return self._keycloak_authorization_config

    @keycloak_authorization_config.setter
    def keycloak_authorization_config(self, value):
        self._keycloak_authorization_config = value

    @property
    def method_validate_token(self):
        """
        Getter Validate Token Private Method
        """
        return self._method_validate_token

    @method_validate_token.setter
    def method_validate_token(self, value):
        self._method_validate_token = value

    def __call__(self, request):
        """
        :param request:
        :return:
        """
        return self.get_response(request)

    def process_view(self, request, view_func, view_args, view_kwargs):
        # pylint: disable=unused-argument
        """
        Validate only the token introspect.
        :param request: django request
        :param view_func:
        :param view_args: view args
        :param view_kwargs: view kwargs
        :return:
        """
        # Skip paths that are explicitly exempt from bearer authentication
        if hasattr(settings, "KEYCLOAK_BEARER_AUTHENTICATION_EXEMPT_PATHS"):
            path = request.path_info.lstrip("/")
            if any(re.match(m, path) for m in settings.KEYCLOAK_BEARER_AUTHENTICATION_EXEMPT_PATHS):
                logger.debug("** exclude path found, skipping")
                return None

        # Views without a keycloak_scopes attribute are not protected
        try:
            view_scopes = view_func.cls.keycloak_scopes
        except AttributeError as _keycloak_attribute_error:
            logger.debug(
                "Allowing free access, since no authorization configuration (keycloak_scopes) "
                "found for this request route :%s",
                request,
            )
            return None

        if "HTTP_AUTHORIZATION" not in request.META:
            return JsonResponse(
                {"detail": NotAuthenticated.default_detail},
                status=NotAuthenticated.status_code,
            )

        # Accept both "Bearer <token>" and a bare token
        auth_header = request.META.get("HTTP_AUTHORIZATION").split()
        token = auth_header[1] if len(auth_header) == 2 else auth_header[0]

        # Get default if method is not defined.
        required_scope = (
            view_scopes.get(request.method, None)
            if view_scopes.get(request.method, None)
            else view_scopes.get("DEFAULT", None)
        )

        # DEFAULT scope not found and DEFAULT_ACCESS is DENY
        if not required_scope and self.default_access == "DENY":
            return JsonResponse(
                {"detail": PermissionDenied.default_detail},
                status=PermissionDenied.status_code,
            )

        try:
            # Options passed through to token decoding when verifying permissions
            options = {
                "verify_signature": True,
                "verify_aud": False,
                "verify_exp": True,
            }
            user_permissions = self.keycloak.get_permissions(
                token,
                method_token_info=self.method_validate_token.lower(),
                key=self.client_public_key,
                options=options,
            )
        except KeycloakInvalidTokenError as _keycloak_invalid_token_error:
            return JsonResponse(
                {"detail": AuthenticationFailed.default_detail},
                status=AuthenticationFailed.status_code,
            )

        # Allow the request as soon as one permission grants the required scope
        for perm in user_permissions:
            if required_scope in perm.scopes:
                return None

        # User Permission Denied
        return JsonResponse(
            {"detail": PermissionDenied.default_detail},
            status=PermissionDenied.status_code,
        )
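The middleware reads everything from `settings.KEYCLOAK_CONFIG`, so a settings fragment along the following lines is needed. This is only a sketch: the key names are the ones the middleware looks up, but every value (server URL, realm, client, file path, exempt paths) is a placeholder assumption and must be replaced with your own.

```python
# settings.py (sketch): every value below is a placeholder, not a real deployment.
KEYCLOAK_CONFIG = {
    # Required: the middleware raises an exception at start-up if any is missing.
    "KEYCLOAK_SERVER_URL": "https://keycloak.example.com/auth/",
    "KEYCLOAK_REALM": "example-realm",
    "KEYCLOAK_CLIENT_ID": "example-client",
    # Optional: read with .get(), so these may be omitted.
    "KEYCLOAK_CLIENT_SECRET_KEY": "change-me",
    "KEYCLOAK_CLIENT_PUBLIC_KEY": None,
    "KEYCLOAK_DEFAULT_ACCESS": "DENY",
    "KEYCLOAK_METHOD_VALIDATE_TOKEN": "INTROSPECT",
    # Path to the authorization settings file exported from Keycloak for this client.
    "KEYCLOAK_AUTHORIZATION_CONFIG": "path/to/exported-authorization-config.json",
}

# Optional: regular expressions matched against the request path
# (with the leading slash stripped) that bypass the token check.
KEYCLOAK_BEARER_AUTHENTICATION_EXEMPT_PATHS = [r"^admin/", r"^health/?$"]
```

The middleware class itself still has to be registered in Django's `MIDDLEWARE` list by its dotted path, which depends on where you place the module in your project.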
With this approach, you can define the authorization policy directly in the view class, like this:
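import logging

from django.http import HttpRequest, HttpResponse
from rest_framework import status
from rest_framework.views import APIView

# get_all_medicines, CustomResponse, E_RANGE_MESSAGE and MedicineSerializer
# come from the author's own project and are not shown here.

logger = logging.getLogger(__name__)


class GetMockData(APIView):
    """
    Another GET API to Fetch fake Data
    """

    # Scope required per HTTP method; the middleware reads this attribute
    keycloak_scopes = {"GET": "medicine:view"}

    def get(self, request: HttpRequest) -> HttpResponse:
        """
        V1 API to get some medicine data from the database
        """
        data = get_all_medicines()
        if not data:
            res = CustomResponse(
                success=False,
                payload=None,
                error=E_RANGE_MESSAGE,
                status=status.HTTP_404_NOT_FOUND,
            )
            return res.send_response()
        logger.warning("Testing a fake warning >>>>>>>> WARNING <<<<<<<<?")
        data = MedicineSerializer(data, many=True).data
        res = CustomResponse(success=True, payload=data)
        return res.send_response()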
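To make the link between `keycloak_scopes` on the view and the decision taken in `process_view` easier to follow, the sketch below isolates that logic in plain Python. The function names `resolve_required_scope` and `is_allowed` are hypothetical, and plain sets of scope strings stand in for the permission objects returned by Keycloak. It mirrors the middleware's behaviour as written and is not a replacement for it.

```python
from typing import Iterable, Mapping, Optional


def resolve_required_scope(view_scopes: Mapping[str, str], method: str) -> Optional[str]:
    """Pick the scope for this HTTP method, falling back to the "DEFAULT" entry."""
    return view_scopes.get(method) or view_scopes.get("DEFAULT")


def is_allowed(
    required_scope: Optional[str],
    granted_scope_sets: Iterable[Iterable[str]],
    default_access: str = "DENY",
) -> bool:
    """Return True when at least one granted permission contains the required scope."""
    # No scope configured for this method and the default policy is DENY.
    if not required_scope and default_access == "DENY":
        return False
    # Each element stands in for the `scopes` of one Keycloak permission.
    return any(required_scope in scopes for scopes in granted_scope_sets)
```

For the view above, a GET request resolves to `"medicine:view"`, so the request passes only if the token carries a permission that includes that scope. A POST request resolves to no scope at all and is rejected under the default `"DENY"` policy.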