首页 > 解决方案 > 如何在 python 脚本中为 sasl.mechanism PLAIN (API) 和 GSSAPI (Kerberos) 身份验证添加配置设置

问题描述

需要一些帮助来设置 sasl.mechanism PLAIN (API) 和 GSSAPI (Kerberos) 身份验证的配置。

我们在这里使用 Confluent Kafka,共有两个脚本:一个是 Python 脚本,另一个是调用该 Python 脚本的 bash 脚本。脚本见下文。

在此先感谢您的帮助!

import hashlib
import json
import math
import os
import random
import re
import socket
import string
import sys
import time
import uuid
from datetime import datetime
from datetime import timezone
from functools import cache
from functools import lru_cache

from confluent_kafka import Producer, KafkaError, KafkaException

# Mandatory settings: os.environ[...] raises KeyError when one is missing.
topic_name = os.environ['TOPIC_NAME']
partition_count = int(os.environ['PARTITION_COUNT'])

# The key, value and header templates are supplied as JSON documents.
message_key_template = json.loads(os.environ['KEY_TEMPLATE'])
message_value_template = json.loads(os.environ['VALUE_TEMPLATE'])
message_header_template = json.loads(os.environ['HEADER_TEMPLATE'])

bootstrap_servers = os.environ['BOOTSTRAP_SERVERS']

# Optional settings; the second argument is used when the variable is unset.
perf_counter_batch_size = int(os.getenv('PERF_COUNTER_BATCH_SIZE', 100))
messages_per_aggregate = int(os.getenv('MESSAGES_PER_AGGREGATE', 1))
max_message_count = int(os.getenv('MAX_MESSAGE_COUNT', sys.maxsize))

def error_cb(err):
    """Report a generic client error and abort on the unrecoverable ones.

    Client errors are generally informational: the client tries to recover
    from them automatically and the application normally has nothing to do.
    This example is stricter and terminates when the client cannot connect
    to any broker (_ALL_BROKERS_DOWN) or when authentication fails
    (_AUTHENTICATION).
    """
    print("Client error: {}".format(err))

    code = err.code()
    if code != KafkaError._ALL_BROKERS_DOWN and \
       code != KafkaError._AUTHENTICATION:
        return

    # An exception raised from this callback is re-raised from the flush()
    # or poll() call that triggered it.
    raise KafkaException(err)

def acked(err, msg):
    """Delivery callback: print a line for a message that failed to send.

    Does nothing when ``err`` is None.
    """
    if err is None:
        return
    print("Failed to send message: %s: %s" % (str(msg), str(err)))


# Base client configuration used by every deployment.
producer_configs = {
    'bootstrap.servers': bootstrap_servers,
    'client.id': socket.gethostname(),
    'error_cb': error_cb,
}

# Optional security settings.  Each one is read from an environment variable
# and handed to the client unchanged; a variable that is unset or empty adds
# nothing, so an unauthenticated setup keeps working exactly as before.
#
#   SASL/PLAIN (API key + secret):
#       KAFKA_SECURITY_PROTOCOL=SASL_SSL   KAFKA_SASL_MECHANISM=PLAIN
#       KAFKA_SASL_USERNAME=<api key>      KAFKA_SASL_PASSWORD=<api secret>
#   SASL/GSSAPI (Kerberos):
#       KAFKA_SECURITY_PROTOCOL=SASL_SSL   KAFKA_SASL_MECHANISM=GSSAPI
#       KAFKA_SASL_KERBEROS_SERVICE_NAME, KAFKA_SASL_KERBEROS_PRINCIPAL,
#       KAFKA_SASL_KERBEROS_KEYTAB
#   Private certificate authority:
#       KAFKA_SSL_CA_LOCATION=<path to CA certificate(s)>
#
# NOTE(review): GSSAPI presumably requires a librdkafka build with Kerberos
# support, and ssl.ca.location expects certificate files rather than a Java
# truststore -- confirm against the target environment.
_OPTIONAL_SECURITY_SETTINGS = {
    'KAFKA_SECURITY_PROTOCOL': 'security.protocol',
    'KAFKA_SASL_MECHANISM': 'sasl.mechanism',
    'KAFKA_SASL_USERNAME': 'sasl.username',
    'KAFKA_SASL_PASSWORD': 'sasl.password',
    'KAFKA_SASL_KERBEROS_SERVICE_NAME': 'sasl.kerberos.service.name',
    'KAFKA_SASL_KERBEROS_PRINCIPAL': 'sasl.kerberos.principal',
    'KAFKA_SASL_KERBEROS_KEYTAB': 'sasl.kerberos.keytab',
    'KAFKA_SSL_CA_LOCATION': 'ssl.ca.location',
}
for _env_name, _config_key in _OPTIONAL_SECURITY_SETTINGS.items():
    _env_value = os.environ.get(_env_name)
    if _env_value:
        producer_configs[_config_key] = _env_value

producer = Producer(producer_configs)


def get_templated_value(term, template_values):
    """Return the value cached for ``term``, creating one on first use.

    When ``term`` is not yet a key of ``template_values`` a new random UUID
    string is stored under it, so later lookups of the same term return the
    same value.
    """
    if term in template_values:
        return template_values[term]

    generated = str(uuid.uuid4())
    template_values[term] = generated
    return generated

def fill_template_value(value, template_values):
    """Render ``value`` as a string with its ``{{term}}`` placeholders filled.

    ``value`` is first converted with ``str()``.  Every placeholder found in
    that string is then replaced, in order of appearance, by the value that
    get_templated_value() returns for its term, so a term shared with other
    templates resolves to the same text through ``template_values``.
    """
    rendered = str(value)
    placeholder_names = [
        found.group(1) for found in re.finditer('{{(.+?)}}', rendered)
    ]
    for name in placeholder_names:
        replacement = get_templated_value(name, template_values)
        rendered = rendered.replace('{{' + name + '}}', replacement)
    return rendered

def fill_template(template, templated_terms):
    """Return a new dict with every value of ``template`` filled in.

    Each value is passed through fill_template_value() together with the
    shared ``templated_terms`` cache; the field names are kept as they are.
    """
    # TODO: Need to address metadata field, as it's treated as a string instead of a nested object.
    filled = {}
    for field_name, raw_value in template.items():
        filled[field_name] = fill_template_value(raw_value, templated_terms)
    return filled

# Bounded cache: lock ids may be freshly generated for every aggregate, so an
# unbounded cache would grow for as long as the script runs.
@lru_cache(maxsize=1024)
def get_partition(lock_id):
    """Map ``lock_id`` onto a partition index in ``[0, partition_count)``.

    The MD5 digest of the UTF-8 encoded ``lock_id`` is read as a 128-bit
    unsigned integer.  The 128-bit range is split into ``partition_count``
    equally sized buckets and the index of the bucket containing the digest
    is returned, so a given ``lock_id`` always resolves to the same partition.

    Raises:
        ValueError: if ``partition_count`` is not a positive number.
    """
    if partition_count < 1:
        raise ValueError(
            f'partition_count must be positive, got {partition_count}')

    digest = int(hashlib.md5(lock_id.encode('utf-8')).hexdigest(), 16)

    # Exact integer form of floor(digest / (2**128 / partition_count)).
    # Doing this in floating point rounds digests close to 2**128 up to
    # 2**128 itself, which yields the out-of-range index partition_count.
    return (digest * partition_count) >> 128

# Counters start at zero and are incremented before each message is built.
sequence_number = 0
message_count = 0
producing = True
start_time = time.perf_counter()
aggregate_message_counter = 0

# Cache for templated term values so that they match across the key, value
# and header templates.
templated_values = {}
try:
    while producing:
        sequence_number += 1
        aggregate_message_counter += 1
        message_count += 1

        if aggregate_message_counter % messages_per_aggregate == 0:
            # Start a new aggregate: drop every cached value.
            # NOTE(review): the reset fires on the Nth message itself, so for
            # messages_per_aggregate > 1 the very first aggregate gets one
            # message fewer than the following ones -- confirm this is intended.
            templated_values = {}
        else:
            # Same aggregate: keep only the ids shared by its messages.
            templated_values = {
                term: cached
                for term, cached in templated_values.items()
                if term in ('aggregateId', 'tenantId')
            }

        # Fill in templated field values
        message_key = fill_template(message_key_template, templated_values)
        message_value = fill_template(message_value_template, templated_values)
        message_header = fill_template(message_header_template, templated_values)

        # UTC timestamp with millisecond precision.  %f is always six digits,
        # so dropping the last three is safe even when the microsecond part
        # happens to be zero.
        now_utc = datetime.now(timezone.utc)
        ts = now_utc.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'

        message_header['timestamp'] = ts
        message_header['sequence_number'] = str(sequence_number)
        message_value['timestamp'] = ts
        message_value['sequenceNumber'] = sequence_number

        # Partition by lock_id rather than by key: the key may be random, but
        # a given lock_id must always resolve to the same partition.
        lock_id = message_header['lock_id']
        partition = get_partition(lock_id)

        # Send message
        producer.produce(
            topic_name,
            partition=partition,
            key=json.dumps(message_key),
            value=json.dumps(message_value),
            headers=message_header,
            callback=acked,
        )
        # Serve pending delivery callbacks without blocking, so they are not
        # left waiting until the next flush().
        producer.poll(0)

        if sequence_number % perf_counter_batch_size == 0:
            producer.flush()
            end_time = time.perf_counter()
            total_duration = end_time - start_time

            messages_per_second = perf_counter_batch_size / total_duration
            print(f'{messages_per_second} messages/second')

            # reset start time
            start_time = time.perf_counter()
        if message_count >= max_message_count:
            break
except Exception as e:
    print(f'ERROR: {e}')
    sys.exit(1)
finally:
    # Flush whatever is still queued, on success and on failure alike.
    producer.flush()

标签: python kerberos sasl confluent-kafka-python

解决方案


推荐阅读