@@ -23,16 +23,13 @@ package arachne
23
23
import (
24
24
coreLog "log"
25
25
"os"
26
- "sync"
27
26
"time"
28
27
29
- "github.com/uber/arachne/collector"
30
28
"github.com/uber/arachne/config"
31
29
d "github.com/uber/arachne/defines"
32
- "github.com/uber/arachne/internal/ip"
33
30
"github.com/uber/arachne/internal/log"
34
- "github.com/uber/arachne/internal/tcp"
35
31
"github.com/uber/arachne/internal/util"
32
+ "github.com/uber/arachne/pkg/engine"
36
33
37
34
"go.uber.org/zap"
38
35
)
@@ -87,115 +84,72 @@ func Run(ec *config.Extended, opts ...Option) {
87
84
logger .Error ("error initializing stats" , zap .Error (err ))
88
85
}
89
86
90
- // Hold raw socket connection for IPv4 packets
91
- var connIPv4 * ip.Conn
87
+ engineCallbacks := newEngineCallbacks (& gl , logger )
88
+
89
+ engine := engine .NewEngine (& gl .Engine , logger , engineCallbacks , sr , engineModeFromCLIConfig (gl .CLI ), sigC )
92
90
93
91
logger .Info ("Starting up arachne" )
94
92
95
- for {
96
- var (
97
- err error
98
- currentDSCP ip.DSCPValue
99
- dnsWg sync.WaitGroup
100
- finishedCycleUpload sync.WaitGroup
101
- )
102
-
103
- // Channels to tell goroutines to terminate
104
- killC := new (util.KillChannels )
105
-
106
- // If Orchestrator mode enabled, fetch JSON configuration file, otherwise try
107
- // to retrieve default local file
108
- err = config .FetchRemoteList (& gl , d .MaxNumRemoteTargets , d .MaxNumSrcTCPPorts ,
109
- d .MinBatchInterval , d .HTTPResponseHeaderTimeout , d .OrchestratorRESTConf , sigC , logger )
110
- if err != nil {
111
- break
112
- }
113
- logger .Debug ("Global JSON configuration" , zap .Any ("configuration" , gl .RemoteConfig ))
93
+ engine .Run ()
114
94
115
- if len (gl .Remotes ) == 0 {
116
- logger .Debug ("No targets to be echoed have been specified" )
117
- apply (& gl , ReceiverOnlyMode (true ))
118
- }
95
+ // Clean-up
96
+ sr .Close ()
97
+ util .RemovePID (gl .App .PIDPath , logger )
119
98
120
- configRefresh := time .NewTicker (gl .RemoteConfig .PollOrchestratorInterval .Success )
121
-
122
- if gl .RemoteConfig .ResolveDNS && ! * gl .CLI .ReceiverOnlyMode {
123
- // Refresh DNS resolutions
124
- dnsRefresh := time .NewTicker (d .DNSRefreshInterval )
125
- dnsWg .Add (1 )
126
- killC .DNSRefresh = make (chan struct {})
127
- config .ResolveDNSTargets (gl .Remotes , gl .RemoteConfig , dnsRefresh , & dnsWg ,
128
- killC .DNSRefresh , logger )
129
- dnsWg .Wait ()
130
- logger .Debug ("Remotes after DNS resolution include" ,
131
- zap .Int ("count" , len (gl .Remotes )),
132
- zap .Any ("remotes" , gl .Remotes ))
133
- }
99
+ logger .Info ("Exiting arachne" )
134
100
135
- // Channels for Collector to receive Probes and Responses from.
136
- sentC := make (chan tcp.Message , d .ChannelOutBufferSize )
137
- rcvdC := make (chan tcp.Message , d .ChannelInBufferSize )
138
-
139
- // Connection for IPv4 packets
140
- if connIPv4 == nil {
141
- connIPv4 = ip .NewConn (
142
- d .AfInet ,
143
- gl .RemoteConfig .TargetTCPPort ,
144
- gl .RemoteConfig .InterfaceName ,
145
- gl .RemoteConfig .SrcAddress ,
146
- logger )
147
- }
101
+ os .Exit (0 )
102
+ }
148
103
149
- // Actual echoing is a percentage of the total configured batch cycle duration.
150
- realBatchInterval := time .Duration (float32 (gl .RemoteConfig .BatchInterval ) *
151
- d .BatchIntervalEchoingPerc )
152
- uploadBatchInterval := time .Duration (float32 (gl .RemoteConfig .BatchInterval ) *
153
- d .BatchIntervalUploadStats )
154
- batchEndCycle := time .NewTicker (uploadBatchInterval )
155
- completeCycleUpload := make (chan bool , 1 )
156
-
157
- if ! * gl .CLI .SenderOnlyMode && ! * gl .CLI .ReceiverOnlyMode {
158
- // Start gathering and reporting results.
159
- killC .Collector = make (chan struct {})
160
- collector .Run (& gl , sentC , rcvdC , gl .Remotes , & currentDSCP , sr , completeCycleUpload ,
161
- & finishedCycleUpload , killC .Collector , logger )
162
- }
104
+ type engineCallbacks struct {
105
+ configRefreshTicker * time.Ticker
106
+ gl * config.Global
107
+ logger * log.Logger
108
+ }
163
109
164
- if ! * gl .CLI .SenderOnlyMode {
165
- // Listen for responses or probes from other IPv4 arachne agents.
166
- killC .Receiver = make (chan struct {})
167
- err = tcp .Receiver (connIPv4 , sentC , rcvdC , killC .Receiver , logger )
168
- if err != nil {
169
- logger .Fatal ("IPv4 receiver failed to start" , zap .Error (err ))
170
- }
171
- logger .Debug ("IPv4 receiver now ready..." )
172
- //TODO IPv6 receiver
173
- }
110
+ func newEngineCallbacks (gl * config.Global , logger * log.Logger ) * engineCallbacks {
111
+ return & engineCallbacks {
112
+ gl : gl ,
113
+ logger : logger ,
114
+ }
115
+ }
174
116
175
- if ! * gl .CLI .ReceiverOnlyMode {
176
- logger .Debug ("Echoing..." )
177
- // Start echoing all targets.
178
- killC .Echo = make (chan struct {})
179
- tcp .EchoTargets (gl .Remotes , connIPv4 , gl .RemoteConfig .TargetTCPPort ,
180
- gl .RemoteConfig .SrcTCPPortRange , gl .RemoteConfig .QoSEnabled , & currentDSCP ,
181
- realBatchInterval , batchEndCycle , sentC , * gl .CLI .SenderOnlyMode ,
182
- completeCycleUpload , & finishedCycleUpload , killC .Echo , logger )
183
- }
117
+ func (ec * engineCallbacks ) FetchRemoteList (_ * config.Engine , stopChannel <- chan struct {}, logger * log.Logger ) (<- chan struct {}, error ) {
118
+ err := config .FetchRemoteList (ec .gl , d .MaxNumRemoteTargets , d .MaxNumSrcTCPPorts ,
119
+ d .MinBatchInterval , d .HTTPResponseHeaderTimeout , d .OrchestratorRESTConf , stopChannel , ec .logger )
120
+ if err != nil {
121
+ return nil , err
122
+ }
123
+
124
+ ec .configRefreshTicker = time .NewTicker (ec .gl .RemoteConfig .PollOrchestratorInterval .Success )
184
125
126
+ ch := make (chan struct {})
127
+
128
+ go func () {
185
129
select {
186
- case <- configRefresh .C :
187
- util .CleanUpRefresh (killC , * gl .CLI .ReceiverOnlyMode ,
188
- * gl .CLI .SenderOnlyMode , gl .RemoteConfig .ResolveDNS )
189
- log .ResetLogFiles (gl .App .Logging .OutputPaths , d .LogFileSizeMaxMB , d .LogFileSizeKeepKB , logger )
190
- logger .Info ("Refreshing target list file, if needed" )
191
- configRefresh .Stop ()
192
- case <- sigC :
193
- logger .Debug ("Received SIG" )
194
- configRefresh .Stop ()
195
- util .CleanUpAll (killC , * gl .CLI .ReceiverOnlyMode , * gl .CLI .SenderOnlyMode ,
196
- gl .RemoteConfig .ResolveDNS , connIPv4 , gl .App .PIDPath , sr , logger )
197
- logger .Info ("Exiting" )
198
- os .Exit (0 )
130
+ case <- ec .configRefreshTicker .C :
131
+ ch <- struct {}{}
132
+ }
133
+ }()
134
+
135
+ return ch , err
136
+ }
137
+
138
+ func (ec * engineCallbacks ) FetchRemoteListNeeded () {
139
+ ec .configRefreshTicker .Stop ()
140
+ log .ResetLogFiles (ec .gl .App .Logging .OutputPaths , d .LogFileSizeMaxMB , d .LogFileSizeKeepKB , ec .logger )
141
+ }
142
+
143
+ func (ec * engineCallbacks ) Stopping () {
144
+ ec .configRefreshTicker .Stop ()
145
+ }
146
+
147
+ func engineModeFromCLIConfig (conf * config.CLIConfig ) engine.EngineMode {
148
+ if * conf .ReceiverOnlyMode {
149
+ if * conf .SenderOnlyMode {
150
+ return engine .EngineSendReceiveMode
199
151
}
152
+ return engine .EngineReceiveOnlyMode
200
153
}
154
+ return engine .EngineSendOnlyMode
201
155
}
0 commit comments