@ -1,7 +1,9 @@
using Common.Logging ;
using Common.Messaging ;
using CommunityToolkit.Mvvm.Messaging ;
using MultiTerm.Protocols.Model ;
using MultiTerm.Protocols.Types ;
using System.Collections.Concurrent ;
namespace MultiTerm.Protocols ;
@ -11,6 +13,9 @@ public abstract class CommunicationProtocol : ICommunicationProtocol
protected readonly IMessenger messenger ;
private CancellationTokenSource cancellationTokenSource ;
private Thread ? readingThread ;
private Thread ? bufferHandlingThread ;
private ConcurrentQueue < ExtendedByte > ? receivedDataQueue ;
private ConcurrentQueue < ExtendedByte > ? sentDataQueue ;
public event EventHandler < ReceivedDataEventArgs > ? ReceivedDataEvent ;
public event EventHandler < SentDataEventArgs > ? SentDataEvent ;
@ -32,12 +37,12 @@ public abstract class CommunicationProtocol : ICommunicationProtocol
/// <summary>
/// To be called when data was received from the connected device.
/// </summary>
protected void OnReceivedData ( ReceivedDataEventArgs e ) { this . ReceivedDataEvent ? . Invoke ( this , e ) ; }
protected void OnReceivedData ( ExtendedByte receivedByt e) { this . receivedDataQueue ? . Enqueue ( receivedByt e) ; }
/// <summary>
/// To be called when data was sent to the connected device.
/// </summary>
protected void OnSentData ( SentDataEventArgs e ) { this . SentDataEvent ? . Invoke ( this , e ) ; }
protected void OnSentData ( ExtendedByte sentByt e) { this . sentDataQueue ? . Enqueue ( sentByt e) ; }
/// <summary>
/// To be called whenever the protocol detected that it is disconnected, but this is not a known fact.
@ -46,8 +51,10 @@ public abstract class CommunicationProtocol : ICommunicationProtocol
{
// update state
this . IsConnected = false ;
// log
this . CancelThreads ( ) ;
this . logger . LogError ( $"'{nameof(OnUnintentionallyDisconnected)}()' called." , nameof ( CommunicationProtocol ) ) ;
// raise event indicating an unintentional disconnect
this . DisconnectedEvent ? . Invoke ( this , new DisconnectedEventArgs ( true ) ) ;
}
@ -64,12 +71,8 @@ public abstract class CommunicationProtocol : ICommunicationProtocol
// guard is not connected => log warning. user of this function shall only use SendBytes if IsConnected is true
if ( this . IsConnected = = false ) { this . logger . LogWarn ( $"'{nameof(SendBytes)}()' was reached with {nameof(IsConnected)} being false" , nameof ( CommunicationProtocol ) ) ; }
if ( this . InternalSendBytes ( bytes ) )
{
// todo implement sent bytes
// ONsentBytes()
}
else
// send bytes and if the sending was cancelled report error
if ( this . InternalSendBytes ( bytes ) = = false )
{
this . logger . LogError ( $"'{nameof(SendBytes)}()' failed to send during {nameof(InternalSendBytes)}." , nameof ( CommunicationProtocol ) ) ;
this . messenger . Send < IUserInterfaceMessage > ( new GenericUserInterfaceMessage ( "Failed to send message" , MessageImportance . High ) ) ;
@ -109,8 +112,13 @@ public abstract class CommunicationProtocol : ICommunicationProtocol
// try connecting if settings are valid
if ( this . InternalConnect ( settings ) )
{
// renew token source
// renew token source and data buffers
this . cancellationTokenSource = new CancellationTokenSource ( ) ;
this . receivedDataQueue = new ConcurrentQueue < ExtendedByte > ( ) ;
this . sentDataQueue = new ConcurrentQueue < ExtendedByte > ( ) ;
// start interal buffer handling thread
this . bufferHandlingThread = new Thread ( ( ) = > this . HandleDataQueues ( this . cancellationTokenSource . Token ) ) ;
this . bufferHandlingThread . Start ( ) ;
// start internal reading thread
this . readingThread = new Thread ( ( ) = > this . InternalRead ( this . cancellationTokenSource . Token ) ) ;
this . readingThread . Start ( ) ;
@ -131,16 +139,71 @@ public abstract class CommunicationProtocol : ICommunicationProtocol
protected abstract void InternalDisconnect ( ) ;
public void Disconnect ( )
{
// if reading thread exists and is running => cancel it and wait
if ( this . readingThread ! = null & & this . readingThread . IsAlive )
{
this . cancellationTokenSource . Cancel ( ) ;
this . readingThread . Join ( ) ;
}
this . CancelThreads ( ) ;
this . InternalDisconnect ( ) ;
this . IsConnected = false ;
// raise event indicating an intentional disconnect
this . DisconnectedEvent ? . Invoke ( this , new DisconnectedEventArgs ( false ) ) ;
}
/// <summary>
/// Sets cancellation token and makes sure both threads do not run anymore.
/// </summary>
private void CancelThreads ( )
{
this . cancellationTokenSource . Cancel ( ) ;
// join both threads if they exist
this . readingThread ? . Join ( ) ;
this . bufferHandlingThread ? . Join ( ) ;
}
/// <summary>
/// Internal handling of internal data queues.
/// </summary>
/// <param name="ct">cancellation token</param>
private void HandleDataQueues ( CancellationToken ct )
{
while ( ct . IsCancellationRequested = = false )
{
bool receivedDataAvailable = this . receivedDataQueue ! = null & & this . receivedDataQueue . TryPeek ( out _ ) ;
bool sentDataAvailable = this . sentDataQueue ! = null & & this . sentDataQueue . TryPeek ( out _ ) ;
// collect received data
if ( receivedDataAvailable )
{
// something is in the queue => block thread and try dequeue
ExtendedByte ? newReceivedData ;
while ( this . receivedDataQueue ! . TryDequeue ( out newReceivedData ) = = false )
{
Thread . Sleep ( 1 ) ;
}
// raise event
this . ReceivedDataEvent ? . Invoke ( this , new ReceivedDataEventArgs ( new ExtendedByte [ ] { newReceivedData } ) ) ;
}
// collect sent data
if ( sentDataAvailable )
{
// something is in the queue => block thread and try dequeue
ExtendedByte ? newSentData ;
while ( this . sentDataQueue ! . TryDequeue ( out newSentData ) = = false )
{
Thread . Sleep ( 1 ) ;
}
// raise event
this . SentDataEvent ? . Invoke ( this , new SentDataEventArgs ( new ExtendedByte [ ] { newSentData } ) ) ;
}
// generally wait some time to slow down thread, if no data available
if ( ! receivedDataAvailable & & ! sentDataAvailable )
{
Thread . Sleep ( 5 ) ;
}
// always sleep 1ms to keep UI rolling
Thread . Sleep ( 1 ) ;
}
}
}