using System; using System.Text; using uPLibrary.Networking.M2Mqtt; using uPLibrary.Networking.M2Mqtt.Messages; namespace RobotLib.Communication { public class MqttPublisherSubscriber : IPublisherSubscriber { private MqttClient client; private static readonly NLog.Logger log = NLog.LogManager.GetCurrentClassLogger(); private object clientLock = new object(); /// /// Singleton pattern. /// public static MqttPublisherSubscriber Instance { get; } = new MqttPublisherSubscriber(); /// /// Event to catch for subscribed messages. Topic needs to be checked, since other classes might also subscribe to topics. /// public event EventHandler NewMessageArrived; /// /// Event to catch for the connection state to the broker. /// public event EventHandler ConnectionStateChanged; /// /// Connection state to the broker. /// public bool IsConnected { get { return _isConnected; } private set { // generate event if the value has changed if(_isConnected != value) { this.ConnectionStateChanged?.Invoke(this, value); } _isConnected = value; } } private bool _isConnected; private MqttPublisherSubscriber() { } /// /// Connect anonymously to the broker. /// /// IP of the broker /// success public bool Connect(string brokerIp) { return this.Connect(brokerIp, null, null); } /// /// Connect with username/password to the broker. /// /// IP of the broker /// username /// password /// success public bool Connect(string brokerIp, string username, string password) { bool success = false; lock (clientLock) { // protect from multiple connects if (IsConnected) return true; IsConnected = true; log.Info($"Connecting to Mqtt Broker."); // creating an MqttClient object client = new MqttClient(brokerIp); // register to message received and disconnect client.MqttMsgPublishReceived += Client_MqttMsgPublishReceived; client.ConnectionClosed += Client_ConnectionClosed; // generate a clientID and connect to Broker string clientId = Guid.NewGuid().ToString(); try { if (String.IsNullOrEmpty(username) || String.IsNullOrEmpty(password)) { client.Connect(clientId); } else { client.Connect(clientId, username, password); } } catch (Exception ex) { log.Error(ex.Message); } success = client.IsConnected; log.Info($"Connecting done. Success = {success}"); } return success; } public void Disconnect() { lock (clientLock) { if (IsConnected == false) return; try { client.Disconnect(); } catch { } log.Info($"Disconnect was called."); IsConnected = false; } } private void Client_ConnectionClosed(object sender, EventArgs e) { lock (clientLock) { IsConnected = false; log.Info($"Connection was closed."); } } private void Client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e) { lock (clientLock) { SubscribedMsgArrivedEventArgs eventArgs = new(e.Topic, Encoding.UTF8.GetString(e.Message)); log.Trace($"Message received on topic {eventArgs.Topic}: \n{eventArgs.Message}"); NewMessageArrived?.Invoke(this, eventArgs); } } public void Publish(string topic, string message) { lock (clientLock) { var msgId = client.Publish(topic, Encoding.ASCII.GetBytes(message)); log.Trace($"Published to topic '{topic}'. MessageId = {msgId}"); } } public void Subscribe(string topic) { lock (clientLock) { var msgId = client.Subscribe(new string[] { topic }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE }); log.Info($"Subscribed to topic '{topic}'. MessageId = {msgId}"); } } } }