Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug: StreamMessage.reject() got an unexpected keyword argument 'redis' #1544

Closed
allanwakes opened this issue Jun 21, 2024 · 2 comments · Fixed by #1546
Closed

Bug: StreamMessage.reject() got an unexpected keyword argument 'redis' #1544

allanwakes opened this issue Jun 21, 2024 · 2 comments · Fixed by #1546
Labels
bug Something isn't working Redis Issues related to `faststream.redis` module and Redis features

Comments

@allanwakes
Copy link

Describe the bug
I tried to use raise NackMessage() in my code, but the exception StreamMessage.reject() got an unexpected keyword argument 'redis' pops out.

How to reproduce
Include source code:

The subscriber side:

import time
import asyncio

from faststream import FastStream, Logger
from faststream.redis import RedisBroker, StreamSub
from faststream.redis.annotations import RedisMessage, Redis
from faststream.exceptions import NackMessage

broker = RedisBroker("redis://default:password@localhost:6379")
app = FastStream(broker)


async def execute_order(order):
    print(f"Executing order: {order['order_id']}")
    await asyncio.sleep(1)
    print(f"Order executed: {order['order_id']}")


@broker.subscriber(stream=StreamSub("test-stream", group="test-group", consumer="1"))
async def handle(body: dict, logger: Logger, msg: RedisMessage, redis: Redis):
    logger.info(f">>>> I'm 1 con, {body}")
    current_time = int(time.time())
    if current_time >= body.get("execution_time"):
        await execute_order(body)
        await msg.ack(redis)
        logger.info(f">>>> I'm 1, {body} is executed and acked.")
    else:
        # I thought by nacking this message, this consumer would be notified by the same message later (cause it's not processed), 
        # but I was wrong.
        # this message went back to stream, and never came back to consumer 1.
        # await msg.nack()
        logger.info(f">> I'm 1, {body} is not executed and acked.")
        raise NackMessage()

The publisher side:

import asyncio
import random
import time

from faststream.redis import RedisBroker


def my_random(d):
    ''' Generates a random number with d digits '''
    return random.randrange(10**(d-1), 10**d)


async def main(msg: dict):
    async with RedisBroker("redis://default:password@localhost:6379") as br:
        await br.publish(msg, stream="test-stream")
    

if __name__ == "__main__":
    delay_minutes = 5
    payload = {"order_id": str(my_random(6)), "execution_time": int(time.time()) + (delay_minutes * 60)}
    asyncio.run(main(payload))

Expected behavior
after 5 min, this order will be executed...

Observed behavior
StreamMessage.reject() got an unexpected keyword argument 'redis'

Screenshots
N/A

Environment
Running FastStream 0.5.12 with CPython 3.10.13 on Linux

Additional context
N/A

@allanwakes allanwakes added the bug Something isn't working label Jun 21, 2024
@Lancetnik Lancetnik added the Redis Issues related to `faststream.redis` module and Redis features label Jun 21, 2024
@Lancetnik
Copy link
Member

Thank you for the report, I think, we can upload the fix today

@Lancetnik
Copy link
Member

Btw, you can do not call await msg.ack(redis) - it will be called automatically after your function execution if there is no exceptions

github-merge-queue bot pushed a commit that referenced this issue Jun 21, 2024
* fix (#1544): correct Redis message nack & reject signature

* docs: generate API References

* chore: trigger CI

---------

Co-authored-by: Lancetnik <Lancetnik@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working Redis Issues related to `faststream.redis` module and Redis features
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants