mysql - 从数据块收到的 Sql 服务器连接丢失错误。连接池有用吗?
问题描述
我的 databricks 笔记本随机给出错误107: Transport connection error connection timed out for MySQL server
。在 50 次运行中,系统会收到此类错误。我正在尝试从数据块访问 SQL 服务器。我只使用一台 SQL 服务器。SQL 服务器的配置存储在我需要在数据台中进行计算的表中。SQL 服务器上也存在审计表,因此通过数据块,我在每个进程之后在 SQL 数据库中插入记录。
当我向 Microsoft 提出此错误时,他们建议使用连接池来缓解错误。连接池会有所帮助还是我应该尝试其他方法?
用于连接 MySQL 的代码如下:
def execute_query_mysql(self, query, get_single_result=None):
""" Module for executing queries"""
mysql_connection = None
try:
status_message = "Starting function to execute a MySQLquery : "+str(query)
logger.info(status_message, extra=self.execution_context.get_context())
if query:
query = query.strip()
mysql_connection = self.get_my_sql_connection()
cursor = mysql_connection.cursor(dictionary=True)
status_message = "Created connection to MySQL"
#logger.debug(status_message, extra=self.execution_context.get_context())
status_message = "Executing query on MySQL"
#logger.debug(status_message, extra=self.execution_context.get_context())
if query.lower().startswith("select"):
cursor.execute(query)
if get_single_result:
result = cursor.fetchone()
else:
result = cursor.fetchall()
elif query.lower().startswith("insert into"):
result = cursor.execute(query)
if result != 0:
result = cursor.lastrowid
cursor.execute("commit")
else:
cursor.execute(query)
cursor.execute("commit")
result = ""
#closing Mysql connections
mysql_connection.close()
cursor.close()
status_message = "Executed query on MySQL with result : " + str(result)
logger.debug(status_message, extra=self.execution_context.get_context())
return result
except Exception as exception:
error = "ERROR in " + self.execution_context.get_context_param("current_module") + \
" ERROR MESSAGE: " + str(traceback.format_exc())
self.execution_context.set_context({"traceback": error})
logger.error(error, extra=self.execution_context.get_context())
raise exception
def get_my_sql_connection(self):
"""Module for getting mySQL connection"""
try:
status_message = "Starting function to fetch my-sql connection"
logger.info(status_message, extra=self.execution_context.get_context())
secret = json.loads(json.dumps(self.configuration.get_configuration([CommonConstants.ENVIRONMENT_PARAMS_KEY, "di_keys"])))
host = secret["mysql_host"]
user = secret["mysql_username"]
encoded_password = secret["mysql_password"]
password = self.decrypt_value(encoded_password)
port = secret["mysql_port"]
db = secret["mysql_db"]
ssl_ca_file_path = self.configuration.get_configuration(
[CommonConstants.ENVIRONMENT_PARAMS_KEY, "mysql_certificate_file_path"])
if len(host) == 0 or len(user) == 0 or len(password) == 0 or len(db) == 0 \
or len(port) == 0:
status_message = "Invalid configurations for " + MODULE_NAME + " in " + \
CommonConstants.ENVIRONMENT_CONFIG_FILE
logger.error(status_message, extra=self.execution_context.get_context())
raise Exception
if ssl_ca_file_path:
mysql_connection = connection.MySQLConnection(
host=host, user=user, passwd=password, db=db, port=int(port),
ssl_verify_cert=True, ssl_ca=ssl_ca_file_path, connect_timeout=6000)
else:
mysql_connection = connection.MySQLConnection(
host=host, user=user, passwd=password, db=db, port=int(port))
status_message = "Connection to MySQL successful"
return mysql_connection
except Exception as exception:
error = "ERROR in " + self.execution_context.get_context_param("current_module") + \
" ERROR MESSAGE: " + str(exception)
self.execution_context.set_context({"traceback": error})
logger.error(error, extra=self.execution_context.get_context())
raise exception
def decrypt_value(self, encoded_string):
"""
Purpose : This method decrypts encoded string
Input : encoded value
Output : decrypted value
"""
try:
status_message = "Started decoding for value:" + encoded_string
#logger.debug(status_message, extra=self.execution_context.get_context())
decoded_string = base64.b64decode(encoded_string).decode()
status_message = "Completed decoding value"
#logger.info(status_message, extra=self.execution_context.get_context())
return decoded_string
except Exception as exception:
status_message = "Error occured in decrypting value"
error = "ERROR in " + self.execution_context.get_context_param("module_name") + \
" ERROR MESSAGE: " + str(traceback.format_exc())
self.execution_context.set_context({"traceback": error})
logger.error(status_message, extra=self.execution_context.get_context())
raise exception
解决方案
推荐阅读
- python - Elasticsearch 搜索 API 未返回所有结果
- c# - 没有刷新.net核心mvc的发布表单
- php - 我如何将 bindingValues 与数组名称 PDO PHP 结合起来
- javascript - 在 React hooks 状态下循环遍历 json 数据
- python - 如何获取列表中字符串的公共前缀
- assembly - 从汇编代码中检查可能的危险
- r - 相当于R中的对比度(Stata)
- java - Android 将大整数转换为科学记数法
- r - rmarkdown 和 pandoc 参考问题
- css - Next.js 样式 - 页脚组件由于某种原因不在屏幕底部?