Skip to content

Commit

Permalink
Replace event_tag FK to get rid of insert and return akka#710
Browse files Browse the repository at this point in the history
  • Loading branch information
Roiocam committed Oct 25, 2023
1 parent 408b560 commit 0ed72af
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 34 deletions.
3 changes: 2 additions & 1 deletion core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ jdbc-journal {
schemaName = ""

columnNames {
eventId = "event_id"
persistenceId = "persistence_id"
sequenceNumber = "sequence_number"
tag = "tag"
}
}
Expand Down
9 changes: 5 additions & 4 deletions core/src/main/resources/schema/h2/h2-create-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ CREATE TABLE IF NOT EXISTS "event_journal" (
CREATE UNIQUE INDEX "event_journal_ordering_idx" on "event_journal" ("ordering");

CREATE TABLE IF NOT EXISTS "event_tag" (
"event_id" BIGINT NOT NULL,
"persistence_id" VARCHAR(255) NOT NULL,
"sequence_number" BIGINT NOT NULL,
"tag" VARCHAR NOT NULL,
PRIMARY KEY("event_id", "tag"),
PRIMARY KEY("persistence_id", "sequence_number", "tag"),
CONSTRAINT fk_event_journal
FOREIGN KEY("event_id")
REFERENCES "event_journal"("ordering")
FOREIGN KEY("persistence_id", "sequence_number")
REFERENCES "event_journal"("persistence_id", "sequence_number"))
ON DELETE CASCADE
);

Expand Down
9 changes: 5 additions & 4 deletions core/src/main/resources/schema/mysql/mysql-create-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ CREATE TABLE IF NOT EXISTS event_journal(
CREATE UNIQUE INDEX event_journal_ordering_idx ON event_journal(ordering);

CREATE TABLE IF NOT EXISTS event_tag (
event_id BIGINT UNSIGNED NOT NULL,
persistence_id VARCHAR(255) NOT NULL,
sequence_number BIGINT NOT NULL,
tag VARCHAR(255) NOT NULL,
PRIMARY KEY(event_id, tag),
FOREIGN KEY (event_id)
REFERENCES event_journal(ordering)
PRIMARY KEY(persistence_id, sequence_number, tag),
FOREIGN KEY (persistence_id, sequence_number)
REFERENCES event_journal(persistence_id, sequence_number)
ON DELETE CASCADE
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ CREATE OR REPLACE TRIGGER EVENT_JOURNAL__ORDERING_TRG before insert on EVENT_JOU
/

CREATE TABLE EVENT_TAG (
EVENT_ID NUMERIC NOT NULL,
PERSISTENCE_ID VARCHAR(255) NOT NULL,
SEQUENCE_NUMBER NUMERIC NOT NULL,
TAG VARCHAR(255) NOT NULL,
PRIMARY KEY(EVENT_ID, TAG),
FOREIGN KEY(EVENT_ID) REFERENCES EVENT_JOURNAL(ORDERING)
PRIMARY KEY(PERSISTENCE_ID, SEQUENCE_NUMBER, TAG),
FOREIGN KEY(PERSISTENCE_ID, SEQUENCE_NUMBER) REFERENCES EVENT_JOURNAL(PERSISTENCE_ID, SEQUENCE_NUMBER)
ON DELETE CASCADE
)
/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ CREATE TABLE IF NOT EXISTS public.event_journal(
CREATE UNIQUE INDEX event_journal_ordering_idx ON public.event_journal(ordering);

CREATE TABLE IF NOT EXISTS public.event_tag(
event_id BIGINT,
persistence_id VARCHAR(255) NOT NULL,
sequence_number BIGINT NOT NULL,
tag VARCHAR(256),
PRIMARY KEY(event_id, tag),
PRIMARY KEY(persistence_id, sequence_number, tag),
CONSTRAINT fk_event_journal
FOREIGN KEY(event_id)
REFERENCES event_journal(ordering)
FOREIGN KEY(persistence_id, sequence_number)
REFERENCES event_journal(persistence_id, sequence_number)
ON DELETE CASCADE
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ CREATE TABLE event_journal(
CREATE UNIQUE INDEX event_journal_ordering_idx ON event_journal(ordering);

CREATE TABLE event_tag (
"event_id" BIGINT NOT NULL,
"persistence_id" NVARCHAR(255) NOT NULL,
"sequence_number" NUMERIC(10,0) NOT NULL,
"tag" NVARCHAR(255) NOT NULL
PRIMARY KEY ("event_id","tag")
PRIMARY KEY ("persistence_id", "sequence_number","tag")
constraint "fk_event_journal"
foreign key("event_id")
references "dbo"."event_journal"("ordering")
foreign key("persistence_id", "sequence_number")
references "dbo"."event_journal"("persistence_id", "sequence_number")
on delete CASCADE
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class EventJournalTableColumnNames(config: Config) {

class EventTagTableColumnNames(config: Config) {
private val cfg = config.getConfig("tables.event_tag.columnNames")
val eventId: String = cfg.getString("eventId")
val persistenceId: String = cfg.getString("persistenceId")
val sequenceNumber: String = cfg.getString("sequenceNumber")
val tag: String = cfg.getString("tag")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,21 @@ class JournalQueries(
import profile.api._

private val JournalTableC = Compiled(JournalTable)
private val insertAndReturn = JournalTable.returning(JournalTable.map(_.ordering))
private val TagTableC = Compiled(TagTable)

def writeJournalRows(xs: Seq[(JournalAkkaSerializationRow, Set[String])])(implicit ec: ExecutionContext) = {
def writeJournalRows(xs: Seq[(JournalAkkaSerializationRow, Set[String])])(implicit ec: ExecutionContext): DBIOAction[Any, NoStream, Effect.Write] = {
val sorted = xs.sortBy((event => event._1.sequenceNumber))
// not matter what, always insert event.
val events = sorted.map(_._1)
if (sorted.exists(_._2.nonEmpty)) {
// only if there are any tags
val (events, tags) = sorted.unzip
val tagsRows = sorted.flatMap(e => e._2.map(v => TagRow(e._1.persistenceId, e._1.sequenceNumber, v)))
for {
ids <- insertAndReturn ++= events
tagInserts = ids.zip(tags).flatMap { case (id, tags) => tags.map(tag => TagRow(id, tag)) }
_ <- TagTableC ++= tagInserts
_ <- JournalTableC ++= events
_ <- TagTableC ++= tagsRows
} yield ()
} else {
// optimization avoid some work when not using tags
val events = sorted.map(_._1)
JournalTableC ++= events
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ object JournalTables {
metaSerId: Option[Int],
metaSerManifest: Option[String])

case class TagRow(eventId: Long, tag: String)
case class TagRow(persistenceId: String, sequenceNumber: Long, tag: String)
}

/**
Expand Down Expand Up @@ -91,13 +91,16 @@ trait JournalTables {
lazy val JournalTable = new TableQuery(tag => new JournalEvents(tag))

class EventTags(_tableTag: Tag) extends Table[TagRow](_tableTag, tagTableCfg.schemaName, tagTableCfg.tableName) {
override def * = (eventId, tag) <> (TagRow.tupled, TagRow.unapply)
override def * = (persistenceId, sequenceNumber, tag) <> (TagRow.tupled, TagRow.unapply)

val eventId: Rep[Long] = column[Long](tagTableCfg.columnNames.eventId)
val persistenceId: Rep[String] = column[String](tagTableCfg.columnNames.persistenceId)
val sequenceNumber: Rep[Long] = column[Long](tagTableCfg.columnNames.sequenceNumber)
val tag: Rep[String] = column[String](tagTableCfg.columnNames.tag)

val pk = primaryKey(s"${tagTableCfg.tableName}_pk", (eventId, tag))
val journalEvent = foreignKey(s"fk_${journalTableCfg.tableName}", eventId, JournalTable)(_.ordering)
val pk = primaryKey(s"${tagTableCfg.tableName}_pk", (persistenceId, sequenceNumber, tag))
val journalEvent =
foreignKey(s"fk_${journalTableCfg.tableName}", (persistenceId, sequenceNumber), JournalTable)(e =>
(e.persistenceId, e.sequenceNumber))
}

lazy val TagTable = new TableQuery(tag => new EventTags(tag))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJo
JournalTable.filter(_.deleted === false)

private def baseTableWithTagsQuery() = {
baseTableQuery().join(TagTable).on(_.ordering === _.eventId)
baseTableQuery().join(TagTable).on((e, t) => e.persistenceId === t.persistenceId && e.sequenceNumber === t.sequenceNumber)
}

val allPersistenceIdsDistinct = Compiled(_allPersistenceIdsDistinct _)
Expand Down

0 comments on commit 0ed72af

Please sign in to comment.