下载地址:https://github.com/chkr1011/MQTTnet
一. 服务端
1. 创建配置参数
可以使用 `var options = new MqttServerOptions();` 直接构建一个options。你也可以通过参数构建器 `var options = new MqttServerOptionsBuilder();` 使代码更简洁美观。
构建器的函数说明:
| 函数名 | 功能说明 |
| Build | 构建配置参数 |
| WithApplicationMessageInterceptor | 允许处理来自客户端的所有已发布消息 |
| WithClientId | 服务端发布消息时使用的ClientId |
| WithConnectionBacklog | 设置要保留的连接数 |
| WithConnectionValidator | 验证连接 |
| WithDefaultCommunicationTimeout | 设置默认的通信超时 |
| WithDefaultEndpoint | 使用默认端点 |
| WithDefaultEndpointBoundIPAddress | 使用默认端点IPv4地址 |
| WithDefaultEndpointBoundIPV6Address | 使用默认端点IPv6地址 |
| WithDefaultEndpointPort | 使用默认端点端口 |
| WithEncryptedEndpoint | 使用加密的端点 |
| WithEncryptedEndpointBoundIPAddress | 使用加密的端点IPv4地址 |
| WithEncryptedEndpointBoundIPV6Address | 使用加密的端点IPv6地址 |
| WithEncryptedEndpointPort | 使用加密的端点端口 |
| WithEncryptionCertificate | 使用证书进行SSL连接 |
| WithEncryptionSslProtocol | 使用SSL协议级别 |
| WithMaxPendingMessagesPerClient | 每个客户端允许最多未决消息 |
| WithPersistentSessions | 保持会话 |
| WithStorage | 使用存储 |
| WithSubscriptionInterceptor | 允许处理来自客户端的所有订阅 |
| WithoutDefaultEndpoint | 禁用默认端点 |
| WithoutEncryptedEndpoint | 禁用默认(SSL)端点 |
验证账号密码
- options.WithConnectionValidator(c =>
- {
- if (c.Username != "seven")
- {
- c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
- }
- });
2. 启动服务端
- var server = new MqttFactory().CreateMqttServer();
- server.StartAsync(options.Build());
服务启动事件
- server.StartedHandler = new MqttServerStartedHandlerDelegate(Started);
- static async Task Started(EventArgs e)
- {
- Console.WriteLine("Started");
- }
服务关闭事件
- server.StoppedHandler = new MqttServerStoppedHandlerDelegate(Stopped);
- static async Task Stopped(EventArgs e)
- {
- Console.WriteLine("Stopped");
- }
客户端连接事件
- server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(Connected);
- server.UseClientConnectedHandler(c => Connected(c));
- static async Task Connected(MqttServerClientConnectedEventArgs e)
- {
- Console.WriteLine($"{e.ClientId} connected");
- }
客户端断开事件
- server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(Disconnected);
- server.UseClientDisconnectedHandler(c => Disconnected(c));
- static async Task Disconnected(MqttServerClientDisconnectedEventArgs e)
- {
- Console.WriteLine($"{e.ClientId} disconnected");
- }
客户端订阅Topic
- server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(Subscribed);
- static async Task Subscribed(MqttServerClientSubscribedTopicEventArgs e)
- {
- Console.WriteLine($"{e.ClientId} subscribed {e.TopicFilter.Topic}");
- }
客户端取消订阅Topic
- server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(c => Unsubscribed(c));
- static async Task Unsubscribed(MqttServerClientUnsubscribedTopicEventArgs e)
- {
- Console.WriteLine($"{e.ClientId} unsubscribed {e.TopicFilter}");
- }
消息接收
- server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(MessageReceived);
- server.UseApplicationMessageReceivedHandler(c => MessageReceived(c));
- static async Task MessageReceived(MqttApplicationMessageReceivedEventArgs e)
- {
- Console.WriteLine($"{e.ClientId} get {e.ApplicationMessage.Topic}");
- }
3. 操作
发布消息
- var message = new MqttApplicationMessage()
- {
- Topic = "testTopic",
- Payload = Encoding.UTF8.GetBytes("hello seven")
- };
- server.PublishAsync(message);
查询客户端状态
- var list = server.GetClientStatusAsync().Result;
获取会话信息
- var sessions = server.GetSessionStatusAsync().Result;
查询Retain的消息
- var messages = server.GetRetainedApplicationMessagesAsync().Result;
清空Retain消息
- server.ClearRetainedApplicationMessagesAsync();
停止服务
- server.StopAsync();
4. 其他
自带的日志跟踪
- var logger = new MqttNetLogger();
- logger.LogMessagePublished += LogMessagePublished;
- private static void LogMessagePublished(object sender, MqttNetLogMessagePublishedEventArgs e)
- {
- Console.WriteLine(e.LogMessage);
- Console.WriteLine(e.TraceMessage);
- }
二. 客户端
1. 创建配置参数
可以使用 `var options = new MqttClientOptions();` 直接构建一个options。你也可以通过参数构建器 `var options = new MqttClientOptionsBuilder();` 使代码更简洁美观。
构建器的函数说明:
| 函数名 | 功能说明 |
| Build | 构建配置参数 |
| WithAuthentication | 允许使用不同的身份验证模式 |
| WithCleanSession | 将客户端与MQTT干净会话支持一起使用 |
| WithClientId | 设置客户端ID |
| WithCommunicationTimeout | 设置通信超时 |
| WithCredentials | 设置登录凭证 |
| WithExtendedAuthenticationExchangeHandler | 以自定义方式处理身份验证 |
| WithKeepAlivePeriod | 设置保持有效期 |
| WithKeepAliveSendInterval | 设置保活的发送间隔 |
| WithMaximumPacketSize | 设置最大数据包大小 |
| WithNoKeepAlive | 不要使用保持活动状态 |
| WithProtocolVersion | 设置MQTT协议版本 |
| WithProxy | 设置代理 |
| WithTls | 客户端使用SSL/TLS |
| WithTopicAliasMaximum | 允许最大数量的主题别名 |
| WithReceiveMaximum | 允许最大数量的已接收数据包 |
| WithRequestProblemInformation | 显示请求问题信息 |
| WithRequestResponseInformation | 显示请求响应问题信息 |
| WithSessionExpiryInterval | 一段时间后终止会话 |
| WithTcpServer | 告诉客户端(通过TCP)连接到哪个MQTT代理。 |
| WithWebSocketServer | 告诉客户端(通过WebSocket)连接到哪个MQTT代理 |
| WithWillMessage | 告诉客户端最后一条消息将被发送。 |
| WithWillDelayInterval | 告诉客户端最后一个消息得延迟间隔 |
2. 连接服务端
- var client = new MqttFactory().CreateMqttClient();
- client.ConnectAsync(options.Build());
客户端连接完成事件
- client.UseConnectedHandler(e => Connected(e));
- static async Task Connected(MqttClientConnectedEventArgs e)
- {
- Console.WriteLine($"Connected");
- }
客户端断开事件
- client.UseDisconnectedHandler(e => Disconnected(e));
- static async Task Disconnected(MqttClientDisconnectedEventArgs e)
- {
- Console.WriteLine($"Disconnected");
- }
订阅主题(必须在成功连接以后才生效)
- client.UseApplicationMessageReceivedHandler(e => MessageReceived(e));
- static async Task MessageReceived(MqttApplicationMessageReceivedEventArgs e)
- {
- Console.WriteLine($"{e.ClientId}");
- }
发布消息( 必须在成功连接以后才生效 )
- var message = new MqttApplicationMessageBuilder()
- .WithTopic("myTopic")
- .WithPayload("seven365.cn")
- .WithExactlyOnceQoS()
- .WithRetainFlag()
- .Build();
- client.PublishAsync(message, CancellationToken.None);
3. 操作
客户端重连
- client.ReconnectAsync();
客户端断开
- client.DisconnectAsync