电商网站怎么做优化,做淘宝必备的网站,网站美工工作步骤是什么,新思维网站转载自 你真的很熟分布式和事务吗#xff1f;
微吐槽
hello,world.
不想了#xff0c;我等码农#xff0c;还是看看怎么来处理分布式系统中的事务这个老大难吧#xff01;
本文略长#xff0c;读者需要有一定耐心#xff0c;如果你是高级码农或者架构师级别#xf…转载自 你真的很熟分布式和事务吗
微吐槽
hello,world.
不想了我等码农还是看看怎么来处理分布式系统中的事务这个老大难吧
本文略长读者需要有一定耐心如果你是高级码农或者架构师级别你可以跳过。
本文注重实战或者实现不涉及CAP略提ACID。
本文适合基础分布式程序员
1. 本文会涉及集群中节点的failover和recover问题.
2. 本文会涉及事务及不透明事务的问题.
3. 本文会提到微博和tweeter并引出一个大数据问题.
由于分布式这个话题太大事务这个话题也太大我们从一个集群的一个小小节点开始谈起。 集群中存活的节点与同步
分布式系统中如何判断一个节点node是否存活
kafka这样认为
1. 此节点和zookeeper能喊话。Keep sessions with zookeeper through heartbeats.
2. 此节点如果是个从节点必须能够尽可能忠实地反映主节点的数据变化。 也就是说必须能够在主节点写了新数据后及时复制这些变化的数据所谓及时不能拉下太多哦。
那么符合上面两个条件的节点就可以认为是存活的也可以认为是同步的in-sync。
关于第1点大家对心跳都很熟悉那么我们可以这样认为某个节点不能和zookeeper喊话了 zookeeper-node: var timer new timer() .setInterval(10sec) .onTime(slave-nodes,function(slave-nodes){ slave-nodes.forEach( node - { boolean isAlive node.heartbeatACK(15sec); if(!isAlive) { node.numNotAlive 1; if(node.numNotAlive 3) { node.declareDeadOrFailed(); slave-nodes.remove(node); //回调也可 leader-node-app.notifyNodeDeadOrFailed(node) } }else node.numNotAlive 0; }); }); timer.run(); //你可以回调也可以像下面这样简单的计时判断 leader-node-app: var timer new timer() .setInterval(10sec) .onTime(slave-nodes,function(slave-nodes){ slave-nodes.forEach(node - { if(node.isDeadOrFailed) { //node不能和zookeeper喊话了 } }); }); timer.run(); 关于第二点要稍微复杂点了怎么搞呢
来这么分析 数据 messages. 操作 op-log. 偏移 position/offset. // 1. 先考虑messages // 2. 再考虑log的postion或者offset // 3. 考虑msg和off都记录在同源数据库或者存储设备上.(database or storage-device.) var timer new timer() .setInterval(10sec) .onTime(slave-nodes,function(nodes){ var core-of-cpu 8; //嫌慢就并发呗 mod hash go! nodes.groupParallel(core-of-cpu) .forEach(node - { boolean nodeSucked false; if(node.ackTimeDiff 30sec) { //30秒内没有回复node卡住了 nodeSucked true; } if(node.logOffsetDiff 100) { //node复制跟不上了差距超过100条数据 nodeSucked true; } if(nodeSucked) { //总之node“死”掉了其实到底死没死谁知道呢network-error在分布式系统中或者节点失败这个事情是正常现象. node.declareDeadOrFailed(); //不和你玩啦集群不要你了 nodes.remove(node); //该怎么处理呢抛个事件吧. fire-event-NodeDeadOrFailed(node); } }); }); timer.run(); 上面的节点的状态管理一般由zookeeper来做leader或者master节点也会维护那么点状态。
那么应用中的leader或者master节点只需要从zookeeper拉状态就可以同时上面的实现是不是一定最佳呢不是的而且多数操作可以合起来但为了描述节点是否存活这个事儿咱们这么写没啥问题。
节点死掉、失败、不同步了咋处理呢
好嘛终于说到failover和recover了那failover比较简单因为还有其它的slave节点在不影响数据读取。
1. 同时多个slave节点失败了 没有100%的可用性.数据中心和机房瘫痪、网络电缆切断、hacker入侵删了你的根总之你rp爆表了.
2. 如果主节点失败了那master-master不行嘛 keep-alived或者LVS或者你自己写failover吧。 高可用架构HA又是个大件儿了此文不展开了。
我们来关注下recover方面的东西这里把视野打开点不仅关注slave节点重启后追log来同步数据我们看下在实际应用中数据请求包括读、写、更新失败怎么办
大家可能都会说重试retry呗、重放replay呗或者干脆不管了呗 行都行这些都是策略但具体怎么个搞法你真的清楚了 一个bigdata问题
我们先摆个探讨的背景 问题消息流比如微博的微博真绕源源不断地流进我们的应用中要处理这些消息有个需求是这样的 Reach is the number of unique people exposed to a URL on Twitter. 那么统计一下3小时内的本条微博url的reach总数。 怎么解决呢 把某时间段内转发过某条微博url的人拉出来把这些人的粉丝拉出来去掉重复的人然后求总数就是要求的reach。 为了简单我们忽略掉日期先看看这个方法行不行 /** --------------------------------- * 1. 求出转发微博(url)的大V. * __________________________________*/ 方法 getUrlToTweetersMap(String url_id) SQL /* 数据库A表url_user存储了转发某url的user */ SELECT url_user.user_id as tweeter_id FROM url_user WHERE url_user.url_id ${url_id} 返回 [user_1,...,user_m] /** --------------------------------- * 2. 求出大V的粉丝 * __________________________________*/ 方法 : getFollowers(String tweeter_id); SQL : /* 数据库B */ SELECT users.id as user_id FROM users WHERE users.followee_id ${tweeter_id} 返回tweeter的粉丝 /** --------------------------------- * 3. 求出Reach * __________________________________*/ var url queryArgs.getUrl(); var tweeters getUrlToTweetersMap(); var result new HashMapString,Integer(); tweeters.forEach(t - { // 你可以批量in 并发读来优化下面方法的性能 var followers getFollowers(t.tweeter_id); followers.forEach(f - { //hash去重 result.put(f.user_id,1); }); }); //Reach return result.size(); 顶呱呱无论如何求出了Reach啊
其实这又引出了一个很重要的问题也是很多大谈框架、设计、模式却往往忽视的问题性能和数据库建模的关系。
1. 数据量有多大 不知道读者有木有对这个问题的数据库I/O有点想法或者虎躯一震呢 Computing reach is too intense for a single machine – it can require thousands of database calls and tens of millions of tuples. 在上面的数据库设计中避免了JOIN为了提高求大V粉丝的性能可以将一批大V作为batch/bulk然后多个batch并发读誓死搞死数据库。 这里将微博到转发者表所在的库与粉丝库分离如果数据更大怎么办 库再分表... OK假设你已经非常熟悉传统关系型数据库的分库分表及数据路由读路径的聚合、写路径的分发、或者你对于sharding技术也很熟悉、或者你良好的结合了HBase的横向扩展能力并有一致性策略来解决其二级索引问题. 总之存储和读取的问题假设你已经解决了那么分布式计算呢 2. 微博这种应用人与人之间的关系成图状网你怎么建模存储而不仅仅对应这个问题比如 某人的好友的好友可能和某人有几分相熟
看看用storm怎么来解决分布式计算并提供流式计算的能力 // url到大V - 数据库1 TridentState urlToTweeters topology.newStaticState(getUrlToTweetersState()); // 大V到粉丝 - 数据库2 TridentState tweetersToFollowers topology.newStaticState(getTweeterToFollowersState()); topology.newDRPCStream(reach) .stateQuery(urlToTweeters, new Fields(args), new MapGet(), new Fields(tweeters)) .each(new Fields(tweeters), new ExpandList(), new Fields(tweeter)) .shuffle() /* 大V的粉丝很多所以需要分布式处理*/ .stateQuery(tweetersToFollowers, new Fields(tweeter), new MapGet(), new Fields(followers)) .parallelismHint(200) /* 粉丝很多所以需要高并发 */ .each(new Fields(followers), new ExpandList(), new Fields(follower)) .groupBy(new Fields(follower)) .aggregate(new One(), new Fields(one)) /* 去重 */ .parallelismHint(20) .aggregate(new Count(), new Fields(reach)); /* 计算reach数 */ 最多处理一次At most once
回到主题引出上面的例子一是为了引出一个有关分布式存储计算的问题二是透漏这么点意思
码农就应该关注设计和实现的东西比如Jay Kreps是如何发明Kafka这个轮子的 : ]
如果你还是码农级别咱来务点实吧前面我们说到recover节点恢复的问题那么我们恢复几个东西
基本的 节点状态 节点数据
本篇从数据上来讨论下这个问题为使问题再简单点我们考虑写数据的场景如果我们用write-ahead-log的方式来保证数据复制和一致性那么我们会怎么处理一致性问题呢
1. 主节点有新数据写入
2. 从节点追log,准备复制这批新数据。从节点做两件事 (1) 把数据的id偏移写入log; (2) 正要处理数据本身从节点挂了。
那么根据上文的节点存活条件这个从节点挂了这件事被探测到了从节点由维护人员手动或者其自己恢复了那么在加入集群和小伙伴们继续玩耍之前它要同步自己的状态和数据。
问题来了 如果根据log内的数据偏移来同步数据那么因为这个节点在处理数据之前就把偏移写好了可是那批数据lost-datas没有得到处理如果追log之后的数据来同步那么那批数据lost-datas就丢了。 在这种情况下就叫作数据最多处理一次也就是说数据会丢失。 最少处理一次At least once
好吧丢失数据不能容忍那么我们换种方式来处理
1. 主节点有新数据写入
2. 从节点追log,准备复制这批新数据。从节点做两件事 (1) 先处理数据 (2) 正要把数据的id偏移写入log从节点挂了。
问题又来了 如果从节点追log来同步数据那么因为那批数据duplicated-datas被处理过了而数据偏移没有反映到log中如果这样追会导致这批数据重复。 这种场景从语义上来讲就是数据最少处理一次意味着数据处理会重复。 仅处理一次Exactly once Transaction
好吧数据重复也不能容忍要求挺高啊。
大家都追求的强一致性保证这里是最终一致性怎么来搞呢
换句话说在更新数据的时候事务能力如何保障呢
假设一批数据如下 // 新到数据 { transactionId:4 urlId:99 reach:5 } 现在要更新这批数据到库里或者log里那么原来的情况是 // 老数据 { transactionId3 urlId:99 reach:3 } 如果说可以保证如下三点
1. 事务ID的生成是强有序的隔离性串行
2. 同一个事务ID对应的一批数据相同幂等性多次操作一个结果
3. 单条数据会且仅会出现在某批数据中一致性无遗漏无重复
那么放心大胆的更新好了 // 更新后数据 { transactionId4 urlId:99 //3 5 8 reach:8 } 注意到这个更新是ID偏移和数据一起更新的那么这个操作靠什么来保证原子性。 你的数据库不提供原子性后文略有提及。 这里是更新成功了。如果更新的时候节点挂了那么库里或者log里的id偏移不写数据也不处理等节点恢复就可以放心去同步然后加入集群玩耍了。
所以说要保证数据仅处理一次还是挺困难的吧
上面的保障“仅处理一次”这个语义的实现有什么问题呢
性能问题。 这里已经使用了batch策略来减少到库或磁盘的Round-Trip Time那么这里的性能问题是什么呢 考虑一下采用master-master架构来保证主节点的可用性但是一个主节点失败了到另一个主节点主持工作是需要时间的。 假设从节点正在同步啪主节点挂了因为要保证仅处理一次的语义所以原子性发挥作用失败回滚然后从主节点拉失败的数据你不能就近更新因为这批数据可能已经变化了或者你根本没缓存本批数据结果是什么呢 老主节点挂了 新的主节点还没启动所以这次事务就卡在这里直到数据同步的源——主节点可以响应请求。 如果不考虑性能就此作罢这也不是什么大事。
你似乎意犹未尽来吧看看“银弹”是什么 Opaque-Transaction
现在我们来追求这样一种效果 某条数据在一批数据中这批数据对应着一个事务很可能会失败但是它会在另一批数据中成功。 换句话说一批数据的事务ID一定相同。 来看看例子吧老数据不变只是多了个字段prevReach。 // 老数据 { transactionId3 urlId:99 //注意这里多了个字段表示之前的reach的值 prevReach:2 reach:3 } // 新到数据 { transactionId:4 urlId:99 reach:5 } 这种情况新事务的ID更大、更靠后表明新事务可以执行还等什么直接更新更新后数据如下 // 新到数据 { transactionId:4 urlId:99 //注意这里更新为之前的值 prevReach:3 //3 5 8 reach:8 } 现在来看下另外的情况 // 老数据 { transactionId3 urlId:99 prevReach:2 reach:3 } // 新到数据 { //注意事务ID为3和老数据中的事务ID相同 transactionId:3 urlId:99 reach:5 } 这种情况怎么处理是跳过吗因为新数据的事务ID和库里或者log里的事务ID相同按事务要求这次数据应该已经处理过了跳过
不这种事不能靠猜的想想我们有的几个性质其中关键一点就是 给定一批数据它们所属的事务ID相同。 仔细体会下上面那句话和下面这句话的差别 给定一个事务ID任何时候其所关联的那批数据相同。 我们应该这么做考虑到新到数据的事务ID和存储中的事务ID一致所以这批数据可能被分别或者异步处理了但是这批数据对应的事务ID永远是同一个那么即使这批数据中的A部分先处理了由于大家都是一个事务ID那么A部分的前值是可靠的。
所以我们将依靠prevReach而不是Reach的值来更新 // 更新后数据 { transactionId:3 urlId:99 //这个值不变 prevReach:2 //2 5 7 reach:7 } 你发现了什么呢
不同的事务ID导致了不同的值
1. 当事务ID为4大于存储中的事务ID3Reach更新为35 8.
2. 当事务ID为3等于存储中的事务ID3Reach更新为25 7.
这就是Opaque Transaction。
这种事务能力是最强的了可以保证事务异步提交。所以不用担心被卡住了如果说集群中 Transaction 数据是分批处理的每个事务ID对应一批确定、相同的数据. 保证事务ID的产生是强有序的. 保证分批的数据不重复、不遗漏. 如果事务失败数据源丢失那么后续事务就卡住直到数据源恢复. Opaque-Transaction 数据是分批处理的每批数据有确定而唯一的事务ID. 保证事务ID的产生是强有序的. 保证分批的数据不重复、不遗漏. 如果事务失败数据源丢失不影响后续事务除非后续事务的数据源也丢了.
其实这个全局ID的设计也是门艺术 冗余关联表的ID以减少join做到O(1)取ID. 冗余日期long型字段以避免order by. 冗余过滤字段以避免无二级索引HBase的尴尬. 存储mod-hash的值以方便分库、分表后应用层的数据路由书写.
这个内容也太多话题也太大就不在此展开了。
你现在知道twitter的snowflake生成全局唯一且有序的ID的重要性了。 两阶段提交
现在用zookeeper来做两阶段提交已经是入门级技术所以也不展开了。
如果你的数据库不支持原子操作那么考虑两阶段提交吧。 结语
To be continued.