c# - ConcurrentBag 跳过一些项目 C#
问题描述
我正在使用 concurrentbag 来抓取 URL,现在它对于 500 / 100 个 url 工作正常,但是当我试图抓取 8000 个 url 时。所有未处理的 URL 和 inputQueue 中待处理的一些项目。
但我正在使用 while (!inputQueue.IsEmpty) 。因此,它应该循环运行,直到输入队列中存在任何项目。
我只想最多运行 100 个线程。因此,我首先创建 100 个线程并调用“Run()”方法,在该方法中我运行一个循环来获取项目,直到项目退出输入队列并在抓取 url 后添加到输出队列中。
public ConcurrentBag<Data> inputQueue = new ConcurrentBag<Data>();
public ConcurrentBag<Data> outPutQueue = new ConcurrentBag<Data>();
public List<Data> Scrapes(List<Data> scrapeRequests)
{
ServicePointManager.ServerCertificateValidationCallback += (sender, cert, chain, sslPolicyErrors) => true;
string proxy_session_id = new Random().Next().ToString();
numberOfRequestSent = 0;
watch.Start();
foreach (var sRequest in scrapeRequests)
{
inputQueue.Add(sRequest);
}
//inputQueue.CompleteAdding();
var taskList = new List<Task>();
for (var i = 0; i < n_parallel_exit_nodes; i++) //create 100 threads only
{
taskList.Add(Task.Factory.StartNew(async () =>
{
await Run();
}, TaskCreationOptions.RunContinuationsAsynchronously));
}
Task.WaitAll(taskList.ToArray()); //Waiting
//print result
Console.WriteLine("Number Of URLs Found - {0}", scrapeRequests.Count);
Console.WriteLine("Number Of Request Sent - {0}", numberOfRequestSent);
Console.WriteLine("Input Queue - {0}", inputQueue.Count);
Console.WriteLine("OutPut Queue - {0}", outPutQueue.ToList().Count);
Console.WriteLine("Success - {0}", outPutQueue.ToList().Where(x=>x.IsProxySuccess==true).Count().ToString());
Console.WriteLine("Failed - {0}", outPutQueue.ToList().Where(x => x.IsProxySuccess == false).Count().ToString());
Console.WriteLine("Process Time In - {0}", watch.Elapsed);
return outPutQueue.ToList();
}
async Task<string> Run()
{
while (!inputQueue.IsEmpty)
{
var client = new Client(super_proxy_ip, "US");
if (!client.have_good_super_proxy())
client.switch_session_id();
if (client.n_req_for_exit_node == switch_ip_every_n_req)
client.switch_session_id();
var scrapeRequest = new ProductResearch_ProData();
inputQueue.TryTake(out scrapeRequest);
try
{
numberOfRequestSent++;
// Console.WriteLine("Sending request for - {0}", scrapeRequest.URL);
scrapeRequest.HTML = client.DownloadString((string)scrapeRequest.URL);
//Console.WriteLine("Response done for - {0}", scrapeRequest.URL);
scrapeRequest.IsProxySuccess = true;
outPutQueue.Add(scrapeRequest); //add object to output queue
//lumanti code
client.handle_response();
}
catch (WebException e)
{
Console.WriteLine("Failed");
scrapeRequest.IsProxySuccess = false;
Console.WriteLine(e.Message);
outPutQueue.Add(scrapeRequest); //add object to output queue
//lumanti code
client.handle_response(e);
}
client.clean_connection_pool();
client.Dispose();
}
return await Task.Run(() => "Done");
}
解决方案
这里有多个问题,但似乎没有一个是导致inputQueue.Count
最终值非零的原因。无论如何,我想指出我能看到的问题。
var taskList = new List<Task>();
for (var i = 0; i < n_parallel_exit_nodes; i++) // create 100 threads only
{
taskList.Add(Task.Factory.StartNew(async () =>
{
await Run();
}, TaskCreationOptions.RunContinuationsAsynchronously));
}
该方法Task.Factory.StartNew
不理解异步委托,因此当使用异步 lambda 作为参数调用它时,它会返回一个嵌套任务。在这种情况下,它返回一个Task<Task<string>>
. 您将此嵌套任务存储在List<Task>
集合中,这是可能的,因为类型Task<TResult>
继承自 type Task
,但是这样做您将失去等待内部任务完成(并获得结果)的能力。您只持有对外部任务的引用。奇迹般的是,在这种情况下这不是问题(通常是),因为外部任务完成了所有工作,而内部任务基本上什么都不做(除了使用线程池线程返回一个"Done"
在任何地方都不需要的字符串) .
您也没有将任何延续附加到外部任务,因此该标志TaskCreationOptions.RunContinuationsAsynchronously
似乎是多余的。
// create 100 threads only
您不会创建 100 个线程,而是创建 100 个任务。这些任务被安排在 中ThreadPool
,由于任务长时间运行,这些任务将立即被饿死,并且将开始每 500 毫秒注入一个新线程,直到所有计划任务都分配给一个线程。
var scrapeRequest = new ProductResearch_ProData();
inputQueue.TryTake(out scrapeRequest);
在这里,您实例化了一个类型的对象,该对象ProductResearch_ProData
立即被丢弃并在下一行有资格进行垃圾收集。该TryTake
方法将返回从包中删除的对象,或者null
如果包为空。您忽略了该方法的返回值TryTake
,这完全有可能是false
因为同时袋子可能已被另一个工人清空,然后继续执行scrapeRequest
可能具有 null 值的 a ,导致这种情况为 a NullReferenceException
。
值得注意的是,您ProductResearch_ProData
从 a中提取了一个类型的对象ConcurrentBag<Data>
,因此该类Data
继承自基类ProductResearch_ProData
,或者代码中存在转录错误。
推荐阅读
- assembly - 为什么 AL 寄存器周围没有括号?
- templates - D中的柯里化函数模板?
- git - 在 digitalocean 上使用 git 命令更新我的项目
- typescript - 未定义中的 TypeScript 错误(未定义,未定义)
- javascript - 如何让 node.js 中的调试器与检查一起工作
- python - 如何将基类对象转换为子类对象?
- r - 在 R 中更改 dygraph 中图例的位置
- ios - 如何阻止一些 Firebase 推送通知?
- java - Android Studio 3.5 开销
- linux - 如何将python脚本放入jetson nano启动?