在Parallel.ForEach中嵌套等待

在Metro应用程序中,我需要执行许多WCF调用。需要进行大量调用,因此我需要在并行循环中进行调用。问题在于并行循环在WCF调用全部完成之前退出。

您将如何重构它以使其按预期工作?

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var customers = new System.Collections.Concurrent.BlockingCollection<Customer>();

Parallel.ForEach(ids, async i =>

{

ICustomerRepo repo = new CustomerRepo();

var cust = await repo.GetCustomer(i);

customers.Add(cust);

});

foreach ( var customer in customers )

{

Console.WriteLine(customer.ID);

}

Console.ReadKey();

回答:

背后的整个想法Parallel.ForEach()是,您有一组线程,每个线程处理集合的一部分。正如您所注意到的,这不适用于async-

await,您想在异步调用期间释放线程。

您可以通过阻塞ForEach()线程来“修复”该问题,但这使async- 的全部观点无效await

您可以做的是使用TPL Dataflow代替Parallel.ForEach(),它Task很好地支持异步。

具体来说,您的代码可以使用a编写TransformBlock,该代码Customer使用asynclambda 将每个id转换为a

。可以将该块配置为并行执行。您可以将该块链接到,ActionBlock然后将每个块写入Customer控制台。设置区块网络后,您可以将Post()每个ID分配到TransformBlock

在代码中:

var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var getCustomerBlock = new TransformBlock<string, Customer>(

async i =>

{

ICustomerRepo repo = new CustomerRepo();

return await repo.GetCustomer(i);

}, new ExecutionDataflowBlockOptions

{

MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded

});

var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));

getCustomerBlock.LinkTo(

writeCustomerBlock, new DataflowLinkOptions

{

PropagateCompletion = true

});

foreach (var id in ids)

getCustomerBlock.Post(id);

getCustomerBlock.Complete();

writeCustomerBlock.Completion.Wait();

尽管您可能希望将的并行性限制TransformBlock为一些小常数。另外,您可以限制的容量,TransformBlock并使用异步添加项目SendAsync(),例如,如果集合太大。

与您的代码(如果可行)相比,另一个好处是,写入将在单个项目完成后立即开始,而不必等到所有处理都完成了。

以上是 在Parallel.ForEach中嵌套等待 的全部内容, 来源链接: utcz.com/qa/419138.html

回到顶部