
Fix NPE due to GCS auth not being passed in when reading BAM header. #2450

Merged — tomwhite merged 1 commit into master from tw_fix_spark_on_gcs_auth_failure on Mar 15, 2017

Conversation

tomwhite (Contributor):

Fixes #2449.

codecov-io commented Mar 10, 2017

Codecov Report

Merging #2450 into master will increase coverage by 0.005%.
The diff coverage is 60%.

@@               Coverage Diff               @@
##              master     #2450       +/-   ##
===============================================
+ Coverage     76.261%   76.267%   +0.005%     
  Complexity     10866     10866               
===============================================
  Files            750       750               
  Lines          39560     39556        -4     
  Branches        6916      6916               
===============================================
- Hits           30169     30168        -1     
+ Misses          6771      6768        -3     
  Partials        2620      2620
Impacted Files                                          Coverage Δ                  Complexity Δ
...nder/tools/spark/BaseRecalibratorSparkSharded.java   23.729% <100%> (ø)          2 <0> (ø)
...stitute/hellbender/engine/spark/GATKSparkTool.java   84.211% <100%> (-0.164%)    53 <0> (ø)
...der/engine/spark/datasources/ReadsSparkSource.java   66.316% <33.333%> (+2.03%)  28 <4> (ø)

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 987e2f9...05211ec.

ReadsSparkSource.java:

-    public JavaRDD<GATKRead> getParallelReads(final String readFileName, final String referencePath, final List<SimpleInterval> intervals, final long splitSize) {
-        SAMFileHeader header = getHeader(readFileName, referencePath, null);
+    public JavaRDD<GATKRead> getParallelReads(final String readFileName, final String referencePath, final AuthHolder auth, final List<SimpleInterval> intervals, final long splitSize) {
+        SAMFileHeader header = getHeader(readFileName, referencePath, auth);

Contributor (review comment):

Can we just convert getHeader() to use an NIO Path in the GCS case, instead of passing around this AuthHolder object and calling into BAMIO.openBAM() from the dataflow library? If yes, I'd prefer to do that rather than spread use of the obsolete AuthHolder class further.

@jean-philippe-martin do you concur?
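
For concreteness, the Path-based version could look something like this (just a sketch — htsjdk's SamReaderFactory does have a Path-accepting open(), but the method name and helper here are illustrative):

```java
import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SamReader;
import htsjdk.samtools.SamReaderFactory;
import java.io.IOException;
import java.nio.file.Path;

// Illustrative: read the BAM header through an NIO Path. For a gs:// input
// the Path would come from the GCS NIO provider (e.g. via a getPath-style
// helper), so no AuthHolder needs to be threaded through at all.
static SAMFileHeader readHeader(final Path bam) throws IOException {
    try (final SamReader reader = SamReaderFactory.makeDefault().open(bam)) {
        return reader.getFileHeader();
    }
}
```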

Contributor (reply):

Yes that would be nicer. Did we succeed at getting NIO working on Spark clients?

Member (reply):

We have a getPath method that does the construction and works around the classpath issue, but there's currently a bug in the NIO code that makes it not work properly. See my stack trace below. I think you said that that bug is fixed already and we may just need to bump the version.
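
(For reference, the helper is along these lines — an illustrative sketch rather than the exact code: try the installed providers first, then fall back to creating the filesystem with an explicit ClassLoader, which is the classpath workaround.)

```java
import java.io.IOException;
import java.net.URI;
import java.nio.file.FileSystemNotFoundException;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;

// Sketch of a getPath-style helper: Paths.get(URI) throws
// FileSystemNotFoundException when the gs:// provider isn't visible to the
// default lookup, so retry with an explicitly supplied ClassLoader.
static Path getPath(final String uriString) throws IOException {
    final URI uri = URI.create(uriString);
    try {
        return Paths.get(uri);
    } catch (FileSystemNotFoundException e) {
        final ClassLoader cl = Thread.currentThread().getContextClassLoader();
        return FileSystems.newFileSystem(uri, new HashMap<>(), cl)
                .provider().getPath(uri);
    }
}
```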

Contributor (reply):

Ah yes, I think that's in googleapis/google-cloud-java#1470. I see it was merged in February this year.

droazen (Contributor) commented Mar 10, 2017

Back to @tomwhite with a question/request.

droazen assigned tomwhite and unassigned droazen and lbergelson on Mar 10, 2017
lbergelson (Member):

@tomwhite I ran your branch manually on GCS and got a new error, which I believe is the GCS NIO bug we discussed in samtools/htsjdk#724.

	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1913)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:911)
	at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:360)
	at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
	at org.broadinstitute.hellbender.engine.spark.datasources.ReadsSparkSource.putPairsInSamePartition(ReadsSparkSource.java:233)
	at org.broadinstitute.hellbender.engine.spark.datasources.ReadsSparkSource.getParallelReads(ReadsSparkSource.java:125)
	at org.broadinstitute.hellbender.engine.spark.GATKSparkTool.getUnfilteredReads(GATKSparkTool.java:238)
	at org.broadinstitute.hellbender.engine.spark.GATKSparkTool.getReads(GATKSparkTool.java:212)
	at org.broadinstitute.hellbender.tools.spark.transforms.markduplicates.MarkDuplicatesSpark.runTool(MarkDuplicatesSpark.java:68)
	at org.broadinstitute.hellbender.engine.spark.GATKSparkTool.runPipeline(GATKSparkTool.java:353)
	at org.broadinstitute.hellbender.engine.spark.SparkCommandLineProgram.doWork(SparkCommandLineProgram.java:38)
	at org.broadinstitute.hellbender.cmdline.CommandLineProgram.runTool(CommandLineProgram.java:111)
	at org.broadinstitute.hellbender.cmdline.CommandLineProgram.instanceMainPostParseArgs(CommandLineProgram.java:169)
	at org.broadinstitute.hellbender.cmdline.CommandLineProgram.instanceMain(CommandLineProgram.java:188)
	at org.broadinstitute.hellbender.Main.runCommandLineProgram(Main.java:121)
	at org.broadinstitute.hellbender.Main.mainEntry(Main.java:142)
	at org.broadinstitute.hellbender.Main.main(Main.java:218)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.IllegalArgumentException: GCS FileSystem URIs mustn't have: port, userinfo, path, query, or fragment: gs://broad-gatk-test-jenkins/CEUTrio.HiSeq.WEx.b37.NA12892.readnamesort.bam
	at shaded.cloud-nio.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
	at com.google.cloud.storage.contrib.nio.CloudStorageFileSystemProvider.newFileSystem(CloudStorageFileSystemProvider.java:192)
	at com.google.cloud.storage.contrib.nio.CloudStorageFileSystemProvider.newFileSystem(CloudStorageFileSystemProvider.java:83)
	at java.nio.file.FileSystems.newFileSystem(FileSystems.java:336)
	at org.seqdoop.hadoop_bam.util.NIOFileUtil.asPath(NIOFileUtil.java:40)
	at org.seqdoop.hadoop_bam.BAMRecordReader.initialize(BAMRecordReader.java:140)
	at org.seqdoop.hadoop_bam.BAMInputFormat.createRecordReader(BAMInputFormat.java:121)
	at org.seqdoop.hadoop_bam.AnySAMInputFormat.createRecordReader(AnySAMInputFormat.java:190)
	at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:170)
	at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:130)
	at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
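
The failing precondition is the object path inside the filesystem URI: per the message above, a GCS FileSystem URI may name only the bucket, with the object key resolved as a path afterwards. An illustrative sketch of that contract (bucket and object taken from the trace):

```java
import com.google.cloud.storage.contrib.nio.CloudStorageFileSystem;
import java.nio.file.Path;

public class GcsUriSketch {
    public static void main(String[] args) {
        // OK: the filesystem URI names only the bucket...
        CloudStorageFileSystem fs = CloudStorageFileSystem.forBucket("broad-gatk-test-jenkins");
        // ...and the object key is resolved as a path within that filesystem.
        Path bam = fs.getPath("/CEUTrio.HiSeq.WEx.b37.NA12892.readnamesort.bam");
        System.out.println(bam.toUri());
        // By contrast, handing the full gs://bucket/object URI to
        // FileSystems.newFileSystem(...) — as hadoop-bam's NIOFileUtil.asPath
        // does in the trace above — trips this precondition check.
    }
}
```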

tomwhite force-pushed the tw_fix_spark_on_gcs_auth_failure branch from 8fd790f to 2cd57bd on March 13, 2017 16:31
tomwhite (Contributor, Author) commented Mar 13, 2017

I agree about avoiding AuthHolder, so here's a new version that uses NIO for getting the header from GCS. I haven't set the reference on the SamReaderFactory, since there is no Path-based method. This means that reading CRAMs from GCS will not work, but that's no worse than it is now.

-        } catch (Exception e) {
-            throw new UserException("Failed to read bam header from " + filePath + "\n Caused by:" + e.getMessage(), e);
-        }
+        return new ReadsDataSource(IOUtils.getPath(filePath)).getHeader();

Contributor (review comment):

Wrap the construction of the ReadsDataSource within a try-with-resources block to ensure it's closed before we return the header.
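
i.e. replace the return above with something like this (assuming ReadsDataSource is AutoCloseable, which the request implies):

```java
// Sketch of the requested change: close the data source before returning the header.
try (final ReadsDataSource readsSource = new ReadsDataSource(IOUtils.getPath(filePath))) {
    return readsSource.getHeader();
}
```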

droazen (Contributor) commented Mar 13, 2017

Review complete, back to @tomwhite. Go ahead and merge after you've addressed my one comment.

droazen removed their assignment on Mar 13, 2017
droazen (Contributor) commented Mar 13, 2017

Oh, and please also create a ticket to fix the CRAM header reading support after this is merged. As discussed at our meeting today, all you have to do is provide a SamReaderFactory with a referenceSequence() set on it when you construct your ReadsDataSource. An example of how to do this is in GATKTool.
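
Roughly along these lines (a sketch, untested — the factory-accepting ReadsDataSource constructor is assumed from this comment, and referenceSequence() takes a File rather than a Path, per Tom's note above):

```java
import htsjdk.samtools.SamReaderFactory;
import htsjdk.samtools.ValidationStringency;
import java.io.File;

// Sketch: give the factory a reference so CRAM headers can be decoded, then
// hand the factory to the ReadsDataSource when reading the header.
final SamReaderFactory factory = SamReaderFactory.makeDefault()
        .validationStringency(ValidationStringency.SILENT)
        .referenceSequence(new File(referencePath));
return new ReadsDataSource(IOUtils.getPath(filePath), factory).getHeader();
```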

tomwhite force-pushed the tw_fix_spark_on_gcs_auth_failure branch from 2cd57bd to 05211ec on March 15, 2017 12:15
tomwhite (Contributor, Author):

Opened #2463 for the CRAM issue.

tomwhite merged commit 5d2f859 into master on Mar 15, 2017
tomwhite deleted the tw_fix_spark_on_gcs_auth_failure branch on March 15, 2017 13:52