Skip to content

Commit

Permalink
updated LocalPeer.kt
Browse files Browse the repository at this point in the history
  • Loading branch information
ambjn committed Oct 22, 2024
1 parent ec7619f commit 1e5ac75
Showing 1 changed file with 54 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ import java.util.Locale
/** LocalPeer is the main class which handles all the functionality of the client
* Where Client Means the currently Running Application.
*/
class LocalPeer(
context: Context
) : EventEmitter() {
class LocalPeer(context: Context) : EventEmitter() {

companion object {
/** LocalPeer Instance, Singleton class, only one instance of this class can be created
Expand Down Expand Up @@ -339,9 +337,7 @@ class LocalPeer(
return try {
socket.publish(
Request.RequestCase.SEND_DATA, mapOf(
"to" to parsedTo,
"payload" to data.payload,
"label" to data.label
"to" to parsedTo, "payload" to data.payload, "label" to data.label
)
)
Pair(true, null)
Expand Down Expand Up @@ -430,7 +426,7 @@ class LocalPeer(


suspend fun produce(
label: String, audioTrack: AudioTrack?, videoTrack: VideoTrack?, appData: String?
label: String, audioTrack: AudioTrack?, videoTrack: VideoTrack?, appData: String?,
) {
Timber.i("produce called")
try {
Expand Down Expand Up @@ -583,17 +579,17 @@ class LocalPeer(
}
}

fun changeCam(){
localVideoManager?.switchCamera(
object : CameraSwitchHandler {
override fun onCameraSwitchDone(b: Boolean) {
store.setCamInProgress(false)
}
override fun onCameraSwitchError(s: String) {
Timber.w("❌ Error Enabling Video $s")
store.setCamInProgress(false)
}
})
fun changeCam() {
localVideoManager?.switchCamera(object : CameraSwitchHandler {
override fun onCameraSwitchDone(b: Boolean) {
store.setCamInProgress(false)
}

override fun onCameraSwitchError(s: String) {
Timber.w("❌ Error Enabling Video $s")
store.setCamInProgress(false)
}
})
}


Expand Down Expand Up @@ -639,7 +635,7 @@ class LocalPeer(
* @returns Consumer?; Returns null if consumer is not found
*/
fun getConsumer(
label: String, peerId: String
label: String, peerId: String,
): Consumer? {
val consumer = consumers.get(label, peerId)
return consumer
Expand Down Expand Up @@ -783,28 +779,20 @@ class LocalPeer(
waitingToProduce.clear()
waitingToConsume.clear()

activeAudioTrack.values.forEach { stream ->
stream.dispose()
}
activeVideoTrack.values.forEach { stream ->
stream.dispose()
}
joined = false

// dispose audio manager
localAudioManager?.dispose()
// dispose video manager
localVideoManager?.dispose()
camCapturer?.stopCapture()

_sendTransport?.close()
_sendTransport = null

_recvTransport?.close()
_recvTransport = null

permissions.reset()
// dispose audio manager
localAudioManager?.dispose()
// dispose video manager
localVideoManager?.dispose()

permissions.reset()
// store setRoomState
store.setRoomState(RoomStates.CLOSED)

Expand Down Expand Up @@ -927,10 +915,9 @@ class LocalPeer(
try {
Timber.i("✅ Client recovered after reconnecting => $syncMeetingStateResponse")

val latestPeersSet = syncMeetingStateResponse.roomInfo.peersList
.orEmpty()
.mapNotNull { it.peerId }
.toSet()
val latestPeersSet =
syncMeetingStateResponse.roomInfo.peersList.orEmpty().mapNotNull { it.peerId }
.toSet()

remotePeers.entries.toList().forEach { (peerId, peer) ->
if (peerId in latestPeersSet) {
Expand All @@ -941,13 +928,11 @@ class LocalPeer(
remotePeers.remove(peerId)
room.emit("peer-left", peerId)
} else {
val latestPeerInfo = syncMeetingStateResponse.roomInfo.peersList
.find { it.peerId == peerId }
val latestPeerInfo =
syncMeetingStateResponse.roomInfo.peersList.find { it.peerId == peerId }

val newProducerSet = latestPeerInfo?.producersList
.orEmpty()
.mapNotNull { it.label }
.toSet()
val newProducerSet =
latestPeerInfo?.producersList.orEmpty().mapNotNull { it.label }.toSet()

peer.labels.forEach { label ->
if (label in newProducerSet) {
Expand All @@ -973,32 +958,30 @@ class LocalPeer(
}

// Handle new peers
syncMeetingStateResponse.roomInfo.peersList
.filter { it.peerId != null && !remotePeers.containsKey(it.peerId) && it.peerId != this.peerId }
.forEach { latestPeer ->
val peerId = latestPeer.peerId

val remotePeer = RemotePeer(
peerId = peerId,
role = latestPeer.role,
metadata = latestPeer.metadata
)
syncMeetingStateResponse.roomInfo.peersList.filter {
it.peerId != null && !remotePeers.containsKey(
it.peerId
) && it.peerId != this.peerId
}.forEach { latestPeer ->
val peerId = latestPeer.peerId

val remotePeer = RemotePeer(
peerId = peerId, role = latestPeer.role, metadata = latestPeer.metadata
)

remotePeers[peerId] = remotePeer
remotePeers[peerId] = remotePeer

latestPeer.producersList.forEach { producer ->
val producerId = producer.id
val label = producer.label
latestPeer.producersList.forEach { producer ->
val producerId = producer.id
val label = producer.label

remotePeer.addLabelData(
label = label,
producerId = producerId,
this@LocalPeer.appContext
)
}

room.emit("new-peer-joined", mapOf("peer" to remotePeer))
remotePeer.addLabelData(
label = label, producerId = producerId, this@LocalPeer.appContext
)
}

room.emit("new-peer-joined", mapOf("peer" to remotePeer))
}
} catch (error: Throwable) {
Timber.e("❌ Error Syncing Meeting State, Can't Recover | error: $error")
}
Expand Down Expand Up @@ -1514,7 +1497,7 @@ class LocalPeer(
}

override fun onProduce(
transport: Transport, kind: String, rtpParameters: String, appData: String?
transport: Transport, kind: String, rtpParameters: String, appData: String?,
): String {
try {
socket.publish(
Expand All @@ -1538,7 +1521,7 @@ class LocalPeer(
sctpStreamParameters: String,
label: String,
protocol: String,
appData: String?
appData: String?,
): String {
TODO("Not yet implemented")
}
Expand Down Expand Up @@ -1576,7 +1559,7 @@ class LocalPeer(
dtlsParameters: String,
sctpParameters: String? = null,
rtcConfig: PeerConnection.RTCConfiguration? = null,
appData: String? = null
appData: String? = null,
): Transport? {
Timber.i("createDeviceTransport called for $transportType")
val transport = when (transportType) {
Expand Down Expand Up @@ -1612,7 +1595,7 @@ class LocalPeer(
}

private fun connectionStateChangeHandler(
transport: Transport?, state: String, transportType: String
transport: Transport?, state: String, transportType: String,
) {
try {
Timber.d("🔔 $transportType Transport Connection State Changed, state: $state")
Expand Down Expand Up @@ -1673,9 +1656,7 @@ class LocalPeer(
val peerId = peer.peerId
if (peerId != this.peerId) {
val remotePeer = RemotePeer(
peerId = peerId,
metadata = peer.metadata.orEmpty(),
role = peer.role
peerId = peerId, metadata = peer.metadata.orEmpty(), role = peer.role
)
remotePeers[peerId] = remotePeer
CoroutineScope(Dispatchers.Main).launch {
Expand All @@ -1687,9 +1668,7 @@ class LocalPeer(
put("role", peer.role)
})
remotePeer.addLabelData(
producer.label,
producer.id,
this@LocalPeer.appContext
producer.label, producer.id, this@LocalPeer.appContext
)
}
}
Expand All @@ -1709,7 +1688,7 @@ class LocalPeer(
* Helper function to close the consumer of a remote peer
*/
private fun closeRemotePeerConsumer(
peerId: String, label: String
peerId: String, label: String,
) {
try {
val remotePeer = room.getRemotePeerById(peerId)
Expand Down

0 comments on commit 1e5ac75

Please sign in to comment.