Aprendiendo MQTT 4: Construyendo Nuestro Propio Broker Usando c#, dotnet y MQTTnet. mosquitto. Instalación con Docker

Mqtt implementando un broker

En esta cuarta entrega de nuestra serie sobre MQTT, pondremos el foco en uno de los componentes más cruciales de cualquier sistema de mensajería en tiempo real: el broker. Imagina un escenario de domótica en el que múltiples dispositivos necesitan comunicarse entre sí de manera eficiente. En este contexto, el broker actúa como el cerebro operativo, encargado de distribuir mensajes desde los publicadores—en nuestro caso, sensores—hacia los suscriptores, como podría ser un display que muestra datos en tiempo real. Acompáñame mientras exploramos cómo construir nuestro propio broker y lo llevamos al siguiente nivel mediante su contenerización con Docker y la integración con Mosquitto.

Repositorio del proyecto: https://github.com/elrincondeada/mqttdemo

mosquitto logo
mosquitto logo
Video sobre este post del rincón de Ada

Mosquitto es un broker ampliamente adoptado en la comunidad de entusiastas del Internet de las Cosas (IoT), especialmente en entornos de código abierto. Se integra de manera sencilla con sistemas domóticos populares como Home Assistant. Para aquellos interesados en probarlo, Docker ofrece una forma sencilla y eficiente de hacerlo. Puedes encontrar instrucciones detalladas para utilizar la imagen oficial de Mosquitto en Docker en su página oficial en Docker Hub. Además, en el repositorio del proyecto, he incluido un ejemplo práctico utilizando Docker Compose. Al ejecutar este ejemplo, podrás observar cómo los sensores se comunican en tiempo real con el display. Muy parecido a como haciamos con la imagen de EMQX.

Implementando nuestro broker con c# y MQTTnet

¡Pero no nos quedaremos ahí! Este blog también tiene un enfoque en programación, y en esa línea, vamos a elevar el listón. Crearemos nuestro propio broker MQTT desde cero, y lo mejor de todo es que sólo necesitaremos unas pocas líneas de código. Gracias a la programación basada en eventos, podremos capturar todas las actividades esenciales que suceden en el broker, desde nuevas suscripciones y publicaciones en diferentes tópicos, hasta el envío de mensajes en tiempo real. Para lograr esto, utilizaremos la potente y flexible librería MQTTnet, que nos permite crear tanto clientes MQTT—ya sean suscriptores o publicadores—como servidores o brokers. También construiremos una imagen de Docker para nuestro broker, permitiéndonos desplegarlo en cualquier contenedor. Esto es especialmente útil en entornos de domótica, donde es común tener un servidor dedicado al control de dispositivos y servicios. Utilizar contenedores Docker para estos servicios es una estrategia inteligente; no sólo para un broker MQTT, sino también para otros servicios como Home Assistant o Node-RED. Personalmente, para estos propósitos, utilizo una Raspberry Pi.

Para que nuestro broker funcione como servicio crearemos una aplicación Host en dotnet. Registraremos un broker con el servidor MQTTnet. Simplemente tenemos dos archivos:

Program.cs

Este archivo es el punto de entrada para tu aplicación de servidor MQTT en .NET. Utiliza el modelo de hospedaje genérico de .NET para configurar y ejecutar servicios en segundo plano.

namespace DotNetMqtt.Server;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.DependencyInjection;

internal class Program
{
    private static async Task Main(string[] args)
    {
        var hostBuilder = Host.CreateDefaultBuilder(args);
        hostBuilder.ConfigureServices((hostContext, services) =>
        {
            services.AddHostedService<MqttServerWorker>();
        });
        await hostBuilder.RunConsoleAsync();
    }
}

Explicación

  • Host.CreateDefaultBuilder(args): Este método configura un nuevo host con algunas configuraciones predeterminadas.
  • ConfigureServices: Aquí es donde registras tus servicios. En este caso, estás añadiendo MqttServerWorker como un servicio alojado.
  • RunConsoleAsync(): Este método inicia el host y lo ejecuta hasta que se cierra.

MqttServerWorker.cs

Este archivo contiene la lógica principal para tu servidor MQTT. Implementa la interfaz IHostedService, lo que significa que se iniciará y detendrá automáticamente con el host de la aplicación.

  • StartAsync: Este método se llama cuando el servicio se inicia. Aquí es donde configuras y arrancas tu servidor MQTT.
    • MqttFactory: Crea una nueva instancia de un servidor MQTT.
    • MqttServerOptionsBuilder: Configura las opciones para el servidor MQTT.
    • StartAsync(): Inicia el servidor MQTT.
  • StopAsync: Este método se llama cuando el servicio se detiene. Aquí es donde detienes tu servidor MQTT.
  • Eventos como ClientConnectedAsync, InterceptingSubscriptionAsync, etc.: Estos son manejadores de eventos que se disparan cuando ocurren ciertas acciones en el servidor MQTT. Utilizas estos para registrar información útil.
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Server;

namespace DotNetMqtt.Server;

public class MqttServerWorker : IHostedService, IDisposable
{
    private readonly ILogger<MqttServerWorker> _logger;
    private MqttServer? _mqttServer;

    public MqttServerWorker(ILogger<MqttServerWorker> logger)
    {
        _logger = logger;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {

        _logger.LogInformation("Iniciando servidor MQTT");
        var mqttFactory = new MqttFactory();
        var optionsBuilder = new MqttServerOptionsBuilder()
                             .WithDefaultEndpoint();

        _mqttServer = mqttFactory.CreateMqttServer(optionsBuilder.Build());

        _mqttServer.ClientConnectedAsync += ClienteConectado;
        _mqttServer.ClientConnectedAsync += ClienteDesconectado;
        _mqttServer.InterceptingSubscriptionAsync += InterceptandoSubscripcion;
        _mqttServer.InterceptingUnsubscriptionAsync += InterceptandoDesuscripcion;
        _mqttServer.InterceptingPublishAsync += InterceptandoPublicacion;
        _mqttServer.InterceptingClientEnqueueAsync += InterceptandoColaCliente;
        await _mqttServer.StartAsync();
        _logger.LogInformation("Servidor MQTT iniciado");

    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        if(_mqttServer != null)
        {
            _logger.LogInformation("Deteniendo servidor MQTT");
            await _mqttServer.StopAsync();
            _logger.LogInformation("Servidor MQTT detenido");
        }
    }


    private Task InterceptandoColaCliente(InterceptingClientApplicationMessageEnqueueEventArgs args)
    {
        _logger.LogInformation($"Cliente publicador {args.SenderClientId} suscriptor {args.ReceiverClientId} encola mensaje {args.ApplicationMessage.Topic} - {args.ApplicationMessage.ConvertPayloadToString()}");
        return Task.CompletedTask;
    }

    private Task InterceptandoPublicacion(InterceptingPublishEventArgs args)
    {
        _logger.LogInformation($"Cliente {args.ClientId} publicó en {args.ApplicationMessage.Topic} - {args.ApplicationMessage.ConvertPayloadToString()}");
        return Task.CompletedTask;
    }


    private Task InterceptandoDesuscripcion(InterceptingUnsubscriptionEventArgs args)
    {
       _logger.LogInformation($"Cliente {args.ClientId} se ha desuscrito a {args.Topic}");
        return Task.CompletedTask;
    }


    private Task InterceptandoSubscripcion(InterceptingSubscriptionEventArgs args)
    {
        _logger.LogInformation($"Cliente {args.ClientId} se ha suscrito a {args.TopicFilter.Topic}");
        return Task.CompletedTask;
    }


    private Task ClienteDesconectado(ClientConnectedEventArgs args)
    {
        _logger.LogInformation($"Cliente Desconectado {args.ClientId} {args.Endpoint}");
        return Task.CompletedTask;
    }


    private  Task ClienteConectado(ClientConnectedEventArgs args)
    {
        _logger.LogInformation($"Cliente conectado {args.ClientId} {args.Endpoint}");
        return Task.CompletedTask;
    }


    public void Dispose()
    {
        if(_mqttServer != null)
        {
            _mqttServer.ClientConnectedAsync -= ClienteConectado;
            _mqttServer.ClientConnectedAsync -= ClienteDesconectado;
            _mqttServer.InterceptingSubscriptionAsync -= InterceptandoSubscripcion;
            _mqttServer.InterceptingUnsubscriptionAsync -= InterceptandoDesuscripcion;
            _mqttServer.InterceptingPublishAsync -= InterceptandoPublicacion;
            _mqttServer.InterceptingClientEnqueueAsync -= InterceptandoColaCliente;
            _mqttServer.Dispose();
        }
    }
}

Interesante ver estamos usando dos patrones de desarrollo el builder y la factory para construir el Host y el servidor.

Docker del servidor

Para llevernos nuestro servidor a nuestro contenedor tenemos que crear la imagen primero. Con este Dockerfile lo realizamos:

# Utilizar la imagen base de .NET
FROM mcr.microsoft.com/dotnet/aspnet:latest AS base
WORKDIR /app
EXPOSE 1883

# Compilar la aplicación
FROM mcr.microsoft.com/dotnet/sdk:latest AS build
WORKDIR /src
COPY ["DotNetMqtt.Server.csproj", "/src"]
RUN dotnet restore "DotNetMqtt.Server.csproj"
COPY . .
WORKDIR "/src"
RUN dotnet build "DotNetMqtt.Server.csproj" -c Release -o /app/build

FROM build AS publish
RUN dotnet publish "DotNetMqtt.Server.csproj" -c Release -o /app/publish

# Copiar y ejecutar la aplicación
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "DotNetMqtt.Server.dll"]

En detalle esto es lo que hace

  • FROM mcr.microsoft.com/dotnet/aspnet:latest AS base: Aquí se está utilizando la última imagen de ASP.NET Core disponible como imagen base y se le da el alias «base».
  • WORKDIR /app: Establece el directorio de trabajo en el contenedor a /app.
  • EXPOSE 1883: Informa a Docker que el contenedor escuchará en el puerto 1883, que es el puerto estándar para MQTT.
  • FROM mcr.microsoft.com/dotnet/sdk:latest AS build: Utiliza la última imagen del SDK de .NET y le da el alias «build».
  • WORKDIR /src: Establece el directorio de trabajo en el contenedor a /src.
  • COPY ["DotNetMqtt.Server.csproj", "/src"]: Copia el archivo de proyecto .csproj al contenedor.
  • RUN dotnet restore "DotNetMqtt.Server.csproj": Restaura las dependencias del proyecto.
  • COPY . .: Copia todos los archivos del directorio actual al contenedor.
  • RUN dotnet build "DotNetMqtt.Server.csproj" -c Release -o /app/build: Compila el proyecto en modo «Release» y coloca los archivos compilados en /app/build.
  • Publicación de la Aplicación:
  • FROM build AS publish: Utiliza la imagen «build» como base y le da el alias «publish».
  • RUN dotnet publish "DotNetMqtt.Server.csproj" -c Release -o /app/publish: Publica la aplicación en modo «Release» y coloca los archivos en /app/publish.
  • FROM base AS final: Utiliza la imagen «base» como la imagen final.
  • WORKDIR /app: Establece el directorio de trabajo en el contenedor a /app.
  • COPY --from=publish /app/publish .: Copia los archivos publicados desde la imagen «publish» al directorio de trabajo actual en la imagen.
  • ENTRYPOINT ["dotnet", "DotNetMqtt.Server.dll"]: Establece el punto de entrada de la aplicación, indicando que se debe ejecutar DotNetMqtt.Server.dll usando dotnet.

En resumen dos etapas: una de copia del código, construcción binarios con el sdk y otra de publicación, en la imagen final que es una imagén básica con solo en runtime de dotnet

Finalmente:

docker build -t tu_usuario_dockerhub/nombre_de_la_imagen:etiqueta .

¿Y después que viene?

En las próximas entregas vamos a elevar nuestro desarrollo al siguiente nivel llevándolo directamente a la nube. ¿Te imaginas gestionar tu proyecto en plataformas líderes como Azure y AWS? Pues eso es exactamente lo que haremos. Configuraremos un escenario completo en Azure, y no solo eso: globalizaremos nuestro desarrollo para que puedas integrarlo con Home Assistant, Alexa y mucho más. ¡No querrás perdértelo!

Referencias:

Aprendiendo MQTT: El Protocolo, Aplicaciones y un Ejemplo Práctico

Mqtt elementos y comunicación entre ellos

Como punto de inicio para este blog, me he propuesto ofrecer un enfoque práctico del protocolo de comunicaciones MQTT, una herramienta esencial en el mundo del Internet de las Cosas (IoT). Para comenzar, presentaré tres ejemplos concretos que ilustran los componentes clave del protocolo: el publicador, el suscriptor y el intermediario o ‘broker’. A continuación, encontrarás un video resumen que anticipa lo que abordaremos en detalle. Tras el video, profundizaremos en algunos conceptos teóricos para entender mejor la tecnología que estamos manejando.

¿Qué es MQTT?

El Protocolo de Cola de Mensajes Telemétricos (MQTT, por sus siglas en inglés de Message Queuing Telemetry Transport) es un protocolo de comunicación diseñado para entornos de bajo ancho de banda, alta latencia o redes inestables. Originalmente desarrollado por IBM en 1999, el MQTT ha ganado relevancia en la era del Internet de las Cosas (IoT) debido a su diseño ligero y eficiente. En este post, exploraremos los fundamentos de MQTT y veremos ejemplos de cómo se aplica en escenarios del mundo real. MQTT es un protocolo de mensajería basado en el modelo de publicación/suscripción. En este modelo, hay tres componentes principales:

  1. Publicador: Envía mensajes a un tema específico.
  2. Suscriptor: Escucha mensajes en un tema específico.
  3. Broker: Sirve como intermediario, recibiendo mensajes de los publicadores y enviándolos a los suscriptores.

A diferencia de otros protocolos de mensajería como HTTP, MQTT es más ligero, consume menos recursos y tiene la capacidad de mantener la conexión activa durante más tiempo, lo que lo hace ideal para dispositivos IoT.

Características Principales

  • QoS (Calidad del Servicio): MQTT soporta diferentes niveles de calidad de servicio para el envío de mensajes.
  • Retención de Mensajes: El broker puede retener el último mensaje en un tema para que los nuevos suscriptores puedan recibirlo inmediatamente después de suscribirse.
  • Last Will and Testament (Testamento): Un mensaje que se envía cuando el cliente se desconecta inesperadamente.
  • Seguridad: Aunque MQTT no define medidas de seguridad propias, se puede implementar mediante TLS/SSL.

Es importante mencionar que MQTT ofrece diferentes niveles de «Calidad de Servicio» (QoS) para el envío de mensajes:

  • QoS 0: El mensaje se entrega en el mejor esfuerzo posible, pero no hay confirmación de entrega. Puede haber pérdida de mensajes.
  • QoS 1: El mensaje se entrega al menos una vez, y se confirma con un mensaje PUBACK. Esto asegura la entrega, pero puede haber duplicados.
  • QoS 2: El mensaje se entrega exactamente una vez al receptor. Este es el nivel más alto de QoS y utiliza un intercambio de cuatro pasos para asegurarse de que no hay pérdidas ni duplicados.

Según el nivel de QoS, el broker y el cliente pueden intercambiar mensajes adicionales para asegurar la entrega, pero una vez que la entrega se ha confirmado, el mensaje ya no se guarda en el broker (a menos, como dije antes, que se utilice la bandera «Retain»).

Entonces sí, en general, los mensajes se entregan a los clientes suscritos y luego se «borran» o no se retienen en el broker, a menos que se especifique lo contrario.

Tópicos (Topics) en MQTT

Los tópicos son una de las características más fundamentales en MQTT. Actúan como canales de mensajería a los que los clientes pueden suscribirse o en los cuales pueden publicar mensajes. Cada tópico es simplemente una cadena de texto que suele estar estructurada en niveles, separados por el carácter ‘/’. Por ejemplo, casa/sala/temperatura podría ser un tópico utilizado para publicar la temperatura en la sala de una casa. Cuando un cliente se suscribe a un tópico, recibe todos los mensajes que se publican en ese tópico por parte de cualquier publicador (publisher). De manera similar, cualquier mensaje que un cliente publique en un tópico se enviará a todos los clientes suscritos a ese tópico.

Comodines (Wildcards) en MQTT

Los comodines son caracteres especiales que permiten a los clientes suscribirse a múltiples tópicos simultáneamente. MQTT soporta dos tipos de comodines:

  1. Signo más (+): Este comodín puede sustituir a un solo nivel en un tópico. Por ejemplo, si te suscribes al tópico casa/+/temperatura, recibirías mensajes de casa/sala/temperatura, casa/cocina/temperatura, etc.
  2. Almohadilla (#): Este comodín puede sustituir a múltiples niveles en un tópico. Debe ser utilizado solo al final de un tópico. Por ejemplo, casa/# te suscribiría a todos los tópicos que comiencen con casa, incluyendo casa/sala/temperatura, casa/sala/humedad, casa/cocina/luz, etc.

También es posible utilizar ambos comodines en una sola suscripción. Por ejemplo, casa/+/+/estado podría recibir mensajes de casa/sala/luz/estado, casa/cocina/ventilador/estado, etc.

Los tres elementos en la práctica.

En los siguientes vídeos y posts iremos viendo en detalle los tres elmentos que componen el protocolo. El publicador será un sensor de temperatura y humedad AHT20 gobernado por una placa ESP32 S3. La naturaleza del envío de mensajes va a ser síncrona y se realizará cada cierto intervalo de tiempo. Este es un ejemplo que estaría dentro del loop del framework de arduino.

 long now = millis();
  static long lastMsg = 0;

  if (now - lastMsg > 10000)
  { // Enviar telemetría cada 10 segundos
    lastMsg = now;

    // Aquí, pon el código para leer tu sensor
    float temperatura = e_temp.temperature;
    float humedad = e_humedad.relative_humidity;

    char msgt[50];
    snprintf(msgt, 50, "Valor del sensor: %f", temperatura);
    Serial.print("Publicando mensaje: ");
    Serial.println(msgt);

    // Publicar el mensaje
    client.publish("esp32/temperatura", msgt);

    char msgh[50];
    snprintf(msgh, 50, "Valor del sensor: %f", humedad);
    Serial.print("Publicando mensaje: ");
    Serial.println(msgh);

    // Publicar el mensaje
    client.publish("esp32/humedad", msgh);
  }

El broker, de momento, usaremos EMQX (sitio web EMQX) Nos podemos montar un servidor de pruebas con docker fácilmente. Este sería un ejemplo de Dockerfile:

# Usar la imagen oficial de EMQ X
FROM emqx/emqx
# Exponer los puertos MQTT, MQTT/SSL, WebSockets
EXPOSE 1883 8883 8083
# Ejecutar EMQ X
CMD ["emqx", "start"]

Después, construir la imagen y lanzar la instancia

docker build -t my_emqx .
docker run -d --name my_emqx_instance -p 1883:1883 -p 8083:8083 -p 8883:8883 my_emqx

Si lo preferéis, podemos usar directamente:

docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 emqx/emqx

Estos son los puertos que expone y usa el broker. Los puertos del protocolo son los estándares.

  • 1883: Puerto estándar para MQTT sin cifrado.
  • 8883: Puerto estándar para MQTT con cifrado SSL/TLS.
  • 8083: Puerto para el protocolo MQTT sobre WebSockets sin cifrado.
  • 8084: Puerto para el protocolo MQTT sobre WebSockets con cifrado SSL/TLS (esto es lo que añadiste en tu comando).
  • 18083: Puerto para la interfaz de administración web de EMQ X (esto también es adicional en tu comando).

Por último, vamos a hablar del consumidor o suscriptor. La comunicación o recepción de mensajes aquí es de naturaleza asíncrona. En programación, una función de callback, recibirá los datos del mensaje una vez que nos hayamos suscrito a un tópico. La implementación del consumidor la he hecho en c# con dotnet y la librería MQTTnet https://github.com/dotnet/MQTTnet (imaginaros un servicio en la nube que analiza datos) pero también podría ser un actuador que se implentara en otra placa ESP32.

//Evento que se va a lanzar cada vez que se publique en uno de
//los "topic" a los que estamos suscritos
mqttClient.ApplicationMessageReceivedAsync += e =>
{
	Console.WriteLine($"Recibido mensaje. topic:{e.ApplicationMessage.Topic}");
	Console.WriteLine(Encoding.ASCII.GetString(e.ApplicationMessage.PayloadSegment));
	return Task.CompletedTask;
};

Ejemplos de usos de MQTT y aplicaciones:

Automatización del Hogar

MQTT es ampliamente utilizado en sistemas de automatización del hogar como Home Assistant o Domoticz. Por ejemplo, un sensor de temperatura puede publicar lecturas en un tema, y un termostato puede suscribirse a ese tema para ajustar la calefacción o la refrigeración.

Monitoreo de Salud

En un hospital, los dispositivos médicos como monitores de signos vitales pueden utilizar MQTT para enviar datos en tiempo real a un sistema centralizado que los médicos puedan supervisar.

Vehículos Autónomos

Los vehículos autónomos generan grandes cantidades de datos de sensores que deben procesarse en tiempo real. MQTT puede facilitar la transmisión eficiente de estos datos a una estación central para su análisis y toma de decisiones.

Agricultura Inteligente

Sensores de humedad y temperatura en un campo agrícola pueden usar MQTT para enviar sus datos a un sistema central. Basado en estos datos, el sistema puede ajustar automáticamente el riego y la fertilización.

Logística y Seguimiento

Las empresas de logística pueden utilizar MQTT para monitorear en tiempo real la ubicación y el estado de sus vehículos y mercancías, lo que permite una mejor planificación y eficiencia en la entrega.

Conclusión

MQTT se ha establecido como un protocolo esencial para el Internet de las Cosas y otras aplicaciones donde la eficiencia, la simplicidad y la fiabilidad son cruciales. Desde la automatización del hogar hasta aplicaciones empresariales y sanitarias, las posibilidades son prácticamente infinitas. En los siguientes vídeos y ejemplos iremos desgrando cada uno de los elementos del protocolos y aprenderemos experimentando con el. Os animo a estar pendiente y a divertiros.

Referencias:

El código fuente:

https://github.com/elrincondeada/mqttdemo