Skip to content

Commit

Permalink
implement on netfx
Browse files Browse the repository at this point in the history
  • Loading branch information
Wraith2 committed Jun 26, 2021
1 parent 598d450 commit 41af8ac
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1757,19 +1757,13 @@ private int EndExecuteNonQueryAsync(IAsyncResult asyncResult)
else
{
ThrowIfReconnectionHasBeenCanceled();
// lock on _stateObj prevents races with close/cancel.
// If we have already initiate the End call internally, we have already done that, so no point doing it again.
if (!_internalEndExecuteInitiated)
if (!_internalEndExecuteInitiated && _stateObj != null)
{
lock (_stateObj)
{
return EndExecuteNonQueryInternal(asyncResult);
}
}
else
{
return EndExecuteNonQueryInternal(asyncResult);
// call SetCancelStateClosed on the stateobject to ensure that cancel cannot
// happen after we have changed started the end processing
_stateObj.SetCancelStateClosed();
}
return EndExecuteNonQueryInternal(asyncResult);
}
}

Expand Down Expand Up @@ -2270,19 +2264,14 @@ private XmlReader EndExecuteXmlReaderAsync(IAsyncResult asyncResult)
else
{
ThrowIfReconnectionHasBeenCanceled();
// lock on _stateObj prevents races with close/cancel.
// If we have already initiate the End call internally, we have already done that, so no point doing it again.
if (!_internalEndExecuteInitiated)
{
lock (_stateObj)
{
return EndExecuteXmlReaderInternal(asyncResult);
}
}
else
if (!_internalEndExecuteInitiated && _stateObj != null)
{
return EndExecuteXmlReaderInternal(asyncResult);
// call SetCancelStateClosed on the stateobject to ensure that cancel cannot
// happen after we have changed started the end processing
_stateObj.SetCancelStateClosed();
}

return EndExecuteXmlReaderInternal(asyncResult);
}
}

Expand Down Expand Up @@ -2532,19 +2521,15 @@ private SqlDataReader EndExecuteReaderAsync(IAsyncResult asyncResult)
else
{
ThrowIfReconnectionHasBeenCanceled();
// lock on _stateObj prevents races with close/cancel.
// If we have already initiate the End call internally, we have already done that, so no point doing it again.
if (!_internalEndExecuteInitiated)
{
lock (_stateObj)
{
return EndExecuteReaderInternal(asyncResult);
}
}
else

if (!_internalEndExecuteInitiated && _stateObj != null)
{
return EndExecuteReaderInternal(asyncResult);
// call SetCancelStateClosed on the stateobject to ensure that cancel cannot happen after
// we have changed started the end processing
_stateObj.SetCancelStateClosed();
}

return EndExecuteReaderInternal(asyncResult);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,17 @@ internal int ObjectID
// 2) post first packet write, but before session return - a call to cancel will send an
// attention to the server
// 3) post session close - no attention is allowed
private bool _cancelled;
private const int _waitForCancellationLockPollTimeout = 100;

private static class CancelState
{
public const int Unset = 0;
public const int Closed = 1;
public const int Cancelled = 2;
}

private int _cancelState;

// This variable is used to prevent sending an attention by another thread that is not the
// current owner of the stateObj. I currently do not know how this can happen. Mark added
// the code but does not remember either. At some point, we need to research killing this
Expand Down Expand Up @@ -650,68 +658,57 @@ internal void Activate(object owner)
Debug.Assert(result == 1, "invalid deactivate count");
}

internal bool SetCancelStateClosed()
{
return Interlocked.CompareExchange(ref _cancelState, CancelState.Unset, CancelState.Closed) == CancelState.Unset;
}

// This method is only called by the command or datareader as a result of a user initiated
// cancel request.
internal void Cancel(int objectID)
{
bool hasLock = false;
try
// Keep looping until we either grabbed the lock (and therefore sent attention) or the connection closes\breaks
if (
(_parser.State != TdsParserState.Closed) && (_parser.State != TdsParserState.Broken) &&
Interlocked.CompareExchange(ref _cancelState, CancelState.Unset, CancelState.Cancelled) == CancelState.Unset
)
{
// Keep looping until we either grabbed the lock (and therefore sent attention) or the connection closes\breaks
while ((!hasLock) && (_parser.State != TdsParserState.Closed) && (_parser.State != TdsParserState.Broken))
// don't allow objectID -1 since it is reserved for 'not associated with a command'
// yes, the 2^32-1 comand won't cancel - but it also won't cancel when we don't want it
if (
(objectID == _allowObjectID) &&
(objectID != -1))
{

Monitor.TryEnter(this, _waitForCancellationLockPollTimeout, ref hasLock);
if (hasLock)
{ // Lock for the time being - since we need to synchronize the attention send.
// At some point in the future, I hope to remove this.
// This lock is also protecting against concurrent close and async continuations

// don't allow objectID -1 since it is reserved for 'not associated with a command'
// yes, the 2^32-1 comand won't cancel - but it also won't cancel when we don't want it
if ((!_cancelled) && (objectID == _allowObjectID) && (objectID != -1))
if (_pendingData && !_attentionSent)
{
bool hasParserLock = false;
// Keep looping until we have the parser lock (and so are allowed to write), or the conneciton closes\breaks
while ((!hasParserLock) && (_parser.State != TdsParserState.Closed) && (_parser.State != TdsParserState.Broken))
{
_cancelled = true;

if (_pendingData && !_attentionSent)
try
{
bool hasParserLock = false;
// Keep looping until we have the parser lock (and so are allowed to write), or the conneciton closes\breaks
while ((!hasParserLock) && (_parser.State != TdsParserState.Closed) && (_parser.State != TdsParserState.Broken))
_parser.Connection._parserLock.Wait(canReleaseFromAnyThread: false, timeout: _waitForCancellationLockPollTimeout, lockTaken: ref hasParserLock);
if (hasParserLock)
{
try
{
_parser.Connection._parserLock.Wait(canReleaseFromAnyThread: false, timeout: _waitForCancellationLockPollTimeout, lockTaken: ref hasParserLock);
if (hasParserLock)
{
_parser.Connection.ThreadHasParserLockForClose = true;
SendAttention();
}
}
finally
_parser.Connection.ThreadHasParserLockForClose = true;
SendAttention();
}
}
finally
{
if (hasParserLock)
{
if (_parser.Connection.ThreadHasParserLockForClose)
{
if (hasParserLock)
{
if (_parser.Connection.ThreadHasParserLockForClose)
{
_parser.Connection.ThreadHasParserLockForClose = false;
}
_parser.Connection._parserLock.Release();
}
_parser.Connection.ThreadHasParserLockForClose = false;
}
_parser.Connection._parserLock.Release();
}
}
}
}
}
}
finally
{
if (hasLock)
{
Monitor.Exit(this);
}
}
}

// CancelRequest - use to cancel while writing a request to the server
Expand Down Expand Up @@ -804,7 +801,7 @@ private void ResetCancelAndProcessAttention()
lock (this)
{
// Reset cancel state.
_cancelled = false;
_cancelState = CancelState.Unset;
_allowObjectID = -1;

if (_attentionSent)
Expand Down Expand Up @@ -1108,10 +1105,10 @@ internal Task ExecuteFlush()
{
lock (this)
{
if (_cancelled && 1 == _outputPacketNumber)
if (_cancelState != CancelState.Unset && 1 == _outputPacketNumber)
{
ResetBuffer();
_cancelled = false;
_cancelState = CancelState.Unset;
throw SQL.OperationCancelled();
}
else
Expand Down Expand Up @@ -3397,7 +3394,7 @@ internal Task WritePacket(byte flushMode, bool canAccumulate = false)
byte packetNumber = _outputPacketNumber;

// Set Status byte based whether this is end of message or not
bool willCancel = (_cancelled) && (_parser._asyncWrite);
bool willCancel = (_cancelState != CancelState.Unset) && (_parser._asyncWrite);
if (willCancel)
{
status = TdsEnums.ST_EOM | TdsEnums.ST_IGNORE;
Expand Down Expand Up @@ -3446,7 +3443,7 @@ internal Task WritePacket(byte flushMode, bool canAccumulate = false)

private void CancelWritePacket()
{
Debug.Assert(_cancelled, "Should not call CancelWritePacket if _cancelled is not set");
Debug.Assert(_cancelState != CancelState.Unset, "Should not call CancelWritePacket if _cancelled is not set");

_parser.Connection.ThreadHasParserLockForClose = true; // In case of error, let the connection know that we are holding the lock
try
Expand Down Expand Up @@ -4128,7 +4125,7 @@ internal void AssertStateIsClean()
Debug.Assert(_delayedWriteAsyncCallbackException == null, "StateObj has an unobserved exceptions from an async write");
// Attention\Cancellation\Timeouts
Debug.Assert(!_attentionReceived && !_attentionSent && !_attentionSending, $"StateObj is still dealing with attention: Sent: {_attentionSent}, Received: {_attentionReceived}, Sending: {_attentionSending}");
Debug.Assert(!_cancelled, "StateObj still has cancellation set");
Debug.Assert(_cancelState == CancelState.Unset, "StateObj still has cancellation set");
Debug.Assert(_timeoutState == TimeoutState.Stopped, "StateObj still has internal timeout set");
// Errors and Warnings
Debug.Assert(!_hasErrorOrWarning, "StateObj still has stored errors or warnings");
Expand Down

0 comments on commit 41af8ac

Please sign in to comment.