This repository has been archived by the owner on Feb 3, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathProgram.cs
176 lines (153 loc) · 7.04 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
using System;
using System.IO;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Microsoft.IdentityModel.Clients.ActiveDirectory;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Polly;
namespace FhirLoader
{
class Program
{
static void Main(
string inputFolder,
Uri fhirServerUrl,
Uri authority = null,
string clientId = null,
string clientSecret = null,
string accessToken = null,
string bufferFileName = "resources.json",
bool reCreateBufferIfExists = false,
bool forcePost = false,
int maxDegreeOfParallelism = 8,
int refreshInterval = 5)
{
HttpClient httpClient = new HttpClient();
MetricsCollector metrics = new MetricsCollector();
// Create an ndjson file from the FHIR bundles in folder
if (!(new FileInfo(bufferFileName).Exists) || reCreateBufferIfExists)
{
Console.WriteLine("Creating ndjson buffer file...");
CreateBufferFile(inputFolder, bufferFileName);
Console.WriteLine("Buffer created.");
}
bool useAuth = authority != null && clientId != null && clientSecret != null && accessToken == null;
AuthenticationContext authContext = useAuth ? new AuthenticationContext(authority.AbsoluteUri, new TokenCache()) : null;
ClientCredential clientCredential = useAuth ? new ClientCredential(clientId, clientSecret) : null;
var randomGenerator = new Random();
var actionBlock = new ActionBlock<string>(async resourceString =>
{
var resource = JObject.Parse(resourceString);
string resource_type = (string)resource["resourceType"];
string id = (string)resource["id"];
Thread.Sleep(TimeSpan.FromMilliseconds(randomGenerator.Next(50)));
StringContent content = new StringContent(resourceString, Encoding.UTF8, "application/json");
var pollyDelays =
new[]
{
TimeSpan.FromMilliseconds(2000 + randomGenerator.Next(50)),
TimeSpan.FromMilliseconds(3000 + randomGenerator.Next(50)),
TimeSpan.FromMilliseconds(5000 + randomGenerator.Next(50)),
TimeSpan.FromMilliseconds(8000 + randomGenerator.Next(50)),
TimeSpan.FromMilliseconds(12000 + randomGenerator.Next(50)),
TimeSpan.FromMilliseconds(16000 + randomGenerator.Next(50)),
};
HttpResponseMessage uploadResult = await Policy
.HandleResult<HttpResponseMessage>(response => !response.IsSuccessStatusCode)
.WaitAndRetryAsync(pollyDelays, (result, timeSpan, retryCount, context) =>
{
if (retryCount > 3)
{
Console.WriteLine($"Request failed with {result.Result.StatusCode}. Waiting {timeSpan} before next retry. Retry attempt {retryCount}");
}
})
.ExecuteAsync(() =>
{
var message = forcePost || string.IsNullOrEmpty(id)
? new HttpRequestMessage(HttpMethod.Post, new Uri(fhirServerUrl, $"/{resource_type}"))
: new HttpRequestMessage(HttpMethod.Put, new Uri(fhirServerUrl, $"/{resource_type}/{id}"));
message.Content = content;
if (useAuth)
{
var authResult = authContext.AcquireTokenAsync(fhirServerUrl.AbsoluteUri.TrimEnd('/'), clientCredential).Result;
message.Headers.Authorization = new AuthenticationHeaderValue("Bearer", authResult.AccessToken);
}
else if (accessToken != null)
{
message.Headers.Authorization = new AuthenticationHeaderValue("Bearer", accessToken);
}
return httpClient.SendAsync(message);
});
if (!uploadResult.IsSuccessStatusCode)
{
string resultContent = await uploadResult.Content.ReadAsStringAsync();
Console.WriteLine(resultContent);
throw new Exception($"Unable to upload to server. Error code {uploadResult.StatusCode}");
}
metrics.Collect(DateTime.Now);
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism
}
);
// Start output on timer
var t = new Task( () => {
while (true)
{
Thread.Sleep(1000 * refreshInterval);
Console.WriteLine($"Resources per second: {metrics.EventsPerSecond}");
}
});
t.Start();
// Read the ndjson file and feed it to the threads
System.IO.StreamReader buffer = new System.IO.StreamReader(bufferFileName);
string line;
while ((line = buffer.ReadLine()) != null)
{
actionBlock.Post(line);
}
actionBlock.Complete();
actionBlock.Completion.Wait();
}
private static void CreateBufferFile(string inputFolder, string bufferFileName)
{
using (System.IO.StreamWriter outFile = new System.IO.StreamWriter(bufferFileName))
{
string[] files = Directory.GetFiles(inputFolder, "*.json", SearchOption.TopDirectoryOnly);
foreach (var file in files)
{
string bundleText = File.ReadAllText(file);
JObject bundle;
try
{
bundle = JObject.Parse(bundleText);
}
catch (JsonReaderException)
{
Console.WriteLine("Input file is not a valid JSON document");
throw;
}
try
{
SyntheaReferenceResolver.ConvertUUIDs(bundle);
}
catch
{
Console.WriteLine("Failed to resolve references in doc");
throw;
}
foreach (var r in bundle.SelectTokens("$.entry[*].resource"))
{
outFile.WriteLine(r.ToString(Formatting.None));
}
}
}
}
}
}