From: Doug C. <cu...@ap...> - 2008-03-28 21:42:11

ni...@us... wrote:
> 1 Add the protocol and the classes related to log propagation. Also
> add a simple implementation in the withlog package and a test case
> TestSimpleDbWithLog.

This stuff looks great! You're a tour de force! A few minor comments:

- RangeResults belongs in the ddb implementation package. The top-level
package has the end-user API, and RangeResults is an implementation
detail.

- In Hadoop, the way we handle threads is to stop them with
Thread.interrupt() rather than by setting a flag. In the thread, always
treat InterruptedException as a signal to exit; the run() loop should
check !this.isInterrupted().

- Shouldn't the host get the hostMap from the master? And shouldn't it
periodically refresh both the hostMap and the logMap from the master?
For this, instead of using ClientToMaster within a host, we need to add
a method to the HostToMaster protocol that returns a Mapper for the
subset of the ring that concerns the calling node or host. Initially it
might return the full mapper, but eventually it should transmit only
the node's neighborhood, or perhaps the host's neighborhoods.

- Should we have a single propagator per host, instead of per node?
That would conserve calls to the master, and a single propagation
thread would throttle things so that indexing doesn't overwhelm search
performance. OTOH, we might sometimes want to propagate changes faster
than a single thread can. But that's probably better dealt with
explicitly rather than by having a thread per node...

- Some possible name improvements:
    Tuple -> NodeState?
    logMap -> overlappingNodes or neighbors?
    propagator -> retriever? synchronizer?

- The point where we have a new log event from a neighbor, and need to
resolve it against ourselves, seems like a good point for a method
call.

> 2 Add the RangedDatabase class which contains NodeStatus, Database
> and Log.
> 3 Add "getDocs" to the Database class to retrieve a number of
> documents. This will be used to improve performance during the log
> propagation.
> Q: Should Database be aware of Range to support filtered queries
> based on Range? Or do we make RangedDatabase add a clause to a query
> before passing it down to Database?

I think we should add a Range element to Query that narrows it. But we
first need to define what it means in terms of other public API
elements. I think we define it in terms of the document's "position"
field, which is the hashCode of its id by default, but can be
explicitly specified. Does that sound right?

Does getDocs() need to be in the top-level application API? At some
point we need to distinguish between full documents and "outline"
documents. E.g., if we're storing full text then we don't want to
transmit it to search clients when they're just displaying hits. We
might, e.g., add a list of fields to be retrieved to Query. But I
don't yet see a case where an application will need to fetch a set of
documents by id. Except for search results, one-at-a-time access will
be more typical, no?

Sorry I've not been more involved this week...

Doug
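
A minimal Java sketch of the interrupt-based shutdown pattern described
above; the Propagator class and its propagateOnce() method are
illustrative names, not the project's actual code:

    // Illustrative: a worker thread stopped via Thread.interrupt(),
    // with no separate stop flag.
    public class Propagator extends Thread {
      @Override
      public void run() {
        while (!isInterrupted()) {       // loop condition checks interrupt status
          try {
            propagateOnce();             // hypothetical unit of work
            Thread.sleep(1000);          // throws InterruptedException if interrupted
          } catch (InterruptedException e) {
            interrupt();                 // the catch clears the flag; restore it so
          }                              // the loop condition sees it and exits
        }
      }

      private void propagateOnce() { /* sync one batch of log entries */ }
    }

    // A controller stops the thread with:
    //   propagator.interrupt();
    //   propagator.join();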

From: Ning L. <nin...@gm...> - 2008-03-31 15:55:20

On Fri, Mar 28, 2008 at 5:42 PM, Doug Cutting <cu...@ap...> wrote:
> - RangeResults belongs in the ddb implementation package. The
> top-level package has the end-user API, and RangeResults is an
> implementation detail.

Agree. I'll make the change.

> - In Hadoop, the way we handle threads is to stop them with
> Thread.interrupt() rather than by setting a flag. In the thread,
> always treat InterruptedException as a signal to exit; the run() loop
> should check !this.isInterrupted().

OK. We can use the same practice.

> - Shouldn't the host get the hostMap from the master? And shouldn't
> it periodically refresh both the hostMap and the logMap from the
> master? For this, instead of using ClientToMaster within a host, we
> need to add a method to the HostToMaster protocol that returns a
> Mapper for the subset of the ring that concerns the calling node or
> host. Initially it might return the full mapper, but eventually it
> should transmit only the node's neighborhood, or perhaps the host's
> neighborhoods.

Agree on all the points. I wrote the withlog package with just enough
functionality to demonstrate the use of the log interface. Much more
needs to be done for host-to-master interactions. I figure we should
come up with a detailed design for how to carry out the two types of
load balancing: moving a node from one host to another, and
adding/removing a node?

> - Should we have a single propagator per host, instead of per node?
> That would conserve calls to the master, and a single propagation
> thread would throttle things so that indexing doesn't overwhelm
> search performance. OTOH, we might sometimes want to propagate
> changes faster than a single thread can. But that's probably better
> dealt with explicitly rather than by having a thread per node...

Agree. This part needs to be optimized; e.g., currently the
synchronizer retrieves one doc at a time after processing the log.

> - Some possible name improvements:
>     Tuple -> NodeState?
>     logMap -> overlappingNodes or neighbors?
>     propagator -> retriever? synchronizer?

I like better names. :) I said early on that I'm not good at names. :)

> - The point where we have a new log event from a neighbor, and need
> to resolve it against ourselves, seems like a good point for a method
> call.

The database should decide whether a version of a doc already exists,
right? Should we add a method to the database class? Also, there will
be a delay between checking whether a version of a doc already exists
and adding that version of the doc to the database (after retrieving
it from a neighbor). Does this mean we always have to check whether a
version of a doc already exists before adding it?

> I think we should add a Range element to Query that narrows it. But
> we first need to define what it means in terms of other public API
> elements. I think we define it in terms of the document's "position"
> field, which is the hashCode of its id by default, but can be
> explicitly specified. Does that sound right?

Sounds good.

> Does getDocs() need to be in the top-level application API? At some
> point we need to distinguish between full documents and "outline"
> documents. E.g., if we're storing full text then we don't want to
> transmit it to search clients when they're just displaying hits. We
> might, e.g., add a list of fields to be retrieved to Query. But I
> don't yet see a case where an application will need to fetch a set of
> documents by id. Except for search results, one-at-a-time access will
> be more typical, no?

I thought getDoc() means getting the full document. I want to use
getDocs() in log processing - after we process a number of log entries
from a node and identify the docs we should retrieve, we call
getDocs() to get those docs. What do you think?

Ning
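
A sketch of the batched retrieval Ning describes; the types and method
shapes are hypothetical, since this part of the protocol was still
being designed:

    // Hypothetical: after scanning a neighbor's log, fetch all needed
    // docs in one call instead of one RPC per document.
    String[] ids = collectIdsFromLog(entries);    // ids whose versions we lack
    Document[] docs = neighbor.getDocs(ids);      // one batched RPC
    for (Document doc : docs) {
      db.addDoc(doc);                             // version is checked on add
    }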

From: Ning L. <nin...@gm...> - 2008-03-31 16:06:46

On Mon, Mar 31, 2008 at 11:55 AM, Ning Li <nin...@gm...> wrote:
> On Fri, Mar 28, 2008 at 5:42 PM, Doug Cutting <cu...@ap...> wrote:
>> - Should we have a single propagator per host, instead of per node?
>> That would conserve calls to the master, and a single propagation
>> thread would throttle things so that indexing doesn't overwhelm
>> search performance. OTOH, we might sometimes want to propagate
>> changes faster than a single thread can. But that's probably better
>> dealt with explicitly rather than by having a thread per node...
>
> Agree. This part needs to be optimized; e.g., currently the
> synchronizer retrieves one doc at a time after processing the log.

It is a single propagator per host. But the question is, when should
the synchronizer be started?

Ning

From: Doug C. <cu...@ap...> - 2008-03-31 22:15:14

Ning Li wrote:
> It is a single propagator per host. But the question is, when should
> the synchronizer be started?

Hosts will both (1) respond to RPCs and (2) have a control loop. So
maybe we should have them implement Runnable. The RPC listener could be
started at the beginning of the run() implementation, and stopped when
it exits. Then programs (like unit tests) can manage them using the
Runnable API & Thread.interrupt(). They can also have a main() method
that runs them in the foreground, for command-line use.

Does that work?

Doug
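
A minimal sketch of that shape; the Server type and the helper methods
are invented stand-ins for the project's RPC machinery:

    // Illustrative: a host whose RPC listener lives exactly as long as run().
    public class Host implements Runnable {
      interface Server { void stop(); }        // stand-in for the RPC server

      public void run() {
        Server rpc = startRpcListener();       // begin answering RPCs
        try {
          while (!Thread.currentThread().isInterrupted()) {
            controlLoopIteration();            // refresh maps, sync, report
          }
        } finally {
          rpc.stop();                          // listener stops when run() exits
        }
      }

      private Server startRpcListener() {
        return new Server() { public void stop() {} };
      }

      private void controlLoopIteration() {
        try {
          Thread.sleep(1000);                  // placeholder control-loop work
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();  // propagate to the loop condition
        }
      }

      // Command-line use: run in the foreground.
      public static void main(String[] args) {
        new Host().run();
      }
    }

    // A unit test manages it with the Runnable API:
    //   Thread t = new Thread(new Host()); t.start(); ... t.interrupt(); t.join();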

From: Ning L. <nin...@gm...> - 2008-04-02 16:26:51

On Fri, Mar 28, 2008 at 5:42 PM, Doug Cutting <cu...@ap...> wrote:
> I think we should add a Range element to Query that narrows it. But
> we first need to define what it means in terms of other public API
> elements. I think we define it in terms of the document's "position"
> field, which is the hashCode of its id by default, but can be
> explicitly specified. Does that sound right?

I realize we'll have to add getDoc(String id, int position) to
Database, in addition to adding search(Range range, Query q, int
maxHits)?

Ning

From: Doug C. <cu...@ap...> - 2008-04-02 18:01:17

Ning Li wrote:
> On Fri, Mar 28, 2008 at 5:42 PM, Doug Cutting <cu...@ap...> wrote:
>> I think we should add a Range element to Query that narrows it. But
>> we first need to define what it means in terms of other public API
>> elements. I think we define it in terms of the document's "position"
>> field, which is the hashCode of its id by default, but can be
>> explicitly specified. Does that sound right?
>
> I realize we'll have to add getDoc(String id, int position) to
> Database, in addition to adding search(Range range, Query q, int
> maxHits)?

Yes. getDoc(id) should not be abstract, but should use the default hash
function and call getDoc(id, position). Applications that explicitly
specify positions when adding documents must always call getDoc(id,
position). We should perhaps mark the methods that take an explicit
position as "Expert" or somesuch?

Doug
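
A sketch of that two-method shape; the method names come from the
thread, while the class body and exception choices are assumptions:

    import java.io.IOException;

    // Illustrative: the non-abstract getDoc(id) defaults the position.
    // Document stands for the project's document type.
    public abstract class Database {

      /** Default position is the hash of the id. */
      public Document getDoc(String id) throws IOException {
        return getDoc(id, id.hashCode());     // default hash function
      }

      /** "Expert": for applications that specify positions explicitly. */
      public abstract Document getDoc(String id, int position)
        throws IOException;
    }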

From: Doug C. <cu...@ap...> - 2008-03-31 22:04:08

Ning Li wrote:
>> - The point where we have a new log event from a neighbor, and need
>> to resolve it against ourselves, seems like a good point for a
>> method call.
>
> The database should decide whether a version of a doc already exists,
> right? Should we add a method to the database class?

The naive approach is simply to db.getDoc() and check the version, no?
If the existing version is newer, then we ignore. Note that deletes
come with a version, so if the indexed version is newer than the
deletion, we ignore the delete, just like an add.

> Also, there will be a delay between checking whether a version of a
> doc already exists and adding that version of the doc to the database
> (after retrieving it from a neighbor). Does this mean we always have
> to check whether a version of a doc already exists before adding it?

Yes, in general. We might be able to avoid it when copying new ranges
into a node, if the log is de-duplicated. But for general
synchronization I think we always need to check versions.

So it might be worth optimizing the doc->version implementation. The
method to add might be db.getVersion(String id). This doesn't make
sense for end users, so it should be added to the Database API we use
in a node, not to the base abstract class.

> I thought getDoc() means getting the full document. I want to use
> getDocs() in log processing - after we process a number of log
> entries from a node and identify the docs we should retrieve, we call
> getDocs() to get those docs. What do you think?

Yes, that sounds reasonable. I was just questioning whether this
belongs in the top-level Database API, or rather just in
HostToHostProtocol. I just want to hide as much as we can from
end-users, so that we can more freely re-arrange things underneath
later.

Eventually the top-level API might add a method like
getDocs(String[] ids, String[] fields), which looks similar, but has a
different purpose.

Doug
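
A sketch of the naive resolution step; the LogEvent shape and the -1
sentinel for "no local copy" are assumptions:

    // Illustrative: resolve a neighbor's log event against our own copy.
    void resolve(LogEvent event) throws IOException {
      Document existing = db.getDoc(event.id());      // may be null
      long local = (existing == null) ? -1 : existing.version();
      if (event.version() > local) {
        apply(event);    // newer than what we have: execute the add/delete
      }
      // else: the event is stale relative to our index; ignore it
    }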

From: Ning L. <nin...@gm...> - 2008-04-02 00:39:50

On Mon, Mar 31, 2008 at 6:03 PM, Doug Cutting <cu...@ap...> wrote:
> The naive approach is simply to db.getDoc() and check the version,
> no?

Yes. The current implementation does that. I was worried about its
performance because I thought db.getDoc() means retrieving a full
document. But you mentioned the semantics of retrieving the outline of
a document. That would work. Is it time to have db.getDoc(id) and
db.getDoc(id, fields)?

> If the existing version is newer, then we ignore. Note that deletes
> come with a version, so if the indexed version is newer than the
> deletion, we ignore the delete, just like an add.

Do we index a delete?

> Yes, in general. We might be able to avoid it when copying new ranges
> into a node, if the log is de-duplicated. But for general
> synchronization I think we always need to check versions.
>
> So it might be worth optimizing the doc->version implementation. The
> method to add might be db.getVersion(String id). This doesn't make
> sense for end users, so it should be added to the Database API we use
> in a node, not to the base abstract class.

So we index deletes and keep them?

> Yes, that sounds reasonable. I was just questioning whether this
> belongs in the top-level Database API, or rather just in
> HostToHostProtocol. I just want to hide as much as we can from
> end-users, so that we can more freely re-arrange things underneath
> later.
>
> Eventually the top-level API might add a method like
> getDocs(String[] ids, String[] fields), which looks similar, but has
> a different purpose.

I see.

Ning

From: Doug C. <cu...@ap...> - 2008-04-02 15:49:14

Ning Li wrote:
> On Mon, Mar 31, 2008 at 6:03 PM, Doug Cutting <cu...@ap...> wrote:
>> The naive approach is simply to db.getDoc() and check the version,
>> no?
>
> Yes. The current implementation does that. I was worried about its
> performance because I thought db.getDoc() means retrieving a full
> document. But you mentioned the semantics of retrieving the outline
> of a document. That would work. Is it time to have db.getDoc(id) and
> db.getDoc(id, fields)?

Maybe we need 'getVersion()' for this, one that returns a special value
for deleted documents?

> Do we index a delete?

No, but we need to log them and, when replaying logs, decide whether to
execute them. If you delete a doc on one node and subsequently add it
on another, then when the deletion is propagated to the former it
should be ignored. But if you just delete it, then the propagated
delete should be executed. We distinguish the cases by comparing the
version in the deletion log event to the version in the index -- just
like for adds.

Does that make sense?

Doug
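
A sketch of how a getVersion() with a deleted-document value might be
used when replaying a log; the sentinel constant, event shape, and
method signatures are hypothetical:

    // Illustrative: getVersion() covers live docs, tombstones, and
    // unknowns, so adds and deletes resolve the same way.
    static final long NOT_FOUND = -1;    // assumed "id never seen" sentinel

    void replay(LogEvent event) throws IOException {
      long local = db.getVersion(event.id());  // live or deleted version, or NOT_FOUND
      if (event.version() > local) {
        if (event.isDelete()) {
          db.removeDoc(event.id(), event.version());   // record the tombstone
        } else {
          db.addDoc(fetchFromNeighbor(event), event.version());
        }
      }
      // else: stale add or stale delete; either way, ignore
    }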

From: Ning L. <nin...@gm...> - 2008-04-02 16:24:00

On Wed, Apr 2, 2008 at 11:49 AM, Doug Cutting <cu...@ap...> wrote:
> Ning Li wrote:
>> On Mon, Mar 31, 2008 at 6:03 PM, Doug Cutting <cu...@ap...> wrote:
>>> The naive approach is simply to db.getDoc() and check the version,
>>> no?
>>
>> Yes. The current implementation does that. I was worried about its
>> performance because I thought db.getDoc() means retrieving a full
>> document. But you mentioned the semantics of retrieving the outline
>> of a document. That would work. Is it time to have db.getDoc(id) and
>> db.getDoc(id, fields)?
>
> Maybe we need 'getVersion()' for this, one that returns a special
> value for deleted documents?
>
>> Do we index a delete?
>
> No, but we need to log them and, when replaying logs, decide whether
> to execute them. If you delete a doc on one node and subsequently add
> it on another, then when the deletion is propagated to the former it
> should be ignored. But if you just delete it, then the propagated
> delete should be executed. We distinguish the cases by comparing the
> version in the deletion log event to the version in the index -- just
> like for adds.

Yes, we have to log and propagate deletes correctly.

What I'm worried about is the impact of the version check on the index
build performance. As you said, for general synchronization, we always
need to check versions. After we check a database/log and decide to
add/delete a document, we call Database's addDoc/removeDoc method. In
this addDoc/removeDoc method, we first parse the document (for addDoc).
Then, in the same critical section, we have to check again that it is
the latest version and apply the add/delete. Is checking the log for
the version of a delete expensive here? And a log is not part of the
Database abstraction, but part of RangedDatabase...

Ning

From: Doug C. <cu...@ap...> - 2008-04-02 17:36:40

Ning Li wrote:
> Yes, we have to log and propagate deletes correctly.
>
> What I'm worried about is the impact of the version check on the
> index build performance. As you said, for general synchronization, we
> always need to check versions. After we check a database/log and
> decide to add/delete a document, we call Database's addDoc/removeDoc
> method. In this addDoc/removeDoc method, we first parse the document
> (for addDoc). Then, in the same critical section, we have to check
> again that it is the latest version and apply the add/delete. Is
> checking the log for the version of a delete expensive here? And a
> log is not part of the Database abstraction, but part of
> RangedDatabase...

First, I think we need to add an abstract Database service-provider
interface, called perhaps RangeDatabase, that's different from
Database, adding methods that are critical to good performance and
must be implemented by, e.g., HeapDatabase and LuceneDatabase.

Second, I don't yet see a way around checking versions when documents
are added or deleted. The ugliest bit is that we have to keep track of
the version of every document that's ever been deleted, in case a
long-offline node comes online and reports a stale addition. That
table could grow without bound. Sigh. Do you see a way around this?

Perhaps a node could discard old deletions after a time, keeping track
of the log entry number of the oldest retained deletion. Attempts to
sync starting with an older entry number should be rejected and should
trigger a complete copy-based replacement of the stale index. The
hazard is that if a document is added to a single node, then that node
goes offline for a long time, then, when it comes online, the addition
will be lost. Not great.

Doug
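
A sketch of rejecting stale incremental syncs against a compacted log;
the exception, field, and method names are invented:

    // Illustrative: a node refuses to serve log entries older than the
    // oldest deletion it still retains, forcing a full copy instead.
    public LogEntry[] getEntriesSince(long fromEntry) throws StaleSyncException {
      if (fromEntry < oldestRetainedEntry) {
        // the caller's sync point predates our compacted log
        throw new StaleSyncException(oldestRetainedEntry);
      }
      return log.read(fromEntry);
    }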

From: Ning L. <nin...@gm...> - 2008-04-03 20:40:07

On Wed, Apr 2, 2008 at 1:36 PM, Doug Cutting <cu...@ap...> wrote:
> First, I think we need to add an abstract Database service-provider
> interface, called perhaps RangeDatabase, that's different from
> Database, adding methods that are critical to good performance and
> must be implemented by, e.g., HeapDatabase and LuceneDatabase.

Database as the application interface and RangedDatabase as the
service-provider interface sounds good. The methods in RangedDatabase
will be very similar to those in the current RangedDatabase?

> Second, I don't yet see a way around checking versions when documents
> are added or deleted. The ugliest bit is that we have to keep track
> of the version of every document that's ever been deleted, in case a
> long-offline node comes online and reports a stale addition. That
> table could grow without bound. Sigh. Do you see a way around this?

I've been thinking about this. Finally, I think this is a possibility
(see the sketch below):

1 The database records and logs a deleted document and its version.
2 If all the replicas have recorded and logged the deleted document
  with the same version number, the document can be removed from the
  database. This is because any new version of the document that comes
  after will have a larger version number.

Does this sound right? Things are more complicated when we consider
state changes...

> Perhaps a node could discard old deletions after a time, keeping
> track of the log entry number of the oldest retained deletion.
> Attempts to sync starting with an older entry number should be
> rejected and should trigger a complete copy-based replacement of the
> stale index. The hazard is that if a document is added to a single
> node, then that node goes offline for a long time, then, when it
> comes online, the addition will be lost. Not great.

Do we allow a node to go offline for a long time? I thought we'd
consider the node down and pick a replacement for it.

Ning
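
A sketch of the pruning rule in step 2; the bookkeeping (per-replica
acknowledged delete versions) is an assumption about how it might be
tracked:

    // Illustrative: a tombstone may be discarded once every replica of
    // the range has recorded the delete at that version. Any later
    // write of the same id must carry a larger version number.
    boolean canPrune(Tombstone t, Collection<Replica> replicas) {
      for (Replica r : replicas) {
        if (r.ackedDeleteVersion(t.id()) < t.version()) {
          return false;    // some replica hasn't seen this delete yet
        }
      }
      return true;
    }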

From: Doug C. <cu...@ap...> - 2008-04-04 16:22:04

Ning Li wrote:
> Database as the application interface and RangedDatabase as the
> service-provider interface sounds good. The methods in RangedDatabase
> will be very similar to those in the current RangedDatabase?

Yes. HeapDatabase should extend this now though.

> I've been thinking about this. Finally, I think this is a
> possibility:
>
> 1 The database records and logs a deleted document and its version.
> 2 If all the replicas have recorded and logged the deleted document
>   with the same version number, the document can be removed from the
>   database. This is because any new version of the document that
>   comes after will have a larger version number.
>
> Does this sound right? Things are more complicated when we consider
> state changes...

It sounds right except for the case of a long-offline node coming back
online. More on that below...

> Do we allow a node to go offline for a long time? I thought we'd
> consider the node down and pick a replacement for it.

It would be nice to be able to eliminate long-offline nodes, but I
don't yet see how.

At startup we want nodes to announce their content to the master. Not
all nodes will start at exactly the same time. (Note also that, if the
master fails, nodes will re-elect a new master and post their state
there. Search and indexing should continue uninterrupted through
master moves.) So, when a master first starts, it needs to avoid
modifying the ring for a time, until it can assume that all nodes are
up. We might even have nodes randomly delay their first report, so
that the master isn't overwhelmed.

If the network is partitioned, then the master would allocate new
nodes to underserviced regions. When the network is repaired, we have
the choice of ignoring the data on the nodes that were replaced, or
synchronizing it with what has transpired in their absence. In the
case where all replicas of a region were offline, we would want to use
their data when they come online (like the system-restart case), but
when only a single replica was offline we might simply ignore its data
and let it sync from scratch. However, it may not be easy to
distinguish these cases. If all replicas go offline, and then we add
new nodes to the region, we'd need to remember that, at some point in
the past, all nodes in that region were offline. If the master was
restarted during this time, it will be even harder to keep track of
this.

I'm still hopeful that we can come up with a heuristic for this, but I
need to think more about what it should be.

Doug

From: Ning L. <nin...@gm...> - 2008-04-04 19:04:01

On Fri, Apr 4, 2008 at 12:22 PM, Doug Cutting <cu...@ap...> wrote:
> Ning Li wrote:
>> Database as the application interface and RangedDatabase as the
>> service-provider interface sounds good. The methods in
>> RangedDatabase will be very similar to those in the current
>> RangedDatabase?
>
> Yes. HeapDatabase should extend this now though.

Yes. I'm thinking we should put RangedDatabase in a separate package?
Is "provider" a good name for the package? Hmm, it should be different
from the current RangedDatabase in that it should not include the node
status and the log?

I need to think about the other topic of long-offline nodes...

Ning

From: Ning L. <nin...@gm...> - 2008-04-04 22:18:40

On Fri, Apr 4, 2008 at 12:22 PM, Doug Cutting <cu...@ap...> wrote:
> It would be nice to be able to eliminate long-offline nodes, but I
> don't yet see how.
>
> At startup we want nodes to announce their content to the master. Not
> all nodes will start at exactly the same time. (Note also that, if
> the master fails, nodes will re-elect a new master and post their
> state there. Search and indexing should continue uninterrupted
> through master moves.) So, when a master first starts, it needs to
> avoid modifying the ring for a time, until it can assume that all
> nodes are up. We might even have nodes randomly delay their first
> report, so that the master isn't overwhelmed.
>
> If the network is partitioned, then the master would allocate new
> nodes to underserviced regions. When the network is repaired, we have
> the choice of ignoring the data on the nodes that were replaced, or
> synchronizing it with what has transpired in their absence. In the
> case where all replicas of a region were offline, we would want to
> use their data when they come online (like the system-restart case),
> but when only a single replica was offline we might simply ignore its
> data and let it sync from scratch. However, it may not be easy to
> distinguish these cases. If all replicas go offline, and then we add
> new nodes to the region, we'd need to remember that, at some point in
> the past, all nodes in that region were offline. If the master was
> restarted during this time, it will be even harder to keep track of
> this.

We were thinking of storing the "metadata" in Zookeeper, right? All
the nodes (ever) created, their ranges, their current log entry
numbers, the starting entry numbers of their overlapping nodes, the
current ring... So when a master is restarted, it starts from exactly
where it was before it went down, by retrieving the "metadata" from
Zookeeper.

When an offline node comes back up, we verify with Zookeeper that it
is a valid node in a valid state. Then we check whether we can patch
up its data by checking whether the overlapping nodes and their logs
still have the entries back to the starting entry numbers of the node.
If not, we sync from scratch.

What do you think?

Ning

From: Doug C. <cu...@ap...> - 2008-04-04 23:26:18

Ning Li wrote:
> We were thinking of storing the "metadata" in Zookeeper, right? All
> the nodes (ever) created, their ranges, their current log entry
> numbers, the starting entry numbers of their overlapping nodes, the
> current ring... So when a master is restarted, it starts from exactly
> where it was before it went down, by retrieving the "metadata" from
> Zookeeper.

I've been going back-and-forth in my head to try to decide how we want
to use Zookeeper, and whether we should store the current ring there.
This is certainly an argument for that. It makes for a more stateful
master than I'd hoped, but that might not be bad.

An extreme idea would be to make Zookeeper *be* the master. There
would be no XxxToMaster protocols: instead everything would be
coordinated through Zookeeper files. Letting nodes directly access
Zookeeper makes the master less of a bottleneck, since Zookeeper
itself replicates data.

> When an offline node comes back up, we verify with Zookeeper that it
> is a valid node in a valid state. Then we check whether we can patch
> up its data by checking whether the overlapping nodes and their logs
> still have the entries back to the starting entry numbers of the
> node. If not, we sync from scratch.

I don't see it yet. Log entry numbers are per-node, and can't be
compared across nodes, right?

  Node A syncs from B log entries 1-10.
  Document X is added to C.
  A and B both sync X from C.
  B goes offline.
  X is deleted from C.
  A syncs X's deletion from C.
  A expunges its logs.
  B comes back online.
  A tries to sync events after 10 from B.

How does A know to ignore the addition of X?

Doug

From: Ning L. <nin...@gm...> - 2008-04-07 17:55:26

On Fri, Apr 4, 2008 at 7:26 PM, Doug Cutting <cu...@ap...> wrote:
> I've been going back-and-forth in my head to try to decide how we
> want to use Zookeeper, and whether we should store the current ring
> there. This is certainly an argument for that. It makes for a more
> stateful master than I'd hoped, but that might not be bad.
>
> An extreme idea would be to make Zookeeper *be* the master. There
> would be no XxxToMaster protocols: instead everything would be
> coordinated through Zookeeper files. Letting nodes directly access
> Zookeeper makes the master less of a bottleneck, since Zookeeper
> itself replicates data.

I thought only the master writes to Zookeeper, and hosts read from
Zookeeper. We should still have a light-weight master, no? One that
keeps track of the heartbeats from the hosts, detects host failures
and responds accordingly, and decides state changes (add/remove nodes)
when appropriate...

> I don't see it yet. Log entry numbers are per-node, and can't be
> compared across nodes, right?
>
>   Node A syncs from B log entries 1-10.
>   Document X is added to C.
>   A and B both sync X from C.
>   B goes offline.
>   X is deleted from C.
>   A syncs X's deletion from C.
>   A expunges its logs.
>   B comes back online.
>   A tries to sync events after 10 from B.
>
> How does A know to ignore the addition of X?

Log entry numbers are per-node. Each node remembers the log entry
numbers it has synced with its overlapping nodes. In this example, say
B synced with A to A's entry 11 and with C to C's entry 12 (which
includes adding doc X) before it went offline. Because A expunged its
log, A's log entry numbers now start from 21. B comes back online and
finds that A's log now starts later (21) than B's last sync point with
A (11). So B cannot recover incrementally, but has to sync from
scratch.

Does it make sense?

Ning
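
A sketch of the check Ning describes, run against each overlapping node
when a node comes back online; the names and bookkeeping are
hypothetical:

    // Illustrative: B decides whether incremental recovery from
    // neighbor A is possible, using per-node sync points.
    void recoverFrom(Node a) throws IOException {
      long lastSynced = syncPoints.get(a);     // A's entry number B last applied
      long aLogStart = a.getFirstLogEntry();   // A may have expunged older entries
      if (aLogStart > lastSynced + 1) {
        syncFromScratch(a);                    // gap in A's log: full copy
      } else {
        apply(a.getEntriesSince(lastSynced));  // incremental sync
      }
    }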

From: Doug C. <cu...@ap...> - 2008-04-08 20:29:32

Ning Li wrote:
> I thought only the master writes to Zookeeper, and hosts read from
> Zookeeper. We should still have a light-weight master, no? One that
> keeps track of the heartbeats from the hosts, detects host failures
> and responds accordingly, and decides state changes (add/remove
> nodes) when appropriate...

As a thought experiment, I'm trying to see whether, with Zookeeper, we
actually need such a master.

We could replace heartbeats with an ephemeral file per node containing
its status. (Ephemeral files disappear if their owner goes offline.)

Any host could (a) grab a lock; (b) analyze the ring for potential
add/removes; (c) post these requests to a directory. In effect,
getting the lock is master election, and while a node is doing this
analysis, it is the master. But the master moves around: each host has
a "master" thread, and remains "master" only during one
analysis/action cycle. This analysis cycle should not be very compute-
or i/o-intensive.

The advantage of this is that we wouldn't have to explicitly test or
otherwise engineer master failover, since it would be happening all
the time. All global state would be redundantly stored in Zookeeper.

Could this work?

> In this example, say B synced with A to A's entry 11 and with C to
> C's entry 12 (which includes adding doc X) before it went offline.
> Because A expunged its log, A's log entry numbers now start from 21.
> B comes back online and finds that A's log now starts later (21) than
> B's last sync point with A (11). So B cannot recover incrementally,
> but has to sync from scratch.

My concern was that A might try to sync from B and get stale data. I
think you're arguing that B should not go online, accepting sync
requests, until it has itself sync'd with its neighbors. In this case,
incremental sync would fail and B would sync from scratch, removing
any stale adds.

How does this work at system startup? How does B know not to go
online, providing its stale adds, yet A is permitted to provide its
data? Perhaps, at startup, a node can respond to requests for its
first log entry number, but refuse requests for the logs themselves.
Then B can see that it must reload, and refuse requests from A to sync
until reload is complete. Does that work?

Doug
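
A sketch of both ideas using ZooKeeper's Java client API; the znode
paths, payloads, and the analyzeRing() helper are invented for
illustration:

    import org.apache.zookeeper.*;

    public class MasterCycle {
      public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("zkhost:2181", 30000, null);

        // Heartbeat replacement: an ephemeral status file per node;
        // ZooKeeper removes it automatically when the owner's session ends.
        zk.create("/nodes/node-17", "status".getBytes(),
                  ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        // Master election: whoever creates the ephemeral lock is master
        // for one analysis/action cycle, then releases it.
        try {
          zk.create("/master-lock", "host-17".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
          analyzeRing();                    // examine /nodes, post add/remove requests
          zk.delete("/master-lock", -1);    // release: another host goes next
        } catch (KeeperException.NodeExistsException e) {
          // another host holds the lock this cycle; try again later
        }
      }

      static void analyzeRing() { /* one analysis/action cycle */ }
    }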

From: Yonik S. <yo...@ap...> - 2008-04-09 17:45:02

Everything seemed easy enough until you brought up network
partitioning. In that event, could each partition elect a master? We
would then have to handle merging of multiple different rings when the
network was repaired, right?

-Yonik

From: Doug C. <cu...@ap...> - 2008-04-09 20:57:41

Yonik Seeley wrote:
> Everything seemed easy enough until you brought up network
> partitioning. In that event, could each partition elect a master? We
> would then have to handle merging of multiple different rings when
> the network was repaired, right?

Zookeeper is supposed to handle hard stuff like this. Multiple masters
should be impossible. Clients and nodes will have stale rings at
times, but they should be able to continue w/o compromising things.

Doug

From: Ning L. <nin...@gm...> - 2008-04-10 17:34:42

On Tue, Apr 8, 2008 at 4:29 PM, Doug Cutting <cu...@ap...> wrote:
> As a thought experiment, I'm trying to see whether, with Zookeeper,
> we actually need such a master.
>
> We could replace heartbeats with an ephemeral file per node
> containing its status. (Ephemeral files disappear if their owner goes
> offline.)
>
> Any host could (a) grab a lock; (b) analyze the ring for potential
> add/removes; (c) post these requests to a directory. In effect,
> getting the lock is master election, and while a node is doing this
> analysis, it is the master. But the master moves around: each host
> has a "master" thread, and remains "master" only during one
> analysis/action cycle. This analysis cycle should not be very
> compute- or i/o-intensive.
>
> The advantage of this is that we wouldn't have to explicitly test or
> otherwise engineer master failover, since it would be happening all
> the time. All global state would be redundantly stored in Zookeeper.
>
> Could this work?

Yes, this would work. We simply put the master code into the host and
make sure only one host is executing it at a time.

Interesting. :) But let's use a master for now, since it's easier to
debug that way?

> My concern was that A might try to sync from B and get stale data. I
> think you're arguing that B should not go online, accepting sync
> requests, until it has itself sync'd with its neighbors. In this
> case, incremental sync would fail and B would sync from scratch,
> removing any stale adds.
>
> How does this work at system startup? How does B know not to go
> online, providing its stale adds, yet A is permitted to provide its
> data? Perhaps, at startup, a node can respond to requests for its
> first log entry number, but refuse requests for the logs themselves.
> Then B can see that it must reload, and refuse requests from A to
> sync until reload is complete. Does that work?

What do you mean by "a node can respond to requests for its first log
entry number, but refuse requests for the logs themselves"?

Maybe at system startup, the master (or a host) analyzes, for each
node, its first log entry number and the log entry numbers it has
synced with its overlapping nodes. Which nodes need complete reloads
would then be derived from this analysis? Is this what you meant?

Ning

From: Doug C. <cu...@ap...> - 2008-04-10 23:42:15

Ning Li wrote:
> Yes, this would work. We simply put the master code into the host and
> make sure only one host is executing it at a time.
>
> Interesting. :) But let's use a master for now, since it's easier to
> debug that way?

Keeping all the state in Zookeeper may also make it easy to debug,
since it can be read from any client.

I actually don't think it should be hard to have it both ways. We
should write the master as a fail-over loop in any case, so that
multiple master-daemon processes can be running on a cluster at once,
but only one acting as master. Switching masters frequently should be
as simple as adding a counter in the top-level daemon loop to decide
when to give up the lock. The master should be a different class from
the host, and should have its own main() method, so that it can be run
independently, but it should also be possible to run the master in the
same JVM as a host. (We'll want this for testing anyway.)

So these two simple properties, lightweight master failover and the
ability to run the master on a host, mean that we can choose either to
dedicate a machine to the master, or to run the master daemon on every
host, switching frequently. I'd certainly like to preserve the latter
option, so let's keep it in mind as we code.

>> How does this work at system startup? How does B know not to go
>> online, providing its stale adds, yet A is permitted to provide its
>> data? Perhaps, at startup, a node can respond to requests for its
>> first log entry number, but refuse requests for the logs themselves.
>> Then B can see that it must reload, and refuse requests from A to
>> sync until reload is complete. Does that work?
>
> What do you mean by "a node can respond to requests for its first log
> entry number, but refuse requests for the logs themselves"?

From A's perspective, when B comes online, B's data looks fine, since
B's log is complete. But B discovers, when it talks to A, that B's
data is obsolete. If A retrieves B's data before B discovers that it
is obsolete, then A would get stale adds. So B must block retrieval of
its data until it has determined that it is valid with respect to all
of its neighbors. At system startup this means that all nodes must
wait for their neighbors to come online, so that they can determine
whether their own state is valid before permitting any
synchronization.

Does that make any sense?

> Maybe at system startup, the master (or a host) analyzes, for each
> node, its first log entry number and the log entry numbers it has
> synced with its overlapping nodes. Which nodes need complete reloads
> would then be derived from this analysis? Is this what you meant?

I think each node can determine that on its own at startup. It:

1. Posts its range and log start number.
2. Waits a bit, so all other nodes have had a chance to post their
   data.
3. Decides if its index is valid, by checking all overlapping nodes'
   log start numbers & comparing them with the last sync'd log numbers
   to see if they've compacted their logs.
4. Starts syncing with its neighbors.

I worry that there might be pathological cases where different nodes
were offline and/or compacted at different times, causing all replicas
of a range to be discarded.

Does this make any sense?

Doug
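
A sketch of those four startup steps as one method; all names and the
wait window are assumptions:

    // Illustrative: a node's startup-time validity check.
    void startup() throws Exception {
      post(myRange, log.firstEntryNumber());        // 1. announce range + log start
      Thread.sleep(REPORT_WINDOW_MS);               // 2. let other nodes post theirs

      boolean valid = true;                         // 3. check each overlapping node
      for (Node n : overlappingNodes()) {
        long lastSynced = syncPoints.get(n);        // n's entry number we last applied
        if (n.postedLogStart() > lastSynced + 1) {  // n compacted past our sync point
          valid = false;
          break;
        }
      }

      if (!valid) {
        reloadFromScratch();                        // stale index: full copy first
      }
      startSyncing();                               // 4. begin normal log sync
    }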

From: Ning L. <nin...@gm...> - 2008-04-17 21:34:46

On Thu, Apr 10, 2008 at 7:42 PM, Doug Cutting <cu...@ap...> wrote:
> So these two simple properties, lightweight master failover and the
> ability to run the master on a host, mean that we can choose either
> to dedicate a machine to the master, or to run the master daemon on
> every host, switching frequently. I'd certainly like to preserve the
> latter option, so let's keep it in mind as we code.

Sounds good. I like the design. :)

> From A's perspective, when B comes online, B's data looks fine, since
> B's log is complete. But B discovers, when it talks to A, that B's
> data is obsolete. If A retrieves B's data before B discovers that it
> is obsolete, then A would get stale adds. So B must block retrieval
> of its data until it has determined that it is valid with respect to
> all of its neighbors. At system startup this means that all nodes
> must wait for their neighbors to come online, so that they can
> determine whether their own state is valid before permitting any
> synchronization.
>
> Does that make any sense?

Yep. My point was also that a node needs to check its neighbors to
decide its validity.

> I think each node can determine that on its own at startup. It:
>
> 1. Posts its range and log start number.
> 2. Waits a bit, so all other nodes have had a chance to post their
>    data.
> 3. Decides if its index is valid, by checking all overlapping nodes'
>    log start numbers & comparing them with the last sync'd log
>    numbers to see if they've compacted their logs.
> 4. Starts syncing with its neighbors.

Data in Zookeeper is persistent, right? So there are records in
Zookeeper of each node's range, log start number, and the log start
numbers of its neighbors. So what does it mean if, at startup, a
node's posted numbers are newer or older than those in Zookeeper? Can
the data in Zookeeper help during startup? Or do we discard those
records in Zookeeper?

> I worry that there might be pathological cases where different nodes
> were offline and/or compacted at different times, causing all
> replicas of a range to be discarded.

:) Hopefully this won't happen, because of replication. In addition,
we only throw away/re-build a node when we know we can re-build its
range from some other nodes, right?

Ning

From: Doug C. <cu...@ap...> - 2008-04-23 00:00:51

Ning Li wrote:
>> I think each node can determine that on its own at startup. It:
>>
>> 1. Posts its range and log start number.
>> 2. Waits a bit, so all other nodes have had a chance to post their
>>    data.
>> 3. Decides if its index is valid, by checking all overlapping nodes'
>>    log start numbers & comparing them with the last sync'd log
>>    numbers to see if they've compacted their logs.
>> 4. Starts syncing with its neighbors.
>
> Data in Zookeeper is persistent, right? So there are records in
> Zookeeper of each node's range, log start number, and the log start
> numbers of its neighbors. So what does it mean if, at startup, a
> node's posted numbers are newer or older than those in Zookeeper? Can
> the data in Zookeeper help during startup? Or do we discard those
> records in Zookeeper?

Zookeeper can store data that's persistent or data that disappears
when a node disappears. I think the most natural representation would
be to have the live ring represented by ephemeral files in Zookeeper.

Doug

From: Yonik S. <yo...@ap...> - 2008-04-09 17:11:42

I'm trying to catch up with all the activity... please ignore if this
has been addressed in later emails.

On Wed, Apr 2, 2008 at 1:36 PM, Doug Cutting <cu...@ap...> wrote:
> Second, I don't yet see a way around checking versions when documents
> are added or deleted. The ugliest bit is that we have to keep track
> of the version of every document that's ever been deleted, in case a
> long-offline node comes online and reports a stale addition. That
> table could grow without bound. Sigh. Do you see a way around this?
>
> Perhaps a node could discard old deletions after a time, keeping
> track of the log entry number of the oldest retained deletion.
> Attempts to sync starting with an older entry number should be
> rejected and should trigger a complete copy-based replacement of the
> stale index.

Right, this seems like the right approach. We shouldn't have to
support incremental syncs forever.

> The hazard is that if a document is added to a single node, then that
> node goes offline for a long time, then, when it comes online, the
> addition will be lost. Not great.

If a document is added to a single node, it could be lost permanently
anyway (the node may never come back). That's why an add should go to
multiple nodes.

-Yonik