[GLUTEN-7028][CH][Part-7] Feature/onepipeline2 (#7788)
* [Refactor] Remove object ClickhouseOptimisticTransaction

* [Refactor] Fix typo

* [Improve] Parse partitions on the Java side

* [Improve] Report correct BasicWriteTaskStats

* [Bug Fix] Fix header mismatch: the header must be taken from the pipeline builder instead of the query plan, since the query plan only contains the read rel's output header.

* [Refactor] Pass the write proto for the one-pipeline write

* [Improve] Introduce RuntimeConfig and RuntimeSettings

[Improve] Use a callback to update the config without copying it
[Refactor] Refactor NativeExpressionEvaluator and Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_createMergeTreeWriter
[Refactor] Simplify the logic of evaluating tmp_path
[Refactor] Remove Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_injectWriteFilesTempPath

* [New Feature] One-pipeline write (see the gating sketch below)
baibaichen authored Nov 10, 2024
1 parent 2b62a40 commit be0435a
Showing 68 changed files with 1,270 additions and 667 deletions.
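Before the per-file diffs, here is a minimal sketch of the gate that decides between the legacy Spark MergeTree write and the new one-pipeline native write, referenced from the change summary above. It uses only the accessors that appear in the ClickhouseOptimisticTransaction diff below; the enclosing object and method are hypothetical, added purely for illustration.

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.clickhouse.CHConf

object OnePipelineGate {
  // Mirrors the condition added to ClickhouseOptimisticTransaction.writeFiles below: the
  // legacy row-based MergeTree path is taken only when the one-pipeline write is disabled
  // and the table uses the MergeTree format engine. In the actual code the second flag
  // comes from ClickHouseConfig.isMergeTreeFormatEngine(metadata.configuration).
  def usesLegacyMergeTreeWrite(isMergeTreeFormatEngine: Boolean): Boolean = {
    val onePipeline = GlutenConfig.getConf.enableNativeWriter.getOrElse(false) &&
      CHConf.get.enableOnePipelineMergeTreeWrite
    !onePipeline && isMergeTreeFormatEngine
  }
}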
@@ -36,7 +36,6 @@ import org.apache.commons.lang3.exception.ExceptionUtils

import scala.collection.mutable.ListBuffer

object ClickhouseOptimisticTransaction {}
class ClickhouseOptimisticTransaction(
override val deltaLog: DeltaLog,
override val snapshot: Snapshot)(implicit override val clock: Clock)
@@ -36,7 +36,6 @@ import org.apache.commons.lang3.exception.ExceptionUtils

import scala.collection.mutable.ListBuffer

object ClickhouseOptimisticTransaction {}
class ClickhouseOptimisticTransaction(
override val deltaLog: DeltaLog,
override val snapshot: Snapshot)(implicit override val clock: Clock)
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.delta

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.clickhouse.CHConf

import org.apache.spark.SparkException
@@ -24,12 +25,12 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.delta.constraints.{Constraint, Constraints}
import org.apache.spark.sql.delta.files.{DelayedCommitProtocol, DeltaFileFormatWriter, MergeTreeDelayedCommitProtocol, TransactionalWrite}
import org.apache.spark.sql.delta.files._
import org.apache.spark.sql.delta.hooks.AutoCompact
import org.apache.spark.sql.delta.schema.{InnerInvariantViolationException, InvariantViolationException}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.DeltaJobStatisticsTracker
import org.apache.spark.sql.execution.{CHDelayedCommitProtocol, QueryExecution, SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormatWriter, GlutenWriterColumnarRules, WriteFiles, WriteJobStatsTracker}
import org.apache.spark.sql.execution.datasources.v1.MergeTreeWriterInjects
@@ -43,8 +44,6 @@ import org.apache.hadoop.fs.Path

import scala.collection.mutable.ListBuffer

object ClickhouseOptimisticTransaction {}

class ClickhouseOptimisticTransaction(
override val deltaLog: DeltaLog,
override val catalogTable: Option[CatalogTable],
@@ -66,7 +65,12 @@ class ClickhouseOptimisticTransaction(
inputData: Dataset[_],
writeOptions: Option[DeltaOptions],
additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
if (ClickHouseConfig.isMergeTreeFormatEngine(metadata.configuration)) {

// TODO: update FallbackByBackendSettings for mergetree to always return true
val onePipeline = GlutenConfig.getConf.enableNativeWriter.getOrElse(
false) && CHConf.get.enableOnePipelineMergeTreeWrite

if (!onePipeline && ClickHouseConfig.isMergeTreeFormatEngine(metadata.configuration)) {
hasWritten = true

val spark = inputData.sparkSession
@@ -169,12 +173,6 @@
}
committer.addedStatuses.toSeq ++ committer.changeFiles
} else {
// TODO: support native delta parquet write
// 1. insert FakeRowAdaptor
// 2. DeltaInvariantCheckerExec transform
// 3. DeltaTaskStatisticsTracker collect null count / min values / max values
// 4. set the parameters 'staticPartitionWriteOnly', 'isNativeApplicable',
// 'nativeFormat' in the LocalProperty of the sparkcontext
super.writeFiles(inputData, writeOptions, additionalConstraints)
}
}
@@ -188,7 +186,7 @@ class ClickhouseOptimisticTransaction(
}

override protected def getCommitter(outputPath: Path): DelayedCommitProtocol =
new CHDelayedCommitProtocol("delta", outputPath.toString, None, deltaDataSubdir)
new FileDelayedCommitProtocol("delta", outputPath.toString, None, deltaDataSubdir)

override def writeFiles(
inputData: Dataset[_],
@@ -231,7 +229,17 @@
WriteFiles(logicalPlan, fileFormat, partitioningColumns, None, options, Map.empty)

val queryExecution = new QueryExecution(spark, write)
val committer = getCommitter(outputPath)
val committer = fileFormat.toString match {
case "MergeTree" =>
val tableV2 = ClickHouseTableV2.getTable(deltaLog)
new MergeTreeDelayedCommitProtocol2(
outputPath.toString,
None,
deltaDataSubdir,
tableV2.dataBaseName,
tableV2.tableName)
case _ => getCommitter(outputPath)
}

// If Statistics Collection is enabled, then create a stats tracker that will be injected during
// the FileFormatWriter.write call below and will collect per-file stats using
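The key change in this hunk is that the committer is now chosen by the file format rather than always coming from getCommitter. An annotated restatement (names are taken verbatim from the diff above; the comments are editorial):

val committer = fileFormat.toString match {
  case "MergeTree" =>
    // MergeTree parts belong to a concrete ClickHouse table, so the commit protocol
    // carries the database and table name resolved from the Delta catalog entry.
    val tableV2 = ClickHouseTableV2.getTable(deltaLog)
    new MergeTreeDelayedCommitProtocol2(
      outputPath.toString,
      None,
      deltaDataSubdir,
      tableV2.dataBaseName,
      tableV2.tableName)
  case _ =>
    // Parquet keeps the Delta commit protocol; getCommitter now returns the
    // FileDelayedCommitProtocol wrapper introduced later in this commit.
    getCommitter(outputPath)
}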
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.delta.files

import org.apache.hadoop.mapreduce.TaskAttemptContext

class MergeTreeDelayedCommitProtocol(
val outputPath: String,
randomPrefixLength: Option[Int],
@@ -24,3 +26,49 @@ class MergeTreeDelayedCommitProtocol(
val tableName: String)
extends DelayedCommitProtocol("delta-mergetree", outputPath, randomPrefixLength, subdir)
with MergeTreeFileCommitProtocol {}

/**
* A Wrapper of [[DelayedCommitProtocol]] for accessing protected methods and fields in a pipeline
* write for parquet.
*/
class FileDelayedCommitProtocol(
jobId: String,
val outputPath: String,
randomPrefixLength: Option[Int],
subdir: Option[String])
extends DelayedCommitProtocol(jobId, outputPath, randomPrefixLength, subdir) {

override def getFileName(
taskContext: TaskAttemptContext,
ext: String,
partitionValues: Map[String, String]): String = {
super.getFileName(taskContext, ext, partitionValues)
}

def updateAddedFiles(files: Seq[(Map[String, String], String)]): Unit = {
assert(addedFiles.isEmpty)
addedFiles ++= files
}

override def parsePartitions(dir: String): Map[String, String] =
super.parsePartitions(dir)
}

/**
* A Wrapper of [[DelayedCommitProtocol]] for accessing protected methods and fields in a pipeline
* write for mergetree.
*/
class MergeTreeDelayedCommitProtocol2(
val outputPath: String,
randomPrefixLength: Option[Int],
subdir: Option[String],
val database: String,
val tableName: String)
extends DelayedCommitProtocol("delta-mergetree", outputPath, randomPrefixLength, subdir) {

override def newTaskTempFile(
taskContext: TaskAttemptContext,
dir: Option[String],
ext: String): String = outputPath

}
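A hypothetical usage sketch of the two wrappers above, to illustrate why the protected members of DelayedCommitProtocol are re-exposed. The helper object and the call sequence are illustrative only and not part of this commit:

import org.apache.spark.sql.delta.files.FileDelayedCommitProtocol

object DelayedCommitProtocolUsage {
  // After a native pipeline write finishes, the backend reports the files it produced as
  // (partitionValues, relativePath) pairs; the wrapper lets the JVM side record them in
  // the committer instead of the committer creating the files itself.
  def recordNativeOutput(
      committer: FileDelayedCommitProtocol,
      files: Seq[(Map[String, String], String)]): Unit =
    committer.updateAddedFiles(files)

  // parsePartitions is re-exposed so a partition directory string coming back from the
  // native side (e.g. "p=1") can be turned into Delta partition values on the JVM side.
  def partitionValuesOf(
      committer: FileDelayedCommitProtocol,
      partitionDir: String): Map[String, String] =
    committer.parsePartitions(partitionDir)
}

For MergeTreeDelayedCommitProtocol2, newTaskTempFile simply returns outputPath, presumably because the ClickHouse native writer lays out the MergeTree part directories itself and needs no per-task temporary file.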
@@ -21,45 +21,19 @@ import org.apache.gluten.exception.GlutenNotSupportException

import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.delta.files.DelayedCommitProtocol
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.delta.files.{FileDelayedCommitProtocol, MergeTreeDelayedCommitProtocol2}
import org.apache.spark.sql.execution.datasources.{ExecutedWriteSummary, WriteJobDescription, WriteTaskResult}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.Utils

import org.apache.hadoop.mapreduce.TaskAttemptContext

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/** A Wrapper of [[DelayedCommitProtocol]] for accessing protected methods and fields. */
class CHDelayedCommitProtocol(
jobId: String,
val outputPath: String,
randomPrefixLength: Option[Int],
subdir: Option[String])
extends DelayedCommitProtocol(jobId, outputPath, randomPrefixLength, subdir) {

override def getFileName(
taskContext: TaskAttemptContext,
ext: String,
partitionValues: Map[String, String]): String = {
super.getFileName(taskContext, ext, partitionValues)
}

def updateAddedFiles(files: Seq[(Map[String, String], String)]): Unit = {
assert(addedFiles.isEmpty)
addedFiles ++= files
}

override def parsePartitions(dir: String): Map[String, String] =
super.parsePartitions(dir)
}

case class CHDelayedCommitProtocolWrite(
case class FileDeltaColumnarWrite(
override val jobTrackerID: String,
override val description: WriteJobDescription,
override val committer: CHDelayedCommitProtocol)
extends CHColumnarWrite[CHDelayedCommitProtocol]
override val committer: FileDelayedCommitProtocol)
extends CHColumnarWrite[FileDelayedCommitProtocol]
with Logging {

override def doSetupNativeTask(): Unit = {
@@ -73,39 +47,41 @@ case class CHDelayedCommitProtocolWrite(

private def doCollectNativeResult(
cb: ColumnarBatch): Option[(Seq[(Map[String, String], String)], ExecutedWriteSummary)] = {
val numFiles = cb.numRows()
val basicNativeStats = BasicNativeStats(cb)

// TODO: we need to close the iterator here before processing the result.

// Write an empty iterator
if (numFiles == 0) {
if (basicNativeStats.isEmpty) {
None
} else {
val file_col = cb.column(0)
val partition_col = cb.column(1)
val count_col = cb.column(2)

val partitions: mutable.Set[String] = mutable.Set[String]()
// process stats
val addedFiles: ArrayBuffer[(Map[String, String], String)] =
new ArrayBuffer[(Map[String, String], String)]

var numWrittenRows: Long = 0
Range(0, cb.numRows()).foreach {
i =>
val fileName = file_col.getUTF8String(i).toString
val partition = partition_col.getUTF8String(i).toString
if (partition == "__NO_PARTITION_ID__") {
addedFiles.append((Map.empty[String, String], fileName))

basicNativeStats.foreach {
stat =>
val absolutePath = s"${description.path}/${stat.relativePath}"
if (stat.partition_id == "__NO_PARTITION_ID__") {
addedFiles.append((Map.empty[String, String], stat.filename))
} else {
val partitionValues = committer.parsePartitions(partition)
addedFiles.append((partitionValues, s"$partition/$fileName"))
val partitionValues = committer.parsePartitions(stat.partition_id)
addedFiles.append((partitionValues, stat.relativePath))
basicWriteJobStatsTracker.newPartition(
new GenericInternalRow(Array[Any](stat.partition_id)))
}
numWrittenRows += count_col.getLong(i)
basicWriteJobStatsTracker.newFile(absolutePath)
basicWriteJobStatsTracker.closeFile(absolutePath)
numWrittenRows += stat.record_count
}
val updatedPartitions = partitions.toSet

Some(
(
addedFiles.toSeq,
ExecutedWriteSummary(
updatedPartitions = updatedPartitions,
stats = Seq(CreateBasicWriteTaskStats(numFiles, updatedPartitions, numWrittenRows)))))
updatedPartitions = Set.empty,
stats = Seq(finalStats.copy(numRows = numWrittenRows)))))
}
}
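The hunk above replaces raw column reads with a BasicNativeStats helper whose definition lives elsewhere in this commit. A plausible shape, inferred only from how it is used here and from the pre-refactor column layout (file name, partition id, record count); this is an assumption, not the actual definition:

import org.apache.spark.sql.vectorized.ColumnarBatch

// One row of the native writer's result batch, i.e. one written file.
case class BasicNativeStat(
    filename: String,
    partition_id: String, // "__NO_PARTITION_ID__" for unpartitioned output
    record_count: Long) {
  // Path relative to description.path, matching how addedFiles entries were built before.
  def relativePath: String =
    if (partition_id == "__NO_PARTITION_ID__") filename else s"$partition_id/$filename"
}

object BasicNativeStats {
  // Column layout assumed from the pre-refactor code:
  // 0 = file name, 1 = partition id, 2 = record count.
  def apply(cb: ColumnarBatch): Seq[BasicNativeStat] =
    (0 until cb.numRows()).map {
      i =>
        BasicNativeStat(
          cb.column(0).getUTF8String(i).toString,
          cb.column(1).getUTF8String(i).toString,
          cb.column(2).getLong(i))
    }
}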

@@ -134,8 +110,11 @@ object CHDeltaColumnarWrite {
jobTrackerID: String,
description: WriteJobDescription,
committer: FileCommitProtocol): CHColumnarWrite[FileCommitProtocol] = committer match {
case c: CHDelayedCommitProtocol =>
CHDelayedCommitProtocolWrite(jobTrackerID, description, c)
case c: FileDelayedCommitProtocol =>
FileDeltaColumnarWrite(jobTrackerID, description, c)
.asInstanceOf[CHColumnarWrite[FileCommitProtocol]]
case m: MergeTreeDelayedCommitProtocol2 =>
MergeTreeDeltaColumnarWrite(jobTrackerID, description, m)
.asInstanceOf[CHColumnarWrite[FileCommitProtocol]]
case _ =>
throw new GlutenNotSupportException(
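A hypothetical call of the factory above; the name of the enclosing method is outside the visible hunk, so treating it as apply(...) is an assumption:

// Returns the columnar write implementation matching the commit protocol in use, so the
// caller stays agnostic of whether the write targets parquet or MergeTree.
val columnarWrite: CHColumnarWrite[FileCommitProtocol] =
  CHDeltaColumnarWrite(jobTrackerID, description, committer)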
(Diffs for the remaining changed files are not shown.)
