首页 > 解决方案 > 如何从特定功能节点js中删除事件发射器

问题描述

我有一个通用的事件发射器。

var events = require('events');
var eventEmitter = new events.EventEmitter();

它发出暂停、恢复、取消等事件。

我在我的函数中听这个事件。但是这个函数在 for 循环中被调用。

let func = () =>{
   //some Action runs async;
   eventEmitter.on("pause",()=>{
     //some action;
   });
   eventEmitter.on("resume",()=>{
     //some action;
   }) 
  eventEmitter.on("cancel",()=>{
     //some action;
   }) 
  return 0;
}

for(let i=0;i<anyNumber;i++){
   func();
}

编辑:我真正的缩进是递归读取目录中的文件并上传到s3 Bucket,因为没有官方方法可以上传整个目录,我通过这个实现了。

上面提到的for循环实际上是一个fs.readdir,为了简单起见,我将它称为for循环。

在 func() 我有 s3 上传功能(多部分上传),当点击暂停按钮时我需要暂停上传(这意味着已经离开当前部分上传,并停止另一部分上传。)

而 resume 意味着部分上传继续,而 cancel 意味着我取消分段上传。

这是我的确切情况。

const readdirp = require('readdirp');
readdirp('.', {fileFilter: '*.js', alwaysStat: true})
 .on('data', (entry) => {
  const {path, stats: {size}} = entry;
  s3Fileupload(path)
})
.on('warn', error => console.error('non-fatal error', error))
.on('error', error => console.error('fatal error', error))
.on('end', () => console.log('done'));

你现在能帮帮我吗?

编辑:1

let func = () =>{
 let stream = es.map((data, next) => {
  queue.defer(function(details, done) {
    _this.s3MultiUpload(JSON.parse(details), options, done, details, next);
  }, data);
}); }
let stream = readdirp(path)
stream.pipe(this.func());

可能是我在这里使用的 d3Queue 可能导致内存泄漏,我在读取目录时一直在推动该功能?

标签: javascriptnode.jsevents

解决方案


要删除侦听器,请调用eventEmitter.removeListener(event, listener)。您需要保留所有听众的副本。eventEmitter.removeAllListeners()或者,如果发射器未在其他地方使用,您可以简单地调用。

如果我这样做,那之后我就不能听那个事件了,对吧?

你说的对。您需要等到不再需要这些事件,然后再删除它们。

理想情况下,您不希望附加太多听众。不要增加限制,而是在单个事件回调中完成所有工作。

这是我的做法:

// Create an event emitter
const events = require('events');
const eventEmitter = new events.EventEmitter();

// Build a list of tasks to run
let tasks = [];
let tasksDone = 0;
for (let file of files) {
    // Each task can be paused, resumed, canceled
    let task = tasks.push({
        pause: () => {/* TODO */},
        resume: () => {/* TODO */},
        cancel: () => {/* TODO */},
        start: async () => {
            // Do work
            // Send a signal when task is done
            eventEmitter.emit('done');
        }
    });
    tasks.push(task);
}

// Store the listeners
let listeners = [
    ['pause', () => {
        tasks.forEach(task => task.pause());
    }],
    ['resume', () => {
        tasks.forEach(task => task.resume());
    }],
    ['cancel', () => {
        tasks.forEach(task => task.cancel());
    }],
    ['done', () => {
        tasksDone++;
        if (tasksDone === task.length) {
            // All work done
            // Remove listeners
            listeners.forEach(([event, callback]) => {
                eventEmitter.removeListener(event, callback);
            });
        }
    }],
];

// Attach listeners
listeners.forEach(([event, callback]) => {
    eventEmitter.on(event, callback);
});

// Start tasks
tasks.forEach(task => task.start());

尽管如此,您的应用程序可能由于其他原因而崩溃。如果您同时打开太多文件或使用太多内存,您的应用程序可能会在任务完成之前崩溃。不用说,您还应该确保文件已关闭等。

我建议首先对任务进行排队,然后一次完成一项。如果您需要更高的吞吐量,请编写一个调度程序以确保您一次不会消耗太多资源。

最后,对于节点程序,您可以附加 Chrome 调试器以找出未释放内存的原因。如果问题仍然存在,您可以准确找出占用内存的内容。


推荐阅读