ChannelBuffer.cs
14.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
#if ENABLE_UNET
using System;
using System.Collections.Generic;
#pragma warning disable 618
namespace UnityEngine.Networking
{
class ChannelBuffer : IDisposable
{
NetworkConnection m_Connection;
ChannelPacket m_CurrentPacket;
float m_LastFlushTime;
byte m_ChannelId;
int m_MaxPacketSize;
bool m_IsReliable;
bool m_AllowFragmentation;
bool m_IsBroken;
int m_MaxPendingPacketCount;
const int k_MaxFreePacketCount = 512; // this is for all connections. maybe make this configurable
public const int MaxPendingPacketCount = 16; // this is per connection. each is around 1400 bytes (MTU)
public const int MaxBufferedPackets = 512;
Queue<ChannelPacket> m_PendingPackets;
static List<ChannelPacket> s_FreePackets;
static internal int pendingPacketCount; // this is across all connections. only used for profiler metrics.
// config
public float maxDelay = 0.01f;
// stats
float m_LastBufferedMessageCountTimer = Time.realtimeSinceStartup;
public int numMsgsOut { get; private set; }
public int numBufferedMsgsOut { get; private set; }
public int numBytesOut { get; private set; }
public int numMsgsIn { get; private set; }
public int numBytesIn { get; private set; }
public int numBufferedPerSecond { get; private set; }
public int lastBufferedPerSecond { get; private set; }
static NetworkWriter s_SendWriter = new NetworkWriter();
static NetworkWriter s_FragmentWriter = new NetworkWriter();
// We need to reserve some space for header information, this will be taken off the total channel buffer size
const int k_PacketHeaderReserveSize = 100;
public ChannelBuffer(NetworkConnection conn, int bufferSize, byte cid, bool isReliable, bool isSequenced)
{
m_Connection = conn;
m_MaxPacketSize = bufferSize - k_PacketHeaderReserveSize;
m_CurrentPacket = new ChannelPacket(m_MaxPacketSize, isReliable);
m_ChannelId = cid;
m_MaxPendingPacketCount = MaxPendingPacketCount;
m_IsReliable = isReliable;
m_AllowFragmentation = (isReliable && isSequenced);
if (isReliable)
{
m_PendingPackets = new Queue<ChannelPacket>();
if (s_FreePackets == null)
{
s_FreePackets = new List<ChannelPacket>();
}
}
}
// Track whether Dispose has been called.
bool m_Disposed;
public void Dispose()
{
Dispose(true);
// Take yourself off the Finalization queue
// to prevent finalization code for this object
// from executing a second time.
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
// Check to see if Dispose has already been called.
if (!m_Disposed)
{
if (disposing)
{
if (m_PendingPackets != null)
{
while (m_PendingPackets.Count > 0)
{
pendingPacketCount -= 1;
ChannelPacket packet = m_PendingPackets.Dequeue();
if (s_FreePackets.Count < k_MaxFreePacketCount)
{
s_FreePackets.Add(packet);
}
}
m_PendingPackets.Clear();
}
}
}
m_Disposed = true;
}
public bool SetOption(ChannelOption option, int value)
{
switch (option)
{
case ChannelOption.MaxPendingBuffers:
{
if (!m_IsReliable)
{
// not an error
//if (LogFilter.logError) { Debug.LogError("Cannot set MaxPendingBuffers on unreliable channel " + m_ChannelId); }
return false;
}
if (value < 0 || value >= MaxBufferedPackets)
{
if (LogFilter.logError) { Debug.LogError("Invalid MaxPendingBuffers for channel " + m_ChannelId + ". Must be greater than zero and less than " + k_MaxFreePacketCount); }
return false;
}
m_MaxPendingPacketCount = value;
return true;
}
case ChannelOption.AllowFragmentation:
{
m_AllowFragmentation = (value != 0);
return true;
}
case ChannelOption.MaxPacketSize:
{
if (!m_CurrentPacket.IsEmpty() || m_PendingPackets.Count > 0)
{
if (LogFilter.logError) { Debug.LogError("Cannot set MaxPacketSize after sending data."); }
return false;
}
if (value <= 0)
{
if (LogFilter.logError) { Debug.LogError("Cannot set MaxPacketSize less than one."); }
return false;
}
if (value > m_MaxPacketSize)
{
if (LogFilter.logError) { Debug.LogError("Cannot set MaxPacketSize to greater than the existing maximum (" + m_MaxPacketSize + ")."); }
return false;
}
// rebuild the packet with the new size. the packets doesn't store a size variable, just has the size of the internal buffer
m_CurrentPacket = new ChannelPacket(value, m_IsReliable);
m_MaxPacketSize = value;
return true;
}
}
return false;
}
public void CheckInternalBuffer()
{
if (Time.realtimeSinceStartup - m_LastFlushTime > maxDelay && !m_CurrentPacket.IsEmpty())
{
SendInternalBuffer();
m_LastFlushTime = Time.realtimeSinceStartup;
}
if (Time.realtimeSinceStartup - m_LastBufferedMessageCountTimer > 1.0f)
{
lastBufferedPerSecond = numBufferedPerSecond;
numBufferedPerSecond = 0;
m_LastBufferedMessageCountTimer = Time.realtimeSinceStartup;
}
}
public bool SendWriter(NetworkWriter writer)
{
return SendBytes(writer.AsArraySegment().Array, writer.AsArraySegment().Count);
}
public bool Send(short msgType, MessageBase msg)
{
// build the stream
s_SendWriter.StartMessage(msgType);
msg.Serialize(s_SendWriter);
s_SendWriter.FinishMessage();
numMsgsOut += 1;
return SendWriter(s_SendWriter);
}
internal NetBuffer fragmentBuffer = new NetBuffer();
bool readingFragment = false;
internal bool HandleFragment(NetworkReader reader)
{
int state = reader.ReadByte();
if (state == 0)
{
if (readingFragment == false)
{
fragmentBuffer.SeekZero();
readingFragment = true;
}
byte[] data = reader.ReadBytesAndSize();
fragmentBuffer.WriteBytes(data, (ushort)data.Length);
return false;
}
else
{
readingFragment = false;
return true;
}
}
internal bool SendFragmentBytes(byte[] bytes, int bytesToSend)
{
const int fragmentHeaderSize = 32;
int pos = 0;
while (bytesToSend > 0)
{
int diff = Math.Min(bytesToSend, m_MaxPacketSize - fragmentHeaderSize);
byte[] buffer = new byte[diff];
Array.Copy(bytes, pos, buffer, 0, diff);
s_FragmentWriter.StartMessage(MsgType.Fragment);
s_FragmentWriter.Write((byte)0);
s_FragmentWriter.WriteBytesFull(buffer);
s_FragmentWriter.FinishMessage();
SendWriter(s_FragmentWriter);
pos += diff;
bytesToSend -= diff;
}
// send finish
s_FragmentWriter.StartMessage(MsgType.Fragment);
s_FragmentWriter.Write((byte)1);
s_FragmentWriter.FinishMessage();
SendWriter(s_FragmentWriter);
return true;
}
internal bool SendBytes(byte[] bytes, int bytesToSend)
{
#if UNITY_EDITOR
Profiler.IncrementStatOutgoing(MsgType.HLAPIMsg);
#endif
if (bytesToSend >= UInt16.MaxValue)
{
if (LogFilter.logError) { Debug.LogError("ChannelBuffer:SendBytes cannot send packet larger than " + UInt16.MaxValue + " bytes"); }
return false;
}
if (bytesToSend <= 0)
{
// zero length packets getting into the packet queues are bad.
if (LogFilter.logError) { Debug.LogError("ChannelBuffer:SendBytes cannot send zero bytes"); }
return false;
}
if (bytesToSend > m_MaxPacketSize)
{
if (m_AllowFragmentation)
{
return SendFragmentBytes(bytes, bytesToSend);
}
else
{
// cannot do HLAPI fragmentation on this channel
if (LogFilter.logError) { Debug.LogError("Failed to send big message of " + bytesToSend + " bytes. The maximum is " + m_MaxPacketSize + " bytes on channel:" + m_ChannelId); }
return false;
}
}
if (!m_CurrentPacket.HasSpace(bytesToSend))
{
if (m_IsReliable)
{
if (m_PendingPackets.Count == 0)
{
// nothing in the pending queue yet, just flush and write
if (!m_CurrentPacket.SendToTransport(m_Connection, m_ChannelId))
{
QueuePacket();
}
m_CurrentPacket.Write(bytes, bytesToSend);
return true;
}
if (m_PendingPackets.Count >= m_MaxPendingPacketCount)
{
if (!m_IsBroken)
{
// only log this once, or it will spam the log constantly
if (LogFilter.logError) { Debug.LogError("ChannelBuffer buffer limit of " + m_PendingPackets.Count + " packets reached."); }
}
m_IsBroken = true;
return false;
}
// calling SendToTransport here would write out-of-order data to the stream. just queue
QueuePacket();
m_CurrentPacket.Write(bytes, bytesToSend);
return true;
}
if (!m_CurrentPacket.SendToTransport(m_Connection, m_ChannelId))
{
if (LogFilter.logError) { Debug.Log("ChannelBuffer SendBytes no space on unreliable channel " + m_ChannelId); }
return false;
}
m_CurrentPacket.Write(bytes, bytesToSend);
return true;
}
m_CurrentPacket.Write(bytes, bytesToSend);
if (maxDelay == 0.0f)
{
return SendInternalBuffer();
}
return true;
}
void QueuePacket()
{
pendingPacketCount += 1;
m_PendingPackets.Enqueue(m_CurrentPacket);
m_CurrentPacket = AllocPacket();
}
ChannelPacket AllocPacket()
{
#if UNITY_EDITOR
Profiler.SetStatOutgoing(MsgType.HLAPIPending, pendingPacketCount);
#endif
if (s_FreePackets.Count == 0)
{
return new ChannelPacket(m_MaxPacketSize, m_IsReliable);
}
var packet = s_FreePackets[s_FreePackets.Count - 1];
s_FreePackets.RemoveAt(s_FreePackets.Count - 1);
packet.Reset();
return packet;
}
static void FreePacket(ChannelPacket packet)
{
#if UNITY_EDITOR
Profiler.SetStatOutgoing(MsgType.HLAPIPending, pendingPacketCount);
#endif
if (s_FreePackets.Count >= k_MaxFreePacketCount)
{
// just discard this packet, already tracking too many free packets
return;
}
s_FreePackets.Add(packet);
}
public bool SendInternalBuffer()
{
#if UNITY_EDITOR
Profiler.IncrementStatOutgoing(MsgType.LLAPIMsg);
#endif
if (m_IsReliable && m_PendingPackets.Count > 0)
{
// send until transport can take no more
while (m_PendingPackets.Count > 0)
{
var packet = m_PendingPackets.Dequeue();
if (!packet.SendToTransport(m_Connection, m_ChannelId))
{
m_PendingPackets.Enqueue(packet);
break;
}
pendingPacketCount -= 1;
FreePacket(packet);
if (m_IsBroken && m_PendingPackets.Count < (m_MaxPendingPacketCount / 2))
{
if (LogFilter.logWarn) { Debug.LogWarning("ChannelBuffer recovered from overflow but data was lost."); }
m_IsBroken = false;
}
}
return true;
}
return m_CurrentPacket.SendToTransport(m_Connection, m_ChannelId);
}
}
}
#pragma warning restore 618
#endif //ENABLE_UNET