Skip to content

Commit 2eca24d

Browse files
committed
patch: Update goutils to v0.7.0
Update `goutils` to `v0.7.0` and expose metrics for task processors.
1 parent f886681 commit 2eca24d

21 files changed

+80
-21
lines changed

bin/control.go

+12-1
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,11 @@ func DefineControlNode(
123123
pubsubMetricsAgent = metrics.InstallPubSubMetrics()
124124
}
125125

126+
var taskProcessorMetricsAgent goutils.TaskProcessorMetricHelper
127+
if config.Metrics.Features.EnableTaskProcessorMetrics {
128+
taskProcessorMetricsAgent = metrics.InstallTaskProcessorMetrics()
129+
}
130+
126131
// Create server to host metrics collection endpoint
127132
theNode.MetricsServer, err = api.BuildMetricsCollectionServer(
128133
config.Metrics.Server, metrics, config.Metrics.MetricsEndpoint, config.Metrics.MaxRequests,
@@ -231,6 +236,7 @@ func DefineControlNode(
231236
config.VODConfig.SegmentReadMaxWaitTime(),
232237
s3Client,
233238
metrics,
239+
taskProcessorMetricsAgent,
234240
)
235241
if err != nil {
236242
log.WithError(err).WithFields(logTags).Error("Failed to create segment reader")
@@ -447,7 +453,12 @@ func DefineControlNode(
447453
}
448454

449455
theNode.playlistManager, err = vod.NewPlaylistManager(
450-
parentCtxt, dbConns, config.VODConfig.SegmentReaderWorkerCount, plBuilder, vodSegmentMgnt,
456+
parentCtxt,
457+
dbConns,
458+
config.VODConfig.SegmentReaderWorkerCount,
459+
plBuilder,
460+
vodSegmentMgnt,
461+
taskProcessorMetricsAgent,
451462
)
452463
if err != nil {
453464
log.WithError(err).WithFields(logTags).Error("Failed to create video playlist manager")

bin/edge.go

+20-4
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,11 @@ func DefineEdgeNode(
104104
pubsubMetricsAgent = metrics.InstallPubSubMetrics()
105105
}
106106

107+
var taskProcessorMetricsAgent goutils.TaskProcessorMetricHelper
108+
if config.Metrics.Features.EnableTaskProcessorMetrics {
109+
taskProcessorMetricsAgent = metrics.InstallTaskProcessorMetrics()
110+
}
111+
107112
// Create server to host metrics collection endpoint
108113
theNode.MetricsServer, err = api.BuildMetricsCollectionServer(
109114
config.Metrics.Server, metrics, config.Metrics.MetricsEndpoint, config.Metrics.MaxRequests,
@@ -358,7 +363,11 @@ func DefineEdgeNode(
358363
return theNode, err
359364
}
360365
liveForwarder, err := forwarder.NewHTTPLiveStreamSegmentForwarder(
361-
parentCtxt, dbConns, httpSegmentSender, config.Forwarder.Live.MaxInFlight,
366+
parentCtxt,
367+
dbConns,
368+
httpSegmentSender,
369+
config.Forwarder.Live.MaxInFlight,
370+
taskProcessorMetricsAgent,
362371
)
363372
if err != nil {
364373
log.WithError(err).WithFields(logTags).Error("Failed to create live stream segment forwarder")
@@ -417,6 +426,7 @@ func DefineEdgeNode(
417426
s3SegmentSender,
418427
psBroadcast,
419428
config.Forwarder.Recording.MaxInFlight,
429+
taskProcessorMetricsAgent,
420430
)
421431
if err != nil {
422432
log.
@@ -450,7 +460,7 @@ func DefineEdgeNode(
450460

451461
// Define video source operator
452462
theNode.operator, err = edge.NewManager(
453-
parentCtxt, edgeOperatorConfig, edgeToCtrlRRClient, metrics,
463+
parentCtxt, edgeOperatorConfig, edgeToCtrlRRClient, metrics, taskProcessorMetricsAgent,
454464
)
455465
if err != nil {
456466
log.
@@ -476,7 +486,12 @@ func DefineEdgeNode(
476486

477487
// Define video segment reader
478488
theNode.segmentReader, err = utils.NewSegmentReader(
479-
parentCtxt, config.MonitorConfig.SegmentReaderWorkerCount, 0, nil, metrics,
489+
parentCtxt,
490+
config.MonitorConfig.SegmentReaderWorkerCount,
491+
0,
492+
nil,
493+
metrics,
494+
taskProcessorMetricsAgent,
480495
)
481496
if err != nil {
482497
log.WithError(err).WithFields(logTags).Error("Failed to create video segment reader")
@@ -504,6 +519,7 @@ func DefineEdgeNode(
504519
theNode.segmentReader,
505520
theNode.operator.NewSegmentFromSource,
506521
metrics,
522+
taskProcessorMetricsAgent,
507523
)
508524
if err != nil {
509525
log.WithError(err).WithFields(logTags).Error("Failed to create HLS monitor")
@@ -540,7 +556,7 @@ func DefineEdgeNode(
540556
}
541557

542558
theNode.playlistManager, err = vod.NewPlaylistManager(
543-
parentCtxt, dbConns, 2, plBuilder, segmentMgnt,
559+
parentCtxt, dbConns, 2, plBuilder, segmentMgnt, taskProcessorMetricsAgent,
544560
)
545561
if err != nil {
546562
log.WithError(err).WithFields(logTags).Error("Failed to create video playlist manager")

common/config.go

+4
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,8 @@ type MetricsFeatureConfig struct {
138138
EnableHTTPMetrics bool `mapstructure:"enableHTTPMetrics" json:"enableHTTPMetrics"`
139139
// EnablePubSubMetrics whether to enable PubSub operational metrics
140140
EnablePubSubMetrics bool `mapstructure:"enablePubSubMetrics" json:"enablePubSubMetrics"`
141+
// EnableTaskProcessorMetrics whether to enable Task processor operational metrics
142+
EnableTaskProcessorMetrics bool `mapstructure:"enableTaskProcessorMetrics" json:"enableTaskProcessorMetrics"`
141143
}
142144

143145
// MetricsConfig application metrics config
@@ -474,6 +476,7 @@ func InstallDefaultControlNodeConfigValues() {
474476
viper.SetDefault("metrics.features.enableAppMetrics", false)
475477
viper.SetDefault("metrics.features.enableHTTPMetrics", true)
476478
viper.SetDefault("metrics.features.enablePubSubMetrics", true)
479+
viper.SetDefault("metrics.features.enableTaskProcessorMetrics", true)
477480
// Default metrics HTTP server config
478481
viper.SetDefault("metrics.service.listenOn", "0.0.0.0")
479482
viper.SetDefault("metrics.service.appPort", 3001)
@@ -549,6 +552,7 @@ func InstallDefaultEdgeNodeConfigValues() {
549552
viper.SetDefault("metrics.features.enableAppMetrics", false)
550553
viper.SetDefault("metrics.features.enableHTTPMetrics", true)
551554
viper.SetDefault("metrics.features.enablePubSubMetrics", true)
555+
viper.SetDefault("metrics.features.enableTaskProcessorMetrics", true)
552556
// Default metrics HTTP server config
553557
viper.SetDefault("metrics.service.listenOn", "0.0.0.0")
554558
viper.SetDefault("metrics.service.appPort", 3001)

edge/manager.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ func NewManager(
169169
params VideoSourceOperatorConfig,
170170
rrClient ControlRequestClient,
171171
metrics goutils.MetricsCollector,
172+
tpMetrics goutils.TaskProcessorMetricHelper,
172173
) (VideoSourceOperator, error) {
173174
logTags := log.Fields{
174175
"module": "edge", "component": "video-source-operator", "source-id": params.Self.ID,
@@ -233,7 +234,7 @@ func NewManager(
233234
workerLogTags[lKey] = lVal
234235
}
235236
worker, err := goutils.GetNewTaskProcessorInstance(
236-
parentCtxt, "support-worker", 4, workerLogTags,
237+
parentCtxt, "support-worker", 4, workerLogTags, tpMetrics,
237238
)
238239
if err != nil {
239240
log.WithError(err).WithFields(logTags).Error("Failed to define support worker")

edge/manager_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func TestVideoSourceOperatorStartRecording(t *testing.T) {
6767
mock.AnythingOfType("time.Time"),
6868
).Return(nil)
6969

70-
uut, err := edge.NewManager(utCtxt, uutConfig, mockRR, nil)
70+
uut, err := edge.NewManager(utCtxt, uutConfig, mockRR, nil, nil)
7171
assert.Nil(err)
7272

7373
// ====================================================================================
@@ -256,7 +256,7 @@ func TestVideoSourceOperatorStopRecording(t *testing.T) {
256256
mock.AnythingOfType("time.Time"),
257257
).Return(nil)
258258

259-
uut, err := edge.NewManager(utCtxt, uutConfig, mockRR, nil)
259+
uut, err := edge.NewManager(utCtxt, uutConfig, mockRR, nil, nil)
260260
assert.Nil(err)
261261

262262
// ====================================================================================
@@ -352,7 +352,7 @@ func TestVideoSourceOperatorNewSegmentFromSource(t *testing.T) {
352352
mock.AnythingOfType("time.Time"),
353353
).Return(nil)
354354

355-
uut, err := edge.NewManager(utCtxt, uutConfig, mockRR, nil)
355+
uut, err := edge.NewManager(utCtxt, uutConfig, mockRR, nil, nil)
356356
assert.Nil(err)
357357

358358
// ====================================================================================
@@ -520,7 +520,7 @@ func TestVideoSourceOperatorSyncActiveRecordingState(t *testing.T) {
520520
mock.AnythingOfType("time.Time"),
521521
).Return(nil)
522522

523-
uut, err := edge.NewManager(utCtxt, uutConfig, mockRR, nil)
523+
uut, err := edge.NewManager(utCtxt, uutConfig, mockRR, nil, nil)
524524
assert.Nil(err)
525525

526526
timestamp := time.Now().UTC()

forwarder/live.go

+3
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,15 @@ NewHTTPLiveStreamSegmentForwarder define new HTTP version of LiveStreamSegmentFo
5151
@param dbConns db.ConnectionManager - DB connection manager
5252
@param sender SegmentSender - client for forwarding video segments to system control node
5353
@param maxInFlightSegments int - max number of segment being forwarded at any one time
54+
@param tpMetrics goutils.TaskProcessorMetricHelper - task processor metrics helper
5455
@returns new LiveStreamSegmentForwarder
5556
*/
5657
func NewHTTPLiveStreamSegmentForwarder(
5758
parentCtxt context.Context,
5859
dbConns db.ConnectionManager,
5960
sender SegmentSender,
6061
maxInFlightSegments int,
62+
tpMetrics goutils.TaskProcessorMetricHelper,
6163
) (LiveStreamSegmentForwarder, error) {
6264
logTags := log.Fields{
6365
"module": "forwarder",
@@ -91,6 +93,7 @@ func NewHTTPLiveStreamSegmentForwarder(
9193
maxInFlightSegments+1,
9294
maxInFlightSegments+1,
9395
workerLogsTags,
96+
tpMetrics,
9497
)
9598
if err != nil {
9699
log.WithError(err).WithFields(logTags).Error("Failed to define support worker")

forwarder/live_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func TestHTTPLiveStreamForwarder(t *testing.T) {
2626
mockDB.On("Close").Return()
2727
mockSender := mocks.NewSegmentSender(t)
2828

29-
uut, err := forwarder.NewHTTPLiveStreamSegmentForwarder(utCtxt, mockSQL, mockSender, 2)
29+
uut, err := forwarder.NewHTTPLiveStreamSegmentForwarder(utCtxt, mockSQL, mockSender, 2, nil)
3030
assert.Nil(err)
3131

3232
// ------------------------------------------------------------------------------------

forwarder/recording.go

+3
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ NewS3RecordingSegmentForwarder define new S3 version of RecordingSegmentForwarde
5555
@param s3Client SegmentSender - S3 segment transport client
5656
@param broadcastClient utils.Broadcaster - message broadcast client
5757
@param maxInFlightSegments int - max number of segment being stored at any one time
58+
@param tpMetrics goutils.TaskProcessorMetricHelper - task processor metrics helper
5859
@returns new RecordingSegmentForwarder
5960
*/
6061
func NewS3RecordingSegmentForwarder(
@@ -63,6 +64,7 @@ func NewS3RecordingSegmentForwarder(
6364
s3Client SegmentSender,
6465
broadcastClient utils.Broadcaster,
6566
maxInFlightSegments int,
67+
tpMetrics goutils.TaskProcessorMetricHelper,
6668
) (RecordingSegmentForwarder, error) {
6769
logTags := log.Fields{
6870
"module": "forwarder",
@@ -98,6 +100,7 @@ func NewS3RecordingSegmentForwarder(
98100
maxInFlightSegments+1,
99101
maxInFlightSegments+1,
100102
txWorkerLogTags,
103+
tpMetrics,
101104
)
102105
if err != nil {
103106
log.WithError(err).WithFields(logTags).Error("Failed to define transmission worker")

forwarder/recording_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func TestS3RecordingSegmentForwarder(t *testing.T) {
3030
}
3131

3232
uut, err := forwarder.NewS3RecordingSegmentForwarder(
33-
utCtxt, storageCfg, mockS3, mockBroadcaster, 2,
33+
utCtxt, storageCfg, mockS3, mockBroadcaster, 2, nil,
3434
)
3535
assert.Nil(err)
3636

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ toolchain go1.22.1
66

77
require (
88
cloud.google.com/go/pubsub v1.31.0
9-
github.com/alwitt/goutils v0.6.4
9+
github.com/alwitt/goutils v0.7.0
1010
github.com/apex/log v1.9.0
1111
github.com/bradfitz/gomemcache v0.0.0-20230611145640-acc696258285
1212
github.com/fsnotify/fsnotify v1.6.0

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3f
5050
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
5151
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
5252
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
53-
github.com/alwitt/goutils v0.6.4 h1:8L2JcqQ5qpQZnwZ47GwG4IluuRvKwJg7LRZu2t4bgCQ=
54-
github.com/alwitt/goutils v0.6.4/go.mod h1:vUuby9IQsHG/BCwo2Dd5b29ouHYy3ll2pZMeivHZlNU=
53+
github.com/alwitt/goutils v0.7.0 h1:ADQPYcPHoavyRjlXoDVsiKzQ8r/XxRnY8l7ZR9ar2do=
54+
github.com/alwitt/goutils v0.7.0/go.mod h1:vUuby9IQsHG/BCwo2Dd5b29ouHYy3ll2pZMeivHZlNU=
5555
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
5656
github.com/apex/log v1.9.0 h1:FHtw/xuaM8AgmvDDTI9fiwoAL25Sq2cxojnZICUU8l0=
5757
github.com/apex/log v1.9.0/go.mod h1:m82fZlWIuiWzWP04XCTXmnX0xRkYYbCdYn8jbJeLBEA=

ref/example-control-node-cfg.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ metrics:
3838
# Whether to enable PubSub metrics
3939
enablePubSubMetrics: true
4040

41+
# Whether to enable Task processor operational metrics
42+
enableTaskProcessorMetrics: true
43+
4144
#########################################################################################
4245
# Postgres persistence configuration
4346
postgres:

ref/example-control-node-docker-cfg.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ metrics:
3838
# Whether to enable PubSub metrics
3939
enablePubSubMetrics: true
4040

41+
# Whether to enable Task processor operational metrics
42+
enableTaskProcessorMetrics: true
43+
4144
#########################################################################################
4245
# Postgres persistence configuration
4346
postgres:

ref/example-edge-node-cfg.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ metrics:
3838
# Whether to enable PubSub metrics
3939
enablePubSubMetrics: true
4040

41+
# Whether to enable Task processor operational metrics
42+
enableTaskProcessorMetrics: true
43+
4144
#########################################################################################
4245
# REST API
4346
api:

ref/example-edge-node-docker-cfg.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ metrics:
3838
# Whether to enable PubSub metrics
3939
enablePubSubMetrics: true
4040

41+
# Whether to enable Task processor operational metrics
42+
enableTaskProcessorMetrics: true
43+
4144
#########################################################################################
4245
# Video source being monitored
4346
videoSource:

tracker/monitor.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ after that.
8787
@param reader utils.SegmentReader - HLS video segment data reader
8888
@param forwardSegment SegmentForwardCallback - callback to send out read video segments
8989
@param metrics goutils.MetricsCollector - metrics framework client
90+
@param tpMetrics goutils.TaskProcessorMetricHelper - task processor metrics helper
9091
@returns new SourceHLSMonitor
9192
*/
9293
func NewSourceHLSMonitor(
@@ -98,6 +99,7 @@ func NewSourceHLSMonitor(
9899
reader utils.SegmentReader,
99100
forwardSegment SegmentForwardCallback,
100101
metrics goutils.MetricsCollector,
102+
tpMetrics goutils.TaskProcessorMetricHelper,
101103
) (SourceHLSMonitor, error) {
102104
logTags := log.Fields{
103105
"module": "tracker",
@@ -117,7 +119,9 @@ func NewSourceHLSMonitor(
117119
}
118120

119121
// Support worker
120-
worker, err := goutils.GetNewTaskProcessorInstance(parentContext, "source-monitor", 4, logTags)
122+
worker, err := goutils.GetNewTaskProcessorInstance(
123+
parentContext, "source-monitor", 4, logTags, tpMetrics,
124+
)
121125
if err != nil {
122126
log.WithError(err).WithFields(logTags).Error("Unable to define support worker")
123127
return nil, err

tracker/monitor_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func TestSourceHLSMonitor(t *testing.T) {
5555

5656
// Define SourceHLSMonitor
5757
uut, err := tracker.NewSourceHLSMonitor(
58-
utCtxt, testSource, conns, trackingWindow, testCache, mockSegReader, receiveSegCB, nil,
58+
utCtxt, testSource, conns, trackingWindow, testCache, mockSegReader, receiveSegCB, nil, nil,
5959
)
6060
assert.Nil(err)
6161

utils/segment_fetch.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ NewSegmentReader define new SegmentReader
6464
@param maxSegReadTime time.Duration - max time allowed to completed a segment read
6565
@param s3 S3Client - S3 client for operating against the S3 server
6666
@param metrics goutils.MetricsCollector - metrics framework client
67+
@param tpMetrics goutils.TaskProcessorMetricHelper - task processor metrics helper
6768
@return new SegmentReader
6869
*/
6970
func NewSegmentReader(
@@ -72,13 +73,14 @@ func NewSegmentReader(
7273
maxSegReadTime time.Duration,
7374
s3 S3Client,
7475
metrics goutils.MetricsCollector,
76+
tpMetrics goutils.TaskProcessorMetricHelper,
7577
) (SegmentReader, error) {
7678
logTags := log.Fields{
7779
"module": "utils",
7880
"component": "hls-video-segment-reader",
7981
}
8082
workers, err := goutils.GetNewTaskDemuxProcessorInstance(
81-
parentContext, "segment-readers", workerCount, workerCount, logTags,
83+
parentContext, "segment-readers", workerCount, workerCount, logTags, tpMetrics,
8284
)
8385
if err != nil {
8486
log.WithError(err).WithFields(logTags).Error("Unable to define worker thread pool")

utils/segment_fetch_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func TestReadingSegmentFromFile(t *testing.T) {
2626

2727
mockS3 := mocks.NewS3Client(t)
2828

29-
uut, err := utils.NewSegmentReader(utCtxt, 2, 0, mockS3, nil)
29+
uut, err := utils.NewSegmentReader(utCtxt, 2, 0, mockS3, nil, nil)
3030
assert.Nil(err)
3131

3232
// Define test file
@@ -86,7 +86,7 @@ func TestReadingSegmentFromS3(t *testing.T) {
8686

8787
mockS3 := mocks.NewS3Client(t)
8888

89-
uut, err := utils.NewSegmentReader(utCtxt, 2, time.Minute, mockS3, nil)
89+
uut, err := utils.NewSegmentReader(utCtxt, 2, time.Minute, mockS3, nil, nil)
9090
assert.Nil(err)
9191

9292
// Prepare result callback

0 commit comments

Comments
 (0)