标签云

微信群

扫码加入我们

WeChat QR Code

I'm implementing a data link layer using a producer/consumer pattern. The data link layer has its own thread and state machine to communicate the data link protocol over the wire (Ethernet, RS-232...). The interface to the physical layer is represented as a System.IO.Stream. Another thread writes messages to and reads messages from the data link object.The data link object has an idle state that must wait for one of four conditions:A byte is receivedA message is available from the network threadThe keep-alive timer has expiredAll communication was cancelled by the network layerI'm having a difficult time figuring out the best way to do this without splitting up communication into a read/write thread (thereby significantly increasing the complexity). Here's how I can get 3 out of 4:// Read a byte from 'stream'. Timeout after 10 sec. Monitor the cancellation token.stream.ReadTimeout = 10000;await stream.ReadAsync(buf, 0, 1, cts.Token);orBlockingCollection<byte[]> SendQueue = new ...;...// Check for a message from network layer. Timeout after 10 seconds.// Monitor cancellation token.SendQueue.TryTake(out msg, 10000, cts.Token);What should I do to block the thread, waiting for all four conditions? All recommendations are welcome. I'm not set on any architecture or data structures.EDIT: ******** Thanks for the help everyone. Here's my solution ********First I don't think there was an asynchronous implementation of the producer/consumer queue. So I implemented something similar to this stackoverflow post.I needed an external and internal cancellation source to stop the consumer thread and cancel the intermediate tasks, respectively, similar to this article.byte[] buf = new byte[1];using (CancellationTokenSource internalTokenSource = new CancellationTokenSource()){CancellationToken internalToken = internalTokenSource.Token;CancellationToken stopToken = stopTokenSource.Token;using (CancellationTokenSource linkedCts =CancellationTokenSource.CreateLinkedTokenSource(stopToken, internalToken)){CancellationToken ct = linkedCts.Token;Task<int> readTask = m_stream.ReadAsync(buf, 0, 1, ct);Task<byte[]> msgTask = m_sendQueue.DequeueAsync(ct);Task keepAliveTask = Task.Delay(m_keepAliveTime, ct);// Wait for at least one task to completeawait Task.WhenAny(readTask, msgTask, keepAliveTask);// Next cancel the other tasksinternalTokenSource.Cancel();try {await Task.WhenAll(readTask, msgTask, keepAliveTask);} catch (OperationCanceledException e) {if (e.CancellationToken == stopToken)throw;}if (msgTask.IsCompleted)// Send the network layer messageelse if (readTask.IsCompleted)// Process the byte from the physical layerelseContract.Assert(keepAliveTask.IsCompleted);// Send a keep alive message}}


await Task.WhenAny(...) might help.

2019年04月19日05分36秒

Wish I had seen this earlier as I went and implemented my own AsyncQueue, albeit not as general as yours. Why do you await each individual task? Shouldn't it already be completed?

2019年04月19日05分36秒

BrianHeilig: It's necessary to retrieve results. These tasks have completed but their results haven't been observed. Results can be data, e.g., messageTask, but results can also be exceptions. keepAliveTimerTask can have an exception if it's canceled, and readByteTask can have an exception if it's canceled or if there's some I/O read error. await will (re-)raise those exceptions if present.

2019年04月20日05分36秒

I think you meant WhenAny()

2019年04月19日05分36秒

No. WhenAny returns the moment any of the tasks returns. WhenAll awaits for every task given to complete, and returns in itself a Task (as opposed to WaitAll()

2019年04月19日05分36秒

This doesn't work because the result of the byte read might be discarded. I assume he cares about that data not being lost.

2019年04月20日05分36秒

So after a Cancel you still want to wait for a byte or a message to come in? And vice versa?

2019年04月20日05分36秒

The original question says " The data link object has an idle state that must wait for four conditions" - in my head, that means "wait for all 4 things to happen". If you mean any of those 4 can happen, then of course, WaitAny() is what you want :)

2019年04月19日05分36秒

Sorry, I was a little short on detail. The cts is a CancellationTokenSource which is owned by the data link layer. Cancellation is also controlled by the data link layer; the network thread calls stop which cancels all communication and waits for the thread to complete. The goal is to gracefully and quickly stop the data link layer.

2019年04月20日05分36秒

And why would this answer not work? You can make TryTake abort on any condition you like.

2019年04月19日05分36秒

I think you link the token, cancel the linked cts when "message is available from the network thread" and also provide a timeout. That should get you all 4 conditions and never lose data from the queue.

2019年04月19日05分36秒

After cancellation you can check which one happened by examining the IsCancellationRequested property of all tokens involved.

2019年04月19日05分36秒

Frankly, I don't fully understand the scenario but I'm confident that you can make any cancellation scheme work using linked tokens and TryTake.

2019年04月20日05分36秒