Skip to content

Commit 8de3d99

Browse files
authored
[INLONG-4598][Manager] Fix the Pulsar topic not match error (#4599)
1 parent be7d72c commit 8de3d99

File tree

3 files changed

+7
-3
lines changed

3 files changed

+7
-3
lines changed

inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSource.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
public class PulsarSource extends StreamSource {
4444

4545
@ApiModelProperty("Pulsar tenant")
46-
private String tenant = "default";
46+
private String tenant = "public";
4747

4848
@ApiModelProperty("Pulsar namespace")
4949
private String namespace;

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java

+3
Original file line numberDiff line numberDiff line change
@@ -144,9 +144,12 @@ private Map<String, List<StreamSource>> createPulsarSources(
144144
PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
145145
String adminUrl = pulsarCluster.getAdminUrl();
146146
String serviceUrl = pulsarCluster.getUrl();
147+
String tenant = StringUtils.isEmpty(pulsarCluster.getTenant()) ? InlongGroupSettings.DEFAULT_PULSAR_TENANT
148+
: pulsarCluster.getTenant();
147149
streamInfoList.forEach(streamInfo -> {
148150
PulsarSource pulsarSource = new PulsarSource();
149151
String streamId = streamInfo.getInlongStreamId();
152+
pulsarSource.setTenant(tenant);
150153
pulsarSource.setSourceName(streamId);
151154
pulsarSource.setNamespace(groupInfo.getMqResource());
152155
pulsarSource.setTopic(streamInfo.getMqResource());

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,8 @@ public static PulsarExtractNode createExtractNode(PulsarSource pulsarSource) {
231231
List<FieldInfo> fieldInfos = streamFields.stream()
232232
.map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
233233
.collect(Collectors.toList());
234-
String topic = pulsarSource.getTopic();
234+
String fullTopicName =
235+
pulsarSource.getTenant() + "/" + pulsarSource.getNamespace() + "/" + pulsarSource.getTopic();
235236

236237
Format format;
237238
DataTypeEnum dataType = DataTypeEnum.forName(pulsarSource.getSerializationType());
@@ -269,7 +270,7 @@ public static PulsarExtractNode createExtractNode(PulsarSource pulsarSource) {
269270
fieldInfos,
270271
null,
271272
Maps.newHashMap(),
272-
topic,
273+
fullTopicName,
273274
adminUrl,
274275
serviceUrl,
275276
format,

0 commit comments

Comments
 (0)