Skip to content

Commit

Permalink
move partition consolidator and add LocalAggregator API
Browse files Browse the repository at this point in the history
  • Loading branch information
imatiach-msft committed Jun 4, 2021
1 parent 2a716c1 commit cfb06d5
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
// Copyright (C) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See LICENSE in project root for information.

package com.microsoft.ml.spark.io.http
package com.microsoft.ml.spark.stages

import java.util.concurrent.LinkedBlockingQueue

import com.microsoft.ml.spark.core.contracts.{HasInputCol, HasOutputCol}
import com.microsoft.ml.spark.io.http.{HTTPParams, SharedSingleton}
import com.microsoft.ml.spark.logging.BasicLogging
import org.apache.spark.ml.{ComplexParamsWritable, Transformer}
import org.apache.spark.ml.param._
import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable}
import org.apache.spark.ml.{ComplexParamsWritable, Transformer}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row}
Expand Down Expand Up @@ -134,3 +136,8 @@ class PartitionConsolidator(val uid: String)

override def transformSchema(schema: StructType): StructType = schema
}

trait LocalAggregator[T] {
def prep(iter: Iterator[Row]): T
def merge(ts: Seq[T]): T
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package com.microsoft.ml.spark.flaky

import com.microsoft.ml.spark.core.test.base.{SparkSessionFactory, TestBase, TimeLimitedFlaky}
import com.microsoft.ml.spark.core.test.fuzzing.{TestObject, TransformerFuzzing}
import com.microsoft.ml.spark.io.http.PartitionConsolidator
import com.microsoft.ml.spark.stages.PartitionConsolidator
import org.apache.spark.ml.util.MLReadable
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{DoubleType, StructType}
Expand Down

0 comments on commit cfb06d5

Please sign in to comment.