Skip to content

Commit 710a4cc

Browse files
committed
Add MultipartContentManager to transmit larger byte arrays
1 parent e1072ec commit 710a4cc

3 files changed

+295
-0
lines changed

Multipart/MultipartContentManager.cs

+210
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
using Satchel;
2+
using Steamworks;
3+
using System;
4+
using System.Collections;
5+
using System.Collections.Generic;
6+
using System.Linq;
7+
using System.Text;
8+
using System.Threading.Tasks;
9+
using System.Timers;
10+
11+
namespace HkmpPouch.Multipart
12+
{
13+
internal class OngoingRequest
14+
{
15+
public string ContentId;
16+
public ushort PartNumber;
17+
public DateTime LastRequest;
18+
}
19+
/// <summary>
20+
/// Allows handling transmission of arbitrary byte[] in parts
21+
/// </summary>
22+
public class MultipartContentManager
23+
{
24+
25+
private static ushort PartSize = 500;
26+
private static double MaxTimeout = 60;
27+
28+
private Dictionary<string, byte[]> _content = new();
29+
private Dictionary<string, Dictionary<ushort, byte[]>> _partialContent = new();
30+
private Dictionary<string, OngoingRequest> _ongoingRequests = new();
31+
private Timer EventTimer;
32+
33+
private OnAble pipe;
34+
35+
/// <summary>
36+
/// Action that must actually send the content request to the correct client/server
37+
/// </summary>
38+
public Action<RequestMultipartContent> SendContentRequest;
39+
40+
/// <summary>
41+
/// Action that must handle sending the requested content from the sending side
42+
/// </summary>
43+
public Action<RequestedMultipartContent> ContentRequestHandler;
44+
45+
/// <summary>
46+
/// Action that is called with the requested data on the receiver side
47+
/// </summary>
48+
public Action<RequestedMultipartContent> ContentReceivedHandler;
49+
50+
/// <summary>
51+
/// ctor
52+
/// </summary>
53+
/// <param name="pipe"></param>
54+
public MultipartContentManager(OnAble pipe)
55+
{
56+
this.pipe = pipe;
57+
pipe.On(RequestedMultipartContentFactory.Instance).Do<RequestedMultipartContent>(ReceivedContent);
58+
pipe.On(RequestMultipartContentFactory.Instance).Do<RequestMultipartContent>(ContentRequested);
59+
EventTimer = new Timer(1000);
60+
EventTimer.Elapsed += EventTimer_Elapsed;
61+
}
62+
63+
private void EventTimer_Elapsed(object sender, ElapsedEventArgs e)
64+
{
65+
foreach (var request in _ongoingRequests) {
66+
if((DateTime.Now - request.Value.LastRequest).TotalSeconds < MaxTimeout)
67+
{
68+
_ongoingRequests[request.Value.ContentId] = new() { ContentId = request.Value.ContentId, PartNumber = request.Value.PartNumber, LastRequest = DateTime.Now };
69+
SendContentRequest(new RequestMultipartContent(request.Value.ContentId) { PartNumber = _ongoingRequests[request.Value.ContentId].PartNumber });
70+
}
71+
}
72+
}
73+
74+
private void ContentRequested(RequestMultipartContent request)
75+
{
76+
if(ContentRequestHandler != null)
77+
{
78+
byte[] currentPartContent;
79+
ushort totalParts = 1;
80+
if (_partialContent.ContainsKey(request.ContentId) && _partialContent[request.ContentId].ContainsKey(request.PartNumber))
81+
{
82+
currentPartContent = _partialContent[request.ContentId][request.PartNumber];
83+
} else
84+
{
85+
// get the necessary part & cache
86+
if (!_partialContent.ContainsKey(request.ContentId))
87+
{
88+
_partialContent[request.ContentId] = new();
89+
}
90+
var data = GetContent(request.ContentId);
91+
List<byte> currentBuffer = new List<byte>();
92+
if (data != null)
93+
{
94+
totalParts = (ushort)Math.Ceiling((float)data.Length / PartSize);
95+
var currentIndex = request.PartNumber * PartSize;
96+
var readPos = 0;
97+
while (currentIndex + readPos < data.Length && readPos < PartSize)
98+
{
99+
currentBuffer.Add(data[currentIndex + readPos]);
100+
readPos++;
101+
}
102+
}
103+
currentPartContent = currentBuffer.ToArray();
104+
_partialContent[request.ContentId][request.PartNumber] = currentPartContent;
105+
106+
}
107+
// send the part to the request handler
108+
// request, currentPartContent
109+
ContentRequestHandler(new RequestedMultipartContent(request.ContentId) {
110+
PartNumber = request.PartNumber,
111+
TotalParts = totalParts ,
112+
ExtraBytes = currentPartContent});
113+
}
114+
}
115+
116+
private void ReceivedContent(RequestedMultipartContent content)
117+
{
118+
//handle recombining of multipart content
119+
if (!_partialContent.ContainsKey(content.ContentId))
120+
{
121+
_partialContent[content.ContentId] = new();
122+
}
123+
if (_partialContent[content.ContentId].ContainsKey(content.PartNumber))
124+
{
125+
return;
126+
}
127+
_partialContent[content.ContentId][content.PartNumber] = content.ExtraBytes;
128+
if (_partialContent[content.ContentId].Count == content.TotalParts)
129+
{
130+
var totalData = new List<byte>();
131+
foreach (var part in _partialContent[content.ContentId])
132+
{
133+
totalData.AddRange(part.Value);
134+
}
135+
_ongoingRequests.Remove(content.ContentId);
136+
if (ContentReceivedHandler != null)
137+
{
138+
var data = totalData.ToArray();
139+
RegisterContent(content.ContentId, data);
140+
ContentReceivedHandler(new RequestedMultipartContent(content.ContentId) {PartNumber = 0 ,TotalParts = 1, ExtraBytes = data});
141+
}
142+
}
143+
else
144+
{
145+
if (_ongoingRequests.ContainsKey(content.ContentId))
146+
{
147+
//request the next pending part
148+
ushort nextPart = (ushort)(content.PartNumber + 1);
149+
_ongoingRequests[content.ContentId].PartNumber = nextPart;
150+
_ongoingRequests[content.ContentId].LastRequest = DateTime.Now;
151+
if (nextPart < content.TotalParts && !_partialContent[content.ContentId].ContainsKey(nextPart))
152+
{
153+
if (SendContentRequest != null)
154+
{
155+
SendContentRequest(new RequestMultipartContent(content.ContentId) { PartNumber = nextPart });
156+
}
157+
}
158+
}
159+
}
160+
161+
162+
}
163+
164+
/// <summary>
165+
/// Used to request the content over the network
166+
/// </summary>
167+
/// <param name="contentId"></param>
168+
public void RequestContent(string contentId)
169+
{
170+
if (!_ongoingRequests.ContainsKey(contentId))
171+
{
172+
_ongoingRequests[contentId] = new() { ContentId = contentId ,PartNumber = 0, LastRequest = DateTime.Now};
173+
SendContentRequest(new RequestMultipartContent(contentId) { PartNumber = _ongoingRequests[contentId].PartNumber });
174+
}
175+
}
176+
177+
/// <summary>
178+
/// Used to register content
179+
/// </summary>
180+
/// <param name="contentId"></param>
181+
/// <param name="content"></param>
182+
public void RegisterContent(string contentId, byte[] content)
183+
{
184+
_content[contentId] = content;
185+
}
186+
187+
/// <summary>
188+
/// Used to check if content is already available
189+
/// </summary>
190+
/// <param name="contentId"></param>
191+
/// <returns></returns>
192+
public byte[] GetContent(string contentId) {
193+
if (_content.ContainsKey(contentId))
194+
{
195+
return _content[contentId];
196+
}
197+
// handle making a request for the content?
198+
return null;
199+
}
200+
201+
/// <summary>
202+
/// Get the list of content that is available
203+
/// </summary>
204+
/// <returns></returns>
205+
public List<string> GetContentList()
206+
{
207+
return _content.Keys.ToList();
208+
}
209+
}
210+
}

Multipart/RequestMultipartContent.cs

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Globalization;
4+
using System.Linq;
5+
using System.Text;
6+
using System.Threading.Tasks;
7+
8+
namespace HkmpPouch.Multipart
9+
{
10+
public class RequestMultipartContent : PipeEvent
11+
{
12+
public static string Name = "RequestMultipartContent";
13+
public ushort PartNumber = 0;
14+
public string ContentId;
15+
16+
public RequestMultipartContent(string ContentId)
17+
{
18+
base.IsReliable = true;
19+
this.ContentId = ContentId;
20+
}
21+
public override string GetName() => Name;
22+
public override string ToString()
23+
{
24+
return $"{ContentId},{PartNumber}";
25+
}
26+
}
27+
28+
public class RequestMultipartContentFactory : IEventFactory
29+
{
30+
public static RequestMultipartContentFactory Instance { get; internal set; } = new RequestMultipartContentFactory();
31+
public PipeEvent FromSerializedString(string serializedData)
32+
{
33+
var Split = serializedData.Split(',');
34+
var ContentId = Split[0];
35+
var pEvent = new RequestMultipartContent(ContentId);
36+
pEvent.PartNumber = ushort.Parse(Split[1], CultureInfo.InvariantCulture);
37+
return pEvent;
38+
}
39+
40+
public string GetName() => RequestMultipartContent.Name;
41+
}
42+
}
+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Globalization;
4+
using System.Linq;
5+
using System.Text;
6+
using System.Threading.Tasks;
7+
8+
namespace HkmpPouch.Multipart
9+
{
10+
public class RequestedMultipartContent : PipeEvent
11+
{
12+
public static string Name = "RequestedMultipartContent";
13+
public ushort PartNumber = 0;
14+
public ushort TotalParts = 1;
15+
public string ContentId;
16+
17+
public RequestedMultipartContent(string ContentId)
18+
{
19+
base.IsReliable = true;
20+
this.ContentId = ContentId;
21+
}
22+
public override string GetName() => Name;
23+
public override string ToString()
24+
{
25+
return $"{ContentId},{PartNumber},{TotalParts}";
26+
}
27+
}
28+
public class RequestedMultipartContentFactory : IEventFactory
29+
{
30+
public static RequestedMultipartContentFactory Instance { get; internal set; } = new RequestedMultipartContentFactory();
31+
public PipeEvent FromSerializedString(string serializedData)
32+
{
33+
var Split = serializedData.Split(',');
34+
var ContentId = Split[0];
35+
var pEvent = new RequestedMultipartContent(ContentId);
36+
pEvent.PartNumber = ushort.Parse(Split[1], CultureInfo.InvariantCulture);
37+
pEvent.TotalParts = ushort.Parse(Split[2], CultureInfo.InvariantCulture);
38+
return pEvent;
39+
}
40+
41+
public string GetName() => RequestedMultipartContent.Name;
42+
}
43+
}

0 commit comments

Comments
 (0)