Commit de873fc 1 parent 230d1ce commit de873fc Copy full SHA for de873fc
File tree 2 files changed +30
-2
lines changed
2 files changed +30
-2
lines changed Original file line number Diff line number Diff line change @@ -19,7 +19,7 @@ async def ack(self) -> None:
19
19
20
20
def ack_sync (self ) -> None :
21
21
if not self .raw_message ._ackd :
22
- self .raw_message .ack ()
22
+ self .raw_message .ack_sync ()
23
23
super ().ack ()
24
24
25
25
async def nack (
Original file line number Diff line number Diff line change 1
1
import asyncio
2
- from unittest .mock import Mock , patch
2
+ from unittest .mock import MagicMock , Mock , patch
3
3
4
4
import pytest
5
5
from nats .aio .msg import Msg
@@ -214,6 +214,34 @@ async def handler(msg: NatsMessage):
214
214
215
215
assert event .is_set ()
216
216
217
+ async def test_consume_ack_sync_manual (
218
+ self ,
219
+ queue : str ,
220
+ event : asyncio .Event ,
221
+ stream : JStream ,
222
+ ):
223
+ consume_broker = self .get_broker (apply_types = True )
224
+
225
+ @consume_broker .subscriber (queue , stream = stream )
226
+ async def handler (msg : NatsMessage ):
227
+ msg .ack_sync ()
228
+ event .set ()
229
+
230
+ async with self .patch_broker (consume_broker ) as br :
231
+ await br .start ()
232
+
233
+ with patch .object (Msg , "ack_sync" , new_callable = MagicMock ) as m :
234
+ await asyncio .wait (
235
+ (
236
+ asyncio .create_task (br .publish ("hello" , queue )),
237
+ asyncio .create_task (event .wait ()),
238
+ ),
239
+ timeout = 3 ,
240
+ )
241
+ m .assert_called_once ()
242
+
243
+ assert event .is_set ()
244
+
217
245
async def test_consume_ack_raise (
218
246
self ,
219
247
queue : str ,
You can’t perform that action at this time.
0 commit comments