From dfe7357de9461de51437f8f8b4a64f01ecc10af4 Mon Sep 17 00:00:00 2001 From: Markus Cozowicz Date: Mon, 23 Sep 2019 11:03:58 +0200 Subject: [PATCH 1/6] added ngram support --- .../ml/spark/vw/VowpalWabbitFeaturizer.scala | 91 ++++++++++++++----- .../vw/VerifyVowpalWabbitClassifier.scala | 26 ++++++ .../vw/VerifyVowpalWabbitFeaturizer.scala | 29 ++++++ 3 files changed, 125 insertions(+), 21 deletions(-) diff --git a/src/main/scala/com/microsoft/ml/spark/vw/VowpalWabbitFeaturizer.scala b/src/main/scala/com/microsoft/ml/spark/vw/VowpalWabbitFeaturizer.scala index 02188cf1da..0ad351b7a8 100644 --- a/src/main/scala/com/microsoft/ml/spark/vw/VowpalWabbitFeaturizer.scala +++ b/src/main/scala/com/microsoft/ml/spark/vw/VowpalWabbitFeaturizer.scala @@ -6,7 +6,7 @@ package com.microsoft.ml.spark.vw import com.microsoft.ml.spark.core.contracts.{HasInputCols, HasOutputCol, Wrappable} import com.microsoft.ml.spark.vw.featurizer._ import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Transformer} -import org.apache.spark.ml.param.{IntParam, ParamMap, StringArrayParam} +import org.apache.spark.ml.param.{BooleanParam, IntParam, ParamMap, StringArrayParam} import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions.{col, struct, udf} @@ -41,51 +41,79 @@ class VowpalWabbitFeaturizer(override val uid: String) extends Transformer def getStringSplitInputCols: Array[String] = $(stringSplitInputCols) def setStringSplitInputCols(value: Array[String]): this.type = set(stringSplitInputCols, value) + val preserveOrderNumBits = new IntParam(this, "preserveOrderNumBits", + "Number of bits used to preserve the feature order. This will reduce the hash size. " + + "Needs to be large enough to fit count the maximum number of words") + setDefault(preserveOrderNumBits -> 0) + + def getPreserveOrderNumBits: Int = $(preserveOrderNumBits) + def setPreserveOrderNumBits(value: Int): this.type = { + if (value < 1 || value > 28) + throw new IllegalArgumentException("preserveOrderNumBits must be between 1 and 28 bits") + set(preserveOrderNumBits, value) + } + + val prefixStringsWithColumnName = new BooleanParam(this, "prefixStringsWithColumnName", + "Prefix string features with column name") + setDefault(prefixStringsWithColumnName -> true) + + def getPrefixStringsWithColumnName: Boolean = $(prefixStringsWithColumnName) + def setPrefixStringsWithColumnName(value: Boolean): this.type = set(prefixStringsWithColumnName, value) + private def getAllInputCols = getInputCols ++ getStringSplitInputCols private def getFeaturizer(name: String, dataType: DataType, idx: Int, namespaceHash: Int): Featurizer = { val stringSplitInputCols = getStringSplitInputCols + val prefixName = if (getPrefixStringsWithColumnName) name else "" + dataType match { - case DoubleType => new NumericFeaturizer(idx, name, namespaceHash, getMask, r => r.getDouble(idx)) - case FloatType => new NumericFeaturizer(idx, name, namespaceHash, getMask, r => r.getFloat(idx).toDouble) - case IntegerType => new NumericFeaturizer(idx, name, namespaceHash, getMask, r => r.getInt(idx).toDouble) - case LongType => new NumericFeaturizer(idx, name, namespaceHash, getMask, r => r.getLong(idx).toDouble) - case ShortType => new NumericFeaturizer(idx, name, namespaceHash, getMask, r => r.getShort(idx).toDouble) - case ByteType => new NumericFeaturizer(idx, name, namespaceHash, getMask, r => r.getByte(idx).toDouble) - case BooleanType => new BooleanFeaturizer(idx, name, namespaceHash, getMask) + case DoubleType => new NumericFeaturizer(idx, prefixName, namespaceHash, getMask, r => r.getDouble(idx)) + case FloatType => new NumericFeaturizer(idx, prefixName, namespaceHash, getMask, r => r.getFloat(idx).toDouble) + case IntegerType => new NumericFeaturizer(idx, prefixName, namespaceHash, getMask, r => r.getInt(idx).toDouble) + case LongType => new NumericFeaturizer(idx, prefixName, namespaceHash, getMask, r => r.getLong(idx).toDouble) + case ShortType => new NumericFeaturizer(idx, prefixName, namespaceHash, getMask, r => r.getShort(idx).toDouble) + case ByteType => new NumericFeaturizer(idx, prefixName, namespaceHash, getMask, r => r.getByte(idx).toDouble) + case BooleanType => new BooleanFeaturizer(idx, prefixName, namespaceHash, getMask) case StringType => if (stringSplitInputCols.contains(name)) - new StringSplitFeaturizer(idx, name, namespaceHash, getMask) - else new StringFeaturizer(idx, name, namespaceHash, getMask) + new StringSplitFeaturizer(idx, prefixName, namespaceHash, getMask) + else new StringFeaturizer(idx, prefixName, namespaceHash, getMask) case arr: ArrayType => if (arr.elementType != DataTypes.StringType) throw new RuntimeException(s"Unsupported array element type: $dataType") - new StringArrayFeaturizer(idx, name, namespaceHash, getMask) + new StringArrayFeaturizer(idx, prefixName, namespaceHash, getMask) case m: MapType => if (m.keyType != DataTypes.StringType) throw new RuntimeException(s"Unsupported map key type: $dataType") m.valueType match { - case StringType => new MapStringFeaturizer(idx, name, namespaceHash, getMask) - case DoubleType => new MapFeaturizer[Double](idx, name, namespaceHash, getMask, v => v) - case FloatType => new MapFeaturizer[Float](idx, name, namespaceHash, getMask, v => v.toDouble) - case IntegerType => new MapFeaturizer[Int](idx, name, namespaceHash, getMask, v => v.toDouble) - case LongType => new MapFeaturizer[Long](idx, name, namespaceHash, getMask, v => v.toDouble) - case ShortType => new MapFeaturizer[Short](idx, name, namespaceHash, getMask, v => v.toDouble) - case ByteType => new MapFeaturizer[Byte](idx, name, namespaceHash, getMask, v => v.toDouble) + case StringType => new MapStringFeaturizer(idx, prefixName, namespaceHash, getMask) + case DoubleType => new MapFeaturizer[Double](idx, prefixName, namespaceHash, getMask, v => v) + case FloatType => new MapFeaturizer[Float](idx, prefixName, namespaceHash, getMask, v => v.toDouble) + case IntegerType => new MapFeaturizer[Int](idx, prefixName, namespaceHash, getMask, v => v.toDouble) + case LongType => new MapFeaturizer[Long](idx, prefixName, namespaceHash, getMask, v => v.toDouble) + case ShortType => new MapFeaturizer[Short](idx, prefixName, namespaceHash, getMask, v => v.toDouble) + case ByteType => new MapFeaturizer[Byte](idx, prefixName, namespaceHash, getMask, v => v.toDouble) case _ => throw new RuntimeException(s"Unsupported map value type: $dataType") } case m: Any => if (m == VectorType) // unfortunately the type is private - new VectorFeaturizer(idx, name, getMask) + new VectorFeaturizer(idx, prefixName, getMask) else throw new RuntimeException(s"Unsupported data type: $dataType") } } override def transform(dataset: Dataset[_]): DataFrame = { + if (getPreserveOrderNumBits + getNumBits > 30) + throw new IllegalArgumentException( + s"Number of bits used for hashing (${getNumBits} and " + + s"number of bits used for order preserving (${getPreserveOrderNumBits}) must be less than 30") + + val maxFeaturesForOrdering = 1 << getPreserveOrderNumBits + val inputColsList = getAllInputCols val namespaceHash: Int = VowpalWabbitMurmur.hash(this.getOutputCol, this.getSeed) @@ -116,13 +144,34 @@ class VowpalWabbitFeaturizer(override val uid: String) extends Transformer if (!r.isNullAt(f.fieldIdx)) f.featurize(r, indices, values) + val indicesArray = indices.result + if (getPreserveOrderNumBits > 0) { + var idxPrefixBits = 30 - getPreserveOrderNumBits + + if (indicesArray.length > maxFeaturesForOrdering) + throw new IllegalArgumentException( + s"Too many features ${indicesArray.length} for " + + s"number of bits used for order preserving (${getPreserveOrderNumBits})") + + // prefix every feature index with a counter value + // will be stripped when passing to VW + for (i <- 0 until indicesArray.length) { + val idxPrefix = i << idxPrefixBits + indicesArray(i) = indicesArray(i) | idxPrefix + } + } + + // if we use the highest order bits to preserve the ordering + // the maximum index size is larger + val size = if(getPreserveOrderNumBits > 0) 1 << 30 else 1 << getNumBits + // sort by indices and remove duplicate values // Warning: // - due to SparseVector limitations (which doesn't allow duplicates) we need filter // - VW command line allows for duplicate features with different values (just updates twice) - val (indicesSorted, valuesSorted) = VectorUtils.sortAndDistinct(indices.result, values.result, getSumCollisions) + val (indicesSorted, valuesSorted) = VectorUtils.sortAndDistinct(indicesArray, values.result, getSumCollisions) - Vectors.sparse(1 << getNumBits, indicesSorted, valuesSorted) + Vectors.sparse(size, indicesSorted, valuesSorted) }) dataset.toDF.withColumn(getOutputCol, mode.apply(struct(fieldSubset.map(f => col(f.name)): _*))) diff --git a/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitClassifier.scala b/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitClassifier.scala index 44afadd56b..3818d17cf5 100644 --- a/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitClassifier.scala +++ b/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitClassifier.scala @@ -224,6 +224,32 @@ class VerifyVowpalWabbitClassifier extends Benchmarks with EstimatorFuzzing[Vowp println(labelOneCnt1) } + case class ClassificationInput[T](label: Int, in: T) + + test("Verify VowpalWabbit Classifier w/ ngrams") { + val featurizer = new VowpalWabbitFeaturizer() + .setStringSplitInputCols(Array("in")) + .setPreserveOrderNumBits(2) + .setNumBits(18) + .setPrefixStringsWithColumnName(false) + .setOutputCol("features") + + val dataset = session.createDataFrame(Seq( + ClassificationInput[String](1, "marie markus fun"), + ClassificationInput[String](0, "marie markus no fun") + )).coalesce(1) + + val datasetFeaturized = featurizer.transform(dataset) + + val vw1 = new VowpalWabbitClassifier() + .setArgs("--ngram f2 -a") + val classifier1 = vw1.fit(datasetFeaturized) + + // 3 (words) + 2 (ngrams) + 1 (constant) = 6 + // 4 (words) + 3 (ngrams) + 1 (constant) = 8 + assert (classifier1.getPerformanceStatistics.select("totalNumberOfFeatures").head.get(0) == 14) + } + /** Reads a CSV file given the file name and file location. * @param fileName The name of the csv file. * @param fileLocation The full path to the csv file. diff --git a/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitFeaturizer.scala b/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitFeaturizer.scala index 909c0af7a6..1a1834a1fd 100644 --- a/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitFeaturizer.scala +++ b/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitFeaturizer.scala @@ -23,6 +23,35 @@ class VerifyVowpalWabbitFeaturizer extends TestBase with TransformerFuzzing[Vowp case class Input2[T, S](in1: T, in2: S) + val namespaceFeatures = VowpalWabbitMurmur.hash("features", 0) + + test("Verify order preserving") { + val featurizer1 = new VowpalWabbitFeaturizer() + .setStringSplitInputCols(Array("in")) + .setPreserveOrderNumBits(2) + .setNumBits(18) + .setPrefixStringsWithColumnName(false) + .setOutputCol("features") + val df1 = session.createDataFrame(Seq(Input[String]("marie markus fun"))) + + val v1 = featurizer1.transform(df1).select(col("features")).collect.apply(0).getAs[SparseVector](0) + + assert(v1.numNonzeros == 3) + + val bitMask = (1 << 18) - 1 + + // the order is the same as in the string above + assert((bitMask & v1.indices(0)) == (bitMask & + VowpalWabbitMurmur.hash("marie", namespaceFeatures))) + assert((bitMask & v1.indices(1)) == (bitMask & + VowpalWabbitMurmur.hash("markus", namespaceFeatures))) + assert((bitMask & v1.indices(2)) == (bitMask & + VowpalWabbitMurmur.hash("fun", namespaceFeatures))) + assert(v1.values(0) == 1.0) + assert(v1.values(1) == 1.0) + assert(v1.values(2) == 1.0) + } + test("Verify VowpalWabbit Featurizer can be run with seq and string") { val featurizer1 = new VowpalWabbitFeaturizer() .setInputCols(Array("str", "seq")) From 828d452a89855cec0d2d0c7506e0acdc6967b879 Mon Sep 17 00:00:00 2001 From: Markus Cozowicz Date: Mon, 23 Sep 2019 11:03:58 +0200 Subject: [PATCH 2/6] added ngram support --- .../ml/spark/vw/VowpalWabbitFeaturizer.scala | 91 ++++++++++++++----- .../vw/VerifyVowpalWabbitClassifier.scala | 26 ++++++ .../vw/VerifyVowpalWabbitFeaturizer.scala | 27 ++++++ 3 files changed, 123 insertions(+), 21 deletions(-) diff --git a/src/main/scala/com/microsoft/ml/spark/vw/VowpalWabbitFeaturizer.scala b/src/main/scala/com/microsoft/ml/spark/vw/VowpalWabbitFeaturizer.scala index 02188cf1da..0ad351b7a8 100644 --- a/src/main/scala/com/microsoft/ml/spark/vw/VowpalWabbitFeaturizer.scala +++ b/src/main/scala/com/microsoft/ml/spark/vw/VowpalWabbitFeaturizer.scala @@ -6,7 +6,7 @@ package com.microsoft.ml.spark.vw import com.microsoft.ml.spark.core.contracts.{HasInputCols, HasOutputCol, Wrappable} import com.microsoft.ml.spark.vw.featurizer._ import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Transformer} -import org.apache.spark.ml.param.{IntParam, ParamMap, StringArrayParam} +import org.apache.spark.ml.param.{BooleanParam, IntParam, ParamMap, StringArrayParam} import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions.{col, struct, udf} @@ -41,51 +41,79 @@ class VowpalWabbitFeaturizer(override val uid: String) extends Transformer def getStringSplitInputCols: Array[String] = $(stringSplitInputCols) def setStringSplitInputCols(value: Array[String]): this.type = set(stringSplitInputCols, value) + val preserveOrderNumBits = new IntParam(this, "preserveOrderNumBits", + "Number of bits used to preserve the feature order. This will reduce the hash size. " + + "Needs to be large enough to fit count the maximum number of words") + setDefault(preserveOrderNumBits -> 0) + + def getPreserveOrderNumBits: Int = $(preserveOrderNumBits) + def setPreserveOrderNumBits(value: Int): this.type = { + if (value < 1 || value > 28) + throw new IllegalArgumentException("preserveOrderNumBits must be between 1 and 28 bits") + set(preserveOrderNumBits, value) + } + + val prefixStringsWithColumnName = new BooleanParam(this, "prefixStringsWithColumnName", + "Prefix string features with column name") + setDefault(prefixStringsWithColumnName -> true) + + def getPrefixStringsWithColumnName: Boolean = $(prefixStringsWithColumnName) + def setPrefixStringsWithColumnName(value: Boolean): this.type = set(prefixStringsWithColumnName, value) + private def getAllInputCols = getInputCols ++ getStringSplitInputCols private def getFeaturizer(name: String, dataType: DataType, idx: Int, namespaceHash: Int): Featurizer = { val stringSplitInputCols = getStringSplitInputCols + val prefixName = if (getPrefixStringsWithColumnName) name else "" + dataType match { - case DoubleType => new NumericFeaturizer(idx, name, namespaceHash, getMask, r => r.getDouble(idx)) - case FloatType => new NumericFeaturizer(idx, name, namespaceHash, getMask, r => r.getFloat(idx).toDouble) - case IntegerType => new NumericFeaturizer(idx, name, namespaceHash, getMask, r => r.getInt(idx).toDouble) - case LongType => new NumericFeaturizer(idx, name, namespaceHash, getMask, r => r.getLong(idx).toDouble) - case ShortType => new NumericFeaturizer(idx, name, namespaceHash, getMask, r => r.getShort(idx).toDouble) - case ByteType => new NumericFeaturizer(idx, name, namespaceHash, getMask, r => r.getByte(idx).toDouble) - case BooleanType => new BooleanFeaturizer(idx, name, namespaceHash, getMask) + case DoubleType => new NumericFeaturizer(idx, prefixName, namespaceHash, getMask, r => r.getDouble(idx)) + case FloatType => new NumericFeaturizer(idx, prefixName, namespaceHash, getMask, r => r.getFloat(idx).toDouble) + case IntegerType => new NumericFeaturizer(idx, prefixName, namespaceHash, getMask, r => r.getInt(idx).toDouble) + case LongType => new NumericFeaturizer(idx, prefixName, namespaceHash, getMask, r => r.getLong(idx).toDouble) + case ShortType => new NumericFeaturizer(idx, prefixName, namespaceHash, getMask, r => r.getShort(idx).toDouble) + case ByteType => new NumericFeaturizer(idx, prefixName, namespaceHash, getMask, r => r.getByte(idx).toDouble) + case BooleanType => new BooleanFeaturizer(idx, prefixName, namespaceHash, getMask) case StringType => if (stringSplitInputCols.contains(name)) - new StringSplitFeaturizer(idx, name, namespaceHash, getMask) - else new StringFeaturizer(idx, name, namespaceHash, getMask) + new StringSplitFeaturizer(idx, prefixName, namespaceHash, getMask) + else new StringFeaturizer(idx, prefixName, namespaceHash, getMask) case arr: ArrayType => if (arr.elementType != DataTypes.StringType) throw new RuntimeException(s"Unsupported array element type: $dataType") - new StringArrayFeaturizer(idx, name, namespaceHash, getMask) + new StringArrayFeaturizer(idx, prefixName, namespaceHash, getMask) case m: MapType => if (m.keyType != DataTypes.StringType) throw new RuntimeException(s"Unsupported map key type: $dataType") m.valueType match { - case StringType => new MapStringFeaturizer(idx, name, namespaceHash, getMask) - case DoubleType => new MapFeaturizer[Double](idx, name, namespaceHash, getMask, v => v) - case FloatType => new MapFeaturizer[Float](idx, name, namespaceHash, getMask, v => v.toDouble) - case IntegerType => new MapFeaturizer[Int](idx, name, namespaceHash, getMask, v => v.toDouble) - case LongType => new MapFeaturizer[Long](idx, name, namespaceHash, getMask, v => v.toDouble) - case ShortType => new MapFeaturizer[Short](idx, name, namespaceHash, getMask, v => v.toDouble) - case ByteType => new MapFeaturizer[Byte](idx, name, namespaceHash, getMask, v => v.toDouble) + case StringType => new MapStringFeaturizer(idx, prefixName, namespaceHash, getMask) + case DoubleType => new MapFeaturizer[Double](idx, prefixName, namespaceHash, getMask, v => v) + case FloatType => new MapFeaturizer[Float](idx, prefixName, namespaceHash, getMask, v => v.toDouble) + case IntegerType => new MapFeaturizer[Int](idx, prefixName, namespaceHash, getMask, v => v.toDouble) + case LongType => new MapFeaturizer[Long](idx, prefixName, namespaceHash, getMask, v => v.toDouble) + case ShortType => new MapFeaturizer[Short](idx, prefixName, namespaceHash, getMask, v => v.toDouble) + case ByteType => new MapFeaturizer[Byte](idx, prefixName, namespaceHash, getMask, v => v.toDouble) case _ => throw new RuntimeException(s"Unsupported map value type: $dataType") } case m: Any => if (m == VectorType) // unfortunately the type is private - new VectorFeaturizer(idx, name, getMask) + new VectorFeaturizer(idx, prefixName, getMask) else throw new RuntimeException(s"Unsupported data type: $dataType") } } override def transform(dataset: Dataset[_]): DataFrame = { + if (getPreserveOrderNumBits + getNumBits > 30) + throw new IllegalArgumentException( + s"Number of bits used for hashing (${getNumBits} and " + + s"number of bits used for order preserving (${getPreserveOrderNumBits}) must be less than 30") + + val maxFeaturesForOrdering = 1 << getPreserveOrderNumBits + val inputColsList = getAllInputCols val namespaceHash: Int = VowpalWabbitMurmur.hash(this.getOutputCol, this.getSeed) @@ -116,13 +144,34 @@ class VowpalWabbitFeaturizer(override val uid: String) extends Transformer if (!r.isNullAt(f.fieldIdx)) f.featurize(r, indices, values) + val indicesArray = indices.result + if (getPreserveOrderNumBits > 0) { + var idxPrefixBits = 30 - getPreserveOrderNumBits + + if (indicesArray.length > maxFeaturesForOrdering) + throw new IllegalArgumentException( + s"Too many features ${indicesArray.length} for " + + s"number of bits used for order preserving (${getPreserveOrderNumBits})") + + // prefix every feature index with a counter value + // will be stripped when passing to VW + for (i <- 0 until indicesArray.length) { + val idxPrefix = i << idxPrefixBits + indicesArray(i) = indicesArray(i) | idxPrefix + } + } + + // if we use the highest order bits to preserve the ordering + // the maximum index size is larger + val size = if(getPreserveOrderNumBits > 0) 1 << 30 else 1 << getNumBits + // sort by indices and remove duplicate values // Warning: // - due to SparseVector limitations (which doesn't allow duplicates) we need filter // - VW command line allows for duplicate features with different values (just updates twice) - val (indicesSorted, valuesSorted) = VectorUtils.sortAndDistinct(indices.result, values.result, getSumCollisions) + val (indicesSorted, valuesSorted) = VectorUtils.sortAndDistinct(indicesArray, values.result, getSumCollisions) - Vectors.sparse(1 << getNumBits, indicesSorted, valuesSorted) + Vectors.sparse(size, indicesSorted, valuesSorted) }) dataset.toDF.withColumn(getOutputCol, mode.apply(struct(fieldSubset.map(f => col(f.name)): _*))) diff --git a/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitClassifier.scala b/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitClassifier.scala index f63e546a5c..af6a7fca17 100644 --- a/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitClassifier.scala +++ b/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitClassifier.scala @@ -209,6 +209,32 @@ class VerifyVowpalWabbitClassifier extends Benchmarks with EstimatorFuzzing[Vowp println(labelOneCnt1) } + case class ClassificationInput[T](label: Int, in: T) + + test("Verify VowpalWabbit Classifier w/ ngrams") { + val featurizer = new VowpalWabbitFeaturizer() + .setStringSplitInputCols(Array("in")) + .setPreserveOrderNumBits(2) + .setNumBits(18) + .setPrefixStringsWithColumnName(false) + .setOutputCol("features") + + val dataset = session.createDataFrame(Seq( + ClassificationInput[String](1, "marie markus fun"), + ClassificationInput[String](0, "marie markus no fun") + )).coalesce(1) + + val datasetFeaturized = featurizer.transform(dataset) + + val vw1 = new VowpalWabbitClassifier() + .setArgs("--ngram f2 -a") + val classifier1 = vw1.fit(datasetFeaturized) + + // 3 (words) + 2 (ngrams) + 1 (constant) = 6 + // 4 (words) + 3 (ngrams) + 1 (constant) = 8 + assert (classifier1.getPerformanceStatistics.select("totalNumberOfFeatures").head.get(0) == 14) + } + /** Reads a CSV file given the file name and file location. * @param fileName The name of the csv file. * @param fileLocation The full path to the csv file. diff --git a/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitFeaturizer.scala b/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitFeaturizer.scala index a96d07acd8..9abdf31b69 100644 --- a/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitFeaturizer.scala +++ b/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitFeaturizer.scala @@ -25,6 +25,33 @@ class VerifyVowpalWabbitFeaturizer extends TestBase with TransformerFuzzing[Vowp val namespaceFeatures = VowpalWabbitMurmur.hash("features", 0) + test("Verify order preserving") { + val featurizer1 = new VowpalWabbitFeaturizer() + .setStringSplitInputCols(Array("in")) + .setPreserveOrderNumBits(2) + .setNumBits(18) + .setPrefixStringsWithColumnName(false) + .setOutputCol("features") + val df1 = session.createDataFrame(Seq(Input[String]("marie markus fun"))) + + val v1 = featurizer1.transform(df1).select(col("features")).collect.apply(0).getAs[SparseVector](0) + + assert(v1.numNonzeros == 3) + + val bitMask = (1 << 18) - 1 + + // the order is the same as in the string above + assert((bitMask & v1.indices(0)) == (bitMask & + VowpalWabbitMurmur.hash("marie", namespaceFeatures))) + assert((bitMask & v1.indices(1)) == (bitMask & + VowpalWabbitMurmur.hash("markus", namespaceFeatures))) + assert((bitMask & v1.indices(2)) == (bitMask & + VowpalWabbitMurmur.hash("fun", namespaceFeatures))) + assert(v1.values(0) == 1.0) + assert(v1.values(1) == 1.0) + assert(v1.values(2) == 1.0) + } + test("Verify VowpalWabbit Featurizer can be run with seq and string") { val featurizer1 = new VowpalWabbitFeaturizer() .setInputCols(Array("str", "seq")) From f39af7a49d10678804ba0258c4b118dd368f0a98 Mon Sep 17 00:00:00 2001 From: Markus Cozowicz Date: Tue, 8 Oct 2019 11:29:58 +0200 Subject: [PATCH 3/6] moved validation into isValid lambda --- .../microsoft/ml/spark/vw/VowpalWabbitFeaturizer.scala | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/main/scala/com/microsoft/ml/spark/vw/VowpalWabbitFeaturizer.scala b/src/main/scala/com/microsoft/ml/spark/vw/VowpalWabbitFeaturizer.scala index 0ad351b7a8..8f800b0fc1 100644 --- a/src/main/scala/com/microsoft/ml/spark/vw/VowpalWabbitFeaturizer.scala +++ b/src/main/scala/com/microsoft/ml/spark/vw/VowpalWabbitFeaturizer.scala @@ -43,15 +43,12 @@ class VowpalWabbitFeaturizer(override val uid: String) extends Transformer val preserveOrderNumBits = new IntParam(this, "preserveOrderNumBits", "Number of bits used to preserve the feature order. This will reduce the hash size. " + - "Needs to be large enough to fit count the maximum number of words") + "Needs to be large enough to fit count the maximum number of words", + (value: Int) => value >= 0 && value < 29) setDefault(preserveOrderNumBits -> 0) def getPreserveOrderNumBits: Int = $(preserveOrderNumBits) - def setPreserveOrderNumBits(value: Int): this.type = { - if (value < 1 || value > 28) - throw new IllegalArgumentException("preserveOrderNumBits must be between 1 and 28 bits") - set(preserveOrderNumBits, value) - } + def setPreserveOrderNumBits(value: Int): this.type = set(preserveOrderNumBits, value) val prefixStringsWithColumnName = new BooleanParam(this, "prefixStringsWithColumnName", "Prefix string features with column name") From f64f9e56286eb324a11aebe546bc14654dccaa4b Mon Sep 17 00:00:00 2001 From: Markus Cozowicz Date: Mon, 23 Sep 2019 11:03:58 +0200 Subject: [PATCH 4/6] added ngram support --- .../ml/spark/vw/VowpalWabbitFeaturizer.scala | 91 ++++++++++++++----- .../vw/VerifyVowpalWabbitClassifier.scala | 26 ++++++ .../vw/VerifyVowpalWabbitFeaturizer.scala | 27 ++++++ 3 files changed, 123 insertions(+), 21 deletions(-) diff --git a/src/main/scala/com/microsoft/ml/spark/vw/VowpalWabbitFeaturizer.scala b/src/main/scala/com/microsoft/ml/spark/vw/VowpalWabbitFeaturizer.scala index 02188cf1da..0ad351b7a8 100644 --- a/src/main/scala/com/microsoft/ml/spark/vw/VowpalWabbitFeaturizer.scala +++ b/src/main/scala/com/microsoft/ml/spark/vw/VowpalWabbitFeaturizer.scala @@ -6,7 +6,7 @@ package com.microsoft.ml.spark.vw import com.microsoft.ml.spark.core.contracts.{HasInputCols, HasOutputCol, Wrappable} import com.microsoft.ml.spark.vw.featurizer._ import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Transformer} -import org.apache.spark.ml.param.{IntParam, ParamMap, StringArrayParam} +import org.apache.spark.ml.param.{BooleanParam, IntParam, ParamMap, StringArrayParam} import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions.{col, struct, udf} @@ -41,51 +41,79 @@ class VowpalWabbitFeaturizer(override val uid: String) extends Transformer def getStringSplitInputCols: Array[String] = $(stringSplitInputCols) def setStringSplitInputCols(value: Array[String]): this.type = set(stringSplitInputCols, value) + val preserveOrderNumBits = new IntParam(this, "preserveOrderNumBits", + "Number of bits used to preserve the feature order. This will reduce the hash size. " + + "Needs to be large enough to fit count the maximum number of words") + setDefault(preserveOrderNumBits -> 0) + + def getPreserveOrderNumBits: Int = $(preserveOrderNumBits) + def setPreserveOrderNumBits(value: Int): this.type = { + if (value < 1 || value > 28) + throw new IllegalArgumentException("preserveOrderNumBits must be between 1 and 28 bits") + set(preserveOrderNumBits, value) + } + + val prefixStringsWithColumnName = new BooleanParam(this, "prefixStringsWithColumnName", + "Prefix string features with column name") + setDefault(prefixStringsWithColumnName -> true) + + def getPrefixStringsWithColumnName: Boolean = $(prefixStringsWithColumnName) + def setPrefixStringsWithColumnName(value: Boolean): this.type = set(prefixStringsWithColumnName, value) + private def getAllInputCols = getInputCols ++ getStringSplitInputCols private def getFeaturizer(name: String, dataType: DataType, idx: Int, namespaceHash: Int): Featurizer = { val stringSplitInputCols = getStringSplitInputCols + val prefixName = if (getPrefixStringsWithColumnName) name else "" + dataType match { - case DoubleType => new NumericFeaturizer(idx, name, namespaceHash, getMask, r => r.getDouble(idx)) - case FloatType => new NumericFeaturizer(idx, name, namespaceHash, getMask, r => r.getFloat(idx).toDouble) - case IntegerType => new NumericFeaturizer(idx, name, namespaceHash, getMask, r => r.getInt(idx).toDouble) - case LongType => new NumericFeaturizer(idx, name, namespaceHash, getMask, r => r.getLong(idx).toDouble) - case ShortType => new NumericFeaturizer(idx, name, namespaceHash, getMask, r => r.getShort(idx).toDouble) - case ByteType => new NumericFeaturizer(idx, name, namespaceHash, getMask, r => r.getByte(idx).toDouble) - case BooleanType => new BooleanFeaturizer(idx, name, namespaceHash, getMask) + case DoubleType => new NumericFeaturizer(idx, prefixName, namespaceHash, getMask, r => r.getDouble(idx)) + case FloatType => new NumericFeaturizer(idx, prefixName, namespaceHash, getMask, r => r.getFloat(idx).toDouble) + case IntegerType => new NumericFeaturizer(idx, prefixName, namespaceHash, getMask, r => r.getInt(idx).toDouble) + case LongType => new NumericFeaturizer(idx, prefixName, namespaceHash, getMask, r => r.getLong(idx).toDouble) + case ShortType => new NumericFeaturizer(idx, prefixName, namespaceHash, getMask, r => r.getShort(idx).toDouble) + case ByteType => new NumericFeaturizer(idx, prefixName, namespaceHash, getMask, r => r.getByte(idx).toDouble) + case BooleanType => new BooleanFeaturizer(idx, prefixName, namespaceHash, getMask) case StringType => if (stringSplitInputCols.contains(name)) - new StringSplitFeaturizer(idx, name, namespaceHash, getMask) - else new StringFeaturizer(idx, name, namespaceHash, getMask) + new StringSplitFeaturizer(idx, prefixName, namespaceHash, getMask) + else new StringFeaturizer(idx, prefixName, namespaceHash, getMask) case arr: ArrayType => if (arr.elementType != DataTypes.StringType) throw new RuntimeException(s"Unsupported array element type: $dataType") - new StringArrayFeaturizer(idx, name, namespaceHash, getMask) + new StringArrayFeaturizer(idx, prefixName, namespaceHash, getMask) case m: MapType => if (m.keyType != DataTypes.StringType) throw new RuntimeException(s"Unsupported map key type: $dataType") m.valueType match { - case StringType => new MapStringFeaturizer(idx, name, namespaceHash, getMask) - case DoubleType => new MapFeaturizer[Double](idx, name, namespaceHash, getMask, v => v) - case FloatType => new MapFeaturizer[Float](idx, name, namespaceHash, getMask, v => v.toDouble) - case IntegerType => new MapFeaturizer[Int](idx, name, namespaceHash, getMask, v => v.toDouble) - case LongType => new MapFeaturizer[Long](idx, name, namespaceHash, getMask, v => v.toDouble) - case ShortType => new MapFeaturizer[Short](idx, name, namespaceHash, getMask, v => v.toDouble) - case ByteType => new MapFeaturizer[Byte](idx, name, namespaceHash, getMask, v => v.toDouble) + case StringType => new MapStringFeaturizer(idx, prefixName, namespaceHash, getMask) + case DoubleType => new MapFeaturizer[Double](idx, prefixName, namespaceHash, getMask, v => v) + case FloatType => new MapFeaturizer[Float](idx, prefixName, namespaceHash, getMask, v => v.toDouble) + case IntegerType => new MapFeaturizer[Int](idx, prefixName, namespaceHash, getMask, v => v.toDouble) + case LongType => new MapFeaturizer[Long](idx, prefixName, namespaceHash, getMask, v => v.toDouble) + case ShortType => new MapFeaturizer[Short](idx, prefixName, namespaceHash, getMask, v => v.toDouble) + case ByteType => new MapFeaturizer[Byte](idx, prefixName, namespaceHash, getMask, v => v.toDouble) case _ => throw new RuntimeException(s"Unsupported map value type: $dataType") } case m: Any => if (m == VectorType) // unfortunately the type is private - new VectorFeaturizer(idx, name, getMask) + new VectorFeaturizer(idx, prefixName, getMask) else throw new RuntimeException(s"Unsupported data type: $dataType") } } override def transform(dataset: Dataset[_]): DataFrame = { + if (getPreserveOrderNumBits + getNumBits > 30) + throw new IllegalArgumentException( + s"Number of bits used for hashing (${getNumBits} and " + + s"number of bits used for order preserving (${getPreserveOrderNumBits}) must be less than 30") + + val maxFeaturesForOrdering = 1 << getPreserveOrderNumBits + val inputColsList = getAllInputCols val namespaceHash: Int = VowpalWabbitMurmur.hash(this.getOutputCol, this.getSeed) @@ -116,13 +144,34 @@ class VowpalWabbitFeaturizer(override val uid: String) extends Transformer if (!r.isNullAt(f.fieldIdx)) f.featurize(r, indices, values) + val indicesArray = indices.result + if (getPreserveOrderNumBits > 0) { + var idxPrefixBits = 30 - getPreserveOrderNumBits + + if (indicesArray.length > maxFeaturesForOrdering) + throw new IllegalArgumentException( + s"Too many features ${indicesArray.length} for " + + s"number of bits used for order preserving (${getPreserveOrderNumBits})") + + // prefix every feature index with a counter value + // will be stripped when passing to VW + for (i <- 0 until indicesArray.length) { + val idxPrefix = i << idxPrefixBits + indicesArray(i) = indicesArray(i) | idxPrefix + } + } + + // if we use the highest order bits to preserve the ordering + // the maximum index size is larger + val size = if(getPreserveOrderNumBits > 0) 1 << 30 else 1 << getNumBits + // sort by indices and remove duplicate values // Warning: // - due to SparseVector limitations (which doesn't allow duplicates) we need filter // - VW command line allows for duplicate features with different values (just updates twice) - val (indicesSorted, valuesSorted) = VectorUtils.sortAndDistinct(indices.result, values.result, getSumCollisions) + val (indicesSorted, valuesSorted) = VectorUtils.sortAndDistinct(indicesArray, values.result, getSumCollisions) - Vectors.sparse(1 << getNumBits, indicesSorted, valuesSorted) + Vectors.sparse(size, indicesSorted, valuesSorted) }) dataset.toDF.withColumn(getOutputCol, mode.apply(struct(fieldSubset.map(f => col(f.name)): _*))) diff --git a/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitClassifier.scala b/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitClassifier.scala index f63e546a5c..af6a7fca17 100644 --- a/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitClassifier.scala +++ b/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitClassifier.scala @@ -209,6 +209,32 @@ class VerifyVowpalWabbitClassifier extends Benchmarks with EstimatorFuzzing[Vowp println(labelOneCnt1) } + case class ClassificationInput[T](label: Int, in: T) + + test("Verify VowpalWabbit Classifier w/ ngrams") { + val featurizer = new VowpalWabbitFeaturizer() + .setStringSplitInputCols(Array("in")) + .setPreserveOrderNumBits(2) + .setNumBits(18) + .setPrefixStringsWithColumnName(false) + .setOutputCol("features") + + val dataset = session.createDataFrame(Seq( + ClassificationInput[String](1, "marie markus fun"), + ClassificationInput[String](0, "marie markus no fun") + )).coalesce(1) + + val datasetFeaturized = featurizer.transform(dataset) + + val vw1 = new VowpalWabbitClassifier() + .setArgs("--ngram f2 -a") + val classifier1 = vw1.fit(datasetFeaturized) + + // 3 (words) + 2 (ngrams) + 1 (constant) = 6 + // 4 (words) + 3 (ngrams) + 1 (constant) = 8 + assert (classifier1.getPerformanceStatistics.select("totalNumberOfFeatures").head.get(0) == 14) + } + /** Reads a CSV file given the file name and file location. * @param fileName The name of the csv file. * @param fileLocation The full path to the csv file. diff --git a/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitFeaturizer.scala b/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitFeaturizer.scala index a96d07acd8..9abdf31b69 100644 --- a/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitFeaturizer.scala +++ b/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitFeaturizer.scala @@ -25,6 +25,33 @@ class VerifyVowpalWabbitFeaturizer extends TestBase with TransformerFuzzing[Vowp val namespaceFeatures = VowpalWabbitMurmur.hash("features", 0) + test("Verify order preserving") { + val featurizer1 = new VowpalWabbitFeaturizer() + .setStringSplitInputCols(Array("in")) + .setPreserveOrderNumBits(2) + .setNumBits(18) + .setPrefixStringsWithColumnName(false) + .setOutputCol("features") + val df1 = session.createDataFrame(Seq(Input[String]("marie markus fun"))) + + val v1 = featurizer1.transform(df1).select(col("features")).collect.apply(0).getAs[SparseVector](0) + + assert(v1.numNonzeros == 3) + + val bitMask = (1 << 18) - 1 + + // the order is the same as in the string above + assert((bitMask & v1.indices(0)) == (bitMask & + VowpalWabbitMurmur.hash("marie", namespaceFeatures))) + assert((bitMask & v1.indices(1)) == (bitMask & + VowpalWabbitMurmur.hash("markus", namespaceFeatures))) + assert((bitMask & v1.indices(2)) == (bitMask & + VowpalWabbitMurmur.hash("fun", namespaceFeatures))) + assert(v1.values(0) == 1.0) + assert(v1.values(1) == 1.0) + assert(v1.values(2) == 1.0) + } + test("Verify VowpalWabbit Featurizer can be run with seq and string") { val featurizer1 = new VowpalWabbitFeaturizer() .setInputCols(Array("str", "seq")) From 01c31c20b9ec08157029e23a3f578eb8f45695ba Mon Sep 17 00:00:00 2001 From: Markus Cozowicz Date: Tue, 8 Oct 2019 11:29:58 +0200 Subject: [PATCH 5/6] moved validation into isValid lambda --- .../microsoft/ml/spark/vw/VowpalWabbitFeaturizer.scala | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/main/scala/com/microsoft/ml/spark/vw/VowpalWabbitFeaturizer.scala b/src/main/scala/com/microsoft/ml/spark/vw/VowpalWabbitFeaturizer.scala index 0ad351b7a8..8f800b0fc1 100644 --- a/src/main/scala/com/microsoft/ml/spark/vw/VowpalWabbitFeaturizer.scala +++ b/src/main/scala/com/microsoft/ml/spark/vw/VowpalWabbitFeaturizer.scala @@ -43,15 +43,12 @@ class VowpalWabbitFeaturizer(override val uid: String) extends Transformer val preserveOrderNumBits = new IntParam(this, "preserveOrderNumBits", "Number of bits used to preserve the feature order. This will reduce the hash size. " + - "Needs to be large enough to fit count the maximum number of words") + "Needs to be large enough to fit count the maximum number of words", + (value: Int) => value >= 0 && value < 29) setDefault(preserveOrderNumBits -> 0) def getPreserveOrderNumBits: Int = $(preserveOrderNumBits) - def setPreserveOrderNumBits(value: Int): this.type = { - if (value < 1 || value > 28) - throw new IllegalArgumentException("preserveOrderNumBits must be between 1 and 28 bits") - set(preserveOrderNumBits, value) - } + def setPreserveOrderNumBits(value: Int): this.type = set(preserveOrderNumBits, value) val prefixStringsWithColumnName = new BooleanParam(this, "prefixStringsWithColumnName", "Prefix string features with column name") From 21fbe1908c281116919779fcfe79f49e941ffdec Mon Sep 17 00:00:00 2001 From: Markus Cozowicz Date: Tue, 8 Oct 2019 16:33:33 +0200 Subject: [PATCH 6/6] removed space --- .../microsoft/ml/spark/vw/VerifyVowpalWabbitClassifier.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitClassifier.scala b/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitClassifier.scala index af6a7fca17..5a26c7e4f1 100644 --- a/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitClassifier.scala +++ b/src/test/scala/com/microsoft/ml/spark/vw/VerifyVowpalWabbitClassifier.scala @@ -232,7 +232,7 @@ class VerifyVowpalWabbitClassifier extends Benchmarks with EstimatorFuzzing[Vowp // 3 (words) + 2 (ngrams) + 1 (constant) = 6 // 4 (words) + 3 (ngrams) + 1 (constant) = 8 - assert (classifier1.getPerformanceStatistics.select("totalNumberOfFeatures").head.get(0) == 14) + assert(classifier1.getPerformanceStatistics.select("totalNumberOfFeatures").head.get(0) == 14) } /** Reads a CSV file given the file name and file location.