首页 > 解决方案 > 从 Databricks 访问 SQL 服务器时出现连接丢失错误。连接池有用吗?

问题描述

我的 Databricks 笔记本会随机报出错误 107: Transport connection error connection timed out for MySQL server。在大约 50 次运行中,系统会偶尔收到此类错误。我正在尝试从 Databricks 访问 SQL 服务器,并且只使用一台 SQL 服务器。我在 Databricks 中进行计算所需的配置存储在 SQL 服务器的表中。SQL 服务器上还有审计表,因此在每个处理步骤结束后,我都会通过 Databricks 向 SQL 数据库插入记录。

当我向 Microsoft 提出此错误时,他们建议使用连接池来缓解错误。连接池会有所帮助还是我应该尝试其他方法?

用于连接 MySQL 的代码如下:

    def execute_query_mysql(self, query, get_single_result=None):
        """Execute one SQL statement on MySQL and return its result.

        A fresh connection is obtained from ``get_my_sql_connection`` on every
        call. The cursor and the connection are always released, including
        when the statement fails, so a failed query no longer leaks a
        connection on the server.

        Args:
            query: SQL text. Leading/trailing whitespace is stripped.
            get_single_result: when truthy, a SELECT returns only the first
                row (``fetchone``) instead of every row (``fetchall``).

        Returns:
            * SELECT: a single row or a list of rows (the cursor is created
              with ``dictionary=True``).
            * INSERT INTO: ``cursor.lastrowid``, unless ``cursor.execute``
              returned ``0``, in which case ``0`` is returned.
            * Any other statement: an empty string.

        Raises:
            Exception: any error is logged with its traceback, stored in the
                execution context under ``"traceback"``, and re-raised.
        """
        try:
            status_message = "Starting function to execute a MySQLquery : "+str(query)
            logger.info(status_message, extra=self.execution_context.get_context())
            if query:
                query = query.strip()

            mysql_connection = self.get_my_sql_connection()
            try:
                cursor = mysql_connection.cursor(dictionary=True)
                try:
                    statement = query.lower()
                    if statement.startswith("select"):
                        cursor.execute(query)
                        if get_single_result:
                            result = cursor.fetchone()
                        else:
                            result = cursor.fetchall()
                    elif statement.startswith("insert into"):
                        result = cursor.execute(query)
                        if result != 0:
                            result = cursor.lastrowid
                        cursor.execute("commit")
                    else:
                        cursor.execute(query)
                        cursor.execute("commit")
                        result = ""
                finally:
                    # Release the cursor before the connection it belongs to.
                    cursor.close()
            finally:
                # Always hand the connection back, even if the query failed;
                # otherwise every failed call leaves a session open.
                mysql_connection.close()

            status_message = "Executed query on MySQL with result : " + str(result)
            logger.debug(status_message, extra=self.execution_context.get_context())
            return result
        except Exception:
            error = "ERROR in " + self.execution_context.get_context_param("current_module") + \
                    " ERROR MESSAGE: " + str(traceback.format_exc())
            self.execution_context.set_context({"traceback": error})
            logger.error(error, extra=self.execution_context.get_context())
            # Bare raise keeps the original exception and traceback intact.
            raise

    def get_my_sql_connection(self):
        """Create and return a new MySQL connection from the configuration.

        Connection settings are read from the ``di_keys`` section of the
        environment parameters; the stored password is passed through
        ``decrypt_value`` before use. When a CA certificate path is
        configured, the connection is opened with certificate verification.

        Returns:
            A new ``connection.MySQLConnection``. The caller must close it.

        Raises:
            Exception: when a required setting is missing or empty, or when
                the connection cannot be established. The error is logged
                and stored in the execution context under ``"traceback"``
                before being re-raised.
        """
        try:
            status_message = "Starting function to fetch my-sql connection"
            logger.info(status_message, extra=self.execution_context.get_context())

            secret = json.loads(json.dumps(self.configuration.get_configuration(
                [CommonConstants.ENVIRONMENT_PARAMS_KEY, "di_keys"])))

            host = secret["mysql_host"]
            user = secret["mysql_username"]
            password = self.decrypt_value(secret["mysql_password"])
            port = secret["mysql_port"]
            db = secret["mysql_db"]
            ssl_ca_file_path = self.configuration.get_configuration(
                [CommonConstants.ENVIRONMENT_PARAMS_KEY, "mysql_certificate_file_path"])

            # Compare via str() so an integer port (or a None value) is
            # reported as a configuration problem instead of a TypeError
            # from len().
            required_values = (host, user, password, db, port)
            if any(value is None or len(str(value)) == 0 for value in required_values):
                status_message = "Invalid configurations for " + MODULE_NAME + " in " + \
                                 CommonConstants.ENVIRONMENT_CONFIG_FILE
                logger.error(status_message, extra=self.execution_context.get_context())
                # Carry the message so the handler below does not log an
                # empty "ERROR MESSAGE: ".
                raise Exception(status_message)

            connect_args = {
                "host": host,
                "user": user,
                "passwd": password,
                "db": db,
                "port": int(port),
            }
            if ssl_ca_file_path:
                # NOTE(review): connect_timeout is only applied on the SSL
                # path, as in the original code; the non-SSL path uses the
                # driver default. Confirm whether that is intended.
                connect_args.update(
                    ssl_verify_cert=True, ssl_ca=ssl_ca_file_path, connect_timeout=6000)

            return connection.MySQLConnection(**connect_args)

        except Exception as exception:
            error = "ERROR in " + self.execution_context.get_context_param("current_module") + \
                    " ERROR MESSAGE: " + str(exception)
            self.execution_context.set_context({"traceback": error})
            logger.error(error, extra=self.execution_context.get_context())
            raise

    def decrypt_value(self, encoded_string):
        """
            Purpose :   Decode a base64-encoded value into text. Despite the
                        method name, no cryptographic decryption is done here;
                        the value is only base64-decoded.
            Input   :   encoded value (str or bytes accepted by base64.b64decode)
            Output  :   decoded value as str
            Raises  :   any decoding error is logged and re-raised
        """
        try:
            # The encoded secret is deliberately not copied into any status or
            # log string. The old unused message also failed on bytes input.
            return base64.b64decode(encoded_string).decode()
        except Exception:
            status_message = "Error occured in decrypting value"
            error = "ERROR in " + self.execution_context.get_context_param("module_name") + \
                    " ERROR MESSAGE: " + str(traceback.format_exc())
            self.execution_context.set_context({"traceback": error})
            logger.error(status_message, extra=self.execution_context.get_context())
            raise

标签: mysql runtime-error databricks

解决方案


推荐阅读