Describe the bug
I tried to use `raise NackMessage()` in my code, but it fails with the exception: StreamMessage.reject() got an unexpected keyword argument 'redis'.
How to reproduce
Include source code:
The subscriber side:
import time
import asyncio

from faststream import FastStream, Logger
from faststream.redis import RedisBroker, StreamSub
from faststream.redis.annotations import RedisMessage, Redis
from faststream.exceptions import NackMessage

broker = RedisBroker("redis://default:password@localhost:6379")
app = FastStream(broker)


async def execute_order(order):
    print(f"Executing order: {order['order_id']}")
    await asyncio.sleep(1)
    print(f"Order executed: {order['order_id']}")


@broker.subscriber(stream=StreamSub("test-stream", group="test-group", consumer="1"))
async def handle(body: dict, logger: Logger, msg: RedisMessage, redis: Redis):
    logger.info(f">>>> I'm 1 con, {body}")
    current_time = int(time.time())
    if current_time >= body.get("execution_time"):
        await execute_order(body)
        await msg.ack(redis)
        logger.info(f">>>> I'm 1, {body} is executed and acked.")
    else:
        # I thought by nacking this message, this consumer would be notified by the same message later (cause it's not processed),
        # but I was wrong.
        # this message went back to stream, and never came back to consumer 1.
        # await msg.nack()
        logger.info(f">> I'm 1, {body} is not executed and acked.")
        raise NackMessage()
The publisher side:
import asyncio
import random
import time

from faststream.redis import RedisBroker


def my_random(d):
    ''' Generates a random number with d digits '''
    return random.randrange(10**(d-1), 10**d)


async def main(msg: dict):
    async with RedisBroker("redis://default:password@localhost:6379") as br:
        await br.publish(msg, stream="test-stream")


if __name__ == "__main__":
    delay_minutes = 5
    payload = {"order_id": str(my_random(6)), "execution_time": int(time.time()) + (delay_minutes * 60)}
    asyncio.run(main(payload))
Expected behavior
After 5 minutes, this order will be executed...
Observed behavior
StreamMessage.reject() got an unexpected keyword argument 'redis'
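For readers unfamiliar with this kind of error, here is a minimal plain-Python sketch of how a message like the one above can arise: a caller forwards a `redis=...` keyword argument to a method whose signature does not accept it. `FakeStreamMessage` and `call_reject` are hypothetical stand-ins invented for illustration. They are not FastStream's actual classes, and this does not claim to show where the mismatch lives inside the library.

```python
class FakeStreamMessage:
    """Hypothetical stand-in for a message class; NOT FastStream's real one."""

    def reject(self):
        # This signature accepts no extra keyword arguments.
        return "rejected"


def call_reject(message, **extra):
    """Hypothetical caller that forwards extra keyword arguments to reject()."""
    return message.reject(**extra)


def demo():
    """Trigger the mismatch and return the resulting error text."""
    msg = FakeStreamMessage()
    try:
        # Passing redis=... to a method that does not declare it raises TypeError.
        call_reject(msg, redis=object())
    except TypeError as exc:
        return str(exc)
    return None


error_text = demo()
print(error_text)
```

The printed text ends with `got an unexpected keyword argument 'redis'`, the same shape as the observed error. The exact prefix depends on the Python version.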
Screenshots
N/A
Environment
Running FastStream 0.5.12 with CPython 3.10.13 on Linux
Additional context
N/A