Skip to content

Commit efc835c

Browse files
committed
Merge branch 'lazebnyi/fix-sheed-id-encode-for-google-sheets' of github.com:airbytehq/airbyte into lazebnyi/fix-sheed-id-encode-for-google-sheets
2 parents 1b9c98d + 8675595 commit efc835c

File tree

64 files changed

+3070
-1556
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+3070
-1556
lines changed

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/FeedBootstrap.kt

+29-14
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import java.time.ZoneOffset
2121
/**
2222
* [FeedBootstrap] is the input to a [PartitionsCreatorFactory].
2323
*
24-
* This object conveniently packages the [StateQuerier] singleton with the [feed] for which the
24+
* This object conveniently packages the [StateManager] singleton with the [feed] for which the
2525
* [PartitionsCreatorFactory] is to operate on, eventually causing the emission of Airbyte RECORD
2626
* messages for the [Stream]s in the [feed]. For this purpose, [FeedBootstrap] provides
2727
* [StreamRecordConsumer] instances which essentially provide a layer of caching over
@@ -34,15 +34,30 @@ sealed class FeedBootstrap<T : Feed>(
3434
* The [MetaFieldDecorator] instance which [StreamRecordConsumer] will use to decorate records.
3535
*/
3636
val metaFieldDecorator: MetaFieldDecorator,
37-
/** [StateQuerier] singleton for use by [PartitionsCreatorFactory]. */
38-
val stateQuerier: StateQuerier,
37+
/** [StateManager] singleton which is encapsulated by this [FeedBootstrap]. */
38+
private val stateManager: StateManager,
3939
/** [Feed] to emit records for. */
4040
val feed: T
4141
) {
4242

43-
/** Convenience getter for the current state value for the [feed]. */
43+
/** Delegates to [StateManager.feeds]. */
44+
val feeds: List<Feed>
45+
get() = stateManager.feeds
46+
47+
/** Deletages to [StateManager] to return the current state value for any [Feed]. */
48+
fun currentState(feed: Feed): OpaqueStateValue? = stateManager.scoped(feed).current()
49+
50+
/** Convenience getter for the current state value for this [feed]. */
4451
val currentState: OpaqueStateValue?
45-
get() = stateQuerier.current(feed)
52+
get() = currentState(feed)
53+
54+
/** Resets the state value of this feed and the streams in it to zero. */
55+
fun resetAll() {
56+
stateManager.scoped(feed).reset()
57+
for (stream in feed.streams) {
58+
stateManager.scoped(stream).reset()
59+
}
60+
}
4661

4762
/** A map of all [StreamRecordConsumer] for this [feed]. */
4863
fun streamRecordConsumers(): Map<StreamIdentifier, StreamRecordConsumer> =
@@ -98,7 +113,7 @@ sealed class FeedBootstrap<T : Feed>(
98113
}
99114

100115
private val precedingGlobalFeed: Global? =
101-
stateQuerier.feeds
116+
stateManager.feeds
102117
.filterIsInstance<Global>()
103118
.filter { it.streams.contains(stream) }
104119
.firstOrNull()
@@ -109,7 +124,7 @@ sealed class FeedBootstrap<T : Feed>(
109124
if (feed is Stream && precedingGlobalFeed != null) {
110125
metaFieldDecorator.decorateRecordData(
111126
timestamp = outputConsumer.recordEmittedAt.atOffset(ZoneOffset.UTC),
112-
globalStateValue = stateQuerier.current(precedingGlobalFeed),
127+
globalStateValue = stateManager.scoped(precedingGlobalFeed).current(),
113128
stream,
114129
recordData,
115130
)
@@ -192,14 +207,14 @@ sealed class FeedBootstrap<T : Feed>(
192207
fun create(
193208
outputConsumer: OutputConsumer,
194209
metaFieldDecorator: MetaFieldDecorator,
195-
stateQuerier: StateQuerier,
210+
stateManager: StateManager,
196211
feed: Feed,
197212
): FeedBootstrap<*> =
198213
when (feed) {
199214
is Global ->
200-
GlobalFeedBootstrap(outputConsumer, metaFieldDecorator, stateQuerier, feed)
215+
GlobalFeedBootstrap(outputConsumer, metaFieldDecorator, stateManager, feed)
201216
is Stream ->
202-
StreamFeedBootstrap(outputConsumer, metaFieldDecorator, stateQuerier, feed)
217+
StreamFeedBootstrap(outputConsumer, metaFieldDecorator, stateManager, feed)
203218
}
204219
}
205220
}
@@ -241,17 +256,17 @@ enum class FieldValueChange {
241256
class GlobalFeedBootstrap(
242257
outputConsumer: OutputConsumer,
243258
metaFieldDecorator: MetaFieldDecorator,
244-
stateQuerier: StateQuerier,
259+
stateManager: StateManager,
245260
global: Global,
246-
) : FeedBootstrap<Global>(outputConsumer, metaFieldDecorator, stateQuerier, global)
261+
) : FeedBootstrap<Global>(outputConsumer, metaFieldDecorator, stateManager, global)
247262

248263
/** [FeedBootstrap] implementation for [Stream] feeds. */
249264
class StreamFeedBootstrap(
250265
outputConsumer: OutputConsumer,
251266
metaFieldDecorator: MetaFieldDecorator,
252-
stateQuerier: StateQuerier,
267+
stateManager: StateManager,
253268
stream: Stream,
254-
) : FeedBootstrap<Stream>(outputConsumer, metaFieldDecorator, stateQuerier, stream) {
269+
) : FeedBootstrap<Stream>(outputConsumer, metaFieldDecorator, stateManager, stream) {
255270

256271
/** A [StreamRecordConsumer] instance for this [Stream]. */
257272
fun streamRecordConsumer(): StreamRecordConsumer = streamRecordConsumers()[feed.id]!!

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/Partitions.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ import io.airbyte.cdk.read.PartitionsCreator.TryAcquireResourcesStatus
1212
interface PartitionsCreatorFactory {
1313
/**
1414
* Returns a [PartitionsCreator] which will cause the READ to advance for the [Feed] for which
15-
* the [FeedBootstrap] argument is associated to. The latter exposes a [StateQuerier] to obtain
16-
* the current [OpaqueStateValue] for this [feed] but may also be used to peek at the state of
17-
* other [Feed]s. This may be useful for synchronizing the READ for this [feed] by waiting for
18-
* other [Feed]s to reach a desired state before proceeding; the waiting may be triggered by
15+
* the [FeedBootstrap] argument is associated to. The latter exposes methods to obtain the
16+
* current [OpaqueStateValue] for this [feed] but also to peek at the state of other [Feed]s.
17+
* This may be useful for synchronizing the READ for this [feed] by waiting for other [Feed]s to
18+
* reach a desired state before proceeding; the waiting may be triggered by
1919
* [PartitionsCreator.tryAcquireResources] or [PartitionReader.tryAcquireResources].
2020
*
2121
* Returns null when the factory is unable to generate a [PartitionsCreator]. This causes

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManager.kt

+15-19
Original file line numberDiff line numberDiff line change
@@ -10,24 +10,12 @@ import io.airbyte.protocol.models.v0.AirbyteStateMessage
1010
import io.airbyte.protocol.models.v0.AirbyteStateStats
1111
import io.airbyte.protocol.models.v0.AirbyteStreamState
1212

13-
/** A [StateQuerier] is like a read-only [StateManager]. */
14-
interface StateQuerier {
15-
/** [feeds] is all the [Feed]s in the configured catalog passed via the CLI. */
16-
val feeds: List<Feed>
17-
18-
/** Returns the current state value for the given [feed]. */
19-
fun current(feed: Feed): OpaqueStateValue?
20-
21-
/** Rolls back each feed state. This is required when resyncing CDC from scratch */
22-
fun resetFeedStates()
23-
}
24-
2513
/** Singleton object which tracks the state of an ongoing READ operation. */
2614
class StateManager(
2715
global: Global? = null,
2816
initialGlobalState: OpaqueStateValue? = null,
2917
initialStreamStates: Map<Stream, OpaqueStateValue?> = mapOf(),
30-
) : StateQuerier {
18+
) {
3119
private val global: GlobalStateManager?
3220
private val nonGlobal: Map<StreamIdentifier, NonGlobalStreamStateManager>
3321

@@ -52,16 +40,14 @@ class StateManager(
5240
}
5341
}
5442

55-
override val feeds: List<Feed> =
43+
/** [feeds] is all the [Feed]s in the configured catalog passed via the CLI. */
44+
val feeds: List<Feed> =
5645
listOfNotNull(this.global?.feed) +
5746
(this.global?.streamStateManagers?.values?.map { it.feed } ?: listOf()) +
5847
nonGlobal.values.map { it.feed }
5948

60-
override fun current(feed: Feed): OpaqueStateValue? = scoped(feed).current()
61-
62-
override fun resetFeedStates() {
63-
feeds.forEach { f -> scoped(f).set(Jsons.objectNode(), 0) }
64-
}
49+
/** Returns the current state value for the given [feed]. */
50+
fun current(feed: Feed): OpaqueStateValue? = scoped(feed).current()
6551

6652
/** Returns a [StateManagerScopedToFeed] instance scoped to this [feed]. */
6753
fun scoped(feed: Feed): StateManagerScopedToFeed =
@@ -86,6 +72,9 @@ class StateManager(
8672
state: OpaqueStateValue,
8773
numRecords: Long,
8874
)
75+
76+
/** Resets the current state value in the [StateManager] for this [feed] to zero. */
77+
fun reset()
8978
}
9079

9180
/**
@@ -119,6 +108,13 @@ class StateManager(
119108
pendingNumRecords += numRecords
120109
}
121110

111+
@Synchronized
112+
override fun reset() {
113+
currentStateValue = null
114+
pendingStateValue = null
115+
pendingNumRecords = 0L
116+
}
117+
122118
/**
123119
* Called by [StateManager.checkpoint] to generate the Airbyte STATE messages for the
124120
* checkpoint.

airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/FeedBootstrapTest.kt

+38-30
Original file line numberDiff line numberDiff line change
@@ -48,26 +48,13 @@ class FeedBootstrapTest {
4848

4949
val global = Global(listOf(stream))
5050

51-
fun stateQuerier(
51+
fun stateManager(
5252
globalStateValue: OpaqueStateValue? = null,
5353
streamStateValue: OpaqueStateValue? = null
54-
): StateQuerier =
55-
object : StateQuerier {
56-
override val feeds: List<Feed> = listOf(global, stream)
54+
): StateManager = StateManager(global, globalStateValue, mapOf(stream to streamStateValue))
5755

58-
override fun current(feed: Feed): OpaqueStateValue? =
59-
when (feed) {
60-
is Global -> globalStateValue
61-
is Stream -> streamStateValue
62-
}
63-
64-
override fun resetFeedStates() {
65-
// no-op
66-
}
67-
}
68-
69-
fun Feed.bootstrap(stateQuerier: StateQuerier): FeedBootstrap<*> =
70-
FeedBootstrap.create(outputConsumer, metaFieldDecorator, stateQuerier, this)
56+
fun Feed.bootstrap(stateManager: StateManager): FeedBootstrap<*> =
57+
FeedBootstrap.create(outputConsumer, metaFieldDecorator, stateManager, this)
7158

7259
fun expected(vararg data: String): List<String> {
7360
val ts = outputConsumer.recordEmittedAt.toEpochMilli()
@@ -76,7 +63,7 @@ class FeedBootstrapTest {
7663

7764
@Test
7865
fun testGlobalColdStart() {
79-
val globalBootstrap: FeedBootstrap<*> = global.bootstrap(stateQuerier())
66+
val globalBootstrap: FeedBootstrap<*> = global.bootstrap(stateManager())
8067
Assertions.assertNull(globalBootstrap.currentState)
8168
Assertions.assertEquals(1, globalBootstrap.streamRecordConsumers().size)
8269
val (actualStreamID, consumer) = globalBootstrap.streamRecordConsumers().toList().first()
@@ -91,7 +78,7 @@ class FeedBootstrapTest {
9178
@Test
9279
fun testGlobalWarmStart() {
9380
val globalBootstrap: FeedBootstrap<*> =
94-
global.bootstrap(stateQuerier(globalStateValue = Jsons.objectNode()))
81+
global.bootstrap(stateManager(globalStateValue = Jsons.objectNode()))
9582
Assertions.assertEquals(Jsons.objectNode(), globalBootstrap.currentState)
9683
Assertions.assertEquals(1, globalBootstrap.streamRecordConsumers().size)
9784
val (actualStreamID, consumer) = globalBootstrap.streamRecordConsumers().toList().first()
@@ -103,10 +90,36 @@ class FeedBootstrapTest {
10390
)
10491
}
10592

93+
@Test
94+
fun testGlobalReset() {
95+
val stateManager: StateManager =
96+
stateManager(
97+
streamStateValue = Jsons.objectNode(),
98+
globalStateValue = Jsons.objectNode()
99+
)
100+
val globalBootstrap: FeedBootstrap<*> = global.bootstrap(stateManager)
101+
Assertions.assertEquals(Jsons.objectNode(), globalBootstrap.currentState)
102+
Assertions.assertEquals(Jsons.objectNode(), globalBootstrap.currentState(stream))
103+
// Reset.
104+
globalBootstrap.resetAll()
105+
Assertions.assertNull(globalBootstrap.currentState)
106+
Assertions.assertNull(globalBootstrap.currentState(stream))
107+
// Set new global state and checkpoint
108+
stateManager.scoped(global).set(Jsons.arrayNode(), 0L)
109+
stateManager.checkpoint().forEach { outputConsumer.accept(it) }
110+
// Check that everything is as it should be.
111+
Assertions.assertEquals(Jsons.arrayNode(), globalBootstrap.currentState)
112+
Assertions.assertNull(globalBootstrap.currentState(stream))
113+
Assertions.assertEquals(
114+
listOf(RESET_STATE),
115+
outputConsumer.states().map(Jsons::writeValueAsString)
116+
)
117+
}
118+
106119
@Test
107120
fun testStreamColdStart() {
108121
val streamBootstrap: FeedBootstrap<*> =
109-
stream.bootstrap(stateQuerier(globalStateValue = Jsons.objectNode()))
122+
stream.bootstrap(stateManager(globalStateValue = Jsons.objectNode()))
110123
Assertions.assertNull(streamBootstrap.currentState)
111124
Assertions.assertEquals(1, streamBootstrap.streamRecordConsumers().size)
112125
val (actualStreamID, consumer) = streamBootstrap.streamRecordConsumers().toList().first()
@@ -122,7 +135,7 @@ class FeedBootstrapTest {
122135
fun testStreamWarmStart() {
123136
val streamBootstrap: FeedBootstrap<*> =
124137
stream.bootstrap(
125-
stateQuerier(
138+
stateManager(
126139
globalStateValue = Jsons.objectNode(),
127140
streamStateValue = Jsons.arrayNode(),
128141
)
@@ -140,15 +153,8 @@ class FeedBootstrapTest {
140153

141154
@Test
142155
fun testChanges() {
143-
val stateQuerier =
144-
object : StateQuerier {
145-
override val feeds: List<Feed> = listOf(stream)
146-
override fun current(feed: Feed): OpaqueStateValue? = null
147-
override fun resetFeedStates() {
148-
// no-op
149-
}
150-
}
151-
val streamBootstrap = stream.bootstrap(stateQuerier) as StreamFeedBootstrap
156+
val stateManager = StateManager(initialStreamStates = mapOf(stream to null))
157+
val streamBootstrap = stream.bootstrap(stateManager) as StreamFeedBootstrap
152158
val consumer: StreamRecordConsumer = streamBootstrap.streamRecordConsumer()
153159
val changes =
154160
mapOf(
@@ -184,5 +190,7 @@ class FeedBootstrapTest {
184190
const val STREAM_RECORD_INPUT_DATA = """{"k":2,"v":"bar"}"""
185191
const val STREAM_RECORD_OUTPUT_DATA =
186192
"""{"k":2,"v":"bar","_ab_cdc_lsn":{},"_ab_cdc_updated_at":"2069-04-20T00:00:00.000000Z","_ab_cdc_deleted_at":null}"""
193+
const val RESET_STATE =
194+
"""{"type":"GLOBAL","global":{"shared_state":[],"stream_states":[{"stream_descriptor":{"name":"tbl","namespace":"ns"},"stream_state":{}}]},"sourceStats":{"recordCount":0.0}}"""
187195
}
188196
}

airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ abstract class IntegrationTest(
310310
* You probably don't want to actually interact with this. This is generally intended to
311311
* support a specific legacy behavior. Prefer using micronaut properties when possible.
312312
*/
313-
@SystemStub private lateinit var nonDockerMockEnvVars: EnvironmentVariables
313+
@SystemStub internal lateinit var nonDockerMockEnvVars: EnvironmentVariables
314314

315315
@JvmStatic
316316
@BeforeAll

0 commit comments

Comments
 (0)