首页 > 解决方案 > 使用'http'/'https'模块的NodeJS(Typescript ES2020)中的并行HTTPs(出站)GET请求

问题描述

我正在尝试使用 NodeJS 的 worker_threads 并行发出 HTTPS 请求。除了 https.get() 函数之外,其余部分都能并行运行;https.get() 会阻塞其他请求。以下是我目前为止的尝试。

import 'source-map-support/register';

import http from 'http';
import https, { RequestOptions } from 'https';
import { URL } from 'url';
import { isMainThread, parentPort, Worker, workerData } from 'worker_threads';

/**
 * Downloads the bodies of many HTTP/HTTPS URLs by spreading them over a small
 * pool of worker threads. This same file is used as the worker entry point:
 * the main thread calls {@link Downloader.downloadStringsAsync}, and every
 * spawned worker runs {@link Downloader.executeWorkerTask}.
 */
export class Downloader {

  /** Upper bound on the number of worker threads spawned per batch. */
  private static readonly maxThreadCount = 8;

  /** Maximum number of redirects followed for a single URL. */
  private static readonly maxRedirects = 10;

  /** Status codes that are answered by re-issuing the GET against `Location`. */
  private static readonly redirectStatusCodes = [301, 302, 303, 307, 308];

  /** Value of the `user-agent` header sent with every request. */
  private static readonly userAgent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.131 Safari/537.36';

  // One keep-alive agent per protocol, shared by every request of the thread,
  // so connections can actually be reused (a fresh agent per request never
  // reuses anything).
  private static readonly httpAgent = new http.Agent({ keepAlive: true });
  private static readonly httpsAgent = new https.Agent({ keepAlive: true });

  /**
   * Downloads every URL and maps each worker message through `callback`.
   *
   * @param urls URLs to download; an empty array resolves to an empty array.
   * @param callback Invoked on the calling thread as `callback(message, parameters)`
   *   for every URL. `message` carries `identifier`, `url`, `index` and either
   *   `content` or `error`. Its return value becomes the result for that URL.
   * @param timeout Optional socket timeout in milliseconds for each request.
   * @param parameters Optional value passed through to `callback` unchanged.
   * @returns The callback results, in the same order as `urls`.
   */
  downloadStringsAsync<Type>(urls: string[],
      callback: Function,
      timeout?: number,
      parameters?: Record<string, any>): Promise<Type[]> {
    // without this guard no worker is spawned and the promise would never settle...
    if (urls.length === 0) {
      return Promise.resolve([]);
    }

    // never spawn more threads than there are urls...
    const threadCount = Math.min(Downloader.maxThreadCount, urls.length);
    const urlsPerThread = Math.floor(urls.length / threadCount);
    const remainingUrls = urls.length % threadCount;
    const pendingChunks: Promise<Type[]>[] = [];
    let startIndex = 0;

    for (let i = 0; i < threadCount; i++) {
      // the first `remainingUrls` workers take one extra url each...
      const endIndex = startIndex + urlsPerThread + (i < remainingUrls ? 1 : 0);
      // per-iteration constants, so the error log reports this worker's own range...
      const range = { identifier: i, startIndex: startIndex, endIndex: endIndex };

      // spawns new worker thread...
      pendingChunks.push(
        this.spawnWorkerAsync<Type>({ ...range, timeout: timeout, urls: urls }, callback, parameters)
          .catch((error) => {
            console.log(`An error occurred in download worker thread.`, { ...range, error: error });

            throw error;
          }));

      startIndex = endIndex;
    }

    // chunks are contiguous url ranges, so concatenating them keeps the input order...
    return Promise.all(pendingChunks)
      .then((chunks) => ([] as Type[]).concat(...chunks));
  }

  /**
   * Spawns one worker for the `[startIndex, endIndex)` slice described by
   * `workerData` and collects one callback result per URL of that slice.
   *
   * @param workerData Data handed to the worker: `identifier`, `startIndex`,
   *   `endIndex`, `timeout` and `urls`.
   * @param callback Applied to every message posted by the worker.
   * @param parameters Optional value passed through to `callback`.
   * @returns The callback results, ordered by URL index. Rejects when the worker
   *   errors, exits with a non-zero code, or `callback` throws.
   */
  private spawnWorkerAsync<Type>(workerData: Record<string, any>,
      callback: Function,
      parameters?: Record<string, any>): Promise<Type[]> {
    return new Promise<Type[]>((resolve, reject) => {
      const { startIndex, endIndex } = workerData;
      const urlCount = endIndex - startIndex;

      // an empty slice would produce no messages, so there is nothing to wait for...
      if (urlCount <= 0) {
        resolve([]);

        return;
      }

      const contents = new Array<Type>(urlCount);
      let receivedCount = 0;
      const worker = new Worker(__filename, {
        workerData: workerData
      });

      worker.on('message', (value) => {
        try {
          // messages arrive in completion order; `index` restores the url order...
          contents[value.index - startIndex] = callback(value, parameters);
        } catch (error) {
          // a throwing callback must reject instead of becoming an uncaught exception...
          reject(error);

          return;
        }

        receivedCount++;

        if (receivedCount === urlCount) {
          resolve(contents);
        }
      });

      worker.on('error', (error) => {
        console.log(error);

        reject(error);
      });

      worker.on('exit', (exitCode) => {
        if (exitCode !== 0) {
          const error = new Error(`Worker stopped with exit code ${exitCode}`);

          console.log(error);

          reject(error);
        }
      });
    });
  }

  /**
   * Performs a GET request and resolves with the response body decoded as UTF-8.
   *
   * @param urlString Absolute `http:` or `https:` URL.
   * @param timeout Optional socket timeout in milliseconds; also applied to redirects.
   * @param redirectsLeft Number of redirects that may still be followed.
   * @returns The body of the first response with status code 200.
   *   Rejects on an invalid or unsupported URL, a request/response error, a
   *   timeout, a redirect without `Location`, too many redirects, or any other
   *   status code.
   */
  private static downloadStringAsync(urlString: string, timeout?: number,
      redirectsLeft: number = Downloader.maxRedirects): Promise<string> {
    return new Promise<string>((resolve, reject) => {
      // parses the URL (an invalid URL throws, which rejects this promise)...
      const url = new URL(urlString);

      // `URL` lower-cases the scheme, so this check is case-insensitive...
      if (url.protocol !== 'https:' && url.protocol !== 'http:') {
        reject(new Error(`Unsupported protocol '${url.protocol}' for URL ${urlString}.`));

        return;
      }

      const isHttpsProtocol = url.protocol === 'https:';
      // host, port and path are taken from the URL object itself, so custom
      // ports work and the '#fragment' is never sent to the server...
      const requestOptions: RequestOptions = {
        agent: isHttpsProtocol ? Downloader.httpsAgent : Downloader.httpAgent,
        headers: { 'user-agent': Downloader.userAgent },
      };

      const handleResponse = (response: http.IncomingMessage): void => {
        const { statusCode } = response;

        if (statusCode === 200) {
          // decodes in the stream so multi-byte characters split across chunks stay intact...
          response.setEncoding('utf8');

          let content = '';

          response.on('data', (chunk: string) => { content += chunk; });
          response.on('end', () => { resolve(content); });
          // a connection dropped mid-body never emits 'end'...
          response.on('error', (error) => { reject(error); });

          return;
        }

        // the body is not needed; it must still be drained to release the socket...
        response.resume();

        if (statusCode !== undefined && Downloader.redirectStatusCodes.includes(statusCode)) {
          const { location } = response.headers;

          if (!location) {
            reject(new Error(`Server responded with status code ${statusCode} for URL ${urlString} with no location set in the header.`));

            return;
          }

          if (redirectsLeft <= 0) {
            reject(new Error(`Too many redirects while downloading URL ${urlString}.`));

            return;
          }

          // resolves a relative 'Location' against the current URL and follows it...
          resolve(Downloader.downloadStringAsync(
            new URL(location, url).toString(), timeout, redirectsLeft - 1));

          return;
        }

        reject(new Error(`Server responded with status code ${statusCode} for URL ${urlString}.`));
      };

      // requests for content...
      const request = isHttpsProtocol
        ? https.get(url, requestOptions, handleResponse)
        : http.get(url, requestOptions, handleResponse);

      // handles error...
      request.on('error', error => { reject(error); });

      // if timeout is provided, we set request timeout...
      if (timeout) {
        const timeoutInMilliseconds = timeout;

        request.setTimeout(timeoutInMilliseconds, () => {
          // destroys the request object on timeout...
          request.destroy();
          // rejects a new error...
          reject(new Error(`Request timed out for URL ${urlString} after ${timeoutInMilliseconds / 1000.0} seconds.`));
        });
      }
    });
  }

  /**
   * Worker-thread entry point. Starts the downloads for the slice described by
   * `workerData` at once and posts one message per URL to the parent thread:
   * `{ identifier, url, index, content }` on success or
   * `{ identifier, url, index, error }` on failure.
   *
   * @throws Error when called outside of a worker thread.
   */
  public static executeWorkerTask(): void {
    const port = parentPort;

    if (!port) {
      throw new Error('Downloader.executeWorkerTask() must be called from a worker thread.');
    }

    const {
      startIndex, endIndex, urls,
      identifier, timeout,
    } = workerData;

    for (let i = startIndex; i < endIndex; i++) {
      Downloader.downloadStringAsync(urls[i], timeout)
        .then((content) => {
          port.postMessage({
            identifier: identifier,
            url: urls[i],
            index: i,
            content: content,
          });
        })
        .catch((error) => {
          port.postMessage({
            identifier: identifier,
            url: urls[i],
            index: i,
            error: error,
          });
        });
    }
  }
}

// Entry point shared by both kinds of thread: the main thread starts a sample
// batch of downloads, while a spawned worker processes the slice it was given.
if (isMainThread) {
  const sampleUrls = [
    'https://www.google.com',
    'https://www.wikipedia.com',
    'https://www.facebook.com',
    'https://www.youtube.com',
    'https://www.wwe.com',
  ];

  // maps a worker message to its content, or to '' when the download failed or was empty...
  const pickContent = (response: any, parameters?: Record<string, any>) => {
    const isUnusable = response.error || !response.content || response.content.length === 0;

    return isUnusable ? '' : response.content;
  };

  new Downloader()
    .downloadStringsAsync<string>(sampleUrls, pickContent)
    .then(contents => {
      console.log(contents.length);
    })
    .catch(error => {
      console.log(error);
    });
} else {
  // executes worker thread task...
  Downloader.executeWorkerTask();
}

代码可以运行,但 HTTP 请求是逐个(串行)发出的,彼此互相等待,而不是并行执行。非常感谢任何帮助。

请不要建议任何第 3 方模块。我正在使用带有 target =“es2020”和 lib = [“es2020”] 的 Typescript v4.3.4。

提前致谢。

标签: node.js, typescript, http, asynchronous, node-worker-threads

解决方案


推荐阅读