C#集合之并发集合的用法
作者:Ruby_Lu 发布时间:2023-12-24 09:30:21
.NET 4 开始,在System.Collection.Concurrent中提供了几个线程安全的集合类。线程安全的集合可防止多个线程以相互冲突的方式访问集合。
为了对集合进行线程安全的访问,定义了IProducerConsumerCollection<T>接口。这个接口中最重要的方法是TryAdd()和TryTake()。TryAdd()方法尝试给集合添加一项,但如果集合禁止添加项,这个操作就可能失败。TryAdd()方法返回一个布尔值,以说明操作成功还是失败。TryTake()同样,在成功时返回集合中的项。
*ConcurrentQueue<T>————这个集合类用一种免锁定的算法实现,使用在内部合并到一个链表中的32项数组。该类有Enqueue(),TryDequeue()和TryPeek()方法。因为这个类实现了 IProducerConsumerCollection<T>接口,所以TryAdd()和TryTake()方法仅调用Enqueue和TryDequeue方法。
*ConcurrentStack<T>————类似于ConcurrentQueue<T>。该类定义了Push(),PushRange(),TryPeek(),TryPop()和TryPopRange()方法。在内部这个类使用其元素的链表。
*ConcurrentBag<T>————该类没有定义添加或提取项的任何顺序。这个类使用一个线程映射到内部使用的数组上的概念,因此尝试减少锁定。方法:Add(),TryPeek(),TryTake()。
*ConcurrentDictionary<TKey,TValue>————这是一个线程安全的键值集合。TryAdd(),TryGetValue(),TryRemove()和TryUpdate()方法以非阻塞的方式访问成员。因为元素基于键和值,所以ConcurrentDictionary<TKey,TValue>没有实现IProducerConsumerCollection<T>接口。
*BlockingCollection<T>————这个集合在可以添加或提取元素之前,会阻塞线程并一直等待。BlockingCollection<T>集合提供了一个接口,以使用Add()和Take()方法来删除和添加元素。这些方法会阻塞线程。Add()方法有一个重载版本,其中可以给该重载版本传递一个CancellationToken令牌。这个令牌允许取消被阻塞的调用。如果不希望无限的等待下去,且不希望从外部取消调用,就可以使用TryAdd()和TryTake()方法,在这些方法中,也可以指定一个超时值。
BlockingCollection<T>是对实现了 IProducerConsumerCollection<T>接口的任意类的修饰器,它默认使用ConcurrentQueue<T>类。还可以给构造函数传递任何实现了 IProducerConsumerCollection<T>接口的类。
下面使用一个例子演示BlockingCollection<T>的使用,一个任务向一个集合写入,同时另一个任务从这个集合读取。
static void Main(string[] args)
{
StartPipeline();
Console.ReadLine();
}
private static async void StartPipeline()
{
//存储文件名
var fileNames = new BlockingCollection<string>();
//存储文件的每一行内容
var lines = new BlockingCollection<string>();
//存储每一行的每个单词,单词为键,单词个数为值
var words = new ConcurrentDictionary<string, int>();
//存储words信息
var items = new BlockingCollection<Info>();
var coloredItems = new BlockingCollection<Info>();
Task t1 = PipelineStages.ReadFilenamesAsync(@"../../..", fileNames);
ConsoleHelper.WriteLine("started stage 1");
Task t2 = PipelineStages.LoadContentAsync(fileNames, lines);
ConsoleHelper.WriteLine("started stage 2");
Task t3 = PipelineStages.ProcessContentAsync(lines, words);
await Task.WhenAll(t1, t2, t3);
ConsoleHelper.WriteLine("stages 1, 2, 3 completed");
//当上面三个任务完成时,才执行下面的任务
Task t4 = PipelineStages.TransferContentAsync(words, items);
Task t5 = PipelineStages.AddColorAsync(items, coloredItems);
Task t6 = PipelineStages.ShowContentAsync(coloredItems);
ConsoleHelper.WriteLine("stages 4, 5, 6 started");
await Task.WhenAll(t4, t5, t6);
ConsoleHelper.WriteLine("all stages finished");
}
public static class PipelineStages
{
public static Task ReadFilenamesAsync(string path, BlockingCollection<string> output)
{
return Task.Run(() =>
{
foreach (string filename in Directory.EnumerateFiles(path, "*.cs", SearchOption.AllDirectories))
{
output.Add(filename);
ConsoleHelper.WriteLine(string.Format("stage 1: added {0}", filename));
}
//调用CompleteAdding,通知所有读取器不应再等待集合中的任何额外项
//如果不调用该方法,读取器会在foreach循环中等待更多的项被添加
output.CompleteAdding();
});
}
public static async Task LoadContentAsync(BlockingCollection<string> input, BlockingCollection<string> output)
{
//使用读取器读取集合时,需要使用GetConsumingEnumerable获取阻塞集合的枚举器,
//如果直接使用input迭代集合,这只会迭代当前状态的集合,不会迭代以后添加的项
foreach (var filename in input.GetConsumingEnumerable())
{
using (FileStream stream = File.OpenRead(filename))
{
var reader = new StreamReader(stream);
string line = null;
while ((line = await reader.ReadLineAsync()) != null)
{
output.Add(line);
ConsoleHelper.WriteLine(string.Format("stage 2: added {0}", line));
}
}
}
output.CompleteAdding();
}
public static Task ProcessContentAsync(BlockingCollection<string> input, ConcurrentDictionary<string, int> output)
{
return Task.Run(() =>
{
foreach (var line in input.GetConsumingEnumerable())
{
string[] words = line.Split(' ', ';', '\t', '{', '}', '(', ')', ':', ',', '"');
foreach (var word in words.Where(w => !string.IsNullOrEmpty(w)))
{
//这里使用了字典的一个扩展方法
output.AddOrIncrementValue(word);
ConsoleHelper.WriteLine(string.Format("stage 3: added {0}", word));
}
}
});
}
public static Task TransferContentAsync(ConcurrentDictionary<string, int> input, BlockingCollection<Info> output)
{
return Task.Run(() =>
{
foreach (var word in input.Keys)
{
int value;
if (input.TryGetValue(word, out value))
{
var info = new Info { Word = word, Count = value };
output.Add(info);
ConsoleHelper.WriteLine(string.Format("stage 4: added {0}", info));
}
}
output.CompleteAdding();
});
}
public static Task AddColorAsync(BlockingCollection<Info> input, BlockingCollection<Info> output)
{
return Task.Run(() =>
{
foreach (var item in input.GetConsumingEnumerable())
{
if (item.Count > 40)
{
item.Color = "Red";
}
else if (item.Count > 20)
{
item.Color = "Yellow";
}
else
{
item.Color = "Green";
}
output.Add(item);
ConsoleHelper.WriteLine(string.Format("stage 5: added color {1} to {0}", item, item.Color));
}
output.CompleteAdding();
});
}
public static Task ShowContentAsync(BlockingCollection<Info> input)
{
return Task.Run(() =>
{
foreach (var item in input.GetConsumingEnumerable())
{
ConsoleHelper.WriteLine(string.Format("stage 6: {0}", item), item.Color);
}
});
}
}
//创建一个字典的扩展方法
public static class ConcurrentDictionaryExtension
{
public static void AddOrIncrementValue(this ConcurrentDictionary<string, int> dict, string key)
{
bool success = false;
while (!success)
{
int value;
if (dict.TryGetValue(key, out value))
{
if (dict.TryUpdate(key, value + 1, value))
{
success = true;
}
}
else
{
if (dict.TryAdd(key, 1))
{
success = true;
}
}
}
}
}
这里使用了一个管道模型的编程模式,上面的添加内容,下面处理内容
来源:https://www.cnblogs.com/afei-24/p/6836976.html
猜你喜欢
- 一、引言在许多编程语言中,都有函数回调这一概念。C 和 C++ 中有函数指针,因此可以将函数作为参数传给其它函数,以便过后调用。而在 Jav
- 一、在学习枚举之前,首先来听听枚举的优点。1、枚举能够使代码更加清晰,它允许使用描述性的名称表示整数值。2、枚举使代码更易于维护,有助于确保
- 池塘里养:Object;一、设计与原理1、基础案例首先看一个基于common-pool2对象池组件的应用案例,主要有工厂类、对象池、对象三个
- 在实战中学习Spring,本系列的最终目的是完成一个实现用户注册登录功能的项目。预想的基本流程如下:1、用户网站注册,填写用户名、密码、em
- 本文实例讲述了C#使用GDI+创建缩略图的方法,分享给大家供大家参考。具体方法分析如下:C#的Gdi+还是相当好用的。创建缩略图步骤如下:1
- 在一个项目中,如果我们既用到了Struts2又用到了Servlet,项目运行时有可能无法正常访问Servlet,原因是在配置Struts的过
- 前言本文是精讲RestTemplate第8篇,前篇的blog访问地址如下:RestTemplate在Spring或非Spring环境下使用精
- 背景:用习惯了idea再去用eclipse实在用的不习惯,于是将老的eclipse项目导入到eclipse,网上有很多教程,看了很多博客都不
- 本文实例讲述了C# WinForm制作异形窗体与控件的方法。分享给大家供大家参考,具体如下:制作异形窗体或控件的思路一般都是想办法生成一个r
- 方法的覆盖在类继承中,子类可以修改从父类继承来的方法,也就是说子类能创建一个与父类方法有不同功能的方法,但具有相同的名称、返回值类型、参数列
- 是否还记得在博文「IntelliJ IDEA 安装目录的核心文件讲解」中,这张充满神秘色彩的图片呢?进入她,让我们一起感受她的魅力吧!如上图
- 0.写在前面2020-5-18更新这个东西已经是两年前的了,现在问我具体细节我也不是很清楚了,而且现在review两年前的代码感觉写的好烂。
- 一、概念哈希算法(hash algorithm):是一种将任意内容的输入转换成相同长度输出的加密方式,其输出被称为哈希值。哈希表(hash
- 前言这几天看《Java并发编程之美》的时候又遇到了ThradLocal这个类,不得不说,这个类在平时很多场景都遇得到,所以对其进行一个系统性
- android的快捷方式比较简单,就是发一个系统的广播,然后为快捷方式设置Intent---package com.xikang.andro
- 一、什么是过滤器过滤器是对数据进行过滤,预处理过程,当我们访问网站时,有时候会发布一些敏感信息,发完以后有的会用*替代,还有就是登陆权限控制
- 一、Unity Shader基础1、创建和使用Shader在Unity中Shader一般由两种用途:指定给材质,用于物理渲染;指定给脚本,用
- 在基于UI元素的自动化测试中, 无论是桌面的UI自动化测试,还是Web的UI自动化测试. 首先我们需要查找和识别UI
- 什么是JMX?什么是JMX,Java Management Extensions,即Java管理扩展,是一个为应用程序、设备、系统等植入管理
- 本次内容主要介绍基于Ehcache 3.0来快速实现Spring Boot应用程序的数据缓存功能。在Spring Boot应用程序中,我们可