首页 > 解决方案 > Django Keycloak 集成流程

问题描述

我正在尝试按照下图所示的步骤,在不使用内置 contrib.auth 应用的情况下,将 Keycloak 与 Django 系统集成。(图片:在此处输入图像描述)

我的问题是:

  1. 如果不使用内置的 User 对象,如何利用 Keycloak SSO 生成的会话来判断用户是否已通过身份验证?
  2. 第 5 步中的“将其与会话关联”是什么意思?

谢谢!

标签: python django keycloak

解决方案


这个回答可能来得有点晚,但也许能帮到以后遇到同样问题的人。

当您只想实现 Django 自身的身份验证时,contrib.auth 很有用。

下面给出的方法假设您使用的是来自 Keycloak 的用户,并且没有可用的 Django 用户对象。如果您需要的是后一种情况,请查看此包 - https://django-keycloak.readthedocs.io/en/latest/

我的用例是将 Keycloak 与 React 前端和 DRF(Django REST Framework)后端集成。为了与 Keycloak 集成,您可以编写一个自定义中间件:它负责解析由 Keycloak 签发、并随每个请求的 Authorization 标头发送的 access token,并根据从 Keycloak 中相应客户端导出的客户端配置文件来验证权限。

这是我在自己的用例中使用的中间件:

import re
import logging
from django.conf import settings
from django.http.response import JsonResponse
from django.utils.deprecation import MiddlewareMixin
from keycloak import KeycloakOpenID
from keycloak.exceptions import (
    KeycloakInvalidTokenError,
)
from rest_framework.exceptions import (
    PermissionDenied,
    AuthenticationFailed,
    NotAuthenticated,
)

# Module-level logger named after this module; the code below writes its log output to it.
logger = logging.getLogger(__name__)


class KeycloakMiddleware(MiddlewareMixin):
    """
    Custom Keycloak middleware for authentication and authorization.

    Configuration is read once, at construction time, from
    ``settings.KEYCLOAK_CONFIG`` (a mapping). Recognised keys:

    * ``KEYCLOAK_SERVER_URL``, ``KEYCLOAK_CLIENT_ID``, ``KEYCLOAK_REALM`` -
      required.
    * ``KEYCLOAK_CLIENT_SECRET_KEY``, ``KEYCLOAK_CLIENT_PUBLIC_KEY``,
      ``KEYCLOAK_AUTHORIZATION_CONFIG`` - optional, default ``None``.
    * ``KEYCLOAK_DEFAULT_ACCESS`` - optional, default ``"DENY"``.
    * ``KEYCLOAK_METHOD_VALIDATE_TOKEN`` - optional, default ``"INTROSPECT"``.

    Each value is exposed as a plain instance attribute (``server_url``,
    ``client_id``, ``realm``, ``client_secret_key``, ``client_public_key``,
    ``default_access``, ``method_validate_token``,
    ``keycloak_authorization_config``), alongside ``config`` (the raw mapping)
    and ``keycloak`` (the ``KeycloakOpenID`` client).

    Views opt in to protection by declaring a ``keycloak_scopes`` mapping of
    HTTP method (or ``"DEFAULT"``) to the scope required for that method.
    """

    def __init__(self, get_response):
        """
        Read the Keycloak settings and build the ``KeycloakOpenID`` client.

        :param get_response: the next callable in Django's middleware chain.
        :raises Exception: if any of the three required configuration keys is
            missing; the original ``KeyError`` is chained as the cause.
        """
        super().__init__(get_response=get_response)

        self.config = settings.KEYCLOAK_CONFIG

        # Required settings: fail fast with one explicit message.
        try:
            self.server_url = self.config["KEYCLOAK_SERVER_URL"]
            self.client_id = self.config["KEYCLOAK_CLIENT_ID"]
            self.realm = self.config["KEYCLOAK_REALM"]
        except KeyError as missing_key:
            raise Exception(
                "KEYCLOAK_SERVER_URL, KEYCLOAK_CLIENT_ID or KEYCLOAK_REALM not found."
            ) from missing_key

        # Optional settings.
        self.client_secret_key = self.config.get("KEYCLOAK_CLIENT_SECRET_KEY", None)
        self.client_public_key = self.config.get("KEYCLOAK_CLIENT_PUBLIC_KEY", None)
        self.default_access = self.config.get("KEYCLOAK_DEFAULT_ACCESS", "DENY")
        self.method_validate_token = self.config.get(
            "KEYCLOAK_METHOD_VALIDATE_TOKEN", "INTROSPECT"
        )
        self.keycloak_authorization_config = self.config.get(
            "KEYCLOAK_AUTHORIZATION_CONFIG", None
        )

        # Keycloak client used to resolve a token into permissions.
        self.keycloak = KeycloakOpenID(
            server_url=self.server_url,
            client_id=self.client_id,
            realm_name=self.realm,
            client_secret_key=self.client_secret_key,
        )

        # Load the authorization policies only when a config was supplied.
        if self.keycloak_authorization_config:
            self.keycloak.load_authorization_config(self.keycloak_authorization_config)

        # Kept explicit so __call__ does not depend on the base class storing it.
        self.get_response = get_response

    def __call__(self, request):
        """
        Pass the request straight to the next handler.

        :param request: django request
        :return: whatever ``get_response`` returns for this request.
        """
        return self.get_response(request)

    @staticmethod
    def _error_response(exception_class):
        """Build a JSON error response from an exception class's default detail and status code."""
        return JsonResponse(
            {"detail": exception_class.default_detail},
            status=exception_class.status_code,
        )

    def process_view(self, request, view_func, view_args, view_kwargs):
        # pylint: disable=unused-argument
        """
        Check the bearer token of the request against the scope the view requires.

        Order of checks:

        1. Paths matching any regex in
           ``settings.KEYCLOAK_BEARER_AUTHENTICATION_EXEMPT_PATHS`` are skipped.
        2. Views without a ``keycloak_scopes`` attribute are skipped.
        3. A missing or empty ``Authorization`` header is rejected
           (``NotAuthenticated``).
        4. No scope for the method (nor ``"DEFAULT"``) while ``default_access``
           is ``"DENY"`` is rejected (``PermissionDenied``).
        5. A token that Keycloak reports as invalid is rejected
           (``AuthenticationFailed``).
        6. Otherwise the request passes only if one of the token's permissions
           carries the required scope (``PermissionDenied`` if none does).

        :param request: django request
        :param view_func: the view about to be called; its ``cls`` attribute is
            inspected for ``keycloak_scopes``.
        :param view_args: view args (unused)
        :param view_kwargs: view kwargs (unused)
        :return: ``None`` when the request may proceed, otherwise a
            ``JsonResponse`` describing the failure.
        """

        if hasattr(settings, "KEYCLOAK_BEARER_AUTHENTICATION_EXEMPT_PATHS"):
            # Patterns are matched against the path without its leading slash.
            path = request.path_info.lstrip("/")

            if any(re.match(m, path) for m in settings.KEYCLOAK_BEARER_AUTHENTICATION_EXEMPT_PATHS):
                logger.debug("** exclude path found, skipping")
                return None

        try:
            view_scopes = view_func.cls.keycloak_scopes
        except AttributeError:
            # Either the view has no ``cls`` or the class declares no scopes.
            logger.debug(
                "Allowing free access, since no authorization configuration "
                "(keycloak_scopes) found for this request route :%s",
                request,
            )
            return None

        # An absent header and a blank one are treated alike; indexing the
        # split parts of a blank header would otherwise raise IndexError.
        auth_header = (request.META.get("HTTP_AUTHORIZATION") or "").split()
        if not auth_header:
            return self._error_response(NotAuthenticated)

        # "<scheme> <token>" -> take the token; otherwise use the first part as-is.
        token = auth_header[1] if len(auth_header) == 2 else auth_header[0]

        # Scope for this HTTP method, falling back to the view's DEFAULT entry.
        required_scope = view_scopes.get(request.method, None) or view_scopes.get("DEFAULT", None)

        # DEFAULT scope not found and DEFAULT_ACCESS is DENY
        if not required_scope and self.default_access == "DENY":
            return self._error_response(PermissionDenied)

        # NOTE(review): when no scope is required and default_access is not
        # "DENY", the loop below still compares None against each permission's
        # scopes, so the request appears to end up denied anyway - confirm
        # whether a non-DENY default was meant to let it through.

        # Decode options forwarded to get_permissions: signature and expiry
        # are verified, the audience claim is not.
        options = {
            "verify_signature": True,
            "verify_aud": False,
            "verify_exp": True,
        }
        try:
            user_permissions = self.keycloak.get_permissions(
                token,
                method_token_info=self.method_validate_token.lower(),
                key=self.client_public_key,
                options=options,
            )
        except KeycloakInvalidTokenError:
            return self._error_response(AuthenticationFailed)

        if any(required_scope in perm.scopes for perm in user_permissions):
            return None

        # User Permission Denied
        return self._error_response(PermissionDenied)

使用这种方法,您可以像下面这样直接在视图类中定义授权策略。

class GetMockData(APIView):
    """
    Example GET endpoint that serves medicine records.

    ``keycloak_scopes`` declares, per HTTP method, the scope associated with
    this view; here ``GET`` is tied to ``medicine:view``.
    """

    keycloak_scopes = {"GET": "medicine:view"}

    def get(self, request: HttpRequest) -> HttpResponse:
        """
        Fetch every medicine and return it serialized.

        When ``get_all_medicines()`` yields nothing truthy, a failure response
        carrying ``E_RANGE_MESSAGE`` and a 404 status is sent instead.
        """
        medicines = get_all_medicines()

        if medicines:
            logger.warning("Testing a fake warning >>>>>>>> WARNING <<<<<<<<?")
            serialized = MedicineSerializer(medicines, many=True).data
            success = CustomResponse(success=True, payload=serialized)
            return success.send_response()

        # Nothing to return: answer with the not-found payload.
        not_found = CustomResponse(
            success=False,
            payload=None,
            error=E_RANGE_MESSAGE,
            status=status.HTTP_404_NOT_FOUND,
        )
        return not_found.send_response()

推荐阅读