How to avoid deadlocks in Node.js MySQL with many queries?

Problem Description

I have many URLs, and for each one I call a function load(url). This function parses the HTML, extracts the data I need, and builds a bulk INSERT query, as you can see in my test.js code. The problem is that with many URLs (say 100+) I get an ER_LOCK_DEADLOCK error from MySQL. I tried async.queue, but somehow it doesn't work (I don't know why; maybe I'm using it wrong). How can I run the URL + query jobs one after another and avoid the parallel execution that I believe causes the deadlock? Even with async.queue I still get deadlocks (though not always).

test.js

const request = require('request');
const async = require('async');
const pool = require('./database');

const urls = [
    'https://www.quora.com/What-is-the-best-way-to-have-delayed-job-queue-with-node-js',
    'https://de.wikipedia.org/wiki/Reinhardt-Zimmermann-L%C3%B6sung',
    'https://towardsdatascience.com/the-5-clustering-algorithms-data-scientists-need-to-know-a36d136ef68'
]

let load = function(url) {
    request({url: url}, function(error, response, html) {
        if(!error) {
            console.log(html);
            /**
             * 1. Parse HTML
             * 2. Create Array of Values
             * 3. Call pool.query(sql, [values], function(error) { ... })
             */
            let data = [{}];
            let sql = "INSERT IGNORE INTO tbl_test (title, content) VALUES ?";
            let values = [];

            data.forEach((item) => { values.push(item) });

            pool.query(sql, [values], function(error) { 
                if(error) throw error;
             })
        } else {
            console.log("handle error...");
        }
    })
}

let jobs = []

/*urls.forEach((url) => {
    //jobs.push(load(url)); // --> Works but fails if the urls list is too big -> mysql deadlock error!
    jobs.push(function(callback) { callback(load(url)) });
})*/

let q = async.queue(function(task, callback) {
    console.log("Task:", task.uri);
    callback();
})

q.drain = function() {
    console.log('all task completed');
    pool.end();
}

urls.forEach((url) => {
    q.push({uri: url}, function(err) {
        console.log('finished processing ...')
    });
});

database.js

require('dotenv').config();

const mysql = require('mysql');

let pool = mysql.createPool(
    {
        connectionLimit: 10,
        host: process.env.DB_HOST,
        port: process.env.DB_PORT,
        user: process.env.DB_USER,
        password: process.env.DB_PASSWORD,
        database: process.env.DB_NAME
    }
);

pool.getConnection((err, connection) => {
    if(err) {
        if(err.code === 'PROTOCOL_CONNECTION_LOST') {
            console.log('Database connection lost.')
        }

        if(err.code === 'ER_CON_COUNT_ERROR') {
            console.log('Database has too many connections.')
        }

        if(err.code === 'ECONNREFUSED') {
            console.log('Database connection refused.')
        }

        if(err.code === 'POOL_CLOSED') {
            console.log('Pool is closed.')
        }
    }

    if(connection) {
        connection.release()
    }

    return;
});

module.exports = pool;

Tags: mysql, node.js, deadlock

Solution


I changed my code to use async.series instead of async.queue, because the tasks in the queue would run in parallel (see: https://caolan.github.io/async/docs.html#queue).

test.js

...
let tasks = [];

// load() now takes a callback and invokes it once the INSERT has
// completed, so async.series waits for each job before starting the next.
context.forEach((ctx) => {
    tasks.push(function(callback) { load(ctx, callback) });
});

async.series(tasks, function(err) {
    if(err) return next(err);
});
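For reference, a minimal self-contained sketch of how the adjusted load() and the async.series call could fit together. The parsing step and the [url, html] rows are placeholders, and pool, tbl_test, and the URL list are taken from the code above:

const request = require('request');
const async = require('async');
const pool = require('./database');

const urls = [ /* same list as above */ ];

// load() only calls back after the INSERT has finished, so
// async.series never runs two bulk inserts at the same time.
function load(url, callback) {
    request({url: url}, function(error, response, html) {
        if(error) return callback(error);

        // Placeholder parsing: build the rows for the bulk insert here.
        let values = [[url, html.substring(0, 255)]];
        let sql = "INSERT IGNORE INTO tbl_test (title, content) VALUES ?";

        pool.query(sql, [values], callback);
    });
}

let tasks = urls.map((url) => function(callback) { load(url, callback) });

async.series(tasks, function(err) {
    if(err) console.error(err);
    pool.end();
});

Because each bulk INSERT now completes before the next one begins, there is never more than one transaction holding locks on tbl_test at a time, and a deadlock requires at least two, so ER_LOCK_DEADLOCK can no longer occur between these jobs.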
