首页 > 解决方案 > 排队 SQL 查询

问题描述

我有一个通过 UDP 监听事件(字符串)的套接字。每次它获取一个字符串时,都会生成一个 SQL 请求。我的问题是让它异步运行,以便请求排队,而不是在另一个查询运行时尝试访问数据库。

伪:

socket.on('message', function (msg) {
    switch (msg) {
        case "case1":
            storeRows();
            break;
    //there's a new message every 1-2 secs
    ...
}
var rows = []; //push results of all queries here
function storeRows() {
    rows.push(getFromDB());
}

function getFromDB() {
    var sqlString = "SELECT * ..."
    var req = new Req(sqlString, function(err, rowCount) {
...
    }
var resultsArray = [];
req.on('row', function (cols) {
    //add results to resultsArray
}
return resultsArray;
}

基本上我需要 getFromDB() 异步运行,等待上一个查询完成后再运行下一个。这个我不知道怎么办。我正在使用 tedious.js 访问 SQL Server 数据库。

编辑:

var config = {
    userName: "admin",
    password: "pw",
    server: "test",
    domain: "test",
    options: {
        port: '10322'
    }
}
connection = new Connection(config);
connection.on('connect') {
    isConnected = true;
}

getCoordinates(vehicleID, fromTime, toTime) { 
    var SQLString = "Select * FROM coordinates WHERE TimeStamp BETWEEN '" + fromTime + "' AND '" + toTime + "' AND vehicleID = " + vehicleID;
    var rowsToSend = [];
    var req = new Req(SQLString, function(err, rowCount) {
        if (err) console.log(err)
        else console.log(rowCount);
    }

    req.on('row', function (columns) {
        var rowArray = [];
        columns.forEach(function (column) {
            var colValue = column.value;
            switch (column.metadata.colName) {
                case "ID":
                    if (rowArray.length > 0)                   
                        rowsToSend.push(rowArray);
                        rowArray = new Array();
                        break;

                default:                     
                    rowArray.push(colValue);
                    break;
                }
         });
         rowsToSend.push(rowArray);
    });
    connection.execSql(req);

    req.on('doneProc', function () {
        return(rowsToSend);
    }  
}

//called every few seconds
function runQueries() {
    someArray.push(getCoordinates ());
}

标签: javascriptsqlnode.jsasynchronouscallback

解决方案


您可以构建类似处理器的东西,它构建基于数组的队列,并且任何时候只允许在数据库上执行一个查询。这是一个使用带有 Promise 的 async/await 函数的示例:

class Processor {
  constructor() {
    this.queue = [];
    this.queryRunning = false;
  }
  
  // Main query function.
  // Add the query to a queue, and try to process the queue.
  query(query) {
    return new Promise((resolver) => {
      this.queue.push({
        query,
        resolver
      });
      
      this.tryProcessNext();
    });
  }
  
  // Try to process the next element of the queue
  tryProcessNext() {
    let self = this;
    
    // If it's running, or queue is empty, simply do nothing
    if (this.queue.length === 0 || this.queryRunning)
      return;
    
    this.queryRunning = true;
    
    // Get the first added element to the queue (And shorten the array at the same time)
    let qry = this.queue.shift();
    
    // Run query, and use `qry.resolver()` to resolve the original promise.
    setTimeout(function() {
      qry.resolver(`Result: ${qry.query}`);
      self.queryRunning = false;
      // If there's anything next in the queue, try to process that next
      self.tryProcessNext();
    }, 1500);
  }
  
}

const proccessor = new Processor();

let i = 0;
// A simple function to demonstrate adding queries via a websocket.
setInterval(async function() {
  console.log(`Adding query: QUERY ${++i}`);
  var res = await proccessor.query(`QUERY ${i}`);
  
  console.log(res);
}, 1000);


推荐阅读