mlflow.exceptions.MlflowException: Changing param values is not allowed. Param with key='input_rows' was already logged with value='32205'

Problem Description

I am using MLflow as a work-orchestration tool in a machine learning pipeline that processes real-time data. I listen to the data with Apache Kafka: whenever 250 messages arrive on the topic, I gather them, append them to my existing data, and trigger my training function, so the model is retrained on every 250 new records. With MLflow I can inspect the results, metrics, and other parameters of the trained models. However, after the first training completes, the second one never runs; instead it throws the error shown in the title. Here is my consumer:

import json

from kafka import KafkaConsumer

topic_name = 'twitterdata'
train_every = 250  # retrain after this many new messages


def consume_tweets():
    consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=['localhost:9093'],
        auto_offset_reset='latest',
        enable_auto_commit=True,
        auto_commit_interval_ms=5000,
        fetch_max_bytes=128,
        max_poll_records=100,
        value_deserializer=lambda x: json.loads(x.decode('utf-8')))

    tweet_counter = 0
    for message in consumer:
        tweets = message.value  # already a dict, deserialized by value_deserializer
        # print(tweets['text'])
        tweet_sentiment = make_prediction(tweets['text'])

        if tweet_counter == train_every:
            update_df()
            data_path = 'data/updated_tweets.csv'
            train(data_path)
            print("\nTraining with new data is completed!\n")
            tweet_counter = 0

        else:
            tweet_counter += 1

        publish_prediction(tweet_sentiment, tweets['text'])
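The retrain-every-N-messages trigger above can be sketched without Kafka or MLflow; the `trainings` counter below is a hypothetical stand-in for the `update_df()` + `train(data_path)` step:

```python
def consume_stream(messages, train_every=250):
    """Replay the counter logic from the consumer above.

    Returns how many times training would have been triggered
    while iterating over `messages`.
    """
    trainings = 0
    tweet_counter = 0
    for _ in messages:
        if tweet_counter == train_every:
            trainings += 1   # stand-in for update_df() + train(data_path)
            tweet_counter = 0
        else:
            tweet_counter += 1
    return trainings
```

Note the subtle off-by-one in this logic: because the counter only increments in the `else` branch, training actually fires on the 251st message, not the 250th.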

And here is my train.py:

def train(DATA_PATH):
    train_tweets = pd.read_csv(DATA_PATH)
    # train_tweets = train_tweets[:20000]

    tweets = train_tweets.tweet.values
    labels = train_tweets.label.values

    # Log data params
    mlflow.log_param('input_rows', train_tweets.shape[0])

    # Do preprocessing and return vectorizer with it
    vectorizer, processed_features = embedding(tweets)

    # Saving vectorizer
    save_vectorizer(vectorizer)

    # Split data
    X_train, X_test, y_train, y_test = train_test_split(processed_features, labels, test_size=0.2, random_state=0)

    # Handle imbalanced data with SMOTE and log to MLflow
    smote = SMOTE(sampling_strategy='minority')
    mlflow.log_param("over-sampling", smote)

    X_train, y_train = smote.fit_resample(X_train, y_train)

    # text_classifier = MultinomialNB()
    text_classifier = LogisticRegression(max_iter=10000)
    text_classifier.fit(X_train, y_train)
    predictions = text_classifier.predict(X_test)

    # Model metrics
    (rmse, mae, r2) = eval_metrics(y_test, predictions)

    mlflow.log_param('os-row-Xtrain', X_train.shape[0])
    mlflow.log_param('os-row-ytrain', y_train.shape[0])
    mlflow.log_param("model_name", text_classifier)
    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)
    mlflow.log_metric('acc_score', accuracy_score(y_test, predictions))

    mlflow.sklearn.log_model(text_classifier, "model")

I couldn't solve the problem. MLflow is a relatively new tool, so issues and examples for it are still scarce.

Tags: python, machine-learning, apache-kafka, mlflow, real-time-data

Solution


I think you need a new MLflow "run" for every batch of data, so that your parameters are logged independently for each training.

So, try the following in your consumer:

if tweet_counter == train_every:
    update_df()
    data_path = 'data/updated_tweets.csv'
    with mlflow.start_run() as mlrun:
        train(data_path)
    print("\nTraining with new data is completed!\n")
    tweet_counter = 0
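The underlying cause, as I understand MLflow's fluent API: calling `mlflow.log_param` with no active run starts one implicitly, and that run stays active, so the second training tries to overwrite `input_rows` in the same run; params are write-once within a run. A toy model of that semantics (not the real MLflow API) makes the effect of one-run-per-batch visible:

```python
class ToyRun:
    """Minimal stand-in for an MLflow run: params are write-once."""

    def __init__(self):
        self.params = {}

    def log_param(self, key, value):
        # Re-logging the same value is harmless; changing it is an error,
        # mirroring the MlflowException from the question title.
        if key in self.params and self.params[key] != value:
            raise ValueError(
                f"Changing param values is not allowed. Param with "
                f"key='{key}' was already logged with value='{self.params[key]}'"
            )
        self.params[key] = value


# One shared (implicit) run fails on the second training batch:
shared = ToyRun()
shared.log_param("input_rows", 32205)   # first batch: OK
try:
    shared.log_param("input_rows", 32455)   # second batch: rejected
except ValueError as exc:
    print(exc)

# A fresh run per batch, as in the fix, never collides:
for rows in (32205, 32455):
    run = ToyRun()                      # analogue of `with mlflow.start_run()`
    run.log_param("input_rows", rows)
```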
