-
Notifications
You must be signed in to change notification settings - Fork 230
/
Copy pathNpgsqlModificationCommandBatch.cs
209 lines (182 loc) · 8.76 KB
/
NpgsqlModificationCommandBatch.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Diagnostics;
using Microsoft.EntityFrameworkCore.Internal;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.EntityFrameworkCore.Update;
namespace Npgsql.EntityFrameworkCore.PostgreSQL.Update.Internal
{
/// <summary>
/// The Npgsql-specific implementation for <see cref="ModificationCommandBatch" />.
/// </summary>
/// <remarks>
/// The usual ModificationCommandBatch implementation is <see cref="AffectedCountModificationCommandBatch"/>,
/// which selects the number of rows modified via a SQL query.
///
/// PostgreSQL actually has no way of selecting the modified row count.
/// SQL defines GET DIAGNOSTICS which should provide this, but in PostgreSQL it's only available
/// in PL/pgSQL. See http://www.postgresql.org/docs/9.4/static/unsupported-features-sql-standard.html,
/// identifier F121-01.
///
/// Instead, the affected row count can be accessed in the PostgreSQL protocol itself, which seems
/// cleaner and more efficient anyway (no additional query).
/// </remarks>
public class NpgsqlModificationCommandBatch : ReaderModificationCommandBatch
{
// Batch size used when the constructor is not given an explicit maximum.
const int DefaultBatchSize = 1000;
// Maximum number of commands this batch accepts; enforced in CanAddCommand.
readonly int _maxBatchSize;
// Running total of column modifications over the commands accepted by CanAddCommand.
// Stored as a long, which lets CanAddCommand detect totals beyond int.MaxValue
// before GetParameterCount narrows the value to an int.
long _parameterCount;
/// <summary>
/// Constructs an instance of the <see cref="NpgsqlModificationCommandBatch"/> class.
/// </summary>
/// <param name="dependencies">The dependencies forwarded to the base <see cref="ReaderModificationCommandBatch"/>.</param>
/// <param name="maxBatchSize">
/// The maximum count of commands to batch, or <c>null</c> to use the default of 1000.
/// </param>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="maxBatchSize"/> has a value that is zero or negative.
/// </exception>
public NpgsqlModificationCommandBatch(
    [NotNull] ModificationCommandBatchFactoryDependencies dependencies,
    [CanBeNull] int? maxBatchSize)
    : base(dependencies)
{
    // Only an explicitly supplied size is validated; null simply selects the default below.
    if (maxBatchSize.HasValue && maxBatchSize.Value <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(maxBatchSize), RelationalStrings.InvalidMaxBatchSize);
    }

    _maxBatchSize = maxBatchSize ?? DefaultBatchSize;
}
/// <summary>
/// Returns the running parameter total tracked by this batch, narrowed to an <see cref="int"/>.
/// </summary>
/// <returns>The current value of the internal parameter counter.</returns>
protected override int GetParameterCount()
{
    return (int)_parameterCount;
}
/// <summary>
/// Decides whether another command fits into this batch and, when it does,
/// adds its column modification count to the running parameter total.
/// </summary>
/// <param name="modificationCommand">The command being considered for the batch.</param>
/// <returns>
/// <c>false</c> when the batch already holds the maximum number of commands or when the
/// parameter total would exceed <see cref="int.MaxValue"/>; otherwise <c>true</c>.
/// </returns>
protected override bool CanAddCommand(ModificationCommand modificationCommand)
{
    var batchIsFull = ModificationCommands.Count >= _maxBatchSize;
    if (batchIsFull)
    {
        return false;
    }

    // Summed as a long so that exceeding int.MaxValue is detectable rather than wrapping.
    long projectedTotal = _parameterCount + modificationCommand.ColumnModifications.Count;
    if (projectedTotal <= int.MaxValue)
    {
        _parameterCount = projectedTotal;
        return true;
    }

    return false;
}
/// <summary>
/// Reports whether the accumulated command text is acceptable; this implementation always accepts it.
/// </summary>
/// <returns>Always <c>true</c>.</returns>
protected override bool IsCommandTextValid()
{
    return true;
}
/// <summary>
/// Consumes the results of the executed batch. Commands that do not require result propagation
/// are checked via the per-statement row count exposed by the reader; commands that do require
/// propagation have a row read from the reader and propagated back to the command.
/// </summary>
/// <param name="reader">The reader for the executed batch; its underlying reader is cast to <see cref="NpgsqlDataReader"/>.</param>
/// <exception cref="DbUpdateConcurrencyException">
/// A non-propagating statement reports zero rows, or no row could be read for a propagating command.
/// </exception>
/// <exception cref="DbUpdateException">
/// Any other exception raised while consuming results, wrapped together with the entries of the command being processed.
/// </exception>
protected override void Consume(RelationalDataReader reader)
{
    var npgsqlReader = (NpgsqlDataReader)reader.DbDataReader;
    Debug.Assert(npgsqlReader.Statements.Count == ModificationCommands.Count, $"Reader has {npgsqlReader.Statements.Count} statements, expected {ModificationCommands.Count}");

    var commandIndex = 0;
    try
    {
        while (true)
        {
            // Find the next propagating command, if any
            var nextPropagating = commandIndex;
            while (nextPropagating < ModificationCommands.Count
                   && !ModificationCommands[nextPropagating].RequiresResultPropagation)
            {
                nextPropagating++;
            }

            // Go over all non-propagating commands before the next propagating one,
            // make sure they executed
            for (; commandIndex < nextPropagating; commandIndex++)
            {
                if (npgsqlReader.Statements[commandIndex].Rows == 0)
                {
                    throw new DbUpdateConcurrencyException(
                        RelationalStrings.UpdateConcurrencyException(1, 0),
                        ModificationCommands[commandIndex].Entries);
                }
            }

            if (nextPropagating == ModificationCommands.Count)
            {
                Debug.Assert(!npgsqlReader.NextResult(), "Expected less resultsets");
                break;
            }

            // Propagate the results from the reader to the ModificationCommand
            var modificationCommand = ModificationCommands[commandIndex];
            if (!reader.Read())
            {
                throw new DbUpdateConcurrencyException(
                    RelationalStrings.UpdateConcurrencyException(1, 0),
                    modificationCommand.Entries);
            }

            var valueBufferFactory = CreateValueBufferFactory(modificationCommand.ColumnModifications);
            modificationCommand.PropagateResults(valueBufferFactory.Create(npgsqlReader));
            npgsqlReader.NextResult();

            // Advance only after the command has been fully processed, so that a failure above
            // is attributed to this command rather than to the one following it.
            commandIndex++;
        }
    }
    catch (Exception ex) when (!(ex is DbUpdateException))
    {
        // commandIndex equals the command count once every command has been handled
        // (e.g. a failure in the trailing result-set check); clamp it so the handler
        // itself cannot throw and hide the original exception.
        var failedIndex = Math.Min(commandIndex, ModificationCommands.Count - 1);
        throw new DbUpdateException(
            RelationalStrings.UpdateStoreException,
            ex,
            ModificationCommands[failedIndex].Entries);
    }
}
/// <summary>
/// Asynchronously consumes the results of the executed batch. Commands that do not require result
/// propagation are checked via the per-statement row count exposed by the reader; commands that do
/// require propagation have a row read from the reader and propagated back to the command.
/// </summary>
/// <param name="reader">The reader for the executed batch; its underlying reader is cast to <see cref="NpgsqlDataReader"/>.</param>
/// <param name="cancellationToken">Token passed to the asynchronous read and next-result calls.</param>
/// <returns>A task that completes when all results have been consumed.</returns>
/// <exception cref="DbUpdateConcurrencyException">
/// A non-propagating statement reports zero rows, or no row could be read for a propagating command.
/// </exception>
/// <exception cref="DbUpdateException">
/// Any other exception raised while consuming results, wrapped together with the entries of the command being processed.
/// </exception>
protected override async Task ConsumeAsync(
    RelationalDataReader reader,
    CancellationToken cancellationToken = default)
{
    var npgsqlReader = (NpgsqlDataReader)reader.DbDataReader;
    Debug.Assert(npgsqlReader.Statements.Count == ModificationCommands.Count, $"Reader has {npgsqlReader.Statements.Count} statements, expected {ModificationCommands.Count}");

    var commandIndex = 0;
    try
    {
        while (true)
        {
            // Find the next propagating command, if any
            var nextPropagating = commandIndex;
            while (nextPropagating < ModificationCommands.Count
                   && !ModificationCommands[nextPropagating].RequiresResultPropagation)
            {
                nextPropagating++;
            }

            // Go over all non-propagating commands before the next propagating one,
            // make sure they executed
            for (; commandIndex < nextPropagating; commandIndex++)
            {
                if (npgsqlReader.Statements[commandIndex].Rows == 0)
                {
                    throw new DbUpdateConcurrencyException(
                        RelationalStrings.UpdateConcurrencyException(1, 0),
                        ModificationCommands[commandIndex].Entries);
                }
            }

            if (nextPropagating == ModificationCommands.Count)
            {
                Debug.Assert(!(await npgsqlReader.NextResultAsync(cancellationToken).ConfigureAwait(false)), "Expected less resultsets");
                break;
            }

            // Extract result from the command and propagate it
            var modificationCommand = ModificationCommands[commandIndex];
            if (!(await reader.ReadAsync(cancellationToken).ConfigureAwait(false)))
            {
                throw new DbUpdateConcurrencyException(
                    RelationalStrings.UpdateConcurrencyException(1, 0),
                    modificationCommand.Entries);
            }

            var valueBufferFactory = CreateValueBufferFactory(modificationCommand.ColumnModifications);
            modificationCommand.PropagateResults(valueBufferFactory.Create(npgsqlReader));
            await npgsqlReader.NextResultAsync(cancellationToken).ConfigureAwait(false);

            // Advance only after the command has been fully processed, so that a failure above
            // is attributed to this command rather than to the one following it.
            commandIndex++;
        }
    }
    catch (Exception ex) when (!(ex is DbUpdateException))
    {
        // commandIndex equals the command count once every command has been handled
        // (e.g. a failure in the trailing result-set check); clamp it so the handler
        // itself cannot throw and hide the original exception.
        var failedIndex = Math.Min(commandIndex, ModificationCommands.Count - 1);
        throw new DbUpdateException(
            RelationalStrings.UpdateStoreException,
            ex,
            ModificationCommands[failedIndex].Entries);
    }
}
}
}