转发:分布式系统设计自学整理

目录:

  1. GFS

  2. BigT able(SSTable)

  3. Cassandra

  4. MapReduce

  5. Spark

  6. Zookeeper

  7. Kafka

  8. GFS,
    https://pdos.csail.mit.edu/6.824/papers/gfs.pdf
    GFS(Google file system): a distributed file system, 用来存储数据,作为BigTable(SortedString Table的底层),
    也是HDFS的真身。把文件放在很多个disks上,能够满足大量读写需求。
    GFS的目的:

  9. Fault-tolerance: 假设有1000个机器,每个机器每年会坏一次,那么每天就有大概3个机器会挂。如何确保数据安
    全?

  10. Performance: 大量的同时读写,如何处理I/O

  11. Consistency:如果有多个人同时读写,如何确保最后读到的是最新的数据

  12. Efficiency:如何最优使用网络bandwidth来传输数据
    在GFS之前的文件系统:

  13. 每个文件分成blocks (4096byte),每个文件有自己的Metadata,记录文件名/时间/size/和blocks的diskoffset

  14. Master-slave 系统:数据都存放在master上,slave是备份,master挂了,找一个slave顶上去

  15. 缺点:随机访问速度慢,多进程读写需要锁进程,数据存放在一个master严重依赖badnwidth
    GFS的结构:

  16. client,比如SSTable,read/writefile to GFS

  17. Master,这里的master跟一般文件系统的不一样,不存放数据,memory中存放

  18. Chunk servers,存放数据的metadata和一个hashtable[filename][chunk_index] = chunk_server

  19. chunks,和一般的4096 block不同, GFS的数据每一块比较大,default用的是64M,这样metadata就比较小了。
    建立在小数据碎片很小的基础上。

  20. Megadata, 64Byte 每个chunk

  21. Replica: 每个数据存3次,在不同的chunkserver上,以防止chunk server 挂了
    Read:读数据:

  22. Client 发送 filename 到master

  23. 在master 找到这个file的metadata, 然后找到相应的chunk list,最后在hashmap里找相应的chunk serverlist,返
    回给client,这些数据会在client的cache里

  24. Client读chunk server list, 对于每个chunk,对应最多3个server,根据IP地址选择最近的server来读取数据
    *4. 实际上会有一个 version的信息,在每个数据里和chunk server list,如果在server数据里的version和chunk server
    不同就重新连接master
    *5. Client 会吧server list 放进cache里,这样就不需要每次都问master了
    Capture151
    Write: 写数据:(这里的写指的是modify)

  25. Client发 filename到master

  26. master恢复chunk server list,这里跟读一样,但是chunkservers分成两种,一个叫primaryreplica,一种叫
    secondary replica

  27. 把数据写到三个replica的cache里(!!不是硬盘,是LRU cache)

  28. Client 发写请求给primary replica,primary把最新的文件写到硬盘

  29. Primary 把写请求 forward给两个secondary, secondary 写data

  30. 写完之后secondary告诉primary写好了

  31. primary 回复client, 如果出错就重复3-7
    *和传统方法不一样的是,GFS写数据提供recordappend方法,用来保证多client同时写
    Capture152
    系统优化:

  32. 一般不需要第二个master, 如果挂了去重启就行,但是master会存一个log到硬盘,重启之后从log恢复state

  33. 如何判断chunk server是不是挂了:heartbeat message

  34. 如何判断一个chunk server上某个block挂了: Achunk is broken up into 64 KB blocks. Each has a
    corresponding 32 bit checksum.

1 Like

BigTable: a NoSQL database (Column based) based on GFS
补充:大数据中常见的两种NoSQL:1)key/ value based like Redis or MemcacheDB 和 2)column
based, BigTable
SSTable的数据结构:

  1. Rows (key) : 最大64KB 的字符串,在SSTable中,是按照Row key 排序好的,顾名思义,sorted string table
  2. Columnkeys: 一个set,可以支持range query
  3. value:包括两部分,第一部分是value, 第二部分是时间timestamp,是一个64bit的整
    SSTable的文件格式:原则是不能修改,只能append,根据timestamp来决定哪个是最新的value
  4. 分成datablock, 每个block 64
  5. 在SSTable最后有一个 block inde
    3.查询SSTable先 load index然后二分查询 (不能放进内存用hashtable查询!)
    Bigtable 系统:
    1.Master server: 负责测试tablet server是不是在工作, 处理GFS的sharding等,在Bigtable中,Client是不会和
    master交流的,要和Lock交流
  6. Lock server: 用Chubby或者zookepper实现,完成多进程的锁,用metadata查找key所在的server
    Capture153
  1. Tablet server: 处理读写操作
    写操作:
    1.Client ask lock
    2.Lock return a tablet server and lock it
    3.Go to tablet server, 1) write to 1) commit log (write ahead log) in disk and 2)memTable in memory. If
    memory down, recovery from log
  2. Minorcompaction: when memory hit the threshold, frozen this one and write to GFS asa SSTable
    5.Major compaction: Merge SSTable, with same row key, use the most recent record.
    6.After all, client ask lock to unlock the tablet server and update the metadata
    读操作:
  3. Clientask lock
    2.Check metadata, return the tablet server and lock it
    3.Go to tablet server, first check memTable
    4.If not in memTable , check tablets one-by-one
    5.For each tablet, check the index firstly. So the worst time is O(mlogk), m isthe number of tablets, k is the
    length of each tablets.
    6.Retern value and unlock the server
    其他重要的scale优化:
  4. readcache: 在tabletserver上,1)scan cache 保存已经read的key/value,优化重复read; 2)block cache,存在
    GFS上SSTBlock,优化读取附近的key(不是很懂)
    2.Bloom filter:对每个SSTable加入一个bloom filter (多个hash函数来确定的一个bit filter),如果key没有通过
    SSTable的bloom filter,就不用read SSTable的index了,省去了二分的时间

一般不需要第二个master, 如果挂了去重启就行,但是master会存一个log到硬盘,重启之后从log恢复state

这样不是就有downtime了么?另外,NameNode在现实中承受的大量QPS,很容易挂掉。现在大一点的文件系统
Metadata都是分布式存储的。

这样不是就有downtime了么?另外,NameNode在现实中承受的大量QPS,很容易挂掉。现在大一点的文件系统

谢谢大大的回复!我想了一下你的问题:

  1. Master建立自己的replica,同时setup shadow master。Shadow master平时设置成read-only,可以在master当
    机的时间load replica,并且代替master工作。
  2. 如果64M的data,有64B的megadata,那么 1PB的data,megadata也就1G,大小足够放在内存了,假设放在了
    memcached里,100k的QPS,应该足够多的hadoop来map了?是不是真正工业界的service scale比我想象的大很多
    啊。。
  3. 请教 一下大大,我过几天要去databricks面试,因为我是new grad不大了解工业届,你觉得我应该跟面试官聊啥?谢
    谢!

谢谢大大的回复!我想了一下你的问题:

回答你的问题。
2. 现实生活中da ta不总是占满64M,会有大量几k几十k的文件碎片,1PB的metadata至少在几十G。大一点公司的
storage QPS都是用million计的。
3. 面Databricks你应该研究Spark和 Flink,他们又不做Storage

另外,作为面试官我肯定会考的问题:

  1. 怎么保证Master和Shadow master一 致性?写一半挂了怎么半?

谢谢~
加入QP S是10M, 每个内存的QPS是100k,所以需要100个server来负责matadata的处理?那样的话一致性就很难确
保了呀,是有什么特别的方法吗?
我的计划是一会把p2p的NoSQL整 理一下,然后就走一下MapReduce -> Spark,我觉得对数据库理解深刻一点对后面
系统的bottleneck可能理解更深刻,加上这几篇paper确实写的挺精彩的!

回答你的问题。

我觉得,可以为master 建立一个private database来写入 log,如果shadow有lag的话,可以从log中读取。
问题:如果有大量的写操作,那么master写入log的I/O会成为一个bottleneck
我记得Raft可以解决这个问题,但是具体原理我不是很清楚。
刚才我回复了自己,粘贴一下:
加入QPS是10M, 每个内存的Q PS是100k,所以需要100个server来负责matadata的处理?那样的话一致性就很难确
保了呀,是有什么特别的方法吗?
我的计划是一会把p2p的NoSQL整 理一下,然后就走一下MapReduce → Spark,我觉得对数据库理解深刻一点对后面
系统的bottleneck可能理解更深刻,加上这几篇paper确实写的挺精彩的!

Cassandra,简单看一下,主要总结一下consistanthashing,今天重点是MapReduce,如果以后有需求再revisit
特点:NoSQL,peer-to-peer,column key
主要技术:consistant hashing
特点:容易扩展(consistant hashing),单值读写快,一致性高
Consistent hashing:

  1. 加入有一个环,等分成2^64份
  2. 现在加入一个机器A,在环上随机3个位置设置成A的三个virtual nodes
  3. 来一个key,hash成0-2^64的一个值,从这个值顺时针找到第一个virtual node,就把这组(key,value)存放在这
    个node上
  4. 加入新机器,比如已经有A,B加入C,还是随机生成3个C的virtual nodes。然后把C到A之前的data从Btransfer到C
  5. Repica: 不仅仅存在key后面第一个virtual node上,而是存在三个node上
    PNG

:+1:

插眼:upside_down_face: