Case Studies
GFS
Why
- performance -> sharding
- faults -> tolerance -> replication
- replication -> inconsistency
- consistency -> low performance
Strong consistency
Architecture
- A GFS cluster consists of a single master and multiple chunkservers
- Files are divided by chunks. Each chunk is identified by an immutable and globally unique 64 bit chunk handle assigned by the master at the time of chunk creation.
- Chunkservers store chunks on local disks as Linux files and read or write chunk data specified by a chunk handle and byte range.
- For reliability, each chunk is replicated on multiple chunkservers. By default, we store three replicas, though users can designate different replication levels for different regions of the file namespace
Master Data
- file name -> array of chunk handles (non-volatile)
- chunk handle ->
    - list of chunkservers (volatile)
    - version number (non-volatile)
    - primary (volatile)
    - lease expiration (volatile)
- log, checkpoint (on disk)
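A minimal Go sketch of how the master might lay out these tables; the type and field names are illustrative assumptions (GFS itself is C++), but the volatile/non-volatile split follows the list above.

```go
package gfsmaster

import "time"

// ChunkHandle is the immutable, globally unique 64-bit id assigned by the
// master when the chunk is created.
type ChunkHandle uint64

// chunkInfo is the per-chunk state the master keeps in memory. Replica
// locations, the primary, and the lease are volatile: they are rebuilt by
// asking chunkservers at startup rather than being written to disk.
type chunkInfo struct {
	replicas    []string  // addresses of chunkservers holding a replica (volatile)
	version     uint64    // chunk version number (non-volatile)
	primary     string    // current primary chunkserver, "" if no lease (volatile)
	leaseExpiry time.Time // when the primary's lease expires (volatile)
}

// master holds the two core tables; mutations to the file-name table and to
// version numbers are appended to the on-disk log (with periodic checkpoints)
// before they are applied in memory.
type master struct {
	files  map[string][]ChunkHandle // file name -> ordered array of chunk handles (non-volatile)
	chunks map[ChunkHandle]*chunkInfo
}
```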
Read
- client sends the filename and offset to the master
- master sends back handle, list of chunkservers
- client talks to one of the chunkservers
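A minimal Go sketch of that read path. The MasterStub/ChunkserverStub interfaces, the RPC names Lookup and ReadChunk, and the use of a 64 MB chunk size to turn a byte offset into a chunk index are assumptions for illustration, not the actual GFS wire protocol.

```go
package gfsclient

// Hypothetical stubs standing in for the real GFS RPCs.
type MasterStub interface {
	// Lookup returns the chunk handle and replica locations for the chunk
	// at the given index of the file.
	Lookup(file string, chunkIndex int64) (handle uint64, chunkservers []string, err error)
}

type ChunkserverStub interface {
	ReadChunk(handle uint64, offset, length int64) ([]byte, error)
}

const chunkSize = 64 << 20 // GFS uses 64 MB chunks

// Read shows the three-step flow: ask the master which chunk covers the
// offset and where its replicas live, then fetch the bytes from one of the
// chunkservers (clients normally cache the master's reply).
func Read(master MasterStub, dial func(addr string) ChunkserverStub,
	file string, offset, length int64) ([]byte, error) {

	handle, servers, err := master.Lookup(file, offset/chunkSize)
	if err != nil {
		return nil, err
	}
	cs := dial(servers[0]) // any replica will do; GFS clients prefer a nearby one
	return cs.ReadChunk(handle, offset%chunkSize, length)
}
```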
Raft
Algorithm
State
- Persistent state on all servers
- currentTerm: latest term server has seen (initialized to 0 on first boot, increases monotonically)
- votedFor: candidateId that received vote in current term (or null if none)
- log[]: log entries: each entry contains command for state machine, and term when entry was received by leader (first index is 1)
- Volatile state on all servers:
- commitIndex: index of highest log entry known to be committed (initialized to 0, increases monotonically)
- lastApplied: index of highest log entry applied to state machine
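The same state written as a Go struct (the layout used by the later Raft sketches in this section). Field names follow Figure 2 of the Raft paper; the sentinel entry at log index 0 and the leader-only nextIndex/matchIndex fields are included here for the sketches below.

```go
package raft

// LogEntry holds a command for the state machine together with the term in
// which the leader received it.
type LogEntry struct {
	Term    int
	Command interface{}
}

// Raft holds the per-server state from Figure 2.
type Raft struct {
	// Persistent state on all servers (saved to stable storage before
	// responding to RPCs).
	currentTerm int        // latest term this server has seen, starts at 0
	votedFor    int        // candidateId voted for in currentTerm, or -1 for "null"
	log         []LogEntry // log[0] is a sentinel with Term 0, so real entries start at index 1

	// Volatile state on all servers.
	commitIndex int // highest log index known to be committed, starts at 0
	lastApplied int // highest log index applied to the state machine, starts at 0

	// Volatile state on leaders (reinitialized after each election).
	nextIndex  []int // for each peer, index of the next log entry to send
	matchIndex []int // for each peer, highest log index known to be replicated there
}
```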
AppendEntriesRPC
- Receiver implementation
- Reply false if term < currentTerm
- Reply false if log doesn't contain an entry at prevLogIndex whose term matches prevLogTerm
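A receiver-side sketch of those rules in Go, using the Raft and LogEntry types above, plus the remaining Figure 2 steps (truncate any conflicting suffix, append the new entries, advance commitIndex). Locking and persistence are omitted.

```go
package raft

type AppendEntriesArgs struct {
	Term         int        // leader's term
	LeaderId     int        // so followers can redirect clients
	PrevLogIndex int        // index of the log entry immediately preceding the new ones
	PrevLogTerm  int        // term of the entry at PrevLogIndex
	Entries      []LogEntry // log entries to store (empty for heartbeat)
	LeaderCommit int        // leader's commitIndex
}

type AppendEntriesReply struct {
	Term    int  // currentTerm, for the leader to update itself
	Success bool // true if the follower had the entry matching PrevLogIndex/PrevLogTerm
}

// AppendEntries: reject stale terms, run the consistency check, then
// overwrite any conflicting suffix and advance commitIndex.
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	reply.Term = rf.currentTerm

	// Reply false if the leader's term is stale.
	if args.Term < rf.currentTerm {
		reply.Success = false
		return
	}

	// Reply false if our log has no entry at PrevLogIndex whose term
	// matches PrevLogTerm (the consistency check).
	if args.PrevLogIndex >= len(rf.log) || rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
		reply.Success = false
		return
	}

	// Delete any conflicting suffix and append the leader's entries.
	rf.log = append(rf.log[:args.PrevLogIndex+1], args.Entries...)

	// Advance commitIndex to min(LeaderCommit, index of last new entry).
	if args.LeaderCommit > rf.commitIndex {
		last := len(rf.log) - 1
		if args.LeaderCommit < last {
			rf.commitIndex = args.LeaderCommit
		} else {
			rf.commitIndex = last
		}
	}
	reply.Success = true
}
```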
RequestVoteRPC
- Arguments
- term: candidate’s term
- candidateId: candidate requesting vote
- lastLogIndex: index of candidate’s last log entry
- lastLogTerm: term of candidate’s last log entry
- Results
- term: currentTerm, for candidate to update itself
- voteGranted: true means candidate received vote
- Receiver implementation
- Reply false if term < currentTerm
- If votedFor is null or candidateId, and candidate's log is at least as up-to-date as receiver's log, grant vote
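A receiver-side sketch in Go, on the Raft struct above (locking and persistence omitted). The candidateLogIsUpToDate helper is sketched later under Safety.

```go
package raft

type RequestVoteArgs struct {
	Term         int // candidate's term
	CandidateId  int // candidate requesting vote
	LastLogIndex int // index of candidate's last log entry
	LastLogTerm  int // term of candidate's last log entry
}

type RequestVoteReply struct {
	Term        int  // currentTerm, for the candidate to update itself
	VoteGranted bool // true means the candidate received the vote
}

// RequestVote grants the vote only if the candidate's term is current, we
// haven't already voted for someone else this term, and the candidate's log
// is at least as up-to-date as ours.
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// A larger term makes us adopt it and clears any vote from the old term.
	if args.Term > rf.currentTerm {
		rf.currentTerm = args.Term
		rf.votedFor = -1
	}
	reply.Term = rf.currentTerm

	// Reply false if the candidate's term is stale.
	if args.Term < rf.currentTerm {
		reply.VoteGranted = false
		return
	}

	if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) &&
		rf.candidateLogIsUpToDate(args.LastLogIndex, args.LastLogTerm) {
		rf.votedFor = args.CandidateId
		reply.VoteGranted = true
		return
	}
	reply.VoteGranted = false
}
```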
Raft Basics
Leaders, followers, and candidates
- Each server can be in one of three states: leader, follower, or candidate
- In normal operation there is exactly one leader and all the other servers are followers
- Followers are passive: they issue no requests on their own but simply respond to requests from leaders and candidates
- The candidate state is used to elect a new leader
Terms and elections
- Raft divides time into terms of arbitrary length. Terms are numbered with consecutive integers.
- Each term begins with an election, in which one or more candidates attempt to become leader. If a candidate wins the election, then it serves as leader for the rest of the term. If the election results in a split vote, the term ends with no leader, and a new term begins shortly afterward. Raft ensures that there is at most one leader in a given term.
Terms as a logical clock
- Different servers may observe the transitions between terms at different times, and in some situations a server may not observe an election or even entire terms.
- Terms act as a logical clock in Raft, and they allow servers to detect obsolete information.
- Each server stores a current term number. Current terms are exchanged whenever servers communicate.
- If one server’s current term is smaller than the other’s, then it updates its current term to the larger value.
- If a candidate or leader discovers that its term is out of date, it immediately reverts to follower state.
- If a server receives a request with a stale term number, it rejects the request.
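A small Go sketch of that bookkeeping, run whenever a term arrives in an RPC request or reply; the stale-term rejection itself happens in the individual handlers, as in the AppendEntries and RequestVote sketches above.

```go
package raft

// observeTerm applies the logical-clock rule: if a peer's term is larger
// than ours, adopt it, clear our vote for the new term, and report that a
// candidate or leader caller must revert to follower state.
func (rf *Raft) observeTerm(peerTerm int) (stepDown bool) {
	if peerTerm > rf.currentTerm {
		rf.currentTerm = peerTerm // larger term wins
		rf.votedFor = -1          // no vote cast yet in the new term
		return true               // caller reverts to follower state
	}
	return false
}
```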
RPC
- Raft servers communicate using two types of RPCs.
- RequestVote RPCs are initiated by candidates during elections.
- AppendEntries RPCs are initiated by leaders to replicate log entries and provide a form of heartbeat.
Leader Election
Heartbeat mechanism
- Raft uses a heartbeat mechanism to trigger leader election
- When servers start up, they begin as followers. Leaders send periodic heartbeats (AppendEntries RPCs that carry no log entries) to all followers in order to maintain their authority. A server remains in follower state as long as it receives valid RPCs from a leader or candidate.
- If a follower receives no communication over a period of time called the election timeout, it assumes there is no viable leader and begins an election to choose a new leader.
Election
- To begin an election, a follower increments its current term and transitions to candidate state. It then votes for itself and issues RequestVote RPCs in parallel to each of the other servers in the cluster (see the sketch after this list).
- Each server will vote for at most one candidate in a given term, on a first-come-first-served basis. The majority rule ensures that at most one candidate can win the election for a particular term
- A candidate continues in this state until one of three things happens: (a) it wins the election, (b) another server establishes itself as leader, or (c) a period of time goes by with no winner
- A candidate wins: A candidate wins an election if it receives votes from a majority of the servers in the full cluster for the same term. Once a candidate wins an election, it becomes leader. It then sends heartbeat messages to all of the other servers to establish its authority and prevent new elections.
- Another candidate wins: While waiting for votes, a candidate may receive an AppendEntries RPC from another server claiming to be leader. If the leader’s term (included in its RPC) is at least as large as the candidate’s current term, then the candidate recognizes the leader as legitimate and returns to follower state
- No winner: if many followers become candidates at the same time, votes could be split so that no candidate obtains a majority. When this happens, each candidate will time out and start a new election by incrementing its term and initiating another round of RequestVote RPCs.
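A candidate-side Go sketch of this flow. rf.me, rf.peerCount(), rf.lastLogIndex()/rf.lastLogTerm(), rf.sendRequestVote, and rf.becomeLeader are assumed helpers not defined in the state sketch, and locking around the shared Raft fields is omitted.

```go
package raft

import "sync"

// startElection: increment the term, vote for ourselves, and send
// RequestVote RPCs to every other server in parallel, counting votes until
// a majority is reached or the attempt is abandoned.
func (rf *Raft) startElection() {
	rf.currentTerm++    // new term for this candidacy
	rf.votedFor = rf.me // vote for ourselves
	term := rf.currentTerm

	var mu sync.Mutex
	votes := 1 // our own vote
	won := false

	for peer := 0; peer < rf.peerCount(); peer++ {
		if peer == rf.me {
			continue
		}
		go func(peer int) {
			args := &RequestVoteArgs{
				Term:         term,
				CandidateId:  rf.me,
				LastLogIndex: rf.lastLogIndex(),
				LastLogTerm:  rf.lastLogTerm(),
			}
			var reply RequestVoteReply
			if !rf.sendRequestVote(peer, args, &reply) {
				return // RPC failed; the next election timeout will retry
			}
			if rf.observeTerm(reply.Term) {
				return // saw a larger term: revert to follower, abandon election
			}
			mu.Lock()
			defer mu.Unlock()
			if reply.VoteGranted && rf.currentTerm == term {
				votes++
				if !won && votes > rf.peerCount()/2 {
					won = true
					rf.becomeLeader() // immediately send heartbeats to assert authority
				}
			}
		}(peer)
	}
}
```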
Avoiding split votes
- Raft uses randomized election timeouts to ensure that split votes are rare and that they are resolved quickly.
- To prevent split votes in the first place, election timeouts are chosen randomly from a fixed interval. This spreads out the servers so that in most cases only a single server will time out; it wins the election and sends heartbeats before any other servers time out.
- To resolve split votes, each candidate restarts its randomized election timeout at the start of an election and waits for that timeout to elapse before starting the next election; this reduces the likelihood of another split vote in the new election (see the sketch below).
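A Go sketch of the randomized timeout and the loop that uses it. The 150-300 ms range is the paper's suggested example, and rf.state, rf.lastHeard, and the state constants are assumed bookkeeping fields not shown in the state sketch.

```go
package raft

import (
	"math/rand"
	"time"
)

const (
	Follower = iota
	Candidate
	Leader
)

// electionTimeout picks a fresh randomized timeout each time it is called,
// so that in most cases only one server times out and wins before the others.
func electionTimeout() time.Duration {
	return time.Duration(150+rand.Intn(150)) * time.Millisecond
}

// ticker re-randomizes the timeout on every iteration; if no heartbeat or
// vote grant has arrived within it, start (or restart) an election.
func (rf *Raft) ticker() {
	for {
		timeout := electionTimeout()
		time.Sleep(timeout)
		if rf.state != Leader && time.Since(rf.lastHeard) >= timeout {
			rf.startElection()
		}
	}
}
```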
Log Replication
Log Replication Process
- The leader appends the command to its log as a new entry
- The leader issues AppendEntries RPCs in parallel to each of the other servers to replicate the entry
- When the entry has been safely replicated (the entry is replicated on a majority of the servers), the leader applies the entry to its state machine and returns the result of that execution to the client.
- If followers crash or run slowly, or if network packets are lost, the leader retries AppendEntries RPCs indefinitely (even after it has responded to the client) until all followers eventually store all log entries (a leader-side sketch follows this list)
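A leader-side Go sketch of this sequence. rf.state, rf.me, rf.peerCount(), and rf.replicateTo (the per-follower retry loop, sketched under Resolving inconsistencies below) are assumptions; the entry is applied and the client answered only after commitIndex advances past it.

```go
package raft

// Start handles a client command on the leader: append the entry to the
// leader's own log, then replicate it to each follower in parallel.
func (rf *Raft) Start(command interface{}) (index, term int, isLeader bool) {
	if rf.state != Leader {
		return -1, rf.currentTerm, false // only the leader accepts new entries
	}
	rf.log = append(rf.log, LogEntry{Term: rf.currentTerm, Command: command})
	index = len(rf.log) - 1

	for peer := 0; peer < rf.peerCount(); peer++ {
		if peer != rf.me {
			go rf.replicateTo(peer) // retries until the follower stores the entry
		}
	}
	return index, rf.currentTerm, true
}
```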
Log structure
- Each log entry stores a state machine command along with the term number when the entry was received by the leader. The term numbers in log entries are used to detect inconsistencies between logs and to ensure some of the properties described below.
- Each log entry also has an integer index identifying its position in the log
Committed
- A log entry is committed once the leader that created the entry has replicated it on a majority of the servers. This also commits all preceding entries in the leader's log, including entries created by previous leaders.
- The leader keeps track of the highest index it knows to be committed, and it includes that index in future AppendEntries RPCs (including heartbeats) so that the other servers eventually find out. Once a follower learns that a log entry is committed, it applies the entry to its local state machine (in log order).
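A Go sketch of the apply side. The applyCh channel and rf.waitForCommitSignal are assumptions about how the surrounding service consumes committed entries; real implementations also need locking here.

```go
package raft

// applier applies committed but not-yet-applied entries, in log order, by
// sending their commands to the service's apply channel.
func (rf *Raft) applier(applyCh chan<- interface{}) {
	for {
		for rf.lastApplied < rf.commitIndex {
			rf.lastApplied++
			applyCh <- rf.log[rf.lastApplied].Command // apply in log order
		}
		rf.waitForCommitSignal() // assumed: block until commitIndex changes
	}
}
```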
Log Matching property
- If two entries in different logs have the same index and term, then they store the same command. This property follows from the fact that a leader creates at most one entry with a given log index in a given term, and log entries never change their position in the log.
- If two entries in different logs have the same index and term, then the logs are identical in all preceding entries. This property is guaranteed by a simple consistency check performed by AppendEntries. When sending an AppendEntries RPC, the leader includes the index and term of the entry in its log that immediately precedes the new entries. If the follower does not find an entry in its log with the same index and term, then it refuses the new entries. This consistency check acts as an induction step.
Resolving inconsistencies
Leader crashes can leave the logs inconsistent: a follower may be missing entries, or it may have extra uncommitted entries.
- In Raft, the leader handles inconsistencies by forcing the follower's log to duplicate its own. It finds the latest log entry where the two logs agree, deletes any entries in the follower's log after that point, and sends the follower all of the leader's entries after that point.
- These actions happen in response to the consistency check performed by AppendEntries RPCs. The leader maintains a nextIndex for each follower, which is the index of the next log entry the leader will send to that follower. When a leader first comes to power, it initializes all nextIndex values to the index just after the last one in its log. If a follower's log is inconsistent with the leader's, the AppendEntries consistency check will fail in the next AppendEntries RPC. After a rejection, the leader decrements nextIndex and retries the AppendEntries RPC. Eventually nextIndex reaches a point where the leader and follower logs match. When this happens, AppendEntries succeeds, which removes any conflicting entries in the follower's log and appends entries from the leader's log (if any). Once AppendEntries succeeds, the follower's log is consistent with the leader's, and it will remain that way for the rest of the term (see the sketch below).
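A leader-side Go sketch of this loop, continuing the Raft sketches above. rf.state, rf.me, and rf.sendAppendEntries are assumptions, and locking plus the commitIndex advancement that uses matchIndex are omitted.

```go
package raft

// replicateTo sends entries starting at nextIndex[peer]; on rejection it
// decrements nextIndex and retries until the logs match, then appends from
// the match point onward.
func (rf *Raft) replicateTo(peer int) {
	for rf.state == Leader {
		prev := rf.nextIndex[peer] - 1
		args := &AppendEntriesArgs{
			Term:         rf.currentTerm,
			LeaderId:     rf.me,
			PrevLogIndex: prev,
			PrevLogTerm:  rf.log[prev].Term,
			Entries:      rf.log[prev+1:],
			LeaderCommit: rf.commitIndex,
		}
		var reply AppendEntriesReply
		if !rf.sendAppendEntries(peer, args, &reply) {
			continue // lost packet or slow follower: retry indefinitely
		}
		if rf.observeTerm(reply.Term) {
			return // a newer term exists; we are no longer leader
		}
		if reply.Success {
			rf.nextIndex[peer] = len(rf.log)      // follower is caught up
			rf.matchIndex[peer] = len(rf.log) - 1 // used to advance commitIndex
			return
		}
		rf.nextIndex[peer]-- // consistency check failed: back up one entry and retry
	}
}
```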
Safety
Election
- In any leader-based consensus algorithm, the leader must eventually store all of the committed log entries. In some consensus algorithms, such as Viewstamped Replication, a leader can be elected even if it doesn't initially contain all of the committed entries, and the missing entries are transmitted to the new leader.
- Raft uses a simpler approach: it prevents a candidate from winning an election unless its log contains all committed entries, which ensures that each new leader has all the committed entries from previous terms. This is done by including information about the candidate's log in the RequestVote RPC: the voter denies its vote if its own log is more up-to-date than that of the candidate.
- Raft determines which of two logs is more up-to-date by comparing the index and term of the last entries in the logs. If the logs have last entries with different terms, then the log with the later term is more up-to-date. If the logs end with the same term, then whichever log is longer is more up-to-date.
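The comparison as a small Go helper on the Raft struct above; this is the check used by the RequestVote sketch earlier.

```go
package raft

// candidateLogIsUpToDate implements the election restriction: the
// candidate's last entry must have a later term than ours, or the same term
// and an index at least as large.
func (rf *Raft) candidateLogIsUpToDate(lastLogIndex, lastLogTerm int) bool {
	myLastIndex := len(rf.log) - 1
	myLastTerm := rf.log[myLastIndex].Term
	if lastLogTerm != myLastTerm {
		return lastLogTerm > myLastTerm // later term wins
	}
	return lastLogIndex >= myLastIndex // same term: longer log wins
}
```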
Committing entries from previous terms
Follower and candidate crashes
Timing and availability
Zookeeper
Zookeeper Service
Znode
- Zookeeper provides to its clients the abstraction of a set of data nodes (znodes), organized according to a hierarchical name space
- To refer to a given znode, we use the standard UNIX notation for file system path, like /A/B/C
- There are two kinds of znodes a client can create: regular and ephemeral. A regular znode is created and deleted by clients explicitly. An ephemeral znode is created by clients and can be deleted by clients explicitly, but it is also removed automatically when the session that creates it terminates.
Watches
- ZooKeeper implements watches to allow clients to receive timely notifications of changes without requiring polling.
- When a client issues a read operation with a watch flag set, the operation completes as normal except that the server promises to notify the client when the information returned has changed.
- Watches are one-time triggers associated with a session; they are unregistered once triggered or the session closes.
Sessions
- A client connects to ZooKeeper and initiates a session.
- Sessions have an associated timeout. ZooKeeper considers a client faulty if it does not receive anything from its session for more than that timeout.
- A session ends when the client explicitly closes the session handle or ZooKeeper detects that the client is faulty.
Client API
- create(path, data, flags): creates a regular or ephemeral znode with the given path and stores data in it
- exists(path, watch): tells whether a znode with the given path exists; setting the watch flag to true sets a watch on the znode
- getData(path, watch): gets the data and meta-data (such as version information) associated with the znode
- getChildren(path, watch): gets the set of names of the children of a znode
- setData(path, data, version): writes data to the znode with the given path if the znode is at the expected version; otherwise the call fails with an unexpected version error
- delete(path, version): deletes the znode with the given path, again only if it is at the expected version
- sync(path): waits for all updates pending at the start of the operation to propagate to the server that the client is connected to; the path argument is currently ignored
- All methods have both a synchronous and an asynchronous version available through the API (a small usage sketch follows this list)
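A sketch of this API as a hypothetical Go interface (the real ZooKeeper client libraries are Java and C, with different names and signatures), plus a small helper showing how the version argument turns setData into a conditional update.

```go
package zkexample

// Client mirrors the operations above as a hypothetical Go interface; it is
// not the API of any real ZooKeeper client library.
type Client interface {
	Create(path string, data []byte, ephemeral bool) error
	Exists(path string, watch bool) (bool, error)
	GetData(path string, watch bool) (data []byte, version int, err error)
	GetChildren(path string, watch bool) ([]string, error)
	SetData(path string, data []byte, version int) error
	Delete(path string, version int) error
	Sync(path string) error
}

// updateZnode reads the znode, transforms its data, and writes it back only
// if nobody else changed the znode in between; otherwise SetData fails with
// a version error and the caller can re-read and retry.
func updateZnode(c Client, path string, transform func([]byte) []byte) error {
	data, version, err := c.GetData(path, false)
	if err != nil {
		return err
	}
	return c.SetData(path, transform(data), version)
}
```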
Zookeeper Guarantees
Linearizable writes
- All requests that update the state of ZooKeeper are serializable and respect precedence
Example: New leader changes configuration
- Problem: what if a process sees that ready exists before the new leader starts to make a change, and then starts reading the configuration while the change is in progress? Solution: use the ordering guarantee for notifications: a client watching ready receives the notification of the change to ready before it can see any of the new configuration, so it knows to discard what it has read and retry (see the sketch below).
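A Go sketch of the ready-znode pattern using the hypothetical Client interface above. The /config paths, using version -1 to mean "any version", and the retry policy are assumptions of the sketch.

```go
package zkexample

// publishConfig is what the new leader does: delete ready, update the
// individual configuration znodes, then recreate ready so readers know the
// new configuration is complete.
func publishConfig(c Client, cfg map[string][]byte) error {
	if err := c.Delete("/config/ready", -1); err != nil { // -1: any version (sketch convention)
		return err
	}
	for name, data := range cfg {
		if err := c.SetData("/config/"+name, data, -1); err != nil {
			return err
		}
	}
	return c.Create("/config/ready", nil, false)
}

// readConfig is what a process does: watch ready first, then read the
// configuration znodes. If the leader deletes ready while we read, the watch
// notification arrives before any new value can be observed, so the caller
// knows to discard this result and retry.
func readConfig(c Client, names []string) (map[string][]byte, bool, error) {
	ok, err := c.Exists("/config/ready", true)
	if err != nil || !ok {
		return nil, false, err
	}
	cfg := make(map[string][]byte)
	for _, name := range names {
		data, _, err := c.GetData("/config/"+name, false)
		if err != nil {
			return nil, false, err
		}
		cfg[name] = data
	}
	return cfg, true, nil
}
```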
Zookeeper Implementation
Replication
- ZooKeeper provides high availability by replicating the ZooKeeper data on each server that composes the service.
- Read requests are serviced from the local replica of each server's database.
- Write requests are processed by an agreement protocol (an implementation of atomic broadcast).
- Write requests are forwarded to the leader. The followers receive message proposals consisting of state changes from the leader and agree upon state changes.
Data storage
- The replicated database is an in-memory database containing the entire data tree.
- For recoverability, Zookeeper keeps a replay log (a write-ahead log) of committed operations and forces writes to the disk media before they are applied to the in-memory database. It also generates periodic snapshots of the in-memory database.