Getting Started with Subscribing

Follow these steps to add the subscribing capability to your application.

1. Create a subscriber object

Use the createSubscriber method to create a subscriber object.

import android.util.Log
import com.millicast.Core
import com.millicast.subscribers.*
import kotlinx.coroutines.*

// Helper for later usage
fun <T> CoroutineScope.safeLaunch(
    onError: (suspend CoroutineScope.(err: Throwable) -> T)? = null,
    block: suspend CoroutineScope.() -> T
) = launch {
    try {
        block()
    } catch (err: Throwable) {
        onError?.invoke(this, err)
    }
}

// Creating the subscriber
val subscriber = Core.createSubscriber()

2. Set up your credentials

Get your stream name and account ID from the dashboard and set them in the SDK using the setCredentials method.

val coroutineScope = CoroutineScope(Dispatchers.IO)

coroutineScope.safeLaunch {
    val credential = Credential(
        // Set the streamName, accountId, and API URL
        streamName = "myStreamName",
        accountId = "qwertyuiop",
        apiUrl = "https://director.millicast.com/api/director/subscribe"
    )

    subscriber.setCredentials(credential)
}

3. Use flows to be notified about the subscriber's state changes

The state flow emits the subscriber's current state, and the track flow emits each track as it becomes available.

coroutineScope.launch {
    subscriber.state.collect { newSubscriberState ->
        Log.d("SAMPLE", "New subscriber state: $newSubscriberState")
    }
}
coroutineScope.launch {
    subscriber.track.collect { newTrackHolder ->
        Log.d("SAMPLE", "New track: ${newTrackHolder.mid.orEmpty()}")
    }
}
// ... others follow

4. Configure the viewer by setting your preferred options

Define your subscription preferences and then call the connect method to connect to the Millicast platform. After this step, call the subscribe method and provide the defined options as its parameter.

coroutineScope.safeLaunch {
    val option = Option(
        // The main source that will be received by the default media stream
        pinnedSourceId = "mainSource",
        // Enables audio multiplexing and denotes the number of audio tracks to receive as Voice Activity Detection (VAD) multiplexed audio
        multiplexedAudioTrack = 3U,
        // Audio streams that should not be included in the multiplex, for example your own audio stream
        excludedSourceId = arrayOf("excluded")
    )
    // Connect to the Millicast platform
    subscriber.connect()
    // Start subscribing with the selected options
    subscriber.subscribe(option)
}

5. Manage broadcast events

When broadcast events occur, the SDK publishes the update to one of the flows maintained by the client object. The following Subscriber flows are available:

  • The activity flow (see the sketch after this list):
    • The ActivityStream.Active event is published whenever a new source starts publishing a stream. It contains the stream ID, the IDs of tracks within a stream, and the source ID.
    • The ActivityStream.Inactive event is published whenever a source is no longer published within a stream. It contains the stream ID and the source ID.
  • The state flow:
    • The subscriptionState field is set to SubscriptionState.Stopped whenever a stream stops.
    • The updated viewers field is published each time a new viewer enters or leaves a stream. All clients connected to the stream are notified about the current number of viewers.
    • The tracks field contains a list of cached local tracks and their corresponding media IDs. This list updates and triggers a new state whenever tracks arrive after subscribing or after calling addRemoteTrack or addRemoteTrackForResult.
    • The streamSourceActivities field contains a list of cached StreamSourceActivity objects. This class is similar to ActivityStream.Active and ActivityStream.Inactive; the difference is that whenever the tracks field inside a StreamSourceActivity is empty, the instance can be considered inactive; otherwise, it is active.
  • The vad flow produces the Vad object when a source ID is multiplexed into an audio track based on the voice activity level. It contains the mid of the track and the source ID.
  • The layers flow produces the Layers object when Simulcast or SVC layers are available. It contains arrays of the LayerData object that you can use in the select command.
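
For example, here is a minimal sketch of collecting the activity flow. It assumes the flow is exposed on the subscriber as activity and that events carry a sourceId property; check the SDK reference for the exact names.

coroutineScope.launch {
    // `activity` and `sourceId` are assumed names used for illustration
    subscriber.activity.collect { event ->
        when (event) {
            is ActivityStream.Active ->
                Log.d("SAMPLE", "Source ${event.sourceId} became active")
            is ActivityStream.Inactive ->
                Log.d("SAMPLE", "Source ${event.sourceId} became inactive")
        }
    }
}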

6. Project media

Using the multi-source feature requires projecting tracks into a specified transceiver using its media ID (mid). When you start subscribing, you receive updates on the activity flow. To forward the selected media to the subscriber, use the project method of the subscriber. It requires the source ID you are targeting and an array of the tracks you want to project.

By default, only one video and one audio track are negotiated in the SDP. If several publishers send media in one stream, you can dynamically add more tracks by calling the addRemoteTrackForResult method each time you receive the ActivityStream.Active event. The method returns a new track and the attached media ID. When successful, the SDK creates a new track, triggers a track flow update, and updates the cached track list in the subscriber's state.

Note that the tracks added to the subscriber remain there until you unsubscribe. Each disconnection resets the subscriber's state. We recommend caching the source ID and media ID mappings whenever you receive track or activity updates, or when the corresponding lists inside the subscriber's state change. This way, you can add new tracks and call the project or unproject method when a source becomes inactive and then active again. The following example demonstrates this pattern:

// Initialize the local state used in this sample
val streamIdMap = mutableMapOf<String, StreamSourceActivity>()
val queue = Queue()
// com.millicast.utils.Queue provides a scope.launch { } that will wait
// for its head to complete before calling the next item

/**
 * Retrieve the list of new tracks given modifications on the StreamSourceActivity object
 */
fun checkTracksInStreamSourceActivity(streamSourceActivity: StreamSourceActivity): StreamSourceActivity {
  // First, ensure that you find a corresponding cached version
  streamIdMap.putIfAbsent(
    streamSourceActivity.streamId,
    streamSourceActivity.copy(activeTracks = emptyArray())
  )

  val cached = streamIdMap[streamSourceActivity.streamId]!!

  // All the new tracks, that is, each track that is in the new array but not in the previous one
  val newTracks = streamSourceActivity.activeTracks.filter { activeTrack ->
    null == cached.activeTracks.find { it.trackId == activeTrack.trackId }
  }

  // Send a copy that contains only the new tracks
  return streamSourceActivity.copy(activeTracks = newTracks.toTypedArray())
}

/**
 * When a new source becomes active, store information about it, create a local track,
 * and send the projection command
 */
coroutineScope.launch {
  subscriber.state.map { it.streamSourceActivities }.collect { streamSourceActivities ->
    queue.post {
      // Note: if the list of activities or tracks is empty,
      // everything cached locally should be flushed (not shown here)

      streamSourceActivities.map { activity ->
        // Fetch a copy of the activity object containing only the new tracks
        val newlyFoundRemoteTracks = checkTracksInStreamSourceActivity(activity)

        // Update the cached list of activeTracks with the one obtained at the beginning
        streamIdMap[activity.streamId] = streamIdMap[activity.streamId]!!.copy(
          activeTracks = activity.activeTracks
        )

        newlyFoundRemoteTracks
      }.forEach { activity ->
        // The activity only contains the newly added tracks
        activity.activeTracks.forEach {
          // You can add the remote track locally
          val newlyAddedLocalTrack = subscriber.addRemoteTrackForResult(it.media)

          // Send the command to project the remote content locally
          subscriber.project(
            activity.sourceId, arrayListOf(
              ProjectionData(
                trackId = it.trackId,
                media = it.media.name.lowercase(),
                mid = newlyAddedLocalTrack.mid!!
              )
            )
          )
        }
      }
    }
  }
} 

To stop projecting the track, call the unproject method. In its parameter, provide an array of the media IDs that you want to stop projecting.
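
A minimal sketch of stopping a projection; midsToStop is a hypothetical local list holding the media IDs that you previously projected.

coroutineScope.safeLaunch {
    // midsToStop is a hypothetical list of previously projected media IDs
    val midsToStop = arrayListOf("0", "1")
    subscriber.unproject(midsToStop)
}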

7. Select a layer that you want to receive

When a publisher uses Simulcast or the Scalable Video Coding (SVC) mode when sending a video feed, the media server automatically chooses the right layer to send to the viewer according to the bandwidth estimation. However, you can force the server to send you a specific layer by calling the select method. The layers available in the stream are updated in the layers flow.

For example, if the sender uses Simulcast, you may receive three different encoding IDs: 'h' for the high resolution, 'm' for the medium one, and 'l' for the low one. To force a specific layer, collect the layers flow and pass the chosen entry to the select method:

// Get layer updates:
coroutineScope.launch {
    subscriber.layers.collect { layers ->
        if (layers.activeLayers.isNotEmpty()) {
            // Select a specific layer, for example the first active one;
            // passing null restores the automatic selection
            subscriber.select(layers.activeLayers[0])
        }
    }
}

8. Render video and select audio devices

The SDK provides an interface that lets you implement a class responsible for receiving video frames.

<!-- Add a layout in your fragment to add a view in Kotlin code later -->

<LinearLayout
  android:id="@+id/linear_layout_video"
  android:layout_width="match_parent"
  android:layout_height="0dp"
  android:layout_weight="0.6"
  android:orientation="horizontal">
</LinearLayout>

Either implement the org.webrtc.VideoSink interface yourself or use one of the renderer classes provided by the SDK, which can also be used as views.

If you implement your own video sink, you get a VideoFrame whenever one is available. You can render the frame using any graphics library.

To get video data from the VideoFrame, use the getBuffer method. To extract pixel data from the buffer, cast it to one of the interface types it implements, such as I420Buffer, I444Buffer, or TextureBuffer.
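
As an illustration, here is a minimal custom sink that converts each frame to I420; the class name is hypothetical.

import org.webrtc.VideoFrame
import org.webrtc.VideoSink

// Hypothetical sink that logs the size of each incoming frame
class LoggingVideoSink : VideoSink {
    override fun onFrame(frame: VideoFrame) {
        // toI420() returns a new buffer that must be released after use
        val i420 = frame.buffer.toI420()
        Log.d("SAMPLE", "Frame: ${i420.width}x${i420.height}")
        i420.release()
    }
}

You would then pass an instance of this sink to setVideoSink on a video track, as shown below.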

Alternatively, to make the SDK render video automatically, you can use the TextureViewRenderer view provided by the SDK:

coroutineScope.launch {
    subscriber.track.collect { trackHolder ->
        when (trackHolder) {
            is TrackHolder.VideoTrackHolder -> {
                // View operations must run on the main thread
                withContext(Dispatchers.Main) {
                    // Create the renderer:
                    val view = com.millicast.video.TextureViewRenderer(applicationContext)
                    // Add the renderer to the layout:
                    val layout = findViewById<LinearLayout>(R.id.linear_layout_video)
                    layout.addView(view)
                    // Set the renderer as a sink on the video track:
                    trackHolder.videoTrack.setVideoSink(view)
                }
            }
            is TrackHolder.AudioTrackHolder -> {
                // No video, ignore
            }
        }
    }
}

To select the audio device and adjust the playback volume, use the following code:

// Select one playback device to be able to play audio
Media.audioPlayback.firstOrNull()?.initPlayback()

// Adjust the volume of all new remote tracks
coroutineScope.launch {
    subscriber.track.collect { trackHolder ->
        when(trackHolder) {
            is TrackHolder.VideoTrackHolder -> {
                // No audio, ignore
            }
            is TrackHolder.AudioTrackHolder -> {
                trackHolder.audioTrack.setVolume(1.0) // The volume must be between 0 and 1
            }
        }
    }
}

Collecting RTC statistics

You can periodically collect the WebRTC peer connection statistics by enabling them through the enableStats method of the subscriber. After enabling the statistics, you will get a report every second through the onStatsReport callback in the listener object. The identifiers and the way to browse the stats follow the WebRTC specification.
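
As a sketch, assuming the report also surfaces as a flow named rtcStatsReport on the subscriber (the flow name and the boolean parameter of enableStats are assumptions; check the SDK reference):

coroutineScope.launch {
    // Assumed boolean toggle; a report is then produced every second
    subscriber.enableStats(true)
    // rtcStatsReport is an assumed flow name used for illustration
    subscriber.rtcStatsReport.collect { report ->
        Log.d("SAMPLE", "RTC stats: $report")
    }
}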