抱歉,您的浏览器无法访问本站

本页面需要浏览器支持(启用)JavaScript


了解详情 >

Hello world!

消息队列

异步处理、应用解耦、流量削峰

image-20211121110402937

image-20211121110657527

消息中间件

  1. 大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力

  2. 消息服务中两个重要概念: 消息代理(message broker)和目的地(destination)

    • 当消息发送者发送消息以后,将由消息代理(安装了消息中间件的服务器)接管,消息代理保证消息传递到指定目的地。
  3. 消息队列主要有两种形式的目的地

    • 队列(queue):点对点消息通信(point-to-point)
    • 主题(topic):发布(publish)/订阅(subscribe)消息通信
  4. 点对点式 :

    • 消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列
    • 消息只有唯一的发送者和接受者,但并不是说只能有一个接收者
  5. 发布订阅式:

    • 发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息
  6. JMS(Java Message Service)JAVA消息服务:

    • 基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现
  7. AMQP(Advanced Message Queuing Protocol)

    • 高级消息队列协议,也是一个消息代理的规范,兼容JMS
    • RabbitMQ是AMQP的实现

image-20211121111651609

image-20211121112256879

RabbitMQ

RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现

  1. Message

    消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

  2. Publisher
    消息的生产者,也是一个向交换器发布消息的客户端应用程序

  3. Exchange

    交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别

  4. Queue

    消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走

  5. Binding

    绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和Queue的绑定可以是多对多的关系

  6. Connection
    网络连接,比如一个TCP连接。

  7. Channel
    信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

  8. Consumer
    消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

  9. Virtual Host
    虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

  10. Broker
    表示消息队列服务器实体

image-20211121121330065

image-20211121120230895

安装

https://www.rabbitmq.com/networking.html

docker run -d –name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

docker update rabbitmq –restart=always

4369, 25672 (Erlang发现&集群端口)
5672, 5671 (AMQP端口)
15672 (web管理后台端口)
61613, 61614 (STOMP协议端口)
1883, 8883 (MQTT协议端口)

Exchange类型

Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了

image-20211121123352595

消息中的路由键(routing key)如果和Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routingkey 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。

image-20211121123425049

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。

image-20211121123429493

topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配一个单词。

spring整合

依赖、配置

1
2
3
4
5
<!--  rabbitMQ  -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1
@EnableRabbit   // 开启rabbit
1
2
3
4
5
6
spring:
rabbitmq:
host: 192.168.128.129
port: 5672
virtual-host: /
# 其他

AmqpAdmin

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class GulimallOrderApplicationTests {

@Autowired
AmqpAdmin amqpAdmin;

@Test
public void createExchange() {
DirectExchange directExchange = new DirectExchange("test-direct", true, false);
amqpAdmin.declareExchange(directExchange);
log.debug("Exchange[{}]创建成功", directExchange.getName());
}

@Test
public void createQueue() {
Queue queue = new Queue("test-hello");
amqpAdmin.declareQueue(queue);
log.debug("Queue[{}]创建成功", queue.getName());
}

@Test
public void binding() {
Binding binding = new Binding("test-hello", Binding.DestinationType.QUEUE, "test-direct", "test-hello", new HashMap<>());
amqpAdmin.declareBinding(binding);
log.debug("Binding创建成功,目的地[{}]", binding.getDestination());
}

}

RabbitTemplate

1
2
3
4
5
6
7
8
9
10
11
12
@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void sendMsg() {
OrderEntity orderEntity = new OrderEntity();
orderEntity.setCouponId(1L);
// 如果消息内容是对象,默认序列化转换,所以对象类要求实现序列化
// 也可以配置默认转为JSON
rabbitTemplate.convertAndSend("test-direct", "test-hello", orderEntity);
log.debug("消息发送成功");
}
1
2
3
4
5
6
7
8
@Configuration
public class OrderRabbitConfig {
// 配置默认MessageConverter为Jackson2JsonMessageConverter(转为JSON)
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}

RabbitListener、RabbitHandler

1
2
3
4
5
6
7
8
// 监听队列
@RabbitListener(queues = {"test-hello"})
public void testListener(Message msg, OrderEntity content, Channel channel) {
// msg 包括头+体
// content 可以使用具体类型接收内容
// channel 传输数据通道
log.debug("监听到了消息:" + msg + "; 内容为:" + content);
}
  1. 订单服务启动多个的话,同一个消息只有一个客户端可以收到
  2. 只有一个消息完全处理完之后,才能接收下一个消息
  1. RabbitListener注解可以标在类、方法上(一般类上)
  2. RabbitHandler注解可以标在方法上,可以自动判断方法参数的内容类型(重载,不过方法名不需要一样)

可靠投递

保证消息不丢失,可靠抵达,可以使用事务消息,但是性能会下降,为此引入确认机制

image-20211121141708505

ConfirmCallback

image-20211121141957310

ReturnCallback

image-20211121142049440

Ack消息确认机制

image-20211121142119941

订单服务

image-20211121155213384

环境搭建

老生常谈

订单登录拦截

OrderWebConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Configuration
public class OrderWebConfig implements WebMvcConfigurer {

@Autowired
OrderLoginUserInterceptor loginInterceptor;

/*
* 添加拦截器
* 配置拦截地址
*/
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(loginInterceptor).addPathPatterns("/**");
}
}

OrderLoginUserInterceptor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/*
* 登录拦截
*/
@Component
public class OrderLoginUserInterceptor implements HandlerInterceptor {

public static ThreadLocal<MemberEntityVo> loginUser = new ThreadLocal<>();

/*
* 在方法执行之前拦截
*/
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
MemberEntityVo attribute = (MemberEntityVo) request.getSession().getAttribute(AuthServerConstant.LOGIN_USER);
if (attribute != null) {
// 已登录
loginUser.set(attribute);
return true;
} else {
request.getSession().setAttribute("msg", "请先登录");
response.sendRedirect("http://auth.gulimall.com/login.html");
return false;
}
}
}

DEBUG

OrderLoginUserInterceptor需写成如下,加入loginUser判断,否则会有cannot get session的问题

debug的时候会进入两次interceptor,然后第二次拿不到session所以就会报错

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/*
* 登录拦截
*/
@Component
public class OrderLoginUserInterceptor implements HandlerInterceptor {

public static ThreadLocal<MemberEntityVo> loginUser = new ThreadLocal<>();

/*
* 在方法执行之前拦截
*/
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {

// /order/order/status/2948294820984028420
String uri = request.getRequestURI();
AntPathMatcher antPathMatcher = new AntPathMatcher();
boolean match = antPathMatcher.match("/order/order/status/**", uri);
boolean match1 = antPathMatcher.match("/payed/notify", uri);
if (match || match1) {
return true;
}

if (loginUser.get() != null) {
return true;
} else {
MemberEntityVo attribute = (MemberEntityVo) request.getSession().getAttribute(AuthServerConstant.LOGIN_USER);
if (attribute != null) {
loginUser.set(attribute);
return true;
} else {
//没登录就去登录
request.getSession().setAttribute("msg", "请先进行登录");
response.sendRedirect("http://auth.gulimall.com/login.html");
return false;
}
}


}
}

订单服务 - 确认页

Feign远程调用丢失请求头

丢失请求头

image-20211122102421314

MyFeignConfig

题外话,跟着雷神学我现在也习惯点进源码看一看,而且不会啥也看不懂了,真不戳

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/*
* 配置feign
*/
@Configuration
public class MyFeignConfig {
// 配置请求拦截器,防止头丢失
// 默认request拦截器是空,存入一个即可
@Bean("requestInterceptor")
public RequestInterceptor requestInterceptor() {
return new RequestInterceptor() {
@Override
public void apply(RequestTemplate requestTemplate) {
// RequestContextHolder利用了ThreadLocal,存放了请求内容
ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
// 获得request (原请求)
HttpServletRequest request = requestAttributes.getRequest();
// 获得cookie 并存入新请求
requestTemplate.header("Cookie", request.getHeader("Cookie"));
}
};
}
}

异步情况下丢失上下文

image-20211122105832624

1
2
3
4
5
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
CompletableFuture<Void> addressFuture = CompletableFuture.runAsync(() -> {
// 在异步请求中共享之前的请求数据
RequestContextHolder.setRequestAttributes(requestAttributes);
}, executor);

Order模块

OrderConfirmVo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 订单确认页模型
*/
@Data
public class OrderConfirmVo {
// 订单防重复令牌
private String orderToken;
// 收货地址表
private List<MemberAddressVo> address;
// 订单商品
private List<OrderItemVo> items;
// 发票
// 优惠
private Integer integration;
private BigDecimal total;
private BigDecimal payPrice;
public BigDecimal getPayPrice() {
return total.subtract(new BigDecimal(integration));
}
public BigDecimal getTotal() {
for (OrderItemVo item : items) {
total = total.add(item.getTotalPrice());
}
return total;
}
}

OrderItemVo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Data
public class OrderItemVo {
private Long skuId;
private String title;
private String image;
private List<String> skuAttr;
private BigDecimal price;
private Integer count;
private BigDecimal totalPrice;

public BigDecimal getTotalPrice() {
return this.price.multiply(new BigDecimal(this.count));
}
}

OrderWebController

1
2
3
4
5
6
7
8
9
/*
* 前往订单确认页
*/
@GetMapping("/toTrade")
public String toConfirmPage(Model model) {
OrderConfirmVo orderConfirmVo = orderService.confirmOrder();
model.addAttribute("orderConfirmData", orderConfirmVo);
return "confirm";
}

OrderServiceImpl

feign远程调用和页面进入不同,后者会带cookie,前者没有带header所有没有cookie

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
    /*
* 确认订单的信息
*/
@Override
public OrderConfirmVo confirmOrder() throws ExecutionException, InterruptedException {
OrderConfirmVo orderConfirmVo = new OrderConfirmVo();
// TODO 设置防重复令牌
// orderConfirmVo.setOrderToken();

// 设置收货地址 (用户id可以从拦截器中的ThreadLocal的loginUser中查找)
CompletableFuture<Void> addressFuture = CompletableFuture.runAsync(() -> {
R r = memberFeignService.getAddressListByMemberId(OrderLoginUserInterceptor.loginUser.get().getId());
if (r.getCode() == 0) {
List<MemberAddressVo> memberAddressVoList = r.getData("data", new TypeReference<List<MemberAddressVo>>() {
});
orderConfirmVo.setAddress(memberAddressVoList);
} else {
log.error("member远程调用失败");
}
}, executor);

// 设置购物项 (从redis中获取)
CompletableFuture<Void> itemFuture = CompletableFuture.runAsync(() -> {
List<OrderItemVo> checkedCartItems = cartFeignService.getCheckedCartItems();
orderConfirmVo.setItems(checkedCartItems);
}, executor);

// 设置总价格 (get方法已经写好了)
// 设置优惠价
orderConfirmVo.setIntegration(OrderLoginUserInterceptor.loginUser.get().getIntegration());
// 设置支付价 (get方法已经写好了)

CompletableFuture.allOf(itemFuture, addressFuture).get();
return orderConfirmVo;
}

MemberFeignService

1
2
3
4
5
6
7
8
@FeignClient("gulimall-member")
public interface MemberFeignService {
/**
* 获取对应会员的所有收货地址
*/
@GetMapping("/member/memberreceiveaddress/{memberId}/list")
R getAddressListByMemberId(@PathVariable("memberId") Long memberId);
}

CartFeignService

1
2
3
4
5
6
7
8
@FeignClient("gulimall-cart")
public interface CartFeignService {
/*
* 获得购物车商品
*/
@GetMapping("/checkItems")
List<OrderItemVo> getCheckedCartItems();
}

Member模块

MemberReceiveAddressController

1
2
3
4
5
6
7
8
/**
* 获取对应会员的所有收货地址
*/
@GetMapping("/{memberId}/list")
public R getAddressListByMemberId(@PathVariable("memberId") Long memberId) {
List<MemberReceiveAddressEntity> addressList = memberReceiveAddressService.list(new QueryWrapper<MemberReceiveAddressEntity>().eq("member_id", memberId));
return R.ok().put("data", addressList);
}

Cart模块

CartServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/*
* 获取用户选中的所有商品
*/
@Override
public List<CartItemEntity> getCheckedCartItems() {
// 从redis中根据userId获得cartItem
UserInfoTo userInfoTo = CartInterceptor.threadLocal.get();
// 封装list
List<CartItemEntity> cartItem = getCartItem(CartConstant.CART_PREFIX + userInfoTo.getUserId());
return cartItem.stream().filter(CartItemEntity::getCheck).map((item) -> {
// 存放在redis中的购物项的价格可能会变(冗余字段,用id去查)
// 我这里就不写了
// item.setPrice();
return item;
})
.collect(Collectors.toList());
}

CartController

1
2
3
4
5
6
7
8
9
/*
* 获得购物车商品
*/
@ResponseBody
@GetMapping("/checkItems")
public List<CartItemEntity> getCheckedCartItems() {
List<CartItemEntity> cartItemEntityList = cartService.getCheckedCartItems();
return cartItemEntityList;
}

TODO

怎么说呢,感觉不是很想写网页前台了,现在写给用户的还是小程序 app这种流行,网页基本就是后台管理了,再加上thymeleaf实在是让人难受,就这样吧,不写了,我把重要的业务逻辑记录一下就行。

唉阻碍我的另一个原因是身体状况,突然理解了讲座的那位学长说的话,多锻炼身体,有体力才能继续啊。

接口幂等性

接口幂等性就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的, 不会因为多次点击而产生了副作用; 比如说支付场景, 用户购买了商品支付扣款成功, 但是返回结果的时候网络异常, 此时钱已经扣了, 用户再次点击按钮, 此时会进行第二次扣款, 返回结果成功, 用户查询余额返发现多扣钱了, 流水记录也变成了两条. . . ,这就没有保证接口的幂等性。

哪些情况需要防止

  1. 用户多次点击按钮
  2. 用户页面回退再次提交
  3. 微服务互相调用, 由于网络问题, 导致请求失败。 feign 触发重试机制
  4. 其他业务情况

什么情况下需要幂等

以 SQL 为例, 有些操作是天然幂等的。

SELECT * FROM table WHER id=?, 无论执行多少次都不会改变状态, 是天然的幂等。
UPDATE tab1 SET col1=1 WHERE col2=2, 无论执行成功多少次状态都是一致的, 也是幂等操作。
delete from user where userid=1, 多次操作, 结果一样, 具备幂等性
insert into user(userid,name) values(1,’a’) 如 userid 为唯一主键, 即重复操作上面的业务, 只
会插入一条用户数据, 具备幂等性。

UPDATE tab1 SET col1=col1+1 WHERE col2=2, 每次执行的结果都会发生变化, 不是幂等的。
insert into user(userid,name) values(1,’a’) 如 userid 不是主键, 可以重复, 那上面业务多次操
作, 数据都会新增多条, 不具备幂等性。

token 机制

1、 服务端提供了发送 token 的接口。 我们在分析业务的时候, 哪些业务是存在幂等问题的,
就必须在执行业务前, 先去获取 token, 服务器会把 token 保存到 redis 中。
2、然后调用业务接口请求时, 把 token 携带过去, 一般放在请求头部。
3、 服务器判断 token 是否存在 redis 中, 存在表示第一次请求, 然后删除 token,继续执行业
务。
4、 如果判断 token 不存在 redis 中, 就表示是重复操作, 直接返回重复标记给 client, 这样
就保证了业务代码, 不被重复执行。

危险性:
1、 先删除 token 还是后删除 token;
(1) 先删除可能导致, 业务确实没有执行, 重试还带上之前 token, 由于防重设计导致,
请求还是不能执行。
(2) 后删除可能导致, 业务处理成功, 但是服务闪断, 出现超时, 没有删除 token, 别
人继续重试, 导致业务被执行两边
(3) 我们最好设计为先删除 token, 如果业务调用失败, 就重新获取 token 再次请求。
2、 Token 获取、 比较和删除必须是原子性
(1) redis.get(token) 、 token.equals、 redis.del(token)如果这两个操作不是原子, 可能导
致, 高并发下, 都 get 到同样的数据, 判断都成功, 继续业务并发执行
(2) 可以在 redis 使用 lua 脚本完成这个操作
if redis.call(‘get’, KEYS[1]) == ARGV[1] then return redis.call(‘del’, KEYS[1]) else return 0 end

各种锁机制

数据库悲观锁

select * from xxxx where id = 1 for update;
悲观锁使用时一般伴随事务一起使用, 数据锁定时间可能会很长, 需要根据实际情况选用。
另外要注意的是, id 字段一定是主键或者唯一索引, 不然可能造成锁表的结果, 处理起来会
非常麻烦。

数据库乐观锁

这种方法适合在更新的场景中,
update t_goods set count = count -1 , version = version + 1 where good_id=2 and version = 1
根据 version 版本, 也就是在操作库存前先获取当前商品的 version 版本号, 然后操作的时候
带上此 version 号。 我们梳理下, 我们第一次操作库存时, 得到 version 为 1, 调用库存服务
version 变成了 2; 但返回给订单服务出现了问题, 订单服务又一次发起调用库存服务, 当订
单服务传如的 version 还是 1, 再执行上面的 sql 语句时, 就不会执行; 因为 version 已经变
为 2 了, where 条件就不成立。 这样就保证了不管调用几次, 只会真正的处理一次。
乐观锁主要使用于处理读多写少的问题

业务层分布式锁

如果多个机器可能在同一时间同时处理相同的数据, 比如多台机器定时任务都拿到了相同数
据处理, 我们就可以加分布式锁, 锁定此数据, 处理完成后释放锁。 获取到锁的必须先判断
这个数据是否被处理过。

各种唯一约束

数据库唯一约束

插入数据, 应该按照唯一索引进行插入, 比如订单号, 相同的订单就不可能有两条记录插入。
我们在数据库层面防止重复。
这个机制是利用了数据库的主键唯一约束的特性, 解决了在 insert 场景时幂等问题。 但主键
的要求不是自增的主键, 这样就需要业务生成全局唯一的主键。
如果是分库分表场景下, 路由规则要保证相同请求下, 落地在同一个数据库和同一表中, 要
不然数据库主键约束就不起效果了, 因为是不同的数据库和表主键不相关。

redis set 防重

很多数据需要处理, 只能被处理一次, 比如我们可以计算数据的 MD5 将其放入 redis 的 set,
每次处理数据, 先看这个 MD5 是否已经存在, 存在就不处理

防重表

使用订单号 orderNo 做为去重表的唯一索引, 把唯一索引插入去重表, 再进行业务操作, 且
他们在同一个事务中。 这个保证了重复请求时, 因为去重表有唯一约束, 导致请求失败, 避
免了幂等问题。 这里要注意的是, 去重表和业务表应该在同一库中, 这样就保证了在同一个
事务, 即使业务操作失败了, 也会把去重表的数据回滚。 这个很好的保证了数据一致性。
之前说的 redis 防重也算

全局请求唯一 id

调用接口时, 生成一个唯一 id, redis 将数据保存到集合中(去重) , 存在即处理过。
可以使用 nginx 设置每一个请求的唯一 id;
proxy_set_header X-Request-Id $request_id;

订单提交 token

添加token

1
2
3
String token = UUID.randomUUID().toString().replace("-", "");
redisTemplate.opsForValue().set(AuthServerConstant.USER_ORDER_TOKEN_PREFIX + memberRespVo.getId(), token, 30, TimeUnit.MINUTES);
confirmVo.setOrderToken(token);

原子验证token

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@Transaction // 本第事务,只能控制自己服务的事务
@Override
public SubmitOrderResponseVo submitOrder(OrderSubmitVo vo) {
confirmVoThreadLocal.set(vo);
SubmitOrderResponseVo response = new SubmitOrderResponseVo();
MemberRespVo memberRespVo = LoginUserInterceptor.loginUser.get();
response.setCode(0);
//1、验证令牌【令牌的对比和删除必须保证原子性】
//0令牌失败 - 1删除成功
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
String orderToken = vo.getOrderToken();
//原子验证令牌和删除令牌
Long result = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberRespVo.getId()), orderToken);
if (result == 0L) {
//令牌验证失败
response.setCode(1);
return response;
} else {
//令牌验证成功
//下单:去创建订单,验令牌,验价格,锁库存...
//1、创建订单,订单项等信息
OrderCreateTo order = createOrder();
//2、验价
BigDecimal payAmount = order.getOrder().getPayAmount();
BigDecimal payPrice = vo.getPayPrice();
if (Math.abs(payAmount.subtract(payPrice).doubleValue()) < 0.01) {
//金额对比
//....
//TODO 3、保存订单
saveOrder(order);
//4、库存锁定。只要有异常回滚订单数据。
// 订单号,所有订单项(skuId,skuName,num)
WareSkuLockVo lockVo = new WareSkuLockVo();
lockVo.setOrderSn(order.getOrder().getOrderSn());
List<OrderItemVo> locks = order.getOrderItems().stream().map(item -> {
OrderItemVo itemVo = new OrderItemVo();
itemVo.setSkuId(item.getSkuId());
itemVo.setCount(item.getSkuQuantity());
itemVo.setTitle(item.getSkuName());
return itemVo;
}).collect(Collectors.toList());
lockVo.setLocks(locks);
//4、远程锁库存
//库存成功了,但是网络原因超时了,订单回滚,库存不滚。

//为了保证高并发。库存服务自己回滚。可以发消息给库存服务;
//库存服务本身也可以使用自动解锁模式 消息
R r = wmsFeignService.orderLockStock(lockVo);
if (r.getCode() == 0) {
//锁成功了
response.setOrder(order.getOrder());

//TODO 5、远程扣减积分 出异常
// int i = 10/0; //订单回滚,库存不滚
//订单创建成功发送消息给MQ
rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", order.getOrder());

//TODO 6、清除购物车已经下单的商品
return response;
} else {
//锁定失败
String msg = (String) r.get("msg");
throw new NoStockException(msg);
}
} else {
response.setCode(2);
return response;
}

}

分布式事务

本地事务

详见文档

image-20211124185045824

事务的隔离级别

READ UNCOMMITTED(读未提交)
该隔离级别的事务会读到其它未提交事务的数据, 此现象也称之为脏读。

READ COMMITTED( 读提交)
一个事务可以读取另一个已提交的事务, 多次读取会造成不一样的结果, 此现象称为不可重
复读问题, Oracle 和 SQL Server 的默认隔离级别。

REPEATABLE READ( 可重复读)
该隔离级别是 MySQL 默认的隔离级别, 在同一个事务里, select 的结果是事务开始时时间
点的状态, 因此, 同样的 select 操作读到的结果会是一致的, 但是, 会有幻读现象。 MySQL
的 InnoDB 引擎可以通过 next-key locks 机制( 参考下文”行锁的算法”一节) 来避免幻读。

SERIALIZABLE( 序列化)
在该隔离级别下事务都是串行顺序执行的, MySQL 数据库的 InnoDB 引擎会给读操作隐式
加一把读共享锁, 从而避免了脏读、 不可重读复读和幻读问题。

事务的传播行为

1、 PROPAGATION_REQUIRED: 如果当前没有事务, 就创建一个新事务, 如果当前存在事务,
就加入该事务, 该设置是最常用的设置。
2、 PROPAGATION_SUPPORTS: 支持当前事务, 如果当前存在事务, 就加入该事务, 如果当
前不存在事务, 就以非事务执行。
3、 PROPAGATION_MANDATORY: 支持当前事务, 如果当前存在事务, 就加入该事务, 如果
当前不存在事务, 就抛出异常。
4、 PROPAGATION_REQUIRES_NEW: 创建新事务, 无论当前存不存在事务, 都创建新事务。
5、 PROPAGATION_NOT_SUPPORTED: 以非事务方式执行操作, 如果当前存在事务, 就把当
前事务挂起。
6、 PROPAGATION_NEVER: 以非事务方式执行, 如果当前存在事务, 则抛出异常。
7、 PROPAGATION_NESTED: 如果当前存在事务, 则在嵌套事务内执行。 如果当前没有事务,
则执行与 PROPAGATION_REQUIRED 类似的操作

评论




🧡💛💚💙💜🖤🤍