import time

import numpy as np

from PIL import Image
from enum import Enum
from typing import Any
from typing import Optional
from fastapi import Response
from pydantic import Field
from pydantic import BaseModel
from cftool.cv import to_uint8
from cfclient.models import ImageModel

from .utils import APIs
from .common import register_sd
from .common import register_sd_inpainting
from .common import get_sd_from
from .common import get_response
from .common import handle_diffusion_model
from .common import handle_diffusion_hooks
from .common import get_normalized_arr_from_diffusion
from .common import handle_diffusion_inpainting_model
from .common import IAlgorithm
from .common import HighresModel
from .common import Txt2ImgModel
from .common import ReturnArraysModel
from .common import CommonSDInpaintingModel


txt2img_sd_endpoint = "/txt2img/sd"
txt2img_sd_inpainting_endpoint = "/txt2img/sd.inpainting"
txt2img_sd_outpainting_endpoint = "/txt2img/sd.outpainting"


class _Txt2ImgSDModel(BaseModel):
    w: int = Field(512, description="The desired output width.")
    h: int = Field(512, description="The desired output height.")


class Txt2ImgSDModel(ReturnArraysModel, Txt2ImgModel, _Txt2ImgSDModel):
    pass


@IAlgorithm.auto_register()
class Txt2ImgSD(IAlgorithm):
    model_class = Txt2ImgSDModel

    endpoint = txt2img_sd_endpoint

    def initialize(self) -> None:
        register_sd()

    async def run(self, data: Txt2ImgSDModel, *args: Any, **kwargs: Any) -> Response:
        self.log_endpoint(data)
        t0 = time.time()
        # fetch the SD model resolved from the request data
        m = get_sd_from(APIs.SD, data)
        t1 = time.time()
        size = data.w, data.h
        # collect the common diffusion kwargs, then let hooks adjust them
        kwargs.update(handle_diffusion_model(m, data))
        await handle_diffusion_hooks(m, data, self, kwargs)
        img_arr = m.txt2img(
            data.text,
            size=size,
            max_wh=data.max_wh,
            **kwargs,
        ).numpy()[0]
        t2 = time.time()
        # normalize the raw diffusion output, convert to uint8, and pack the response
        res = get_response(data, [to_uint8(get_normalized_arr_from_diffusion(img_arr))])
        self.log_times(
            {
                "get_model": t1 - t0,
                "inference": t2 - t1,
                "get_response": time.time() - t2,
            }
        )
        return res
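

# --- usage sketch (not part of the original module) --------------------------
# A minimal illustration of how a client might call the `/txt2img/sd` endpoint
# above once the server is running. The host / port, the `requests` dependency,
# and the assumption that the default response body holds the encoded image
# bytes are all assumptions, not guarantees from this module.
def _example_txt2img_request() -> None:  # pragma: no cover - illustration only
    import requests

    resp = requests.post(
        "http://localhost:8123/txt2img/sd",
        json={"text": "a cabin in a snowy forest", "w": 512, "h": 512},
    )
    with open("txt2img.png", "wb") as f:
        f.write(resp.content)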


class PaddingModes(str, Enum):
    CV2_NS = "cv2_ns"
    CV2_TELEA = "cv2_telea"


class Txt2ImgSDInpaintingModel(CommonSDInpaintingModel, Txt2ImgModel, ImageModel):
    mask_url: str = Field(
        ...,
        description="""
The `cdn` / `cos` url of the user's mask.
> `cos` url from `qcloud` is preferred.
> If an empty string is provided, an empty mask will be used, which means a plain image-to-image transform will be performed.
""",
    )


class Txt2ImgSDOutpaintingModel(CommonSDInpaintingModel, Txt2ImgModel, ImageModel):
    pass


@IAlgorithm.auto_register()
class Txt2ImgSDInpainting(IAlgorithm):
    model_class = Txt2ImgSDInpaintingModel

    endpoint = txt2img_sd_inpainting_endpoint

    def initialize(self) -> None:
        register_sd()
        register_sd_inpainting()

    async def run(
        self,
        data: Txt2ImgSDInpaintingModel,
        *args: Any,
        **kwargs: Any,
    ) -> Response:
        self.log_endpoint(data)
        t0 = time.time()
        image = await self.get_image_from("url", data, kwargs)
        mask = await self.get_image_from("mask_url", data, kwargs)
        t1 = time.time()
        # raw inpainting reuses the plain SD model;
        # otherwise the dedicated SD-inpainting model is used
        if data.use_raw_inpainting:
            api_key = APIs.SD
        else:
            api_key = APIs.SD_INPAINTING
        m = get_sd_from(api_key, data)
        t2 = time.time()
        kwargs.update(handle_diffusion_model(m, data))
        kwargs.update(handle_diffusion_inpainting_model(data))
        await handle_diffusion_hooks(m, data, self, kwargs)
        # binarize the alpha channel so the mask is strictly 0 / 255
        if mask.mode == "RGBA":
            mask_arr = np.array(mask)
            mask_arr[..., -1] = np.where(mask_arr[..., -1] > 0, 255, 0)
            mask = Image.fromarray(mask_arr)
        img_arr = m.txt2img_inpainting(data.text, image, mask, **kwargs).numpy()[0]
        t3 = time.time()
        res = get_response(data, [to_uint8(get_normalized_arr_from_diffusion(img_arr))])
        self.log_times(
            {
                "download": t1 - t0,
                "get_model": t2 - t1,
                "inference": t3 - t2,
                "get_response": time.time() - t3,
            }
        )
        return res
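

# --- usage sketch (not part of the original module) --------------------------
# How a client might call the `/txt2img/sd.inpainting` endpoint above: `url`
# points at the source image and `mask_url` at the mask, per the field
# definitions in `Txt2ImgSDInpaintingModel`. The host / port, the example URLs,
# and the `requests` dependency are assumptions.
def _example_inpainting_request() -> None:  # pragma: no cover - illustration only
    import requests

    resp = requests.post(
        "http://localhost:8123/txt2img/sd.inpainting",
        json={
            "text": "a vase of flowers on the table",
            "url": "https://example.com/source.png",
            "mask_url": "https://example.com/mask.png",
        },
    )
    with open("inpainting.png", "wb") as f:
        f.write(resp.content)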


@IAlgorithm.auto_register()
class Txt2ImgSDOutpainting(IAlgorithm):
    model_class = Txt2ImgSDOutpaintingModel

    endpoint = txt2img_sd_outpainting_endpoint

    def initialize(self) -> None:
        register_sd_inpainting()

    async def run(
        self,
        data: Txt2ImgSDOutpaintingModel,
        *args: Any,
        **kwargs: Any,
    ) -> Response:
        self.log_endpoint(data)
        t0 = time.time()
        image = await self.get_image_from("url", data, kwargs)
        t1 = time.time()
        # reuse the SD-inpainting model, with control disabled, for plain outpainting
        m = get_sd_from(APIs.SD_INPAINTING, data)
        m.disable_control()
        t2 = time.time()
        kwargs.update(handle_diffusion_model(m, data))
        kwargs.update(handle_diffusion_inpainting_model(data))
        await handle_diffusion_hooks(m, data, self, kwargs)
        img_arr = m.outpainting(data.text, image, **kwargs).numpy()[0]
        t3 = time.time()
        res = get_response(data, [to_uint8(get_normalized_arr_from_diffusion(img_arr))])
        self.log_times(
            {
                "download": t1 - t0,
                "get_model": t2 - t1,
                "inference": t3 - t2,
                "get_response": time.time() - t3,
            }
        )
        return res
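

# --- usage sketch (not part of the original module) --------------------------
# Outpainting works from a single source image, so only `text` and `url` are
# needed: how a client might call the `/txt2img/sd.outpainting` endpoint above.
# The host / port, the example URL, and the `requests` dependency are
# assumptions.
def _example_outpainting_request() -> None:  # pragma: no cover - illustration only
    import requests

    resp = requests.post(
        "http://localhost:8123/txt2img/sd.outpainting",
        json={
            "text": "rolling hills under a clear sky",
            "url": "https://example.com/source.png",
        },
    )
    with open("outpainting.png", "wb") as f:
        f.write(resp.content)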


__all__ = [
    "txt2img_sd_endpoint",
    "txt2img_sd_inpainting_endpoint",
    "txt2img_sd_outpainting_endpoint",
    "Txt2ImgSDModel",
    "Txt2ImgSDInpaintingModel",
    "Txt2ImgSDOutpaintingModel",
    "Txt2ImgSD",
    "Txt2ImgSDInpainting",
    "Txt2ImgSDOutpainting",
]