using System;
using System.Text;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;

namespace RobotLib.Communication
{
    /// <summary>
    /// MQTT implementation of <see cref="IPublisherSubscriber"/> built on the M2Mqtt client.
    /// Exposed as a singleton; connection state and client access are guarded by a private lock.
    /// </summary>
    internal class MqttPublisherSubscriber : IPublisherSubscriber
    {
        private MqttClient client;
        private static readonly NLog.Logger log = NLog.LogManager.GetCurrentClassLogger();
        private readonly object clientLock = new object();

        /// <summary>
        /// Singleton pattern.
        /// </summary>
        public static MqttPublisherSubscriber Instance { get; } = new MqttPublisherSubscriber();

        /// <summary>
        /// Event to catch for subscribed messages. Topic needs to be checked, since other classes
        /// might also subscribe to topics. Handlers receive a SubscribedMsgArrivedEventArgs instance.
        /// </summary>
        public event EventHandler NewMessageArrived;

        /// <summary>
        /// Connection state to the broker.
        /// </summary>
        public bool IsConnected { get; private set; }

        private MqttPublisherSubscriber()
        {
        }

        /// <summary>
        /// Connects to the MQTT broker using a freshly generated client id.
        /// </summary>
        /// <param name="brokerIp">Broker address handed to the <see cref="MqttClient"/> constructor.</param>
        /// <returns>
        /// <c>true</c> if a connection already exists or the broker accepted the new connection;
        /// otherwise <c>false</c>.
        /// </returns>
        /// <remarks>
        /// Exceptions raised by the underlying client are not caught here; they propagate to the caller.
        /// In that case (and when the broker refuses the connection) <see cref="IsConnected"/> stays
        /// <c>false</c>, so a later call can retry.
        /// </remarks>
        public bool Connect(string brokerIp)
        {
            lock (clientLock)
            {
                // protect from multiple connects
                if (IsConnected)
                {
                    return true;
                }

                log.Info($"Connecting to Mqtt Broker.");

                // creating an MqttClient object and registering for received messages and disconnects
                var candidate = new MqttClient(brokerIp);
                candidate.MqttMsgPublishReceived += Client_MqttMsgPublishReceived;
                candidate.ConnectionClosed += Client_ConnectionClosed;

                bool success = false;
                try
                {
                    // generate a clientID and connect to Broker
                    string clientId = Guid.NewGuid().ToString();
                    success = candidate.Connect(clientId) == MqttMsgConnack.CONN_ACCEPTED;
                }
                finally
                {
                    if (success)
                    {
                        // Only publish the client and the connected state once the broker accepted us.
                        client = candidate;
                        IsConnected = true;
                    }
                    else
                    {
                        // Refused or failed attempt: detach the handlers so the discarded client
                        // cannot raise events into this instance.
                        candidate.MqttMsgPublishReceived -= Client_MqttMsgPublishReceived;
                        candidate.ConnectionClosed -= Client_ConnectionClosed;
                    }
                }

                log.Info($"Connecting done. Success = {success}");
                return success;
            }
        }

        /// <summary>
        /// Disconnects from the broker. Does nothing when no connection is established.
        /// </summary>
        public void Disconnect()
        {
            lock (clientLock)
            {
                if (!IsConnected)
                {
                    return;
                }

                client.Disconnect();
                log.Info($"Disconnect was called.");
                IsConnected = false;
            }
        }

        /// <summary>
        /// Handles the client's ConnectionClosed event by clearing the connected state.
        /// </summary>
        private void Client_ConnectionClosed(object sender, EventArgs e)
        {
            lock (clientLock)
            {
                IsConnected = false;
                log.Info($"Connection was closed.");
            }
        }

        /// <summary>
        /// Handles an incoming publish: decodes the payload as UTF-8 and raises
        /// <see cref="NewMessageArrived"/>.
        /// </summary>
        private void Client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
        {
            // No shared state is touched here, so the client lock is deliberately NOT held:
            // running external handlers under the lock could deadlock if a handler waits on
            // another thread that calls Publish/Subscribe/Disconnect.
            SubscribedMsgArrivedEventArgs eventArgs = new(e.Topic, Encoding.UTF8.GetString(e.Message));
            log.Trace($"Message received on topic {eventArgs.Topic}: {eventArgs.Message}");
            NewMessageArrived?.Invoke(this, eventArgs);
        }

        /// <summary>
        /// Publishes a text message to the given topic. The payload is encoded as UTF-8,
        /// matching the decoding used for received messages.
        /// </summary>
        /// <param name="topic">Topic to publish to.</param>
        /// <param name="message">Message text to send.</param>
        /// <exception cref="InvalidOperationException">No client exists yet because Connect never succeeded.</exception>
        public void Publish(string topic, string message)
        {
            lock (clientLock)
            {
                if (client is null)
                {
                    throw new InvalidOperationException("Cannot publish: Connect must succeed first.");
                }

                // The value returned by MqttClient.Publish is a message id, not a success flag,
                // so it is not reported as one.
                client.Publish(topic, Encoding.UTF8.GetBytes(message));

                // Single-argument overload: the text is logged verbatim, so payloads containing
                // '{' or '}' (e.g. JSON) are not misread as a format template.
                log.Trace($"Published to topic '{topic}'. Message = {message}");
            }
        }

        /// <summary>
        /// Subscribes to a topic with QoS level "exactly once".
        /// </summary>
        /// <param name="topic">Topic to subscribe to.</param>
        /// <exception cref="InvalidOperationException">No client exists yet because Connect never succeeded.</exception>
        public void Subscribe(string topic)
        {
            lock (clientLock)
            {
                if (client is null)
                {
                    throw new InvalidOperationException("Cannot subscribe: Connect must succeed first.");
                }

                // MqttClient.Subscribe returns the message id of the SUBSCRIBE request, not a success code.
                ushort messageId = client.Subscribe(
                    new string[] { topic },
                    new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
                log.Info($"Subscribed to topic '{topic}'. Message id = {messageId}");
            }
        }
    }
}