|
From: Ying X. <yin...@wi...> - 2012-07-03 07:52:46
|
Currently there have two major known issues about broadcast link as
belows:
1. There has one risk that name table updates sent over the broadcast
link after the neighbor is discovered will arrive before the initial
transfer of name table entries over the unicast link has been completed.
2. There has another risk that the node may send a broadcast message
after the neighbor is discovered without being certain whether the
neighbor will acknowledge it or not.
At present, the sending node cannot assume that the neighbor's link
endpoint is in the WW state just because its own link endpoint is in
that state, which means that is it doesn't know if the neighbor will
process the broadcast message or ignore it. However, once all of the
name table messages are acknowledged the node can be certain that the
other end is in WW state and that broadcast message will be processed.
Therefor, when the neighbor node is added to the sending node's
broadcast link map, the sending node should send all name talbe entries
over the unicast link. After these name table messages have been sent,
it should immediately one explicit message that tells the neighbor node
to start accepting broadcast messages, which can resolve the first
problem.
Since the explicit message contains the sequence number of the most
recent broadcast message the sending node has sent when it adds the
neighbor node to its broadcast link map, it can tell the neighbor node
where to start receiving and acknowledging broadcast messages, which
can reslove the second problem.
Signed-off-by: Ying Xue <yin...@wi...>
---
net/tipc/bcast.c | 23 +++++++++++++++++++++++
net/tipc/bcast.h | 1 +
net/tipc/link.c | 47 ++++++++++++++++++++++++++++++++++++++++++++++-
net/tipc/link.h | 1 +
net/tipc/name_distr.c | 1 +
net/tipc/node.c | 2 +-
6 files changed, 73 insertions(+), 2 deletions(-)
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 47b61d3..3e7688a 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -355,6 +355,29 @@ static void bclink_peek_nack(struct tipc_msg *msg)
tipc_node_unlock(n_ptr);
}
+/**
+ * tipc_bclink_info_recv - synchronize broadcast link info
+ */
+void tipc_bclink_info_recv(struct sk_buff *buf)
+{
+ struct tipc_node *n_ptr;
+ struct tipc_msg *msg = buf_msg(buf);
+
+ n_ptr = tipc_node_find(msg_prevnode(msg));
+ if (unlikely(!n_ptr))
+ return;
+
+ tipc_node_lock(n_ptr);
+ if (!n_ptr->bclink.supported && n_ptr->bclink.supportable &&
+ n_ptr->bclink.sync) {
+ n_ptr->bclink.supported = 1;
+ n_ptr->bclink.last_sent = n_ptr->bclink.last_in =
+ msg_last_bcast(msg);
+ n_ptr->bclink.oos_state = 0;
+ }
+ tipc_node_unlock(n_ptr);
+}
+
/*
* tipc_bclink_send_msg - broadcast a packet to all nodes in cluster
*/
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index a933065..c689aa2 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -89,6 +89,7 @@ void tipc_bclink_add_node(u32 addr);
void tipc_bclink_remove_node(u32 addr);
struct tipc_node *tipc_bclink_retransmit_to(void);
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked);
+void tipc_bclink_info_recv(struct sk_buff *buf);
int tipc_bclink_send_msg(struct sk_buff *buf);
void tipc_bclink_recv_pkt(struct sk_buff *buf);
u32 tipc_bclink_get_last_sent(void);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 918c1b0..e6868e4 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -952,6 +952,47 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
}
/*
+ * tipc_link_send_bclink_info - send broadcast link info to new neighbor
+ *
+ * Send broadcast link info to open up for the new neighbor to accept broadcast
+ * message from the point contained in "last sent broadcast number" field and
+ * onwards. No link congestion checking is performed because the last sent
+ * broadcast number info *must* be delivered.
+ */
+void tipc_link_send_bclink_info(u32 dest)
+{
+ struct tipc_node *n_ptr;
+ struct tipc_link *l_ptr;
+ struct sk_buff *buf;
+
+ read_lock_bh(&tipc_net_lock);
+ n_ptr = tipc_node_find(dest);
+ if (n_ptr) {
+ tipc_node_lock(n_ptr);
+ buf = tipc_buf_acquire(INT_H_SIZE);
+ if (buf) {
+ struct tipc_msg *msg = buf_msg(buf);
+
+ tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG,
+ INT_H_SIZE, dest);
+ msg_set_non_seq(msg, 0);
+ msg_set_last_bcast(msg, n_ptr->bclink.acked);
+ msg_set_link_selector(msg, (dest & 1));
+
+ l_ptr = n_ptr->active_links[0];
+ if (l_ptr) {
+ link_add_chain_to_outqueue(l_ptr, buf, 0);
+ tipc_link_push_queue(l_ptr);
+ } else {
+ kfree_skb(buf);
+ }
+ }
+ tipc_node_unlock(n_ptr);
+ }
+ read_unlock_bh(&tipc_net_lock);
+}
+
+/*
* tipc_link_send_names - send name table entries to new neighbor
*
* Send routine for bulk delivery of name table messages when contact
@@ -1730,6 +1771,10 @@ deliver:
tipc_node_unlock(n_ptr);
tipc_named_recv(buf);
continue;
+ case BCAST_PROTOCOL:
+ tipc_node_unlock(n_ptr);
+ tipc_bclink_info_recv(buf);
+ continue;
case CONN_MANAGER:
tipc_node_unlock(n_ptr);
tipc_port_recv_proto_msg(buf);
@@ -1960,7 +2005,7 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
}
r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr));
- msg_set_bclink_sync(msg, 0);
+ msg_set_bclink_sync(msg, 1);
msg_set_redundant_link(msg, r_flag);
msg_set_linkprio(msg, l_ptr->priority);
msg_set_size(msg, msg_size);
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 3a045eb..a53f103 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -224,6 +224,7 @@ struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_s
struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space);
void tipc_link_reset(struct tipc_link *l_ptr);
int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector);
+void tipc_link_send_bclink_info(u32 dest);
void tipc_link_send_names(struct list_head *message_list, u32 dest);
int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
u32 tipc_link_get_max_pkt(u32 dest, u32 selector);
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 25b7b56..05a4303 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -263,6 +263,7 @@ void tipc_named_node_up(unsigned long nodearg)
read_unlock_bh(&tipc_nametbl_lock);
tipc_link_send_names(&message_list, (u32)node);
+ tipc_link_send_bclink_info(node);
}
/**
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 37cf3e3..9e443d2 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -296,7 +296,7 @@ static void node_lost_contact(struct tipc_node *n_ptr)
tipc_addr_string_fill(addr_string, n_ptr->addr));
/* Flush broadcast link info associated with lost node */
- if (n_ptr->bclink.supported) {
+ if (n_ptr->bclink.supportable) {
while (n_ptr->bclink.deferred_head) {
struct sk_buff *buf = n_ptr->bclink.deferred_head;
n_ptr->bclink.deferred_head = buf->next;
--
1.7.1
|