Skip to content

Commit e32e7a9

Browse files
committed
[INLONG-1891] Inlong-Sort-Standalone add sort-standalone-source module.
1 parent fccece5 commit e32e7a9

File tree

19 files changed

+1441
-185
lines changed

19 files changed

+1441
-185
lines changed

inlong-sort-standalone/pom.xml

+3-2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
<plugin.assembly.version>3.2.0</plugin.assembly.version>
4545
<pulsar.version>2.7.2</pulsar.version>
4646
<junit.version>4.13</junit.version>
47+
<powermock.version>2.0.2</powermock.version>
4748
<guava.version>19.0</guava.version>
4849
<skipTests>false</skipTests>
4950
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -85,13 +86,13 @@
8586
<dependency>
8687
<groupId>org.powermock</groupId>
8788
<artifactId>powermock-module-junit4</artifactId>
88-
<version>2.0.2</version>
89+
<version>${powermock.version}</version>
8990
<scope>test</scope>
9091
</dependency>
9192
<dependency>
9293
<groupId>org.powermock</groupId>
9394
<artifactId>powermock-api-mockito2</artifactId>
94-
<version>2.0.2</version>
95+
<version>${powermock.version}</version>
9596
<scope>test</scope>
9697
</dependency>
9798
<dependency>

inlong-sort-standalone/sort-standalone-common/pom.xml

+18-10
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,22 @@
11
<?xml version="1.0" encoding="UTF-8"?>
2-
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
3-
license agreements. See the NOTICE file distributed with this work for additional
4-
information regarding copyright ownership. The ASF licenses this file to
5-
you under the Apache License, Version 2.0 (the "License"); you may not use
6-
this file except in compliance with the License. You may obtain a copy of
7-
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
8-
by applicable law or agreed to in writing, software distributed under the
9-
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
10-
OF ANY KIND, either express or implied. See the License for the specific
11-
language governing permissions and limitations under the License. -->
2+
<!--
3+
Licensed to the Apache Software Foundation (ASF) under one
4+
or more contributor license agreements. See the NOTICE file
5+
distributed with this work for additional information
6+
regarding copyright ownership. The ASF licenses this file
7+
to you under the Apache License, Version 2.0 (the
8+
"License"); you may not use this file except in compliance
9+
with the License. You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing,
14+
software distributed under the License is distributed on an
15+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
KIND, either express or implied. See the License for the
17+
specific language governing permissions and limitations
18+
under the License.
19+
-->
1220

1321
<project xmlns="http://maven.apache.org/POM/4.0.0"
1422
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java

-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ private static SortClusterConfigHolder get() {
7878
}
7979
} catch (Throwable t) {
8080
LOG.error("Fail to init loader,loaderType:{},error:{}", loaderType, t.getMessage());
81-
LOG.error(t.getMessage(), t);
8281
}
8382
if (instance.loader == null) {
8483
instance.loader = new ClassResourceSortClusterConfigLoader();

inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterResponse.java

-108
Original file line numberDiff line numberDiff line change
@@ -104,112 +104,4 @@ public SortClusterConfig getData() {
104104
public void setData(SortClusterConfig data) {
105105
this.data = data;
106106
}
107-
//
108-
// /**
109-
// * generateTdbankConfig
110-
// *
111-
// * @return
112-
// */
113-
// public static SortClusterConfig generateTdbankConfig() {
114-
// SortClusterConfig clusterConfig = new SortClusterConfig();
115-
// clusterConfig.setClusterName("tdbankv3-sz-sz1");
116-
// //
117-
// List<SortTaskConfig> sortTasks = new ArrayList<>();
118-
// clusterConfig.setSortTasks(sortTasks);
119-
// SortTaskConfig taskConfig = new SortTaskConfig();
120-
// sortTasks.add(taskConfig);
121-
// taskConfig.setName("sid_tdbank_atta6th_v3");
122-
// taskConfig.setType(SortType.TQTDBANK);
123-
// //
124-
// Map<String, String> sinkParams = new HashMap<>();
125-
// taskConfig.setSinkParams(sinkParams);
126-
// sinkParams.put("b_pcg_venus_szrecone_124_153_utf8", "10.56.15.195:46801,10.56.15.212:46801,"
127-
// + "10.56.15.220:46801,10.56.15.221:46801,"
128-
// + "10.56.15.230:46801,10.56.16.20:46801,10.56.16.38:46801,10.56.20.21:46801,10.56.20.80:46801,"
129-
// + "10.56.20.85:46801,10.56.209.205:46801,10.56.21.17:46801,10.56.21.20:46801,10.56.21.79:46801,"
130-
// + "10.56.21.85:46801,10.56.81.205:46801,10.56.81.211:46801,10.56.82.11:46801,10.56.82.12:46801,"
131-
// + "10.56.82.37:46801,10.56.82.38:46801,10.56.82.40:46801,10.56.83.143:46801,10.56.83.80:46801,"
132-
// + "10.56.84.17:46801");
133-
// //
134-
// List<Map<String, String>> idParams = new ArrayList<>();
135-
// Map<String, String> idParam = new HashMap<>();
136-
// idParams.add(idParam);
137-
// taskConfig.setIdParams(idParams);
138-
// idParam.put(Constants.INLONG_GROUP_ID, "0fc00000046");
139-
// idParam.put(Constants.INLONG_STREAM_ID, "");
140-
// idParam.put(TdbankConfig.KEY_BID, "b_pcg_venus_szrecone_124_153_utf8");
141-
// idParam.put(TdbankConfig.KEY_TID, "t_sh_atta_v2_0fc00000046");
142-
// idParam.put(TdbankConfig.KEY_DATA_TYPE, TdbankConfig.DATA_TYPE_ATTA_TEXT);
143-
// return clusterConfig;
144-
// }
145-
//
146-
// /**
147-
// * generateCdmqConfig
148-
// *
149-
// * @return
150-
// */
151-
// public static SortClusterConfig generateCdmqConfig() {
152-
// SortClusterConfig clusterConfig = new SortClusterConfig();
153-
// clusterConfig.setClusterName("cdmqv3-sz-sz1");
154-
// //
155-
// List<SortTaskConfig> sortTasks = new ArrayList<>();
156-
// clusterConfig.setSortTasks(sortTasks);
157-
// SortTaskConfig taskConfig = new SortTaskConfig();
158-
// sortTasks.add(taskConfig);
159-
// taskConfig.setName("sid_cdmq_kg_videorequest_v3");
160-
// taskConfig.setType(SortType.CDMQ);
161-
// //
162-
// Map<String, String> sinkParams = new HashMap<>();
163-
// taskConfig.setSinkParams(sinkParams);
164-
// sinkParams.put("cdmqAccessPoint", "cdmqszentry01.data.mig:10005,cdmqszentry05.data.mig:10033");
165-
// sinkParams.put("cdmqClusterId", "kg_videorequest");
166-
// sinkParams.put("clientId", "p_video_atta_196");
167-
// sinkParams.put("batchSize", "122880");
168-
// sinkParams.put("maxRequestSize", "8388608");
169-
// sinkParams.put("lingerMs", "150");
170-
// //
171-
// List<Map<String, String>> idParams = new ArrayList<>();
172-
// Map<String, String> idParam = new HashMap<>();
173-
// idParams.add(idParam);
174-
// taskConfig.setIdParams(idParams);
175-
// idParam.put(Constants.INLONG_GROUP_ID, "0fc00000046");
176-
// idParam.put(Constants.TOPIC, "U_TOPIC_0fc00000046");
177-
// return clusterConfig;
178-
// }
179-
//
180-
// /**
181-
// * main
182-
// *
183-
// * @param args
184-
// */
185-
// public static void main(String[] args) {
186-
// // tdbank
187-
// {
188-
// SortClusterConfig config = generateTdbankConfig();
189-
// String configString = JSON.toJSONString(config, false);
190-
// System.out.println("tdbank:" + configString);
191-
// String md5 = DigestUtils.md5Hex(configString);
192-
// SortClusterResponse response = new SortClusterResponse();
193-
// response.setResult(true);
194-
// response.setErrCode(SUCC);
195-
// response.setMd5(md5);
196-
// response.setData(config);
197-
// String responseString = JSON.toJSONString(response, true);
198-
// System.out.println("tdbank responseString:" + responseString);
199-
// }
200-
// // cdmq
201-
// {
202-
// SortClusterConfig config = generateCdmqConfig();
203-
// String configString = JSON.toJSONString(config, false);
204-
// System.out.println("cdmq:" + configString);
205-
// String md5 = DigestUtils.md5Hex(configString);
206-
// SortClusterResponse response = new SortClusterResponse();
207-
// response.setResult(true);
208-
// response.setErrCode(SUCC);
209-
// response.setMd5(md5);
210-
// response.setData(config);
211-
// String responseString = JSON.toJSONString(response, true);
212-
// System.out.println("cdmq responseString:" + responseString);
213-
// }
214-
// }
215107
}

inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/CacheType.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
*/
2323
public enum CacheType {
2424

25-
TUBE("tube"), KAFKA("kafka"), PULSAR("pulsar"), N("n");
25+
TUBE("tube"), KAFKA("kafka"), PULSAR("pulsar"), UNKNOWN("n");
2626

2727
private final String value;
2828

@@ -65,6 +65,6 @@ public static CacheType convert(String value) {
6565
return v;
6666
}
6767
}
68-
return N;
68+
return UNKNOWN;
6969
}
7070
}

inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
*/
2323
public enum DataType {
2424

25-
TEXT("text"), PB("pb"), JCE("jce"), N("n");
25+
TEXT("text"), PB("pb"), JCE("jce"), UNKNOWN("n");
2626

2727
private final String value;
2828

@@ -65,6 +65,6 @@ public static DataType convert(String value) {
6565
return v;
6666
}
6767
}
68-
return N;
68+
return UNKNOWN;
6969
}
7070
}

inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/SortType.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
public enum SortType {
2525

2626
HIVE("hive"), TUBE("tube"), KAFKA("kafka"), PULSAR("pulsar"), ElasticSearch("ElasticSearch"), THTDBANK(
27-
"thtdbank"), TQTDBANK("tqtdbank"), CDMQ("cdmq"), N("n");
27+
"thtdbank"), TQTDBANK("tqtdbank"), CDMQ("cdmq"), UNKNOWN("n");
2828

2929
private final String value;
3030

@@ -67,6 +67,6 @@ public static SortType convert(String value) {
6767
return v;
6868
}
6969
}
70-
return N;
70+
return UNKNOWN;
7171
}
7272
}

inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/InlongLoggerFactory.java

-22
Original file line numberDiff line numberDiff line change
@@ -60,26 +60,4 @@ public static String getClassNamePrefix(String className, int layer) {
6060
String namePrefix = className.substring(0, index);
6161
return namePrefix;
6262
}
63-
64-
// /**
65-
// * main
66-
// * @param args
67-
// */
68-
// public static void main(String[] args) {
69-
// int layer = 3;
70-
// String className = "";
71-
// System.out.println(className + ":" + getClassNamePrefix(className, layer));
72-
// className = "ccc";
73-
// System.out.println(className + ":" + getClassNamePrefix(className, layer));
74-
// className = "org.ccc";
75-
// System.out.println(className + ":" + getClassNamePrefix(className, layer));
76-
// className = "org.apache.ccc";
77-
// System.out.println(className + ":" + getClassNamePrefix(className, layer));
78-
// className = "org.apache.inlong.ccc";
79-
// System.out.println(className + ":" + getClassNamePrefix(className, layer));
80-
// className = "org.apache.inlong.sort.ccc";
81-
// System.out.println(className + ":" + getClassNamePrefix(className, layer));
82-
// className = "org.apache.inlong.sort.standalone.ccc";
83-
// System.out.println(className + ":" + getClassNamePrefix(className, layer));
84-
// }
8563
}
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,22 @@
11
<?xml version="1.0" encoding="UTF-8"?>
2-
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
3-
license agreements. See the NOTICE file distributed with this work for additional
4-
information regarding copyright ownership. The ASF licenses this file to
5-
you under the Apache License, Version 2.0 (the "License"); you may not use
6-
this file except in compliance with the License. You may obtain a copy of
7-
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
8-
by applicable law or agreed to in writing, software distributed under the
9-
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
10-
OF ANY KIND, either express or implied. See the License for the specific
11-
language governing permissions and limitations under the License. -->
2+
<!--
3+
Licensed to the Apache Software Foundation (ASF) under one
4+
or more contributor license agreements. See the NOTICE file
5+
distributed with this work for additional information
6+
regarding copyright ownership. The ASF licenses this file
7+
to you under the Apache License, Version 2.0 (the
8+
"License"); you may not use this file except in compliance
9+
with the License. You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing,
14+
software distributed under the License is distributed on an
15+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
KIND, either express or implied. See the License for the
17+
specific language governing permissions and limitations
18+
under the License.
19+
-->
1220

1321
<project xmlns="http://maven.apache.org/POM/4.0.0"
1422
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
@@ -26,34 +34,13 @@
2634
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
2735
<compiler.source>1.8</compiler.source>
2836
<compiler.target>1.8</compiler.target>
29-
<junit.version>4.13</junit.version>
30-
<guava.version>19.0</guava.version>
31-
<skipTests>false</skipTests>
3237
</properties>
3338

3439
<dependencies>
3540
<dependency>
36-
<groupId>com.google.guava</groupId>
37-
<artifactId>guava</artifactId>
38-
<version>${guava.version}</version>
39-
</dependency>
40-
<dependency>
41-
<groupId>org.powermock</groupId>
42-
<artifactId>powermock-module-junit4</artifactId>
43-
<version>2.0.2</version>
44-
<scope>test</scope>
45-
</dependency>
46-
<dependency>
47-
<groupId>org.powermock</groupId>
48-
<artifactId>powermock-api-mockito2</artifactId>
49-
<version>2.0.2</version>
50-
<scope>test</scope>
51-
</dependency>
52-
<dependency>
53-
<groupId>junit</groupId>
54-
<artifactId>junit</artifactId>
55-
<version>${junit.version}</version>
56-
<scope>test</scope>
41+
<groupId>org.apache.inlong</groupId>
42+
<artifactId>sort-standalone-common</artifactId>
43+
<version>${project.version}</version>
5744
</dependency>
5845
</dependencies>
5946
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.sort.standalone;
19+
20+
import java.util.HashMap;
21+
import java.util.Map;
22+
23+
import org.apache.flume.conf.FlumeConfiguration;
24+
import org.apache.flume.node.AbstractConfigurationProvider;
25+
import org.slf4j.Logger;
26+
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
27+
28+
/**
29+
*
30+
* PropertiesConfigurationProvider
31+
*/
32+
public class PropertiesConfigurationProvider extends
33+
AbstractConfigurationProvider {
34+
35+
public static final Logger LOG = InlongLoggerFactory.getLogger(PropertiesConfigurationProvider.class);
36+
37+
private final Map<String, String> flumeConf;
38+
39+
/**
40+
* PropertiesConfigurationProvider
41+
*
42+
* @param agentName
43+
* @param flumeConf
44+
*/
45+
public PropertiesConfigurationProvider(String agentName, Map<String, String> flumeConf) {
46+
super(agentName);
47+
this.flumeConf = flumeConf;
48+
}
49+
50+
/**
51+
* getFlumeConfiguration
52+
*
53+
* @return
54+
*/
55+
@Override
56+
public FlumeConfiguration getFlumeConfiguration() {
57+
try {
58+
return new FlumeConfiguration(flumeConf);
59+
} catch (Exception e) {
60+
LOG.error("exception catch:" + e.getMessage(), e);
61+
}
62+
return new FlumeConfiguration(new HashMap<String, String>());
63+
}
64+
}

0 commit comments

Comments
 (0)