|
25 | 25 |
|
26 | 26 | import org.apache.storm.Config;
|
27 | 27 | import org.apache.storm.DaemonConfig;
|
| 28 | +import org.apache.storm.blobstore.BlobStore; |
| 29 | +import org.apache.storm.blobstore.KeySequenceNumber; |
| 30 | +import org.apache.storm.blobstore.LocalFsBlobStore; |
| 31 | +import org.apache.storm.cluster.IStormClusterState; |
28 | 32 | import org.apache.storm.generated.InvalidTopologyException;
|
| 33 | +import org.apache.storm.generated.KeyNotFoundException; |
29 | 34 | import org.apache.storm.generated.StormTopology;
|
| 35 | +import org.apache.storm.metric.StormMetricsRegistry; |
| 36 | +import org.apache.storm.nimbus.ILeaderElector; |
| 37 | +import org.apache.storm.nimbus.NimbusInfo; |
| 38 | +import org.apache.storm.scheduler.INimbus; |
30 | 39 | import org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy;
|
31 | 40 | import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
|
32 | 41 | import org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategyOld;
|
33 | 42 | import org.apache.storm.scheduler.resource.strategies.scheduling.RoundRobinResourceAwareStrategy;
|
| 43 | +import org.apache.storm.security.auth.IGroupMappingServiceProvider; |
34 | 44 | import org.apache.storm.testing.TestWordSpout;
|
| 45 | +import org.apache.storm.thrift.TException; |
35 | 46 | import org.apache.storm.topology.TopologyBuilder;
|
36 | 47 | import org.apache.storm.utils.ServerUtils;
|
37 | 48 | import org.apache.storm.utils.Time;
|
| 49 | +import org.junit.jupiter.api.BeforeEach; |
38 | 50 | import org.junit.jupiter.api.Test;
|
| 51 | +import org.mockito.Mock; |
| 52 | +import org.mockito.MockedConstruction; |
| 53 | +import org.mockito.MockitoAnnotations; |
39 | 54 |
|
40 | 55 | import static org.junit.jupiter.api.Assertions.assertEquals;
|
| 56 | +import static org.junit.jupiter.api.Assertions.assertThrows; |
41 | 57 | import static org.junit.jupiter.api.Assertions.assertTrue;
|
42 | 58 | import static org.junit.jupiter.api.Assertions.fail;
|
43 | 59 | import static org.junit.jupiter.api.Assertions.assertNull;
|
| 60 | +import static org.mockito.ArgumentMatchers.any; |
| 61 | +import static org.mockito.ArgumentMatchers.eq; |
| 62 | +import static org.mockito.Mockito.doThrow; |
| 63 | +import static org.mockito.Mockito.mock; |
| 64 | +import static org.mockito.Mockito.mockConstruction; |
| 65 | +import static org.mockito.Mockito.never; |
| 66 | +import static org.mockito.Mockito.verify; |
| 67 | +import static org.mockito.Mockito.when; |
| 68 | + |
| 69 | +class NimbusTest { |
| 70 | + private static final String BLOB_FILE_KEY = "file-key"; |
| 71 | + |
| 72 | + @Mock |
| 73 | + private StormMetricsRegistry metricRegistry; |
| 74 | + @Mock |
| 75 | + private INimbus iNimbus; |
| 76 | + @Mock |
| 77 | + private IStormClusterState stormClusterState; |
| 78 | + @Mock |
| 79 | + private NimbusInfo nimbusInfo; |
| 80 | + @Mock |
| 81 | + private LocalFsBlobStore localBlobStore; |
| 82 | + @Mock |
| 83 | + private ILeaderElector leaderElector; |
| 84 | + @Mock |
| 85 | + private IGroupMappingServiceProvider groupMapper; |
| 86 | + |
| 87 | + private Nimbus nimbus; |
| 88 | + |
| 89 | + @BeforeEach |
| 90 | + public void setUp() throws Exception { |
| 91 | + MockitoAnnotations.openMocks(this).close(); |
| 92 | + |
| 93 | + Map<String, Object> conf = Map.of(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS, 10); |
| 94 | + nimbus = new Nimbus(conf, iNimbus, stormClusterState, nimbusInfo, localBlobStore, leaderElector, groupMapper, metricRegistry); |
| 95 | + } |
44 | 96 |
|
45 |
| -public class NimbusTest { |
46 | 97 | @Test
|
47 | 98 | public void testMemoryLoadLargerThanMaxHeapSize() {
|
48 | 99 | // Topology will not be able to be successfully scheduled: Config TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB=128.0 < 129.0,
|
@@ -112,4 +163,41 @@ public void validateNoTopoConfOverrides() {
|
112 | 163 | Map<String, Object> normalized = Nimbus.normalizeConf(conf, topoConf, topology);
|
113 | 164 | assertNull(normalized.get(Config.STORM_WORKERS_ARTIFACTS_DIR));
|
114 | 165 | }
|
| 166 | + |
| 167 | + @Test |
| 168 | + void testCreateStateInZookeeper() throws TException { |
| 169 | + nimbus.createStateInZookeeper(BLOB_FILE_KEY); |
| 170 | + |
| 171 | + verify(stormClusterState).setupBlob(eq(BLOB_FILE_KEY), eq(nimbusInfo), any()); |
| 172 | + } |
| 173 | + |
| 174 | + @Test |
| 175 | + void testCreateStateInZookeeperWithoutLocalFsBlobStoreInstanceShouldNotCreate() throws Exception { |
| 176 | + BlobStore blobStore = mock(BlobStore.class); |
| 177 | + Map<String, Object> conf = Map.of(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS, 10); |
| 178 | + nimbus = new Nimbus(conf, iNimbus, stormClusterState, nimbusInfo, blobStore, leaderElector, groupMapper, metricRegistry); |
| 179 | + |
| 180 | + nimbus.createStateInZookeeper(BLOB_FILE_KEY); |
| 181 | + |
| 182 | + verify(stormClusterState, never()).setupBlob(eq(BLOB_FILE_KEY), eq(nimbusInfo), any()); |
| 183 | + } |
| 184 | + |
| 185 | + @Test |
| 186 | + void testCreateStateInZookeeperWhenFailToSetupBlobWithRuntimeExceptionThrowsRuntimeException() { |
| 187 | + doThrow(new RuntimeException("Failed to setup blob")).when(stormClusterState).setupBlob(eq(BLOB_FILE_KEY), eq(nimbusInfo), any()); |
| 188 | + |
| 189 | + assertThrows(RuntimeException.class, () -> nimbus.createStateInZookeeper(BLOB_FILE_KEY)); |
| 190 | + verify(stormClusterState).setupBlob(eq(BLOB_FILE_KEY), eq(nimbusInfo), any()); |
| 191 | + } |
| 192 | + |
| 193 | + @Test |
| 194 | + void testCreateStateInZookeeperWhenKeyNotFoundHandlesException() throws Exception { |
| 195 | + try (MockedConstruction<KeySequenceNumber> keySequenceNumber = mockConstruction(KeySequenceNumber.class, (mock, context) -> |
| 196 | + when(mock.getKeySequenceNumber(any())).thenThrow(new KeyNotFoundException("Failed to setup blob")))) { |
| 197 | + nimbus.createStateInZookeeper(BLOB_FILE_KEY); |
| 198 | + |
| 199 | + verify(keySequenceNumber.constructed().get(0)).getKeySequenceNumber(any()); |
| 200 | + verify(stormClusterState, never()).setupBlob(eq(BLOB_FILE_KEY), eq(nimbusInfo), any()); |
| 201 | + } |
| 202 | + } |
115 | 203 | }
|
0 commit comments