Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-3058][DataProxy] Add some configs while creating Pulsar producer #3059

Merged
merged 1 commit into from
Mar 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public class ThirdPartyClusterConfig extends Context {
private static final String ENABLE_BATCH = "enable_batch";
private static final String BLOCK_IF_QUEUE_FULL = "block_if_queue_full";
private static final String MAX_PENDING_MESSAGES = "max_pending_messages";
private static final String MAX_PENDING_MESSAGES_ACROSS_PARTITIONS =
"max_pending_messages_across_partitions";
private static final String COMPRESSION_TYPE = "compression_type";
private static final String MAX_BATCHING_MESSAGES = "max_batching_messages";
private static final String RETRY_INTERVAL_WHEN_SEND_ERROR_MILL = "retry_interval_when_send_error_ms";
private static final String SINK_THREAD_NUM = "thread_num";
Expand Down Expand Up @@ -93,6 +96,8 @@ public class ThirdPartyClusterConfig extends Context {
private static final boolean DEFAULT_ENABLE_BATCH = true;
private static final boolean DEFAULT_BLOCK_IF_QUEUE_FULL = true;
private static final int DEFAULT_MAX_PENDING_MESSAGES = 10000;
private static final int DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = 500000;
private static final String DEFAULT_COMPRESSION_TYPE = "NONE";
private static final int DEFAULT_MAX_BATCHING_MESSAGES = 1000;
private static final int DEFAULT_MAX_BATCHING_BYTES = 128 * 1024;
private static final long DEFAULT_MAX_BATCHING_PUBLISH_DELAY_MILLIS = 1L;
Expand Down Expand Up @@ -203,6 +208,15 @@ public int getMaxPendingMessages() {
return getInteger(MAX_PENDING_MESSAGES, DEFAULT_MAX_PENDING_MESSAGES);
}

public int getMaxPendingMessagesAcrossPartitions() {
return getInteger(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS,
DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS);
}

public String getCompressionType() {
return getString(COMPRESSION_TYPE, DEFAULT_COMPRESSION_TYPE);
}

public int getMaxBatchingMessages() {
return getInteger(MAX_BATCHING_MESSAGES, DEFAULT_MAX_BATCHING_MESSAGES);
}
Expand All @@ -211,7 +225,7 @@ public long getRetryIntervalWhenSendErrorMs() {
return getLong(RETRY_INTERVAL_WHEN_SEND_ERROR_MILL, DEFAULT_RETRY_INTERVAL_WHEN_SEND_ERROR_MILL);
}

public int getRetyCnt() {
public int getRetryCnt() {
return getInteger(RETRY_CNT, DEFAULT_RETRY_CNT);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
* send thread pool
*/
private Thread[] sinkThreadPool;
private int sinkThreadPoolSize;
private PulsarClientService pulsarClientService;
private LinkedBlockingQueue<Event> eventQueue;

Expand Down Expand Up @@ -227,7 +228,16 @@ public void configure(Context context) {
pulsarCluster = configManager.getThirdPartyClusterUrl2Token();
pulsarConfig = configManager.getThirdPartyClusterConfig(); //pulsar common config
commonProperties = configManager.getCommonProperties();
pulsarClientService = new PulsarClientService(pulsarConfig);
if (keepOrder) {
logger.info("This is order pulsar sink!");
sinkThreadPoolSize = 1;
} else {
sinkThreadPoolSize = pulsarConfig.getThreadNum();
}
if (sinkThreadPoolSize <= 0) {
sinkThreadPoolSize = 1;
}
pulsarClientService = new PulsarClientService(pulsarConfig, sinkThreadPoolSize);
boolean enableReportConfigLog =
Boolean.parseBoolean(commonProperties
.getOrDefault(StreamConfigLogMetric.CONFIG_LOG_REPORT_ENABLE,"true"));
Expand Down Expand Up @@ -268,12 +278,8 @@ public void update() {
resendQueue = new LinkedBlockingQueue<EventStat>(badEventQueueSize);

Preconditions.checkArgument(pulsarConfig.getThreadNum() > 0, "threadNum must be > 0");
if (keepOrder) {
logger.info("This is order pulsar sink!");
sinkThreadPool = new Thread[1];
} else {
sinkThreadPool = new Thread[pulsarConfig.getThreadNum()];
}

sinkThreadPool = new Thread[sinkThreadPoolSize];

eventQueueSize = pulsarConfig.getEventQueueSize();
eventQueue = new LinkedBlockingQueue<Event>(eventQueueSize);
Expand Down Expand Up @@ -383,16 +389,16 @@ public void start() {
this.canSend = true;
this.canTake = true;

try {
initTopicSet(pulsarClientService,
new HashSet<String>(topicProperties.values()));
} catch (Exception e) {
logger.info("pulsar sink start publish topic fail.", e);
}

for (int i = 0; i < sinkThreadPool.length; i++) {
try {
initTopicSet(pulsarClientService,
new HashSet<String>(topicProperties.values()));
} catch (Exception e) {
logger.info("pulsar sink start publish topic fail.", e);
}
sinkThreadPool[i] = new Thread(new SinkTask(pulsarClientService), getName()
+ "_pulsar_sink_sender-"
+ i);
sinkThreadPool[i] = new Thread(new SinkTask(pulsarClientService, i), getName()
+ "_pulsar_sink_sender-" + i);
sinkThreadPool[i].start();
}
logger.debug("pulsar sink started");
Expand Down Expand Up @@ -736,8 +742,11 @@ class SinkTask implements Runnable {

private PulsarClientService pulsarClientService;

public SinkTask(PulsarClientService pulsarClientService) {
private int poolIndex = 0;

public SinkTask(PulsarClientService pulsarClientService, int poolIndex) {
this.pulsarClientService = pulsarClientService;
this.poolIndex = poolIndex;
}

@Override
Expand Down Expand Up @@ -838,8 +847,8 @@ public void run() {
if (pulsarConfig.getClientIdCache() && clientId != null) {
agentIdCache.put(clientId, System.currentTimeMillis());
}
boolean sendResult = pulsarClientService.sendMessage(topic, event,
PulsarSink.this, es);
boolean sendResult = pulsarClientService.sendMessage(poolIndex, topic,
event, PulsarSink.this, es);
/*
* handle producer is current is null
*/
Expand Down
Loading