首页 > 解决方案 > ConcurrentBag 跳过一些项目 C#

问题描述

我正在使用 concurrentbag 来抓取 URL,现在它对于 500 / 100 个 url 工作正常,但是当我试图抓取 8000 个 url 时。所有未处理的 URL 和 inputQueue 中待处理的一些项目。

但我正在使用 while (!inputQueue.IsEmpty) 。因此,它应该循环运行,直到输入队列中存在任何项目。

我只想最多运行 100 个线程。因此,我首先创建 100 个线程并调用“Run()”方法,在该方法中我运行一个循环来获取项目,直到项目退出输入队列并在抓取 url 后添加到输出队列中。

public ConcurrentBag<Data> inputQueue = new ConcurrentBag<Data>();
    public ConcurrentBag<Data> outPutQueue = new ConcurrentBag<Data>();

    public List<Data> Scrapes(List<Data> scrapeRequests)
    {
        ServicePointManager.ServerCertificateValidationCallback += (sender, cert, chain, sslPolicyErrors) => true;
        string proxy_session_id = new Random().Next().ToString();

        numberOfRequestSent = 0;

        watch.Start();

        foreach (var sRequest in scrapeRequests)
        {
            inputQueue.Add(sRequest);
        }
        //inputQueue.CompleteAdding();

        var taskList = new List<Task>();
        for (var i = 0; i < n_parallel_exit_nodes; i++) //create 100 threads only
        {
            taskList.Add(Task.Factory.StartNew(async () =>
            {
               await Run();
            }, TaskCreationOptions.RunContinuationsAsynchronously));
        }

        Task.WaitAll(taskList.ToArray());   //Waiting

        //print result
        Console.WriteLine("Number Of URLs Found - {0}", scrapeRequests.Count);
        Console.WriteLine("Number Of Request Sent - {0}", numberOfRequestSent);

        Console.WriteLine("Input Queue - {0}", inputQueue.Count);

        Console.WriteLine("OutPut Queue - {0}", outPutQueue.ToList().Count);
        Console.WriteLine("Success - {0}", outPutQueue.ToList().Where(x=>x.IsProxySuccess==true).Count().ToString());
        Console.WriteLine("Failed - {0}", outPutQueue.ToList().Where(x => x.IsProxySuccess == false).Count().ToString());
        Console.WriteLine("Process Time In - {0}", watch.Elapsed);

        return outPutQueue.ToList();
    }


    async Task<string> Run()
    {
        while (!inputQueue.IsEmpty)
        {
            var client = new Client(super_proxy_ip, "US");

            if (!client.have_good_super_proxy())
                client.switch_session_id();
            if (client.n_req_for_exit_node == switch_ip_every_n_req)
                client.switch_session_id();

            var scrapeRequest = new ProductResearch_ProData();
            inputQueue.TryTake(out scrapeRequest);

            try
            {
                numberOfRequestSent++;

                // Console.WriteLine("Sending request for - {0}", scrapeRequest.URL);
                scrapeRequest.HTML = client.DownloadString((string)scrapeRequest.URL);
                //Console.WriteLine("Response done for - {0}", scrapeRequest.URL);

                scrapeRequest.IsProxySuccess = true;

                outPutQueue.Add(scrapeRequest); //add object to output queue

                //lumanti code
                client.handle_response();
            }
            catch (WebException e)
            {
                Console.WriteLine("Failed");

                scrapeRequest.IsProxySuccess = false;
                Console.WriteLine(e.Message);
                outPutQueue.Add(scrapeRequest); //add object to output queue

                //lumanti code
                client.handle_response(e);
            }

            client.clean_connection_pool();
            client.Dispose();
        }

        return await Task.Run(() => "Done");
    }

标签: c#multithreadingconcurrencytask-parallel-library

解决方案


这里有多个问题,但似乎没有一个是导致inputQueue.Count最终值非零的原因。无论如何,我想指出我能看到的问题。

var taskList = new List<Task>();
for (var i = 0; i < n_parallel_exit_nodes; i++) // create 100 threads only
{
    taskList.Add(Task.Factory.StartNew(async () =>
    {
        await Run();
    }, TaskCreationOptions.RunContinuationsAsynchronously));
}

该方法Task.Factory.StartNew不理解异步委托,因此当使用异步 lambda 作为参数调用它时,它会返回一个嵌套任务。在这种情况下,它返回一个Task<Task<string>>. 您将此嵌套任务存储在List<Task>集合中,这是可能的,因为类型Task<TResult>继承自 type Task,但是这样做您将失去等待内部任务完成(并获得结果)的能力。您只持有对外部任务的引用。奇迹般的是,在这种情况下这不是问题(通常是),因为外部任务完成了所有工作,而内部任务基本上什么都不做(除了使用线程池线程返回一个"Done"在任何地方都不需要的字符串) .

您也没有将任何延续附加到外部任务,因此该标志TaskCreationOptions.RunContinuationsAsynchronously似乎是多余的。

// create 100 threads only

您不会创建 100 个线程,而是创建 100 个任务。这些任务被安排在 中ThreadPool,由于任务长时间运行,这些任务将立即被饿死,并且将开始每 500 毫秒注入一个新线程,直到所有计划任务都分配给一个线程。

var scrapeRequest = new ProductResearch_ProData();
inputQueue.TryTake(out scrapeRequest);

在这里,您实例化了一个类型的对象,该对象ProductResearch_ProData立即被丢弃并在下一行有资格进行垃圾收集。该TryTake方法将返回从包中删除的对象,或者null如果包为空。您忽略了该方法的返回值TryTake,这完全有可能是false因为同时袋子可能已被另一个工人清空,然后继续执行scrapeRequest可能具有 null 值的 a ,导致这种情况为 a NullReferenceException

值得注意的是,您ProductResearch_ProData从 a中提取了一个类型的对象ConcurrentBag<Data>,因此该类Data继承自基类ProductResearch_ProData,或者代码中存在转录错误。


推荐阅读