Java / Spring处理Spring批处理作业请求异步

发布时间:2020-07-07 16:09

当前,我在应用程序中公开了一个弹簧批处理作业的剩余端点。 但是,请求不是异步安排的。作业完成后,MyResponse对象中将以批处理状态提供响应。

@RestController
@RequestMapping("/test")
public class TestController {

    private MyProcessor processor;
    private RequestDataRepo repo;

    public TestController(final MyProcessor processor, final RequestDataRepo repo) {
        this.feedProcessor = feedProcessor;
        this.repo = repo;
    }

    @PostMapping(value = "/kickOfJob", consumes = MediaType.APPLICATION_JSON_VALUE)
    public HttpEntity<MyReponse> trigger(@Valid @RequestBody MyRequest request) {

        final String trackingId = UUID.randomUUID().toString();

        final RequestEntity entity = RequestEntity.builder()
                .trackingId(trackingId)
                .name(request.getName())
                .build();

        this.repo.save(entity);

        MyReponse response = processor.process(request, trackingId);

        return new ResponseEntity<>(response, CREATED);
    }
}

我想异步处理请求,即使用trackingId对象中的MyResponse直接将响应返回给调用方,并以初始状态进行回复。想法是客户端稍后将调用另一个端点以检查状态。

我需要帮助实现异步处理。我创建了以下服务,其中包含一个阻止队列,用于保存作业提交和线程。如何将处理与RestController分离?在将请求保存到db并将初始响应发送回之后,我希望将任务提交到队列中。

不幸的是,我在这里停留在如何提交任务并将其从队列中取出并在后台处理的问题上。完成后,应使用新状态更新数据库。在启动时,它应该能够从db中读取并将任务重新提交到队列进行处理。假设容器在处理任务期间死亡。

PS我更喜欢为我的应用程序提供自定义线程池服务。

不胜感激:)

@Service
@Slf4j
public class BatchJobPublisherService {

    private final BlockingQueue<MyRequest> jobQueue;
    private final ExecutorService executorService;

    public BatchJobPublisherService() {
        this.jobQueue = new LinkedBlockingDeque<>();
        this.executorService = Executors.newFixedThreadPool(10);
    }
    
    /**
    
    
    
    **/
    
}

public class MyResponse  {
    private final String name;
    private String trackingId;
    private JobStatus jobStatus;
}


public class Processor {

    private JobLauncher jobLauncher;
    private JobExplorer jobExplorer;
    private JobLocator jobLocator;

    //Constructor


    public Processor process(MyRequest request, String trackingId) {
        //get job, build params here
        jobLauncher.run(job, parameters);
        return createAResponse(request, trackingId, jobExecution);
    }
    
}   
回答1