threadmsg_demo.zip ~ 41KB 下载
threadmsg_src.zip ~ 65KB 下载
微软在 .NET 框架中供应了多种实用的线程同步伎俩,其中包括 monitor 类及 reader-writer锁。但跨进程的同步方法还是非常完美。此外,目前也没有便当的线程间及进程间传递消息的方法。例如C/S和SOA,又也许生产者/破费者方式中就常常需求传递消息。为此我编写了一个自力残缺的框架,完成了跨线程和跨进程的同步和通讯。这框架内包含了旗帜暗号量,信箱,内存映照文件,阻塞通道,及轻易消息流操纵器等组件。这篇文章里提到的类同属于一个开源的库项目(BSD容许),你可以从这里下载到 www.cdrnet.net/projects/threadmsg/.
这个框架的目的是:
留心:我删除本文中全部代码的XML注释以节省空间。假设你想知道这些方法和参数的详细信息,请参考附件中的代码。
应用了这个库后,跨进程的消息传递将变得非常轻易。我将用一个小例子来作示范:一个操纵台次序,根据参数可以作为发送方也可以作为采取方运转。在发送次序里,你可以输出肯定的文本并发送到信箱内(前去key),采取次序将显示一切从信箱内收到的消息。你可以运转有数个发送次序和采取次序,可是每个消息只会被具体的某一个采取次序所收到。
[Serializable] struct Message { public string Text; } class Test { IMailBox mail; public Test() { mail = new ProcessMailBox("TMProcessTest",1024); } public void RunWriter() { Console.WriteLine("Writer started"); Message msg; while(true) { msg.Text = Console.ReadLine(); if(msg.Text.Equals("exit")) break; mail.Content = msg; } } public void RunReader() { Console.WriteLine("Reader started"); while(true) { Message msg = (Message)mail.Content; Console.WriteLine(msg.Text); } } [STAThread] static void Main(string[] args) { Test test = new Test(); if(args.Length > 0) test.RunWriter(); else test.RunReader(); } }
信箱一旦创建当前(这上面代码里是 ProcessMailBox ),采取消息只需求读取 Content 属性,发送消息只需求给这个属性赋值。当没有数据时,取得消息将会阻塞当前线程;发送消息时假设信箱里已经有数据,则会阻塞当前线程。正是有了这个阻塞,全体次序是完好基于中断的,并且不会过火占用CPU(不需务实行轮询)。发送和采取的消息可所以尽情支持序列化(Serializable)的类型。
可是,理论上面前爆发的任务有点复杂:消息颠末内存映照文件来传递,这是目前唯一的跨进程共享内存的方法,这个例子里我们只会在 pagefile 外面发作虚拟文件。对这个虚拟文件的访问是颠末 win32 旗帜暗号量来确保同步的。消息首先序列化成二进制,然后再写进该文件,这就是为什么需求声明Serializable属性。内存映照文件和 win32 旗帜暗号量都需求调用 NT内核的方法。多得了 .NET 框架中的 Marshal 类,我们可以避免编写不服安的代码。我们将不才面谈判更多的细节。
线程/进程间的通讯需求共享内存也许其他内建机制来发送/采取数据。即使是采用共享内存的办法,也还需求一组同步方法来容许并发访问。
一致个进程内的一切线程都共享公共的逻辑地址空间(堆)。关于不合进程,从 win2000 末尾就已经没法共享内存。可是,不合的进程可以读写一致个文件。WinAPI供应了多种系统调用方法来映照文件到进程的逻辑空间,及访问系统外调工具(会话)指向的 pagefile 外面的虚拟文件。不论是共享堆,还是共享文件,并发访问都有可能招致数据不一致。我们就这个问题轻易谈判一下,该怎么确保线程/进程调用的有序性及数据的一致性。
.NET 框架和 C# 供应了便当直观的线程同步方法,即 monitor 类和 lock 语句(本文将不会谈判 .NET 框架的互斥量)。关于线程同步,当然本文供应了其他方法,我们还是举荐应用 lock 语句。
void Work1() { NonCriticalSection1(); Monitor.Enter(this); try { CriticalSection(); } finally { Monitor.Exit(this); } NonCriticalSection2(); }
void Work2() { NonCriticalSection1(); lock(this) { CriticalSection(); } NonCriticalSection2(); }
Work1 和 Work2 是等价的。在C#外面,十分多人喜好第二个方法,因为它更短,且不随便出错。
旗帜暗号量是经典的同步基本观念之一(由 Edsger Dijkstra 引入)。旗帜暗号量是指一个有计数器及两个把持的Tools。它的两个把持是:取得(也叫P也许等待),释放(也叫V也许收到旗帜暗号)。旗帜暗号量在取得把持时假设计数器为0则阻塞,否则将计数器减一;在释放时将计数器加一,且不会阻塞。当然旗帜暗号量的事理很轻易,可是完成起来有点费事。幸而,内建的 monitor 类有阻塞特点,可以用来完成旗帜暗号量。
public sealed class ThreadSemaphore : ISemaphore { private int counter; private readonly int max; public ThreadSemaphore() : this(0, int.Max) {} public ThreadSemaphore(int initial) : this(initial, int.Max) {} public ThreadSemaphore(int initial, int max) { this.counter = Math.Min(initial,max); this.max = max; } public void Acquire() { lock(this) { counter--; if(counter < 0 && !Monitor.Wait(this)) throw new SemaphoreFailedException(); } } public void Acquire(TimeSpan timeout) { lock(this) { counter--; if(counter < 0 && !Monitor.Wait(this,timeout)) throw new SemaphoreFailedException(); } } public void Release() { lock(this) { if(counter >= max) throw new SemaphoreFailedException(); if(counter < 0) Monitor.Pulse(this); counter++; } } }
旗帜暗号量在复杂的阻塞情况下越发有效,例如我们后面将要谈判的通道(channel)。你也可以应用旗帜暗号量来完成临界区的排他性(以下面的 Work3),可是我还是举荐应用内建的 lock 语句,像上面的 Work2 那样。
请留心:假设应用不当,旗帜暗号量也是有埋伏风险的。精确的做法是:当取得旗帜暗号量失败时,千万不要再调用释放把持;当取得成功时,不论爆发了什么差错,都要记得释放旗帜暗号量。按照多么的绳尺,你的同步步崆准确的。Work3 中的 finally 语句就是为了保证精确释放旗帜暗号量。留心:取得旗帜暗号量( s.Acquire() )的把持必须放到 try 语句的外面,只需多么,当取得失败时才不会调用释放把持。
ThreadSemaphore s = new ThreadSemaphore(1); void Work3() { NonCriticalSection1(); s.Acquire(); try { CriticalSection(); } finally { s.Release(); } NonCriticalSection2(); }
为了调和不合进程访问一致本钱,我们需求用到上面谈判过的观念。很不幸,.NET 中的 monitor 类不成以跨进程应用。可是,win32 API供应的内核旗帜暗号量Tools可以用来完成跨进程同步。 Robin Galloway-Lunn 引见了怎么将 win32 的旗帜暗号量映照到 .NET 中(见 Using Win32 Semaphores in C# )。我们的完成也类似:
[DllImport("kernel32",EntryPoint="CreateSemaphore", SetLastError=true,CharSet=CharSet.Unicode)] internal static extern uint CreateSemaphore( SecurityAttributes auth, int initialCount, int maximumCount, string name); [DllImport("kernel32",EntryPoint="WaitForSingleObject", SetLastError=true,CharSet=CharSet.Unicode)] internal static extern uint WaitForSingleObject( uint hHandle, uint dwMilliseconds); [DllImport("kernel32",EntryPoint="ReleaseSemaphore", SetLastError=true,CharSet=CharSet.Unicode)] [return : MarshalAs( UnmanagedType.VariantBool )] internal static extern bool ReleaseSemaphore( uint hHandle, int lReleaseCount, out int lpPreviousCount); [DllImport("kernel32",EntryPoint="CloseHandle",SetLastError=true, CharSet=CharSet.Unicode)] [return : MarshalAs( UnmanagedType.VariantBool )] internal static extern bool CloseHandle(uint hHandle);
public class ProcessSemaphore : ISemaphore, IDisposable { private uint handle; private readonly uint interruptReactionTime; public ProcessSemaphore(string name) : this( name,0,int.MaxValue,500) {} public ProcessSemaphore(string name, int initial) : this( name,initial,int.MaxValue,500) {} public ProcessSemaphore(string name, int initial, int max, int interruptReactionTime) { this.interruptReactionTime = (uint)interruptReactionTime; this.handle = NTKernel.CreateSemaphore(null, initial, max, name); if(handle == 0) throw new SemaphoreFailedException(); } public void Acquire() { while(true) { //looped 0.5s timeout to make NT-blocked threads interruptable. uint res = NTKernel.WaitForSingleObject(handle, interruptReactionTime); try {System.Threading.Thread.Sleep(0);} catch(System.Threading.ThreadInterruptedException e) { if(res == 0) { //Rollback int previousCount; NTKernel.ReleaseSemaphore(handle,1,out previousCount); } throw e; } if(res == 0) return; if(res != 258) throw new SemaphoreFailedException(); } } public void Acquire(TimeSpan timeout) { uint milliseconds = (uint)timeout.TotalMilliseconds; if(NTKernel.WaitForSingleObject(handle, milliseconds) != 0) throw new SemaphoreFailedException(); } public void Release() { int previousCount; if(!NTKernel.ReleaseSemaphore(handle, 1, out previousCount)) throw new SemaphoreFailedException(); } #region IDisposable Member public void Dispose() { if(handle != 0) { if(NTKernel.CloseHandle(handle)) handle = 0; } } #endregion }
有一点很首要:win32中的旗帜暗号量是可以命名的。这容许其他进程颠末名字来创建呼应旗帜暗号量的句柄。为了让阻塞线程可以中断,我们应用了一个(不好)的交换方法:应用超时和 Sleep(0)。我们需求中断来安全封锁线程。更好的做法是:判定没有线程阻塞当前才释放旗帜暗号量,多么次序才可以完好释放本钱并精确参加。
你可能也留心到了:跨线程和跨进程的旗帜暗号量都应用了相似的接口。一切相关的类都应用了这类方式,以完成上面布景引见中提到的封闭性。需求留心:出于功用思考,你不应该将跨进程的旗帜暗号量用到跨线程的场景,也不应该将跨线程的完成用到单线程的场景。
我们已经完成了跨线程和跨进程的共享本钱访问同步。可是传递/采取消息还需求共享本钱。关于线程来说,只需求声明一个类成员变量就可以够了。可是关于跨进程来说,我们需求应用到 win32 API 供应的内存映照文件(Memory Mapped Files,简称MMF)。应用 MMF和应用 win32 旗帜暗号量差不多。我们需求先调用 CreateFileMapping 方法来创建一个内存映照文件的句柄:
[DllImport("Kernel32.dll",EntryPoint="CreateFileMapping", SetLastError=true,CharSet=CharSet.Unicode)] internal static extern IntPtr CreateFileMapping(uint hFile, SecurityAttributes lpAttributes, uint flProtect, uint dwMaximumSizeHigh, uint dwMaximumSizeLow, string lpName); [DllImport("Kernel32.dll",EntryPoint="MapViewOfFile", SetLastError=true,CharSet=CharSet.Unicode)] internal static extern IntPtr MapViewOfFile(IntPtr hFileMappingObject, uint dwDesiredAccess, uint dwFileOffsetHigh, uint dwFileOffsetLow, uint dwNumberOfBytesToMap); [DllImport("Kernel32.dll",EntryPoint="UnmapViewOfFile", SetLastError=true,CharSet=CharSet.Unicode)] [return : MarshalAs( UnmanagedType.VariantBool )] internal static extern bool UnmapViewOfFile(IntPtr lpBaseAddress);
public static MemoryMappedFile CreateFile(string name, FileAccess access, int size) { if(size < 0) throw new ArgumentException("Size must not be negative","size"); IntPtr fileMapping = NTKernel.CreateFileMapping(0xFFFFFFFFu,null, (uint)access,0,(uint)size,name); if(fileMapping == IntPtr.Zero) throw new MemoryMappingFailedException(); return new MemoryMappedFile(fileMapping,size,access); }
我们渴望直接应用 pagefile 中的虚拟文件,所以我们用 -1(0xFFFFFFFF) 来作为文件句柄来创建我们的内存映照文件句柄。我们也指定了必填的文件大小,和呼应的称呼。多么其他进程就可以够颠末这个称呼来同时访问该映照文件。创建了内存映照文件后,我们就可以够映照这个文件不合的部分(颠末偏移量和字节大小来指定)到我们的进程地址空间。我们颠末 MapViewOfFile 系统方法来指定:
public MemoryMappedFileView CreateView(int offset, int size, MemoryMappedFileView.ViewAccess access) { if(this.access == FileAccess.ReadOnly && access == MemoryMappedFileView.ViewAccess.ReadWrite) throw new ArgumentException( "Only read access to views allowed on files without write access", "access"); if(offset < 0) throw new ArgumentException("Offset must not be negative","size"); if(size < 0) throw new ArgumentException("Size must not be negative","size"); IntPtr mappedView = NTKernel.MapViewOfFile(fileMapping, (uint)access,0,(uint)offset,(uint)size); return new MemoryMappedFileView(mappedView,size,access); }
在不服安的代码中,我们可以将前去的指针逼迫转换成我们指定的类型。固然如此,我们不渴望有不服安的代码存在,所以我们应用 Marshal 类来从中读写我们的数据。偏移量参数是用来从那边末尾读写数据,绝对指定的映照视图的地址。
public byte ReadByte(int offset) { return Marshal.ReadByte(mappedView,offset); } public void WriteByte(byte data, int offset) { Marshal.WriteByte(mappedView,offset,data); } public int ReadInt32(int offset) { return Marshal.ReadInt32(mappedView,offset); } public void WriteInt32(int data, int offset) { Marshal.WriteInt32(mappedView,offset,data); } public void ReadBytes(byte[] data, int offset) { for(int i=0;i<data.Length;i++) data[i] = Marshal.ReadByte(mappedView,offset+i); } public void WriteBytes(byte[] data, int offset) { for(int i=0;i<data.Length;i++) Marshal.WriteByte(mappedView,offset+i,data[i]); }
可是,我们渴望读写全体Tools树到文件中,所以我们需求支持自动履行序列化和反序列化的方法。
public object ReadDeserialize(int offset, int length) { byte[] binaryData = new byte[length]; ReadBytes(binaryData,offset); System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter(); System.IO.MemoryStream ms = new System.IO.MemoryStream( binaryData,0,length,true,true); object data = formatter.Deserialize(ms); ms.Close(); return data; } public void WriteSerialize(object data, int offset, int length) { System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter(); byte[] binaryData = new byte[length]; System.IO.MemoryStream ms = new System.IO.MemoryStream( binaryData,0,length,true,true); formatter.Serialize(ms,data); ms.Flush(); ms.Close(); WriteBytes(binaryData,offset); }
请留心:Tools序列化当前的大小不应该逾越映照视图的大小。序列化当前的大小总是比Tools本身占用的内存要大的。我没有试过直接将Tools内存流绑定到映照视图,那样做该当也可以,甚至可能带来少量的功用提升。
这里的信箱与 Email 及 NT 中的邮件槽(Mailslots)有关。它是一个只能保管一个Tools的安全共享内存结构。信箱的内容颠末一个属性来读写。假设信箱内容为空,试图读取该信箱的线程将会阻塞,直到别的一个线程往其中写内容。假设信箱已经有了内容,当一个线程试图往其中写内容时将被阻塞,直到别的一个线程将信箱内容读取出去。信箱的内容只能被读取一次,它的引用在读取后自动被删除。基于上面的代码,我们已经可以完成信箱了。
我们可以使用两个旗帜暗号量来完成一个信箱:一个旗帜暗号量在信箱内容为空时触发,别的一个在信箱有内容时触发。在读取内容之前,线程先等待信箱已经填充了内容,读取当前触发空旗帜暗号量。在写入内容之前,线程先等待信箱内容清空,写入当前触发满旗帜暗号量。留心:空旗帜暗号量在一末尾时就被触发了。
public sealed class ThreadMailBox : IMailBox { private object content; private ThreadSemaphore empty, full; public ThreadMailBox() { empty = new ThreadSemaphore(1,1); full = new ThreadSemaphore(0,1); } public object Content { get { full.Acquire(); object item = content; empty.Release(); return item; } set { empty.Acquire(); content = value; full.Release(); } } }
跨进程信箱与跨线程信箱的完成基本上一样轻易。不合的是我们应用两个跨进程的旗帜暗号量,并且我们应用内存映照文件来替换类成员变量。由于序列化可能会失败,我们应用了一小段异常处理往复滚信箱的形状。失败的启事有十分多(无效句柄,拒绝访问,文件大小问题,Serializable属性缺失等等)。
public sealed class ProcessMailBox : IMailBox, IDisposable { private MemoryMappedFile file; private MemoryMappedFileView view; private ProcessSemaphore empty, full; public ProcessMailBox(string name,int size) { empty = new ProcessSemaphore(name+".EmptySemaphore.MailBox",1,1); full = new ProcessSemaphore(name+".FullSemaphore.MailBox",0,1); file = MemoryMappedFile.CreateFile(name+".MemoryMappedFile.MailBox", MemoryMappedFile.FileAccess.ReadWrite,size); view = file.CreateView(0,size, MemoryMappedFileView.ViewAccess.ReadWrite); } public object Content { get { full.Acquire(); object item; try {item = view.ReadDeserialize();} catch(Exception e) { //Rollback full.Release(); throw e; } empty.Release(); return item; } set { empty.Acquire(); try {view.WriteSerialize(value);} catch(Exception e) { //Rollback empty.Release(); throw e; } full.Release(); } } #region IDisposable Member public void Dispose() { view.Dispose(); file.Dispose(); empty.Dispose(); full.Dispose(); } #endregion }
到这里我们已经完成了跨进程消息传递(IPC)所需求的组件。你可能需求再回头本文收尾的阿谁例子,看看 ProcessMailBox 该当怎样应用。
信箱最大的限制是它们每次只能保管一个Tools。假设一系列线程(应用一致个信箱)中的一个线程需求比拟长的时间来处理特定的号召,那么全体系列城市阻塞。但凡我们会应用缓冲的消息通道来处理,多么你可以在便当的时分从中读吊销息,而不会阻塞消息发送者。这类缓冲颠末通道来完成,这里的通道比信箱要复杂一些。异样,我们将区分从线程和进程级别来谈判通道的完成。
信箱和通道的别的一个首要的不合是:通道具有老部性。例如:自动将发送失败(可能由于线程等待锁的进程傍边被中断)的消息转存到一个内置的容器中。这意味着处理通道的线程可以安全地中断,同时不会损失行列中的消息。这颠末两个笼统类来完成, ThreadReliability 和 ProcessReliability。每个通道的完成类都承袭其中的一个类。
跨线程的通道基于信箱来完成,可是应用一个同步的行列来作为消息缓冲而不是一个变量。得益于旗帜暗号量,通道在空行列时阻塞采取线程,在行列满时阻塞发送线程。多么你就不会碰着由入队/出队激起的差错。为了完成这个结果,我们用行列大小来初始化空旗帜暗号量,用0来初始化满旗帜暗号量。假设某个发送线程在等待入队的时分被中断,我们将消息复制到内置容器中,并将异常往外面抛。在采取把持中,我们不需求做异常处理,因为即使线程被中断你也不会损失任何消息。留心:线程只需在阻塞形状才华被中断,就像调用旗帜暗号量的取得把持(Aquire)方法时。
public sealed class ThreadChannel : ThreadReliability, IChannel { private Queue queue; private ThreadSemaphore empty, full; public ThreadChannel(int size) { queue = Queue.Synchronized(new Queue(size)); empty = new ThreadSemaphore(size,size); full = new ThreadSemaphore(0,size); } public void Send(object item) { try {empty.Acquire();} catch(System.Threading.ThreadInterruptedException e) { DumpItem(item); throw e; } queue.Enqueue(item); full.Release(); } public void Send(object item, TimeSpan timeout) { try {empty.Acquire(timeout);} ... } public object Receive() { full.Acquire(); object item = queue.Dequeue(); empty.Release(); return item; } public object Receive(TimeSpan timeout) { full.Acquire(timeout); ... } protected override void DumpStructure() { lock(queue.SyncRoot) { foreach(object item in queue) DumpItem(item); queue.Clear(); } } }
2KB项目(www.2kb.com,源码交易平台),提供担保交易、源码交易、虚拟商品、在家创业、在线创业、任务交易、网站设计、软件设计、网络兼职、站长交易、域名交易、链接买卖、网站交易、广告买卖、站长培训、建站美工等服务