Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ebean): refactor, fix & optimize #12613

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@
try {
aspectRecord =
EntityUtils.toSystemAspect(
context.opContext().getRetrieverContext(), aspect.toEntityAspect())
context.opContext().getRetrieverContext(), aspect.toEntityAspect(), false)

Check warning on line 183 in datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restorebackup/RestoreStorageStep.java

View check run for this annotation

Codecov / codecov/patch

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restorebackup/RestoreStorageStep.java#L183

Added line #L183 was not covered by tests
.get()
.getRecordTemplate();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public String id() {
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
try {
AspectsBatch batch = BootstrapMCPUtil.generateAspectBatch(opContext, mcpTemplate);
AspectsBatch batch = BootstrapMCPUtil.generateAspectBatch(opContext, mcpTemplate, id());
log.info("Ingesting {} MCPs", batch.getItems().size());
entityService.ingestProposal(opContext, batch, mcpTemplate.isAsync());
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.metadata.utils.SystemMetadataUtils;
import com.linkedin.mxe.GenericAspect;
import com.linkedin.mxe.MetadataChangeProposal;
import io.datahubproject.metadata.context.OperationContext;
Expand Down Expand Up @@ -67,7 +68,7 @@ static List<UpgradeStep> generateSteps(
}

static AspectsBatch generateAspectBatch(
OperationContext opContext, BootstrapMCPConfigFile.MCPTemplate mcpTemplate)
OperationContext opContext, BootstrapMCPConfigFile.MCPTemplate mcpTemplate, String runId)
throws IOException {

final AuditStamp auditStamp = AuditStampUtils.createDefaultAuditStamp();
Expand All @@ -91,6 +92,7 @@ static AspectsBatch generateAspectBatch(
convenienceConversions(opContext, mcp.getAspectName(), aspect));
GenericAspect genericAspect = GenericRecordUtils.serializeAspect(jsonAspect);
mcp.setAspect(genericAspect);
mcp.setSystemMetadata(SystemMetadataUtils.createDefaultSystemMetadata(runId));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import com.linkedin.datahub.upgrade.system.elasticsearch.steps.BuildIndicesStep;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
import com.linkedin.metadata.aspect.EntityAspect;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityAspect;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.shared.ElasticSearchIndexed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertNotNull;

import com.datahub.util.RecordUtils;
import com.linkedin.data.template.StringMap;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeManager;
import com.linkedin.datahub.upgrade.system.SystemUpdateNonBlocking;
Expand All @@ -27,6 +28,7 @@
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
import com.linkedin.metadata.entity.ebean.PartitionedStream;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.utils.SystemMetadataUtils;
import com.linkedin.mxe.Topics;
import com.linkedin.upgrade.DataHubUpgradeResult;
import com.linkedin.upgrade.DataHubUpgradeState;
Expand Down Expand Up @@ -265,7 +267,8 @@ private static EbeanAspectV2 createMockEbeanAspect(String urn, String aspectName
now, // createdOn
"urn:li:corpuser:testUser", // createdBy
null, // createdFor
null // systemMetadata
RecordUtils.toJsonString(
SystemMetadataUtils.createDefaultSystemMetadata()) // systemMetadata
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -221,13 +222,14 @@ public void testMCPBatch() throws IOException {
.getTemplates()
.get(0);

AspectsBatch batch = BootstrapMCPUtil.generateAspectBatch(OP_CONTEXT, template);
AspectsBatch batch = BootstrapMCPUtil.generateAspectBatch(OP_CONTEXT, template, "testMCPBatch");
assertEquals(batch.getMCPItems().size(), 1);

MCPItem item = batch.getMCPItems().get(0);
assertEquals(item.getUrn(), UrnUtils.getUrn("urn:li:dataHubIngestionSource:datahub-test"));
assertEquals(item.getAspectName(), "dataHubIngestionSourceInfo");
assertEquals(item.getChangeType(), ChangeType.UPSERT);
assertNotNull(item.getSystemMetadata());

DataHubIngestionSourceInfo ingestionSource = item.getAspect(DataHubIngestionSourceInfo.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package com.linkedin.metadata.entity;
package com.linkedin.metadata.aspect;

import com.datahub.util.RecordUtils;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.SetMode;
import com.linkedin.entity.AspectType;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.metadata.aspect.SystemAspect;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.utils.EntityApiUtils;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.utils.SystemMetadataUtils;
import com.linkedin.mxe.GenericAspect;
import com.linkedin.mxe.SystemMetadata;
import java.sql.Timestamp;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
Expand All @@ -21,6 +24,7 @@
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -35,6 +39,7 @@
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
@Builder(toBuilder = true)
public class EntityAspect {

@Nonnull private String urn;
Expand Down Expand Up @@ -76,57 +81,48 @@
/**
* Provide a typed EntityAspect without breaking the existing public contract with generic types.
*/
@Builder
@Accessors(chain = true)
@Builder(toBuilder = true)
@Getter
@EqualsAndHashCode
@AllArgsConstructor
public static class EntitySystemAspect implements SystemAspect {
@Nonnull private final EntityAspect entityAspect;
@Nullable private EntityAspect entityAspect;
@Nonnull private final Urn urn;

/** Note that read mutations depend on the mutability of recordTemplate */
@Nullable private final RecordTemplate recordTemplate;
@Setter @Nullable private RecordTemplate recordTemplate;

@Setter @Nullable private SystemMetadata systemMetadata;
@Setter @Nullable private AuditStamp auditStamp;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Might be useful to specify in the naming that this is the createdAuditstamp


@Nonnull private final EntitySpec entitySpec;
@Nullable private final AspectSpec aspectSpec;

@Nonnull
public String getUrnRaw() {
return entityAspect.getUrn();
}

@Nullable
public String getSystemMetadataRaw() {
return entityAspect.getSystemMetadata();
}

public String getMetadataRaw() {
return entityAspect.getMetadata();
return urn.toString();
}

@Override
public Timestamp getCreatedOn() {
return entityAspect.getCreatedOn();
return auditStamp != null ? new Timestamp(auditStamp.getTime()) : null;
}

@Override
public String getCreatedBy() {
return entityAspect.getCreatedBy();
return auditStamp != null ? auditStamp.getActor().toString() : null;
}

@Override
@Nonnull
public String getAspectName() {
return entityAspect.aspect;
return aspectSpec.getName();
}

@Override
public long getVersion() {
return entityAspect.getVersion();
}

@Nullable
public SystemMetadata getSystemMetadata() {
return EntityApiUtils.parseSystemMetadata(getSystemMetadataRaw());
return entityAspect == null ? 0 : entityAspect.getVersion();
}

/**
Expand Down Expand Up @@ -165,24 +161,53 @@
return envelopedAspect;
}

@Override
public String toString() {
return entityAspect.toString();
}

public static class EntitySystemAspectBuilder {

private EntityAspect.EntitySystemAspect build() {
return null;
return new EntitySystemAspect(
this.entityAspect,
this.urn,
this.recordTemplate,
this.systemMetadata,
this.auditStamp,
this.entitySpec,
this.aspectSpec);
}

public EntityAspect.EntitySystemAspect build(
public EntityAspect.EntitySystemAspect forInsert(
@Nonnull EntityAspect entityAspect, @Nonnull EntityRegistry entityRegistry) {
this.urn = UrnUtils.getUrn(entityAspect.getUrn());
this.entitySpec = entityRegistry.getEntitySpec(this.urn.getEntityType());
this.aspectSpec = entitySpec.getAspectSpec(entityAspect.getAspect());
fromEntityAspect(entityAspect);
return build();
}

public EntityAspect.EntitySystemAspect forUpdate(
@Nonnull EntityAspect entityAspect, @Nonnull EntityRegistry entityRegistry) {

this.entityAspect = entityAspect;

Check warning on line 189 in entity-registry/src/main/java/com/linkedin/metadata/aspect/EntityAspect.java

View check run for this annotation

Codecov / codecov/patch

entity-registry/src/main/java/com/linkedin/metadata/aspect/EntityAspect.java#L189

Added line #L189 was not covered by tests
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it will make sense once I see how this is used, but it's confusing at this point why we only set the EntitySystemAspect.entityAspect in update scenarios when we have a non-null one in both.


this.urn = UrnUtils.getUrn(entityAspect.getUrn());
this.entitySpec = entityRegistry.getEntitySpec(this.urn.getEntityType());
this.aspectSpec = entitySpec.getAspectSpec(entityAspect.getAspect());
fromEntityAspect(this.entityAspect);
return build();

Check warning on line 195 in entity-registry/src/main/java/com/linkedin/metadata/aspect/EntityAspect.java

View check run for this annotation

Codecov / codecov/patch

entity-registry/src/main/java/com/linkedin/metadata/aspect/EntityAspect.java#L191-L195

Added lines #L191 - L195 were not covered by tests
}

public EntityAspect.EntitySystemAspect forUpdate(
@Nonnull EntitySpec entitySpec,
@Nullable AspectSpec aspectSpec,
@Nonnull EntityAspect entityAspect) {
this.entityAspect = entityAspect;
this.urn = UrnUtils.getUrn(entityAspect.getUrn());
this.entitySpec = entitySpec;

Check warning on line 203 in entity-registry/src/main/java/com/linkedin/metadata/aspect/EntityAspect.java

View check run for this annotation

Codecov / codecov/patch

entity-registry/src/main/java/com/linkedin/metadata/aspect/EntityAspect.java#L203

Added line #L203 was not covered by tests
this.aspectSpec = aspectSpec;
fromEntityAspect(this.entityAspect);
return build();

Check warning on line 206 in entity-registry/src/main/java/com/linkedin/metadata/aspect/EntityAspect.java

View check run for this annotation

Codecov / codecov/patch

entity-registry/src/main/java/com/linkedin/metadata/aspect/EntityAspect.java#L205-L206

Added lines #L205 - L206 were not covered by tests
}

private void fromEntityAspect(@Nonnull EntityAspect entityAspect) {
this.urn = UrnUtils.getUrn(entityAspect.getUrn());
if (entityAspect.getMetadata() != null) {
this.recordTemplate =
RecordUtils.toRecordTemplate(
Expand All @@ -192,9 +217,59 @@
: aspectSpec.getDataTemplateClass()),
entityAspect.getMetadata());
}

return new EntitySystemAspect(entityAspect, urn, recordTemplate, entitySpec, aspectSpec);
if (entityAspect.getSystemMetadata() != null) {
this.systemMetadata =
SystemMetadataUtils.parseSystemMetadata(entityAspect.getSystemMetadata());
}
if (entityAspect.getCreatedBy() != null) {
this.auditStamp =
new AuditStamp()
.setActor(UrnUtils.getUrn(entityAspect.getCreatedBy()))
.setTime(entityAspect.getCreatedOn().getTime())
.setImpersonator(
Optional.ofNullable(entityAspect.getCreatedFor())
.map(UrnUtils::getUrn)
.orElse(null),
SetMode.IGNORE_NULL);
}
}
}

@Nonnull
@Override
public SystemAspect copy() {
return this.toBuilder().entityAspect(null).build();
}

@Nonnull
@Override
public Optional<SystemAspect> getDatabaseAspect() {
return Optional.ofNullable(entityAspect)
.map(a -> EntitySystemAspect.builder().forUpdate(entitySpec, aspectSpec, a));

Check warning on line 248 in entity-registry/src/main/java/com/linkedin/metadata/aspect/EntityAspect.java

View check run for this annotation

Codecov / codecov/patch

entity-registry/src/main/java/com/linkedin/metadata/aspect/EntityAspect.java#L247-L248

Added lines #L247 - L248 were not covered by tests
}

@Nonnull
@Override
public SystemAspect setDatabaseAspect(@Nonnull SystemAspect databaseAspect) {
this.entityAspect = databaseAspect.withVersion(databaseAspect.getVersion());
return this;

Check warning on line 255 in entity-registry/src/main/java/com/linkedin/metadata/aspect/EntityAspect.java

View check run for this annotation

Codecov / codecov/patch

entity-registry/src/main/java/com/linkedin/metadata/aspect/EntityAspect.java#L254-L255

Added lines #L254 - L255 were not covered by tests
}

@Override
@Nonnull
public EntityAspect withVersion(long version) {
return new EntityAspect(
urn.toString(),
aspectSpec.getName(),
version,
Optional.ofNullable(recordTemplate).map(RecordUtils::toJsonString).orElse(null),
Optional.ofNullable(systemMetadata).map(RecordUtils::toJsonString).orElse(null),
Optional.ofNullable(auditStamp).map(a -> new Timestamp(a.getTime())).orElse(null),
Optional.ofNullable(auditStamp).map(AuditStamp::getActor).map(Urn::toString).orElse(null),
Optional.ofNullable(auditStamp)
.map(AuditStamp::getImpersonator)
.map(Urn::toString)
.orElse(null));
}
}
}
Loading
Loading