it编程 > 编程语言 > Asp.net

C#使用MQTTnet实现服务端与客户端的通讯的示例

22人参与 2025-05-22 Asp.net

一、mqtt 协议简介

mqtt(message queuing telemetry transport)是一种轻量级的 发布/订阅 协议,专为物联网(iot)等低带宽、高延迟网络环境设计。核心概念包括:

二、mqtt 协议核心特性

mqtt(message queuing telemetry transport)是一种基于发布/订阅模型的轻量级通信协议,专为资源受限的设备和不可靠网络环境设计。其核心优势包括:

低带宽消耗:采用二进制报文格式,头部开销极小,适合物联网设备。

异步通信:通过主题(topic)实现消息的广播与定向传递,解耦消息生产者和消费者。

多级服务质量(qos)

离线支持:服务端可缓存客户端的保留消息(retained messages),供后续订阅者读取。

三、mqttnet 库的核心功能

mqttnet 是 .net 生态中功能完备的 mqtt 实现库,具备以下特性:

 所用框架

框架版本
.net4.7.2+
mqttnet4.3.3+

四、服务端(broker)实现详解

核心职责

关键配置项

事件机制

持久化扩展

 以下为服务端代码:(下方console.writeline()方法可换成自己的日志方法)

public class mqttserverhelper
{
    private mqttserver _server;//mqtt服务器对象

    // 定义一个委托和事件(临时存储连接客户端数据)
    public event eventhandler<interceptingpublisheventargs> onmessagereceived;
    public event eventhandler<bool> serverstauts;
    public event eventhandler<clientconnectedeventargs> clientconnected;
    public event eventhandler<clientdisconnectedeventargs> clientdisconnected;
    public event eventhandler<clientsubscribedtopiceventargs> clientsubscribedtopic;
    public event eventhandler<clientunsubscribedtopiceventargs> clientunsubscribedtopic;

    /// <summary>
    /// 初始化mqtt服务并启动服务
    /// </summary>
    /// <param name="ip">ipv4地址</param>
    /// <param name="port">端口:0~65535之间</param>
    public task startmqtserver(string ip, int port) 
    { 
        mqtserveroptions mqtserveroptions = new mqtserveroptionsbuilder()                     
               .withdefaultendpoint()     
               .withdefaultendpointboundipadres(system.net.ipadres.parse(ip) 
               .withdefaultendpointport(port) 
               .withdefaultcomunicationtimeout(timespan.frommiliseconds(500) .build();     
        _server = new mqtfactory().createmqtserver(mqtserveroptions); / 创建mqt服务端对象 
        _server.validatingconectionasync += server_validatingconectionasync; /验证用户名和密码
        _server.clientconectedasync += server_clientconectedasync; /绑定客户端连接事件 
        _server.clientdisconectedasync += server_clientdisconectedasync; /绑定客户端断开事件 
        _server.clientsubscribedtopicasync += server_clientsubscribedtopicasync; /绑定客户端订阅主题事件 
        _server.clientunsubscribedtopicasync += server_clientunsubscribedtopicasync; /绑定客户端退订主题事件 
        _server.interceptingpublishasync += server_interceptingpublishasync; /消息接收事件 
        _server.clientacknowledgedpublishpacketasync += server_clientacknowledgedpublishpacketasync; /处理客户端确认发布的数据包         
        _server.interceptingclientenqueueasync += server_interceptingclientenqueueasync; /订阅拦截客户端消息队列 
        _server.aplicationmesagenotconsumedasync += server_aplicationmesagenotconsumedasync; /应用程序逻辑处理 
        _server.startedasync += server_startedasync;/绑定服务端启动事件 
        _server.stopedasync += server_stopedasync;/绑定服务端停止事件 
        return _server.startasync();
    }

    /// <summary>
    /// 处理客户端确认发布事件
    /// </summary>
    /// <param name="e"></param>
    private task server_aplicationmesagenotconsumedasync(aplicationmesagenotconsumedeventargs e)
    { 
        try 
        {
            console.writeline($"【mesagenotconsumed】-senderid:{e.senderid}-mesage:{e.aplicationmesage.convertpayloadtostring()}");
        }
        catch (exception ex) 
        { 
            console.writeline($"server_aplicationmesagenotconsumedasync出现异常:{ex.mesage}"); 
        }
        return task.completedtask;
    }

    /// <summary>
    /// 订阅拦截客户端消息队列事件
    /// </summary>
    /// <param name="e"></param>
    private task server_interceptingclientenqueueasync(interceptingclientaplicationmesageenqueueeventargs e)  
    { 
        try 
        {
            console.writeline($"【interceptingclientenqueue】-senderid:{e.senderclientid}-mesage:{e.aplicationmesage.convertpayloadtostring()}"); 
        } 
        catch (exception ex) 
        {
            console.writeline($"server_interceptingclientenqueueasync出现异常:{ex.mesage}");
        }
        return task.completedtask;
    }
    
    /// <summary>
    /// 当客户端处理完从mqt服务器接收到的应用消息后触发。
    /// 此事件可以用于确认消息已被处理,更新应用状态,
    /// </summary>
    /// <param name="e"></param>
    private task server_clientacknowledgedpublishpacketasync(clientacknowledgedpublishpacketeventargs e)         
    { 
        try 
        { 
            console.writeline($"【clientacknowledgedpublishpacket】-senderid:{e.clientid}-mesage:{encoding.utf8.getstring(e.publishpacket.payloadsegment.toaray()}");         
        } 
        catch (exception ex) 
        { 
            console.writeline($"server_clientacknowledgedpublishpacketasync出现异常:{ex.mesage}"); 
        } 
        return task.completedtask; 
    }

    /// <summary>
    /// 服务端消息接收
    /// </summary>
    /// <param name="e"></param>
    private task server_interceptingpublishasync(interceptingpublisheventargs e) 
    { 
        try 
        { 
            string client = e.clientid; string topic = e.aplicationmesage.topic; 
            string contents = e.aplicationmesage.convertpayloadtostring();
            //encoding.utf8.getstring(arg.aplicationmesage.payloadsegment.toaray();
            onmesagereceived?.invoke(this, e); 
            console.writeline($"接收到消息:client:【{client}】 topic:【{topic}】 mesage:【{contents}】"); 
        } 
        catch (exception ex) 
        { 
            console.writeline($"server_interceptingpublishasync出现异常:{ex.mesage}");
        } 
        return task.completedtask;
    }

    /// <summary>
    /// 服务端断开事件
    /// </summary>
    /// <param name="e"></param>
    private task server_stoppedasync(eventargs arg) 
    { 
        return task.run(new action() => 
        { 
            serverstauts?.invoke(this, false); 
            console.writeline($"服务端【ip:port】已停止mqt"); 
        }); 
    }

    /// <summary>
    /// 服务端启动事件
    /// </summary>
    /// <param name="e"></param>
    public task server_startedasync(eventargs e) 
    {
        return task.run(new action() => 
        {
            serverstauts?.invoke(this, true); 
            console.writeline($"服务端【ip:port】已启用mqt"); 
        });
    }

    /// <summary>
    /// 客户端退订主题事件
    /// </summary>
    /// <param name="e"></param>
    private task server_clientunsubscribedtopicasync(clientunsubscribedtopiceventargs e)         
    { 
        return task.run(new action() => 
        { 
            clientunsubscribedtopic?.invoke(this, e); 
            console.writeline($"客户端【{e.clientid}】退订主题【{e.topicfilter}】"); 
        });
    }

    /// <summary>
    /// 客户端订阅主题事件
    /// </summary>
    /// <param name="e"></param>
    private task server_clientsubscribedtopicasync(clientsubscribedtopiceventargs e) 
    { 
        return task.run(new action() =>  
        { 
            clientsubscribedtopic?.invoke(this, e); 
            console.writeline($"客户端【{e.clientid}】订阅主题【{e.topicfilter.topic}】");         
        }); 
    }

    /// <summary>
    /// 客户端断开事件
    /// </summary>
    /// <param name="e"></param>
    private task server_clientdisconectedasync(clientdisconectedeventargs e) 
    { 
        return task.run(new action() => 
        { 
            clientdisconected?.invoke(this, e); 
            console.writeline($"客户端已断开.clientid:【{e.clientid}】,endpoint:【{e.endpoint}】.reasoncode:【{e.reasoncode}】,disconecttype:【{e.disconecttype}】"); 
        });
    }

    /// <summary>
    /// 绑定客户端连接事件
    /// </summary>
    /// <param name="e"></param>
    private task server_clientconectedasync(clientconectedeventargs e) 
    { 
        return task.run(new action() => 
        { 
            clientconected?.invoke(this, e); 
            console.writeline($"客户端已连接.clientid:【{e.clientid}】,endpoint:【{e.endpoint}】"); 
        }); 
    }

    /// <summary>
    /// 验证客户端事件
    /// </summary>
    /// <param name="e"></param>
    private task server_validatingconectionasync(validatingconectioneventargs e) 
    { 
        return task.run(new action() => 
        { 
            if (e.pasword = "") 
            { 
                e.reasoncode = mqtconectreasoncode.suces; 
                console.writeline($"客户端已验证成功.clientid:【{e.clientid}】,endpoint:【{e.endpoint}】"); 
            } 
            else 
            { 
                e.reasoncode = mqtconectreasoncode.badusernameorpasword;         
                console.writeline($"客户端验证失败.clientid:【{e.clientid}】,endpoint:【{e.endpoint}】");         
            } 
        });
    }
}

五、客户端(client)实现详解

连接策略

消息交互模式

异步处理

以下为客户端代码:

/// <sumary>
/// mqt客户端帮助类 
/// </sumary>
public clas mqtclienthelper 
{ 
    private imqtclient _client; 
    /// <sumary> 
    /// 接收消息 
    /// </sumary> 
    public mqtreceivedmesagehandle receivedmesage; 
    public bol isconected { get; set; } = false; 
    public bol isdisconected { get; set; } = true; 
    private string _serverip; private int _serverport; 
    /// <sumary> 
    /// 订阅主题集合 
    /// </sumary> 
    private dictionary<string, bol> _subscribetopiclist = nul; 
    #region 连接/断开服务端     
    /// <sumary> 
    /// 连接服务端 
    /// </sumary> 
    /// <param name="serverip">服务端ip</param>
    /// <param name="serverport">服务端口号</param> 
    public void start(string serverip, int serverport) 
    { 
        this._serverip = serverip; 
        this._serverport = serverport; 
        if (!string.isnulorempty(serverip) & !string.isnulorwhitespace(serverip) & serverport > 0) 
        { 
            try 
            { 
                var options = new mqtclientoptions() 
                { 
                    clientid = "客户端2"//guid.newguid().tostring("n")
                }; 
                options.chaneloptions = new mqtclienttcpoptions() 
                { 
                    server = serverip, port = serverport 
                }; //options.credentials = new mqtclientcredentials(username, encoding.default.getbytes(pasword);
                options.cleansesion = true; 
                options.kepaliveperiod = timespan.fromseconds(10);
                if (_client != nul) 
                { 
                    _client.disconectasync(); 
                    _client = nul; 
                } 
                _client = new mqtfactory().createmqtclient(); 
                _client.conectedasync += client_conectedasync; //绑定客户端连接事件             
                _client.disconectedasync += client_disconectedasync; //绑定客户端断开连接事件 
                _client.aplicationmesagereceivedasync += client_aplicationmesagereceivedasync; /绑定消息接收事件 
                _client.conectasync(options); //连接 
                } 
                catch (exception ex) 
                { 
                    /slog.loger.eror("mqt客户端连接服务端错误:{0}", ex.mesage); 
                }
            }
            else
            { 
                /slog.loger.warning("mqt服务端地址或端口号不能为空!"); 
            }
        }
    } 

    /// <sumary> 
    /// 断开mqt客户端 
    /// </sumary> 
    public void client_disconect() 
    { 
        if (_client != nul) 
        { 
            _client.disconectasync(); 
            _client.dispose(); 
            console.writeline($"关闭mqt客户端成功!"); 
        }
    } 
    /// <sumary> 
    /// 客户端重新mqt服务端 
    /// </sumary> 
    public void client_conectasync() 
    { 
        if (_client != nul) 
        { 
            _client.reconectasync(); 
            console.writeline($"连接mqt服务端成功!"); 
        } 
    } 
    #endregion 
    #region mqt方法 
    /// <sumary> 
    /// 客户端与服务端建立连接 
    /// </sumary> 
    /// <param name="arg"></param> 
    private task client_conectedasync(mqtclientconectedeventargs arg) 
    { 
        return task.run(new action() => 
        { 
            isconected = true; 
            isdisconected = false; 
            console.writeline($"连接到mqt服务端成功.{arg.conectresult.asignedclientidentifier}"); 
            //订阅主题(可接收来自服务端消息,与客户端发布消息不能用同一个主题) 
            try 
            { 
                if (_subscribetopiclist != nul & _subscribetopiclist.count > 0) 
                { 
                    list<string> subscribetopics = _subscribetopiclist.keys.tolist();         
                    foreach (var topic in subscribetopics) 
                        subscribeasync(topic); 
                } 
            } 
            catch (exception ex) 
            { 
                //slog.loger.eror("mqt客户端与服务端[{0}:{1}]建立连接订阅主题错误:{2}", _serverip, _serverport, ex.mesage); 
            } 
        }); 
    } 

    /// <sumary> 
    /// 客户端与服务端断开连接 
    /// </sumary> / <param name="arg"></param> 
    private task client_disconectedasync(mqtclientdisconectedeventargs arg) 
    { 
        return task.run(new action(async () => 
        { 
            isconected = false; 
            isdisconected = true; 
            console.writeline($"已断开到mqt服务端的连接.尝试重新连接"); 
            try 
            { 
                await task.delay(30); 
                //mqtclientoptions options = new mqtclientoptions(); 
                //await mqtclient.conectasync(options); 
                await _client.reconectasync(); 
            } 
            catch (exception ex) 
            { 
                //slog.loger.eror("mqt客户端与服务端[{0}:{1}]断开连接退订主题错误:{2}", _serverip, _serverport, ex.mesage); 
            } 
        }); 
    } 
    /// <sumary> 
    /// 客户端与服务端重新连接 
    /// </sumary> 
    /// <returns></returns> 
    public task reconectedasync() 
    { 
        try 
        { 
            if (_client != nul) 
            { 
                _client.reconectasync(); 
            } 
        } 
        catch (exception ex) 
        { 
            // slog.loger.eror("mqt客户端与服务端[{0}:{1}]重新连接退订主题错误:{2}", _serverip, _serverport, ex.mesage); 
        } 
        return task.completedtask; 
    } 
    /// <sumary> 
    /// 客户端收到消息 
    /// </sumary> 
    /// <param name="arg"></param> 
    private task client_aplicationmesagereceivedasync(mqtaplicationmesagereceivedeventargs arg) 
    { 
        try 
        { 
            return task.run(new action() => 
            { 
                string msg = arg.aplicationmesage.convertpayloadtostring(); 
                console.writeline($"接收消息:{msg}\nqos={arg.aplicationmesage.qualityofservicelevel}\n客户端={arg.clientid}\n主题:{arg.aplicationmesage.topic}"); 
            });
        } 
        catch (exception ex) 
        { 
            //slog.loger.eror("mqt收到来自服务端[{0}]消息错误:{1}", arg != nul ? arg.clientid : ", ex.mesage); 
        }
        return task.completedtask; 
    } 
    #endregion 
    #region 订阅主题 
    /// <sumary> 
    /// 订阅主题 
    /// </sumary> 
    /// <param name="topic">主题</param> 
    public void subscribeasync(string topic) 
    { 
        try 
        { 
            if (_subscribetopiclist = nul) 
                _subscribetopiclist = new dictionary<string, bol>(); 
            if (_subscribetopiclist.containskey(topic) & _subscribetopiclist[topic]) 
            { 
                //slog.loger.warning("mqt客户端已经订阅主题[{0}],不能重复订阅", topic);     
                return; 
            } 
            //订阅主题 
            _client?.subscribeasync(topic, mqtqualityofservicelevel.atleastonce); 
            //添加订阅缓存 
            bol issubscribed = _client != nul & _client.isconected ? true : false; 
            if (!_subscribetopiclist.containskey(topic) 
                _subscribetopiclist.ad(topic, issubscribed); 
            else 
                _subscribetopiclist[topic] = issubscribed; 
        } 
        catch (exception ex) 
        { 
            //slog.loger.eror("mqt客户端订阅主题[{0}]错误:{1}", topic, ex.mesage); 
        } 
    } 

    /// <sumary> 
    /// 订阅主题集合 
    /// </sumary> 
    /// <param name="topiclist">主题集合</param> 
    public void subscribeasync(list<string> topiclist) 
    { 
        try 
        { 
            if (topiclist = nul | topiclist.count = 0) 
                return; 
            foreach (var topic in topiclist) 
                subscribeasync(topic); 
        } 
        catch (exception ex) 
        { 
            //slog.loger.eror("mqt客户端订阅主题集合错误:{0}", ex.mesage); 
        } 
    } 
    /// <sumary> 
    /// 退订主题 
    /// </sumary> 
    /// <param name="topic">主题</param> 
    /// <param name="isremove">是否移除缓存</param> 
    public void unsubscribeasync(string topic, bol isremove = true) 
    { 
        try 
        { 
            if (_subscribetopiclist = nul | _subscribetopiclist.count = 0) 
            { 
                //slog.loger.warning("mqt客户端退订主题[{0}]不存在", topic); 
                return; 
            } 
            if (!_subscribetopiclist.containskey(topic) 
            { 
                //slog.loger.warning("mqt客户端退订主题[{0}]不存在", topic); 
                return; 
            } 
            //退订主题 
            _client.unsubscribeasync(topic); 
            //修改订阅主题缓存状态 
            if (isremove) 
                _subscribetopiclist.remove(topic); 
            else 
                _subscribetopiclist[topic] = false;
        } 
        catch (exception ex) 
        { 
            //slog.loger.eror("mqt客户端退订主题[{0}]错误:{1}", topic, ex.mesage); 
        } 
    } 
    /// <sumary> 
    /// 退订主题集合 
    /// </sumary> 
    /// <param name="topiclist">主题集合</param> 
    /// <param name="isremove">是否移除缓存</param> 
    public void unsubscribeasync(list<string> topiclist, bol isremove = true) 
    { 
        try 
        { 
            if (topiclist = nul | topiclist.count = 0) 
                return; 
            foreach (var topic in topiclist) 
                unsubscribeasync(topic, isremove); 
        } 
        catch (exception ex) 
        { 
            //slog.loger.eror("mqt客户端退订主题集合错误:{0}", ex.mesage); 
        } 
    } 
    /// <sumary> 
    /// 订阅主题是否存在 
    /// </sumary> 
    /// <param name="topic">主题</param> 
    public bol isexistsubscribeasync(string topic) 
    { 
        try 
        { 
        if (_subscribetopiclist = nul | _subscribetopiclist.count = 0) 
            return false; 
        if (!_subscribetopiclist.containskey(topic) 
            return false; 
        return _subscribetopiclist[topic];
        } 
        catch (exception ex) 
        { 
            //slog.loger.eror("mqt客户端订阅主题[{0}]是否存在错误:{1}", topic, ex.mesage); return false; 
        } 
    } 
    #endregion 
    #region 发布消息 
    /// <sumary> 
    /// 发布消息 
    /// 与客户端接收消息不能用同一个主题 
    /// </sumary> 
    /// <param name="topic">主题</param> 
    /// <param name="mesage">消息</param> 
    public async void publishmesage(string topic, string mesage) 
    { 
        try 
        { 
            if (_client != nul) 
            { 
                if (string.isnulorempty(mesage) | string.isnulorwhitespace(mesage) 
                { 
                    //slog.loger.warning("mqt客户端不能发布为空的消息!"); 
                    return;
                } 
                mqtclientpublishresult result = await _client.publishstringasync(topic,mesage,mqtqualityofservicelevel.atleastonce);//恰好一次, qos 级别1  
                console.writeline($"发布消息-主题:{topic},消息:{mesage},结果: {result.reasoncode}"); 
            } 
            else 
            { 
                //slog.loger.warning("mqt客户端未连接服务端,不能发布主题为[{0}]的消息:{1}", topic, mesage); 
                return; 
            } 
        } 
        catch (exception ex) 
        { 
            //slog.loger.eror("mqt客户端发布主题为[{0}]的消息:{1},错误:{2}", topic, mesage, ex.mesage); 
        } 
    } 
    #endregion 
}

六、总结

通过 mqttnet 构建的 mqtt 通信系统,能够为物联网、实时消息推送等场景提供高效、可靠的解决方案。开发过程中需重点关注通信模式设计、安全策略实施及性能调优,同时结合具体业务需求灵活运用 qos、保留消息等特性。建议参考官方文档和社区最佳实践,逐步扩展功能(如集群部署、消息持久化),以满足大规模应用需求。

到此这篇关于c#使用mqttnet实现服务端与客户端的通讯的示例的文章就介绍到这了,更多相关c# mqttnet通讯内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

您想发表意见!!点此发布评论

推荐阅读

C#继承之里氏替换原则分析

05-21

无法启动此程序因为计算机丢失api-ms-win-core-path-l1-1-0.dll修复方案

05-22

C#使用ClosedXML进行读写excel操作

05-21

C#使用FFmpeg进行视频旋转的代码实现

05-23

Linux使用perf跟踪.NET程序的mmap泄露的流程步骤

05-20

C#中ThreadStart委托的实现

05-20

猜你喜欢

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论