场景
等待条件发生
比如,有这样一个场景,在 spring boot 工程里面,有一个 controller,他会接受数据并把数据写入 kafka,然后返回写入 kafka 的结果。在调用 send 方法后,会得到一个 ListenableFuture
对象,这个对象可以传入 callback 对象,这是一个异步的过程,需要等待回调执行后,才能将结果返回给客户端。
我们就需要一种机制等待回调事件,这里用的模式如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| Object mutex = new Object();
onSuccess/onFailure[callback] { synchronized (mutex) { mutex.notify(); } }
synchronized (mutex) { mutex.wait(); }
return ...;
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| @Controller @RequestMapping("kafka") public class KafkaController { static class Keeper { String result; }
@Resource KafkaTemplate<String, String> template;
@RequestMapping(value = "put", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE) @ResponseBody public ResponseEntity<String> check(@RequestBody Message message) throws InterruptedException { ListenableFuture<SendResult<String, String>> f = template.send("example", message.getKey(), message.getMessage()); Object mutex = new Object(); final Keeper keeper = new Keeper(); keeper.result = "unknown"; f.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onFailure(@SuppressWarnings("NullableProblems") Throwable throwable) { keeper.result = "send message failed"; synchronized (mutex) { mutex.notify(); } }
@Override public void onSuccess(SendResult<String, String> stringStringSendResult) { keeper.result = "send message success"; synchronized (mutex) { mutex.notify(); } } });
synchronized (mutex) { mutex.wait(); }
return ResponseEntity.ok(keeper.result); } }
|