Skip to content

Commit

Permalink
GEOMESA-2299 Fixing Bigtable SpatialRDDProvider (#1974)
Browse files Browse the repository at this point in the history
Signed-off-by: Emilio Lahr-Vivaz <elahrvivaz@ccri.com>
  • Loading branch information
elahrvivaz authored and jahhulbert-ccri committed Jun 11, 2018
1 parent cacd1de commit 46ac9d1
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ import org.apache.spark.SparkContext
import org.geotools.data.{DataStoreFinder, Query}
import org.geotools.filter.text.ecql.ECQL
import org.locationtech.geomesa.bigtable.data.BigtableDataStoreFactory
import org.locationtech.geomesa.filter.factory.FastFilterFactory
import org.locationtech.geomesa.hbase.data.{EmptyPlan, HBaseDataStore}
import org.locationtech.geomesa.hbase.index.HBaseFeatureIndex
import org.locationtech.geomesa.hbase.index.{HBaseFeatureIndex, HBaseIndexAdapter}
import org.locationtech.geomesa.hbase.jobs.HBaseGeoMesaRecordReader
import org.locationtech.geomesa.jobs.GeoMesaConfigurator
import org.locationtech.geomesa.spark.SpatialRDD
Expand Down Expand Up @@ -92,11 +93,11 @@ class GeoMesaBigtableInputFormat extends InputFormat[Text, SimpleFeature] {
var delegate: BigtableInputFormat = _

var sft: SimpleFeatureType = _
var table: HBaseFeatureIndex = _
var table: HBaseIndexAdapter = _

private def init(conf: Configuration) = if (sft == null) {
private def init(conf: Configuration): Unit = if (sft == null) {
sft = GeoMesaConfigurator.getSchema(conf)
table = HBaseFeatureIndex.index(GeoMesaConfigurator.getIndexIn(conf))
table = HBaseFeatureIndex.index(GeoMesaConfigurator.getIndexIn(conf)).asInstanceOf[HBaseIndexAdapter]
delegate = new BigtableInputFormat(TableName.valueOf(GeoMesaConfigurator.getTable(conf)))
delegate.setConf(conf)
// see TableMapReduceUtil.java
Expand All @@ -117,11 +118,10 @@ class GeoMesaBigtableInputFormat extends InputFormat[Text, SimpleFeature] {
context: TaskAttemptContext): RecordReader[Text, SimpleFeature] = {
init(context.getConfiguration)
val rr = delegate.createRecordReader(split, context)
val transformSchema = GeoMesaConfigurator.getTransformSchema(context.getConfiguration)
val q = GeoMesaConfigurator.getFilter(context.getConfiguration).map { f => ECQL.toFilter(f) }
new HBaseGeoMesaRecordReader(sft, table, rr, q, transformSchema)
val transform = GeoMesaConfigurator.getTransformSchema(context.getConfiguration)
val ecql = GeoMesaConfigurator.getFilter(context.getConfiguration).map(FastFilterFactory.toFilter(sft, _))
new HBaseGeoMesaRecordReader(table, sft, ecql, transform, rr, false)
}

}

object BigtableInputFormat {
Expand All @@ -133,7 +133,7 @@ class BigtableInputFormat(val name: TableName) extends BigtableInputFormatBase w
setName(name)

/** The configuration. */
private var conf: Configuration = null
private var conf: Configuration = _


/**
Expand Down Expand Up @@ -161,4 +161,3 @@ class BigtableInputFormat(val name: TableName) extends BigtableInputFormatBase w
setScans(s)
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{MultiTableInputFormat, TableInputFormat}
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce._
import org.geotools.filter.identity.FeatureIdImpl
import org.geotools.process.vector.TransformProcess
import org.locationtech.geomesa.filter.factory.FastFilterFactory
import org.locationtech.geomesa.hbase.data.HBaseConnectionPool
import org.locationtech.geomesa.hbase.index.{HBaseFeatureIndex, HBaseIndexAdapter}
import org.locationtech.geomesa.jobs.GeoMesaConfigurator
import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}

import scala.util.control.NonFatal
import org.opengis.filter.Filter

/**
* Input format that allows processing of simple features from GeoMesa based on a CQL query
Expand Down Expand Up @@ -59,18 +57,23 @@ class GeoMesaHBaseInputFormat extends InputFormat[Text, SimpleFeature] with Lazy
context: TaskAttemptContext): RecordReader[Text, SimpleFeature] = {
init(context.getConfiguration)
val rr = delegate.createRecordReader(split, context)
val ecql = GeoMesaConfigurator.getFilter(context.getConfiguration).map(FastFilterFactory.toFilter(sft, _))
val transform = GeoMesaConfigurator.getTransformSchema(context.getConfiguration)
// transforms are pushed down in HBase
new HBaseGeoMesaRecordReader(table, sft, transform, rr)
// TODO GEOMESA-2300 support local filtering
new HBaseGeoMesaRecordReader(table, sft, ecql, transform, rr, true)
}
}

class HBaseGeoMesaRecordReader(table: HBaseIndexAdapter,
sft: SimpleFeatureType,
ecql: Option[Filter],
transform: Option[SimpleFeatureType],
reader: RecordReader[ImmutableBytesWritable, Result])
reader: RecordReader[ImmutableBytesWritable, Result],
remoteFiltering: Boolean)
extends RecordReader[Text, SimpleFeature] with LazyLogging {

import scala.collection.JavaConverters._

private val results = new Iterator[Result] {

private var current: Result = _
Expand All @@ -93,7 +96,18 @@ class HBaseGeoMesaRecordReader(table: HBaseIndexAdapter,
}
}

private val features = table.resultsToFeatures(sft, transform.getOrElse(sft))(results)
private val features =
if (remoteFiltering) {
// transforms and filter are pushed down, so we don't have to deal with them here
table.resultsToFeatures(sft, transform.getOrElse(sft))(results)
} else {
// TODO GEOMESA-2300 this doesn't handle anything beyond simple attribute projection
val transforms = transform.map { tsft =>
(tsft.getAttributeDescriptors.asScala.map(d => s"${d.getLocalName}=${d.getLocalName}").mkString(";"), tsft)
}
table.resultsToFeatures(sft, ecql, transforms)(results)
}

private var staged: SimpleFeature = _

override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = reader.initialize(split, context)
Expand Down

0 comments on commit 46ac9d1

Please sign in to comment.