/**
 * Demo endpoint that appends one fixed record to the Redis stream {@code myQueue}.
 *
 * <p>The record carries two string fields, {@code name} and {@code age}, and is written
 * through {@code redisTemplateString}.
 *
 * @return the literal string {@code "OK"} once the add call has returned
 * @throws Exception declared by the signature; anything raised by the template call propagates
 */
@ApiOperation(value = "redis流 添加消息")
@RequestMapping(value = "stream/add", method = {RequestMethod.GET})
public String streamAdd() throws Exception {
    // Field/value pairs that make up the single stream entry.
    Map<String, String> fields = new HashMap<>();
    fields.put("name", "张三");
    fields.put("age", "33");

    redisTemplateString.<String, String>opsForStream().add("myQueue", fields);
    return "OK";
}
/**
 * Demo endpoint that reads the stream {@code myQueue} through two consumer groups and
 * acknowledges the oldest pending entry of the first group.
 *
 * <p>Steps performed:
 * <ol>
 *   <li>Try to create the groups {@code myGroup1} and {@code myGroup2}.</li>
 *   <li>Read from {@code myQueue} once per group, starting at each group's last consumed
 *       offset, and print both results as JSON.</li>
 *   <li>Fetch the pending-messages summary of {@code myGroup1}, print its id range and,
 *       when the range has a lower bound, acknowledge the record with that id.</li>
 * </ol>
 *
 * @return the literal string {@code "OK"}
 * @throws Exception declared by the signature; failures from the read, pending and
 *         acknowledge calls propagate to the caller
 */
@ApiOperation(value = "redis流 消费者获取消息,并且确认消息")
@RequestMapping(value="stream/get", method= {RequestMethod.GET})
public String streamGet() throws Exception{
    // Create the two consumer groups. Each creation gets its own try block so that a
    // failure for the first group (presumably "group already exists") does not prevent
    // the second group from being created.
    for (String group : new String[] {"myGroup1", "myGroup2"}) {
        try {
            redisTemplateJson.<String,String>opsForStream().createGroup("myQueue", group);
        } catch (Exception e) {
            // NOTE(review): every failure is reported as "group already exists"; other
            // causes are not distinguished here.
            System.out.println( "消费者组已经存在" );
        }
    }

    // One consumer per group.
    Consumer consumer1 = Consumer.from("myGroup1", "consumer1");
    Consumer consumer2 = Consumer.from("myGroup2", "consumer2");

    // Read messages, continuing from each group's last consumed offset.
    StreamOffset<Object> myQueue1 = StreamOffset.<Object>create("myQueue", ReadOffset.lastConsumed());
    StreamOffset<Object> myQueue2 = StreamOffset.<Object>create("myQueue", ReadOffset.lastConsumed());
    List<MapRecord<Object, String, String>> read1 = redisTemplateJson.<String, String>opsForStream().read(consumer1, myQueue1);
    List<MapRecord<Object, String, String>> read2 = redisTemplateJson.<String, String>opsForStream().read(consumer2, myQueue2);
    System.out.println("消费者组1:" + JSONObject.toJSONString( read1 ));
    System.out.println("消费者组2:" + JSONObject.toJSONString( read2 ));

    // Read the pending (delivered but not yet acknowledged) summary of group 1.
    PendingMessagesSummary pendingMessagesSummary = redisTemplateJson.<String, String>opsForStream().pending("myQueue", consumer1.getGroup());
    if (pendingMessagesSummary == null) {
        // No summary available, so there is nothing to print or acknowledge.
        return "OK";
    }
    Range<String> idRange = pendingMessagesSummary.getIdRange();
    System.out.println("没有ack的消息:" +JSONObject.toJSONString( idRange ));

    // Acknowledge the lowest pending id. The lower bound has no value when the range is
    // unbounded, so the acknowledge call is skipped instead of calling Optional.get()
    // on an empty Optional.
    idRange.getLowerBound().getValue().ifPresent(lowestPendingId ->
            redisTemplateJson.<String, String>opsForStream().acknowledge("myQueue", consumer1.getGroup(), RecordId.of( lowestPendingId )));
    return "OK";
}
/**
 * Reports whether a consumer group with the given name is registered on the stream
 * {@code myQueue}.
 *
 * <p>All group names returned by the stream's group listing are collected into a set,
 * the set is printed as JSON, and membership of {@code groupName} is returned.
 *
 * @param groupName name of the consumer group to look for
 * @return {@code true} if the collected group names contain {@code groupName}
 * @throws Exception declared by the signature; anything raised by the template call propagates
 */
@ApiOperation(value = "检查消费者组是否存在")
@RequestMapping(value = "stream/checkGroup", method = {RequestMethod.GET})
public Boolean streamCheckGroup(String groupName) throws Exception {
    StreamInfo.XInfoGroups groups = redisTemplateJson.opsForStream().groups("myQueue");

    // Walk the listing and gather every group name; an empty listing yields an empty set.
    Set<String> knownNames = new HashSet<>();
    Iterator<StreamInfo.XInfoGroup> cursor = groups.iterator();
    while (cursor.hasNext()) {
        StreamInfo.XInfoGroup group = cursor.next();
        knownNames.add(group.groupName());
    }

    System.out.println(JSONUtil.toJsonStr(knownNames));
    return knownNames.contains(groupName);
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
- 49.
- 50.
- 51.
- 52.
- 53.
- 54.
- 55.
- 56.
- 57.
- 58.
- 59.
- 60.
- 61.
- 62.
- 63.
- 64.
- 65.
- 66.
- 67.
- 68.
- 69.
- 70.
- 71.
- 72.
- 73.
- 74.
- 75.
所有评论(0)