Assembly: mindtouch.dream
Namespace: MindTouch.Cues
A cue is hybrid construct combining the capabilities of a queue and a signaling object. It is used to send and coordinate work across threads. Cues are simple to use and very useful. A sender can add one or more items to a cue, which in turn notifies a receiver if enough items have been added. This two step process means that items can be added to a cue before a receiver is waiting or after, eliminating a common race condition. Cues are generic and can be used with any data-type. In MindTouch Dream, they are used to transport messages through the system using Cue<DreamMessage>.
There is only handful of basic methods on a cue. A sender can add an item, it can mark the cue as done, or it can indicate that it will not be able to add more items. A receiver can wait for items in various forms. Either it can wait for a single item, for multiple items, or for all items.
The following diagram depicts these basic methods.

These basic methods are grouped into two interfaces: IOutCue<T> and IInCue<T>. The former defines properties and methods used for sending items and the latter for receiving items. The most commonly used class is Cue<T>, which implements both IOutCue<T> and IInCue<T>. However, it is not necessary for custom implementations to support both interfaces. For example, HttpOutCue only implements IOutCue<DreamMessage>. This means that it is possible to implement only one end of a cue either to send or receive items from a custom transport.
The examples on this page assume that a cue with type 'int' was created.
Cue<int> cue = new Cue<int>();
This interface defines the properties and methods available for cues on which messages can be sent. An out-cue can be in two states: 'ready' and 'done'. The 'done' state indicates that no further messages can be sent. However, it does not indicate that all sent messages have been received. This kind of quality of service guarantee is not provided by cues.
The IsDone property is used to check if a cue has already been marked as 'done'. Once a cue is done, subsequent attempts to send messages or mark the cue as 'done' will throw an InvalidOperationException.
if(cue.IsDone) {
Console.WriteLine("Cue is marked as 'done'");
} else {
Console.WriteLine("Cue is 'ready'");
}
The OnCompletion event is triggered when a cue is marked as 'done'. The boolean parameter to the event handler indicates if the cue as marked as 'done' using Done (=true) or Fail (=false).
cue.OnCompletion += delegate(bool success) {
if(success) {
Console.WriteLine("Cue closed with Done()");
} else {
Console.WriteLine("Cue closed with Fail()");
}
}
The Add() method adds an item to a cue. An InvalidOperationException is thrown if the cue has already been marked as 'done'. The Add() method returns a reference to the out-cue so that multiple calls can be chained.
cue.Add(1).Add(2).Add(3).Done();
cue.Done();
The Fail() method marks a cue as 'done'. Registered OnCompletion event handlers are triggered with 'false' as their argument. As soon as a cue is marked as 'done', all subsequent attempts to send messages or mark the cue as 'done' will throw an InvalidOperationException.
Note: The difference between Done() and Fail() is subtle. Done() must be called when all items have been added successfully. Fail() can be called if the sender can't add more items, because of a failure. If the Fail() method is not called on failure, then the receiver will wait until it times out (since no further Add() or Done() calls occur). If the Fail() method is called, the receiver will timeout immediately since it now knows that no additional items will ever be added. Therefore, calling Fail() is an optional, but optimizes clean-up of runtime resources.
cue.Fail();
Let's illustrate this point with a slightly more involved example. Let's say we have a method that processes a batch of floating-point numbers by computing their inverse and sending them to a cue. Here is how we might deal with a division by zero error which would halt processing:
void ComputeInverse(float[] numbers, Cue<float> resultCue) {
foreach(float number in numbers) {
try {
resultCue.Add(1.0 / number);
} catch {
resultCue.Fail();
throw;
}
}
resultCue.Done();
}
This interface defines properties and methods for cues on which messages can be received. An in-cue can be in two states: 'done' or 'pending'. The 'pending' states indicates that the cue either has messages or can still receive messages.
Messages can be received by three different methods: Wait(), WaitMany(), and WaitAll(). Wait() receives at most a single message, WaitMany() receives up to a given count of messages, and, WaitAll() waits until the cue is marked as 'done' to return all accumulated messages.
The wait methods can be invoked in three different styles: synchronous, asynchronous, and iterative. The synchronous methods block the current thread of execution until the operation completes, the asynchronous methods take a handler to invoke when the operation completes, and the iterative methods use C#'s 'yield' mechanism to suspend the current execution context in a non-blocking fashion until the operation completes.
The wait methods return the cue state at time of completion. There are three possible values: 'Success', 'Done', and 'Timeout'. The 'Success' state indicates that the operation was completed successfully in its entirety. The 'Done' state indicates that the operation completed only partially because the cue is marked as 'done'. Subsequent wait operations will not provide additional results. And finally, the 'Timeout' state indicates that the operation completed only partially because the maximum wait elapsed or Fail() was invoked by the sender.
The HasPending property is used to check if a cue might still contain more items. This property will return 'true' if the cue has not been marked as 'done' or it has been marked as 'done', but there are still unreceived items in it.
if(cue.HasPending) {
Console.WriteLine("Cue could contain more items");
} else {
Console.WriteLine("Cue has no more items");
}
The OnAsyncException event is triggered when an exception occurs during the invocation of an asynchronous handler. It is equivalent to a generic catch statement around the invoked handler. The event includes the exception that was thrown as an argument.
cue.OnAsyncException += delegate(Exception e) {
Console.WriteLine("An exception was thrown " + e);
}
This Wait() method blocks the current thread until an item is received or the specified amount of time has elapsed.
Warning: This method should only be used when the receiving thread is not running on a threadpool thread. Blocking threadpool threads may lead to an unresponsive system. Whenever possible, use the asynchronous or iterative implementations of this method.
Returns: This method returns either 'Success', 'Done', or 'Timeout'. 'Success' indicates that the method received an item successfully. 'Done' indicates that no item was received because the cue is marked as 'done'. And 'Timeout' indicates that no item was received because the alloted time has elapsed.
int item;
CueState state = cue.Wait(1000, out item);
switch(state) {
case CueState.Success:
Console.WriteLine("An item was received: " + item);
break;
case CueState.Done:
Console.WriteLine("No item was received because the cue is marked as 'done'");
break;
case CueState.Timeout:
Console.WriteLine("No item was received because the operation timed out");
break;
}
This WaitMany() method blocks the current thread until the specified number of items are received or the specified amount of time has elapsed.
Warning: This method should only be used when the receiving
thread is not running on a threadpool thread. Blocking threadpool
threads may lead to an unresponsive system. Whenever possible, use the
asynchronous or iterative implementations of this method.
Exceptions: An ArgumentException is thrown if 'count' is zero or less.
Returns: This method returns either 'Success', 'Done', or 'Timeout'. 'Success' indicates that the method received all requested items. 'Done' indicates that less items than requested were received because the cue is marked as 'done'. And 'Timeout' indicates that less items than requested were received because the alloted time has elapsed.
IList<int> items;
CueState state = cue.WaitMany(10, 1000, out items);
switch(state) {
case CueState.Success:
Console.WriteLine("All items were received.");
break;
case CueState.Done:
Console.WriteLine("Less items than expected were received because the cue is marked as 'done'");
break;
case CueState.Timeout:
Console.WriteLine("Less items than expected were received because the operation timed out");
break;
}
Console.WriteLine("Items received: " + items.Count);
This WaitAll() method blocks the current thread until all items are received or the specified amount of time has elapsed.
Warning: This method should only be used when the receiving
thread is not running on a threadpool thread. Blocking threadpool
threads may lead to an unresponsive system. Whenever possible, use the
asynchronous or iterative alternative to this method.
Returns: This method returns either 'Done' or
'Timeout'. 'Done' indicates all items were received
and that the cue is now marked as 'done'. 'Timeout' indicates that the operation finished early because the alloted time has elapsed.
IList<int> items;
CueState state = cue.WaitAll(1000, out items);
switch(state) {
case CueState.Done:
Console.WriteLine("All items were received");
break;
case CueState.Timeout:
Console.WriteLine("Operation timed out with a partial result");
break;
}
Console.WriteLine("Items received: " + items.Count);
This WaitAll() method blocks the current thread, invoking the 'handler' for each item it receives or the specified amount of time has
elapsed between receiving any two items. Processing is aborted if the 'handler' returns 'false'.
Warning: This method should only be used when the receiving
thread is not running on a threadpool thread. Blocking threadpool
threads may lead to an unresponsive system. Whenever possible, use the
asynchronous or iterative alternative to this method.
Returns: This method returns either 'Success', 'Done', or
'Timeout'. 'Success' indicates that processing was interrupted by the 'handler'. 'Done' indicates all items were processed and that the cue is now marked as 'done'. 'Timeout' indicates that the operation finished early because the alloted time elapsed between two items.
CueState state = cue.WaitAll(1000, delegate(int i) {
Console.WriteLine("Received " + i);
return true;
});
switch(state) {
case CueState.Success:
Console.WriteLine("Processing stopped and cue may contain additional items");
break;
case CueState.Done:
Console.WriteLine("All items were received");
break;
case CueState.Timeout:
Console.WriteLine("Operation timed out");
break;
}
This Wait() method invokes the 'handler' when an item is received or the specified amount of time has elapsed. Since this method does not block and wait, the 'handler' may be invoked at any time during the application's execution.
Note: The CueState argument take on the same values as the return value for the synchronous Wait() method.
cue.Wait(1000, delegate(int item, CueState state) {
switch(state) {
case CueState.Success:
Console.WriteLine("An item was received: " + item);
break;
case CueState.Done:
Console.WriteLine("The cue is now marked as 'done'");
break;
case CueState.Timeout:
Console.WriteLine("The operation has timed out");
break;
}
});
This WaitMany() method invokes the 'handler' when the specified number of items have been received or the specified amount of time has elapsed. Since this method does not block and wait, the 'handler' may be invoked at any time during the application's execution.
Exceptions: An ArgumentException is thrown if 'count' is zero or less.
Note: The CueState argument take on the same values as the return value for the synchronous WaitMany() method.
cue.WaitMany(10, 1000, delegate(IList<int> items, int expectedCount, CueState state) {
switch(state) {
case CueState.Success:
Console.WriteLine("All items were received.");
break;
case CueState.Done:
Console.WriteLine("Less items than expected were received because the cue is marked as 'done'");
break;
case CueState.Timeout:
Console.WriteLine("Less items than expected were received because the operation timed out");
break;
}
Console.WriteLine("Items received: " + items.Count);
Console.WriteLine("Items expected: " + expectedCount);
});
This WaitAll() method invokes the 'handler' when all items have been received or the specified amount of time has elapsed. Since this method does not block and wait, the 'handler' may be invoked at any time during the application's execution.
Note: The CueState argument take on the same values as the return value for the synchronous WaitAll() method.
cue.WaitAll(1000, delegate(IList<int> items, int expectedCount, CueState state) {
switch(state) {
case CueState.Success:
Console.WriteLine("All items were received.");
break;
case CueState.Done:
Console.WriteLine("Less items than expected were received because the cue is marked as 'done'");
break;
case CueState.Timeout:
Console.WriteLine("Less items than expected were received because the operation timed out");
break;
}
Console.WriteLine("Items received: " + items.Count);
});
This WaitAll() method invokes the 'handler' each time an item is received or the specified amount of time has elapsed between receiving any two items. Processing is aborted if the 'handler' returns 'false'. Since this method does not block and wait, the 'handler' may be invoked at any time during the application's execution.
cue.WaitAll(1000, delegate(int i, CueState state) {
switch(state) {
case CueState.Success:
Console.WriteLine("Received " + i);
break;
case CueState.Done:
Console.WriteLine("All items were received");
break;
case CueState.Timeout:
Console.WriteLine("Operation timed out");
break;
}
return true;
});
This Wait() method suspends the current execution context using the 'yield' statement. The context is restored when an item is received or the specified amount of time has elapsed. This technique is called iterative programming and has the advantage of combining the best of synchronous and asynchronous programming.
Returns: This method returns a CueYield<T> instance where T is of the same type as in Cue<T>. The CueYield.State returns the state of the cue when the operation completed. It can be either 'Success', 'Done', or
'Timeout'. 'Success' indicates that the method received an item
successfully. 'Done' indicates that no item was received because the cue
is marked as 'done'. And 'Timeout' indicates that no item was received
because the alloted time has elapsed. Received items can be accessed with the Items property ([] in C#).
IEnumerator<ITask> MyIterativeMethod(Cue<int> cue) {
CueYield<int> cy;
while(true) {
yield return cy = cue.Wait(1000);
switch(cy.State) {
case CueState.Success:
Console.WriteLine("An item was received: " + cy[0]);
break;
case CueState.Done:
Console.WriteLine("The cue is now marked as 'done'");
yield break;
case CueState.Timeout:
Console.WriteLine("The operation has timed out");
yield break;
}
}
}
This WaitMany() method suspends the current execution context using the
'yield' statement. The context is restored when the specified number of items have been received or
the specified amount of time has elapsed. This technique is called iterative programming and has the advantage of combining the best of synchronous and asynchronous programming.
Returns: This method returns a CueYield<T> instance where T is of the same type as in Cue<T>. The CueYield.State returns the state of the cue when the operation completed. It can be either be 'Success', 'Done', or 'Timeout'. 'Success' indicates that the method received all requested items. 'Done' indicates that less items than requested were received because the cue has been marked as 'done'. And 'Timeout' indicates that less items than requested were received because the alloted time elapsed. Received items can be accessed with the Items property ([] in C#). The expected number of items can be read with CueYield.ExpectedCount. The actual number of received items can be read with CueYield.Count. 'Success' always means that the expected count matches the actual count.
IEnumerator<ITask> MyIterativeMethod(Cue<int> cue) {
CueYield<int> cy;
yield return cy = cue.WaitMany(10, 1000);
switch(cy.State) {
case CueState.Success:
Console.WriteLine("All items were received.");
break;
case CueState.Done:
Console.WriteLine("Less items than expected were received because the cue is marked as 'done'");
break;
case CueState.Timeout:
Console.WriteLine("Less items than expected were received because the operation timed out");
break;
}
Console.WriteLine("Items received: " + cy.Count);
}
This WaitAll() method suspends the current execution context using the
'yield' statement. The context is restored when all items have been received or
the specified amount of time has elapsed. This technique is called iterative programming and has the advantage of combining the best of synchronous and asynchronous programming.
Returns: This method returns a CueYield<T> instance where T is of the same type as in Cue<T>. The CueYield.State returns the state of the cue when the operation completed. It can be either be 'Done' or 'Timeout'. 'Done' indicates that all items were received and that the cue is now marked as 'done'. 'Timeout' indicates that the operation finished because the alloted time elapsed. Received items can be accessed with the Items property ([] in C#). The number of received items can be read with CueYield.Count.
IEnumerator<ITask> MyIterativeMethod(Cue<int> cue) {
CueYield<int> cy;
yield return cy = cue.WaitAll(1000);
switch(cy.State) {
case CueState.Done:
Console.WriteLine("All items were received");
break;
case CueState.Timeout:
Console.WriteLine("Operation timed out with a partial result");
break;
}
Console.WriteLine("Items received: " + cy.Count);
}