@ -14,7 +14,9 @@ public abstract class CommunicationProtocol : ICommunicationProtocol
protected readonly IMessenger messenger ;
protected readonly IMessenger messenger ;
private CancellationTokenSource cancellationTokenSource ;
private CancellationTokenSource cancellationTokenSource ;
private Thread ? readingThread ;
private Thread ? readingThread ;
private Thread ? bufferHandlingThread ;
private RecurringTimer ? bufferHandlingTimer ;
private const int bufferHandlingTimerIntervalMs = 2 0 ; // After x milliseconds the timer shall be called
private const int eventBlockSize = 5 0 ; // Block size for Received/Sent Data events
private ConcurrentQueue < ExtendedByte > ? receivedDataQueue ;
private ConcurrentQueue < ExtendedByte > ? receivedDataQueue ;
private ConcurrentQueue < ExtendedByte > ? sentDataQueue ;
private ConcurrentQueue < ExtendedByte > ? sentDataQueue ;
@ -63,7 +65,7 @@ public abstract class CommunicationProtocol : ICommunicationProtocol
// perform all steps in a task, to prevent deadlock when this method is called from a thread that is joined.
// perform all steps in a task, to prevent deadlock when this method is called from a thread that is joined.
Task . Run ( ( ) = >
Task . Run ( ( ) = >
{
{
this . CancelThread s ( ) ;
this . CancelOngoingOperation s ( ) ;
this . logger . LogError ( $"'{nameof(OnUnintentionallyDisconnected)}()' called." , nameof ( CommunicationProtocol ) ) ;
this . logger . LogError ( $"'{nameof(OnUnintentionallyDisconnected)}()' called." , nameof ( CommunicationProtocol ) ) ;
// update state
// update state
@ -143,8 +145,8 @@ public abstract class CommunicationProtocol : ICommunicationProtocol
this . receivedDataQueue = new ConcurrentQueue < ExtendedByte > ( ) ;
this . receivedDataQueue = new ConcurrentQueue < ExtendedByte > ( ) ;
this . sentDataQueue = new ConcurrentQueue < ExtendedByte > ( ) ;
this . sentDataQueue = new ConcurrentQueue < ExtendedByte > ( ) ;
// start interal buffer handling thread
// start interal buffer handling thread
this . bufferHandlingThread = new Thread ( ( ) = > this . HandleDataQueues ( this . cancellationTokenSource . Token ) ) ;
this . bufferHandlingTimer = new ( bufferHandlingTimerIntervalMs , delegate ( ) { this . HandleDataQueues ( ) ; } ) ;
this . bufferHandlingThread . Start ( ) ;
this . bufferHandlingTimer . Start ( ) ;
// start internal reading thread
// start internal reading thread
this . readingThread = new Thread ( ( ) = > this . InternalRead ( this . cancellationTokenSource . Token ) ) ;
this . readingThread = new Thread ( ( ) = > this . InternalRead ( this . cancellationTokenSource . Token ) ) ;
this . readingThread . Start ( ) ;
this . readingThread . Start ( ) ;
@ -166,69 +168,85 @@ public abstract class CommunicationProtocol : ICommunicationProtocol
protected abstract void InternalDisconnect ( ) ;
protected abstract void InternalDisconnect ( ) ;
public void Disconnect ( )
public void Disconnect ( )
{
{
this . CancelThread s ( ) ;
this . CancelOngoingOperation s ( ) ;
this . InternalDisconnect ( ) ;
this . InternalDisconnect ( ) ;
this . State = ProtocolConnectionState . NotConnected ;
this . State = ProtocolConnectionState . NotConnected ;
}
}
/// <summary>
/// <summary>
/// Sets cancellation token and makes sure both thread s do not run anymore.
/// Sets cancellation token and makes sure ongoing operation s do not run anymore.
/// </summary>
/// </summary>
private void CancelThread s ( )
private void CancelOngoingOperation s ( )
{
{
this . cancellationTokenSource . Cancel ( ) ;
this . cancellationTokenSource . Cancel ( ) ;
// join both threads if they exist
// stop thread and timer
this . readingThread ? . Join ( ) ;
this . readingThread ? . Join ( ) ;
this . bufferHandlingThread ? . Join ( ) ;
this . bufferHandlingTimer ? . Stop ( ) ;
}
}
/// <summary>
/// <summary>
/// Internal handling of internal data queues.
/// Internal handling of internal data queues.
/// Arranges characters into blocks and raises <see cref="ReceivedDataEvent"/> and <see cref="SentDataEvent"/>.
/// </summary>
/// </summary>
/// <param name="ct">cancellation token</param>
private void HandleDataQueues ( )
private void HandleDataQueues ( CancellationToken ct )
{
while ( ct . IsCancellationRequested = = false )
{
{
bool receivedDataAvailable = this . receivedDataQueue ! = null & & this . receivedDataQueue . TryPeek ( out _ ) ;
bool receivedDataAvailable = this . receivedDataQueue ! = null & & this . receivedDataQueue . TryPeek ( out _ ) ;
bool sentDataAvailable = this . sentDataQueue ! = null & & this . sentDataQueue . TryPeek ( out _ ) ;
bool sentDataAvailable = this . sentDataQueue ! = null & & this . sentDataQueue . TryPeek ( out _ ) ;
List < ExtendedByte > internalList = new ( ) ;
List < ExtendedByte > duplicateInternalList ( )
{
// shallow copy of data
var duplicatedList = internalList . Select ( item = > ( ExtendedByte ) item . Clone ( ) ) . ToList ( ) ;
// clear internal list
internalList . Clear ( ) ;
return duplicatedList ;
}
// collect received data
// collect received data
if ( receivedDataAvailable )
if ( receivedDataAvailable )
{
{
// something is in the queue => block thread and try dequeue
// while data is available
ExtendedByte ? newReceivedData ;
while ( this . receivedDataQueue ! . TryDequeue ( out ExtendedByte ? newReceivedByte ) )
while ( this . receivedDataQueue ! . TryDequeue ( out newReceivedData ) = = false )
{
// fill it into the internal list
internalList . Add ( newReceivedByte ) ;
if ( internalList . Count > eventBlockSize )
{
{
Thread . Sleep ( 1 ) ;
// raise event with block of data
this . ReceivedDataEvent ? . Invoke ( this , new ReceivedDataEventArgs ( duplicateInternalList ( ) ) ) ;
}
}
// raise event
}
this . ReceivedDataEvent ? . Invoke ( this , new ReceivedDataEventArgs ( new ExtendedByte [ ] { newReceivedData } ) ) ;
// no more new data but still some data in the list => send it
if ( internalList . Count > 0 )
{
this . ReceivedDataEvent ? . Invoke ( this , new ReceivedDataEventArgs ( duplicateInternalList ( ) ) ) ;
}
// reset list for sent data
internalList = new ( ) ;
}
}
// collect sent data
// collect sent data
if ( sentDataAvailable )
if ( sentDataAvailable )
{
{
// something is in the queue => block thread and try dequeue
// while data is available
ExtendedByte ? newSentData ;
while ( this . sentDataQueue ! . TryDequeue ( out ExtendedByte ? newSentByte ) )
while ( this . sentDataQueue ! . TryDequeue ( out newSentData ) = = false )
{
{
Thread . Sleep ( 1 ) ;
// fill it into the internal list
internalList . Add ( newSentByte ) ;
if ( internalList . Count > eventBlockSize )
{
// raise event with block of data
this . SentDataEvent ? . Invoke ( this , new SentDataEventArgs ( duplicateInternalList ( ) ) ) ;
}
}
// raise event
this . SentDataEvent ? . Invoke ( this , new SentDataEventArgs ( new ExtendedByte [ ] { newSentData } ) ) ;
}
}
// no more new data but still some data in the list => send it
// generally wait some time to slow down thread, if no data available
if ( internalList . Count > 0 )
if ( ! receivedDataAvailable & & ! sentDataAvailable )
{
{
Thread . Sleep ( 5 ) ;
this . SentDataEvent ? . Invoke ( this , new SentDataEventArgs ( duplicateInternalList ( ) ) ) ;
}
}
// always sleep 1ms to keep UI rolling
Thread . Sleep ( 1 ) ;
}
}
}
}