@@ -28,7 +28,8 @@ public abstract class PostgreSqlCachingRepository<T> : IDisposable where T : cla
28
28
. Handle < Npgsql . NpgsqlException > ( e => e . IsTransient )
29
29
. WaitAndRetryAsync ( 10 , i => TimeSpan . FromSeconds ( Math . Pow ( 2 , i ) ) ) ;
30
30
31
- private readonly Channel < ( T , WriteAction , TaskCompletionSource < T > ) > itemsToWrite = Channel . CreateUnbounded < ( T , WriteAction , TaskCompletionSource < T > ) > ( ) ;
31
+ private record struct WriteItem ( T DbItem , WriteAction Action , TaskCompletionSource < T > TaskSource ) ;
32
+ private readonly Channel < WriteItem > itemsToWrite = Channel . CreateUnbounded < WriteItem > ( ) ;
32
33
private readonly ConcurrentDictionary < T , object > updatingItems = new ( ) ; // Collection of all pending updates to be written, to faciliate detection of simultaneous parallel updates.
33
34
private readonly CancellationTokenSource writerWorkerCancellationTokenSource = new ( ) ;
34
35
private readonly Task writerWorkerTask ;
@@ -45,8 +46,8 @@ protected enum WriteAction { Add, Update, Delete }
45
46
/// Constructor
46
47
/// </summary>
47
48
/// <param name="hostApplicationLifetime">Used for requesting termination of the current application if the writer task unexpectedly exits.</param>
48
- /// <param name="logger"></param>
49
- /// <param name="cache"></param>
49
+ /// <param name="logger">Logging interface. </param>
50
+ /// <param name="cache">Memory cache for fast access to active items. </param>
50
51
/// <exception cref="System.Diagnostics.UnreachableException"></exception>
51
52
protected PostgreSqlCachingRepository ( Microsoft . Extensions . Hosting . IHostApplicationLifetime hostApplicationLifetime , ILogger logger = default , ICache < T > cache = default )
52
53
{
@@ -61,17 +62,16 @@ protected PostgreSqlCachingRepository(Microsoft.Extensions.Hosting.IHostApplicat
61
62
62
63
if ( task . Status == TaskStatus . Faulted )
63
64
{
64
- Console . WriteLine ( $ "Repository WriterWorkerAsync failed unexpectedly with: { task . Exception . Message } .") ;
65
65
Logger . LogCritical ( task . Exception , "Repository WriterWorkerAsync failed unexpectedly with: {ErrorMessage}." , task . Exception . Message ) ;
66
+ Console . WriteLine ( $ "Repository WriterWorkerAsync failed unexpectedly with: { task . Exception . Message } .") ;
66
67
}
67
68
68
- const string errMessage = "Repository WriterWorkerAsync unexpectedly completed. The TES application will now be stopped." ;
69
+ const string errMessage = "Repository WriterWorkerAsync unexpectedly completed. The service will now be stopped." ;
69
70
Logger . LogCritical ( errMessage ) ;
70
71
Console . WriteLine ( errMessage ) ;
71
72
72
73
await Task . Delay ( TimeSpan . FromSeconds ( 40 ) ) ; // Give the logger time to flush; default flush is 30s
73
74
hostApplicationLifetime ? . StopApplication ( ) ;
74
- return ;
75
75
} , TaskContinuationOptions . NotOnCanceled )
76
76
. ContinueWith ( task => Logger . LogInformation ( "The repository WriterWorkerAsync ended normally" ) , TaskContinuationOptions . OnlyOnCanceled ) ;
77
77
}
@@ -148,7 +148,7 @@ protected Task<T> AddUpdateOrRemoveItemInDbAsync(T item, WriteAction action, Can
148
148
}
149
149
}
150
150
151
- if ( ! itemsToWrite . Writer . TryWrite ( ( item , action , source ) ) )
151
+ if ( ! itemsToWrite . Writer . TryWrite ( new ( item , action , source ) ) )
152
152
{
153
153
throw new InvalidOperationException ( "Failed to TryWrite to _itemsToWrite channel." ) ;
154
154
}
@@ -172,7 +172,7 @@ Task<T> RemoveUpdatingItem(Task<T> task)
172
172
/// </summary>
173
173
private async Task WriterWorkerAsync ( CancellationToken cancellationToken )
174
174
{
175
- var list = new List < ( T , WriteAction , TaskCompletionSource < T > ) > ( ) ;
175
+ var list = new List < WriteItem > ( ) ;
176
176
177
177
await foreach ( var itemToWrite in itemsToWrite . Reader . ReadAllAsync ( cancellationToken ) )
178
178
{
@@ -191,7 +191,7 @@ private async Task WriterWorkerAsync(CancellationToken cancellationToken)
191
191
// If cancellation is requested, do not write any more items
192
192
}
193
193
194
- private async ValueTask WriteItemsAsync ( IList < ( T DbItem , WriteAction Action , TaskCompletionSource < T > TaskSource ) > dbItems , CancellationToken cancellationToken )
194
+ private async ValueTask WriteItemsAsync ( IList < WriteItem > dbItems , CancellationToken cancellationToken )
195
195
{
196
196
cancellationToken . ThrowIfCancellationRequested ( ) ;
197
197
@@ -208,19 +208,25 @@ private async ValueTask WriteItemsAsync(IList<(T DbItem, WriteAction Action, Tas
208
208
dbContext . UpdateRange ( dbItems . Where ( e => WriteAction . Update . Equals ( e . Action ) ) . Select ( e => e . DbItem ) ) ;
209
209
dbContext . RemoveRange ( dbItems . Where ( e => WriteAction . Delete . Equals ( e . Action ) ) . Select ( e => e . DbItem ) ) ;
210
210
await asyncPolicy . ExecuteAsync ( dbContext . SaveChangesAsync , cancellationToken ) ;
211
+ var action = ActionOnSuccess ( ) ;
212
+ OperateOnAll ( dbItems , action ) ;
211
213
}
212
214
catch ( Exception ex )
213
215
{
214
216
// It doesn't matter which item the failure was for, we will fail all items in this round.
215
217
// TODO: are there exceptions Postgre will send us that will tell us which item(s) failed or alternately succeeded?
216
- FailAll ( dbItems . Select ( e => e . TaskSource ) , ex ) ;
217
- return ;
218
+ var action = ActionOnFailure ( ex ) ;
219
+ OperateOnAll ( dbItems , action ) ;
218
220
}
219
221
220
- _ = Parallel . ForEach ( dbItems , e => e . TaskSource . TrySetResult ( e . DbItem ) ) ;
222
+ static void OperateOnAll ( IEnumerable < WriteItem > sources , Action < WriteItem > action )
223
+ => _ = Parallel . ForEach ( sources , e => action ( e ) ) ;
224
+
225
+ static Action < WriteItem > ActionOnFailure ( Exception ex ) =>
226
+ e => _ = e . TaskSource . TrySetException ( new AggregateException ( Enumerable . Empty < Exception > ( ) . Append ( ex ) ) ) ;
221
227
222
- static void FailAll ( IEnumerable < TaskCompletionSource < T > > sources , Exception ex )
223
- => _ = Parallel . ForEach ( sources , s => s . TrySetException ( new AggregateException ( Enumerable . Empty < Exception > ( ) . Append ( ex ) ) ) ) ;
228
+ static Action < WriteItem > ActionOnSuccess ( ) =>
229
+ e => _ = e . TaskSource . TrySetResult ( e . DbItem ) ;
224
230
}
225
231
226
232
protected virtual void Dispose ( bool disposing )
@@ -233,7 +239,7 @@ protected virtual void Dispose(bool disposing)
233
239
234
240
try
235
241
{
236
- writerWorkerTask . Wait ( ) ;
242
+ writerWorkerTask . GetAwaiter ( ) . GetResult ( ) ;
237
243
}
238
244
catch ( AggregateException aex ) when ( aex ? . InnerException is TaskCanceledException ex && writerWorkerCancellationTokenSource . Token == ex . CancellationToken )
239
245
{ } // Expected return from Wait().
0 commit comments