Mqtt implementando un broker

En esta cuarta entrega de nuestra serie sobre MQTT, pondremos el foco en uno de los componentes más cruciales de cualquier sistema de mensajería en tiempo real: el broker. Imagina un escenario de domótica en el que múltiples dispositivos necesitan comunicarse entre sí de manera eficiente. En este contexto, el broker actúa como el cerebro operativo, encargado de distribuir mensajes desde los publicadores—en nuestro caso, sensores—hacia los suscriptores, como podría ser un display que muestra datos en tiempo real. Acompáñame mientras exploramos cómo construir nuestro propio broker y lo llevamos al siguiente nivel mediante su contenerización con Docker y la integración con Mosquitto.

Repositorio del proyecto: https://github.com/elrincondeada/mqttdemo

mosquitto logo
mosquitto logo
Video sobre este post del rincón de Ada

Mosquitto es un broker ampliamente adoptado en la comunidad de entusiastas del Internet de las Cosas (IoT), especialmente en entornos de código abierto. Se integra de manera sencilla con sistemas domóticos populares como Home Assistant. Para aquellos interesados en probarlo, Docker ofrece una forma sencilla y eficiente de hacerlo. Puedes encontrar instrucciones detalladas para utilizar la imagen oficial de Mosquitto en Docker en su página oficial en Docker Hub. Además, en el repositorio del proyecto, he incluido un ejemplo práctico utilizando Docker Compose. Al ejecutar este ejemplo, podrás observar cómo los sensores se comunican en tiempo real con el display. Muy parecido a como haciamos con la imagen de EMQX.

Implementando nuestro broker con c# y MQTTnet

¡Pero no nos quedaremos ahí! Este blog también tiene un enfoque en programación, y en esa línea, vamos a elevar el listón. Crearemos nuestro propio broker MQTT desde cero, y lo mejor de todo es que sólo necesitaremos unas pocas líneas de código. Gracias a la programación basada en eventos, podremos capturar todas las actividades esenciales que suceden en el broker, desde nuevas suscripciones y publicaciones en diferentes tópicos, hasta el envío de mensajes en tiempo real. Para lograr esto, utilizaremos la potente y flexible librería MQTTnet, que nos permite crear tanto clientes MQTT—ya sean suscriptores o publicadores—como servidores o brokers. También construiremos una imagen de Docker para nuestro broker, permitiéndonos desplegarlo en cualquier contenedor. Esto es especialmente útil en entornos de domótica, donde es común tener un servidor dedicado al control de dispositivos y servicios. Utilizar contenedores Docker para estos servicios es una estrategia inteligente; no sólo para un broker MQTT, sino también para otros servicios como Home Assistant o Node-RED. Personalmente, para estos propósitos, utilizo una Raspberry Pi.

Para que nuestro broker funcione como servicio crearemos una aplicación Host en dotnet. Registraremos un broker con el servidor MQTTnet. Simplemente tenemos dos archivos:

Program.cs

Este archivo es el punto de entrada para tu aplicación de servidor MQTT en .NET. Utiliza el modelo de hospedaje genérico de .NET para configurar y ejecutar servicios en segundo plano.

namespace DotNetMqtt.Server;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.DependencyInjection;

internal class Program
{
    private static async Task Main(string[] args)
    {
        var hostBuilder = Host.CreateDefaultBuilder(args);
        hostBuilder.ConfigureServices((hostContext, services) =>
        {
            services.AddHostedService<MqttServerWorker>();
        });
        await hostBuilder.RunConsoleAsync();
    }
}

Explicación

  • Host.CreateDefaultBuilder(args): Este método configura un nuevo host con algunas configuraciones predeterminadas.
  • ConfigureServices: Aquí es donde registras tus servicios. En este caso, estás añadiendo MqttServerWorker como un servicio alojado.
  • RunConsoleAsync(): Este método inicia el host y lo ejecuta hasta que se cierra.

MqttServerWorker.cs

Este archivo contiene la lógica principal para tu servidor MQTT. Implementa la interfaz IHostedService, lo que significa que se iniciará y detendrá automáticamente con el host de la aplicación.

  • StartAsync: Este método se llama cuando el servicio se inicia. Aquí es donde configuras y arrancas tu servidor MQTT.
    • MqttFactory: Crea una nueva instancia de un servidor MQTT.
    • MqttServerOptionsBuilder: Configura las opciones para el servidor MQTT.
    • StartAsync(): Inicia el servidor MQTT.
  • StopAsync: Este método se llama cuando el servicio se detiene. Aquí es donde detienes tu servidor MQTT.
  • Eventos como ClientConnectedAsync, InterceptingSubscriptionAsync, etc.: Estos son manejadores de eventos que se disparan cuando ocurren ciertas acciones en el servidor MQTT. Utilizas estos para registrar información útil.
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Server;

namespace DotNetMqtt.Server;

public class MqttServerWorker : IHostedService, IDisposable
{
    private readonly ILogger<MqttServerWorker> _logger;
    private MqttServer? _mqttServer;

    public MqttServerWorker(ILogger<MqttServerWorker> logger)
    {
        _logger = logger;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {

        _logger.LogInformation("Iniciando servidor MQTT");
        var mqttFactory = new MqttFactory();
        var optionsBuilder = new MqttServerOptionsBuilder()
                             .WithDefaultEndpoint();

        _mqttServer = mqttFactory.CreateMqttServer(optionsBuilder.Build());

        _mqttServer.ClientConnectedAsync += ClienteConectado;
        _mqttServer.ClientConnectedAsync += ClienteDesconectado;
        _mqttServer.InterceptingSubscriptionAsync += InterceptandoSubscripcion;
        _mqttServer.InterceptingUnsubscriptionAsync += InterceptandoDesuscripcion;
        _mqttServer.InterceptingPublishAsync += InterceptandoPublicacion;
        _mqttServer.InterceptingClientEnqueueAsync += InterceptandoColaCliente;
        await _mqttServer.StartAsync();
        _logger.LogInformation("Servidor MQTT iniciado");

    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        if(_mqttServer != null)
        {
            _logger.LogInformation("Deteniendo servidor MQTT");
            await _mqttServer.StopAsync();
            _logger.LogInformation("Servidor MQTT detenido");
        }
    }


    private Task InterceptandoColaCliente(InterceptingClientApplicationMessageEnqueueEventArgs args)
    {
        _logger.LogInformation($"Cliente publicador {args.SenderClientId} suscriptor {args.ReceiverClientId} encola mensaje {args.ApplicationMessage.Topic} - {args.ApplicationMessage.ConvertPayloadToString()}");
        return Task.CompletedTask;
    }

    private Task InterceptandoPublicacion(InterceptingPublishEventArgs args)
    {
        _logger.LogInformation($"Cliente {args.ClientId} publicó en {args.ApplicationMessage.Topic} - {args.ApplicationMessage.ConvertPayloadToString()}");
        return Task.CompletedTask;
    }


    private Task InterceptandoDesuscripcion(InterceptingUnsubscriptionEventArgs args)
    {
       _logger.LogInformation($"Cliente {args.ClientId} se ha desuscrito a {args.Topic}");
        return Task.CompletedTask;
    }


    private Task InterceptandoSubscripcion(InterceptingSubscriptionEventArgs args)
    {
        _logger.LogInformation($"Cliente {args.ClientId} se ha suscrito a {args.TopicFilter.Topic}");
        return Task.CompletedTask;
    }


    private Task ClienteDesconectado(ClientConnectedEventArgs args)
    {
        _logger.LogInformation($"Cliente Desconectado {args.ClientId} {args.Endpoint}");
        return Task.CompletedTask;
    }


    private  Task ClienteConectado(ClientConnectedEventArgs args)
    {
        _logger.LogInformation($"Cliente conectado {args.ClientId} {args.Endpoint}");
        return Task.CompletedTask;
    }


    public void Dispose()
    {
        if(_mqttServer != null)
        {
            _mqttServer.ClientConnectedAsync -= ClienteConectado;
            _mqttServer.ClientConnectedAsync -= ClienteDesconectado;
            _mqttServer.InterceptingSubscriptionAsync -= InterceptandoSubscripcion;
            _mqttServer.InterceptingUnsubscriptionAsync -= InterceptandoDesuscripcion;
            _mqttServer.InterceptingPublishAsync -= InterceptandoPublicacion;
            _mqttServer.InterceptingClientEnqueueAsync -= InterceptandoColaCliente;
            _mqttServer.Dispose();
        }
    }
}

Interesante ver estamos usando dos patrones de desarrollo el builder y la factory para construir el Host y el servidor.

Docker del servidor

Para llevernos nuestro servidor a nuestro contenedor tenemos que crear la imagen primero. Con este Dockerfile lo realizamos:

# Utilizar la imagen base de .NET
FROM mcr.microsoft.com/dotnet/aspnet:latest AS base
WORKDIR /app
EXPOSE 1883

# Compilar la aplicación
FROM mcr.microsoft.com/dotnet/sdk:latest AS build
WORKDIR /src
COPY ["DotNetMqtt.Server.csproj", "/src"]
RUN dotnet restore "DotNetMqtt.Server.csproj"
COPY . .
WORKDIR "/src"
RUN dotnet build "DotNetMqtt.Server.csproj" -c Release -o /app/build

FROM build AS publish
RUN dotnet publish "DotNetMqtt.Server.csproj" -c Release -o /app/publish

# Copiar y ejecutar la aplicación
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "DotNetMqtt.Server.dll"]

En detalle esto es lo que hace

  • FROM mcr.microsoft.com/dotnet/aspnet:latest AS base: Aquí se está utilizando la última imagen de ASP.NET Core disponible como imagen base y se le da el alias «base».
  • WORKDIR /app: Establece el directorio de trabajo en el contenedor a /app.
  • EXPOSE 1883: Informa a Docker que el contenedor escuchará en el puerto 1883, que es el puerto estándar para MQTT.
  • FROM mcr.microsoft.com/dotnet/sdk:latest AS build: Utiliza la última imagen del SDK de .NET y le da el alias «build».
  • WORKDIR /src: Establece el directorio de trabajo en el contenedor a /src.
  • COPY ["DotNetMqtt.Server.csproj", "/src"]: Copia el archivo de proyecto .csproj al contenedor.
  • RUN dotnet restore "DotNetMqtt.Server.csproj": Restaura las dependencias del proyecto.
  • COPY . .: Copia todos los archivos del directorio actual al contenedor.
  • RUN dotnet build "DotNetMqtt.Server.csproj" -c Release -o /app/build: Compila el proyecto en modo «Release» y coloca los archivos compilados en /app/build.
  • Publicación de la Aplicación:
  • FROM build AS publish: Utiliza la imagen «build» como base y le da el alias «publish».
  • RUN dotnet publish "DotNetMqtt.Server.csproj" -c Release -o /app/publish: Publica la aplicación en modo «Release» y coloca los archivos en /app/publish.
  • FROM base AS final: Utiliza la imagen «base» como la imagen final.
  • WORKDIR /app: Establece el directorio de trabajo en el contenedor a /app.
  • COPY --from=publish /app/publish .: Copia los archivos publicados desde la imagen «publish» al directorio de trabajo actual en la imagen.
  • ENTRYPOINT ["dotnet", "DotNetMqtt.Server.dll"]: Establece el punto de entrada de la aplicación, indicando que se debe ejecutar DotNetMqtt.Server.dll usando dotnet.

En resumen dos etapas: una de copia del código, construcción binarios con el sdk y otra de publicación, en la imagen final que es una imagén básica con solo en runtime de dotnet

Finalmente:

docker build -t tu_usuario_dockerhub/nombre_de_la_imagen:etiqueta .

¿Y después que viene?

En las próximas entregas vamos a elevar nuestro desarrollo al siguiente nivel llevándolo directamente a la nube. ¿Te imaginas gestionar tu proyecto en plataformas líderes como Azure y AWS? Pues eso es exactamente lo que haremos. Configuraremos un escenario completo en Azure, y no solo eso: globalizaremos nuestro desarrollo para que puedas integrarlo con Home Assistant, Alexa y mucho más. ¡No querrás perdértelo!

Referencias:

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *