python - mlflow.exceptions.MlflowException: Changing param values is not allowed. Param with key='input_rows' was already logged with value='32205'
Problem Description
I am using MLflow as a work orchestration tool in a machine-learning pipeline that consumes real-time data from Apache Kafka. Whenever 250 messages arrive on the topic, I gather them, append them to my previous data, and trigger my training function, so a new training happens for every 250 new messages. With MLflow I can inspect the results, metrics, and other parameters of the trained models. But after training runs once, the second run fails with the error shown in the title. Here is my consumer:
import json

from kafka import KafkaConsumer

topic_name = 'twitterdata'
train_every = 250

def consume_tweets():
    consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=['localhost:9093'],
        auto_offset_reset='latest',
        enable_auto_commit=True,
        auto_commit_interval_ms=5000,
        fetch_max_bytes=128,
        max_poll_records=100,
        value_deserializer=lambda x: json.loads(x.decode('utf-8')))

    tweet_counter = 0
    for message in consumer:
        tweets = json.loads(json.dumps(message.value))
        # print(tweets['text'])
        tweet_sentiment = make_prediction(tweets['text'])
        if tweet_counter == train_every:
            update_df()
            data_path = 'data/updated_tweets.csv'
            train(data_path)
            print("\nTraining with new data is completed!\n")
            tweet_counter = 0
        else:
            tweet_counter += 1
        publish_prediction(tweet_sentiment, tweets['text'])
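As an aside, the counter logic above can be checked in isolation from Kafka. Below is a minimal sketch (my own simplification, with a plain iterable standing in for the consumer); note that because the original increments the counter in the `else` branch, it actually trains on every 251st message, whereas this version fires on exactly every `train_every`-th message:

```python
def batched_trigger(messages, train_every=250):
    """Yield (message, should_train) pairs; should_train is True
    once every train_every messages."""
    counter = 0
    for msg in messages:
        counter += 1
        if counter == train_every:
            counter = 0
            yield msg, True
        else:
            yield msg, False

# Simulate 500 incoming messages: training fires on the 250th and 500th.
triggers = [should_train for _, should_train in batched_trigger(range(500))]
print(sum(triggers))  # → 2
```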
And here is my train.py:
import pandas as pd
import mlflow
import mlflow.sklearn
from imblearn.over_sampling import SMOTE
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

train_tweets = pd.read_csv(DATA_PATH)
# train_tweets = train_tweets[:20000]
tweets = train_tweets.tweet.values
labels = train_tweets.label.values

# Log data params
mlflow.log_param('input_rows', train_tweets.shape[0])

# Do preprocessing and return vectorizer with it
vectorizer, processed_features = embedding(tweets)

# Saving vectorizer
save_vectorizer(vectorizer)

# Split data
X_train, X_test, y_train, y_test = train_test_split(
    processed_features, labels, test_size=0.2, random_state=0)

# Handle imbalanced data by using SMOTE and log to MLflow
smote = SMOTE('minority')
mlflow.log_param("over-sampling", smote)
X_train, y_train = smote.fit_sample(X_train, y_train)

# text_classifier = MultinomialNB()
text_classifier = LogisticRegression(max_iter=10000)
text_classifier.fit(X_train, y_train)
predictions = text_classifier.predict(X_test)

# Model metrics
(rmse, mae, r2) = eval_metrics(y_test, predictions)
mlflow.log_param('os-row-Xtrain', X_train.shape[0])
mlflow.log_param('os-row-ytrain', y_train.shape[0])
mlflow.log_param("model_name", text_classifier)
mlflow.log_metric("rmse", rmse)
mlflow.log_metric("r2", r2)
mlflow.log_metric("mae", mae)
mlflow.log_metric('acc_score', accuracy_score(y_test, predictions))
mlflow.sklearn.log_model(text_classifier, "model")
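For context on the error itself: MLflow treats parameters as write-once within a single run, so the second training's `log_param('input_rows', ...)` call with a different row count raises `MlflowException`. The toy class below (plain Python, not the MLflow API; the name `Run` is made up here) mimics that write-once behavior:

```python
class Run:
    """Toy stand-in mimicking MLflow's write-once parameter store
    for a single run (not the real MLflow API)."""
    def __init__(self):
        self.params = {}

    def log_param(self, key, value):
        value = str(value)
        # Re-logging the same value is fine; a different value is rejected.
        if key in self.params and self.params[key] != value:
            raise ValueError(
                f"Changing param values is not allowed. Param with "
                f"key='{key}' was already logged with value='{self.params[key]}'")
        self.params[key] = value

run = Run()
run.log_param('input_rows', 32205)      # first training: OK
try:
    run.log_param('input_rows', 32455)  # second training, same run: fails
except ValueError as e:
    print(e)
```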
I couldn't solve the problem. MLflow is a fairly new tool, so issues and examples for it are scarce.
Solution
I think you need a separate MLflow "run" for every new batch of data, so that your parameters are logged independently for each training.
So, try the following in your consumer:
if tweet_counter == train_every:
    update_df()
    data_path = 'data/updated_tweets.csv'
    with mlflow.start_run() as mlrun:
        train(data_path)
    print("\nTraining with new data is completed!\n")
    tweet_counter = 0
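The effect of the fix can be sketched without a tracking server. The `Tracker` class below is a hypothetical stand-in for MLflow's client, not the real API; it only illustrates how starting a fresh run per batch gives each training its own write-once parameter namespace, so repeated `input_rows` values no longer conflict:

```python
from contextlib import contextmanager

class Tracker:
    """Toy stand-in for an MLflow tracking client: each run gets
    its own write-once parameter dict (not the real MLflow API)."""
    def __init__(self):
        self.runs = []
        self.active = None

    @contextmanager
    def start_run(self):
        self.active = {}          # fresh parameter namespace per run
        try:
            yield self.active
        finally:
            self.runs.append(self.active)
            self.active = None

    def log_param(self, key, value):
        value = str(value)
        if key in self.active and self.active[key] != value:
            raise ValueError(
                f"Changing param values is not allowed. Param with "
                f"key='{key}' was already logged with value='{self.active[key]}'")
        self.active[key] = value

tracker = Tracker()
for rows in (32205, 32455):      # two training batches
    with tracker.start_run():    # fresh run per batch, as in the fix
        tracker.log_param('input_rows', rows)  # no conflict across runs

print([r['input_rows'] for r in tracker.runs])  # → ['32205', '32455']
```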