using System;
using System.Text;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;
namespace RobotLib.Communication
{
public class MqttPublisherSubscriber : IPublisherSubscriber
{
private MqttClient client;
private static readonly NLog.Logger log = NLog.LogManager.GetCurrentClassLogger();
private object clientLock = new object();
///
/// Singleton pattern.
///
public static MqttPublisherSubscriber Instance { get; } = new MqttPublisherSubscriber();
///
/// Event to catch for subscribed messages. Topic needs to be checked, since other classes might also subscribe to topics.
///
public event EventHandler NewMessageArrived;
///
/// Event to catch for the connection state to the broker.
///
public event EventHandler ConnectionStateChanged;
///
/// Connection state to the broker.
///
public bool IsConnected
{
get { return _isConnected; }
private set
{
// generate event if the value has changed
if(_isConnected != value)
{
this.ConnectionStateChanged?.Invoke(this, value);
}
_isConnected = value;
}
}
private bool _isConnected;
private MqttPublisherSubscriber()
{ }
///
/// Connect anonymously to the broker.
///
/// IP of the broker
/// success
public bool Connect(string brokerIp)
{
return this.Connect(brokerIp, null, null);
}
///
/// Connect with username/password to the broker.
///
/// IP of the broker
/// username
/// password
/// success
public bool Connect(string brokerIp, string username, string password)
{
bool success = false;
lock (clientLock)
{
// protect from multiple connects
if (IsConnected) return true;
IsConnected = true;
log.Info($"Connecting to Mqtt Broker.");
// creating an MqttClient object
client = new MqttClient(brokerIp);
// register to message received and disconnect
client.MqttMsgPublishReceived += Client_MqttMsgPublishReceived;
client.ConnectionClosed += Client_ConnectionClosed;
// generate a clientID and connect to Broker
string clientId = Guid.NewGuid().ToString();
try
{
if (String.IsNullOrEmpty(username) || String.IsNullOrEmpty(password))
{
client.Connect(clientId);
}
else
{
client.Connect(clientId, username, password);
}
}
catch (Exception ex)
{
log.Error(ex.Message);
}
success = client.IsConnected;
log.Info($"Connecting done. Success = {success}");
}
return success;
}
public void Disconnect()
{
lock (clientLock)
{
if (IsConnected == false) return;
try
{
client.Disconnect();
}
catch { }
log.Info($"Disconnect was called.");
IsConnected = false;
}
}
private void Client_ConnectionClosed(object sender, EventArgs e)
{
lock (clientLock)
{
IsConnected = false;
log.Info($"Connection was closed.");
}
}
private void Client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
{
lock (clientLock)
{
SubscribedMsgArrivedEventArgs eventArgs = new(e.Topic, Encoding.UTF8.GetString(e.Message));
log.Trace($"Message received on topic {eventArgs.Topic}: \n{eventArgs.Message}");
NewMessageArrived?.Invoke(this, eventArgs);
}
}
public void Publish(string topic, string message)
{
lock (clientLock)
{
var msgId = client.Publish(topic, Encoding.ASCII.GetBytes(message));
log.Trace($"Published to topic '{topic}'. MessageId = {msgId}");
}
}
public void Subscribe(string topic)
{
lock (clientLock)
{
var msgId = client.Subscribe(new string[] { topic }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
log.Info($"Subscribed to topic '{topic}'. MessageId = {msgId}");
}
}
}
}