Skip to content

Commit

Permalink
fix: Add ffmpeg time limit enforcing for flaky streams (#1001)
Browse files Browse the repository at this point in the history
  • Loading branch information
mhamilton723 authored Mar 5, 2021
1 parent ec7cb78 commit 9b75183
Showing 1 changed file with 23 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ import java.io.{BufferedInputStream, ByteArrayInputStream, Closeable, InputStrea
import java.lang.ProcessBuilder.Redirect
import java.net.{URI, URL}
import java.util.UUID
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

import com.microsoft.cognitiveservices.speech._
import com.microsoft.cognitiveservices.speech.audio._
import com.microsoft.cognitiveservices.speech.transcription.{
Conversation, ConversationTranscriber, ConversationTranscriptionEventArgs, Participant
}
import com.microsoft.cognitiveservices.speech.transcription.{Conversation,
ConversationTranscriber, ConversationTranscriptionEventArgs, Participant}
import com.microsoft.cognitiveservices.speech.util.EventHandler
import com.microsoft.ml.spark.build.BuildInfo
import com.microsoft.ml.spark.cognitive.SpeechFormat._
Expand All @@ -34,6 +33,7 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import spray.json._

import scala.concurrent.{ExecutionContext, Future, blocking}
import scala.language.existentials

object OsUtils {
Expand Down Expand Up @@ -248,12 +248,29 @@ abstract class SpeechSDKBase extends Transformer
val extension = FilenameUtils.getExtension(new URI(uri).getPath).toLowerCase()

if (Set("m3u8", "m4a")(extension) && uri.startsWith("http")) {
val stream = new ProcessBuilder()
val proc = new ProcessBuilder()
.redirectError(Redirect.INHERIT)
.redirectInput(Redirect.INHERIT)
.command(ffmpegCommand: _*)
.start()
.getInputStream
val stream = proc.getInputStream

if (getExtraFfmpegArgs.contains("-t")) {
val timeLimit = getExtraFfmpegArgs(getExtraFfmpegArgs.indexOf("-t") + 1).toInt
Future {
blocking {
proc.waitFor(timeLimit + 10, TimeUnit.SECONDS)
}
if (proc.isAlive) {
proc.destroy()
proc.waitFor(10, TimeUnit.SECONDS)
proc.destroyForcibly()
proc.waitFor()
log.warn("Had to forcibly stop ffmpeg")
}
}(ExecutionContext.global)
}

(stream, "mp3")
} else if (uri.startsWith("http")) {
val conn = new URL(uri).openConnection
Expand Down

0 comments on commit 9b75183

Please sign in to comment.