Skip to content

Commit c22b603

Browse files
committed
Accept multiple SubscriptionIdentifiers for PUBLISH packets.
1 parent 27476a9 commit c22b603

9 files changed

+87
-50
lines changed

Source/MQTTnet/Formatter/V5/MqttV500DataConverter.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,14 @@ public MqttPublishPacket CreatePublishPacket(MqttApplicationMessage applicationM
3333
MessageExpiryInterval = applicationMessage.MessageExpiryInterval,
3434
PayloadFormatIndicator = applicationMessage.PayloadFormatIndicator,
3535
ResponseTopic = applicationMessage.ResponseTopic,
36-
SubscriptionIdentifier = applicationMessage.SubscriptionIdentifier,
36+
SubscriptionIdentifiers = applicationMessage.SubscriptionIdentifiers,
3737
TopicAlias = applicationMessage.TopicAlias
3838
}
3939
};
4040

4141
if (applicationMessage.UserProperties != null)
4242
{
43+
packet.Properties.UserProperties = new List<MqttUserProperty>();
4344
packet.Properties.UserProperties.AddRange(applicationMessage.UserProperties);
4445
}
4546

@@ -67,7 +68,7 @@ public MqttApplicationMessage CreateApplicationMessage(MqttPublishPacket publish
6768
ContentType = publishPacket.Properties?.ContentType,
6869
CorrelationData = publishPacket.Properties?.CorrelationData,
6970
MessageExpiryInterval = publishPacket.Properties?.MessageExpiryInterval,
70-
SubscriptionIdentifier = publishPacket.Properties?.SubscriptionIdentifier,
71+
SubscriptionIdentifiers = publishPacket.Properties?.SubscriptionIdentifiers,
7172
TopicAlias = publishPacket.Properties?.TopicAlias,
7273
PayloadFormatIndicator = publishPacket.Properties?.PayloadFormatIndicator,
7374
UserProperties = publishPacket.Properties?.UserProperties ?? new List<MqttUserProperty>()

Source/MQTTnet/Formatter/V5/MqttV500PacketDecoder.cs

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Collections.Generic;
23
using System.Linq;
34
using MQTTnet.Adapter;
45
using MQTTnet.Exceptions;
@@ -117,7 +118,7 @@ private static MqttBasePacket DecodeConnectPacket(IMqttPacketBodyReader body)
117118
}
118119
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
119120
{
120-
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
121+
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
121122
}
122123
else
123124
{
@@ -155,7 +156,12 @@ private static MqttBasePacket DecodeConnectPacket(IMqttPacketBodyReader body)
155156
}
156157
else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.SubscriptionIdentifier)
157158
{
158-
packet.WillMessage.SubscriptionIdentifier = propertiesReader.ReadSubscriptionIdentifier();
159+
if (packet.WillMessage.SubscriptionIdentifiers == null)
160+
{
161+
packet.WillMessage.SubscriptionIdentifiers = new List<uint>();
162+
}
163+
164+
packet.WillMessage.SubscriptionIdentifiers.Add(propertiesReader.ReadSubscriptionIdentifier());
159165
}
160166
else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.ContentType)
161167
{
@@ -168,7 +174,12 @@ private static MqttBasePacket DecodeConnectPacket(IMqttPacketBodyReader body)
168174
}
169175
else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
170176
{
171-
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
177+
if (packet.WillMessage.UserProperties == null)
178+
{
179+
packet.WillMessage.UserProperties = new List<MqttUserProperty>();
180+
}
181+
182+
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
172183
}
173184
else
174185
{
@@ -271,7 +282,7 @@ private static MqttBasePacket DecodeConnAckPacket(IMqttPacketBodyReader body)
271282
}
272283
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
273284
{
274-
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
285+
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
275286
}
276287
else
277288
{
@@ -309,7 +320,7 @@ private static MqttBasePacket DecodeDisconnectPacket(IMqttPacketBodyReader body)
309320
}
310321
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
311322
{
312-
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
323+
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
313324
}
314325
else
315326
{
@@ -339,7 +350,7 @@ private static MqttBasePacket DecodeSubscribePacket(IMqttPacketBodyReader body)
339350
}
340351
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
341352
{
342-
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
353+
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
343354
}
344355
else
345356
{
@@ -389,7 +400,7 @@ private static MqttBasePacket DecodeSubAckPacket(IMqttPacketBodyReader body)
389400
}
390401
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
391402
{
392-
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
403+
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
393404
}
394405
else
395406
{
@@ -421,7 +432,7 @@ private static MqttBasePacket DecodeUnsubscribePacket(IMqttPacketBodyReader body
421432
{
422433
if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
423434
{
424-
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
435+
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
425436
}
426437
else
427438
{
@@ -456,7 +467,7 @@ private static MqttBasePacket DecodeUnsubAckPacket(IMqttPacketBodyReader body)
456467
}
457468
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
458469
{
459-
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
470+
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
460471
}
461472
else
462473
{
@@ -530,15 +541,25 @@ private static MqttBasePacket DecodePublishPacket(byte header, IMqttPacketBodyRe
530541
}
531542
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.SubscriptionIdentifier)
532543
{
533-
packet.Properties.SubscriptionIdentifier = propertiesReader.ReadSubscriptionIdentifier();
544+
if (packet.Properties.SubscriptionIdentifiers == null)
545+
{
546+
packet.Properties.SubscriptionIdentifiers = new List<uint>();
547+
}
548+
549+
packet.Properties.SubscriptionIdentifiers.Add(propertiesReader.ReadSubscriptionIdentifier());
534550
}
535551
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.ContentType)
536552
{
537553
packet.Properties.ContentType = propertiesReader.ReadContentType();
538554
}
539555
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
540556
{
541-
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
557+
if (packet.Properties.UserProperties == null)
558+
{
559+
packet.Properties.UserProperties = new List<MqttUserProperty>();
560+
}
561+
562+
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
542563
}
543564
else
544565
{
@@ -581,7 +602,7 @@ private static MqttBasePacket DecodePubAckPacket(IMqttPacketBodyReader body)
581602
}
582603
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
583604
{
584-
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
605+
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
585606
}
586607
else
587608
{
@@ -619,7 +640,7 @@ private static MqttBasePacket DecodePubRecPacket(IMqttPacketBodyReader body)
619640
}
620641
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
621642
{
622-
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
643+
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
623644
}
624645
else
625646
{
@@ -657,7 +678,7 @@ private static MqttBasePacket DecodePubRelPacket(IMqttPacketBodyReader body)
657678
}
658679
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
659680
{
660-
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
681+
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
661682
}
662683
else
663684
{
@@ -695,7 +716,7 @@ private static MqttBasePacket DecodePubCompPacket(IMqttPacketBodyReader body)
695716
}
696717
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
697718
{
698-
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
719+
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
699720
}
700721
else
701722
{
@@ -740,7 +761,7 @@ private static MqttBasePacket DecodeAuthPacket(IMqttPacketBodyReader body)
740761
}
741762
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
742763
{
743-
propertiesReader.FillUserProperties(packet.Properties.UserProperties);
764+
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
744765
}
745766
else
746767
{

Source/MQTTnet/Formatter/V5/MqttV500PacketEncoder.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ private static byte EncodeConnectPacket(MqttConnectPacket packet, IMqttPacketWri
150150
willPropertiesWriter.WriteTopicAlias(packet.WillMessage.TopicAlias);
151151
willPropertiesWriter.WriteResponseTopic(packet.WillMessage.ResponseTopic);
152152
willPropertiesWriter.WriteCorrelationData(packet.WillMessage.CorrelationData);
153-
willPropertiesWriter.WriteSubscriptionIdentifier(packet.WillMessage.SubscriptionIdentifier);
153+
willPropertiesWriter.WriteSubscriptionIdentifiers(packet.WillMessage.SubscriptionIdentifiers);
154154
willPropertiesWriter.WriteContentType(packet.WillMessage.ContentType);
155155
willPropertiesWriter.WriteUserProperties(packet.WillMessage.UserProperties);
156156

@@ -258,7 +258,7 @@ private static byte EncodePublishPacket(MqttPublishPacket packet, IMqttPacketWri
258258
propertiesWriter.WriteTopicAlias(packet.Properties.TopicAlias);
259259
propertiesWriter.WriteResponseTopic(packet.Properties.ResponseTopic);
260260
propertiesWriter.WriteCorrelationData(packet.Properties.CorrelationData);
261-
propertiesWriter.WriteSubscriptionIdentifier(packet.Properties.SubscriptionIdentifier);
261+
propertiesWriter.WriteSubscriptionIdentifiers(packet.Properties.SubscriptionIdentifiers);
262262
propertiesWriter.WriteContentType(packet.Properties.ContentType);
263263
propertiesWriter.WriteUserProperties(packet.Properties.UserProperties);
264264
}

Source/MQTTnet/Formatter/V5/MqttV500PropertiesReader.cs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public bool MoveNext()
4242
return true;
4343
}
4444

45-
public void FillUserProperties(List<MqttUserProperty> userProperties)
45+
public void AddUserPropertyTo(List<MqttUserProperty> userProperties)
4646
{
4747
if (userProperties == null) throw new ArgumentNullException(nameof(userProperties));
4848

@@ -67,17 +67,17 @@ public byte[] ReadAuthenticationData()
6767
return _body.ReadWithLengthPrefix();
6868
}
6969

70-
public bool? ReadRetainAvailable()
70+
public bool ReadRetainAvailable()
7171
{
7272
return _body.ReadBoolean();
7373
}
7474

75-
public uint? ReadSessionExpiryInterval()
75+
public uint ReadSessionExpiryInterval()
7676
{
7777
return _body.ReadFourByteInteger();
7878
}
7979

80-
public ushort? ReadReceiveMaximum()
80+
public ushort ReadReceiveMaximum()
8181
{
8282
return _body.ReadTwoByteInteger();
8383
}
@@ -92,17 +92,17 @@ public string ReadServerReference()
9292
return _body.ReadStringWithLengthPrefix();
9393
}
9494

95-
public ushort? ReadTopicAliasMaximum()
95+
public ushort ReadTopicAliasMaximum()
9696
{
9797
return _body.ReadTwoByteInteger();
9898
}
9999

100-
public uint? ReadMaximumPacketSize()
100+
public uint ReadMaximumPacketSize()
101101
{
102102
return _body.ReadFourByteInteger();
103103
}
104104

105-
public ushort? ReadServerKeepAlive()
105+
public ushort ReadServerKeepAlive()
106106
{
107107
return _body.ReadTwoByteInteger();
108108
}
@@ -112,22 +112,22 @@ public string ReadResponseInformation()
112112
return _body.ReadStringWithLengthPrefix();
113113
}
114114

115-
public bool? ReadSharedSubscriptionAvailable()
115+
public bool ReadSharedSubscriptionAvailable()
116116
{
117117
return _body.ReadBoolean();
118118
}
119119

120-
public bool? ReadSubscriptionIdentifiersAvailable()
120+
public bool ReadSubscriptionIdentifiersAvailable()
121121
{
122122
return _body.ReadBoolean();
123123
}
124124

125-
public bool? ReadWildcardSubscriptionAvailable()
125+
public bool ReadWildcardSubscriptionAvailable()
126126
{
127127
return _body.ReadBoolean();
128128
}
129129

130-
public uint? ReadSubscriptionIdentifier()
130+
public uint ReadSubscriptionIdentifier()
131131
{
132132
return _body.ReadVariableLengthInteger();
133133
}
@@ -137,12 +137,12 @@ public string ReadResponseInformation()
137137
return (MqttPayloadFormatIndicator)_body.ReadByte();
138138
}
139139

140-
public uint? ReadMessageExpiryInterval()
140+
public uint ReadMessageExpiryInterval()
141141
{
142142
return _body.ReadFourByteInteger();
143143
}
144144

145-
public ushort? ReadTopicAlias()
145+
public ushort ReadTopicAlias()
146146
{
147147
return _body.ReadTwoByteInteger();
148148
}
@@ -162,17 +162,17 @@ public string ReadContentType()
162162
return _body.ReadStringWithLengthPrefix();
163163
}
164164

165-
public uint? ReadWillDelayInterval()
165+
public uint ReadWillDelayInterval()
166166
{
167167
return _body.ReadFourByteInteger();
168168
}
169169

170-
public bool? RequestResponseInformation()
170+
public bool RequestResponseInformation()
171171
{
172172
return _body.ReadBoolean();
173173
}
174174

175-
public bool? RequestProblemInformation()
175+
public bool RequestProblemInformation()
176176
{
177177
return _body.ReadBoolean();
178178
}

Source/MQTTnet/Formatter/V5/MqttV500PropertiesWriter.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,19 @@ public void WriteSubscriptionIdentifier(uint? value)
8080
WriteAsVariableLengthInteger(MqttPropertyId.SubscriptionIdentifier, value);
8181
}
8282

83+
public void WriteSubscriptionIdentifiers(IEnumerable<uint> value)
84+
{
85+
if (value == null)
86+
{
87+
return;
88+
}
89+
90+
foreach (var subscriptionIdentifier in value)
91+
{
92+
WriteAsVariableLengthInteger(MqttPropertyId.SubscriptionIdentifier, subscriptionIdentifier);
93+
}
94+
}
95+
8396
public void WriteTopicAlias(ushort? value)
8497
{
8598
Write(MqttPropertyId.TopicAlias, value);

Source/MQTTnet/MqttApplicationMessage.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,6 @@ public class MqttApplicationMessage
2828

2929
public byte[] CorrelationData { get; set; }
3030

31-
public uint? SubscriptionIdentifier { get; set; }
31+
public List<uint> SubscriptionIdentifiers { get; set; }
3232
}
3333
}

0 commit comments

Comments
 (0)