Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

custom thread pool support #104

Merged
merged 10 commits into from
May 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ jobs:
path: ~/.m2
key: ${{ env.cache-name }}-${{ hashFiles('./log-record-starter/pom.xml') }}
restore-keys: ${{ env.cache-name }}-
- name: Install current version to local repository
run: mvn install -DskipTests -Dgpg.skip
working-directory: ./log-record-core
- name: Test with Maven
working-directory: ./log-record-starter
run: mvn -V --no-transfer-progress test
Expand Down
54 changes: 43 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ public Response<T> function(Request request) {
- 支持自动重试和兜底处理:支持配置重试次数和处理失败兜底逻辑`SPI`
- 支持控制切面执行时机(方法执行前后)
- 支持自定义执行成功判断
- 支持非注解方式手动记录日志
- 自定义消息线程池
- 更多特性等你来发掘...

**日志实体(LogDTO)内包含:**
Expand Down Expand Up @@ -347,8 +349,9 @@ public Response<T> function(Request request) {
- [实体类`Diff`](#实体类Diff)
- [日志处理重试次数及兜底函数配置](#日志处理重试次数及兜底函数配置)
- [重复注解](#重复注解)
- [消息分发线程池配置](#消息分发线程池配置)
- [自定义消息线程池](#自定义消息线程池)
- [函数返回值记录开关](#函数返回值记录开关)
- [非注解方式手动记录日志](#非注解方式)
- [操作日志数据表结构推荐](#操作日志数据表结构推荐)
- [让注解支持`IDEA`自动补全](#让注解支持IDEA自动补全)

Expand Down Expand Up @@ -512,7 +515,7 @@ public Response<T> function(Request request) {
}
```

LogRecordContext内部使用TransmittableThreadLocal,在线程池中也可以读取到主线程的ThreadLocal
LogRecordContext内部使用TransmittableThreadLocal实现与主线程的ThreadLocal传递

### 自定义函数

Expand Down Expand Up @@ -795,20 +798,49 @@ public class LogRecordErrorHandlerServiceImpl implements LogRecordErrorHandlerSe

我们还加上了重复注解的支持,可以在一个方法上同时加多个`@OperationLog`,**会保证按照`@OperationLog`从上到下的顺序输出日志**。

### 消息分发线程池配置
### 自定义消息线程池

在组装好`logDTO`后,默认使用线程池对消息进行分发,发送至本地监听函数或者消息队列发送者。

**注意:`logDTO`的组装在切面中,该切面仍然在函数执行的线程中运行。**

可以使用如下配置:
starter提供了如下配置:

```properties
log-record.thread-pool.pool-size=4(线程池核心线程大小 默认为4)
log-record.thread-pool.enabled=true(线程池开关 默认为开启 若关闭则使用主线程进行消息处理发送)
log-record.thread-pool.enabled=true(线程池开关 默认为开启 若关闭则使用业务线程进行消息处理发送)
```

在组装好`logDTO`后,默认会使用线程池对消息进行处理,发送至本地监听函数或者消息队列发送者,也可以通过配置关闭线程池,让主线程执行全部消息处理逻辑。

**注意:`logDTO`的组装逻辑在切面中,该切面仍然在函数执行的线程中运行。**

默认线程池配置如下(拒绝策略为丢弃):

```java
return new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024), THREAD_FACTORY, new ThreadPoolExecutor.AbortPolicy());
```

此外,还提供了用户传入自定义线程池的方式,用户可自行实现cn.monitor4all.logRecord.thread.ThreadPoolProvider,传入线程池。

示例:

```java
public class CustomThreadPoolProvider implements ThreadPoolProvider {

private static ThreadPoolExecutor EXECUTOR;

private static final ThreadFactory THREAD_FACTORY = new CustomizableThreadFactory("custom-log-record-");


private CustomThreadPoolProvider() {
log.info("CustomThreadPoolProvider init");
EXECUTOR = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), THREAD_FACTORY, new ThreadPoolExecutor.AbortPolicy());
}

@Override
public ThreadPoolExecutor buildLogRecordThreadPool() {
return EXECUTOR;
}
}
```

关闭使用线程池后,所有发送由主线程执行,带来的副作用是大量日志并发发送,会降低主线程处理效率。

### 函数返回值记录开关

Expand Down Expand Up @@ -893,7 +925,7 @@ public void testBizIdWithSpEL(String bizId) {

应用之间通过关键操作的日志消息,互相通知。

## 附录:Demo
## Demo

当你觉得用法不熟悉,可以查看单元测试用例,里面有最为详细且最全的使用示例。

Expand Down
2 changes: 1 addition & 1 deletion log-record-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

<groupId>cn.monitor4all</groupId>
<artifactId>log-record-core</artifactId>
<version>1.6.2</version>
<version>1.6.3</version>

<properties>
<maven.compiler.source>8</maven.compiler.source>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
import cn.monitor4all.logRecord.service.IOperatorIdGetService;
import cn.monitor4all.logRecord.service.LogRecordErrorHandlerService;
import cn.monitor4all.logRecord.thread.LogRecordThreadPool;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import cn.monitor4all.logRecord.util.JsonUtil;
import com.alibaba.ttl.TtlRunnable;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -128,7 +127,7 @@ public Object doAround(ProceedingJoinPoint pjp) throws Throwable {
logDTO.setSuccess(true);
}
if (annotation.recordReturnValue() && result != null) {
logDTO.setReturnStr(JSON.toJSONString(result));
logDTO.setReturnStr(JsonUtil.safeToJsonString(result));
}
});
} catch (Throwable throwableAfterFuncSuccess) {
Expand Down Expand Up @@ -305,7 +304,7 @@ private String parseParamToStringOrJson(String spel, StandardEvaluationContext c
Expression msgExpression = parser.parseExpression(spel);
Object obj = msgExpression.getValue(context, Object.class);
if (obj != null) {
return obj instanceof String ? (String) obj : JSON.toJSONString(obj, SerializerFeature.WriteMapNullValue);
return obj instanceof String ? (String) obj : JsonUtil.safeToJsonString(obj);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,11 @@ private static boolean isJsonArray(Object obj) {
private static Field[] getAllFields(Class<?> type) {
List<Field> fields = new ArrayList<>();
for (Class<?> c = type; c != null && !c.isSynthetic(); c = c.getSuperclass()) {
Collections.addAll(fields, c.getDeclaredFields());
for (Field field : c.getDeclaredFields()) {
if (!field.isSynthetic()) {
fields.add(field);
}
}
}
return fields.toArray(new Field[0]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import cn.monitor4all.logRecord.configuration.LogRecordProperties;
import cn.monitor4all.logRecord.constants.LogConstants;
import cn.monitor4all.logRecord.service.DataPipelineService;
import com.alibaba.fastjson.JSON;
import cn.monitor4all.logRecord.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -27,7 +27,7 @@ public class RabbitMqDataPipelineServiceImpl implements DataPipelineService {
@Override
public boolean createLog(LogDTO logDTO) {
log.info("LogRecord RabbitMq ready to send routingKey [{}] LogDTO [{}]", properties.getRabbitMqProperties().getRoutingKey(), logDTO);
rubeExchangeTemplate.convertAndSend(properties.getRabbitMqProperties().getRoutingKey(), JSON.toJSONString(logDTO));
rubeExchangeTemplate.convertAndSend(properties.getRabbitMqProperties().getRoutingKey(), JsonUtil.safeToJsonString(logDTO));
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import cn.monitor4all.logRecord.configuration.LogRecordProperties;
import cn.monitor4all.logRecord.constants.LogConstants;
import cn.monitor4all.logRecord.service.DataPipelineService;
import com.alibaba.fastjson.JSON;
import cn.monitor4all.logRecord.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
Expand All @@ -30,7 +30,7 @@ public class RocketMqDataPipelineServiceImpl implements DataPipelineService {
@Override
public boolean createLog(LogDTO logDTO) {
try {
Message msg = new Message(properties.getRocketMqProperties().getTopic(), properties.getRocketMqProperties().getTag(), (JSON.toJSONString(logDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET));
Message msg = new Message(properties.getRocketMqProperties().getTopic(), properties.getRocketMqProperties().getTag(), (JsonUtil.safeToJsonString(logDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = defaultMqProducer.send(msg);
log.info("LogRecord RocketMq send LogDTO [{}] sendResult: [{}]", logDTO, sendResult);
return true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package cn.monitor4all.logRecord.thread;

import cn.monitor4all.logRecord.configuration.LogRecordProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* 默认线程池提供者
*/
@Slf4j
public class DefaultThreadPoolProvider implements ThreadPoolProvider {

private final LogRecordProperties logRecordProperties;
private static final ThreadFactory THREAD_FACTORY = new CustomizableThreadFactory("log-record-");


public DefaultThreadPoolProvider(LogRecordProperties logRecordProperties) {
this.logRecordProperties = logRecordProperties;
}

@Override
public ThreadPoolExecutor buildLogRecordThreadPool() {
log.info("LogRecordThreadPool init poolSize [{}]", logRecordProperties.getThreadPool().getPoolSize());
int poolSize = logRecordProperties.getThreadPool().getPoolSize();
return new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024), THREAD_FACTORY, new ThreadPoolExecutor.AbortPolicy());
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,31 @@
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

import java.util.concurrent.*;
import java.util.concurrent.ThreadPoolExecutor;

@Slf4j
@Component
@ConditionalOnProperty(name = "log-record.thread-pool.enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties({LogRecordProperties.class})
public class LogRecordThreadPool {

private static final ThreadFactory THREAD_FACTORY = new CustomizableThreadFactory("log-record-");
private final ThreadPoolExecutor logRecordPoolExecutor;

private final ExecutorService LOG_RECORD_POOL_EXECUTOR;

public LogRecordThreadPool(LogRecordProperties logRecordProperties) {
log.info("LogRecordThreadPool init poolSize [{}]", logRecordProperties.getThreadPool().getPoolSize());
int poolSize = logRecordProperties.getThreadPool().getPoolSize();
this.LOG_RECORD_POOL_EXECUTOR = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024), THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy());
/**
* 操作日志主逻辑线程池
* 提供顺序:用户传入线程池 优先于 通过配置文件创建的默认线程池
*/
public LogRecordThreadPool(LogRecordProperties logRecordProperties, ApplicationContext applicationContext) {
ThreadPoolProvider threadPoolProvider = applicationContext.getBeanProvider(ThreadPoolProvider.class)
.getIfUnique(() -> new DefaultThreadPoolProvider(logRecordProperties));
this.logRecordPoolExecutor = threadPoolProvider.buildLogRecordThreadPool();
}

public ExecutorService getLogRecordPoolExecutor() {
return LOG_RECORD_POOL_EXECUTOR;
public ThreadPoolExecutor getLogRecordPoolExecutor() {
return logRecordPoolExecutor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package cn.monitor4all.logRecord.thread;


import java.util.concurrent.ThreadPoolExecutor;

/**
* 线程池提供者
*/
public interface ThreadPoolProvider {

/**
* 提供操作日志处理线程池
*/
ThreadPoolExecutor buildLogRecordThreadPool();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package cn.monitor4all.logRecord.util;


import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class JsonUtil {

public static String safeToJsonString(Object object) {
try {
return JSON.toJSONString(object);
} catch (Exception e) {
log.error("safeToJsonString error, object {}", object, e);
return object.toString();
}
}
}
4 changes: 2 additions & 2 deletions log-record-springboot3-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

<groupId>cn.monitor4all</groupId>
<artifactId>log-record-springboot3-starter</artifactId>
<version>1.6.2</version>
<version>1.6.3</version>

<properties>
<maven.compiler.source>17</maven.compiler.source>
Expand All @@ -35,7 +35,7 @@
<dependency>
<groupId>cn.monitor4all</groupId>
<artifactId>log-record-core</artifactId>
<version>1.6.2</version>
<version>1.6.3</version>
</dependency>

<!-- 单元测试依赖 -->
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package cn.monitor4all.logRecord.springboot3.test;


import cn.monitor4all.logRecord.bean.LogDTO;
import cn.monitor4all.logRecord.springboot3.test.service.OperatorIdGetService;
import cn.monitor4all.logRecord.springboot3.test.service.TestService;
import cn.monitor4all.logRecord.springboot3.LogRecordAutoConfiguration;
import cn.monitor4all.logRecord.springboot3.test.utils.TestHelper;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.context.annotation.PropertySource;
import org.springframework.test.context.ContextConfiguration;

/**
* 单元测试:自定义线程池
*/
@Slf4j
@SpringBootTest
@ContextConfiguration(classes = {
LogRecordAutoConfiguration.class,
OperatorIdGetService.class,
TestService.class,})
@PropertySource("classpath:testCustomThreadPool.properties")
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class OperationLogCustomThreadPoolTest {

@Autowired
private TestService testService;

/**
* 测试:用户传入自定义线程池
*/
@Test
public void testCustomThreadPool() {
TestHelper.addLock("testCustomThreadPool");
testService.testCustomThreadPool();
TestHelper.await("testCustomThreadPool");
LogDTO logDTO = TestHelper.getLogDTO("testCustomThreadPool");

Assertions.assertEquals(logDTO.getBizType(), "testCustomThreadPool");
}

}
Loading
Loading