-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
destination-async-framework: move the state emission logic into GlobalAsyncStateManager #35240
Changes from 3 commits
b25716e
7f9412e
ad55a52
cc5bbee
3ecb04c
b3d4b84
0f841cd
6be3616
9ecf432
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,7 +7,6 @@ | |
import io.airbyte.cdk.integrations.destination_async.GlobalMemoryManager; | ||
import io.airbyte.cdk.integrations.destination_async.buffers.StreamAwareQueue.MessageWithMeta; | ||
import io.airbyte.cdk.integrations.destination_async.state.GlobalAsyncStateManager; | ||
import io.airbyte.cdk.integrations.destination_async.state.PartialStateWithDestinationStats; | ||
import java.util.List; | ||
import java.util.Map; | ||
import org.slf4j.Logger; | ||
|
@@ -64,9 +63,9 @@ public void close() throws Exception { | |
* | ||
* @return list of states that can be flushed | ||
*/ | ||
public List<PartialStateWithDestinationStats> flushStates(final Map<Long, Long> stateIdToCount) { | ||
public void flushStates(final Map<Long, Long> stateIdToCount) { | ||
stateIdToCount.forEach(stateManager::decrement); | ||
return stateManager.flushStates(); | ||
stateManager.flushStates(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Curious: does the locking break if we return the IDs to flush and flush them in the FlushWorkers instead? That way, all the actual action happens in the flush workers, and not in objects scattered around. Mainly a cleanliness suggestion. |
||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,13 +10,13 @@ | |
import com.google.common.base.Strings; | ||
import io.airbyte.cdk.integrations.destination_async.GlobalMemoryManager; | ||
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; | ||
import io.airbyte.commons.json.Jsons; | ||
import io.airbyte.protocol.models.v0.AirbyteMessage; | ||
import io.airbyte.protocol.models.v0.AirbyteStateMessage; | ||
import io.airbyte.protocol.models.v0.AirbyteStateStats; | ||
import io.airbyte.protocol.models.v0.StreamDescriptor; | ||
import java.util.ArrayList; | ||
import java.time.Instant; | ||
import java.util.Collection; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
|
@@ -25,6 +25,7 @@ | |
import java.util.concurrent.ConcurrentMap; | ||
import java.util.concurrent.LinkedBlockingDeque; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
import java.util.function.Consumer; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.apache.commons.io.FileUtils; | ||
import org.apache.commons.lang3.tuple.ImmutablePair; | ||
|
@@ -95,11 +96,13 @@ public class GlobalAsyncStateManager { | |
private long retroactiveGlobalStateId = 0; | ||
// All access to this field MUST be guarded by a synchronized(lock) block | ||
private long arrivalNumber = 0; | ||
private final Consumer<AirbyteMessage> outputRecordCollector; | ||
|
||
private final Object LOCK = new Object(); | ||
private static final Object LOCK = new Object(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. `static` seems unnecessary? |
||
|
||
public GlobalAsyncStateManager(final GlobalMemoryManager memoryManager) { | ||
public GlobalAsyncStateManager(final GlobalMemoryManager memoryManager, final Consumer<AirbyteMessage> outputRecordCollector) { | ||
this.memoryManager = memoryManager; | ||
this.outputRecordCollector = outputRecordCollector; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we avoid holding a copy of outputRecordCollector at the instance level and use it only in flushStates? This reduces the possibility of future abuse of this instance variable. |
||
this.memoryAllocated = new AtomicLong(memoryManager.requestMemory()); | ||
this.memoryUsed = new AtomicLong(); | ||
} | ||
|
@@ -161,8 +164,7 @@ public void decrement(final long stateId, final long count) { | |
* | ||
* @return list of state messages with no more inflight records. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: update the Javadoc. |
||
*/ | ||
public List<PartialStateWithDestinationStats> flushStates() { | ||
final List<PartialStateWithDestinationStats> output = new ArrayList<>(); | ||
public void flushStates() { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. suggestion: what if […]? Looks like it might minimise this PR's change set too. |
||
Long bytesFlushed = 0L; | ||
synchronized (LOCK) { | ||
for (final Map.Entry<StreamDescriptor, LinkedBlockingDeque<Long>> entry : descToStateIdQ.entrySet()) { | ||
|
@@ -195,8 +197,13 @@ public List<PartialStateWithDestinationStats> flushStates() { | |
if (allRecordsCommitted) { | ||
final StateMessageWithArrivalNumber stateMessage = oldestState.getLeft(); | ||
final double flushedRecordsAssociatedWithState = stateIdToCounterForPopulatingDestinationStats.get(oldestStateId).doubleValue(); | ||
output.add(new PartialStateWithDestinationStats(stateMessage.partialAirbyteStateMessage(), | ||
new AirbyteStateStats().withRecordCount(flushedRecordsAssociatedWithState), stateMessage.arrivalNumber())); | ||
|
||
log.info("State with arrival number {} emitted from thread {} at {}", stateMessage.arrivalNumber(), Thread.currentThread().getName(), | ||
Instant.now().toString()); | ||
final AirbyteMessage message = Jsons.deserialize(stateMessage.partialAirbyteStateMessage.getSerialized(), AirbyteMessage.class); | ||
message.getState().setDestinationStats(new AirbyteStateStats().withRecordCount(flushedRecordsAssociatedWithState)); | ||
outputRecordCollector.accept(message); | ||
|
||
bytesFlushed += oldestState.getRight(); | ||
|
||
// cleanup | ||
|
@@ -212,7 +219,6 @@ public List<PartialStateWithDestinationStats> flushStates() { | |
} | ||
|
||
freeBytes(bytesFlushed); | ||
return output; | ||
} | ||
|
||
private Long getStateIdAndIncrement(final StreamDescriptor streamDescriptor, final long increment) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: change the Javadoc.