Skip to content

Commit df2b791

Browse files
authored
Merge branch 'TAK-Product-Center:main' into main
2 parents 4361692 + a4e4453 commit df2b791

File tree

188 files changed

+11040
-6629
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

188 files changed

+11040
-6629
lines changed
1.7 KB
Binary file not shown.
529 KB
Binary file not shown.

src/federation-common/src/main/java/io/grpc/internal/ServerCallImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ private void sendHeadersInternal(Metadata headers) {
151151
// Don't check if sendMessage has been called, since it requires that sendHeaders was already
152152
// called.
153153
sendHeadersCalled = true;
154-
stream.writeHeaders(headers);
154+
stream.writeHeaders(headers, !getMethodDescriptor().getType().serverSendsOneMessage());
155155
}
156156

157157
@Override

src/federation-common/src/main/java/tak/server/federation/GuardedStreamHolder.java

+9
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,15 @@ public void throwDeadlineExceptionToClient() {
306306
}
307307
}
308308

309+
public void throwCanceledExceptionToClient() {
310+
try {
311+
if (clientStream != null)
312+
clientStream.onError(new StatusRuntimeException(Status.CANCELLED));
313+
} catch (Exception e) {
314+
logger.warn("exception sending StatusRuntimeException - CANCELLED to client", e);
315+
}
316+
}
317+
309318
public void throwPermissionDeniedToClient() {
310319
try {
311320
if (clientStream != null)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package tak.server.federation.hub;
2+
3+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
4+
5+
@JsonIgnoreProperties(ignoreUnknown = true)
6+
public class FederationHubIgniteConfig {
7+
8+
private int ignitePoolSize = -1;
9+
private int ignitePoolSizeMultiplier = 2;
10+
11+
public int getIgnitePoolSize() {
12+
return ignitePoolSize;
13+
}
14+
public void setIgnitePoolSize(int ignitePoolSize) {
15+
this.ignitePoolSize = ignitePoolSize;
16+
}
17+
public int getIgnitePoolSizeMultiplier() {
18+
return ignitePoolSizeMultiplier;
19+
}
20+
public void setIgnitePoolSizeMultiplier(int ignitePoolSizeMultiplier) {
21+
this.ignitePoolSizeMultiplier = ignitePoolSizeMultiplier;
22+
}
23+
24+
@Override
25+
public String toString() {
26+
return "FederationHubIgniteConfig [ignitePoolSize=" + ignitePoolSize + ", ignitePoolSizeMultiplier="
27+
+ ignitePoolSizeMultiplier + "]";
28+
}
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
package tak.server.federation.hub;
2+
3+
import java.util.concurrent.BlockingQueue;
4+
import java.util.concurrent.CancellationException;
5+
import java.util.concurrent.ExecutionException;
6+
import java.util.concurrent.ExecutorService;
7+
import java.util.concurrent.Future;
8+
import java.util.concurrent.LinkedBlockingQueue;
9+
import java.util.concurrent.ScheduledExecutorService;
10+
import java.util.concurrent.ScheduledThreadPoolExecutor;
11+
import java.util.concurrent.ThreadFactory;
12+
import java.util.concurrent.ThreadPoolExecutor;
13+
import java.util.concurrent.TimeUnit;
14+
15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
17+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
18+
19+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
20+
21+
import io.netty.channel.EventLoopGroup;
22+
import io.netty.channel.epoll.Epoll;
23+
import io.netty.channel.epoll.EpollEventLoopGroup;
24+
import io.netty.channel.nio.NioEventLoopGroup;
25+
26+
public class FederationHubResources {
27+
private static final int POOL_SIZE_INITIAL = 1;
28+
29+
private static final int NUM_AVAIL_CORES = Runtime.getRuntime().availableProcessors();
30+
31+
private static final int DEFAULT_POOL_MAX = NUM_AVAIL_CORES;
32+
33+
private static final int POOL_SIZE_MAX = DEFAULT_POOL_MAX < 8 ? 8 : DEFAULT_POOL_MAX;
34+
35+
public static final int EXEC_QUEUE_SIZE = 1024 * NUM_AVAIL_CORES;
36+
37+
public static final boolean IS_LOW_CORE = NUM_AVAIL_CORES < 4;
38+
39+
public static final ExecutorService lowCoreExecutorService;
40+
public static final ScheduledExecutorService lowCoreScheduledExecutorService;
41+
public static final ExecutorService lowCoreGrpcExecutorService;
42+
public static final EventLoopGroup lowCoreGrpcEventLoopGroup;
43+
44+
// create a minimal set of executors if low core mode is enabled
45+
static {
46+
if (IS_LOW_CORE) {
47+
lowCoreExecutorService = newExecutorService("federation-hub", POOL_SIZE_INITIAL, DEFAULT_POOL_MAX);
48+
lowCoreScheduledExecutorService = newScheduledExecutor("federation-hub-scheduled", DEFAULT_POOL_MAX);
49+
lowCoreGrpcExecutorService = newGrpcThreadPoolExecutor("federation-hub-grpc", POOL_SIZE_INITIAL, DEFAULT_POOL_MAX);
50+
lowCoreGrpcEventLoopGroup = newGrpcEventLoopGroup("federation-hub-grpc-eventgroup", DEFAULT_POOL_MAX);
51+
} else {
52+
lowCoreExecutorService = null;
53+
lowCoreScheduledExecutorService = null;
54+
lowCoreGrpcExecutorService = null;
55+
lowCoreGrpcEventLoopGroup = null;
56+
}
57+
}
58+
59+
public static final ExecutorService rolExecutor = !IS_LOW_CORE ? newGrpcThreadPoolExecutor("rol-federation-hub-executor", POOL_SIZE_INITIAL, NUM_AVAIL_CORES) : lowCoreGrpcExecutorService;
60+
61+
public static final ScheduledExecutorService mfdtScheduler = !IS_LOW_CORE ? newScheduledExecutor("mfdt-federation-hub-scheduler", POOL_SIZE_MAX) : lowCoreScheduledExecutorService;
62+
63+
public static final ScheduledExecutorService healthCheckScheduler = !IS_LOW_CORE ? newScheduledExecutor("health-check-federation-hub-scheduler", 1) : lowCoreScheduledExecutorService;
64+
65+
public static final ScheduledExecutorService retryScheduler = !IS_LOW_CORE ? newScheduledExecutor("outgoing-connection-federation-hub-scheduler", 1) : lowCoreScheduledExecutorService;
66+
67+
public static final ScheduledExecutorService dbRetentionScheduler = !IS_LOW_CORE ? newScheduledExecutor("db-retention-federation-hub-scheduler", 1) : lowCoreScheduledExecutorService;
68+
69+
public static final ScheduledExecutorService metricsScheduler = !IS_LOW_CORE ? newScheduledExecutor("metrics-federation-hub-scheduler", 1) : lowCoreScheduledExecutorService;
70+
71+
// Bounded Executor pool for federation grpc server and channel builders
72+
public static final ExecutorService federationGrpcExecutor = !IS_LOW_CORE ? newGrpcThreadPoolExecutor("grpc-federation-hub-executor", POOL_SIZE_INITIAL, NUM_AVAIL_CORES) : lowCoreGrpcExecutorService;
73+
74+
// Bounded worker pool for federation grpc server and channel builders
75+
public static final EventLoopGroup federationGrpcWorkerEventLoopGroup = !IS_LOW_CORE ? newGrpcEventLoopGroup("grpc-federation-hub-worker", NUM_AVAIL_CORES) : lowCoreGrpcEventLoopGroup;
76+
77+
78+
public static ExecutorService newExecutorService(String name, int initialPoolSize, int maxPoolSize) {
79+
80+
return newExecutorService(name, initialPoolSize, maxPoolSize, EXEC_QUEUE_SIZE);
81+
}
82+
83+
private static ExecutorService newExecutorService(String name, int initialPoolSize, int maxPoolSize, int queueSize) {
84+
85+
ThreadFactory threadFactory =
86+
new ThreadFactoryBuilder()
87+
.setNameFormat(name + "-%1$d")
88+
.setUncaughtExceptionHandler(new FederationHubExceptionHandler())
89+
.build();
90+
91+
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(queueSize);
92+
return new FederationHubThreadPoolExecutor(initialPoolSize, maxPoolSize, 60L, TimeUnit.SECONDS, workQueue, threadFactory);
93+
}
94+
95+
private static ScheduledExecutorService newScheduledExecutor(String name, int size) {
96+
97+
ThreadFactory threadFactory =
98+
new ThreadFactoryBuilder()
99+
.setNameFormat(name + "-%1$d")
100+
.setUncaughtExceptionHandler(new FederationHubExceptionHandler())
101+
.build();
102+
103+
return new ScheduledThreadPoolExecutor(size, threadFactory);
104+
}
105+
106+
public static ThreadPoolTaskExecutor websocketExecutor() {
107+
108+
ThreadFactory threadFactory =
109+
new ThreadFactoryBuilder()
110+
.setNameFormat("federation-hub-socket-%1$d")
111+
.setUncaughtExceptionHandler(new FederationHubExceptionHandler())
112+
.build();
113+
114+
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
115+
taskExecutor.setCorePoolSize(POOL_SIZE_INITIAL);
116+
taskExecutor.setMaxPoolSize(POOL_SIZE_MAX);
117+
taskExecutor.setQueueCapacity(EXEC_QUEUE_SIZE);
118+
taskExecutor.setAllowCoreThreadTimeOut(true);
119+
taskExecutor.setKeepAliveSeconds(120);
120+
taskExecutor.setThreadFactory(threadFactory);
121+
122+
return taskExecutor;
123+
}
124+
125+
private static EventLoopGroup newGrpcEventLoopGroup(String name, int maxPoolSize) {
126+
ThreadFactory threadFactory =
127+
new ThreadFactoryBuilder()
128+
.setNameFormat(name + "-%1$d")
129+
.setUncaughtExceptionHandler(new FederationHubExceptionHandler())
130+
.setDaemon(true)
131+
.build();
132+
133+
if (Epoll.isAvailable()) {
134+
return new EpollEventLoopGroup(maxPoolSize, threadFactory);
135+
} else {
136+
return new NioEventLoopGroup(maxPoolSize, threadFactory);
137+
}
138+
}
139+
140+
private static ExecutorService newGrpcThreadPoolExecutor(String name, int initialPoolSize, int maxPoolSize) {
141+
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(EXEC_QUEUE_SIZE);
142+
143+
ThreadFactory threadFactory =
144+
new ThreadFactoryBuilder()
145+
.setNameFormat("websocket-%1$d")
146+
.setUncaughtExceptionHandler(new FederationHubExceptionHandler())
147+
.build();
148+
149+
150+
return new ThreadPoolExecutor(initialPoolSize, maxPoolSize, 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
151+
}
152+
153+
private static class FederationHubThreadPoolExecutor extends ThreadPoolExecutor {
154+
155+
FederationHubThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
156+
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
157+
}
158+
159+
Logger logger = LoggerFactory.getLogger(FederationHubThreadPoolExecutor.class);
160+
161+
public FederationHubThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
162+
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
163+
}
164+
165+
@Override
166+
public void afterExecute(Runnable r, Throwable t) {
167+
super.afterExecute(r, t);
168+
// If submit() method is called instead of execute()
169+
if (t == null && r instanceof Future<?>) {
170+
try {
171+
((Future<?>) r).get();
172+
} catch (CancellationException e) {
173+
t = e;
174+
} catch (ExecutionException e) {
175+
t = e.getCause();
176+
} catch (InterruptedException e) {
177+
Thread.currentThread().interrupt();
178+
}
179+
}
180+
if (t != null) {
181+
// Exception occurred
182+
183+
logger.error("Uncaught exception ", t);
184+
}
185+
// can perform cleanup actions here
186+
}
187+
}
188+
189+
private static class FederationHubExceptionHandler implements Thread.UncaughtExceptionHandler {
190+
191+
Logger logger = LoggerFactory.getLogger(FederationHubExceptionHandler.class);
192+
193+
@Override
194+
public void uncaughtException(Thread thread, Throwable t) {
195+
logger.error("Uncaught exception", t);
196+
}
197+
}
198+
}

src/federation-common/src/main/java/tak/server/federation/hub/FederationHubUtils.java

+77
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,61 @@
11
package tak.server.federation.hub;
22

3+
import java.io.FileInputStream;
4+
import java.io.FileNotFoundException;
5+
import java.io.IOException;
36
import java.util.Arrays;
47
import java.util.Collections;
58

9+
import org.apache.ignite.IgniteCheckedException;
10+
import org.apache.ignite.configuration.ClientConnectorConfiguration;
611
import org.apache.ignite.configuration.IgniteConfiguration;
712
import org.apache.ignite.failure.NoOpFailureHandler;
13+
import org.apache.ignite.internal.util.typedef.internal.U;
814
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
915
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
1016
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
17+
import org.slf4j.Logger;
18+
import org.slf4j.LoggerFactory;
19+
20+
import com.fasterxml.jackson.core.JsonParseException;
21+
import com.fasterxml.jackson.databind.JsonMappingException;
22+
import com.fasterxml.jackson.databind.ObjectMapper;
23+
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
24+
import com.google.common.base.Strings;
1125

1226
public class FederationHubUtils {
27+
private static final Logger logger = LoggerFactory.getLogger(FederationHubUtils.class);
1328

1429
public static IgniteConfiguration getIgniteConfiguration(String profile, boolean isClient) {
30+
FederationHubIgniteConfig igniteConfig = null;
31+
32+
String igniteFile = System.getProperty("fedhub.ignite.config");
33+
if (Strings.isNullOrEmpty(igniteFile)) {
34+
igniteFile = "/opt/tak/federation-hub/configs/ignite.yml";
35+
logger.info("Ignite config file not supplied. Assigning default to: " + igniteFile);
36+
} else {
37+
logger.info("Ignite Config file supplied: " + igniteFile);
38+
}
39+
40+
try {
41+
igniteConfig = new FederationHubUtils().loadIgniteConfig(igniteFile);
42+
logger.info("Loaded ignite config from file");
43+
} catch (Exception e) {
44+
logger.info("Ignite config not found, generating default one");
45+
// failed to load file, use defaults
46+
igniteConfig = new FederationHubIgniteConfig();
47+
}
48+
1549
IgniteConfiguration conf = new IgniteConfiguration();
50+
51+
String defaultWorkDir = "/opt/tak/federation-hub";
52+
try {
53+
defaultWorkDir = U.defaultWorkDirectory();
54+
} catch (IgniteCheckedException e) {
55+
logger.error(" error getting Ignite work dir, default to /opt/tak/federation-hub ", e);
56+
}
57+
58+
conf.setWorkDirectory(defaultWorkDir + "/" + profile + "-tmp-work");
1659

1760
String address = FederationHubConstants.FEDERATION_HUB_IGNITE_HOST + ":" +
1861
FederationHubConstants.NON_MULTICAST_DISCOVERY_PORT + ".." +
@@ -44,7 +87,41 @@ public static IgniteConfiguration getIgniteConfiguration(String profile, boolean
4487
profile));
4588

4689
conf.setFailureHandler(new NoOpFailureHandler());
90+
91+
int poolSize;
92+
// dynamic
93+
if (igniteConfig.getIgnitePoolSize() < 0) {
94+
poolSize = Math.min(Runtime.getRuntime().availableProcessors() * igniteConfig.getIgnitePoolSizeMultiplier(), 1024);
95+
} else {
96+
poolSize = igniteConfig.getIgnitePoolSize();
97+
}
98+
99+
if (isClient) {
100+
ClientConnectorConfiguration ccc = conf.getClientConnectorConfiguration();
101+
ccc.setThreadPoolSize(poolSize);
102+
}
103+
104+
conf.setSystemThreadPoolSize(poolSize + 1);
105+
conf.setPublicThreadPoolSize(poolSize);
106+
conf.setQueryThreadPoolSize(poolSize);
107+
conf.setServiceThreadPoolSize(poolSize);
108+
conf.setStripedPoolSize(poolSize);
109+
conf.setDataStreamerThreadPoolSize(poolSize);
110+
conf.setRebalanceThreadPoolSize(poolSize);
47111

48112
return conf;
49113
}
114+
115+
private FederationHubIgniteConfig loadIgniteConfig(String configFile)
116+
throws JsonParseException, JsonMappingException, FileNotFoundException, IOException {
117+
if (getClass().getResource(configFile) != null) {
118+
// It's a resource.
119+
return new ObjectMapper(new YAMLFactory()).readValue(getClass().getResourceAsStream(configFile),
120+
FederationHubIgniteConfig.class);
121+
}
122+
123+
// It's a file.
124+
return new ObjectMapper(new YAMLFactory()).readValue(new FileInputStream(configFile),
125+
FederationHubIgniteConfig.class);
126+
}
50127
}

src/federation-common/src/main/java/tak/server/federation/hub/broker/FederationHubBroker.java

+2
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,7 @@ public interface FederationHubBroker {
1313
FederationHubBrokerMetrics getFederationHubBrokerMetrics();
1414
List<String> getGroupsForNode(String federateId);
1515
void deleteGroupCa(String groupId);
16+
void disconnectFederate(String connectionId);
1617
Map<String, X509Certificate> getCAsFromFile();
18+
byte[] getSelfCaFile();
1719
}

0 commit comments

Comments
 (0)