Skip to content

Commit 26aca1a

Browse files
author
sehat1137
committed
fix: #1874 support workers for ASGI FastStream
1 parent 9bc7a05 commit 26aca1a

File tree

4 files changed

+103
-28
lines changed

4 files changed

+103
-28
lines changed

docs/docs/en/getting-started/asgi.md

+32
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,22 @@ uvicorn main:app
3838

3939
It does nothing but launch the app itself as an **ASGI lifespan**.
4040

41+
!!! note
42+
If you want to run your app using several workers, you need to use something else than `uvicorn`.
43+
```shell
44+
faststream run main:app --workers 4
45+
```
46+
```shell
47+
gunicorn -k uvicorn.workers.UvicornWorker main:app --workers=4
48+
```
49+
```shell
50+
granian --interface asgi main:app --workers 4
51+
```
52+
```shell
53+
hypercorn main:app --workers 4
54+
```
55+
56+
4157
### ASGI Routes
4258

4359
It doesn't look very helpful, so let's add some **HTTP** endpoints.
@@ -137,6 +153,8 @@ app = FastStream(broker).as_asgi(
137153
```shell
138154
faststream run main:app --host 0.0.0.0 --port 8000 --workers 4
139155
```
156+
This possibility builded on gunicorn + uvicorn, you need install them to run FastStream ASGI app via CLI.
157+
We send all args directly to gunicorn, you can learn more about it [here](https://github.com/benoitc/gunicorn/blob/master/examples/example_config.py).
140158

141159
## Other ASGI Compatibility
142160

@@ -166,3 +184,17 @@ app = FastAPI(lifespan=start_broker)
166184
app.mount("/health", make_ping_asgi(broker, timeout=5.0))
167185
app.mount("/asyncapi", make_asyncapi_asgi(FastStream(broker)))
168186
```
187+
188+
!!! tip
189+
You can also bind to unix domain or a file descriptor. FastStream will bind to “127.0.0.1:8000” by default
190+
191+
```shell
192+
faststream run main:app --bind unix:/tmp/socket.sock
193+
```
194+
```shell
195+
faststream run main:app --bind fd://2
196+
```
197+
You can use multiple binds if you want
198+
```shell
199+
faststream run main:app --bind 0.0.0.0:8000 '[::]:8000'
200+
```

faststream/asgi/app.py

+57-19
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
Sequence,
1111
Tuple,
1212
Union,
13+
List,
1314
)
1415

1516
import anyio
@@ -20,6 +21,17 @@
2021
from faststream.asgi.websocket import WebSocketClose
2122
from faststream.log.logging import logger
2223

24+
try:
25+
from gunicorn.app.base import BaseApplication
26+
except ImportError:
27+
BaseApplication = None
28+
29+
try:
30+
import uvicorn
31+
except ImportError:
32+
uvicorn = None # type: ignore
33+
34+
2335
if TYPE_CHECKING:
2436
from faststream.asgi.types import ASGIApp, Receive, Scope, Send
2537
from faststream.asyncapi.schema import (
@@ -43,6 +55,23 @@
4355
)
4456

4557

58+
class ASGIRunner(BaseApplication): # type: ignore
59+
def __init__(self, asgi_app: "ASGIApp", options: Dict[str, Any]):
60+
self.options = options
61+
self.asgi_app = asgi_app
62+
super().__init__()
63+
64+
def load_config(self) -> None:
65+
for k, v in self.options.items():
66+
if k in self.cfg.settings and v is not None:
67+
self.cfg.set(k.lower(), v)
68+
else:
69+
logger.warn(f"Unknown config variable: {k} with value {v}")
70+
71+
def load(self) -> "ASGIApp":
72+
return self.asgi_app
73+
74+
4675
class AsgiFastStream(Application):
4776
def __init__(
4877
self,
@@ -146,25 +175,34 @@ async def run(
146175
run_extra_options: Optional[Dict[str, "SettingField"]] = None,
147176
sleep_time: float = 0.1,
148177
) -> None:
149-
import uvicorn
150-
151-
if not run_extra_options:
152-
run_extra_options = {}
153-
port = int(run_extra_options.pop("port", 8000)) # type: ignore[arg-type]
154-
workers = int(run_extra_options.pop("workers", 1)) # type: ignore[arg-type]
155-
host = str(run_extra_options.pop("host", "localhost"))
156-
fd = int(run_extra_options.pop("fd", -1)) # type: ignore[arg-type]
157-
config = uvicorn.Config(
158-
self,
159-
host=host,
160-
port=port,
161-
log_level=log_level,
162-
workers=workers,
163-
fd=fd if fd != -1 else None,
164-
**run_extra_options,
165-
)
166-
server = uvicorn.Server(config)
167-
await server.serve()
178+
if not all([uvicorn, BaseApplication]):
179+
raise RuntimeError(
180+
"You need uvicorn and gunicorn to run FastStream ASGI App via CLI"
181+
)
182+
183+
run_extra_options = run_extra_options or {}
184+
185+
bindings: List[str] = []
186+
host = run_extra_options.pop("host", None)
187+
port = run_extra_options.pop("port", None)
188+
if host is not None and port is not None:
189+
bindings.append(f"{host}:{port}")
190+
elif host is not None:
191+
bindings.append(f"{host}:8000")
192+
elif port is not None:
193+
bindings.append(f"127.0.0.1:{port}")
194+
195+
bind = run_extra_options.get("bind")
196+
if isinstance(bind, list):
197+
bindings.extend(bind) # type: ignore
198+
elif isinstance(bind, str):
199+
bindings.append(bind)
200+
201+
run_extra_options["bind"] = bindings or "127.0.0.1:8000"
202+
# We use gunicorn with uvicorn workers because uvicorn don't support multiple workers
203+
run_extra_options["worker_class"] = "uvicorn.workers.UvicornWorker"
204+
205+
ASGIRunner(self, run_extra_options).run()
168206

169207
@asynccontextmanager
170208
async def start_lifespan_context(self) -> AsyncIterator[None]:

faststream/cli/utils/parser.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
import re
12
from functools import reduce
23
from typing import TYPE_CHECKING, Dict, List, Tuple
34

45
if TYPE_CHECKING:
56
from faststream.types import SettingField
67

8+
APP_REGEX = re.compile(r"[a-zA-Z]+:[a-zA-Z]+")
9+
710

811
def parse_cli_args(*args: str) -> Tuple[str, Dict[str, "SettingField"]]:
912
"""Parses command line arguments."""
@@ -22,7 +25,7 @@ def parse_cli_args(*args: str) -> Tuple[str, Dict[str, "SettingField"]]:
2225
),
2326
"-",
2427
]:
25-
if ":" in item:
28+
if re.match(APP_REGEX, item):
2629
app = item
2730

2831
else:

tests/cli/utils/test_parser.py

+10-8
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,20 @@
2323
)
2424
ARG6 = ("--some-key",)
2525
ARG7 = ("--k7", "1", "2", "--k7", "3")
26+
ARG8 = ("--bind", "[::]:8000", "0.0.0.0:8000", "fd://2")
2627

2728

2829
@pytest.mark.parametrize(
2930
"args",
3031
( # noqa: PT007
31-
(APPLICATION, *ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7),
32-
(*ARG1, APPLICATION, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7),
33-
(*ARG1, *ARG2, APPLICATION, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7),
34-
(*ARG1, *ARG2, *ARG3, APPLICATION, *ARG4, *ARG5, *ARG6, *ARG7),
35-
(*ARG1, *ARG2, *ARG3, *ARG4, APPLICATION, *ARG5, *ARG6, *ARG7),
36-
(*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, APPLICATION, *ARG6, *ARG7),
37-
(*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, APPLICATION, *ARG7),
38-
(*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, APPLICATION),
32+
(APPLICATION, *ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8),
33+
(*ARG1, APPLICATION, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8),
34+
(*ARG1, *ARG2, APPLICATION, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8),
35+
(*ARG1, *ARG2, *ARG3, APPLICATION, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8),
36+
(*ARG1, *ARG2, *ARG3, *ARG4, APPLICATION, *ARG5, *ARG6, *ARG7, *ARG8),
37+
(*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, APPLICATION, *ARG6, *ARG7, *ARG8),
38+
(*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, APPLICATION, *ARG7, *ARG8),
39+
(*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8, APPLICATION),
3940
),
4041
)
4142
def test_custom_argument_parsing(args: Tuple[str]):
@@ -49,4 +50,5 @@ def test_custom_argument_parsing(args: Tuple[str]):
4950
"k5": ["1", "1"],
5051
"some_key": True,
5152
"k7": ["1", "2", "3"],
53+
"bind": ["[::]:8000", "0.0.0.0:8000", "fd://2"],
5254
}

0 commit comments

Comments
 (0)