mirror of
				https://github.com/continew-org/continew-admin.git
				synced 2025-10-26 18:58:37 +08:00 
			
		
		
		
	chore: 任务调度代码优化
This commit is contained in:
		| @@ -22,11 +22,9 @@ import io.netty.handler.timeout.ReadTimeoutHandler; | ||||
| import io.netty.handler.timeout.WriteTimeoutHandler; | ||||
| import lombok.RequiredArgsConstructor; | ||||
| import lombok.extern.slf4j.Slf4j; | ||||
| import org.apache.http.client.config.RequestConfig; | ||||
| import org.springframework.beans.factory.annotation.Value; | ||||
| import org.springframework.context.annotation.Bean; | ||||
| import org.springframework.context.annotation.Configuration; | ||||
| import org.springframework.http.MediaType; | ||||
| import org.springframework.http.client.reactive.ReactorClientHttpConnector; | ||||
| import org.springframework.http.codec.json.Jackson2JsonDecoder; | ||||
| import org.springframework.http.codec.json.Jackson2JsonEncoder; | ||||
| @@ -50,9 +48,9 @@ import top.continew.admin.job.constant.JobConstants; | ||||
|  * @author Charles7c | ||||
|  * @since 2024/6/25 18:03 | ||||
|  */ | ||||
| @Slf4j | ||||
| @Configuration | ||||
| @RequiredArgsConstructor | ||||
| @Slf4j | ||||
| public class HttpExchangeConfiguration { | ||||
|  | ||||
|     private final ObjectMapper objectMapper; | ||||
| @@ -81,28 +79,29 @@ public class HttpExchangeConfiguration { | ||||
|     @Bean | ||||
|     public HttpServiceProxyFactory httpServiceProxyFactory() { | ||||
|         HttpClient httpClient = HttpClient.create() | ||||
|                 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30000) | ||||
|                 .doOnConnected(conn -> { | ||||
|                     conn.addHandlerLast(new ReadTimeoutHandler(10)); | ||||
|                     conn.addHandlerLast(new WriteTimeoutHandler(10)); | ||||
|                 }).wiretap(true); | ||||
|             .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30000) | ||||
|             .doOnConnected(conn -> { | ||||
|                 conn.addHandlerLast(new ReadTimeoutHandler(10)); | ||||
|                 conn.addHandlerLast(new WriteTimeoutHandler(10)); | ||||
|             }) | ||||
|             .wiretap(true); | ||||
|  | ||||
|         WebClient webClient = WebClient.builder() | ||||
|                 .codecs(config -> config.defaultCodecs().jackson2JsonEncoder(new Jackson2JsonEncoder(objectMapper))) | ||||
|                 .codecs(config -> config.defaultCodecs().jackson2JsonDecoder(new Jackson2JsonDecoder(objectMapper))) | ||||
|                 .clientConnector(new ReactorClientHttpConnector(httpClient)) | ||||
|                 .filter(logRequest()) | ||||
|                 .filter(logResponse()) | ||||
|                 .filter((request, next) -> { | ||||
|                     // 设置请求头 | ||||
|                     ClientRequest filtered = ClientRequest.from(request) | ||||
|                             .header(JobConstants.NAMESPACE_ID_HEADER, namespace) | ||||
|                             .header(JobConstants.AUTH_TOKEN_HEADER, jobClient().getToken()) | ||||
|                             .build(); | ||||
|                     return next.exchange(filtered); | ||||
|                 }) | ||||
|                 .baseUrl(baseUrl) | ||||
|                 .build(); | ||||
|             .codecs(config -> config.defaultCodecs().jackson2JsonEncoder(new Jackson2JsonEncoder(objectMapper))) | ||||
|             .codecs(config -> config.defaultCodecs().jackson2JsonDecoder(new Jackson2JsonDecoder(objectMapper))) | ||||
|             .clientConnector(new ReactorClientHttpConnector(httpClient)) | ||||
|             .filter(logRequest()) | ||||
|             .filter(logResponse()) | ||||
|             .filter((request, next) -> { | ||||
|                 // 设置请求头 | ||||
|                 ClientRequest filtered = ClientRequest.from(request) | ||||
|                     .header(JobConstants.NAMESPACE_ID_HEADER, namespace) | ||||
|                     .header(JobConstants.AUTH_TOKEN_HEADER, jobClient().getToken()) | ||||
|                     .build(); | ||||
|                 return next.exchange(filtered); | ||||
|             }) | ||||
|             .baseUrl(baseUrl) | ||||
|             .build(); | ||||
|         return HttpServiceProxyFactory.builderFor(WebClientAdapter.create(webClient)).build(); | ||||
|     } | ||||
|  | ||||
| @@ -115,9 +114,9 @@ public class HttpExchangeConfiguration { | ||||
|      * 打印请求日志 | ||||
|      */ | ||||
|     private ExchangeFilterFunction logRequest() { | ||||
|         return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> { | ||||
|             log.info("---> {} {}", clientRequest.method(), clientRequest.url()); | ||||
|             return Mono.just(clientRequest); | ||||
|         return ExchangeFilterFunction.ofRequestProcessor(request -> { | ||||
|             log.info("---> {} {}", request.method(), request.url()); | ||||
|             return Mono.just(request); | ||||
|         }); | ||||
|     } | ||||
|  | ||||
| @@ -125,11 +124,11 @@ public class HttpExchangeConfiguration { | ||||
|      * 打印响应日志 | ||||
|      */ | ||||
|     private ExchangeFilterFunction logResponse() { | ||||
|         return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> clientResponse.bodyToMono(String.class).flatMap(body -> { | ||||
|             log.info("<---  {}", clientResponse.statusCode()); | ||||
|             log.info("Content-Type:{}", clientResponse.headers().contentType().orElse(MediaType.APPLICATION_JSON)); | ||||
|             log.info("body: {}", body); | ||||
|             return Mono.just(ClientResponse.from(clientResponse).body(body).build()); | ||||
|         })); | ||||
|         return ExchangeFilterFunction.ofResponseProcessor(response -> response.bodyToMono(String.class) | ||||
|             .flatMap(body -> { | ||||
|                 log.info("<--- {}", response.statusCode()); | ||||
|                 log.info(body); | ||||
|                 return Mono.just(ClientResponse.from(response).body(body).build()); | ||||
|             })); | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -22,10 +22,10 @@ import top.continew.admin.job.api.JobBatchApi; | ||||
| import top.continew.admin.job.api.JobClient; | ||||
| import top.continew.admin.job.model.JobInstanceLogPageResult; | ||||
| import top.continew.admin.job.model.query.JobInstanceLogQuery; | ||||
| import top.continew.admin.job.model.query.JobLogQuery; | ||||
| import top.continew.admin.job.model.query.JobInstanceQuery; | ||||
| import top.continew.admin.job.model.resp.JobLogResp; | ||||
| import top.continew.admin.job.model.query.JobLogQuery; | ||||
| import top.continew.admin.job.model.resp.JobInstanceResp; | ||||
| import top.continew.admin.job.model.resp.JobLogResp; | ||||
| import top.continew.admin.job.service.JobLogService; | ||||
| import top.continew.starter.extension.crud.model.resp.PageResp; | ||||
|  | ||||
| @@ -49,8 +49,8 @@ public class JobLogServiceImpl implements JobLogService { | ||||
|     @Override | ||||
|     public PageResp<JobLogResp> page(JobLogQuery query) { | ||||
|         return jobClient.requestPage(() -> jobBatchApi.page(query.getJobId(), query.getJobName(), query | ||||
|             .getGroupName(), query.getTaskBatchStatus().getValue(), query.getDatetimeRange(), query.getPage(), query | ||||
|                 .getSize())); | ||||
|             .getGroupName(), query.getTaskBatchStatus() != null ? query.getTaskBatchStatus().getValue() : null, query | ||||
|                 .getDatetimeRange(), query.getPage(), query.getSize())); | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|   | ||||
| @@ -45,8 +45,8 @@ public class JobServiceImpl implements JobService { | ||||
|  | ||||
|     @Override | ||||
|     public PageResp<JobResp> page(JobQuery query) { | ||||
|         return jobClient.requestPage(() -> jobApi.page(query.getGroupName(), query.getJobName(), query.getJobStatus() | ||||
|             .getValue(), query.getPage(), query.getSize())); | ||||
|         return jobClient.requestPage(() -> jobApi.page(query.getGroupName(), query.getJobName(), query | ||||
|             .getJobStatus() != null ? query.getJobStatus().getValue() : null, query.getPage(), query.getSize())); | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|   | ||||
		Reference in New Issue
	
	Block a user