Emit the state to remove in the airbyte empty source #13725

Merged
merged 54 commits into from
Jun 23, 2022
54 commits
3d496d5
Emit the state to remove in the airbyte empty source
benmoriceau Jun 13, 2022
0e275d0
Handle legacy use case
benmoriceau Jun 13, 2022
78df38b
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 13, 2022
f003ad3
Update names
benmoriceau Jun 13, 2022
f24472c
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 14, 2022
d7a63b1
Add test for legacy
benmoriceau Jun 14, 2022
cbcdf92
tmp
benmoriceau Jun 14, 2022
6afbdbe
Common code to deserialize a state message in the new format
benmoriceau Jun 14, 2022
4331d10
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 14, 2022
09661f4
Merge branch 'bmoric/extract-message-serialization' of github.com:air…
benmoriceau Jun 14, 2022
f1269bf
tmp
benmoriceau Jun 14, 2022
ee7fd54
PR comments and type changed to typed
benmoriceau Jun 14, 2022
156e3c4
Merge branch 'bmoric/extract-message-serialization' of github.com:air…
benmoriceau Jun 14, 2022
23f943c
add new message support
benmoriceau Jun 15, 2022
780fe54
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 15, 2022
c255acd
Finish the tests
benmoriceau Jun 15, 2022
545e6ad
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/ext…
benmoriceau Jun 15, 2022
b2e3250
Format
benmoriceau Jun 15, 2022
8785ad7
Merge branch 'bmoric/extract-message-serialization' of github.com:air…
benmoriceau Jun 15, 2022
93c138c
tmp
benmoriceau Jun 16, 2022
7707a97
Add StateType and StateWrapper objects to the model
gosusnp Jun 16, 2022
e3c881b
Merge branch 'gosusnp/add_state_wrapper' of github.com:airbytehq/airb…
benmoriceau Jun 16, 2022
27adc8b
Use state wrapper instead of Either
benmoriceau Jun 16, 2022
c3d9110
Merge branch 'bmoric/extract-message-serialization' of github.com:air…
benmoriceau Jun 16, 2022
4ee75aa
Switch to optional
benmoriceau Jun 16, 2022
47d0ac4
Merge branch 'bmoric/extract-message-serialization' of github.com:air…
benmoriceau Jun 16, 2022
c213f71
Merge branch 'master' into bmoric/extract-message-serialization
benmoriceau Jun 16, 2022
7387b4a
Some PR comments
benmoriceau Jun 16, 2022
ad3371d
PR comments
benmoriceau Jun 16, 2022
11167e6
Add a todo
benmoriceau Jun 16, 2022
0f4be01
Add test for legacy with config
benmoriceau Jun 17, 2022
280b114
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/ext…
benmoriceau Jun 17, 2022
9cb57f6
PR comments
benmoriceau Jun 17, 2022
fdbc10e
PR comments
benmoriceau Jun 17, 2022
0ea7eba
Support array legacy state
benmoriceau Jun 17, 2022
0ae05a2
format
benmoriceau Jun 17, 2022
30e3475
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 17, 2022
03b0eeb
Remove additional properties check
benmoriceau Jun 17, 2022
59eba26
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/ext…
benmoriceau Jun 17, 2022
2749453
Merge branch 'bmoric/extract-message-serialization' of github.com:air…
benmoriceau Jun 17, 2022
859d81e
Small rename
benmoriceau Jun 17, 2022
60917b9
Rename variables
benmoriceau Jun 21, 2022
c14a9b7
PR comments
benmoriceau Jun 21, 2022
8c02f18
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 21, 2022
64d5389
extract ResetSourceConfig creation
benmoriceau Jun 21, 2022
7006dfa
Extract reset config creation
benmoriceau Jun 21, 2022
ce3fd6e
Fix test
benmoriceau Jun 21, 2022
d06e70e
rm error commit
benmoriceau Jun 21, 2022
c125b93
Add comments
benmoriceau Jun 22, 2022
fb242b8
format
benmoriceau Jun 23, 2022
5847625
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 23, 2022
44c306c
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 23, 2022
9eb52f8
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 23, 2022
5cae07c
Use new state type
benmoriceau Jun 23, 2022
@@ -4,7 +4,7 @@
title: ResetSourceConfiguration
description: configuration of the reset source
type: object
-additionalProperties: true
+additionalProperties: false
required:
- streamsToReset
properties:
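The hunk above changes `additionalProperties` on the `ResetSourceConfiguration` schema while `streamsToReset` stays required. As a rough illustration of what that flag means for a validator, the sketch below checks a plain map by hand. `SchemaCheck`, `isValid`, and the `allowAdditional` parameter are hypothetical names, the set of known properties is an assumption (only `streamsToReset` is visible in the hunk), and this is not how Airbyte validates its schemas.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

class SchemaCheck {
  // Assumption: streamsToReset is the only property; the rest of the schema is not shown.
  static final Set<String> KNOWN = Set.of("streamsToReset");
  static final Set<String> REQUIRED = Set.of("streamsToReset");

  // allowAdditional plays the role of the schema's additionalProperties flag.
  static boolean isValid(Map<String, Object> config, boolean allowAdditional) {
    if (!config.keySet().containsAll(REQUIRED)) {
      return false; // a required key is missing
    }
    // With allowAdditional == false, every key must be a known property.
    return allowAdditional || KNOWN.containsAll(config.keySet());
  }

  public static void main(String[] args) {
    Map<String, Object> extra = Map.of("streamsToReset", List.of(), "unexpected", 1);
    System.out.println(isValid(extra, true));
    System.out.println(isValid(extra, false));
  }
}
```

With the flag off, a config carrying an unknown key is rejected instead of being silently accepted.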
@@ -5,29 +5,95 @@
package io.airbyte.workers.internal;

import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ResetSourceConfiguration;
import io.airbyte.config.StateType;
import io.airbyte.config.StateWrapper;
import io.airbyte.config.StreamDescriptor;
import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.config.helpers.StateMessageHelper;
import io.airbyte.protocol.models.AirbyteGlobalState;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.AirbyteStreamState;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;

/**
* This source will never emit any messages. It can be used in cases where that is helpful (hint:
* reset connection jobs).
*/
@Slf4j
public class EmptyAirbyteSource implements AirbyteSource {

private final AtomicBoolean hasEmittedState;
private final Queue<StreamDescriptor> streamsToReset = new LinkedList<>();
// TODO: Once we are sure that the legacy way of transmitting the state is not used anymore, we need
// to remove this variable and the associated
// checks
Comment on lines +40 to +42
Contributor

something weird is happening with your formatting that lines are getting chopped weirdly. this comment should be able to fit on 2 lines. same for the one on line 55.

private boolean isResetBasedForConfig;
Contributor

i would still like to see a comment explaining what this means, not just the todo about the condition under which it will be removed. while it is in the codebase a developer needs to be able to understand what it is and right now it is not clear.

suggestion

  /**
   * In the legacy version the config was empty. The new one has a populated config. This variable
   * encapsulates whether we are in the legacy version or not.
   */

Contributor Author

Added

Contributor

I'm trying to figure out what 'isResetBasedForConfig' means. It seems like it has to do with a state message being in a legacy state (though I'm not positive)? Is there a way to rename this to be clearer?

Contributor Author

It is for the migration. There might be a period when the config is not provided or is empty. In such a case we should reset all the streams. This will be removed once the project is delivered. It protects against a potential revert of one of the commits that changes the list of streams.
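The thread above asks what `isResetBasedForConfig` means. One way to make the two cases from the reply explicit is to name them, as in the sketch below. The `ResetMode` enum and the `decide` helper are hypothetical and not part of the PR; streams are reduced to plain strings, and a `null` list stands in for a missing config.

```java
import java.util.List;

class ResetModeSketch {
  // Hypothetical names for the two values of the boolean flag in the diff.
  enum ResetMode { LEGACY_RESET_ALL, CONFIG_DRIVEN }

  // A null or empty list models a missing or empty reset configuration.
  static ResetMode decide(List<String> streamsToReset) {
    if (streamsToReset == null || streamsToReset.isEmpty()) {
      return ResetMode.LEGACY_RESET_ALL; // migration fallback: reset everything
    }
    return ResetMode.CONFIG_DRIVEN; // reset only what the config lists
  }

  public static void main(String[] args) {
    System.out.println(decide(null));
    System.out.println(decide(List.of("users")));
  }
}
```

An enum would also leave room for a third value later without overloading a boolean, although a clearer field name plus the suggested Javadoc may be enough for a flag that is meant to be temporary.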

private boolean isStarted = false;
private Optional<StateWrapper> stateWrapper;

public EmptyAirbyteSource() {
hasEmittedState = new AtomicBoolean();
}

@Override
-public void start(final WorkerSourceConfig sourceConfig, final Path jobRoot) throws Exception {
-// no op.
+public void start(final WorkerSourceConfig workerSourceConfig, final Path jobRoot) throws Exception {

if (workerSourceConfig == null || workerSourceConfig.getSourceConnectionConfiguration() == null) {
// TODO: When the jobConfig is fully updated and tested, we can remove this extra check that makes
// us compatible with running a reset with
// a null config
/*
* This is a protection against reverting a commit that sets the resetSourceConfiguration; it ensures
* that such a revert has no side effect. The legacy behavior is to have the config as an empty
* jsonObject; this is an extra protection in case the workerConfiguration is null. In the previous
* implementation it was unused, so passing it as null should not result in an NPE or a parsing
* failure.
*/
isResetBasedForConfig = false;
} else {
final ResetSourceConfiguration resetSourceConfiguration;
resetSourceConfiguration = parseResetSourceConfigurationAndLogError(workerSourceConfig);
streamsToReset.addAll(resetSourceConfiguration.getStreamsToReset());

if (streamsToReset.isEmpty()) {
// TODO: This is done to be able to handle the transition period where we can have no stream being
// passed to the configuration because the
// logic of populating this list is not implemented
/*
* This is a protection against reverting a commit that sets the resetSourceConfiguration; it ensures
* that such a revert has no side effect. The legacy behavior is to have the config as an
* empty object, it has been changed here:
* https://github.com/airbytehq/airbyte/pull/13696/files#diff-
* f51ff997b60a346c704608bb1cd7d22457eda2559b42987d5fa1281d568fc222L40
*/
isResetBasedForConfig = false;
} else {
stateWrapper = StateMessageHelper.getTypedState(workerSourceConfig.getState().getState());

if (stateWrapper.isPresent() &&
stateWrapper.get().getStateType() == StateType.LEGACY &&
!isResetAllStreamsInCatalog(workerSourceConfig)) {
log.error("The state is a legacy one but we are trying to do a partial update, this is not supported.");
throw new IllegalStateException("Try to perform a partial reset on a legacy state");
}

isResetBasedForConfig = true;
}
}
isStarted = true;
}

// always finished. it has no data to send.
@@ -43,11 +109,20 @@ public int getExitValue() {

@Override
public Optional<AirbyteMessage> attemptRead() {
Contributor

thank you! this is so much easier to read!

-if (!hasEmittedState.get()) {
-hasEmittedState.compareAndSet(false, true);
-return Optional.of(new AirbyteMessage().withType(Type.STATE).withState(new AirbyteStateMessage().withData(Jsons.emptyObject())));
+if (!isStarted) {
+throw new IllegalStateException("The empty source has not been started.");
+}
+
+if (isResetBasedForConfig) {
+if (stateWrapper.get().getStateType() == StateType.STREAM) {
+return emitStreamState();
+} else if (stateWrapper.get().getStateType() == StateType.GLOBAL) {
+return emitGlobalState();
+} else {
+return emitLegacyState();
+}
} else {
-return Optional.empty();
+return emitLegacyState();
}
}
}
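The new `attemptRead` picks an emitter from the state type once the source has been started. The same decision can be written as a switch expression, sketched here with stand-in types; `DispatchSketch`, `emitterFor`, and the local `StateType` enum are hypothetical, and the returned strings merely name the emitter that would be called.

```java
class DispatchSketch {
  // Local stand-in for io.airbyte.config.StateType.
  enum StateType { STREAM, GLOBAL, LEGACY }

  static String emitterFor(boolean started, boolean configDriven, StateType type) {
    if (!started) {
      throw new IllegalStateException("The empty source has not been started.");
    }
    if (!configDriven) {
      return "legacy"; // missing or empty config: legacy full reset
    }
    // Exhaustive over the enum, so no default branch is needed.
    return switch (type) {
      case STREAM -> "stream";
      case GLOBAL -> "global";
      case LEGACY -> "legacy";
    };
  }

  public static void main(String[] args) {
    System.out.println(emitterFor(true, true, StateType.GLOBAL));
  }
}
```

Because the switch is exhaustive, adding a new state type later would fail to compile until it is handled, whereas the if/else chain would route it to the legacy branch.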

@@ -61,4 +136,114 @@ public void cancel() throws Exception {
// no op.
}

private Optional<AirbyteMessage> emitStreamState() {
// For a per-stream state, emit one message per stream being reset
if (!streamsToReset.isEmpty()) {
final StreamDescriptor streamDescriptor = streamsToReset.poll();
return Optional.of(getNullStreamStateMessage(streamDescriptor));
} else {
return Optional.empty();
}
}
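`emitStreamState` drains the queue one element per call, so the caller sees one state message per stream and then an empty `Optional`. A minimal sketch of that contract, using strings in place of `StreamDescriptor` and `AirbyteMessage` (`StreamDrainSketch` and the `"reset:"` prefix are illustrative only):

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Optional;
import java.util.Queue;

class StreamDrainSketch {
  private final Queue<String> streamsToReset;

  StreamDrainSketch(List<String> streams) {
    this.streamsToReset = new ArrayDeque<>(streams);
  }

  // poll() returns null once the queue is empty, which maps to Optional.empty().
  Optional<String> attemptRead() {
    return Optional.ofNullable(streamsToReset.poll()).map(name -> "reset:" + name);
  }

  public static void main(String[] args) {
    StreamDrainSketch source = new StreamDrainSketch(List.of("users", "orders"));
    System.out.println(source.attemptRead());
    System.out.println(source.attemptRead());
    System.out.println(source.attemptRead());
  }
}
```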

private Optional<AirbyteMessage> emitGlobalState() {
if (hasEmittedState.get()) {
return Optional.empty();
} else {
hasEmittedState.compareAndSet(false, true);
return Optional.of(getNullGlobalMessage(streamsToReset, stateWrapper.get().getGlobal()));
}
}

private Optional<AirbyteMessage> emitLegacyState() {
if (hasEmittedState.get()) {
return Optional.empty();
} else {
hasEmittedState.compareAndSet(false, true);
return Optional.of(new AirbyteMessage().withType(Type.STATE)
.withState(new AirbyteStateMessage().withType(AirbyteStateType.LEGACY).withData(Jsons.emptyObject())));
}
}
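`emitGlobalState` and `emitLegacyState` read the flag with `get()` and then flip it with a separate `compareAndSet`. That is fine if `attemptRead` is only ever called from one thread, which the diff does not state either way. If concurrent callers were possible, the check and the flip could be merged into a single atomic step, as in this sketch (`EmitOnceSketch` and the `"{}"` payload are placeholders):

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

class EmitOnceSketch {
  private final AtomicBoolean hasEmitted = new AtomicBoolean();

  Optional<String> emit() {
    // compareAndSet returns true only for the one caller that flips false -> true.
    return hasEmitted.compareAndSet(false, true) ? Optional.of("{}") : Optional.empty();
  }

  public static void main(String[] args) {
    EmitOnceSketch source = new EmitOnceSketch();
    System.out.println(source.emit().isPresent());
    System.out.println(source.emit().isPresent());
  }
}
```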

private boolean isResetAllStreamsInCatalog(final WorkerSourceConfig sourceConfig) {
final Set<StreamDescriptor> catalogStreamDescriptors = sourceConfig.getCatalog().getStreams().stream().map(
configuredAirbyteStream -> new StreamDescriptor()
.withName(configuredAirbyteStream.getStream().getName())
.withNamespace(configuredAirbyteStream.getStream().getNamespace()))
.collect(Collectors.toSet());
final Set<StreamDescriptor> configStreamDescriptors = new HashSet<>(streamsToReset);

return catalogStreamDescriptors.equals(configStreamDescriptors);
}
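`isResetAllStreamsInCatalog` relies on set equality, so it depends on the descriptor type having value-based `equals` and `hashCode`. The sketch below shows the comparison with a hypothetical record standing in for `StreamDescriptor`; `FullResetSketch` and `resetsWholeCatalog` are illustrative names.

```java
import java.util.HashSet;
import java.util.List;

class FullResetSketch {
  // Records get value-based equals/hashCode; a null namespace is handled.
  record StreamDescriptor(String name, String namespace) {}

  // True when both lists name exactly the same streams; order and duplicates are ignored.
  static boolean resetsWholeCatalog(List<StreamDescriptor> catalog, List<StreamDescriptor> toReset) {
    return new HashSet<>(catalog).equals(new HashSet<>(toReset));
  }

  public static void main(String[] args) {
    List<StreamDescriptor> catalog =
        List.of(new StreamDescriptor("users", "public"), new StreamDescriptor("orders", null));
    System.out.println(resetsWholeCatalog(catalog, catalog));
    System.out.println(resetsWholeCatalog(catalog, catalog.subList(0, 1)));
  }
}
```

Note that equality also fails when the reset list names a stream that is not in the catalog, so under this check such a request counts as a partial reset.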

private AirbyteMessage getNullStreamStateMessage(final StreamDescriptor streamsToReset) {
return new AirbyteMessage()
.withType(Type.STATE)
.withState(
new AirbyteStateMessage()
.withType(AirbyteStateType.STREAM)
.withStream(
new AirbyteStreamState()
.withStreamDescriptor(new io.airbyte.protocol.models.StreamDescriptor()
.withName(streamsToReset.getName())
.withNamespace(streamsToReset.getNamespace()))
.withStreamState(null)));
}

private AirbyteMessage getNullGlobalMessage(final Queue<StreamDescriptor> streamsToReset, final AirbyteStateMessage currentState) {
final AirbyteGlobalState globalState = new AirbyteGlobalState();
globalState.setStreamStates(new ArrayList<>());

currentState.getGlobal().getStreamStates().forEach(existingState -> globalState.getStreamStates()
.add(
new AirbyteStreamState()
.withStreamDescriptor(existingState.getStreamDescriptor())
.withStreamState(
streamsToReset.contains(new StreamDescriptor()
.withName(existingState.getStreamDescriptor().getName())
.withNamespace(existingState.getStreamDescriptor().getNamespace())) ? null : existingState.getStreamState())));

// If all the streams in the current state have been reset, we consider this to be a full reset, so
// reset the shared state as well
if (currentState.getGlobal().getStreamStates().size() == globalState.getStreamStates().stream()
Contributor

I thought we always expected all the streams for a Global state. In this case, wouldn't this be always true regardless of whether we are performing a global reset?
Comparing the size of streamsToReset to the catalog feels more robust in this case.
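The concern above can be made concrete. As far as the hunk shows, the check counts null stream states after the reset, so a stream whose state was already null counts as reset even when it was not requested. The sketch below contrasts that rule with a catalog-based one, using a set comparison rather than the size comparison the reviewer mentions. States are reduced to a map from stream name to a nullable string, and all names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class SharedStateCheckSketch {
  // Mirrors the rule in the diff: null out the requested streams, then
  // call it a full reset if every existing stream state is null.
  static boolean fullResetByNullCount(Map<String, String> existing, Set<String> toReset) {
    Map<String, String> result = new HashMap<>(existing);
    for (String stream : toReset) {
      if (result.containsKey(stream)) {
        result.put(stream, null);
      }
    }
    return result.values().stream().allMatch(state -> state == null);
  }

  // Alternative: a full reset is one that covers every stream in the catalog.
  static boolean fullResetByCatalog(Set<String> catalog, Set<String> toReset) {
    return toReset.containsAll(catalog);
  }

  public static void main(String[] args) {
    Map<String, String> existing = new HashMap<>();
    existing.put("a", null); // never synced, state already null
    existing.put("b", "cursor-1");
    System.out.println(fullResetByNullCount(existing, Set.of("b")));
    System.out.println(fullResetByCatalog(Set.of("a", "b"), Set.of("b")));
  }
}
```

In this example only `b` is requested, yet the null-count rule reports a full reset and would erase the shared state, while the catalog-based rule treats it as partial.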

.filter(streamState -> streamState.getStreamState() == null).count()) {
log.info("All the streams of a global state have been reset, the shared state will be erased as well");
globalState.setSharedState(null);
} else {
log.info("This is a partial reset, the shared state will be preserved");
globalState.setSharedState(currentState.getGlobal().getSharedState());
}

// Add the streams being reset that are not in the current state. This is done to follow the
// contract of the global state always containing the entire
// state
streamsToReset.forEach(configStreamDescriptor -> {
final io.airbyte.protocol.models.StreamDescriptor streamDescriptor = new io.airbyte.protocol.models.StreamDescriptor()
.withName(configStreamDescriptor.getName())
.withNamespace(configStreamDescriptor.getNamespace());
if (!currentState.getGlobal().getStreamStates().stream().map(streamState -> streamState.getStreamDescriptor()).toList()
.contains(streamDescriptor)) {
globalState.getStreamStates().add(new AirbyteStreamState()
.withStreamDescriptor(streamDescriptor)
.withStreamState(null));
}
});

return new AirbyteMessage()
.withType(Type.STATE)
.withState(
new AirbyteStateMessage()
.withType(AirbyteStateType.GLOBAL)
.withGlobal(globalState));
}
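Leaving the shared state aside, `getNullGlobalMessage` does two things to the stream states: it nulls out the streams being reset while keeping the others, and it appends any reset stream that is absent from the current state. A compact sketch of those two steps, with states reduced to a map from stream name to a nullable string (`GlobalResetSketch` and `resetStreams` are hypothetical):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

class GlobalResetSketch {
  static Map<String, String> resetStreams(Map<String, String> current, Set<String> toReset) {
    Map<String, String> next = new LinkedHashMap<>();
    // Step 1: keep every existing stream, nulling the state of those being reset.
    current.forEach((stream, state) -> next.put(stream, toReset.contains(stream) ? null : state));
    // Step 2: add reset streams that the current state does not know about.
    for (String stream : toReset) {
      if (!next.containsKey(stream)) {
        next.put(stream, null);
      }
    }
    return next;
  }

  public static void main(String[] args) {
    Map<String, String> current = new LinkedHashMap<>();
    current.put("users", "cursor-1");
    current.put("orders", "cursor-2");
    System.out.println(resetStreams(current, Set.of("orders", "payments")));
  }
}
```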

private ResetSourceConfiguration parseResetSourceConfigurationAndLogError(final WorkerSourceConfig workerSourceConfig) {
try {
return Jsons.object(workerSourceConfig.getSourceConnectionConfiguration(), ResetSourceConfiguration.class);
} catch (final IllegalArgumentException e) {
log.error("The configuration provided to the reset has an invalid format");
throw e;
}
}

}