Adding a function to a p-queue with concurrency stops the queue

Problem description

I'm using p-queue together with Puppeteer. The goal is to run X Chrome instances, with p-queue limiting the concurrency. When a task in the queue throws an exception, I want to re-queue it. But when I do that, the queue stops.

I have the following:

getAccounts is just a helper method that parses a JSON file (a rough sketch of it follows the snippet below). For each entry, I create a task and submit it to the queue.

    async init() {
        let accounts = await this.getAccounts();
        accounts.map(async () => {
            await queue.add(() => this.test());
        });
        await queue.onIdle();
        console.log("ended, with count: " + this._count)
    }
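For context, a minimal sketch of what the getAccounts helper might look like; the file name and the shape of the entries are assumptions for illustration, not from the original:

    // assumes: const fs = require('fs');
    async getAccounts() {
        // Read accounts.json (hypothetical name) and return the parsed entries
        const data = await fs.promises.readFile('accounts.json', 'utf8');
        return JSON.parse(data);
    }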

The test method:

    async test() {
        this._count++;
        const browser = await puppeteer.launch({headless: false});
        try {
            const page = await browser.newPage();
            await page.goto(this._url);

            if (Math.floor(Math.random() * 10) > 4) {
                throw new Error("Simulate error");
            }

            await browser.close();
        } catch (error) {
            await browser.close();
            await queue.add(() => this.test());
            console.log(error);
        }
    }

If I run it without the await queue.add(() => this.test()); in the catch block, it works fine and limits the concurrency to 3. With it, though, as soon as a task enters the catch, the running Chrome instances stall.

It also never logs the error, nor does it ever reach the console.log("ended, with count: " + this._count).

Is this a bug in the node module, or am I doing something wrong?
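One likely cause of the stall, for reference: awaiting queue.add() from inside a task that is still holding one of the queue's concurrency slots. The re-queued task cannot start until a slot frees, and the slot never frees because its task is awaiting the re-queued one. A minimal sketch of a test() that re-queues without awaiting, assuming p-queue's standard add() API; the trailing .catch and the commented setup lines are assumptions, not from the original:

    // Assumed setup (not shown in the question):
    //   const PQueue = require('p-queue');
    //   const queue = new PQueue({ concurrency: 3 });
    async test() {
        this._count++;
        const browser = await puppeteer.launch({headless: false});
        try {
            const page = await browser.newPage();
            await page.goto(this._url);
            // ... work that may throw ...
        } catch (error) {
            console.log(error);
            // Re-queue WITHOUT awaiting: this task still occupies a
            // concurrency slot, and the re-queued copy cannot start until a
            // slot frees, so awaiting it here deadlocks once every slot is
            // held by a task waiting on its own re-queued copy.
            queue.add(() => this.test()).catch((err) => console.log(err));
        } finally {
            await browser.close();
        }
    }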

Tags: node.js, puppeteer

Solution


I would suggest checking out the Apify SDK package, where you can simply use one of the helper classes to manage puppeteer pages/browsers.

PuppeteerPool: it manages browser instances for you. If you set one page per browser, every new page creates a new browser instance.

const puppeteerPool = new PuppeteerPool({
    maxOpenPagesPerInstance: 1,
});

const page1 = await puppeteerPool.newPage();
const page2 = await puppeteerPool.newPage();
const page3 = await puppeteerPool.newPage();

// ... do something with the pages ...

// Close all browsers.
await puppeteerPool.destroy();

Or PuppeteerCrawler, which is more powerful, with several options and helpers. You can manage the whole puppeteer crawler with it. You can check out the PuppeteerCrawler example.

Edit: an example of using PuppeteerCrawler with a concurrency of 10

const Apify = require('apify');

Apify.main(async () => {
    // Apify.openRequestQueue() is a factory to get a preconfigured RequestQueue instance.
    // We add our first request to it - the initial page the crawler will visit.
    const requestQueue = await Apify.openRequestQueue();
    await requestQueue.addRequest({ url: 'https://news.ycombinator.com/' }); // Adds URLs you want to process

    // Create an instance of the PuppeteerCrawler class - a crawler
    // that automatically loads the URLs in headless Chrome / Puppeteer.
    const crawler = new Apify.PuppeteerCrawler({
        requestQueue,
        maxConcurrency: 10, // Set max concurrency
        puppeteerPoolOptions: {
            maxOpenPagesPerInstance: 1, // Set up just one page for one browser instance
        },
        // The function accepts a single parameter, which is an object with the following fields:
        // - request: an instance of the Request class with information such as URL and HTTP method
        // - page: Puppeteer's Page object (see https://pptr.dev/#show=api-class-page)
        handlePageFunction: async ({ request, page }) => {
            // Code you want to process on each page
        },

        // This function is called if the page processing failed more than maxRequestRetries+1 times.
        handleFailedRequestFunction: async ({ request }) => {
            // Code you want to process when handlePageFunction failed
        },
    });

    // Run the crawler and wait for it to finish.
    await crawler.run();

    console.log('Crawler finished.');
});

An example using a request list:

const Apify = require('apify');

Apify.main(async () => {
    const requestList = new Apify.RequestList({
        sources: [
            // Separate requests
            { url: 'http://www.example.com/page-1' },
            { url: 'http://www.example.com/page-2' },
            // Bulk load of URLs from file `http://www.example.com/my-url-list.txt`
            { requestsFromUrl: 'http://www.example.com/my-url-list.txt', userData: { isFromUrl: true } },
        ],
        persistStateKey: 'my-state',
        persistSourcesKey: 'my-sources',
    });

    // This call loads and parses the URLs from the remote file.
    await requestList.initialize();
    const crawler = new Apify.PuppeteerCrawler({
        requestList,
        maxConcurrency: 10, // Set max concurrency
        puppeteerPoolOptions: {
            maxOpenPagesPerInstance: 1, // Set up just one page for one browser instance
        },
        // The function accepts a single parameter, which is an object with the following fields:
        // - request: an instance of the Request class with information such as URL and HTTP method
        // - page: Puppeteer's Page object (see https://pptr.dev/#show=api-class-page)
        handlePageFunction: async ({ request, page }) => {
            // Code you want to process on each page
        },

        // This function is called if the page processing failed more than maxRequestRetries+1 times.
        handleFailedRequestFunction: async ({ request }) => {
            // Code you want to process when handlePageFunction failed
        },
    });

    // Run the crawler and wait for it to finish.
    await crawler.run();

    console.log('Crawler finished.');
});
