From: Ying X. <yin...@wi...> - 2012-09-26 08:21:02
Jon and Erik,

Compared with the last version, the following changes have been made:

- Add the necessary descriptions for newly defined structures and functions, per Erik's comments;
- Update the header of the first patch, per Erik's suggestion;
- Enhance the allocation of connection IDs to avoid possible duplication;
- Make the server support connectionless sockets;
- Convert the configuration server to the interface of the new server;
- Remove the implementation associated with the user port;
- Convert config_lock to a mutex.

With this, all native user port code is purged completely. As a result, I believe the TIPC stack code is now cleaner and easier to understand.

I have tested the series for a while with tipcTS & tipcTC as well as the test cases in the tipcutils package, and no exceptions occurred. So please review it again.

Lastly, I thank Erik for his valuable suggestions and for finding some potential problems.

PS. Based on this series, I have implemented a prototype that removes the port lock and aggregates the tipc_port structure as a field inside tipc_sock. However, it is still unstable for reasons I have not yet identified. Once those are resolved, I will submit it as the next step.
Ying Xue (6):
  tipc: introduce new TIPC server infrastructure
  tipc: convert topology server with new server facility
  tipc: add lock nesting notation to quiet lockdep warning
  tipc: convert configuration server with new server facility
  tipc: remove facilities of user port
  tipc: convert config_lock from spin lock to mutex

 net/tipc/Makefile |    2 +-
 net/tipc/config.c |  116 ++++------
 net/tipc/core.c   |   10 +-
 net/tipc/link.c   |   18 +-
 net/tipc/msg.c    |   13 +-
 net/tipc/msg.h    |    2 +-
 net/tipc/port.c   |  265 +--------------------
 net/tipc/port.h   |   62 +-----
 net/tipc/server.c |  686 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 net/tipc/server.h |  103 ++++++++
 net/tipc/socket.c |    8 +-
 net/tipc/subscr.c |  318 ++++++++-----------------
 net/tipc/subscr.h |   19 ++-
 13 files changed, 981 insertions(+), 641 deletions(-)
 create mode 100644 net/tipc/server.c
 create mode 100644 net/tipc/server.h
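The cover letter mentions enhancing connection ID allocation to avoid duplicates. The idea can be shown with a minimal, standalone C sketch. This is not the code from server.c. The names `struct conn`, `struct server`, `alloc_conid()` and `add_conn()` are hypothetical, and so is the 8-bucket table.

The sketch also assumes that 0 never names a live connection. That mirrors how the subscriber code in this series treats `conid == 0` as "connection closed". The essential point is that a candidate ID is rejected only after it has been compared against every entry in its hash bucket.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model of a server's connection table: a fixed number
 * of hash buckets, each a singly linked list of connections. */
#define CONN_HASH_SIZE 8 /* must be a power of two */

struct conn {
	unsigned int conid;
	struct conn *next;
};

struct server {
	struct conn *buckets[CONN_HASH_SIZE];
	unsigned int next_conid; /* next candidate ID; wraps around */
};

static unsigned int conid_hash(unsigned int conid)
{
	return conid & (CONN_HASH_SIZE - 1);
}

/* Compare the candidate against every entry in its bucket,
 * not just the first one. */
static bool conid_in_use(const struct server *s, unsigned int conid)
{
	const struct conn *c;

	for (c = s->buckets[conid_hash(conid)]; c != NULL; c = c->next)
		if (c->conid == conid)
			return true;
	return false;
}

/* Return an unused nonzero ID, or 0 if every nonzero ID is taken.
 * 0 is reserved by this sketch to mean "no connection". */
static unsigned int alloc_conid(struct server *s)
{
	unsigned long long tries;

	/* Assuming a 32-bit unsigned int, 2^32 tries visit every value once */
	for (tries = 0; tries < (1ULL << 32); tries++) {
		unsigned int id = s->next_conid++;

		if (id != 0 && !conid_in_use(s, id))
			return id;
	}
	return 0;
}

/* Link a connection into the bucket selected by its ID. */
static void add_conn(struct server *s, struct conn *c)
{
	unsigned int r = conid_hash(c->conid);

	c->next = s->buckets[r];
	s->buckets[r] = c;
}
```

In the kernel, choosing the ID and linking the connection would both happen under the server's `conn_hlist_lock`, as `alloc_tipc_conn()` does. Otherwise two concurrent allocations could pick the same ID.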
From: Ying X. <yin...@wi...> - 2012-09-26 08:21:05
Add lock nesting notation to quiet the following lockdep warning:

[ INFO: possible recursive locking detected ]
---------------------------------------------
kworker/u:0/6 is trying to acquire lock:
 (sk_lock-AF_TIPC){+.+.+.}, at: [<c8c1226c>] accept+0x15c/0x310 [tipc]

but task is already holding lock:
 (sk_lock-AF_TIPC){+.+.+.}, at: [<c8c12138>] accept+0x28/0x310 [tipc]

other info that might help us debug this:
 Possible unsafe locking scenario:

       CPU0
       ----
  lock(sk_lock-AF_TIPC);
  lock(sk_lock-AF_TIPC);

 *** DEADLOCK ***

 May be due to missing lock nesting notation

3 locks held by kworker/u:0/6:

stack backtrace:
Pid: 6, comm: kworker/u:0 Not tainted 3.5.0+ #25
Call Trace:
 [<c1092883>] print_deadlock_bug+0xe3/0xf0
 [<c1094472>] validate_chain+0x5b2/0x720
 [<c109485a>] __lock_acquire+0x27a/0x460
 [<c1094ad0>] lock_acquire+0x90/0x100
 [<c8c1226c>] ? accept+0x15c/0x310 [tipc]
 [<c1489d10>] lock_sock_nested+0x70/0x90
 [<c8c1226c>] ? accept+0x15c/0x310 [tipc]
 [<c8c1226c>] accept+0x15c/0x310 [tipc]
 [<c1587962>] ? _raw_spin_unlock+0x22/0x30
 [<c1486f51>] ? sock_create_lite+0x71/0x90
 [<c1486fbb>] kernel_accept+0x4b/0x90
 [<c8c13619>] tipc_accept_from_sock+0x29/0xa0 [tipc]
 [<c8c13115>] tipc_recv_work+0x15/0x30 [tipc]
 [<c1054c6c>] process_one_work+0x1cc/0x4a0
 [<c1054bf0>] ? process_one_work+0x150/0x4a0
 [<c8c13100>] ? tipc_close_conn+0xa0/0xa0 [tipc]
 [<c1056c92>] worker_thread+0x102/0x350
 [<c109382b>] ? trace_hardirqs_on+0xb/0x10
 [<c1056b90>] ? manage_workers+0x100/0x100
 [<c105b614>] kthread+0x84/0x90
 [<c105b590>] ? __init_kthread_worker+0x60/0x60
 [<c158ed62>] kernel_thread_helper+0x6/0x10

Signed-off-by: Ying Xue <yin...@wi...>
---
 net/tipc/socket.c |    2 +-
 1 files changed, 1 insertions(+), 1 deletions(-)

diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 2a5ccdf..6395560 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -1510,7 +1510,7 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags)
 		u32 new_ref = new_tport->ref;
 		struct tipc_msg *msg = buf_msg(buf);
 
-		lock_sock(new_sk);
+		lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
 
 		/*
 		 * Reject any stray messages received by new socket
-- 
1.7.1
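A toy model in plain C can show why the warning fires and why `SINGLE_DEPTH_NESTING` silences it. This is not lockdep's API: `struct task_locks`, `toy_lock_acquire()` and `toy_lock_release()` are hypothetical. The model keeps only the rule relevant here:

- lockdep groups locks into classes, and every AF_TIPC socket lock belongs to `sk_lock-AF_TIPC`.
- Taking a lock whose class and subclass are already held is reported as possible recursion. This happens even when the two locks belong to different sockets, here the listening socket and the newly accepted one.
- Plain `lock_sock()` uses subclass 0, while `lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING)` uses subclass 1.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

/* Toy model of one lockdep rule; NOT the kernel's lockdep API.
 * A held lock is identified by its class and subclass. Classes are
 * compared by pointer, so each class must be a single shared object. */
#define MAX_HELD 8

struct held_lock {
	const char *class_name;
	unsigned int subclass;
};

struct task_locks {
	struct held_lock held[MAX_HELD];
	int depth;
};

/* Report and refuse an acquisition whose (class, subclass) pair is
 * already held; otherwise record it and return true. */
static bool toy_lock_acquire(struct task_locks *t, const char *class_name,
			     unsigned int subclass)
{
	int i;

	for (i = 0; i < t->depth; i++) {
		if (t->held[i].class_name == class_name &&
		    t->held[i].subclass == subclass) {
			printf("possible recursive locking detected: %s/%u\n",
			       class_name, subclass);
			return false;
		}
	}
	assert(t->depth < MAX_HELD);
	t->held[t->depth].class_name = class_name;
	t->held[t->depth].subclass = subclass;
	t->depth++;
	return true;
}

/* Release the most recently acquired lock (LIFO order assumed). */
static void toy_lock_release(struct task_locks *t)
{
	assert(t->depth > 0);
	t->depth--;
}
```

The real validator does much more, such as tracking lock ordering between classes. The annotation only tells lockdep that this nesting is intentional. It is appropriate on the assumption that, as in accept(), the outer and inner locks always belong to different sockets taken in a fixed outer/inner order.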
From: Ying X. <yin...@wi...> - 2012-09-26 08:21:04
Now that the new TIPC server infrastructure has been introduced, it can be used to convert the TIPC topology server. The current locking scheme of the topology server is so convoluted that deadlocks happen easily. The conversion therefore brings two benefits:

- It simplifies the topology server's locking policy. After the conversion, only one lock per subscriber remains, protecting that subscriber's members, so the code is easier to understand and maintain.

- It fixes a well-known bug where subscription events are lost when the topology port is congested. The topology server now delivers events through the new server's sending interface. It first queues the events into its outgoing buffer and then wakes up the sending work, which runs in workqueue context. Once that work runs, it takes events from the outgoing buffer and sends them to subscribers through the kernel socket interface until the buffer is empty. Because the sending work is allowed to block, events are no longer dropped when the socket/port becomes congested during transmission.

Signed-off-by: Ying Xue <yin...@wi...>
---
 net/tipc/core.c   |    6 +-
 net/tipc/socket.c |    3 +-
 net/tipc/subscr.c |  318 +++++++++++++++++------------------------------------
 net/tipc/subscr.h |   19 +++-
 4 files changed, 121 insertions(+), 225 deletions(-)

diff --git a/net/tipc/core.c b/net/tipc/core.c index bfe8af8..fbfcc92 100644 --- a/net/tipc/core.c +++ b/net/tipc/core.c @@ -2,7 +2,7 @@ * net/tipc/core.c: TIPC module code * * Copyright (c) 2003-2006, Ericsson AB - * Copyright (c) 2005-2006, 2010-2011, Wind River Systems + * Copyright (c) 2005-2006, 2010-2012, Wind River Systems * All rights reserved. 
* * Redistribution and use in source and binary forms, with or without @@ -132,13 +132,13 @@ static int tipc_core_start(void) if (!res) res = tipc_nametbl_init(); if (!res) - res = tipc_subscr_start(); - if (!res) res = tipc_cfg_init(); if (!res) res = tipc_netlink_start(); if (!res) res = tipc_socket_init(); + if (!res) + res = tipc_subscr_start(); if (res) tipc_core_stop(); diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 95df566..2a5ccdf 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -338,7 +338,8 @@ static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len) else if (addr->addrtype != TIPC_ADDR_NAMESEQ) return -EAFNOSUPPORT; - if (addr->addr.nameseq.type < TIPC_RESERVED_TYPES) + if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) && + (addr->addr.nameseq.type != TIPC_TOP_SRV)) return -EACCES; return (addr->scope > 0) ? diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index 0f7d0d0..0978f16 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -2,7 +2,7 @@ * net/tipc/subscr.c: TIPC network topology service * * Copyright (c) 2000-2006, Ericsson AB - * Copyright (c) 2005-2007, 2010-2011, Wind River Systems + * Copyright (c) 2005-2007, 2010-2012, Wind River Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -40,34 +40,44 @@ #include "subscr.h" /** - * struct tipc_subscriber - TIPC network topology subscriber - * @port_ref: object reference to server port connecting to subscriber - * @lock: pointer to spinlock controlling access to subscriber's server port - * @subscriber_list: adjacent subscribers in top. 
server's list of subscribers - * @subscription_list: list of subscription objects for this subscriber - */ -struct tipc_subscriber { - u32 port_ref; - spinlock_t *lock; - struct list_head subscriber_list; - struct list_head subscription_list; -}; - -/** * struct top_srv - TIPC network topology subscription service - * @setup_port: reference to TIPC port that handles subscription requests + * @tsrv: TIPC internal server that handles subscription requests * @subscription_count: number of active subscriptions (not subscribers!) - * @subscriber_list: list of ports subscribing to service - * @lock: spinlock govering access to subscriber list */ struct top_srv { - u32 setup_port; + struct tipc_server tsrv; atomic_t subscription_count; - struct list_head subscriber_list; - spinlock_t lock; }; -static struct top_srv topsrv; +static void subscr_conn_msg_event(unsigned int conid, + struct sockaddr_tipc *addr, + void *usr_data, void *buf, size_t len); +static void *subscr_named_msg_event(unsigned int conid); +static void subscr_conn_shutdown_event(unsigned int conid, void *usr_data); + + +static struct sockaddr_tipc topsrv_addr __read_mostly = { + .family = AF_TIPC, + .addrtype = TIPC_ADDR_NAMESEQ, + .addr.nameseq.type = TIPC_TOP_SRV, + .addr.nameseq.lower = TIPC_TOP_SRV, + .addr.nameseq.upper = TIPC_TOP_SRV, + .scope = TIPC_NODE_SCOPE +}; + +static struct top_srv topsrv __read_mostly = { + .tsrv = { + .saddr = &topsrv_addr, + .imp = TIPC_CRITICAL_IMPORTANCE, + .type = SOCK_SEQPACKET, + .max_rcvbuf_size = sizeof(struct tipc_subscr), + .name = "topsrv", + .tipc_conn_recvmsg = subscr_conn_msg_event, + .tipc_conn_new = subscr_named_msg_event, + .tipc_conn_shutdown = subscr_conn_shutdown_event, + }, + .subscription_count = ATOMIC_INIT(0) +}; /** * htohl - convert value to endianness used by destination @@ -83,9 +93,6 @@ static u32 htohl(u32 in, int swap) /** * subscr_send_event - send a message containing a tipc_event to the subscriber - * - * Note: Must not hold subscriber's 
server port lock, since tipc_send() will - * try to take the lock if the message is rejected and returned! */ static void subscr_send_event(struct tipc_subscription *sub, u32 found_lower, @@ -94,7 +101,9 @@ static void subscr_send_event(struct tipc_subscription *sub, u32 port_ref, u32 node) { - struct iovec msg_sect; + struct tipc_subscriber *subscriber = sub->subscriber; + struct kvec msg_sect; + int ret; msg_sect.iov_base = (void *)&sub->evt; msg_sect.iov_len = sizeof(struct tipc_event); @@ -104,7 +113,13 @@ static void subscr_send_event(struct tipc_subscription *sub, sub->evt.found_upper = htohl(found_upper, sub->swap); sub->evt.port.ref = htohl(port_ref, sub->swap); sub->evt.port.node = htohl(node, sub->swap); - tipc_send(sub->server_ref, 1, &msg_sect, msg_sect.iov_len); + ret = tipc_conn_sendmsg((struct tipc_server *)&topsrv, + subscriber->conid, + NULL, + msg_sect.iov_base, + msg_sect.iov_len); + if (ret < 0) + pr_err("sending subscription event is failed!\n"); } /** @@ -152,16 +167,22 @@ void tipc_subscr_report_overlap(struct tipc_subscription *sub, */ static void subscr_timeout(struct tipc_subscription *sub) { - struct tipc_port *server_port; + struct tipc_subscriber *subscriber = sub->subscriber; + + spin_lock_bh(&subscriber->lock); - /* Validate server port reference (in case subscriber is terminating) */ - server_port = tipc_port_lock(sub->server_ref); - if (server_port == NULL) + /* + * Validate if connection related to the subscriber is + * closed (in case subscriber is terminating) + */ + if (subscriber->conid == 0) { + spin_unlock_bh(&subscriber->lock); return; + } /* Validate timeout (in case subscription is being cancelled) */ if (sub->timeout == TIPC_WAIT_FOREVER) { - tipc_port_unlock(server_port); + spin_unlock_bh(&subscriber->lock); return; } @@ -171,8 +192,7 @@ static void subscr_timeout(struct tipc_subscription *sub) /* Unlink subscription from subscriber */ list_del(&sub->subscription_list); - /* Release subscriber's server port */ - 
tipc_port_unlock(server_port); + spin_unlock_bh(&subscriber->lock); /* Notify subscriber of timeout */ subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper, @@ -187,7 +207,7 @@ static void subscr_timeout(struct tipc_subscription *sub) /** * subscr_del - delete a subscription within a subscription list * - * Called with subscriber port locked. + * Called with subscriber lock held. */ static void subscr_del(struct tipc_subscription *sub) { @@ -200,45 +220,38 @@ static void subscr_del(struct tipc_subscription *sub) /** * subscr_terminate - terminate communication with a subscriber * - * Called with subscriber port locked. Routine must temporarily release lock - * to enable subscription timeout routine(s) to finish without deadlocking; - * the lock is then reclaimed to allow caller to release it upon return. - * (This should work even in the unlikely event some other thread creates - * a new object reference in the interim that uses this lock; this routine will - * simply wait for it to be released, then claim it.) + * Note: Must call it in process context since it might sleep. 
*/ static void subscr_terminate(struct tipc_subscriber *subscriber) { - u32 port_ref; + tipc_conn_terminate((struct tipc_server *)&topsrv, subscriber->conid); +} + +/** + * subscr_release - release subscriber + */ +static void subscr_release(struct tipc_subscriber *subscriber) +{ struct tipc_subscription *sub; struct tipc_subscription *sub_temp; - /* Invalidate subscriber reference */ - port_ref = subscriber->port_ref; - subscriber->port_ref = 0; - spin_unlock_bh(subscriber->lock); + spin_lock_bh(&subscriber->lock); - /* Sever connection to subscriber */ - tipc_shutdown(port_ref); - tipc_deleteport(port_ref); + /* Invalidate subscriber reference */ + subscriber->conid = 0; /* Destroy any existing subscriptions for subscriber */ list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list, subscription_list) { if (sub->timeout != TIPC_WAIT_FOREVER) { + spin_unlock_bh(&subscriber->lock); k_cancel_timer(&sub->timer); k_term_timer(&sub->timer); + spin_lock_bh(&subscriber->lock); } subscr_del(sub); } - - /* Remove subscriber from topology server's subscriber list */ - spin_lock_bh(&topsrv.lock); - list_del(&subscriber->subscriber_list); - spin_unlock_bh(&topsrv.lock); - - /* Reclaim subscriber lock */ - spin_lock_bh(subscriber->lock); + spin_unlock_bh(&subscriber->lock); /* Now destroy subscriber */ kfree(subscriber); @@ -247,7 +260,7 @@ static void subscr_terminate(struct tipc_subscriber *subscriber) /** * subscr_cancel - handle subscription cancellation request * - * Called with subscriber port locked. Routine must temporarily release lock + * Called with subscriber lock held. Routine must temporarily release lock * to enable the subscription timeout routine to finish without deadlocking; * the lock is then reclaimed to allow caller to release it upon return. 
* @@ -274,10 +287,10 @@ static void subscr_cancel(struct tipc_subscr *s, /* Cancel subscription timer (if used), then delete subscription */ if (sub->timeout != TIPC_WAIT_FOREVER) { sub->timeout = TIPC_WAIT_FOREVER; - spin_unlock_bh(subscriber->lock); + spin_unlock_bh(&subscriber->lock); k_cancel_timer(&sub->timer); k_term_timer(&sub->timer); - spin_lock_bh(subscriber->lock); + spin_lock_bh(&subscriber->lock); } subscr_del(sub); } @@ -285,7 +298,7 @@ static void subscr_cancel(struct tipc_subscr *s, /** * subscr_subscribe - create subscription for subscriber * - * Called with subscriber port locked. + * Called with subscriber lock held. */ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s, struct tipc_subscriber *subscriber) @@ -335,7 +348,7 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s, } INIT_LIST_HEAD(&sub->nameseq_list); list_add(&sub->subscription_list, &subscriber->subscription_list); - sub->server_ref = subscriber->port_ref; + sub->subscriber = subscriber; sub->swap = swap; memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr)); atomic_inc(&topsrv.subscription_count); @@ -350,194 +363,61 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s, /** * subscr_conn_shutdown_event - handle termination request from subscriber - * - * Called with subscriber's server port unlocked. 
*/ -static void subscr_conn_shutdown_event(void *usr_handle, - u32 port_ref, - struct sk_buff **buf, - unsigned char const *data, - unsigned int size, - int reason) +static void subscr_conn_shutdown_event(unsigned int conid, void *usr_data) { - struct tipc_subscriber *subscriber = usr_handle; - spinlock_t *subscriber_lock; - - if (tipc_port_lock(port_ref) == NULL) - return; - - subscriber_lock = subscriber->lock; - subscr_terminate(subscriber); - spin_unlock_bh(subscriber_lock); + subscr_release((struct tipc_subscriber *)usr_data); } /** * subscr_conn_msg_event - handle new subscription request from subscriber - * - * Called with subscriber's server port unlocked. */ -static void subscr_conn_msg_event(void *usr_handle, - u32 port_ref, - struct sk_buff **buf, - const unchar *data, - u32 size) +static void subscr_conn_msg_event(unsigned int conid, + struct sockaddr_tipc *addr, + void *usr_data, void *buf, size_t len) { - struct tipc_subscriber *subscriber = usr_handle; - spinlock_t *subscriber_lock; + struct tipc_subscriber *subscriber = usr_data; struct tipc_subscription *sub; - /* - * Lock subscriber's server port (& make a local copy of lock pointer, - * in case subscriber is deleted while processing subscription request) - */ - if (tipc_port_lock(port_ref) == NULL) - return; - - subscriber_lock = subscriber->lock; - - if (size != sizeof(struct tipc_subscr)) { - subscr_terminate(subscriber); - spin_unlock_bh(subscriber_lock); - } else { - sub = subscr_subscribe((struct tipc_subscr *)data, subscriber); - spin_unlock_bh(subscriber_lock); - if (sub != NULL) { - - /* - * We must release the server port lock before adding a - * subscription to the name table since TIPC needs to be - * able to (re)acquire the port lock if an event message - * issued by the subscription process is rejected and - * returned. 
The subscription cannot be deleted while - * it is being added to the name table because: - * a) the single-threading of the native API port code - * ensures the subscription cannot be cancelled and - * the subscriber connection cannot be broken, and - * b) the name table lock ensures the subscription - * timeout code cannot delete the subscription, - * so the subscription object is still protected. - */ - tipc_nametbl_subscribe(sub); - } - } + spin_lock_bh(&subscriber->lock); + sub = subscr_subscribe((struct tipc_subscr *)buf, subscriber); + if (sub) + tipc_nametbl_subscribe(sub); + spin_unlock_bh(&subscriber->lock); } /** * subscr_named_msg_event - handle request to establish a new subscriber */ -static void subscr_named_msg_event(void *usr_handle, - u32 port_ref, - struct sk_buff **buf, - const unchar *data, - u32 size, - u32 importance, - struct tipc_portid const *orig, - struct tipc_name_seq const *dest) +static void *subscr_named_msg_event(unsigned int conid) { struct tipc_subscriber *subscriber; - u32 server_port_ref; /* Create subscriber object */ subscriber = kzalloc(sizeof(struct tipc_subscriber), GFP_ATOMIC); if (subscriber == NULL) { pr_warn("Subscriber rejected, no memory\n"); - return; + return NULL; } INIT_LIST_HEAD(&subscriber->subscription_list); - INIT_LIST_HEAD(&subscriber->subscriber_list); - - /* Create server port & establish connection to subscriber */ - tipc_createport(subscriber, - importance, - NULL, - NULL, - subscr_conn_shutdown_event, - NULL, - NULL, - subscr_conn_msg_event, - NULL, - &subscriber->port_ref); - if (subscriber->port_ref == 0) { - pr_warn("Subscriber rejected, unable to create port\n"); - kfree(subscriber); - return; - } - tipc_connect2port(subscriber->port_ref, orig); - - /* Lock server port (& save lock address for future use) */ - subscriber->lock = tipc_port_lock(subscriber->port_ref)->lock; + subscriber->conid = conid; + spin_lock_init(&subscriber->lock); - /* Add subscriber to topology server's subscriber list */ - 
spin_lock_bh(&topsrv.lock); - list_add(&subscriber->subscriber_list, &topsrv.subscriber_list); - spin_unlock_bh(&topsrv.lock); - - /* Unlock server port */ - server_port_ref = subscriber->port_ref; - spin_unlock_bh(subscriber->lock); - - /* Send an ACK- to complete connection handshaking */ - tipc_send(server_port_ref, 0, NULL, 0); - - /* Handle optional subscription request */ - if (size != 0) { - subscr_conn_msg_event(subscriber, server_port_ref, - buf, data, size); - } + return (void *)subscriber; } +/** + * tipc_subscr_start - start TIPC network topology server + */ int tipc_subscr_start(void) { - struct tipc_name_seq seq = {TIPC_TOP_SRV, TIPC_TOP_SRV, TIPC_TOP_SRV}; - int res; - - spin_lock_init(&topsrv.lock); - INIT_LIST_HEAD(&topsrv.subscriber_list); - - res = tipc_createport(NULL, - TIPC_CRITICAL_IMPORTANCE, - NULL, - NULL, - NULL, - NULL, - subscr_named_msg_event, - NULL, - NULL, - &topsrv.setup_port); - if (res) - goto failed; - - res = tipc_publish(topsrv.setup_port, TIPC_NODE_SCOPE, &seq); - if (res) { - tipc_deleteport(topsrv.setup_port); - topsrv.setup_port = 0; - goto failed; - } - - return 0; - -failed: - pr_err("Failed to create subscription service\n"); - return res; + return tipc_server_start((struct tipc_server *)&topsrv); } +/** + * tipc_subscr_stop - stop TIPC network topology server + */ void tipc_subscr_stop(void) { - struct tipc_subscriber *subscriber; - struct tipc_subscriber *subscriber_temp; - spinlock_t *subscriber_lock; - - if (topsrv.setup_port) { - tipc_deleteport(topsrv.setup_port); - topsrv.setup_port = 0; - - list_for_each_entry_safe(subscriber, subscriber_temp, - &topsrv.subscriber_list, - subscriber_list) { - subscriber_lock = subscriber->lock; - spin_lock_bh(subscriber_lock); - subscr_terminate(subscriber); - spin_unlock_bh(subscriber_lock); - } - } + tipc_server_stop((struct tipc_server *)&topsrv); } diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index 218d2e0..4b04ee8 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h 
@@ -2,7 +2,7 @@ * net/tipc/subscr.h: Include file for TIPC network topology service * * Copyright (c) 2003-2006, Ericsson AB - * Copyright (c) 2005-2007, Wind River Systems + * Copyright (c) 2005-2007, 2012, Wind River Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -37,10 +37,25 @@ #ifndef _TIPC_SUBSCR_H #define _TIPC_SUBSCR_H +#include "server.h" + struct tipc_subscription; /** + * struct tipc_subscriber - TIPC network topology subscriber + * @conid: connection identifier to server connecting to subscriber + * @lock: controll access to subscriber + * @subscription_list: list of subscription objects for this subscriber + */ +struct tipc_subscriber { + unsigned int conid; + spinlock_t lock; + struct list_head subscription_list; +}; + +/** * struct tipc_subscription - TIPC network topology subscription object + * @subscriber: pointer to its subscriber * @seq: name sequence associated with subscription * @timeout: duration of subscription (in ms) * @filter: event filtering to be done for subscription @@ -52,13 +67,13 @@ struct tipc_subscription; * @evt: template for events generated by subscription */ struct tipc_subscription { + struct tipc_subscriber *subscriber; struct tipc_name_seq seq; u32 timeout; u32 filter; struct timer_list timer; struct list_head nameseq_list; struct list_head subscription_list; - u32 server_ref; int swap; struct tipc_event evt; }; -- 1.7.1
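The commit message above describes a queue-then-drain behaviour, which can be modelled in a single-threaded C sketch. Everything except the `MAX_SEND_MSG_COUNT` batch limit (defined in server.c) is hypothetical. `struct outqueue`, `struct fake_sock`, `fake_send()` and `send_work()` stand in for the connection's outqueue, the kernel socket and the workqueue send handler; their real code is not reproduced here.

When the socket reports congestion, the event stays at the head of the queue and the work returns. A later write-space notification runs the work again (in server.c, `sock_write_space()` queues the send work), so nothing is dropped.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_SEND_MSG_COUNT 25 /* batch limit, as in server.c */
#define QCAP 128               /* hypothetical queue capacity */

/* Hypothetical FIFO of pending events (ring buffer). */
struct outqueue {
	int items[QCAP];
	size_t head;
	size_t count;
};

/* Hypothetical socket that accepts `window` messages, then congests. */
struct fake_sock {
	int delivered[QCAP];
	size_t ndelivered;
	size_t window;
};

/* Queue an event without blocking; fails only if the queue is full. */
static bool enqueue(struct outqueue *q, int evt)
{
	if (q->count == QCAP)
		return false;
	q->items[(q->head + q->count) % QCAP] = evt;
	q->count++;
	return true;
}

/* Return 0 on success or -1 when congested (cf. -EAGAIN). */
static int fake_send(struct fake_sock *s, int evt)
{
	if (s->window == 0)
		return -1;
	assert(s->ndelivered < QCAP);
	s->window--;
	s->delivered[s->ndelivered++] = evt;
	return 0;
}

/* One run of the send work. An event leaves the queue only after it
 * has been sent. Returns true if the batch limit was reached with
 * events still pending, i.e. the work should be queued again. */
static bool send_work(struct outqueue *q, struct fake_sock *s)
{
	int sent = 0;

	while (q->count > 0) {
		if (fake_send(s, q->items[q->head]) < 0)
			return false; /* congested: wait for write space */
		q->head = (q->head + 1) % QCAP;
		q->count--;
		if (++sent >= MAX_SEND_MSG_COUNT)
			return q->count > 0; /* yield to other work */
	}
	return false;
}
```

The design choice is that producers never block: queueing only touches memory. The single consumer is the only party that waits for the socket, and it keeps an unsent event at the head of the queue rather than discarding it.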
From: Ying X. <yin...@wi...> - 2012-09-26 08:21:07
TIPC has two internal servers: one provides a subscription service for topology events, and the other provides the configuration interface. Until now these servers have run in BH context, accessing the TIPC port API directly. This required complex locking policies. It also caused scalability problems: when the topology server port got congested, events were silently dropped. Therefore, we introduce a TIPC server module that uses kernel sockets for message passing instead of accessing TIPC ports directly.

Signed-off-by: Ying Xue <yin...@wi...>
---
 net/tipc/Makefile |    2 +-
 net/tipc/server.c |  686 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 net/tipc/server.h |  103 ++++++++
 3 files changed, 790 insertions(+), 1 deletions(-)
 create mode 100644 net/tipc/server.c
 create mode 100644 net/tipc/server.h

diff --git a/net/tipc/Makefile b/net/tipc/Makefile index 6cd55d6..af7206b 100644 --- a/net/tipc/Makefile +++ b/net/tipc/Makefile @@ -8,4 +8,4 @@ tipc-y += addr.o bcast.o bearer.o config.o \ core.o handler.o link.o discover.o msg.o \ name_distr.o subscr.o name_table.o net.o \ netlink.o node.o node_subscr.o port.o ref.o \ - socket.o log.o eth_media.o + socket.o log.o eth_media.o server.o diff --git a/net/tipc/server.c b/net/tipc/server.c new file mode 100644 index 0000000..0c369ae --- /dev/null +++ b/net/tipc/server.c @@ -0,0 +1,686 @@ +/* + * net/tipc/server.c: TIPC server infrastructure + * + * Copyright (c) 2012 Wind River Systems + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. 
Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the names of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * Alternatively, this software may be distributed under the terms of the + * GNU General Public License ("GPL") version 2 as published by the Free + * Software Foundation. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. 
+ */ + +#include "server.h" +#include <net/sock.h> +#include <linux/module.h> + +/* Number of messages to send before rescheduling */ +#define MAX_SEND_MSG_COUNT 25 +#define CF_CONNECTED 1 + +#define sock2con(x) ((struct tipc_conn *)(x)->sk_user_data) + +/** + * struct tipc_conn - TIPC connection structure + * @kref: reference counter to connection object + * @conid: connection identifier + * @sock: socket handler associated with connection + * @flags: indicates connection state + * @server: pointer to connected server + * @outqueue: pointer to first outbound message in queue + * @outqueue_lock: controll access to the outqueue + * @rx_action: what to do when connection socket is active + * @list: list of connection objects for its server + * @rwork: receive work item + * @swork: send work item + * @usr_data: user-specified field + */ +struct tipc_conn { + struct kref kref; + unsigned int conid; + struct socket *sock; + unsigned long flags; + struct tipc_server *server; + struct list_head outqueue; + spinlock_t outqueue_lock; + int (*rx_action) (struct tipc_conn *con); + struct hlist_node list; + struct work_struct rwork; + struct work_struct swork; + void *usr_data; +}; + +/* An entry waiting to be sent */ +struct outqueue_entry { + struct list_head list; + struct kvec iov; + struct sockaddr_tipc dest; +}; + +static void tipc_recv_work(struct work_struct *work); +static void tipc_send_work(struct work_struct *work); +static void clean_outqueues(struct tipc_conn *con); + +static void tipc_conn_kref_release(struct kref *kref) +{ + struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); + struct tipc_server *s = con->server; + + if (con->sock) { + __module_get(con->sock->ops->owner); + __module_get(con->sock->sk->sk_prot_creator->owner); + sock_release(con->sock); + con->sock = NULL; + } + + clean_outqueues(con); + + if (con->conid) + s->tipc_conn_shutdown(con->conid, con->usr_data); + + kfree(con); +} + +static void conn_put(struct tipc_conn *con) +{ + 
kref_put(&con->kref, tipc_conn_kref_release); +} + +static void conn_get(struct tipc_conn *con) +{ + kref_get(&con->kref); +} + +static inline unsigned int conid_hash(unsigned int conid) +{ + return conid & (CONN_HASH_SIZE - 1); +} + +static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, + unsigned int conid) +{ + struct hlist_node *h; + struct tipc_conn *con; + unsigned int r; + + r = conid_hash(conid); + + read_lock_bh(&s->conn_hlist_lock); + hlist_for_each_entry(con, h, &s->conn_hlist[r], list) { + if (con->conid == conid) { + conn_get(con); + read_unlock_bh(&s->conn_hlist_lock); + return con; + } + } + read_unlock_bh(&s->conn_hlist_lock); + return NULL; +} + +static void sock_data_ready(struct sock *sk, int unused) +{ + struct tipc_conn *con; + + read_lock(&sk->sk_callback_lock); + con = sock2con(sk); + if (con && test_bit(CF_CONNECTED, &con->flags)) { + conn_get(con); + if (!queue_work(con->server->rcv_wq, &con->rwork)) + conn_put(con); + } + read_unlock(&sk->sk_callback_lock); +} + +static void sock_write_space(struct sock *sk) +{ + struct tipc_conn *con; + + read_lock(&sk->sk_callback_lock); + con = sock2con(sk); + if (con && test_bit(CF_CONNECTED, &con->flags)) { + conn_get(con); + if (!queue_work(con->server->send_wq, &con->swork)) + conn_put(con); + } + read_unlock(&sk->sk_callback_lock); +} + +/** + * tipc_register_callbacks - register socket callbacks + */ +static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con) +{ + struct sock *sk = sock->sk; + + write_lock_bh(&sk->sk_callback_lock); + + sk->sk_data_ready = sock_data_ready; + sk->sk_write_space = sock_write_space; + sk->sk_user_data = con; + + con->sock = sock; + + write_unlock_bh(&sk->sk_callback_lock); +} + +/** + * tipc_unregister_callbacks - unregister socket callbacks + */ +static void tipc_unregister_callbacks(struct tipc_conn *con) +{ + struct sock *sk = con->sock->sk; + + write_lock_bh(&sk->sk_callback_lock); + sk->sk_user_data = NULL; + 
write_unlock_bh(&sk->sk_callback_lock); +} + +/** + * tipc_close_conn - close connection + */ +static void tipc_close_conn(struct tipc_conn *con) +{ + struct tipc_server *s = con->server; + + if (test_and_clear_bit(CF_CONNECTED, &con->flags)) { + write_lock_bh(&s->conn_hlist_lock); + hlist_del(&con->list); + write_unlock_bh(&s->conn_hlist_lock); + + tipc_unregister_callbacks(con); + + /* + * We shouldn't flush pending works as we may be in the + * thread. In fact the races with pending rx/tx work structs + * are harmless for us here as we have already deleted this + * connection from server connection list and set + * sk->sk_user_data to 0 before release connection object. + */ + kernel_sock_shutdown(con->sock, SHUT_RDWR); + + conn_put(con); + } +} + +/** + * get_conid - get unique connection ID + */ +static unsigned int get_conid(struct tipc_server *s) +{ + struct hlist_node *h; + struct tipc_conn *con; + unsigned int r; + unsigned int i; + + for (i = 0; i < 0xffffffff; i++) { + r = conid_hash(s->conid); + + if (hlist_empty(&s->conn_hlist[r])) + return s->conid; + + hlist_for_each_entry(con, h, &s->conn_hlist[r], list) { + if (con->conid != s->conid) + return s->conid; + } + s->conid++; + } + panic("failed to allocate unique connection ID\n"); + return 0; +} + +/** + * alloc_tipc_conn - allocate connection object + */ +static struct tipc_conn *alloc_tipc_conn(struct tipc_server *s) +{ + struct tipc_conn *con; + int r; + + con = kzalloc(sizeof(struct tipc_conn), GFP_ATOMIC); + if (!con) + return NULL; + + write_lock_bh(&s->conn_hlist_lock); + r = conid_hash(get_conid(s)); + hlist_add_head(&con->list, &s->conn_hlist[r]); + con->conid = s->conid++; + write_unlock_bh(&s->conn_hlist_lock); + + set_bit(CF_CONNECTED, &con->flags); + con->server = s; + kref_init(&con->kref); + INIT_LIST_HEAD(&con->outqueue); + spin_lock_init(&con->outqueue_lock); + INIT_WORK(&con->swork, tipc_send_work); + INIT_WORK(&con->rwork, tipc_recv_work); + + return con; +} + +/** + * 
tipc_receive_from_sock - receive data from remote end + */ +static int tipc_receive_from_sock(struct tipc_conn *con) +{ + struct kvec iov; + void *buf; + int ret; + struct sockaddr_tipc addr; + struct msghdr msg = {}; + struct tipc_server *s = con->server; + + buf = kmem_cache_alloc(s->rcvbuf_cache, GFP_ATOMIC); + if (!buf) { + ret = -ENOMEM; + goto out_close; + } + + iov.iov_base = buf; + iov.iov_len = s->max_rcvbuf_size; + msg.msg_name = &addr; + ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, + MSG_DONTWAIT); + if (ret <= 0) { + kmem_cache_free(s->rcvbuf_cache, buf); + goto out_close; + } + + s->tipc_conn_recvmsg(con->conid, &addr, con->usr_data, buf, ret); + + kmem_cache_free(s->rcvbuf_cache, buf); + + return 0; + +out_close: + if (ret != -EWOULDBLOCK) + tipc_close_conn(con); + if (ret == 0) + /* Don't return success if we really got EOF */ + ret = -EAGAIN; + + return ret; +} + +/** + * tipc_accept_from_sock - listen and accept a new connection + */ +static int tipc_accept_from_sock(struct tipc_conn *con) +{ + struct socket *newsock; + struct tipc_conn *newcon; + int ret; + struct tipc_server *s = con->server; + + ret = kernel_accept(con->sock, &newsock, O_NONBLOCK); + if (ret < 0) + goto exit; + + newcon = alloc_tipc_conn(con->server); + if (!newcon) { + ret = -ENOMEM; + sock_release(newsock); + goto exit; + } + + newcon->rx_action = tipc_receive_from_sock; + tipc_register_callbacks(newsock, newcon); + + /* Notify the server user of the new connection */ + newcon->usr_data = s->tipc_conn_new(newcon->conid); + + module_put(newsock->ops->owner); + module_put(newsock->sk->sk_prot_creator->owner); + +exit: + return ret; +} + +/** + * tipc_create_listen_sock - create and listen server socket + */ +static struct socket *tipc_create_listen_sock(struct tipc_conn *con) +{ + int ret; + struct socket *sock = NULL; + struct tipc_server *s = con->server; + + ret = sock_create_kern(AF_TIPC, s->type, 0, &sock); + if (ret < 0) + return NULL; + ret = 
kernel_setsockopt(sock, SOL_TIPC, TIPC_IMPORTANCE, + (char *)&s->imp, sizeof(s->imp)); + if (ret < 0) + goto create_err; + ret = kernel_bind(sock, (struct sockaddr *)s->saddr, + sizeof(*s->saddr)); + if (ret < 0) + goto create_err; + + switch (s->type) { + case SOCK_STREAM: + case SOCK_SEQPACKET: + con->rx_action = tipc_accept_from_sock; + + ret = kernel_listen(sock, 0); + if (ret < 0) + goto create_err; + break; + case SOCK_DGRAM: + case SOCK_RDM: + con->rx_action = tipc_receive_from_sock; + break; + default: + pr_err("Unknown socket type %d\n", s->type); + goto create_err; + } + + /* + * Hack: we put the TIPC module twice since its refcount was + * increased twice when the socket was created; otherwise the + * module could never be unloaded. It is safe to decrease the + * refcount here because the socket always lives exactly as long + * as the TIPC module. Likewise, we must increase it again when + * the socket is closed. + */ + module_put(sock->ops->owner); + module_put(sock->sk->sk_prot_creator->owner); + + return sock; + +create_err: + sock_release(sock); + con->sock = NULL; + return NULL; +} + +/** + * tipc_open_listening_sock - create the server socket and register its callbacks + */ +static int tipc_open_listening_sock(struct tipc_server *server) +{ + struct socket *sock; + struct tipc_conn *con; + int ret = -EINVAL; + + con = alloc_tipc_conn(server); + if (!con) + return -ENOMEM; + + sock = tipc_create_listen_sock(con); + if (sock) { + tipc_register_callbacks(sock, con); + ret = 0; + } + + return ret; +} + +static struct outqueue_entry *alloc_entry(void *data, int len) +{ + struct outqueue_entry *entry; + void *buf; + + entry = kmalloc(sizeof(struct outqueue_entry), GFP_ATOMIC); + if (!entry) + return NULL; + + buf = kmalloc(len, GFP_ATOMIC); + if (!buf) { + kfree(entry); + return NULL; + } + + memcpy(buf, data, len); + entry->iov.iov_base = buf; + entry->iov.iov_len = len; + + return entry; +} + +static void free_entry(struct outqueue_entry *e) +{ + 
kfree(e->iov.iov_base); + kfree(e); +} + +/** + * clean_outqueues - discard all entries on the outqueues + */ +static void clean_outqueues(struct tipc_conn *con) +{ + struct outqueue_entry *e, *safe; + + spin_lock_bh(&con->outqueue_lock); + list_for_each_entry_safe(e, safe, &con->outqueue, list) { + list_del(&e->list); + free_entry(e); + } + spin_unlock_bh(&con->outqueue_lock); +} + +/** + * tipc_conn_sendmsg - queue a message for sending on the connection with ID conid + */ +int tipc_conn_sendmsg(struct tipc_server *s, unsigned int conid, + struct sockaddr_tipc *addr, void *data, size_t len) +{ + struct outqueue_entry *e; + struct tipc_conn *con; + + con = tipc_conn_lookup(s, conid); + if (!con) + return -EINVAL; + + e = alloc_entry(data, len); + if (!e) { + conn_put(con); + return -ENOMEM; + } + + if (addr) + memcpy(&e->dest, addr, sizeof(struct sockaddr_tipc)); + + spin_lock_bh(&con->outqueue_lock); + list_add_tail(&e->list, &con->outqueue); + spin_unlock_bh(&con->outqueue_lock); + + if (!test_bit(CF_CONNECTED, &con->flags)) + conn_put(con); + else if (!queue_work(s->send_wq, &con->swork)) + conn_put(con); + + return 0; +} + +/** + * tipc_conn_terminate - terminate connection with server + */ +void tipc_conn_terminate(struct tipc_server *s, unsigned int conid) +{ + struct tipc_conn *con; + + con = tipc_conn_lookup(s, conid); + if (con) { + tipc_close_conn(con); + conn_put(con); + } +} + +/** + * tipc_send_to_sock - send as many queued messages as the socket will accept + */ +static void tipc_send_to_sock(struct tipc_conn *con) +{ + int count = 0; + struct tipc_server *s = con->server; + struct outqueue_entry *e; + struct msghdr msg; + int ret; + + spin_lock_bh(&con->outqueue_lock); + while (1) { + e = list_entry(con->outqueue.next, struct outqueue_entry, + list); + if ((struct list_head *) e == &con->outqueue) + break; + spin_unlock_bh(&con->outqueue_lock); + + memset(&msg, 0, sizeof(msg)); + msg.msg_flags = MSG_DONTWAIT; + + if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) { + msg.msg_name = 
&e->dest; + msg.msg_namelen = sizeof(struct sockaddr_tipc); + } + ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1, + e->iov.iov_len); + if (ret == -EWOULDBLOCK || ret == 0) { + cond_resched(); + goto out; + } else if (ret < 0) { + goto send_err; + } + + /* Don't starve people filling buffers */ + if (++count >= MAX_SEND_MSG_COUNT) { + cond_resched(); + count = 0; + } + + spin_lock_bh(&con->outqueue_lock); + list_del(&e->list); + free_entry(e); + } + spin_unlock_bh(&con->outqueue_lock); +out: + return; + +send_err: + tipc_close_conn(con); +} + +/** + * tipc_recv_work - receive workqueue function + */ +static void tipc_recv_work(struct work_struct *work) +{ + struct tipc_conn *con = container_of(work, struct tipc_conn, rwork); + int err; + + do { + err = con->rx_action(con); + } while (!err); + + conn_put(con); +} + +/** + * tipc_send_work - send workqueue function + */ +static void tipc_send_work(struct work_struct *work) +{ + struct tipc_conn *con = container_of(work, struct tipc_conn, swork); + + tipc_send_to_sock(con); + conn_put(con); +} + +/** + * tipc_work_stop - destroy allocated workqueues + */ +static void tipc_work_stop(struct tipc_server *s) +{ + destroy_workqueue(s->rcv_wq); + destroy_workqueue(s->send_wq); +} + +/** + * tipc_work_start - allocate workqueues + */ +static int tipc_work_start(struct tipc_server *s) +{ + s->rcv_wq = alloc_workqueue("tipc_rcv", WQ_UNBOUND, 1); + if (!s->rcv_wq) { + pr_err("can't start tipc receive workqueue\n"); + return -ENOMEM; + } + + s->send_wq = alloc_workqueue("tipc_send", WQ_UNBOUND, 1); + if (!s->send_wq) { + pr_err("can't start tipc send workqueue\n"); + destroy_workqueue(s->rcv_wq); + return -ENOMEM; + } + + return 0; +} + +/** + * tipc_server_start - launch server + */ +int tipc_server_start(struct tipc_server *s) +{ + int ret; + int i; + + s->conid = 0; + rwlock_init(&s->conn_hlist_lock); + + for (i = 0; i < CONN_HASH_SIZE; i++) + INIT_HLIST_HEAD(&s->conn_hlist[i]); + + s->rcvbuf_cache = 
kmem_cache_create("tipc_rcvbuf", s->max_rcvbuf_size, + 0, SLAB_HWCACHE_ALIGN, NULL); + if (!s->rcvbuf_cache) + return -ENOMEM; + + ret = tipc_work_start(s); + if (ret < 0) { + kmem_cache_destroy(s->rcvbuf_cache); + return ret; + } + + ret = tipc_open_listening_sock(s); + return ret; +} + +/** + * tipc_server_stop - stop server and destroy all allocated resources + */ +void tipc_server_stop(struct tipc_server *s) +{ + struct hlist_node *h, *h1; + struct tipc_conn *con; + int i; + + for (i = 0; i < CONN_HASH_SIZE; i++) { + hlist_for_each_entry_safe(con, h, h1, &s->conn_hlist[i], list) + tipc_close_conn(con); + } + + tipc_work_stop(s); + kmem_cache_destroy(s->rcvbuf_cache); +} diff --git a/net/tipc/server.h b/net/tipc/server.h new file mode 100644 index 0000000..4b2c4cf --- /dev/null +++ b/net/tipc/server.h @@ -0,0 +1,103 @@ +/* + * net/tipc/server.h: Include file for TIPC server code + * + * Copyright (c) 2012, Wind River Systems + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the names of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * Alternatively, this software may be distributed under the terms of the + * GNU General Public License ("GPL") version 2 as published by the Free + * Software Foundation. 
+ * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _TIPC_SERVER_H +#define _TIPC_SERVER_H + +#include "core.h" + +#define TIPC_SERVER_NAME_LEN 32 +#define CONN_HASH_SIZE 32 + +/** + * struct tipc_server - TIPC server structure + * @conid: connection identifier associated with server + * @conn_hlist: links to all client connections connecting to server + * @conn_hlist_lock: conn_hlist list lock + * @rcvbuf_cache: memory cache of server receive buffer + * @rcv_wq: receive workqueue + * @send_wq: send workqueue + * @saddr: TIPC server address + * @imp: message importance + * @type: socket type + * @max_rcvbuf_size: maximum permitted receive message length + * @name: server name + * @tipc_conn_new: callback will be called when new connection is incoming + * @tipc_conn_shutdown: callback will be called when connection is shut down + * @tipc_conn_recvmsg: callback will be called when message arrives + */ +struct tipc_server { + unsigned int conid; + struct hlist_head conn_hlist[CONN_HASH_SIZE]; + rwlock_t conn_hlist_lock; + struct kmem_cache *rcvbuf_cache; + struct workqueue_struct *rcv_wq; + struct workqueue_struct *send_wq; + struct sockaddr_tipc *saddr; + int imp; + int type; + int max_rcvbuf_size; + const char 
name[TIPC_SERVER_NAME_LEN]; + void *(*tipc_conn_new) (unsigned int conid); + void (*tipc_conn_shutdown) (unsigned int conid, void *usr_data); + void (*tipc_conn_recvmsg) (unsigned int conid, + struct sockaddr_tipc *addr, + void *usr_data, void *buf, size_t len); +}; + +/** + * tipc_conn_sendmsg - send message to connection + */ +int tipc_conn_sendmsg(struct tipc_server *s, unsigned int conid, + struct sockaddr_tipc *addr, void *data, size_t len); + +/** + * tipc_conn_terminate - terminate connection with server + * + * Note: Must call it in process context since it might sleep + */ +void tipc_conn_terminate(struct tipc_server *s, unsigned int conid); + +/** + * tipc_server_start - start server + */ +int tipc_server_start(struct tipc_server *s); + +/** + * tipc_server_stop - stop server + */ +void tipc_server_stop(struct tipc_server *s); + +#endif -- 1.7.1 |
[tipc-discussion] [PATCH net-next v2 4/6] tipc: convert
configuration server with new sever facility
From: Ying X. <yin...@wi...> - 2012-09-26 08:21:12
|
As the new server module has been introduced, it can be used to convert configuration server to reduce its locking policy. Signed-off-by: Ying Xue <yin...@wi...> --- net/tipc/config.c | 99 +++++++++++++++++++++++----------------------------- net/tipc/core.c | 4 +- net/tipc/socket.c | 3 +- 3 files changed, 48 insertions(+), 58 deletions(-) diff --git a/net/tipc/config.c b/net/tipc/config.c index f67866c..c6d12b3 100644 --- a/net/tipc/config.c +++ b/net/tipc/config.c @@ -38,12 +38,12 @@ #include "port.h" #include "name_table.h" #include "config.h" +#include "server.h" #define REPLY_TRUNCATED "<truncated>\n" -static u32 config_port_ref; - static DEFINE_SPINLOCK(config_lock); +static struct tipc_server cfgsrv; static const void *req_tlv_area; /* request message TLV area */ static int req_tlv_space; /* request message TLV area size */ @@ -381,33 +381,28 @@ exit: return rep_tlv_buf; } -static void cfg_named_msg_event(void *userdata, - u32 port_ref, - struct sk_buff **buf, - const unchar *msg, - u32 size, - u32 importance, - struct tipc_portid const *orig, - struct tipc_name_seq const *dest) +static void cfg_conn_msg_event(unsigned int conid, struct sockaddr_tipc *addr, + void *usr_data, void *buf, size_t len) { struct tipc_cfg_msg_hdr *req_hdr; struct tipc_cfg_msg_hdr *rep_hdr; struct sk_buff *rep_buf; + int ret; /* Validate configuration message header (ignore invalid message) */ - req_hdr = (struct tipc_cfg_msg_hdr *)msg; - if ((size < sizeof(*req_hdr)) || - (size != TCM_ALIGN(ntohl(req_hdr->tcm_len))) || + req_hdr = (struct tipc_cfg_msg_hdr *)buf; + if ((len < sizeof(*req_hdr)) || + (len != TCM_ALIGN(ntohl(req_hdr->tcm_len))) || (ntohs(req_hdr->tcm_flags) != TCM_F_REQUEST)) { pr_warn("Invalid configuration message discarded\n"); return; } /* Generate reply for request (if can't, return request) */ - rep_buf = tipc_cfg_do_cmd(orig->node, + rep_buf = tipc_cfg_do_cmd(addr->addr.id.node, ntohs(req_hdr->tcm_type), - msg + sizeof(*req_hdr), - size - sizeof(*req_hdr), + buf 
+ sizeof(*req_hdr), + len - sizeof(*req_hdr), BUF_HEADROOM + MAX_H_SIZE + sizeof(*rep_hdr)); if (rep_buf) { skb_push(rep_buf, sizeof(*rep_hdr)); @@ -415,57 +410,51 @@ static void cfg_named_msg_event(void *userdata, memcpy(rep_hdr, req_hdr, sizeof(*rep_hdr)); rep_hdr->tcm_len = htonl(rep_buf->len); rep_hdr->tcm_flags &= htons(~TCM_F_REQUEST); - } else { - rep_buf = *buf; - *buf = NULL; - } - /* NEED TO ADD CODE TO HANDLE FAILED SEND (SUCH AS CONGESTION) */ - tipc_send_buf2port(port_ref, orig, rep_buf, rep_buf->len); + ret = tipc_conn_sendmsg(&cfgsrv, conid, addr, rep_buf->data, + rep_buf->len); + if (ret < 0) + pr_err("Failed to send cfg reply message\n"); + + kfree_skb(rep_buf); + } } +static struct sockaddr_tipc cfgsrv_addr __read_mostly = { + .family = AF_TIPC, + .addrtype = TIPC_ADDR_NAMESEQ, + .addr.nameseq.type = TIPC_CFG_SRV, + .addr.nameseq.lower = 0, + .addr.nameseq.upper = 0, + .scope = TIPC_ZONE_SCOPE +}; + +static struct tipc_server cfgsrv __read_mostly = { + .saddr = &cfgsrv_addr, + .imp = TIPC_CRITICAL_IMPORTANCE, + .type = SOCK_RDM, + .max_rcvbuf_size = 64 * 1024, + .name = "cfgsrv", + .tipc_conn_recvmsg = cfg_conn_msg_event, + .tipc_conn_new = NULL, + .tipc_conn_shutdown = NULL +}; + int tipc_cfg_init(void) { - struct tipc_name_seq seq; - int res; - - res = tipc_createport(NULL, TIPC_CRITICAL_IMPORTANCE, - NULL, NULL, NULL, - NULL, cfg_named_msg_event, NULL, - NULL, &config_port_ref); - if (res) - goto failed; - - seq.type = TIPC_CFG_SRV; - seq.lower = seq.upper = tipc_own_addr; - res = tipc_publish(config_port_ref, TIPC_ZONE_SCOPE, &seq); - if (res) - goto failed; - - return 0; - -failed: - pr_err("Unable to create configuration service\n"); - return res; + return tipc_server_start(&cfgsrv); } void tipc_cfg_reinit(void) { - struct tipc_name_seq seq; - int res; - - seq.type = TIPC_CFG_SRV; - seq.lower = seq.upper = 0; - tipc_withdraw(config_port_ref, TIPC_ZONE_SCOPE, &seq); + tipc_server_stop(&cfgsrv); - seq.lower = seq.upper = tipc_own_addr; - 
res = tipc_publish(config_port_ref, TIPC_ZONE_SCOPE, &seq); - if (res) - pr_err("Unable to reinitialize configuration service\n"); + cfgsrv_addr.addr.nameseq.lower = tipc_own_addr; + cfgsrv_addr.addr.nameseq.upper = tipc_own_addr; + tipc_server_start(&cfgsrv); } void tipc_cfg_stop(void) { - tipc_deleteport(config_port_ref); - config_port_ref = 0; + tipc_server_stop(&cfgsrv); } diff --git a/net/tipc/core.c b/net/tipc/core.c index fbfcc92..a4fb828 100644 --- a/net/tipc/core.c +++ b/net/tipc/core.c @@ -132,13 +132,13 @@ static int tipc_core_start(void) if (!res) res = tipc_nametbl_init(); if (!res) - res = tipc_cfg_init(); - if (!res) res = tipc_netlink_start(); if (!res) res = tipc_socket_init(); if (!res) res = tipc_subscr_start(); + if (!res) + res = tipc_cfg_init(); if (res) tipc_core_stop(); diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 6395560..3abae14 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -339,7 +339,8 @@ static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len) return -EAFNOSUPPORT; if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) && - (addr->addr.nameseq.type != TIPC_TOP_SRV)) + (addr->addr.nameseq.type != TIPC_TOP_SRV) && + (addr->addr.nameseq.type != TIPC_CFG_SRV)) return -EACCES; return (addr->scope > 0) ? -- 1.7.1 |
From: Ying X. <yin...@wi...> - 2012-09-26 08:21:15
|
As both topology server and configuration server are converted with new server model, the old facilities of user port are not useful any more. So all its related code should be purged. Signed-off-by: Ying Xue <yin...@wi...> --- net/tipc/link.c | 18 ++-- net/tipc/msg.c | 13 +-- net/tipc/msg.h | 2 +- net/tipc/port.c | 265 +---------------------------------------------------- net/tipc/port.h | 62 +------------ net/tipc/socket.c | 2 +- 6 files changed, 19 insertions(+), 343 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index 06a0214..dfffade 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1081,7 +1081,7 @@ again: * (Must not hold any locks while building message.) */ res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, - sender->max_pkt, !sender->user_port, &buf); + sender->max_pkt, &buf); read_lock_bh(&tipc_net_lock); node = tipc_node_find(destaddr); @@ -1216,18 +1216,14 @@ again: else sz = fragm_rest; - if (likely(!sender->user_port)) { - if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) { + if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) { error: - for (; buf_chain; buf_chain = buf) { - buf = buf_chain->next; - kfree_skb(buf_chain); - } - return -EFAULT; + for (; buf_chain; buf_chain = buf) { + buf = buf_chain->next; + kfree_skb(buf_chain); } - } else - skb_copy_to_linear_data_offset(buf, fragm_crs, - sect_crs, sz); + return -EFAULT; + } sect_crs += sz; sect_rest -= sz; fragm_crs += sz; diff --git a/net/tipc/msg.c b/net/tipc/msg.c index f2db8a8..9860d89 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -74,7 +74,7 @@ void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, */ int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect, u32 num_sect, unsigned int total_len, - int max_size, int usrmem, struct sk_buff **buf) + int max_size, struct sk_buff **buf) { int dsz, sz, hsz, pos, res, cnt; @@ -92,14 +92,9 @@ int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect, return -ENOMEM; 
skb_copy_to_linear_data(*buf, hdr, hsz); for (res = 1, cnt = 0; res && (cnt < num_sect); cnt++) { - if (likely(usrmem)) - res = !copy_from_user((*buf)->data + pos, - msg_sect[cnt].iov_base, - msg_sect[cnt].iov_len); - else - skb_copy_to_linear_data_offset(*buf, pos, - msg_sect[cnt].iov_base, - msg_sect[cnt].iov_len); + res = !copy_from_user((*buf)->data + pos, + msg_sect[cnt].iov_base, + msg_sect[cnt].iov_len); pos += msg_sect[cnt].iov_len; } if (likely(res)) diff --git a/net/tipc/msg.h b/net/tipc/msg.h index ba2a72b..76271b9 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -723,5 +723,5 @@ void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize, u32 destnode); int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect, u32 num_sect, unsigned int total_len, - int max_size, int usrmem, struct sk_buff **buf); + int max_size, struct sk_buff **buf); #endif diff --git a/net/tipc/port.c b/net/tipc/port.c index 2e9d5fe..5d851cf 100644 --- a/net/tipc/port.c +++ b/net/tipc/port.c @@ -46,11 +46,8 @@ #define MAX_REJECT_SIZE 1024 -static struct sk_buff *msg_queue_head; -static struct sk_buff *msg_queue_tail; DEFINE_SPINLOCK(tipc_port_list_lock); -static DEFINE_SPINLOCK(queue_lock); static LIST_HEAD(ports); static void port_handle_node_down(unsigned long ref); @@ -119,7 +116,7 @@ int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, msg_set_nameupper(hdr, seq->upper); msg_set_hdr_sz(hdr, MCAST_H_SIZE); res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE, - !oport->user_port, &buf); + &buf); if (unlikely(!buf)) return res; @@ -206,11 +203,11 @@ exit: } /** - * tipc_createport_raw - create a generic TIPC port + * tipc_createport - create a generic TIPC port * * Returns pointer to (locked) TIPC port, or NULL if unable to create it */ -struct tipc_port *tipc_createport_raw(void *usr_handle, +struct tipc_port *tipc_createport(void *usr_handle, u32 (*dispatcher)(struct tipc_port *, struct sk_buff *), void (*wakeup)(struct tipc_port 
*), const u32 importance) @@ -239,7 +236,6 @@ struct tipc_port *tipc_createport_raw(void *usr_handle, INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list); p_ptr->dispatcher = dispatcher; p_ptr->wakeup = wakeup; - p_ptr->user_port = NULL; k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref); INIT_LIST_HEAD(&p_ptr->publications); INIT_LIST_HEAD(&p_ptr->port_list); @@ -276,7 +272,6 @@ int tipc_deleteport(u32 ref) buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT); tipc_nodesub_unsubscribe(&p_ptr->subscription); } - kfree(p_ptr->user_port); spin_lock_bh(&tipc_port_list_lock); list_del(&p_ptr->port_list); @@ -473,7 +468,7 @@ int tipc_port_reject_sections(struct tipc_port *p_ptr, struct tipc_msg *hdr, int res; res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE, - !p_ptr->user_port, &buf); + &buf); if (!buf) return res; @@ -692,212 +687,6 @@ void tipc_port_reinit(void) spin_unlock_bh(&tipc_port_list_lock); } - -/* - * port_dispatcher_sigh(): Signal handler for messages destinated - * to the tipc_port interface. 
- */ -static void port_dispatcher_sigh(void *dummy) -{ - struct sk_buff *buf; - - spin_lock_bh(&queue_lock); - buf = msg_queue_head; - msg_queue_head = NULL; - spin_unlock_bh(&queue_lock); - - while (buf) { - struct tipc_port *p_ptr; - struct user_port *up_ptr; - struct tipc_portid orig; - struct tipc_name_seq dseq; - void *usr_handle; - int connected; - int peer_invalid; - int published; - u32 message_type; - - struct sk_buff *next = buf->next; - struct tipc_msg *msg = buf_msg(buf); - u32 dref = msg_destport(msg); - - message_type = msg_type(msg); - if (message_type > TIPC_DIRECT_MSG) - goto reject; /* Unsupported message type */ - - p_ptr = tipc_port_lock(dref); - if (!p_ptr) - goto reject; /* Port deleted while msg in queue */ - - orig.ref = msg_origport(msg); - orig.node = msg_orignode(msg); - up_ptr = p_ptr->user_port; - usr_handle = up_ptr->usr_handle; - connected = p_ptr->connected; - peer_invalid = connected && !tipc_port_peer_msg(p_ptr, msg); - published = p_ptr->published; - - if (unlikely(msg_errcode(msg))) - goto err; - - switch (message_type) { - - case TIPC_CONN_MSG:{ - tipc_conn_msg_event cb = up_ptr->conn_msg_cb; - u32 dsz; - - tipc_port_unlock(p_ptr); - if (unlikely(!cb)) - goto reject; - if (unlikely(!connected)) { - if (tipc_connect2port(dref, &orig)) - goto reject; - } else if (peer_invalid) - goto reject; - dsz = msg_data_sz(msg); - if (dsz) - acknowledge_rx_data(p_ptr); - skb_pull(buf, msg_hdr_sz(msg)); - cb(usr_handle, dref, &buf, msg_data(msg), dsz); - break; - } - case TIPC_DIRECT_MSG:{ - tipc_msg_event cb = up_ptr->msg_cb; - - tipc_port_unlock(p_ptr); - if (unlikely(!cb || connected)) - goto reject; - skb_pull(buf, msg_hdr_sz(msg)); - cb(usr_handle, dref, &buf, msg_data(msg), - msg_data_sz(msg), msg_importance(msg), - &orig); - break; - } - case TIPC_MCAST_MSG: - case TIPC_NAMED_MSG:{ - tipc_named_msg_event cb = up_ptr->named_msg_cb; - - tipc_port_unlock(p_ptr); - if (unlikely(!cb || connected || !published)) - goto reject; - dseq.type = 
msg_nametype(msg); - dseq.lower = msg_nameinst(msg); - dseq.upper = (message_type == TIPC_NAMED_MSG) - ? dseq.lower : msg_nameupper(msg); - skb_pull(buf, msg_hdr_sz(msg)); - cb(usr_handle, dref, &buf, msg_data(msg), - msg_data_sz(msg), msg_importance(msg), - &orig, &dseq); - break; - } - } - if (buf) - kfree_skb(buf); - buf = next; - continue; -err: - switch (message_type) { - - case TIPC_CONN_MSG:{ - tipc_conn_shutdown_event cb = - up_ptr->conn_err_cb; - - tipc_port_unlock(p_ptr); - if (!cb || !connected || peer_invalid) - break; - tipc_disconnect(dref); - skb_pull(buf, msg_hdr_sz(msg)); - cb(usr_handle, dref, &buf, msg_data(msg), - msg_data_sz(msg), msg_errcode(msg)); - break; - } - case TIPC_DIRECT_MSG:{ - tipc_msg_err_event cb = up_ptr->err_cb; - - tipc_port_unlock(p_ptr); - if (!cb || connected) - break; - skb_pull(buf, msg_hdr_sz(msg)); - cb(usr_handle, dref, &buf, msg_data(msg), - msg_data_sz(msg), msg_errcode(msg), &orig); - break; - } - case TIPC_MCAST_MSG: - case TIPC_NAMED_MSG:{ - tipc_named_msg_err_event cb = - up_ptr->named_err_cb; - - tipc_port_unlock(p_ptr); - if (!cb || connected) - break; - dseq.type = msg_nametype(msg); - dseq.lower = msg_nameinst(msg); - dseq.upper = (message_type == TIPC_NAMED_MSG) - ? dseq.lower : msg_nameupper(msg); - skb_pull(buf, msg_hdr_sz(msg)); - cb(usr_handle, dref, &buf, msg_data(msg), - msg_data_sz(msg), msg_errcode(msg), &dseq); - break; - } - } - if (buf) - kfree_skb(buf); - buf = next; - continue; -reject: - tipc_reject_msg(buf, TIPC_ERR_NO_PORT); - buf = next; - } -} - -/* - * port_dispatcher(): Dispatcher for messages destinated - * to the tipc_port interface. Called with port locked. 
- */ -static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf) -{ - buf->next = NULL; - spin_lock_bh(&queue_lock); - if (msg_queue_head) { - msg_queue_tail->next = buf; - msg_queue_tail = buf; - } else { - msg_queue_tail = msg_queue_head = buf; - tipc_k_signal((Handler)port_dispatcher_sigh, 0); - } - spin_unlock_bh(&queue_lock); - return 0; -} - -/* - * Wake up port after congestion: Called with port locked - */ -static void port_wakeup_sh(unsigned long ref) -{ - struct tipc_port *p_ptr; - struct user_port *up_ptr; - tipc_continue_event cb = NULL; - void *uh = NULL; - - p_ptr = tipc_port_lock(ref); - if (p_ptr) { - up_ptr = p_ptr->user_port; - if (up_ptr) { - cb = up_ptr->continue_event_cb; - uh = up_ptr->usr_handle; - } - tipc_port_unlock(p_ptr); - } - if (cb) - cb(uh, ref); -} - - -static void port_wakeup(struct tipc_port *p_ptr) -{ - tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref); -} - void tipc_acknowledge(u32 ref) { struct tipc_port *p_ptr; @@ -929,50 +718,6 @@ void acknowledge_rx_data(struct tipc_port *p_ptr) tipc_acknowledge(p_ptr->ref); } -/* - * tipc_createport(): user level call. 
- */ -int tipc_createport(void *usr_handle, - unsigned int importance, - tipc_msg_err_event error_cb, - tipc_named_msg_err_event named_error_cb, - tipc_conn_shutdown_event conn_error_cb, - tipc_msg_event msg_cb, - tipc_named_msg_event named_msg_cb, - tipc_conn_msg_event conn_msg_cb, - tipc_continue_event continue_event_cb, /* May be zero */ - u32 *portref) -{ - struct user_port *up_ptr; - struct tipc_port *p_ptr; - - up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC); - if (!up_ptr) { - pr_warn("Port creation failed, no memory\n"); - return -ENOMEM; - } - p_ptr = tipc_createport_raw(NULL, port_dispatcher, port_wakeup, - importance); - if (!p_ptr) { - kfree(up_ptr); - return -ENOMEM; - } - - p_ptr->user_port = up_ptr; - up_ptr->usr_handle = usr_handle; - up_ptr->ref = p_ptr->ref; - up_ptr->err_cb = error_cb; - up_ptr->named_err_cb = named_error_cb; - up_ptr->conn_err_cb = conn_error_cb; - up_ptr->msg_cb = msg_cb; - up_ptr->named_msg_cb = named_msg_cb; - up_ptr->conn_msg_cb = conn_msg_cb; - up_ptr->continue_event_cb = continue_event_cb; - *portref = p_ptr->ref; - tipc_port_unlock(p_ptr); - return 0; -} - int tipc_portimportance(u32 ref, unsigned int *importance) { struct tipc_port *p_ptr; @@ -1215,7 +960,7 @@ static int tipc_port_recv_sections(struct tipc_port *sender, unsigned int num_se int res; res = tipc_msg_build(&sender->phdr, msg_sect, num_sect, total_len, - MAX_MSG_SIZE, !sender->user_port, &buf); + MAX_MSG_SIZE, &buf); if (likely(buf)) tipc_port_recv_msg(buf); return res; diff --git a/net/tipc/port.h b/net/tipc/port.h index 5009113..a8e457b 100644 --- a/net/tipc/port.h +++ b/net/tipc/port.h @@ -44,56 +44,6 @@ #define TIPC_FLOW_CONTROL_WIN 512 -typedef void (*tipc_msg_err_event) (void *usr_handle, u32 portref, - struct sk_buff **buf, unsigned char const *data, - unsigned int size, int reason, - struct tipc_portid const *attmpt_destid); - -typedef void (*tipc_named_msg_err_event) (void *usr_handle, u32 portref, - struct sk_buff **buf, unsigned char const *data, - 
unsigned int size, int reason, - struct tipc_name_seq const *attmpt_dest); - -typedef void (*tipc_conn_shutdown_event) (void *usr_handle, u32 portref, - struct sk_buff **buf, unsigned char const *data, - unsigned int size, int reason); - -typedef void (*tipc_msg_event) (void *usr_handle, u32 portref, - struct sk_buff **buf, unsigned char const *data, - unsigned int size, unsigned int importance, - struct tipc_portid const *origin); - -typedef void (*tipc_named_msg_event) (void *usr_handle, u32 portref, - struct sk_buff **buf, unsigned char const *data, - unsigned int size, unsigned int importance, - struct tipc_portid const *orig, - struct tipc_name_seq const *dest); - -typedef void (*tipc_conn_msg_event) (void *usr_handle, u32 portref, - struct sk_buff **buf, unsigned char const *data, - unsigned int size); - -typedef void (*tipc_continue_event) (void *usr_handle, u32 portref); - -/** - * struct user_port - TIPC user port (used with native API) - * @usr_handle: user-specified field - * @ref: object reference to associated TIPC port - * - * <various callback routines> - */ -struct user_port { - void *usr_handle; - u32 ref; - tipc_msg_err_event err_cb; - tipc_named_msg_err_event named_err_cb; - tipc_conn_shutdown_event conn_err_cb; - tipc_msg_event msg_cb; - tipc_named_msg_event named_msg_cb; - tipc_conn_msg_event conn_msg_cb; - tipc_continue_event continue_event_cb; -}; - /** * struct tipc_port - TIPC port structure * @usr_handle: pointer to additional user-defined information about port @@ -111,7 +61,6 @@ struct user_port { * @port_list: adjacent ports in TIPC's global list of ports * @dispatcher: ptr to routine which handles received messages * @wakeup: ptr to routine to call when port is no longer congested - * @user_port: ptr to user port associated with port (if any) * @wait_list: adjacent ports in list of ports waiting on link congestion * @waiting_pkts: * @sent: # of non-empty messages sent by port @@ -139,7 +88,6 @@ struct tipc_port { struct list_head 
port_list; u32 (*dispatcher)(struct tipc_port *, struct sk_buff *); void (*wakeup)(struct tipc_port *); - struct user_port *user_port; struct list_head wait_list; u32 waiting_pkts; u32 sent; @@ -158,7 +106,7 @@ struct tipc_port_list; /* * TIPC port manipulation routines */ -struct tipc_port *tipc_createport_raw(void *usr_handle, +struct tipc_port *tipc_createport(void *usr_handle, u32 (*dispatcher)(struct tipc_port *, struct sk_buff *), void (*wakeup)(struct tipc_port *), const u32 importance); @@ -170,14 +118,6 @@ void tipc_acknowledge(u32 port_ref); void acknowledge_rx_data(struct tipc_port *p_ptr); -int tipc_createport(void *usr_handle, - unsigned int importance, tipc_msg_err_event error_cb, - tipc_named_msg_err_event named_error_cb, - tipc_conn_shutdown_event conn_error_cb, tipc_msg_event msg_cb, - tipc_named_msg_event named_msg_cb, - tipc_conn_msg_event conn_msg_cb, - tipc_continue_event continue_event_cb, u32 *portref); - int tipc_deleteport(u32 portref); int tipc_portimportance(u32 portref, unsigned int *importance); diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 3abae14..462d808 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -204,7 +204,7 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol, return -ENOMEM; /* Allocate TIPC port for socket to use */ - tp_ptr = tipc_createport_raw(sk, &dispatch, &wakeupdispatch, + tp_ptr = tipc_createport(sk, &dispatch, &wakeupdispatch, TIPC_LOW_IMPORTANCE); if (unlikely(!tp_ptr)) { sk_free(sk); -- 1.7.1 |
From: Ying X. <yin...@wi...> - 2012-09-26 08:21:16
As the configuration server now always runs in process context, there is no reason to keep config_lock as a spin lock.

Signed-off-by: Ying Xue <yin...@wi...>
---
 net/tipc/config.c | 17 +++--------------
 1 files changed, 3 insertions(+), 14 deletions(-)

diff --git a/net/tipc/config.c b/net/tipc/config.c
index c6d12b3..778a71e 100644
--- a/net/tipc/config.c
+++ b/net/tipc/config.c
@@ -42,7 +42,7 @@
 
 #define REPLY_TRUNCATED "<truncated>\n"
 
-static DEFINE_SPINLOCK(config_lock);
+static DEFINE_MUTEX(config_mutex);
 static struct tipc_server cfgsrv;
 
 static const void *req_tlv_area;	/* request message TLV area */
@@ -181,18 +181,7 @@ static struct sk_buff *cfg_set_own_addr(void)
 	if (tipc_own_addr)
 		return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
 						   " (cannot change node address once assigned)");
-
-	/*
-	 * Must temporarily release configuration spinlock while switching into
-	 * networking mode as it calls tipc_eth_media_start(), which may sleep.
-	 * Releasing the lock is harmless as other locally-issued configuration
-	 * commands won't occur until this one completes, and remotely-issued
-	 * configuration commands can't be received until a local configuration
-	 * command to enable the first bearer is received and processed.
-	 */
-	spin_unlock_bh(&config_lock);
 	tipc_core_start_net(addr);
-	spin_lock_bh(&config_lock);
 	return tipc_cfg_reply_none();
 }
 
@@ -248,7 +237,7 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area
 {
 	struct sk_buff *rep_tlv_buf;
 
-	spin_lock_bh(&config_lock);
+	mutex_lock(&config_mutex);
 
 	/* Save request and reply details in a well-known location */
 	req_tlv_area = request_area;
@@ -377,7 +366,7 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area
 
 	/* Return reply buffer */
 exit:
-	spin_unlock_bh(&config_lock);
+	mutex_unlock(&config_mutex);
 	return rep_tlv_buf;
 }
 
-- 
1.7.1
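The conversion works because a mutex can be held across a call that sleeps. Sleeping while holding a lock taken with spin_lock_bh() is a bug, which is why the removed comment had to drop config_lock around tipc_core_start_net(). The sketch below is a rough userspace analogue built on POSIX threads and only illustrates the locking pattern; it is not kernel code. cfg_do_cmd(), start_net() and CMD_SET_OWN_ADDR are hypothetical stand-ins for tipc_cfg_do_cmd(), tipc_core_start_net() and the real command code.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <time.h>

#define CMD_SET_OWN_ADDR 1      /* hypothetical command code */

/* Analogue of DEFINE_MUTEX(config_mutex). */
static pthread_mutex_t config_mutex = PTHREAD_MUTEX_INITIALIZER;
static unsigned int own_addr;   /* 0 = not yet assigned; protected by config_mutex */

/* Stand-in for tipc_core_start_net(): it blocks, which is allowed under a mutex. */
static void start_net(unsigned int addr)
{
	struct timespec ts = { 0, 1000000 };   /* sleep for 1 ms */

	nanosleep(&ts, NULL);
	own_addr = addr;
}

/* Stand-in for tipc_cfg_do_cmd(): one lock for the whole command, never dropped. */
static int cfg_do_cmd(int cmd, unsigned int addr)
{
	int ret = 0;

	assert(addr != 0);                     /* 0 is reserved for "unassigned" */
	pthread_mutex_lock(&config_mutex);
	if (cmd == CMD_SET_OWN_ADDR) {
		if (own_addr)
			ret = -1;   /* cf. "cannot change node address once assigned" */
		else
			start_net(addr);
	}
	pthread_mutex_unlock(&config_mutex);
	return ret;
}
```

Because the lock is never released in the middle of a command, the unlock/relock pair and the comment justifying it can go away. Kernel mutexes differ from pthread mutexes in many details. The analogy only concerns holding a lock across a blocking call.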
From: Erik H. <eri...@er...> - 2012-09-26 12:34:47
> As the new server module has been introduced, it can be used to
> convert configuration server to reduce its locking policy.

We have previously agreed to drop the remote management support.
It's probably better if this series only consists of:
tipc: introduce new TIPC server infrastructure
tipc: convert topology server with new server facility
tipc: add lock nesting notation to quiet lockdep warning

Then, in a separate patch, we remove the remote management.
I think that would make this patch redundant:
tipc: convert config_lock from spin lock to mutex

//E
From: Ying X. <yin...@wi...> - 2012-09-27 00:47:33
Erik Hugne wrote:
>> As the new server module has been introduced, it can be used to
>> convert configuration server to reduce its locking policy.
> We have previously agreed to drop the remote management support.
>
With interfaces provided by the new server module, the remote management
now works very well.
So it's unnecessary to remove it. And I still think this is a very cool
and useful feature!
> It's probably better if this series only consists of:
> tipc: introduce new TIPC server infrastructure
> tipc: convert topology server with new server facility
> tipc: add lock nesting notation to quiet lockdep warning
>
> Then, in a separate patch, we remove the remote management.
> I think that would make this patch redundant:
> tipc: convert config_lock from spin lock to mutex
>
> //E
From: Erik H. <eri...@er...> - 2012-09-27 06:19:15
> With interfaces provided by the new server module, the remote management
> now works very well.
> So it's unnecessary to remove it. And I still think this is a very cool
> and useful feature!

I'm a little worried since remote mgmt doesn't require any authentication.
Even though what you can change remotely is limited, you can still
view/alter the state of another node.
We're adding IP based bearers soon, this might be a dangerous
combination..
Or am I just being paranoid? :)

//E
From: Ying X. <yin...@wi...> - 2012-09-27 06:53:43
Erik Hugne wrote:
>> With interfaces provided by the new server module, the remote management
>> now works very well.
>> So it's unnecessary to remove it. And I still think this is a very cool
>> and useful feature!
>
> I'm a little worried since remote mgmt doesn't require any authentication.
> Even though what you can change remotely is limited, you can still
> view/alter the state of another node.
>
Yes, remote management has no authentication at all. The TIPC protocol is
designed for communication between local machines within one cluster, so
security is not its primary goal.

> We're adding IP based bearers soon, this might be a dangerous
> combination..
> Or am I just being paranoid? :)
>
No, your concern is reasonable. Over the Internet, security becomes very
important for us, so once an IP bearer is introduced our situation will be
different from before. The good news is that, as far as I understand, the
management function _only_ supports _viewing_ the peer node's state remotely.

So if we are going to strengthen the security of TIPC, it may be better to
consider a complete solution. For example, not only would remote management
need authentication, but the discovery protocol should also require it when
links are established.

In all, I think the remote management function can be kept. If security
becomes very important for us one day, we can add a separate security module.

What do you think?

Regards,
Ying

> //E
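Ying's argument rests on remote peers only being able to view state. One way to make such a rule explicit is a check at the top of the command handler that rejects state-changing commands from any node other than the local one. The sketch below is hypothetical. CMD_FLAG_MODIFY and the command values are invented for illustration and do not reflect TIPC's real command encoding. orig_node and own_addr only echo the tipc_cfg_do_cmd(u32 orig_node, ...) signature and tipc_own_addr from the config.c patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define CMD_FLAG_MODIFY 0x8000u                     /* hypothetical "changes state" bit */
#define CMD_GET_LINKS   0x0001u                     /* hypothetical view-only command */
#define CMD_SET_ADDR    (CMD_FLAG_MODIFY | 0x0001u) /* hypothetical state-changing command */

/*
 * Decide whether @cmd, sent by @orig_node, may run on the node whose
 * own address is @own_addr. Local callers may do anything; remote
 * callers are limited to view-only commands.
 */
static bool cfg_cmd_permitted(uint32_t orig_node, uint32_t own_addr,
			      uint16_t cmd)
{
	if (orig_node == own_addr)
		return true;
	return (cmd & CMD_FLAG_MODIFY) == 0;
}
```

A check like this narrows what a remote node can do, but it authenticates nothing. On its own it does not answer Erik's concern about IP bearers.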
From: Paul G. <pau...@wi...> - 2012-10-25 19:12:12
[[PATCH net-next v2 1/6] tipc: introduce new TIPC server infrastructure] On 26/09/2012 (Wed 16:20) Ying Xue wrote: [I'm circling back to this series since Jon indicated the topology drops were one of the larger issues to be solved, and he was putting this review to the top of his list.] > TIPC have two internal servers, one that provides a subscription s/have/has/ > service for topology events and another that provides the > configuration interface. These servers have previously been running > in BH context, accessing the TIPC-port API directly. This required > complex lock policies to be implemented and it also caused > scalability problems when the topology server port got congested > and events got dropped silently. It would be nice to know a bit more detail about the above problems, so that the reviewer can know what you know, and then hopefully agree that the server introduction is the right thing to do. > > Therefor, we are introducing a TIPC server module that uses kernel > sockets for message passing(instead of TIPC port directly). Similarly, some more information about this new server would be good. How does it change use cases for people, or anything else? Details on what you thought about when choosing this path, and how it would be efficient and solve the problems that I want expanded above. Details on the flowpath that is now taken would be a part of that. > > Signed-off-by: Ying Xue <yin...@wi...> > --- > net/tipc/Makefile | 2 +- > net/tipc/server.c | 686 +++++++++++++++++++++++++++++++++++++++++++++++++++++ > net/tipc/server.h | 103 ++++++++ > 3 files changed, 790 insertions(+), 1 deletions(-) Looking at the diffstat, it appears to me that the code is added, and it is compiled, but since no other files are touched at all, it does nothing (i.e. is totally unused) after just this commit is applied. For example, tipc_server_start is never called. The reason I mention this, is because this is somewhat of a forced separation. 
Yes it is good to keep patches smaller where possible, but at the same time I have seen people get review comments saying to not introduce orphaned code -- i.e. the commit that adds the code should also enable and use the code. (If you don't do it this way, then a bisect will lead to a random user of broken code, and not the addition of the broken code). > create mode 100644 net/tipc/server.c > create mode 100644 net/tipc/server.h > > diff --git a/net/tipc/Makefile b/net/tipc/Makefile > index 6cd55d6..af7206b 100644 > --- a/net/tipc/Makefile > +++ b/net/tipc/Makefile > @@ -8,4 +8,4 @@ tipc-y += addr.o bcast.o bearer.o config.o \ > core.o handler.o link.o discover.o msg.o \ > name_distr.o subscr.o name_table.o net.o \ > netlink.o node.o node_subscr.o port.o ref.o \ > - socket.o log.o eth_media.o > + socket.o log.o eth_media.o server.o > diff --git a/net/tipc/server.c b/net/tipc/server.c > new file mode 100644 > index 0000000..0c369ae > --- /dev/null > +++ b/net/tipc/server.c > @@ -0,0 +1,686 @@ > +/* > + * net/tipc/server.c: TIPC server infrastructure > + * > + * Copyright (c) 2012 Wind River Systems > + * All rights reserved. > + * > + * Redistribution and use in source and binary forms, with or without > + * modification, are permitted provided that the following conditions are met: > + * > + * 1. Redistributions of source code must retain the above copyright > + * notice, this list of conditions and the following disclaimer. > + * 2. Redistributions in binary form must reproduce the above copyright > + * notice, this list of conditions and the following disclaimer in the > + * documentation and/or other materials provided with the distribution. > + * 3. Neither the names of the copyright holders nor the names of its > + * contributors may be used to endorse or promote products derived from > + * this software without specific prior written permission. 
> + * > + * Alternatively, this software may be distributed under the terms of the > + * GNU General Public License ("GPL") version 2 as published by the Free > + * Software Foundation. > + * > + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" > + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE > + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE > + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE > + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR > + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF > + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS > + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN > + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) > + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE > + * POSSIBILITY OF SUCH DAMAGE. > + */ > + > +#include "server.h" > +#include <net/sock.h> > +#include <linux/module.h> > + > +/* Number of messages to send before rescheduling */ > +#define MAX_SEND_MSG_COUNT 25 > +#define CF_CONNECTED 1 Odd whitespace usage here. 
> + > +#define sock2con(x) ((struct tipc_conn *)(x)->sk_user_data) > + > +/** > + * struct tipc_conn - TIPC connection structure > + * @kref: reference counter to connection object > + * @conid: connection identifier > + * @sock: socket handler associated with connection > + * @flags: indicates connection state > + * @server: pointer to connected server > + * @outqueue: pointer to first outbound message in queue > + * @outqueue_lock: controll access to the outqueue > + * @rx_action: what to do when connection socket is active > + * @list: list of connection objects for its server > + * @rwork: receive work item > + * @swork: send work item > + * @usr_data: user-specified field > + */ > +struct tipc_conn { > + struct kref kref; > + unsigned int conid; > + struct socket *sock; > + unsigned long flags; > + struct tipc_server *server; > + struct list_head outqueue; > + spinlock_t outqueue_lock; > + int (*rx_action) (struct tipc_conn *con); > + struct hlist_node list; > + struct work_struct rwork; > + struct work_struct swork; > + void *usr_data; > +}; > + > +/* An entry waiting to be sent */ > +struct outqueue_entry { > + struct list_head list; > + struct kvec iov; > + struct sockaddr_tipc dest; > +}; > + > +static void tipc_recv_work(struct work_struct *work); > +static void tipc_send_work(struct work_struct *work); > +static void clean_outqueues(struct tipc_conn *con); > + > +static void tipc_conn_kref_release(struct kref *kref) > +{ > + struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); > + struct tipc_server *s = con->server; > + > + if (con->sock) { > + __module_get(con->sock->ops->owner); > + __module_get(con->sock->sk->sk_prot_creator->owner); The use of __module_get (vs non underscore) and its use 2x probably needs some sort of explanation. 
> + sock_release(con->sock); > + con->sock = NULL; > + } > + > + clean_outqueues(con); > + > + if (con->conid) > + s->tipc_conn_shutdown(con->conid, con->usr_data); > + > + kfree(con); > +} > + > +static void conn_put(struct tipc_conn *con) > +{ > + kref_put(&con->kref, tipc_conn_kref_release); > +} > + > +static void conn_get(struct tipc_conn *con) > +{ > + kref_get(&con->kref); > +} > + > +static inline unsigned int conid_hash(unsigned int conid) > +{ > + return conid & (CONN_HASH_SIZE - 1); > +} > + > +static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, > + unsigned int conid) > +{ > + struct hlist_node *h; > + struct tipc_conn *con; > + unsigned int r; > + > + r = conid_hash(conid); > + > + read_lock_bh(&s->conn_hlist_lock); > + hlist_for_each_entry(con, h, &s->conn_hlist[r], list) { > + if (con->conid == conid) { > + conn_get(con); > + read_unlock_bh(&s->conn_hlist_lock); > + return con; > + } > + } > + read_unlock_bh(&s->conn_hlist_lock); > + return NULL; > +} > + > +static void sock_data_ready(struct sock *sk, int unused) > +{ > + struct tipc_conn *con; > + > + read_lock(&sk->sk_callback_lock); > + con = sock2con(sk); > + if (con && test_bit(CF_CONNECTED, &con->flags)) { > + conn_get(con); > + if (!queue_work(con->server->rcv_wq, &con->rwork)) > + conn_put(con); > + } > + read_unlock(&sk->sk_callback_lock); > +} > + > +static void sock_write_space(struct sock *sk) > +{ > + struct tipc_conn *con; > + > + read_lock(&sk->sk_callback_lock); > + con = sock2con(sk); > + if (con && test_bit(CF_CONNECTED, &con->flags)) { > + conn_get(con); > + if (!queue_work(con->server->send_wq, &con->swork)) > + conn_put(con); > + } > + read_unlock(&sk->sk_callback_lock); > +} > + > +/** Lets not use the kerneldoc /** start marker if there is no actual kerneldoc content to be processed. (same for all instances). 
> + * tipc_register_callbacks - register socket callbacks > + */ > +static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con) > +{ > + struct sock *sk = sock->sk; > + > + write_lock_bh(&sk->sk_callback_lock); > + > + sk->sk_data_ready = sock_data_ready; > + sk->sk_write_space = sock_write_space; Do the above two lines have any relation to the other patch I have queued that does the tipc_data_ready and tipc_write_space additions? > + sk->sk_user_data = con; > + > + con->sock = sock; > + > + write_unlock_bh(&sk->sk_callback_lock); > +} > + > +/** > + * tipc_unregister_callbacks - unregister socket callbacks > + */ > +static void tipc_unregister_callbacks(struct tipc_conn *con) > +{ > + struct sock *sk = con->sock->sk; > + > + write_lock_bh(&sk->sk_callback_lock); > + sk->sk_user_data = NULL; > + write_unlock_bh(&sk->sk_callback_lock); > +} > + > +/** > + * tipc_close_conn - close connection > + */ > +static void tipc_close_conn(struct tipc_conn *con) > +{ > + struct tipc_server *s = con->server; > + > + if (test_and_clear_bit(CF_CONNECTED, &con->flags)) { > + write_lock_bh(&s->conn_hlist_lock); > + hlist_del(&con->list); > + write_unlock_bh(&s->conn_hlist_lock); > + > + tipc_unregister_callbacks(con); > + > + /* > + * We shouldn't flush pending works as we may be in the > + * thread. In fact the races with pending rx/tx work structs > + * are harmless for us here as we have already deleted this > + * connection from server connection list and set > + * sk->sk_user_data to 0 before release connection object. s/release/releasing/ > + */ > + kernel_sock_shutdown(con->sock, SHUT_RDWR); > + > + conn_put(con); > + } > +} > + > +/** > + * get_conid - get unique connection ID > + */ > +static unsigned int get_conid(struct tipc_server *s) > +{ > + struct hlist_node *h; > + struct tipc_conn *con; > + unsigned int r; > + unsigned int i; > + > + for (i = 0; i < 0xffffffff; i++) { How often is this called? 
How far in the loop will it typically go before finding something? It is used within write_lock_bh scope. > + r = conid_hash(s->conid); > + > + if (hlist_empty(&s->conn_hlist[r])) > + return s->conid; > + > + hlist_for_each_entry(con, h, &s->conn_hlist[r], list) { > + if (con->conid != s->conid) > + return s->conid; > + } > + s->conid++; > + } > + panic("failed to allocate unique connection ID\n"); No. Don't do this. Linus consistently gives people trouble for using panic too much. It should only be used when things like memory corruption or similar catastrophe means the machine _really_ can not continue to run. > + return 0; > +} > + > +/** > + * alloc_tipc_conn - allocate connection object > + */ > +static struct tipc_conn *alloc_tipc_conn(struct tipc_server *s) > +{ > + struct tipc_conn *con; > + int r; > + > + con = kzalloc(sizeof(struct tipc_conn), GFP_ATOMIC); > + if (!con) > + return NULL; > + > + write_lock_bh(&s->conn_hlist_lock); > + r = conid_hash(get_conid(s)); > + hlist_add_head(&con->list, &s->conn_hlist[r]); > + con->conid = s->conid++; > + write_unlock_bh(&s->conn_hlist_lock); > + > + set_bit(CF_CONNECTED, &con->flags); > + con->server = s; > + kref_init(&con->kref); > + INIT_LIST_HEAD(&con->outqueue); > + spin_lock_init(&con->outqueue_lock); > + INIT_WORK(&con->swork, tipc_send_work); > + INIT_WORK(&con->rwork, tipc_recv_work); > + > + return con; > +} > + > +/** > + * tipc_receive_from_sock - receive data from remote end > + */ > +static int tipc_receive_from_sock(struct tipc_conn *con) > +{ > + struct kvec iov; > + void *buf; > + int ret; > + struct sockaddr_tipc addr; > + struct msghdr msg = {}; > + struct tipc_server *s = con->server; > + > + buf = kmem_cache_alloc(s->rcvbuf_cache, GFP_ATOMIC); > + if (!buf) { > + ret = -ENOMEM; > + goto out_close; > + } > + > + iov.iov_base = buf; > + iov.iov_len = s->max_rcvbuf_size; > + msg.msg_name = &addr; > + ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, > + MSG_DONTWAIT); > + if (ret <= 0) 
{ > + kmem_cache_free(s->rcvbuf_cache, buf); > + goto out_close; > + } > + > + s->tipc_conn_recvmsg(con->conid, &addr, con->usr_data, buf, ret); > + > + kmem_cache_free(s->rcvbuf_cache, buf); > + > + return 0; > + > +out_close: > + if (ret != -EWOULDBLOCK) > + tipc_close_conn(con); > + else if (ret == 0) > + /* Don't return success if we really got EOF */ > + ret = -EAGAIN; > + > + return ret; > +} > + > +/** > + * tipc_accept_from_sock - listen and accept a new connection > + */ > +static int tipc_accept_from_sock(struct tipc_conn *con) > +{ > + struct socket *newsock; > + struct tipc_conn *newcon; > + int ret; > + struct tipc_server *s = con->server; > + > + ret = kernel_accept(con->sock, &newsock, O_NONBLOCK); > + if (ret < 0) > + goto exit; > + > + newcon = alloc_tipc_conn(con->server); > + if (!newcon) { > + ret = -ENOMEM; > + sock_release(newsock); > + goto exit; > + } > + > + newcon->rx_action = tipc_receive_from_sock; > + tipc_register_callbacks(newsock, newcon); > + > + /* Notify new connection is incoming */ > + newcon->usr_data = s->tipc_conn_new(newcon->conid); > + > + module_put(newsock->ops->owner); > + module_put(newsock->sk->sk_prot_creator->owner); > + > +exit: > + return ret; > +} > + > +/** > + * tipc_create_listen_sock - create and listen server socket > + */ > +static struct socket *tipc_create_listen_sock(struct tipc_conn *con) > +{ > + int ret; > + struct socket *sock = NULL; > + struct tipc_server *s = con->server; > + > + ret = sock_create_kern(AF_TIPC, s->type, 0, &sock); > + if (ret < 0) > + return NULL; > + ret = kernel_setsockopt(sock, SOL_TIPC, TIPC_IMPORTANCE, > + (char *)&s->imp, sizeof(s->imp)); > + if (ret < 0) > + goto create_err; > + ret = kernel_bind(sock, (struct sockaddr *)s->saddr, > + sizeof(*s->saddr)); > + if (ret < 0) > + goto create_err; > + > + switch (s->type) { > + case SOCK_STREAM: > + case SOCK_SEQPACKET: > + con->rx_action = tipc_accept_from_sock; > + > + ret = kernel_listen(sock, 0); > + if (ret < 0) > + goto 
create_err; > + break; > + case SOCK_DGRAM: > + case SOCK_RDM: > + con->rx_action = tipc_receive_from_sock; > + break; > + default: > + pr_err("Unknown socket type %d\n", s->type); > + goto create_err; > + } > + > + /* > + * Hack: we put TIPC module twice since its refcount has been > + * increased two times when socket is created, otherwise, the > + * module cannot be unloaded at all. But it's safe to descrease > + * its refcount here because the lifetime of the socket is always > + * same as TIPC module. Likely we must inscrease it when the > + * socket is closed. > + */ > + module_put(sock->ops->owner); > + module_put(sock->sk->sk_prot_creator->owner); This is very likely to create a red flag when(if?) it goes to netdev. Maybe once you write more in the commit log about the whole strategy of what you are trying to solve, then I can comment more on what to do? > + > + return sock; > + > +create_err: > + sock_release(sock); > + con->sock = NULL; > + return NULL; > +} > + > +/** > + * tipc_open_listening_sock - create server to listen connections on its socket > + */ > +static int tipc_open_listening_sock(struct tipc_server *server) > +{ > + struct socket *sock; > + struct tipc_conn *con; > + int ret = -EINVAL; > + > + con = alloc_tipc_conn(server); > + if (!con) > + return -ENOMEM; > + > + sock = tipc_create_listen_sock(con); > + if (sock) { > + tipc_register_callbacks(sock, con); > + ret = 0; > + } > + > + return ret; > +} > + > +static struct outqueue_entry *alloc_entry(void *data, int len) > +{ > + struct outqueue_entry *entry; > + void *buf; > + > + entry = kmalloc(sizeof(struct outqueue_entry), GFP_ATOMIC); > + if (!entry) > + return NULL; > + > + buf = kmalloc(len, GFP_ATOMIC); > + if (!buf) { > + kfree(entry); > + return NULL; > + } > + > + memcpy(buf, data, len); > + entry->iov.iov_base = buf; > + entry->iov.iov_len = len; > + > + return entry; > +} > + > +static void free_entry(struct outqueue_entry *e) > +{ > + kfree(e->iov.iov_base); > + kfree(e); > +} 
I wouldn't do this, since it is only used twice. That, and the name makes it sound "generic" -- meaning when I 1st saw it used, I went to grep the linux/include dir for it (and eventually returned here). Similar comments for anything else that sounds generic and not tipc specific -- even if it is static, it can appear in a backtrace. See mainline commits starting at 7f9ab6ac2e79b9658eba7c8e3ad8a4392d308057 and going back from there, as a cleanup requested by DaveM. I'd wait until Jon has added his comments about the overall architecture, locking design and data flow strategy before resending. Those kinds of issues are even more important to get right than the kinds of things I've pointed out here. But detailing what the initial plan was (like I asked for at the very top) may in fact help Jon, so making a 1st pass at creating that new text might be good to start now. (I know it would help me to better understand the goal here.) Paul. -- > + > +/** > + * clean_outqueues - discard all entries on the outqueues > + */ > +static void clean_outqueues(struct tipc_conn *con) > +{ > + struct outqueue_entry *e, *safe; > + > + spin_lock_bh(&con->outqueue_lock); > + list_for_each_entry_safe(e, safe, &con->outqueue, list) { > + list_del(&e->list); > + free_entry(e); > + } > + spin_unlock_bh(&con->outqueue_lock); > +} > + > +/** > + * tipc_conn_sendmsg - send message to server with connection ID > + */ > +int tipc_conn_sendmsg(struct tipc_server *s, unsigned int conid, > + struct sockaddr_tipc *addr, void *data, size_t len) > +{ > + struct outqueue_entry *e; > + struct tipc_conn *con; > + > + con = tipc_conn_lookup(s, conid); > + if (!con) > + return -1; > + > + e = alloc_entry(data, len); > + if (!e) { > + conn_put(con); > + return -ENOMEM; > + } > + > + if (addr) > + memcpy(&e->dest, addr, sizeof(struct sockaddr_tipc)); > + > + spin_lock_bh(&con->outqueue_lock); > + list_add_tail(&e->list, &con->outqueue); > + spin_unlock_bh(&con->outqueue_lock); > + > + if 
(test_bit(CF_CONNECTED, &con->flags)) { > + if (!queue_work(s->send_wq, &con->swork)) > + conn_put(con); > + } > + > + return 0; > +} > + > +/** > + * tipc_conn_terminate - terminate connection with server > + */ > +void tipc_conn_terminate(struct tipc_server *s, unsigned int conid) > +{ > + struct tipc_conn *con; > + > + con = tipc_conn_lookup(s, conid); > + if (con) { > + tipc_close_conn(con); > + conn_put(con); > + } > +} > + > +/** > + * tipc_send_to_sock - try its best to send out all messages in its outqueue > + */ > +static void tipc_send_to_sock(struct tipc_conn *con) > +{ > + int count = 0; > + struct tipc_server *s = con->server; > + struct outqueue_entry *e; > + struct msghdr msg; > + int ret; > + > + spin_lock_bh(&con->outqueue_lock); > + while (1) { > + e = list_entry(con->outqueue.next, struct outqueue_entry, > + list); > + if ((struct list_head *) e == &con->outqueue) > + break; > + spin_unlock_bh(&con->outqueue_lock); > + > + memset(&msg, 0, sizeof(msg)); > + msg.msg_flags = MSG_DONTWAIT; > + > + if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) { > + msg.msg_name = &e->dest; > + msg.msg_namelen = sizeof(struct sockaddr_tipc); > + } > + ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1, > + e->iov.iov_len); > + if (ret == -EWOULDBLOCK || ret == 0) { > + cond_resched(); > + goto out; > + } else if (ret < 0) { > + goto send_err; > + } > + > + /* Don't starve people filling buffers */ > + if (++count >= MAX_SEND_MSG_COUNT) { > + cond_resched(); > + count = 0; > + } > + > + spin_lock_bh(&con->outqueue_lock); > + list_del(&e->list); > + free_entry(e); > + } > + spin_unlock_bh(&con->outqueue_lock); > +out: > + return; > + > +send_err: > + tipc_close_conn(con); > +} > + > +/** > + * tipc_recv_work - receive workqueue function > + */ > +static void tipc_recv_work(struct work_struct *work) > +{ > + struct tipc_conn *con = container_of(work, struct tipc_conn, rwork); > + int err; > + > + do { > + err = con->rx_action(con); > + } while (!err); > + > + 
conn_put(con); > +} > + > +/** > + * tipc_send_work - send workqueue function > + */ > +static void tipc_send_work(struct work_struct *work) > +{ > + struct tipc_conn *con = container_of(work, struct tipc_conn, swork); > + > + tipc_send_to_sock(con); > + conn_put(con); > +} > + > +/** > + * tipc_work_stop - destroy allocated workqueues > + */ > +static void tipc_work_stop(struct tipc_server *s) > +{ > + destroy_workqueue(s->rcv_wq); > + destroy_workqueue(s->send_wq); > +} > + > +/** > + * tipc_work_start - allocate workqueues > + */ > +static int tipc_work_start(struct tipc_server *s) > +{ > + s->rcv_wq = alloc_workqueue("tipc_rcv", WQ_UNBOUND, 1); > + if (!s->rcv_wq) { > + pr_err("can't start tipc receive workqueue\n"); > + return -ENOMEM; > + } > + > + s->send_wq = alloc_workqueue("tipc_send", WQ_UNBOUND, 1); > + if (!s->send_wq) { > + pr_err("can't start tipc send workqueue\n"); > + destroy_workqueue(s->rcv_wq); > + return -ENOMEM; > + } > + > + return 0; > +} > + > +/** > + * tipc_server_start - launch server > + */ > +int tipc_server_start(struct tipc_server *s) > +{ > + int ret; > + int i; > + > + s->conid = 0; > + rwlock_init(&s->conn_hlist_lock); > + > + for (i = 0; i < CONN_HASH_SIZE; i++) > + INIT_HLIST_HEAD(&s->conn_hlist[i]); > + > + s->rcvbuf_cache = kmem_cache_create("tipc_rcvbuf", s->max_rcvbuf_size, > + 0, SLAB_HWCACHE_ALIGN, NULL); > + if (!s->rcvbuf_cache) > + return -ENOMEM; > + > + ret = tipc_work_start(s); > + if (ret < 0) { > + kmem_cache_destroy(s->rcvbuf_cache); > + return ret; > + } > + > + ret = tipc_open_listening_sock(s); > + return ret; > +} > + > +/** > + * tipc_server_stop - stop server and destroy all allocated resources > + */ > +void tipc_server_stop(struct tipc_server *s) > +{ > + struct hlist_node *h, *h1; > + struct tipc_conn *con; > + int i; > + > + for (i = 0; i < CONN_HASH_SIZE; i++) { > + hlist_for_each_entry_safe(con, h, h1, &s->conn_hlist[i], list) > + tipc_close_conn(con); > + } > + > + tipc_work_stop(s); > + 
kmem_cache_destroy(s->rcvbuf_cache); > +} > diff --git a/net/tipc/server.h b/net/tipc/server.h > new file mode 100644 > index 0000000..4b2c4cf > --- /dev/null > +++ b/net/tipc/server.h > @@ -0,0 +1,103 @@ > +/* > + * net/tipc/server.h: Include file for TIPC server code > + * > + * Copyright (c) 2012, Wind River Systems > + * All rights reserved. > + * > + * Redistribution and use in source and binary forms, with or without > + * modification, are permitted provided that the following conditions are met: > + * > + * 1. Redistributions of source code must retain the above copyright > + * notice, this list of conditions and the following disclaimer. > + * 2. Redistributions in binary form must reproduce the above copyright > + * notice, this list of conditions and the following disclaimer in the > + * documentation and/or other materials provided with the distribution. > + * 3. Neither the names of the copyright holders nor the names of its > + * contributors may be used to endorse or promote products derived from > + * this software without specific prior written permission. > + * > + * Alternatively, this software may be distributed under the terms of the > + * GNU General Public License ("GPL") version 2 as published by the Free > + * Software Foundation. > + * > + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" > + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE > + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE > + * ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE > + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR > + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF > + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS > + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN > + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) > + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE > + * POSSIBILITY OF SUCH DAMAGE. > + */ > + > +#ifndef _TIPC_SERVER_H > +#define _TIPC_SERVER_H > + > +#include "core.h" > + > +#define TIPC_SERVER_NAME_LEN 32 > +#define CONN_HASH_SIZE 32 > + > +/** > + * struct tipc_server - TIPC server structure > + * @conid: connection identifier associated with server > + * @conn_hlist: links to all client connections connecting to server > + * @conn_hlist_lock: conn_hlist list lock > + * @rcvbuf_cache: memory cache of server receive buffer > + * @rcv_wq: receive workqueue > + * @send_wq: send workqueue > + * @saddr: TIPC server address > + * @imp: message importance > + * @type: socket type > + * @max_rcvbuf_size: maximum permitted receive message length > + * @name: server name > + * @tipc_conn_new: callback will be called when new connection is incoming > + * @tipc_conn_shutdown: callback will be called when connection is shut down > + * @tipc_conn_recvmsg: callback will be called when message arrives > + */ > +struct tipc_server { > + unsigned int conid; > + struct hlist_head conn_hlist[CONN_HASH_SIZE]; > + rwlock_t conn_hlist_lock; > + struct kmem_cache *rcvbuf_cache; > + struct workqueue_struct *rcv_wq; > + struct workqueue_struct *send_wq; > + struct sockaddr_tipc *saddr; > + int imp; > + int type; > + int max_rcvbuf_size; > + const char name[TIPC_SERVER_NAME_LEN]; > + void *(*tipc_conn_new) (unsigned int conid); > + void (*tipc_conn_shutdown) (unsigned int conid, void *usr_data); > + void 
(*tipc_conn_recvmsg) (unsigned int conid, > + struct sockaddr_tipc *addr, > + void *usr_data, void *buf, size_t len); > +}; > + > +/** > + * tipc_conn_sendmsg - send message to connection > + */ > +int tipc_conn_sendmsg(struct tipc_server *s, unsigned int conid, > + struct sockaddr_tipc *addr, void *data, size_t len); > + > +/** > + * tipc_conn_terminate - terminate connection with server > + * > + * Note: Must call it in process context since it might sleep > + */ > +void tipc_conn_terminate(struct tipc_server *s, unsigned int conid); > + > +/** > + * tipc_server_start - start server > + */ > +int tipc_server_start(struct tipc_server *s); > + > +/** > + * tipc_server_stop - stop server > + */ > +void tipc_server_stop(struct tipc_server *s); > + > +#endif > -- > 1.7.1 > |
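Paul objects to the panic() in get_conid(), and the same function has a second problem. Its inner loop returns s->conid as soon as it meets one entry whose ID differs. A collision with a later entry in the same bucket would therefore go unnoticed. The sketch below is a userspace model that checks every entry in the bucket and reports failure to the caller after a bounded number of tries instead of panicking. struct conn_table, BUCKET_SLOTS, conid_in_use() and alloc_conid() are hypothetical, and the fixed-size buckets exist only to keep the sketch short. In the kernel, the IDR facility is a common alternative for this job.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define CONN_HASH_SIZE 32      /* same bucket count as server.h */
#define BUCKET_SLOTS   8       /* hypothetical fixed capacity for this sketch */

/* Hypothetical stand-in for the server's conn_hlist[]: IDs only. */
struct conn_table {
	unsigned int next_id;      /* next candidate, like s->conid */
	unsigned int ids[CONN_HASH_SIZE][BUCKET_SLOTS];
	size_t count[CONN_HASH_SIZE];
};

static unsigned int conid_hash(unsigned int conid)
{
	return conid & (CONN_HASH_SIZE - 1);
}

/* A collision check must look at every entry in the bucket. */
static bool conid_in_use(const struct conn_table *t, unsigned int id)
{
	unsigned int r = conid_hash(id);
	size_t i;

	for (i = 0; i < t->count[r]; i++)
		if (t->ids[r][i] == id)
			return true;
	return false;
}

/*
 * Try at most @max_tries candidates. On success store the ID in *@out
 * and return 0; otherwise return -1 so the caller can fail the accept
 * instead of calling panic().
 */
static int alloc_conid(struct conn_table *t, unsigned int max_tries,
		       unsigned int *out)
{
	unsigned int i;

	for (i = 0; i < max_tries; i++) {
		unsigned int id = t->next_id++;
		unsigned int r = conid_hash(id);

		if (conid_in_use(t, id) || t->count[r] == BUCKET_SLOTS)
			continue;
		t->ids[r][t->count[r]++] = id;
		*out = id;
		return 0;
	}
	return -1;
}
```

Bounding the number of tries also answers Paul's question about how long the loop can run under write_lock_bh(). In the worst case it does a fixed amount of work.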
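In the quoted tipc_receive_from_sock(), the "else if (ret == 0)" branch can never run. A return value of 0 is not -EWOULDBLOCK, so the first branch closes the connection and the function returns 0. tipc_recv_work() treats 0 as success and calls rx_action() again. The sketch below shows one way to express the intended classification, with EOF handled explicitly. classify_recv() and enum rx_verdict are hypothetical names used only for illustration.

```c
#include <assert.h>
#include <errno.h>

/* Hypothetical verdict for one non-blocking receive attempt. */
enum rx_verdict {
	RX_CONTINUE,   /* got data: keep draining the socket */
	RX_STOP,       /* would block: stop for now, keep the connection */
	RX_CLOSE,      /* EOF (0) or a hard error: close the connection */
};

/* @ret is what a kernel_recvmsg()-style call returned. */
static enum rx_verdict classify_recv(int ret)
{
	if (ret > 0)
		return RX_CONTINUE;
	if (ret == -EWOULDBLOCK || ret == -EAGAIN)
		return RX_STOP;
	/* ret == 0 (orderly shutdown) and every other negative errno */
	return RX_CLOSE;
}
```

In tipc_receive_from_sock(), RX_CLOSE would map to tipc_close_conn() plus a nonzero return (for example -EAGAIN, as the original comment intends). RX_STOP would return the error unchanged. In both cases the do/while loop in tipc_recv_work() ends.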
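sock_data_ready(), sock_write_space() and tipc_conn_sendmsg() share one rule. The caller holds a reference for the work item, and if queue_work() reports that the item was already pending, that reference is dropped immediately. Every path must either hand the reference to the work item or drop it. In the quoted tipc_conn_sendmsg(), when CF_CONNECTED is clear, the reference taken by tipc_conn_lookup() is neither handed off nor dropped, which appears to be a leak. The sketch below is a single-threaded userspace model of the rule. struct conn, queue_send(), kick_send() and run_send_work() are hypothetical stand-ins for struct tipc_conn, queue_work(), the tail of tipc_conn_sendmsg() and tipc_send_work().

```c
#include <assert.h>
#include <stdbool.h>

struct conn {
	int refcount;    /* models con->kref */
	bool connected;  /* models CF_CONNECTED */
	bool pending;    /* models "swork is already queued" */
};

/* Like queue_work(): returns false if the item was already pending. */
static bool queue_send(struct conn *c)
{
	if (c->pending)
		return false;
	c->pending = true;
	return true;
}

/* Caller owns one reference (e.g. from a lookup); this consumes it on every path. */
static void kick_send(struct conn *c)
{
	if (c->connected && queue_send(c))
		return;            /* reference now owned by the queued work */
	c->refcount--;         /* not queued: drop it here (conn_put) */
}

/* Like tipc_send_work(): does the work, then drops the reference it was given. */
static void run_send_work(struct conn *c)
{
	c->pending = false;
	c->refcount--;
}
```

The model ignores concurrency. Its only purpose is to make the ownership rule checkable: after every call, the count matches the number of live holders.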
From: Ying X. <yin...@wi...> - 2012-10-26 10:07:20
|
Hi Paul, Thank you for your comments; please see my responses inline. Paul Gortmaker wrote: > [[PATCH net-next v2 1/6] tipc: introduce new TIPC server infrastructure] On 26/09/2012 (Wed 16:20) Ying Xue wrote: > > [I'm circling back to this series since Jon indicated the topology drops > were one of the larger issues to be solved, and he was putting this > review to the top of his list.] > > >> TIPC have two internal servers, one that provides a subscription >> > > s/have/has/ > > OK. >> service for topology events and another that provides the >> configuration interface. These servers have previously been running >> in BH context, accessing the TIPC-port API directly. This required >> complex lock policies to be implemented and it also caused >> scalability problems when the topology server port got congested >> and events got dropped silently. >> > > It would be nice to know a bit more detail about the above problems, > so that the reviewer can know what you know, and then hopefully agree > that the server introduction is the right thing to do. > > OK, I will add more explanations. >> Therefor, we are introducing a TIPC server module that uses kernel >> sockets for message passing(instead of TIPC port directly). >> > > Similarly, some more information about this new server would be good. > How does it change use cases for people, or anything else? Details > on what you thought about when choosing this path, and how it would > be efficient and solve the problems that I want expanded above. > Details on the flowpath that is now taken would be a part of that. > > OK.
>> Signed-off-by: Ying Xue <yin...@wi...> >> --- >> net/tipc/Makefile | 2 +- >> net/tipc/server.c | 686 +++++++++++++++++++++++++++++++++++++++++++++++++++++ >> net/tipc/server.h | 103 ++++++++ >> 3 files changed, 790 insertions(+), 1 deletions(-) >> > > Looking at the diffstat, it appears to me that the code is added, and it > is compiled, but since no other files are touched at all, it does > nothing (i.e. is totally unused) after just this commit is applied. > For example, tipc_server_start is never called. > > Yes, this patch does not call it. I wanted to introduce the new server model in a separate patch; the subsequent patches convert the two servers (the topology server and the configuration server). > The reason I mention this, is because this is somewhat of a forced > separation. Yes it is good to keep patches smaller where possible, but > at the same time I have seen people get review comments saying to not > introduce orphaned code -- i.e. the commit that adds the code should > also enable and use the code. (If you don't do it this way, then a > bisect will lead to a random user of broken code, and not the addition > of the broken code). > > Do you suggest that we add one conversion (for example, the topology server) to this patch?
>> create mode 100644 net/tipc/server.c >> create mode 100644 net/tipc/server.h >> >> diff --git a/net/tipc/Makefile b/net/tipc/Makefile >> index 6cd55d6..af7206b 100644 >> --- a/net/tipc/Makefile >> +++ b/net/tipc/Makefile >> @@ -8,4 +8,4 @@ tipc-y += addr.o bcast.o bearer.o config.o \ >> core.o handler.o link.o discover.o msg.o \ >> name_distr.o subscr.o name_table.o net.o \ >> netlink.o node.o node_subscr.o port.o ref.o \ >> - socket.o log.o eth_media.o >> + socket.o log.o eth_media.o server.o >> diff --git a/net/tipc/server.c b/net/tipc/server.c >> new file mode 100644 >> index 0000000..0c369ae >> --- /dev/null >> +++ b/net/tipc/server.c >> @@ -0,0 +1,686 @@ >> +/* >> + * net/tipc/server.c: TIPC server infrastructure >> + * >> + * Copyright (c) 2012 Wind River Systems >> + * All rights reserved. >> + * >> + * Redistribution and use in source and binary forms, with or without >> + * modification, are permitted provided that the following conditions are met: >> + * >> + * 1. Redistributions of source code must retain the above copyright >> + * notice, this list of conditions and the following disclaimer. >> + * 2. Redistributions in binary form must reproduce the above copyright >> + * notice, this list of conditions and the following disclaimer in the >> + * documentation and/or other materials provided with the distribution. >> + * 3. Neither the names of the copyright holders nor the names of its >> + * contributors may be used to endorse or promote products derived from >> + * this software without specific prior written permission. >> + * >> + * Alternatively, this software may be distributed under the terms of the >> + * GNU General Public License ("GPL") version 2 as published by the Free >> + * Software Foundation. 
>> + * >> + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" >> + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE >> + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE >> + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE >> + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR >> + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF >> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS >> + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN >> + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) >> + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE >> + * POSSIBILITY OF SUCH DAMAGE. >> + */ >> + >> +#include "server.h" >> +#include <net/sock.h> >> +#include <linux/module.h> >> + >> +/* Number of messages to send before rescheduling */ >> +#define MAX_SEND_MSG_COUNT 25 >> +#define CF_CONNECTED 1 >> > > Odd whitespace usage here. > > I will change it. 
>> + >> +#define sock2con(x) ((struct tipc_conn *)(x)->sk_user_data) >> + >> +/** >> + * struct tipc_conn - TIPC connection structure >> + * @kref: reference counter to connection object >> + * @conid: connection identifier >> + * @sock: socket handler associated with connection >> + * @flags: indicates connection state >> + * @server: pointer to connected server >> + * @outqueue: pointer to first outbound message in queue >> + * @outqueue_lock: controll access to the outqueue >> + * @rx_action: what to do when connection socket is active >> + * @list: list of connection objects for its server >> + * @rwork: receive work item >> + * @swork: send work item >> + * @usr_data: user-specified field >> + */ >> +struct tipc_conn { >> + struct kref kref; >> + unsigned int conid; >> + struct socket *sock; >> + unsigned long flags; >> + struct tipc_server *server; >> + struct list_head outqueue; >> + spinlock_t outqueue_lock; >> + int (*rx_action) (struct tipc_conn *con); >> + struct hlist_node list; >> + struct work_struct rwork; >> + struct work_struct swork; >> + void *usr_data; >> +}; >> + >> +/* An entry waiting to be sent */ >> +struct outqueue_entry { >> + struct list_head list; >> + struct kvec iov; >> + struct sockaddr_tipc dest; >> +}; >> + >> +static void tipc_recv_work(struct work_struct *work); >> +static void tipc_send_work(struct work_struct *work); >> +static void clean_outqueues(struct tipc_conn *con); >> + >> +static void tipc_conn_kref_release(struct kref *kref) >> +{ >> + struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); >> + struct tipc_server *s = con->server; >> + >> + if (con->sock) { >> + __module_get(con->sock->ops->owner); >> + __module_get(con->sock->sk->sk_prot_creator->owner); >> > > The use of __module_get (vs non underscore) and its use 2x probably > needs some sort of explanation. > > OK, I will add more comments here. 
>> + sock_release(con->sock); >> + con->sock = NULL; >> + } >> + >> + clean_outqueues(con); >> + >> + if (con->conid) >> + s->tipc_conn_shutdown(con->conid, con->usr_data); >> + >> + kfree(con); >> +} >> + >> +static void conn_put(struct tipc_conn *con) >> +{ >> + kref_put(&con->kref, tipc_conn_kref_release); >> +} >> + >> +static void conn_get(struct tipc_conn *con) >> +{ >> + kref_get(&con->kref); >> +} >> + >> +static inline unsigned int conid_hash(unsigned int conid) >> +{ >> + return conid & (CONN_HASH_SIZE - 1); >> +} >> + >> +static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, >> + unsigned int conid) >> +{ >> + struct hlist_node *h; >> + struct tipc_conn *con; >> + unsigned int r; >> + >> + r = conid_hash(conid); >> + >> + read_lock_bh(&s->conn_hlist_lock); >> + hlist_for_each_entry(con, h, &s->conn_hlist[r], list) { >> + if (con->conid == conid) { >> + conn_get(con); >> + read_unlock_bh(&s->conn_hlist_lock); >> + return con; >> + } >> + } >> + read_unlock_bh(&s->conn_hlist_lock); >> + return NULL; >> +} >> + >> +static void sock_data_ready(struct sock *sk, int unused) >> +{ >> + struct tipc_conn *con; >> + >> + read_lock(&sk->sk_callback_lock); >> + con = sock2con(sk); >> + if (con && test_bit(CF_CONNECTED, &con->flags)) { >> + conn_get(con); >> + if (!queue_work(con->server->rcv_wq, &con->rwork)) >> + conn_put(con); >> + } >> + read_unlock(&sk->sk_callback_lock); >> +} >> + >> +static void sock_write_space(struct sock *sk) >> +{ >> + struct tipc_conn *con; >> + >> + read_lock(&sk->sk_callback_lock); >> + con = sock2con(sk); >> + if (con && test_bit(CF_CONNECTED, &con->flags)) { >> + conn_get(con); >> + if (!queue_work(con->server->send_wq, &con->swork)) >> + conn_put(con); >> + } >> + read_unlock(&sk->sk_callback_lock); >> +} >> + >> +/** >> > > Let's not use the kerneldoc /** start marker if there is no actual > kerneldoc content to be processed. (same for all instances). > > OK, I will remove them.
>> + * tipc_register_callbacks - register socket callbacks >> + */ >> +static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con) >> +{ >> + struct sock *sk = sock->sk; >> + >> + write_lock_bh(&sk->sk_callback_lock); >> + >> + sk->sk_data_ready = sock_data_ready; >> + sk->sk_write_space = sock_write_space; >> > > Do the above two lines have any relation to the other patch I have > queued that does the tipc_data_ready and tipc_write_space additions? > > Yes. This patch depends on your queued patches; without them it would not work correctly. That is another reason why I wrote those patches. >> + sk->sk_user_data = con; >> + >> + con->sock = sock; >> + >> + write_unlock_bh(&sk->sk_callback_lock); >> +} >> + >> +/** >> + * tipc_unregister_callbacks - unregister socket callbacks >> + */ >> +static void tipc_unregister_callbacks(struct tipc_conn *con) >> +{ >> + struct sock *sk = con->sock->sk; >> + >> + write_lock_bh(&sk->sk_callback_lock); >> + sk->sk_user_data = NULL; >> + write_unlock_bh(&sk->sk_callback_lock); >> +} >> + >> +/** >> + * tipc_close_conn - close connection >> + */ >> +static void tipc_close_conn(struct tipc_conn *con) >> +{ >> + struct tipc_server *s = con->server; >> + >> + if (test_and_clear_bit(CF_CONNECTED, &con->flags)) { >> + write_lock_bh(&s->conn_hlist_lock); >> + hlist_del(&con->list); >> + write_unlock_bh(&s->conn_hlist_lock); >> + >> + tipc_unregister_callbacks(con); >> + >> + /* >> + * We shouldn't flush pending works as we may be in the >> + * thread. In fact the races with pending rx/tx work structs >> + * are harmless for us here as we have already deleted this >> + * connection from server connection list and set >> + * sk->sk_user_data to 0 before release connection object. >> > > s/release/releasing/ > OK.
> >> + */ >> + kernel_sock_shutdown(con->sock, SHUT_RDWR); >> + >> + conn_put(con); >> + } >> +} >> + >> +/** >> + * get_conid - get unique connection ID >> + */ >> +static unsigned int get_conid(struct tipc_server *s) >> +{ >> + struct hlist_node *h; >> + struct tipc_conn *con; >> + unsigned int r; >> + unsigned int i; >> + >> + for (i = 0; i < 0xffffffff; i++) { >> > > How often is this called? How far in the loop will it typically go > before finding something? It is used within write_lock_bh scope. > > I understand your concern that this could trigger a soft lockup or something similar. In fact, the time spent here depends on how many clients connect to the server, and in practice a server rarely has more than 10 clients. Therefore, the 0xffffffff bound is only a theoretical limit. >> + r = conid_hash(s->conid); >> + >> + if (hlist_empty(&s->conn_hlist[r])) >> + return s->conid; >> + >> + hlist_for_each_entry(con, h, &s->conn_hlist[r], list) { >> + if (con->conid != s->conid) >> + return s->conid; >> + } >> + s->conid++; >> + } >> + panic("failed to allocate unique connection ID\n"); >> > > No. Don't do this. Linus consistently gives people trouble for using > panic too much. It should only be used when things like memory > corruption or similar catastrophe means the machine _really_ can not > continue to run. > > OK, I will change it.
>> + return 0; >> +} >> + >> +/** >> + * alloc_tipc_conn - allocate connection object >> + */ >> +static struct tipc_conn *alloc_tipc_conn(struct tipc_server *s) >> +{ >> + struct tipc_conn *con; >> + int r; >> + >> + con = kzalloc(sizeof(struct tipc_conn), GFP_ATOMIC); >> + if (!con) >> + return NULL; >> + >> + write_lock_bh(&s->conn_hlist_lock); >> + r = conid_hash(get_conid(s)); >> + hlist_add_head(&con->list, &s->conn_hlist[r]); >> + con->conid = s->conid++; >> + write_unlock_bh(&s->conn_hlist_lock); >> + >> + set_bit(CF_CONNECTED, &con->flags); >> + con->server = s; >> + kref_init(&con->kref); >> + INIT_LIST_HEAD(&con->outqueue); >> + spin_lock_init(&con->outqueue_lock); >> + INIT_WORK(&con->swork, tipc_send_work); >> + INIT_WORK(&con->rwork, tipc_recv_work); >> + >> + return con; >> +} >> + >> +/** >> + * tipc_receive_from_sock - receive data from remote end >> + */ >> +static int tipc_receive_from_sock(struct tipc_conn *con) >> +{ >> + struct kvec iov; >> + void *buf; >> + int ret; >> + struct sockaddr_tipc addr; >> + struct msghdr msg = {}; >> + struct tipc_server *s = con->server; >> + >> + buf = kmem_cache_alloc(s->rcvbuf_cache, GFP_ATOMIC); >> + if (!buf) { >> + ret = -ENOMEM; >> + goto out_close; >> + } >> + >> + iov.iov_base = buf; >> + iov.iov_len = s->max_rcvbuf_size; >> + msg.msg_name = &addr; >> + ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, >> + MSG_DONTWAIT); >> + if (ret <= 0) { >> + kmem_cache_free(s->rcvbuf_cache, buf); >> + goto out_close; >> + } >> + >> + s->tipc_conn_recvmsg(con->conid, &addr, con->usr_data, buf, ret); >> + >> + kmem_cache_free(s->rcvbuf_cache, buf); >> + >> + return 0; >> + >> +out_close: >> + if (ret != -EWOULDBLOCK) >> + tipc_close_conn(con); >> + else if (ret == 0) >> + /* Don't return success if we really got EOF */ >> + ret = -EAGAIN; >> + >> + return ret; >> +} >> + >> +/** >> + * tipc_accept_from_sock - listen and accept a new connection >> + */ >> +static int tipc_accept_from_sock(struct tipc_conn 
*con) >> +{ >> + struct socket *newsock; >> + struct tipc_conn *newcon; >> + int ret; >> + struct tipc_server *s = con->server; >> + >> + ret = kernel_accept(con->sock, &newsock, O_NONBLOCK); >> + if (ret < 0) >> + goto exit; >> + >> + newcon = alloc_tipc_conn(con->server); >> + if (!newcon) { >> + ret = -ENOMEM; >> + sock_release(newsock); >> + goto exit; >> + } >> + >> + newcon->rx_action = tipc_receive_from_sock; >> + tipc_register_callbacks(newsock, newcon); >> + >> + /* Notify new connection is incoming */ >> + newcon->usr_data = s->tipc_conn_new(newcon->conid); >> + >> + module_put(newsock->ops->owner); >> + module_put(newsock->sk->sk_prot_creator->owner); >> + >> +exit: >> + return ret; >> +} >> + >> +/** >> + * tipc_create_listen_sock - create and listen server socket >> + */ >> +static struct socket *tipc_create_listen_sock(struct tipc_conn *con) >> +{ >> + int ret; >> + struct socket *sock = NULL; >> + struct tipc_server *s = con->server; >> + >> + ret = sock_create_kern(AF_TIPC, s->type, 0, &sock); >> + if (ret < 0) >> + return NULL; >> + ret = kernel_setsockopt(sock, SOL_TIPC, TIPC_IMPORTANCE, >> + (char *)&s->imp, sizeof(s->imp)); >> + if (ret < 0) >> + goto create_err; >> + ret = kernel_bind(sock, (struct sockaddr *)s->saddr, >> + sizeof(*s->saddr)); >> + if (ret < 0) >> + goto create_err; >> + >> + switch (s->type) { >> + case SOCK_STREAM: >> + case SOCK_SEQPACKET: >> + con->rx_action = tipc_accept_from_sock; >> + >> + ret = kernel_listen(sock, 0); >> + if (ret < 0) >> + goto create_err; >> + break; >> + case SOCK_DGRAM: >> + case SOCK_RDM: >> + con->rx_action = tipc_receive_from_sock; >> + break; >> + default: >> + pr_err("Unknown socket type %d\n", s->type); >> + goto create_err; >> + } >> + >> + /* >> + * Hack: we put TIPC module twice since its refcount has been >> + * increased two times when socket is created, otherwise, the >> + * module cannot be unloaded at all. 
But it's safe to descrease >> + * its refcount here because the lifetime of the socket is always >> + * same as TIPC module. Likely we must inscrease it when the >> + * socket is closed. >> + */ >> + module_put(sock->ops->owner); >> + module_put(sock->sk->sk_prot_creator->owner); >> > > This is very likely to create a red flag when(if?) it goes to netdev. > Maybe once you write more in the commit log about the whole strategy of > what you are trying to solve, then I can comment more on what to do? > > OK, I understood your meaning. >> + >> + return sock; >> + >> +create_err: >> + sock_release(sock); >> + con->sock = NULL; >> + return NULL; >> +} >> + >> +/** >> + * tipc_open_listening_sock - create server to listen connections on its socket >> + */ >> +static int tipc_open_listening_sock(struct tipc_server *server) >> +{ >> + struct socket *sock; >> + struct tipc_conn *con; >> + int ret = -EINVAL; >> + >> + con = alloc_tipc_conn(server); >> + if (!con) >> + return -ENOMEM; >> + >> + sock = tipc_create_listen_sock(con); >> + if (sock) { >> + tipc_register_callbacks(sock, con); >> + ret = 0; >> + } >> + >> + return ret; >> +} >> + >> +static struct outqueue_entry *alloc_entry(void *data, int len) >> +{ >> + struct outqueue_entry *entry; >> + void *buf; >> + >> + entry = kmalloc(sizeof(struct outqueue_entry), GFP_ATOMIC); >> + if (!entry) >> + return NULL; >> + >> + buf = kmalloc(len, GFP_ATOMIC); >> + if (!buf) { >> + kfree(entry); >> + return NULL; >> + } >> + >> + memcpy(buf, data, len); >> + entry->iov.iov_base = buf; >> + entry->iov.iov_len = len; >> + >> + return entry; >> +} >> + >> +static void free_entry(struct outqueue_entry *e) >> +{ >> + kfree(e->iov.iov_base); >> + kfree(e); >> +} >> > > I wouldn't do this, since it is only used twice. That, and the name > makes it sound "generic" -- meaning when I 1st saw it used, I went to > grep the linux/include dir for it (and eventually returned here). 
> Similar comments for anything else that sounds generic and not tipc > specific -- even if it is static, it can appear in a backtrace. > > See mainline commits starting at 7f9ab6ac2e79b9658eba7c8e3ad8a4392d308057 > and going back from there, as a cleanup requested by DaveM. > > I will rename the function. > I'd wait until Jon has added his comments about the overall > architecture, locking design and data flow strategy before resending. > Those kinds of issues are even more important to get right than the > kinds of things I've pointed out here. But detailing what the initial > plan was (like I asked for at the very top) may in fact help Jon, so > making a 1st pass at creating that new text might be good to start now. > (I know it would help me to better understand the goal here.) > > Before I send the next review request, I would also like to hear Jon's suggestions. Regards, Ying > Paul. > -- > > >> + >> +/** >> + * clean_outqueues - discard all entries on the outqueues >> + */ >> +static void clean_outqueues(struct tipc_conn *con) >> +{ >> + struct outqueue_entry *e, *safe; >> + >> + spin_lock_bh(&con->outqueue_lock); >> + list_for_each_entry_safe(e, safe, &con->outqueue, list) { >> + list_del(&e->list); >> + free_entry(e); >> + } >> + spin_unlock_bh(&con->outqueue_lock); >> +} >> + >> +/** >> + * tipc_conn_sendmsg - send message to server with connection ID >> + */ >> +int tipc_conn_sendmsg(struct tipc_server *s, unsigned int conid, >> + struct sockaddr_tipc *addr, void *data, size_t len) >> +{ >> + struct outqueue_entry *e; >> + struct tipc_conn *con; >> + >> + con = tipc_conn_lookup(s, conid); >> + if (!con) >> + return -1; >> + >> + e = alloc_entry(data, len); >> + if (!e) { >> + conn_put(con); >> + return -ENOMEM; >> + } >> + >> + if (addr) >> + memcpy(&e->dest, addr, sizeof(struct sockaddr_tipc)); >> + >> + spin_lock_bh(&con->outqueue_lock); >> + list_add_tail(&e->list, &con->outqueue); >> + spin_unlock_bh(&con->outqueue_lock); >> + >> + if
(test_bit(CF_CONNECTED, &con->flags)) { >> + if (!queue_work(s->send_wq, &con->swork)) >> + conn_put(con); >> + } >> + >> + return 0; >> +} >> + >> +/** >> + * tipc_conn_terminate - terminate connection with server >> + */ >> +void tipc_conn_terminate(struct tipc_server *s, unsigned int conid) >> +{ >> + struct tipc_conn *con; >> + >> + con = tipc_conn_lookup(s, conid); >> + if (con) { >> + tipc_close_conn(con); >> + conn_put(con); >> + } >> +} >> + >> +/** >> + * tipc_send_to_sock - try its best to send out all messages in its outqueue >> + */ >> +static void tipc_send_to_sock(struct tipc_conn *con) >> +{ >> + int count = 0; >> + struct tipc_server *s = con->server; >> + struct outqueue_entry *e; >> + struct msghdr msg; >> + int ret; >> + >> + spin_lock_bh(&con->outqueue_lock); >> + while (1) { >> + e = list_entry(con->outqueue.next, struct outqueue_entry, >> + list); >> + if ((struct list_head *) e == &con->outqueue) >> + break; >> + spin_unlock_bh(&con->outqueue_lock); >> + >> + memset(&msg, 0, sizeof(msg)); >> + msg.msg_flags = MSG_DONTWAIT; >> + >> + if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) { >> + msg.msg_name = &e->dest; >> + msg.msg_namelen = sizeof(struct sockaddr_tipc); >> + } >> + ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1, >> + e->iov.iov_len); >> + if (ret == -EWOULDBLOCK || ret == 0) { >> + cond_resched(); >> + goto out; >> + } else if (ret < 0) { >> + goto send_err; >> + } >> + >> + /* Don't starve people filling buffers */ >> + if (++count >= MAX_SEND_MSG_COUNT) { >> + cond_resched(); >> + count = 0; >> + } >> + >> + spin_lock_bh(&con->outqueue_lock); >> + list_del(&e->list); >> + free_entry(e); >> + } >> + spin_unlock_bh(&con->outqueue_lock); >> +out: >> + return; >> + >> +send_err: >> + tipc_close_conn(con); >> +} >> + >> +/** >> + * tipc_recv_work - receive workqueue function >> + */ >> +static void tipc_recv_work(struct work_struct *work) >> +{ >> + struct tipc_conn *con = container_of(work, struct tipc_conn, rwork); >> + int err; 
>> + >> + do { >> + err = con->rx_action(con); >> + } while (!err); >> + >> + conn_put(con); >> +} >> + >> +/** >> + * tipc_send_work - send workqueue function >> + */ >> +static void tipc_send_work(struct work_struct *work) >> +{ >> + struct tipc_conn *con = container_of(work, struct tipc_conn, swork); >> + >> + tipc_send_to_sock(con); >> + conn_put(con); >> +} >> + >> +/** >> + * tipc_work_stop - destroy allocated workqueues >> + */ >> +static void tipc_work_stop(struct tipc_server *s) >> +{ >> + destroy_workqueue(s->rcv_wq); >> + destroy_workqueue(s->send_wq); >> +} >> + >> +/** >> + * tipc_work_start - allocate workqueues >> + */ >> +static int tipc_work_start(struct tipc_server *s) >> +{ >> + s->rcv_wq = alloc_workqueue("tipc_rcv", WQ_UNBOUND, 1); >> + if (!s->rcv_wq) { >> + pr_err("can't start tipc receive workqueue\n"); >> + return -ENOMEM; >> + } >> + >> + s->send_wq = alloc_workqueue("tipc_send", WQ_UNBOUND, 1); >> + if (!s->send_wq) { >> + pr_err("can't start tipc send workqueue\n"); >> + destroy_workqueue(s->rcv_wq); >> + return -ENOMEM; >> + } >> + >> + return 0; >> +} >> + >> +/** >> + * tipc_server_start - launch server >> + */ >> +int tipc_server_start(struct tipc_server *s) >> +{ >> + int ret; >> + int i; >> + >> + s->conid = 0; >> + rwlock_init(&s->conn_hlist_lock); >> + >> + for (i = 0; i < CONN_HASH_SIZE; i++) >> + INIT_HLIST_HEAD(&s->conn_hlist[i]); >> + >> + s->rcvbuf_cache = kmem_cache_create("tipc_rcvbuf", s->max_rcvbuf_size, >> + 0, SLAB_HWCACHE_ALIGN, NULL); >> + if (!s->rcvbuf_cache) >> + return -ENOMEM; >> + >> + ret = tipc_work_start(s); >> + if (ret < 0) { >> + kmem_cache_destroy(s->rcvbuf_cache); >> + return ret; >> + } >> + >> + ret = tipc_open_listening_sock(s); >> + return ret; >> +} >> + >> +/** >> + * tipc_server_stop - stop server and destroy all allocated resources >> + */ >> +void tipc_server_stop(struct tipc_server *s) >> +{ >> + struct hlist_node *h, *h1; >> + struct tipc_conn *con; >> + int i; >> + >> + for (i = 0; i < 
CONN_HASH_SIZE; i++) { >> + hlist_for_each_entry_safe(con, h, h1, &s->conn_hlist[i], list) >> + tipc_close_conn(con); >> + } >> + >> + tipc_work_stop(s); >> + kmem_cache_destroy(s->rcvbuf_cache); >> +} >> diff --git a/net/tipc/server.h b/net/tipc/server.h >> new file mode 100644 >> index 0000000..4b2c4cf >> --- /dev/null >> +++ b/net/tipc/server.h >> @@ -0,0 +1,103 @@ >> +/* >> + * net/tipc/server.h: Include file for TIPC server code >> + * >> + * Copyright (c) 2012, Wind River Systems >> + * All rights reserved. >> + * >> + * Redistribution and use in source and binary forms, with or without >> + * modification, are permitted provided that the following conditions are met: >> + * >> + * 1. Redistributions of source code must retain the above copyright >> + * notice, this list of conditions and the following disclaimer. >> + * 2. Redistributions in binary form must reproduce the above copyright >> + * notice, this list of conditions and the following disclaimer in the >> + * documentation and/or other materials provided with the distribution. >> + * 3. Neither the names of the copyright holders nor the names of its >> + * contributors may be used to endorse or promote products derived from >> + * this software without specific prior written permission. >> + * >> + * Alternatively, this software may be distributed under the terms of the >> + * GNU General Public License ("GPL") version 2 as published by the Free >> + * Software Foundation. >> + * >> + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" >> + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE >> + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE >> + * ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE >> + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR >> + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF >> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS >> + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN >> + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) >> + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE >> + * POSSIBILITY OF SUCH DAMAGE. >> + */ >> + >> +#ifndef _TIPC_SERVER_H >> +#define _TIPC_SERVER_H >> + >> +#include "core.h" >> + >> +#define TIPC_SERVER_NAME_LEN 32 >> +#define CONN_HASH_SIZE 32 >> + >> +/** >> + * struct tipc_server - TIPC server structure >> + * @conid: connection identifier associated with server >> + * @conn_hlist: links to all client connections connecting to server >> + * @conn_hlist_lock: conn_hlist list lock >> + * @rcvbuf_cache: memory cache of server receive buffer >> + * @rcv_wq: receive workqueue >> + * @send_wq: send workqueue >> + * @saddr: TIPC server address >> + * @imp: message importance >> + * @type: socket type >> + * @max_rcvbuf_size: maximum permitted receive message length >> + * @name: server name >> + * @tipc_conn_new: callback will be called when new connection is incoming >> + * @tipc_conn_shutdown: callback will be called when connection is shut down >> + * @tipc_conn_recvmsg: callback will be called when message arrives >> + */ >> +struct tipc_server { >> + unsigned int conid; >> + struct hlist_head conn_hlist[CONN_HASH_SIZE]; >> + rwlock_t conn_hlist_lock; >> + struct kmem_cache *rcvbuf_cache; >> + struct workqueue_struct *rcv_wq; >> + struct workqueue_struct *send_wq; >> + struct sockaddr_tipc *saddr; >> + int imp; >> + int type; >> + int max_rcvbuf_size; >> + const char name[TIPC_SERVER_NAME_LEN]; >> + void *(*tipc_conn_new) (unsigned int conid); >> + void (*tipc_conn_shutdown) (unsigned int 
conid, void *usr_data); >> + void (*tipc_conn_recvmsg) (unsigned int conid, >> + struct sockaddr_tipc *addr, >> + void *usr_data, void *buf, size_t len); >> +}; >> + >> +/** >> + * tipc_conn_sendmsg - send message to connection >> + */ >> +int tipc_conn_sendmsg(struct tipc_server *s, unsigned int conid, >> + struct sockaddr_tipc *addr, void *data, size_t len); >> + >> +/** >> + * tipc_conn_terminate - terminate connection with server >> + * >> + * Note: Must call it in process context since it might sleep >> + */ >> +void tipc_conn_terminate(struct tipc_server *s, unsigned int conid); >> + >> +/** >> + * tipc_server_start - start server >> + */ >> +int tipc_server_start(struct tipc_server *s); >> + >> +/** >> + * tipc_server_stop - stop server >> + */ >> +void tipc_server_stop(struct tipc_server *s); >> + >> +#endif >> -- >> 1.7.1 >> >> > > |
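Paul objects to the panic() in get_conid(). Separately, as posted, the inner hlist loop returns the candidate as soon as it meets one entry whose conid differs, so it can return an ID that another entry in the same bucket already uses. The following is a user-space sketch of a bounded search that walks the whole bucket chain and reports failure, so that alloc_tipc_conn() could fail the way it already does when kzalloc() fails. It assumes the caller holds whatever lock protects the table (conn_hlist_lock in the patch). The demo_* names, the max_tries parameter and the singly linked buckets are hypothetical stand-ins for the kernel's hlist. This is not the patch's code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins; 32 mirrors CONN_HASH_SIZE and must be a power
 * of two so that masking works as a modulo. */
#define DEMO_HASH_SIZE 32

struct demo_conn {
	unsigned int conid;
	struct demo_conn *next;		/* singly linked bucket chain */
};

struct demo_table {
	unsigned int next_conid;	/* plays the role of s->conid */
	struct demo_conn *bucket[DEMO_HASH_SIZE];
};

static unsigned int demo_hash(unsigned int conid)
{
	return conid & (DEMO_HASH_SIZE - 1);
}

static void demo_insert(struct demo_table *t, struct demo_conn *c)
{
	unsigned int r = demo_hash(c->conid);

	c->next = t->bucket[r];
	t->bucket[r] = c;
}

/* True if some entry in the bucket already owns conid; every entry in
 * the chain is checked before the ID is considered free. */
static bool demo_conid_in_use(const struct demo_table *t, unsigned int conid)
{
	const struct demo_conn *c;

	for (c = t->bucket[demo_hash(conid)]; c != NULL; c = c->next)
		if (c->conid == conid)
			return true;
	return false;
}

/* Try at most max_tries candidates. On success store the ID in *out and
 * return 0; otherwise return -1 so the caller can fail the allocation
 * instead of calling panic(). Caller must serialise access to *t. */
static int demo_get_conid(struct demo_table *t, unsigned int max_tries,
			  unsigned int *out)
{
	unsigned int i;

	assert(out != NULL);
	for (i = 0; i < max_tries; i++) {
		unsigned int id = t->next_conid++;	/* wraps after UINT_MAX */

		if (!demo_conid_in_use(t, id)) {
			*out = id;
			return 0;
		}
	}
	return -1;
}
```

For example, with connections 0 and 1 already present and next_conid at 0, demo_get_conid() skips both and returns 2. With IDs 32 and 0 sharing bucket 0, candidate 0 is still rejected because the whole chain is walked. Because the counter is unsigned, it wraps after UINT_MAX, which is well defined in C.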
From: Paul G. <pau...@wi...> - 2012-10-26 21:36:34
|
[Re: [PATCH net-next v2 1/6] tipc: introduce new TIPC server infrastructure] On 26/10/2012 (Fri 18:07) Ying Xue wrote: > Hi Paul, > > Thank you for your given comments, please see my responses inline. > > Paul Gortmaker wrote: > >[[PATCH net-next v2 1/6] tipc: introduce new TIPC server infrastructure] On 26/09/2012 (Wed 16:20) Ying Xue wrote: > > [...] > >>--- > >> net/tipc/Makefile | 2 +- > >> net/tipc/server.c | 686 +++++++++++++++++++++++++++++++++++++++++++++++++++++ > >> net/tipc/server.h | 103 ++++++++ > >> 3 files changed, 790 insertions(+), 1 deletions(-) > > > >Looking at the diffstat, it appears to me that the code is added, and it > >is compiled, but since no other files are touched at all, it does > >nothing (i.e. is totally unused) after just this commit is applied. > >For example, tipc_server_start is never called. > > > Yes, in this patch file, we do not call it. > In fact I just want to introduce the new server model in a separate patch. > In subsequent patches I will convert two servers(topology serve and > configuration server). > >The reason I mention this, is because this is somewhat of a forced > >separation. Yes it is good to keep patches smaller where possible, but > >at the same time I have seen people get review comments saying to not > >introduce orphaned code -- i.e. the commit that adds the code should > >also enable and use the code. (If you don't do it this way, then a > >bisect will lead to a random user of broken code, and not the addition > >of the broken code). > > > Do you suggest we should add one conversion(for example, topology > serve) in the patch? Yes -- I think so. Unless it really does make a giant un-reviewable patch out of things. 
In that case, one might be able to justify it being a standalone addition of "unused" code, but the commit log would have to actually acknowledge that, with something like:

    As of this commit, this new server infrastructure is built, but not
    yet called by the existing TIPC code. Since the conversion changes
    required in order to use it are significant, the addition is kept
    here as a separate commit.

I did a similar thing (but in reverse) when I deleted all the token ring junk. See the 2nd from last paragraph here: http://lwn.net/Articles/497394/

Thanks,
Paul.
-- |
From: Erik H. <eri...@er...> - 2012-11-08 11:53:32
|
Didn't notice this until now: after this patch, the tipc_createport() function is no longer referenced and should be removed from port.c/h. //E |
From: Ying X. <yin...@wi...> - 2012-11-12 06:22:24
|
Erik Hugne wrote: > Didn't notice this until now.. after this patch, the tipc_createport() > function is no longer referenced and should be removed from port.c/h > > Thanks for the reminder. Yes, the original tipc_createport() should be removed, and the patch does remove it. However, the patch also renames tipc_createport_raw() to tipc_createport(); I guess that is why you still see tipc_createport(). Regards, Ying > //E > > |