# txt2img.py
import time

from enum import Enum
from typing import Any
from fastapi import Response
from pydantic import Field
from cfclient.utils import download_image_with_retry
from cfclient.models import ImageModel

from .common import cleanup
from .common import init_sd_ms
from .common import get_sd_from
from .common import get_sd_inpainting
from .common import handle_diffusion_model
from .common import get_bytes_from_diffusion
from .common import IAlgorithm
from .common import Txt2ImgModel
from .parameters import save_gpu_ram

txt2img_sd_endpoint = "/txt2img/sd"
txt2img_sd_inpainting_endpoint = "/txt2img/sd.inpainting"
txt2img_sd_outpainting_endpoint = "/txt2img/sd.outpainting"
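
# This module registers three text-to-image endpoints:
# - `txt2img_sd_endpoint`: plain text-to-image generation with a Stable Diffusion model.
# - `txt2img_sd_inpainting_endpoint`: downloads the user's image and mask, then regenerates
#   the masked region guided by the text prompt.
# - `txt2img_sd_outpainting_endpoint`: downloads the user's image, then extends it beyond
#   its borders guided by the text prompt.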

class Txt2ImgSDModel(Txt2ImgModel):
    w: int = Field(512, description="The desired output width.")
    h: int = Field(512, description="The desired output height.")
    is_anime: bool = Field(
        False,
        description="Whether to generate anime images or not.",
    )

@IAlgorithm.auto_register()
class Txt2ImgSD(IAlgorithm):
    model_class = Txt2ImgSDModel
    endpoint = txt2img_sd_endpoint

    def initialize(self) -> None:
        self.ms = init_sd_ms()

    async def run(self, data: Txt2ImgSDModel, *args: Any) -> Response:
        self.log_endpoint(data)
        t0 = time.time()
        # pick the Stable Diffusion model that matches the request (e.g. `is_anime`)
        m = get_sd_from(self.ms, data)
        t1 = time.time()
        size = data.w, data.h
        kwargs = handle_diffusion_model(m, data)
        img_arr = m.txt2img(
            data.text,
            size=size,
            max_wh=data.max_wh,
            **kwargs,
        ).numpy()[0]
        content = get_bytes_from_diffusion(img_arr)
        t2 = time.time()
        cleanup(m)
        self.log_times(
            {
                "get_model": t1 - t0,
                "inference": t2 - t1,
                "cleanup": time.time() - t2,
            }
        )
        return Response(content=content, media_type="image/png")
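
# `PaddingModes` is not referenced elsewhere in this module; judging by the values, the modes
# presumably map to OpenCV's `cv2.INPAINT_NS` / `cv2.INPAINT_TELEA` inpainting algorithms and
# are consumed by other parts of the project.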

class PaddingModes(str, Enum):
    CV2_NS = "cv2_ns"
    CV2_TELEA = "cv2_telea"

class Txt2ImgSDInpaintingModel(Txt2ImgModel, ImageModel):
    mask_url: str = Field(
        ...,
        description="""
The `cdn` / `cos` url of the user's mask.
> A `cos` url from `qcloud` is preferred.
> If an empty string is provided, an empty mask will be used, which means a plain image-to-image transform will be performed.
""",
    )

class Txt2ImgSDOutpaintingModel(Txt2ImgModel, ImageModel):
    pass
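
# Both models above inherit the source-image `url` field from `ImageModel` (downloaded in the
# `run` methods below), on top of the text-to-image fields from `Txt2ImgModel`.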

@IAlgorithm.auto_register()
class Txt2ImgSDInpainting(IAlgorithm):
    model_class = Txt2ImgSDInpaintingModel
    endpoint = txt2img_sd_inpainting_endpoint

    def initialize(self) -> None:
        self.m = get_sd_inpainting()

    async def run(self, data: Txt2ImgSDInpaintingModel, *args: Any) -> Response:
        self.log_endpoint(data)
        t0 = time.time()
        image = await download_image_with_retry(self.http_client.session, data.url)
        mask = await download_image_with_retry(self.http_client.session, data.mask_url)
        t1 = time.time()
        if save_gpu_ram():
            # when GPU RAM should be conserved, only move the model to GPU right before inference
            self.m.to("cuda:0", use_half=True)
        t2 = time.time()
        kwargs = handle_diffusion_model(self.m, data)
        img_arr = self.m.txt2img_inpainting(
            data.text,
            image,
            mask,
            anchor=64,
            max_wh=data.max_wh,
            **kwargs,
        ).numpy()[0]
        content = get_bytes_from_diffusion(img_arr)
        t3 = time.time()
        cleanup(self.m)
        self.log_times(
            {
                "download": t1 - t0,
                "get_model": t2 - t1,
                "inference": t3 - t2,
                "cleanup": time.time() - t3,
            }
        )
        return Response(content=content, media_type="image/png")

@IAlgorithm.auto_register()
class Txt2ImgSDOutpainting(IAlgorithm):
    model_class = Txt2ImgSDOutpaintingModel
    endpoint = txt2img_sd_outpainting_endpoint

    def initialize(self) -> None:
        # outpainting reuses the same inpainting model
        self.m = get_sd_inpainting()

    async def run(self, data: Txt2ImgSDOutpaintingModel, *args: Any) -> Response:
        self.log_endpoint(data)
        t0 = time.time()
        image = await download_image_with_retry(self.http_client.session, data.url)
        t1 = time.time()
        if save_gpu_ram():
            self.m.to("cuda:0", use_half=True)
        t2 = time.time()
        kwargs = handle_diffusion_model(self.m, data)
        img_arr = self.m.outpainting(
            data.text,
            image,
            anchor=64,
            max_wh=data.max_wh,
            **kwargs,
        ).numpy()[0]
        content = get_bytes_from_diffusion(img_arr)
        t3 = time.time()
        cleanup(self.m)
        self.log_times(
            {
                "download": t1 - t0,
                "get_model": t2 - t1,
                "inference": t3 - t2,
                "cleanup": time.time() - t3,
            }
        )
        return Response(content=content, media_type="image/png")

__all__ = [
    "txt2img_sd_endpoint",
    "txt2img_sd_inpainting_endpoint",
    "txt2img_sd_outpainting_endpoint",
    "Txt2ImgSDModel",
    "Txt2ImgSDInpaintingModel",
    "Txt2ImgSDOutpaintingModel",
    "Txt2ImgSD",
    "Txt2ImgSDInpainting",
    "Txt2ImgSDOutpainting",
]
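
# A minimal, hypothetical client-side sketch (not part of this module) of calling the plain
# text-to-image endpoint. The host/port and output path are assumptions; the payload fields
# (`text`, `w`, `h`, `is_anime`) come from `Txt2ImgSDModel` above, and the endpoint responds
# with raw PNG bytes:
#
#     import requests
#
#     response = requests.post(
#         "http://localhost:8000/txt2img/sd",
#         json={"text": "a lovely little cat", "w": 512, "h": 512, "is_anime": False},
#     )
#     with open("out.png", "wb") as f:
#         f.write(response.content)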