c# 使用Task实现非阻塞式的I/O操作
作者:一只独行的猿 发布时间:2023-07-21 23:27:39
在前面的《基于任务的异步编程模式(TAP)》文章中讲述了.net 4.5框架下的异步操作自我实现方式,实际上,在.net 4.5中部分类已实现了异步封装。如在.net 4.5中,Stream类加入了Async方法,所以基于流的通信方式都可以实现异步操作。
1、异步读取文件数据
public static void TaskFromIOStreamAsync(string fileName)
{
int chunkSize = 4096;
byte[] buffer = new byte[chunkSize];
FileStream fileStream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read, chunkSize, true);
Task<int> task = fileStream.ReadAsync(buffer, 0, buffer.Length);
task.ContinueWith((readTask) =>
{
int amountRead = readTask.Result;
//必须在ContinueWith中释放文件流
fileStream.Dispose();
Console.WriteLine($"Async(Simple) Read {amountRead} bytes");
});
}
上述代码中,异步读取数据只读取了一次,完成读取后就将执行权交还主线程了。但在真实场景中,需要从流中读取多次才能获得全部的数据(如文件数据大于给定缓冲区大小,或处理来自网络流的数据(数据还没全部到达机器))。因此,为了完成异步读取操作,需要连续从流中读取数据,直到获取所需全部数据。
上述问题导致需要两级Task来处理。外层的Task用于全部的读取工作,供调用程序使用。内层的Task用于每次的读取操作。
第一次异步读取会返回一个Task。如果直接返回调用Wait或者ContinueWith的地方,会在第一次读取结束后继续向下执行。实际上是希望调用者在完成全部读取操作后才执行。因此,不能把第一个Task发布会给调用者,需要一个“伪Task”在完成全部读取操作后再返回。
上述问题需要使用到TaskCompletionSource<T>类解决,该类可以生成一个用于返回的“伪Task”。当异步读取操作全部完成后,调用其对象的TrySetResult,让Wait或ContinueWith的调用者继续执行。
public static Task<long> AsynchronousRead(string fileName)
{
int chunkSize = 4096;
byte[] buffer = new byte[chunkSize];
//创建一个返回的伪Task对象
TaskCompletionSource<long> tcs = new TaskCompletionSource<long>();
MemoryStream fileContents = new MemoryStream();//用于保存读取的内容
FileStream fileStream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read, chunkSize, true);
fileContents.Capacity += chunkSize;//指定缓冲区大小。好像Capacity会自动增长,设置与否没关系,后续写入多少数据,就增长多少
Task<int> task = fileStream.ReadAsync(buffer, 0, buffer.Length);
task.ContinueWith(readTask => ContinueRead(readTask, fileStream, fileContents, buffer, tcs));
//在ContinueWith中循环读取,读取完成后,再返回tcs的Task
return tcs.Task;
}
/// <summary>
/// 继续读取数据
/// </summary>
/// <param name="task">读取数据的线程</param>
/// <param name="fileStream">文件流</param>
/// <param name="fileContents">文件存放位置</param>
/// <param name="buffer">读取数据缓存</param>
/// <param name="tcs">伪Task对象</param>
private static void ContinueRead(Task<int> task, FileStream fileStream, MemoryStream fileContents, byte[] buffer, TaskCompletionSource<long> tcs)
{
if (task.IsCompleted)
{
int bytesRead = task.Result;
fileContents.Write(buffer, 0, bytesRead);//写入内存区域。似乎Capacity会自动增长
if (bytesRead > 0)
{
//虽然看似是一个新的任务,但是使用了ContinueWith,所以使用的是同一个线程。
//没有读取完,开启另一个异步继续读取
Task<int> newTask = fileStream.ReadAsync(buffer, 0, buffer.Length);
//此处做了一个循环
newTask.ContinueWith(readTask => ContinueRead(readTask, fileStream, fileContents, buffer, tcs));
}
else
{
//已经全部读取完,所以需要返回数据
tcs.TrySetResult(fileContents.Length);
fileStream.Dispose();
fileContents.Dispose();//应该是在使用了数据之后才释放数据缓冲区的数据
}
}
}
2、适应Task的异步编程模式
.NET Framework中的旧版异步方法都带有“Begin-”和“End-”前缀。这些方法仍然有效,为了接口的一致性,它们可以被封装到Task中。
FromAsyn方法把流的BeginRead和EndRead方法作为参数,再加上存放数据的缓冲区。BeginRead和EndRead方法会执行,并在EndRead完成后调用Continuation Task,把控制权交回主代码。上述例子会关闭流并返回转换的数据
const int ReadSize = 256;
/// <summary>
/// 从文件中获取字符串
/// </summary>
/// <param name="path">文件路径</param>
/// <returns>字符串</returns>
public static Task<string> GetStringFromFile(string path)
{
FileInfo file = new FileInfo(path);
byte[] buffer = new byte[1024];//存放数据的缓冲区
FileStream fileStream = new FileStream(
path, FileMode.Open, FileAccess.Read, FileShare.None, buffer.Length,
FileOptions.DeleteOnClose | FileOptions.Asynchronous);
Task<int> task = Task<int>.Factory.FromAsync(fileStream.BeginRead, fileStream.EndRead,
buffer, 0, ReadSize, null);//此参数为BeginRead需要的参数
TaskCompletionSource<string> tcs = new TaskCompletionSource<string>();
task.ContinueWith(taskRead => OnReadBuffer(taskRead, fileStream, buffer, 0, tcs));
return tcs.Task;
}
/// <summary>
/// 读取数据
/// </summary>
/// <param name="taskRead">读取任务</param>
/// <param name="fileStream">文件流</param>
/// <param name="buffer">读取数据存放位置</param>
/// <param name="offset">读取偏移量</param>
/// <param name="tcs">伪Task</param>
private static void OnReadBuffer(Task<int> taskRead, FileStream fileStream, byte[] buffer, int offset, TaskCompletionSource<string> tcs)
{
int readLength = taskRead.Result;
if (readLength > 0)
{
int newOffset = offset + readLength;
Task<int> task = Task<int>.Factory.FromAsync(fileStream.BeginRead, fileStream.EndRead,
buffer, newOffset, Math.Min(buffer.Length - newOffset, ReadSize), null);
task.ContinueWith(callBackTask => OnReadBuffer(callBackTask, fileStream, buffer, newOffset, tcs));
}
else
{
tcs.TrySetResult(System.Text.Encoding.UTF8.GetString(buffer, 0, buffer.Length));
fileStream.Dispose();
}
}
3、使用async 和 await方式读取数据
下面的示例中,使用了async和await关键字实现异步读取一个文件的同时进行压缩并写入另一个文件。所有位于await关键字之前的操作都运行于调用者线程,从await开始的操作都是在Continuation Task中运行。但有无法使用这两个关键字的场合:①Task的结束时机不明确时;②必须用到多级Task和TaskCompletionSource时
/// <summary>
/// 同步方法的压缩
/// </summary>
/// <param name="lstFiles">文件清单</param>
public static void SyncCompress(IEnumerable<string> lstFiles)
{
byte[] buffer = new byte[16384];
foreach(string file in lstFiles)
{
using (FileStream inputStream = File.OpenRead(file))
{
using (FileStream outputStream = File.OpenWrite(file + ".compressed"))
{
using (System.IO.Compression.GZipStream compressStream = new System.IO.Compression.GZipStream(outputStream, System.IO.Compression.CompressionMode.Compress))
{
int read = 0;
while((read=inputStream.Read(buffer,0,buffer.Length))>0)
{
compressStream.Write(buffer, 0,read);
}
}
}
}
}
}
/// <summary>
/// 异步方法的文件压缩
/// </summary>
/// <param name="lstFiles">需要压缩的文件</param>
/// <returns></returns>
public static async Task AsyncCompress(IEnumerable<string> lstFiles)
{
byte[] buffer = new byte[16384];
foreach(string file in lstFiles)
{
using (FileStream inputStream = File.OpenRead(file))
{
using (FileStream outputStream = File.OpenWrite(file + ".compressed"))
{
using (System.IO.Compression.GZipStream compressStream = new System.IO.Compression.GZipStream(outputStream, System.IO.Compression.CompressionMode.Compress))
{
int read = 0;
while ((read = await inputStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
await compressStream.WriteAsync(buffer, 0, read);
}
}
}
}
}
}
来源:https://www.cnblogs.com/pilgrim/p/11197503.html
猜你喜欢
- 本文实例为大家分享了C#超市收银系统设计的具体代码,供大家参考,具体内容如下1.登录界面代码如下:using System;using Sy
- 一直使用Eclipse环境开发Android,也尝鲜使用过Android Studio去开发,各种IDE配合Android SDK及SDK原
- 本文实例讲述了C++语言实现线性表之链表实现方法。分享给大家供大家参考。具体分析如下:插入、删除结点的代码有点多,但这样提高了代码的可读性,
- 1. 并行和并发有什么区别?并行:多个处理器或多核处理器同时处理多个任务。并发:多个任务在同一个 CPU 核上,按细分的时间片轮流(交替)执
- 1、找准入口,使用ClassPathXmlApplicationContext的构造方法加载配置文件,用于加载classPath下的配置文件
- 各位亲们可以尝试以下代码:注:这里我就只有一个html标签对来说明问题了,首部之类的东西,自己添加。<html> &n
- 背景:在Android中按照数据保存的方式,可以分为如下几种Content Provider (用的SQLite实现),SQLite,Sha
- 本文实例讲述了C++实现的O(n)复杂度内查找第K大数算法。分享给大家供大家参考,具体如下:题目:是在一组数组(数组元素为整数,可正可负可为
- android中提供了4中动画: AlphaAnimation 透明度动画效果 ScaleAnimation 缩放动画效果 Translat
- 1.ArrayList 是基数组结构的,需要连续的内存空间从构造函数可以看出,ArrayList内部用一个Object数组来保存数据。对于无
- 本文实例为大家分享了C语言实现两个矩阵相乘的具体代码,供大家参考,具体内容如下程序功能:实现两个矩阵相乘的C语言程序,并将其输出代码如下:#
- 好久没有做web了,JSON目前比较流行,闲得没事,所以动手试试将对象序列化为JSON字符(尽管DotNet Framework
- 背景后台系统需要接入 企业微信登入,满足企业员工快速登入系统流程图简单代码说明自定义一套 springsecurity 认证逻辑主要就是 根
- 已知两个链表list1和list,2,各自非降序排列,将它们合并成另外一个链表list3,并且依然有序,要求保留所有节点。实现过程中,lis
- Console.WriteLine("This is a Client, host name is {0}", Dns.
- 前言Flutter (Channel stable, 2.10.3, on Microsoft Windows [Version 10.0.
- 在windows环境下,我们通常在IDE如VS的工程中开发C++项目,对于生成和使用静态库(*.lib)与动态库(*.dll)可能都已经比较
- 什么是队列结构一种线性结构,具有特殊的运算法则【只能在一端(队头)删除,在另一端(队尾)插入】。分类:顺序队列结构链式队列结构基本操作:入队
- 现如今打开一个 App,比如头条、微博,都会有长列表,随着我们不断地滑动,视窗内的内容也会不断地更新。今天就用 Flutter 实现一下这种
- 1、右值1.1 简介首先区分一下左右值:左值是指存储在内存中、有明确存储地址(可取地址)的数据;右值是指可以提供数据值的数据(不可取地址)如