-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Emit the state to remove in the airbyte empty source #13725
Changes from 47 commits
3d496d5
0e275d0
78df38b
f003ad3
f24472c
d7a63b1
cbcdf92
6afbdbe
4331d10
09661f4
f1269bf
ee7fd54
156e3c4
23f943c
780fe54
c255acd
545e6ad
b2e3250
8785ad7
93c138c
7707a97
e3c881b
27adc8b
c3d9110
4ee75aa
47d0ac4
c213f71
7387b4a
ad3371d
11167e6
0f4be01
280b114
9cb57f6
fdbc10e
0ea7eba
0ae05a2
30e3475
03b0eeb
59eba26
2749453
859d81e
60917b9
c14a9b7
8c02f18
64d5389
7006dfa
ce3fd6e
d06e70e
c125b93
fb242b8
5847625
44c306c
9eb52f8
5cae07c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,29 +5,81 @@ | |
package io.airbyte.workers.internal; | ||
|
||
import io.airbyte.commons.json.Jsons; | ||
import io.airbyte.config.ResetSourceConfiguration; | ||
import io.airbyte.config.StateType; | ||
import io.airbyte.config.StateWrapper; | ||
import io.airbyte.config.StreamDescriptor; | ||
import io.airbyte.config.WorkerSourceConfig; | ||
import io.airbyte.config.helpers.StateMessageHelper; | ||
import io.airbyte.protocol.models.AirbyteGlobalState; | ||
import io.airbyte.protocol.models.AirbyteMessage; | ||
import io.airbyte.protocol.models.AirbyteMessage.Type; | ||
import io.airbyte.protocol.models.AirbyteStateMessage; | ||
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType; | ||
import io.airbyte.protocol.models.AirbyteStreamState; | ||
import java.nio.file.Path; | ||
import java.util.ArrayList; | ||
import java.util.HashSet; | ||
import java.util.LinkedList; | ||
import java.util.Optional; | ||
import java.util.Queue; | ||
import java.util.Set; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.stream.Collectors; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
/** | ||
* This source will never emit any messages. It can be used in cases where that is helpful (hint: | ||
* reset connection jobs). | ||
*/ | ||
@Slf4j | ||
public class EmptyAirbyteSource implements AirbyteSource { | ||
|
||
private final AtomicBoolean hasEmittedState; | ||
private final Queue<StreamDescriptor> streamsToReset = new LinkedList<>(); | ||
// TODO: Once we are sure that the legacy way of transmitting the state is not use anymore, we need | ||
// to remove this variable and the associated | ||
// checks | ||
private boolean isResetBasedForConfig; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i would still like to see a comment explaining what this means, not just the todo about the condition under which it will be removed. while it is in the codebase a developer needs to be able to understand what it is and right now it is not clear. suggestion
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm trying to figure out what 'isResetBasedForConfig' means. It seems like it has to do with a state message being in a legacy state (though I'm not positive)? Is there a way to rename this to be clearer? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is for the migration. There might be a time where the config is not provided or empty. In such case we should reset all the streams. This will be removed on the project is deliver. It is protecting against a potential revert one of the commit that changes the list of stream. |
||
private boolean isStarted = false; | ||
private Optional<StateWrapper> stateWrapper; | ||
|
||
public EmptyAirbyteSource() { | ||
hasEmittedState = new AtomicBoolean(); | ||
} | ||
|
||
@Override | ||
public void start(final WorkerSourceConfig sourceConfig, final Path jobRoot) throws Exception { | ||
// no op. | ||
public void start(final WorkerSourceConfig workerSourceConfig, final Path jobRoot) throws Exception { | ||
|
||
if (workerSourceConfig == null || workerSourceConfig.getSourceConnectionConfiguration() == null) { | ||
// TODO: When the jobConfig is fully updated and tested, we can remove this extra check that makes | ||
// us compatible with running a reset with | ||
// a null config | ||
isResetBasedForConfig = false; | ||
} else { | ||
final ResetSourceConfiguration resetSourceConfiguration; | ||
resetSourceConfiguration = parseResetSourceConfigurationAndLogError(workerSourceConfig); | ||
streamsToReset.addAll(resetSourceConfiguration.getStreamsToReset()); | ||
|
||
if (streamsToReset.isEmpty()) { | ||
// TODO: This is done to be able to handle the transition period where we can have no stream being | ||
// pass to the configuration because the | ||
// logic of populating this list is not implemented | ||
isResetBasedForConfig = false; | ||
} else { | ||
stateWrapper = StateMessageHelper.getTypedState(workerSourceConfig.getState().getState()); | ||
|
||
if (stateWrapper.isPresent() && | ||
stateWrapper.get().getStateType() == StateType.LEGACY && | ||
!isResetAllStreamsInCatalog(workerSourceConfig)) { | ||
log.error("The state a legacy one but we are trying to do a partial update, this is not supported."); | ||
throw new IllegalStateException("Try to perform a partial reset on a legacy state"); | ||
gosusnp marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
isResetBasedForConfig = true; | ||
} | ||
} | ||
isStarted = true; | ||
} | ||
|
||
// always finished. it has no data to send. | ||
|
@@ -43,11 +95,20 @@ public int getExitValue() { | |
|
||
@Override | ||
public Optional<AirbyteMessage> attemptRead() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thank you! this is so much easier to read! |
||
if (!hasEmittedState.get()) { | ||
hasEmittedState.compareAndSet(false, true); | ||
return Optional.of(new AirbyteMessage().withType(Type.STATE).withState(new AirbyteStateMessage().withData(Jsons.emptyObject()))); | ||
if (!isStarted) { | ||
throw new IllegalStateException("The empty source has not been started."); | ||
} | ||
|
||
if (isResetBasedForConfig) { | ||
if (stateWrapper.get().getStateType() == StateType.STREAM) { | ||
return emitStreamState(); | ||
} else if (stateWrapper.get().getStateType() == StateType.GLOBAL) { | ||
return emitGlobalState(); | ||
} else { | ||
return emitLegacyState(); | ||
} | ||
} else { | ||
return Optional.empty(); | ||
return emitLegacyState(); | ||
} | ||
} | ||
|
||
|
@@ -61,4 +122,114 @@ public void cancel() throws Exception { | |
// no op. | ||
} | ||
|
||
private Optional<AirbyteMessage> emitStreamState() { | ||
// Per stream, it will emit one message per stream being reset | ||
if (!streamsToReset.isEmpty()) { | ||
final StreamDescriptor streamDescriptor = streamsToReset.poll(); | ||
return Optional.of(getNullStreamStateMessage(streamDescriptor)); | ||
} else { | ||
return Optional.empty(); | ||
} | ||
} | ||
|
||
private Optional<AirbyteMessage> emitGlobalState() { | ||
if (hasEmittedState.get()) { | ||
return Optional.empty(); | ||
} else { | ||
hasEmittedState.compareAndSet(false, true); | ||
return Optional.of(getNullGlobalMessage(streamsToReset, stateWrapper.get().getGlobal())); | ||
} | ||
} | ||
|
||
private Optional<AirbyteMessage> emitLegacyState() { | ||
if (hasEmittedState.get()) { | ||
return Optional.empty(); | ||
} else { | ||
hasEmittedState.compareAndSet(false, true); | ||
return Optional.of(new AirbyteMessage().withType(Type.STATE) | ||
.withState(new AirbyteStateMessage().withStateType(AirbyteStateType.LEGACY).withData(Jsons.emptyObject()))); | ||
} | ||
} | ||
|
||
private boolean isResetAllStreamsInCatalog(final WorkerSourceConfig sourceConfig) { | ||
final Set<StreamDescriptor> catalogStreamDescriptors = sourceConfig.getCatalog().getStreams().stream().map( | ||
configuredAirbyteStream -> new StreamDescriptor() | ||
.withName(configuredAirbyteStream.getStream().getName()) | ||
.withNamespace(configuredAirbyteStream.getStream().getNamespace())) | ||
.collect(Collectors.toSet()); | ||
final Set<StreamDescriptor> configStreamDescriptors = new HashSet<>(streamsToReset); | ||
|
||
return catalogStreamDescriptors.equals(configStreamDescriptors); | ||
} | ||
|
||
private AirbyteMessage getNullStreamStateMessage(final StreamDescriptor streamsToReset) { | ||
return new AirbyteMessage() | ||
.withType(Type.STATE) | ||
.withState( | ||
new AirbyteStateMessage() | ||
.withStateType(AirbyteStateType.STREAM) | ||
.withStream( | ||
new AirbyteStreamState() | ||
.withStreamDescriptor(new io.airbyte.protocol.models.StreamDescriptor() | ||
.withName(streamsToReset.getName()) | ||
.withNamespace(streamsToReset.getNamespace())) | ||
.withStreamState(null))); | ||
} | ||
|
||
private AirbyteMessage getNullGlobalMessage(final Queue<StreamDescriptor> streamsToReset, final AirbyteStateMessage currentState) { | ||
final AirbyteGlobalState globalState = new AirbyteGlobalState(); | ||
globalState.setStreamStates(new ArrayList<>()); | ||
|
||
currentState.getGlobal().getStreamStates().forEach(existingState -> globalState.getStreamStates() | ||
.add( | ||
new AirbyteStreamState() | ||
.withStreamDescriptor(existingState.getStreamDescriptor()) | ||
.withStreamState( | ||
streamsToReset.contains(new StreamDescriptor() | ||
.withName(existingState.getStreamDescriptor().getName()) | ||
.withNamespace(existingState.getStreamDescriptor().getNamespace())) ? null : existingState.getStreamState()))); | ||
|
||
// If all the streams in the current state have been reset, we consider this to be a full reset, so | ||
// reset the shared state as well | ||
if (currentState.getGlobal().getStreamStates().size() == globalState.getStreamStates().stream() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought we always expected all the streams for a Global state. In this case, wouldn't this be always true regardless of whether we are performing a global reset? |
||
.filter(streamState -> streamState.getStreamState() == null).count()) { | ||
log.info("All the streams of a global state have been reset, the shared state will be erased as well"); | ||
globalState.setSharedState(null); | ||
} else { | ||
log.info("This is a partial reset, the shared state will be preserved"); | ||
globalState.setSharedState(currentState.getGlobal().getSharedState()); | ||
} | ||
|
||
// Add state being reset that are not in the current state. This is made to follow the contract of | ||
// the global state always containing the entire | ||
// state | ||
streamsToReset.forEach(configStreamDescriptor -> { | ||
final io.airbyte.protocol.models.StreamDescriptor streamDescriptor = new io.airbyte.protocol.models.StreamDescriptor() | ||
.withName(configStreamDescriptor.getName()) | ||
.withNamespace(configStreamDescriptor.getNamespace()); | ||
if (!currentState.getGlobal().getStreamStates().stream().map(streamState -> streamState.getStreamDescriptor()).toList() | ||
.contains(streamDescriptor)) { | ||
globalState.getStreamStates().add(new AirbyteStreamState() | ||
.withStreamDescriptor(streamDescriptor) | ||
.withStreamState(null)); | ||
} | ||
}); | ||
|
||
return new AirbyteMessage() | ||
.withType(Type.STATE) | ||
.withState( | ||
new AirbyteStateMessage() | ||
.withStateType(AirbyteStateType.GLOBAL) | ||
.withGlobal(globalState)); | ||
} | ||
|
||
private ResetSourceConfiguration parseResetSourceConfigurationAndLogError(final WorkerSourceConfig workerSourceConfig) { | ||
try { | ||
return Jsons.object(workerSourceConfig.getSourceConnectionConfiguration(), ResetSourceConfiguration.class); | ||
} catch (final IllegalArgumentException e) { | ||
log.error("The configuration provided to the reset has an invalid format"); | ||
throw e; | ||
} | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
something weird is happening with your formatting that lines are getting chopped weirdly. this comment should be able to fit on 2 lines. same for the one on line 55.