You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
190 lines
6.3 KiB
190 lines
6.3 KiB
using System;
|
|
using System.Collections.Generic;
|
|
using System.Text;
|
|
using uPLibrary.Networking.M2Mqtt;
|
|
using uPLibrary.Networking.M2Mqtt.Messages;
|
|
|
|
namespace RobotLib.Communication
|
|
{
|
|
/// <summary>
/// MQTT implementation of <see cref="IPublisherSubscriber"/> on top of the M2Mqtt client.
/// The class is a singleton (see <see cref="Instance"/>). Topics passed to
/// <see cref="Subscribe"/> are remembered and (re-)subscribed on every successful connect.
/// All access to the client, the connection state and the topic list is serialized
/// through a single private lock.
/// </summary>
public class MqttPublisherSubscriber : IPublisherSubscriber
{
    private static readonly NLog.Logger log = NLog.LogManager.GetCurrentClassLogger();

    // Guards client, _isConnected and toBeSubscribedTopics.
    private readonly object clientLock = new object();

    // Topics requested via Subscribe(); replayed against the broker after each connect.
    private readonly List<string> toBeSubscribedTopics = new List<string>();

    private MqttClient client;
    private bool _isConnected;

    /// <summary>
    /// Singleton pattern.
    /// </summary>
    public static MqttPublisherSubscriber Instance { get; } = new MqttPublisherSubscriber();

    /// <summary>
    /// Event to catch for subscribed messages. Topic needs to be checked, since other classes might also subscribe to topics.
    /// </summary>
    public event EventHandler<SubscribedMsgArrivedEventArgs> NewMessageArrived;

    /// <summary>
    /// Event to catch for the connection state to the broker.
    /// Raised only when the state actually changes; the argument is the new state.
    /// </summary>
    public event EventHandler<bool> ConnectionStateChanged;

    /// <summary>
    /// Connection state to the broker.
    /// </summary>
    public bool IsConnected
    {
        get { return _isConnected; }
        private set
        {
            if (_isConnected == value)
            {
                return;
            }

            // Store the new state before raising the event, so that handlers
            // reading IsConnected observe the value they are being notified about.
            _isConnected = value;
            this.ConnectionStateChanged?.Invoke(this, value);
        }
    }

    private MqttPublisherSubscriber()
    { }

    /// <summary>
    /// Connect anonymously to the broker.
    /// </summary>
    /// <param name="brokerIp">IP of the broker</param>
    /// <returns>success</returns>
    public bool Connect(string brokerIp)
    {
        return this.Connect(brokerIp, null, null);
    }

    /// <summary>
    /// Connect with username/password to the broker.
    /// If either <paramref name="username"/> or <paramref name="password"/> is null or empty,
    /// an anonymous connection is made. Calling this while already connected is a no-op
    /// that returns <c>true</c>.
    /// </summary>
    /// <param name="brokerIp">IP of the broker</param>
    /// <param name="username">username</param>
    /// <param name="password">password</param>
    /// <returns>success</returns>
    public bool Connect(string brokerIp, string username, string password)
    {
        lock (clientLock)
        {
            // protect from multiple connects
            if (IsConnected)
            {
                return true;
            }

            log.Info($"Connecting to Mqtt Broker.");

            // Detach from a client left over from an earlier connection, so that a late
            // event from it cannot change the state of the new connection.
            if (client != null)
            {
                client.MqttMsgPublishReceived -= Client_MqttMsgPublishReceived;
                client.ConnectionClosed -= Client_ConnectionClosed;
            }

            // creating an MqttClient object
            client = new MqttClient(brokerIp);
            // register to message received and disconnect
            client.MqttMsgPublishReceived += Client_MqttMsgPublishReceived;
            client.ConnectionClosed += Client_ConnectionClosed;

            // generate a clientID and connect to Broker
            string clientId = Guid.NewGuid().ToString();
            try
            {
                if (String.IsNullOrEmpty(username) || String.IsNullOrEmpty(password))
                {
                    client.Connect(clientId);
                }
                else
                {
                    client.Connect(clientId, username, password);
                }
            }
            catch (Exception ex)
            {
                // A failed attempt is reported through the return value; keep the
                // full exception (including stack trace) in the log.
                log.Error(ex, "Connecting to Mqtt Broker failed.");
            }

            bool success = client.IsConnected;
            log.Info($"Connecting done. Success = {success}");

            // Publish the real outcome. A failed attempt leaves IsConnected false,
            // so a later Connect() call retries instead of returning true.
            IsConnected = success;

            // subscribe to topics that have already been appended
            if (success)
            {
                foreach (var topic in toBeSubscribedTopics)
                {
                    this.OnlineSubscribe(topic);
                }
            }

            return success;
        }
    }

    /// <summary>
    /// Disconnect from the broker. Does nothing when not connected.
    /// A failure while disconnecting is logged and the state is still set to disconnected.
    /// </summary>
    public void Disconnect()
    {
        lock (clientLock)
        {
            if (!IsConnected)
            {
                return;
            }

            try
            {
                client.Disconnect();
            }
            catch (Exception ex)
            {
                // Best effort: the connection is considered closed either way,
                // but the failure should be visible in the log.
                log.Warn(ex, "Disconnecting from Mqtt Broker failed.");
            }

            log.Info($"Disconnect was called.");
            IsConnected = false;
        }
    }

    /// <summary>
    /// Handler for the client's ConnectionClosed event: marks the connection as closed.
    /// </summary>
    private void Client_ConnectionClosed(object sender, EventArgs e)
    {
        lock (clientLock)
        {
            IsConnected = false;
            log.Warn($"Connection was closed.");
        }
    }

    /// <summary>
    /// Handler for the client's MqttMsgPublishReceived event: decodes the payload as UTF-8
    /// and forwards it through <see cref="NewMessageArrived"/>.
    /// </summary>
    private void Client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
    {
        // Subscribers are invoked while the lock is held.
        lock (clientLock)
        {
            SubscribedMsgArrivedEventArgs eventArgs = new(e.Topic, Encoding.UTF8.GetString(e.Message));
            log.Trace($"Message received on topic {eventArgs.Topic}: \n{eventArgs.Message}");
            NewMessageArrived?.Invoke(this, eventArgs);
        }
    }

    /// <summary>
    /// Publish a message with QoS 0 and without the retain flag.
    /// The message is discarded (with a warning in the log) when not connected.
    /// </summary>
    /// <param name="topic">Destination topic</param>
    /// <param name="message">Payload; sent UTF-8 encoded, matching the decoding on the receive path</param>
    public void Publish(string topic, string message)
    {
        lock (clientLock)
        {
            // Checked under the lock so the state cannot change between check and use.
            if (!this.IsConnected)
            {
                log.Warn($"Message was discarded, Mqtt broker not connected. Destination topic={topic}");
                return;
            }

            // UTF-8 (not ASCII), so that non-ASCII characters survive the round trip.
            var msgId = client.Publish(topic, Encoding.UTF8.GetBytes(message), MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE, false);
            log.Trace($"Published to topic '{topic}'. MessageId = {msgId}");
        }
    }

    /// <summary>
    /// Subscribe to a topic. The topic is remembered and subscribed again after every
    /// successful connect; when currently connected it is also subscribed immediately.
    /// </summary>
    /// <param name="topic">Topic to subscribe to</param>
    public void Subscribe(string topic)
    {
        // The topic list is enumerated by Connect() under the same lock.
        lock (clientLock)
        {
            // add to list if not already there
            if (!toBeSubscribedTopics.Contains(topic))
            {
                toBeSubscribedTopics.Add(topic);
                log.Debug($"Added topic '{topic}'. To the to be subscribed list.");
            }

            if (this.IsConnected)
            {
                OnlineSubscribe(topic);
            }
        }
    }

    /// <summary>
    /// Send the subscribe request for a single topic (QoS 2) to the broker.
    /// </summary>
    private void OnlineSubscribe(string topic)
    {
        lock (clientLock)
        {
            var msgId = client.Subscribe(new string[] { topic }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
            log.Info($"Subscribed to topic '{topic}'. MessageId = {msgId}");
        }
    }
}
|
|
}
|
|
|