This repository was archived by the owner on Jun 2, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathSseRoutesTest.kt
305 lines (286 loc) · 11.2 KB
/
SseRoutesTest.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
package routes
import doist.ffs.auth.Permission
import doist.ffs.db.Flag
import doist.ffs.db.TokenGenerator
import doist.ffs.db.capturingLastInsertId
import doist.ffs.db.flags
import doist.ffs.db.organizations
import doist.ffs.db.projects
import doist.ffs.db.tokens
import doist.ffs.env.ENV_INTERNAL_ROLLOUT_ID
import doist.ffs.installAuthentication
import doist.ffs.installPlugins
import doist.ffs.installRoutes
import doist.ffs.plugins.Database
import doist.ffs.plugins.database
import doist.ffs.routes.PATH_EVAL
import doist.ffs.routes.PATH_FLAGS
import doist.ffs.serialization.json
import doist.ffs.sse.SSE_FIELD_PREFIX_DATA
import doist.ffs.sse.SSE_FIELD_PREFIX_ID
import io.ktor.http.ContentType
import io.ktor.http.HttpHeaders
import io.ktor.http.encodeURLPath
import io.ktor.http.isSuccess
import io.ktor.server.application.install
import io.ktor.server.testing.TestApplicationCall
import io.ktor.server.testing.TestApplicationEngine
import io.ktor.server.testing.TestApplicationRequest
import io.ktor.server.testing.withTestApplication
import io.ktor.util.pipeline.execute
import io.ktor.utils.io.ByteChannel
import io.ktor.utils.io.ByteReadChannel
import io.ktor.utils.io.readUTF8Line
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.buildJsonObject
import kotlinx.serialization.json.put
import org.junit.jupiter.api.Test
/**
* Tests for server-sent events (SSE) based endpoints.
*
* It uses the deprecated `withTestApplication` API (and direct database access) to circumvent a
* deadlock when using the new approach, occurring in multiple variations of the following skeleton:
*
* ```
* @Test
* fun testStreamGet() = testApplication {
* val client = createTokenClient(Permission.READ)
* val projectId = client.projectId
* var eventCount = 0
*
* client.userClient.client.post("${PATH_PROJECT(projectId)}$PATH_FLAGS") {
* setBodyForm("name" to "test", "rule" to "true")
* }
*
* client.client.launch {
* client.client.stream(PATH_FLAGS) {
* eventCount++
* val flags = json.decodeFromString<List<Flag>>(it.data)
* assert(flags.size == 1)
* assert(flags[0].name == "test")
* assert(flags[0].rule == "true")
* }
* }
*
* client.userClient.client.post("${PATH_PROJECT(projectId)}$PATH_FLAGS") {
* setBodyForm("name" to "test", "rule" to "true")
* }
*
* assert(eventCount == 1)
* }
* ```
*
* Circumventing this would probably require that the server and client NOT share the same
* dispatcher (Dispatchers.IO), but setting a different one for either is not currently possible.
*/
@Suppress("LongMethod")
class SseRoutesTest {
@Test
fun flagStream(): Unit = routes.withTestApplication {
val organizationId = application.database.capturingLastInsertId {
organizations.insert(name = "test-organization")
}
val projectId = application.database.capturingLastInsertId {
projects.insert(organization_id = organizationId, name = "test-project")
}
val token = TokenGenerator.generate(Permission.READ)
application.database.tokens.insert(
token = token,
project_id = projectId,
description = "test-read-token"
)
handleSse(
uri = PATH_FLAGS,
setup = {
addHeader(HttpHeaders.Authorization, "Bearer $token")
}
) { channel ->
val readLine = suspend { channel.readUTF8Line()!! }
// Channel starts out empty as there is no data.
assert(channel.availableForRead == 0)
// Create a flag and check it is sent.
var flag = application.database.run {
flags.run {
val id = capturingLastInsertId {
insert(project_id = projectId, name = "true", rule = "true")
}
select(id).executeAsOne()
}
}
readLine().startsWith(SSE_FIELD_PREFIX_ID)
readLine().let { line ->
line.startsWith(SSE_FIELD_PREFIX_DATA)
val flags = json.decodeFromString<List<Flag>>(
line.substring(SSE_FIELD_PREFIX_DATA.length)
)
assert(flags.size == 1)
assert(flags[0].name == "true")
assert(flags[0].rule == "true")
assert(flags[0].archived_at == null)
}
readLine().isEmpty()
// Update the flag and check it is sent again.
flag = application.database.flags.run {
update(id = flag.id, name = "false", rule = "false")
select(flag.id).executeAsOne()
}
readLine().startsWith(SSE_FIELD_PREFIX_ID)
readLine().let { line ->
line.startsWith(SSE_FIELD_PREFIX_DATA)
val flags = json.decodeFromString<List<Flag>>(
line.substring(SSE_FIELD_PREFIX_DATA.length)
)
assert(flags.size == 1)
assert(flags[0].name == "false")
assert(flags[0].rule == "false")
assert(flags[0].archived_at == null)
}
readLine().isEmpty()
// Archive the flag and check it is sent again.
flag = application.database.flags.run {
archive(id = flag.id)
select(flag.id).executeAsOne()
}
readLine().startsWith(SSE_FIELD_PREFIX_ID)
readLine().let { line ->
line.startsWith(SSE_FIELD_PREFIX_DATA)
val flags = json.decodeFromString<List<Flag>>(
line.substring(SSE_FIELD_PREFIX_DATA.length)
)
assert(flags.size == 1)
assert(flags[0].name == "false")
assert(flags[0].rule == "false")
assert(flags[0].archived_at != null)
}
readLine().isEmpty()
// Without further changes, channel is empty.
assert(channel.availableForRead == 0)
}
}
@Test
fun flagEvalStream(): Unit = routes.withTestApplication {
val organizationId = application.database.capturingLastInsertId {
organizations.insert(name = "test-organization")
}
val projectId = application.database.capturingLastInsertId {
projects.insert(organization_id = organizationId, name = "test-project")
}
val token = TokenGenerator.generate(Permission.EVAL)
application.database.tokens.insert(
token = token,
project_id = projectId,
description = "test-eval-token"
)
val env = buildJsonObject {
put(ENV_INTERNAL_ROLLOUT_ID, "123456789abcdef")
put("number", 3)
}
handleSse(
uri = "$PATH_FLAGS$PATH_EVAL?env=${json.encodeToString(env).encodeURLPath()}",
setup = {
addHeader(HttpHeaders.Authorization, "Bearer $token")
}
) { channel ->
val readLine = suspend { channel.readUTF8Line()!! }
// Channel starts out empty as there is no data.
assert(channel.availableForRead == 0)
// Create a flag and check its eval is sent.
val flag = application.database.run {
flags.run {
val id = capturingLastInsertId {
insert(
project_id = projectId,
name = "true",
rule = "gt(env[\"number\"], 2)"
)
}
select(id).executeAsOne()
}
}
readLine().startsWith(SSE_FIELD_PREFIX_ID)
readLine().let { line ->
line.startsWith(SSE_FIELD_PREFIX_DATA)
val flagEvals = json.decodeFromString<Map<String, Boolean>>(
line.substring(SSE_FIELD_PREFIX_DATA.length)
)
assert(flagEvals == mapOf("true" to true))
}
readLine().isEmpty()
// Update the flag rule and check its eval is sent again.
application.database.flags.run {
update(id = flag.id, name = "false", rule = "lt(env[\"number\"], 2)")
select(flag.id).executeAsOne()
}
readLine().startsWith(SSE_FIELD_PREFIX_ID)
readLine().let { line ->
line.startsWith(SSE_FIELD_PREFIX_DATA)
val flagEvals = json.decodeFromString<Map<String, Boolean>>(
line.substring(SSE_FIELD_PREFIX_DATA.length)
)
assert(flagEvals == mapOf("false" to false))
}
readLine().isEmpty()
// Update the flag name and ensure the channel remains empty.
application.database.flags.run {
update(id = flag.id, name = "maybe", rule = "0.5")
select(flag.id).executeAsOne()
}
assert(channel.availableForRead == 0)
}
}
}
/**
* Make a test request that sets up an SSE session and invokes a [callback] function to
* receive events from the server.
*/
@Suppress("EXPERIMENTAL_API_USAGE_FUTURE_ERROR")
private fun TestApplicationEngine.handleSse(
uri: String,
setup: TestApplicationRequest.() -> Unit = {},
callback: suspend TestApplicationCall.(incoming: ByteReadChannel) -> Unit
): TestApplicationCall {
val call = createCall(closeRequest = false) {
this.uri = uri
addHeader(HttpHeaders.Accept, ContentType.Text.EventStream.toString())
setup()
bodyChannel = ByteChannel(true)
}
launch(call.coroutineContext) {
// Execute server side.
pipeline.execute(call)
}
runBlocking(call.coroutineContext) {
// responseChannelDeferred is internal, so we wait like this.
// Ref: https://github.com/ktorio/ktor/blob/c5877a22c91fd693ea6dcd0b4e1924f05d3b6825/ktor-server/ktor-server-test-host/jvm/src/io/ktor/server/testing/TestApplicationEngine.kt#L225-L230
var responseChannel: ByteReadChannel?
do {
// Ensure status is absent or valid.
val status = call.response.status()
if (status?.isSuccess() == false) {
throw IllegalStateException(status.toString())
}
// Suspend, then try to grab response channel.
yield()
// websocketChannel is just responseChannel internally.
responseChannel = call.response.websocketChannel()
} while (responseChannel == null)
// Execute client side.
call.callback(responseChannel)
}
return call
}
private fun withTestApplication(test: TestApplicationEngine.() -> Unit) = withTestApplication {
application.apply {
// Hikari configuration from application.conf won't be available in tests,
// so we need to replicate Application.module() without relying on it. Huh.
install(Database)
installAuthentication()
installPlugins()
installRoutes()
}
test()
}