From: Ying X. <yin...@wi...> - 2012-10-26 10:07:20
Hi Paul,

Thank you for your comments; please see my responses inline.

Paul Gortmaker wrote:
> [[PATCH net-next v2 1/6] tipc: introduce new TIPC server infrastructure] On 26/09/2012 (Wed 16:20) Ying Xue wrote:
>
> [I'm circling back to this series since Jon indicated the topology drops
> were one of the larger issues to be solved, and he was putting this
> review to the top of his list.]
>
>> TIPC have two internal servers, one that provides a subscription
>>
>
> s/have/has/
>

OK.

>> service for topology events and another that provides the
>> configuration interface. These servers have previously been running
>> in BH context, accessing the TIPC-port API directly. This required
>> complex lock policies to be implemented and it also caused
>> scalability problems when the topology server port got congested
>> and events got dropped silently.
>>
>
> It would be nice to know a bit more detail about the above problems,
> so that the reviewer can know what you know, and then hopefully agree
> that the server introduction is the right thing to do.
>

OK, I will add more explanation.

>> Therefor, we are introducing a TIPC server module that uses kernel
>> sockets for message passing(instead of TIPC port directly).
>>
>
> Similarly, some more information about this new server would be good.
> How does it change use cases for people, or anything else? Details
> on what you thought about when choosing this path, and how it would
> be efficient and solve the problems that I want expanded above.
> Details on the flowpath that is now taken would be a part of that.
>

OK.

>> Signed-off-by: Ying Xue <yin...@wi...>
>> ---
>>  net/tipc/Makefile |    2 +-
>>  net/tipc/server.c |  686 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>>  net/tipc/server.h |  103 ++++++++
>>  3 files changed, 790 insertions(+), 1 deletions(-)
>>
>
> Looking at the diffstat, it appears to me that the code is added, and it
> is compiled, but since no other files are touched at all, it does
> nothing (i.e. is totally unused) after just this commit is applied.
> For example, tipc_server_start is never called.
>

Yes, this patch does not call it. I only wanted to introduce the new
server model in a separate patch. In subsequent patches I will convert
the two servers (the topology server and the configuration server).

> The reason I mention this, is because this is somewhat of a forced
> separation. Yes it is good to keep patches smaller where possible, but
> at the same time I have seen people get review comments saying to not
> introduce orphaned code -- i.e. the commit that adds the code should
> also enable and use the code. (If you don't do it this way, then a
> bisect will lead to a random user of broken code, and not the addition
> of the broken code).
>

Are you suggesting that we add one conversion (for example, the topology
server) to this patch?

>>  create mode 100644 net/tipc/server.c
>>  create mode 100644 net/tipc/server.h
>>
>> diff --git a/net/tipc/Makefile b/net/tipc/Makefile
>> index 6cd55d6..af7206b 100644
>> --- a/net/tipc/Makefile
>> +++ b/net/tipc/Makefile
>> @@ -8,4 +8,4 @@ tipc-y += addr.o bcast.o bearer.o config.o \
>>  	core.o handler.o link.o discover.o msg.o \
>>  	name_distr.o subscr.o name_table.o net.o \
>>  	netlink.o node.o node_subscr.o port.o ref.o \
>> -	socket.o log.o eth_media.o
>> +	socket.o log.o eth_media.o server.o
>> diff --git a/net/tipc/server.c b/net/tipc/server.c
>> new file mode 100644
>> index 0000000..0c369ae
>> --- /dev/null
>> +++ b/net/tipc/server.c
>> @@ -0,0 +1,686 @@
>> +/*
>> + * net/tipc/server.c: TIPC server infrastructure
>> + *
>> + * Copyright (c) 2012 Wind River Systems
>> + * All rights reserved.
>> + *
>> + * Redistribution and use in source and binary forms, with or without
>> + * modification, are permitted provided that the following conditions are met:
>> + *
>> + * 1. Redistributions of source code must retain the above copyright
>> + *    notice, this list of conditions and the following disclaimer.
>> + * 2. Redistributions in binary form must reproduce the above copyright
>> + *    notice, this list of conditions and the following disclaimer in the
>> + *    documentation and/or other materials provided with the distribution.
>> + * 3. Neither the names of the copyright holders nor the names of its
>> + *    contributors may be used to endorse or promote products derived from
>> + *    this software without specific prior written permission.
>> + *
>> + * Alternatively, this software may be distributed under the terms of the
>> + * GNU General Public License ("GPL") version 2 as published by the Free
>> + * Software Foundation.
>> + *
>> + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
>> + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
>> + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
>> + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
>> + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
>> + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
>> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
>> + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
>> + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
>> + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
>> + * POSSIBILITY OF SUCH DAMAGE.
>> + */
>> +
>> +#include "server.h"
>> +#include <net/sock.h>
>> +#include <linux/module.h>
>> +
>> +/* Number of messages to send before rescheduling */
>> +#define MAX_SEND_MSG_COUNT 25
>> +#define CF_CONNECTED 1
>>
>
> Odd whitespace usage here.
>

I will change it.

>> +
>> +#define sock2con(x) ((struct tipc_conn *)(x)->sk_user_data)
>> +
>> +/**
>> + * struct tipc_conn - TIPC connection structure
>> + * @kref: reference counter to connection object
>> + * @conid: connection identifier
>> + * @sock: socket handler associated with connection
>> + * @flags: indicates connection state
>> + * @server: pointer to connected server
>> + * @outqueue: pointer to first outbound message in queue
>> + * @outqueue_lock: controll access to the outqueue
>> + * @rx_action: what to do when connection socket is active
>> + * @list: list of connection objects for its server
>> + * @rwork: receive work item
>> + * @swork: send work item
>> + * @usr_data: user-specified field
>> + */
>> +struct tipc_conn {
>> +	struct kref kref;
>> +	unsigned int conid;
>> +	struct socket *sock;
>> +	unsigned long flags;
>> +	struct tipc_server *server;
>> +	struct list_head outqueue;
>> +	spinlock_t outqueue_lock;
>> +	int (*rx_action) (struct tipc_conn *con);
>> +	struct hlist_node list;
>> +	struct work_struct rwork;
>> +	struct work_struct swork;
>> +	void *usr_data;
>> +};
>> +
>> +/* An entry waiting to be sent */
>> +struct outqueue_entry {
>> +	struct list_head list;
>> +	struct kvec iov;
>> +	struct sockaddr_tipc dest;
>> +};
>> +
>> +static void tipc_recv_work(struct work_struct *work);
>> +static void tipc_send_work(struct work_struct *work);
>> +static void clean_outqueues(struct tipc_conn *con);
>> +
>> +static void tipc_conn_kref_release(struct kref *kref)
>> +{
>> +	struct tipc_conn *con = container_of(kref, struct tipc_conn, kref);
>> +	struct tipc_server *s = con->server;
>> +
>> +	if (con->sock) {
>> +		__module_get(con->sock->ops->owner);
>> +		__module_get(con->sock->sk->sk_prot_creator->owner);
>>
>
> The use of __module_get (vs non underscore) and its use 2x probably
> needs some sort of explanation.
>

OK, I will add more comments here.

>> +		sock_release(con->sock);
>> +		con->sock = NULL;
>> +	}
>> +
>> +	clean_outqueues(con);
>> +
>> +	if (con->conid)
>> +		s->tipc_conn_shutdown(con->conid, con->usr_data);
>> +
>> +	kfree(con);
>> +}
>> +
>> +static void conn_put(struct tipc_conn *con)
>> +{
>> +	kref_put(&con->kref, tipc_conn_kref_release);
>> +}
>> +
>> +static void conn_get(struct tipc_conn *con)
>> +{
>> +	kref_get(&con->kref);
>> +}
>> +
>> +static inline unsigned int conid_hash(unsigned int conid)
>> +{
>> +	return conid & (CONN_HASH_SIZE - 1);
>> +}
>> +
>> +static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s,
>> +					   unsigned int conid)
>> +{
>> +	struct hlist_node *h;
>> +	struct tipc_conn *con;
>> +	unsigned int r;
>> +
>> +	r = conid_hash(conid);
>> +
>> +	read_lock_bh(&s->conn_hlist_lock);
>> +	hlist_for_each_entry(con, h, &s->conn_hlist[r], list) {
>> +		if (con->conid == conid) {
>> +			conn_get(con);
>> +			read_unlock_bh(&s->conn_hlist_lock);
>> +			return con;
>> +		}
>> +	}
>> +	read_unlock_bh(&s->conn_hlist_lock);
>> +	return NULL;
>> +}
>> +
>> +static void sock_data_ready(struct sock *sk, int unused)
>> +{
>> +	struct tipc_conn *con;
>> +
>> +	read_lock(&sk->sk_callback_lock);
>> +	con = sock2con(sk);
>> +	if (con && test_bit(CF_CONNECTED, &con->flags)) {
>> +		conn_get(con);
>> +		if (!queue_work(con->server->rcv_wq, &con->rwork))
>> +			conn_put(con);
>> +	}
>> +	read_unlock(&sk->sk_callback_lock);
>> +}
>> +
>> +static void sock_write_space(struct sock *sk)
>> +{
>> +	struct tipc_conn *con;
>> +
>> +	read_lock(&sk->sk_callback_lock);
>> +	con = sock2con(sk);
>> +	if (con && test_bit(CF_CONNECTED, &con->flags)) {
>> +		conn_get(con);
>> +		if (!queue_work(con->server->send_wq, &con->swork))
>> +			conn_put(con);
>> +	}
>> +	read_unlock(&sk->sk_callback_lock);
>> +}
>> +
>> +/**
>>
>
> Lets not use the kerneldoc /** start marker if there is no actual
> kerneldoc content to be processed. (same for all instances).
>

OK, I will remove them.

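To illustrate the point about comment markers, here is a minimal sketch (not code from the series) contrasting a comment that carries real kerneldoc content with one that is only a one-line summary. conid_hash() and CONN_HASH_SIZE mirror the quoted patch; bucket_is_valid() is a hypothetical helper added only for the contrast.

```c
#include <assert.h>	/* only needed if you add checks like the one noted below */

#define CONN_HASH_SIZE 32	/* same value as in the quoted server.h */

/**
 * conid_hash() - map a connection ID to a hash bucket index
 * @conid: connection identifier
 *
 * The kerneldoc start marker is justified here because the parameter
 * and the return value are actually documented.
 *
 * Return: bucket index in the range 0..CONN_HASH_SIZE - 1
 */
static inline unsigned int conid_hash(unsigned int conid)
{
	return conid & (CONN_HASH_SIZE - 1);
}

/* bucket_is_valid - summary only, so an ordinary comment marker is used */
static inline int bucket_is_valid(unsigned int bucket)
{
	return bucket < CONN_HASH_SIZE;
}
```

With 32 buckets, conid_hash(33) selects bucket 1, which is easy to confirm with an assert().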
>> + * tipc_register_callbacks - register socket callbacks
>> + */
>> +static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con)
>> +{
>> +	struct sock *sk = sock->sk;
>> +
>> +	write_lock_bh(&sk->sk_callback_lock);
>> +
>> +	sk->sk_data_ready = sock_data_ready;
>> +	sk->sk_write_space = sock_write_space;
>>
>
> Do the above two lines have any relation to the other patch I have
> queued that does the tipc_data_ready and tipc_write_space additions?
>

Yes, this patch must be based on your queued patches; without them it
would not work correctly. That is another reason why I made them.

>> +	sk->sk_user_data = con;
>> +
>> +	con->sock = sock;
>> +
>> +	write_unlock_bh(&sk->sk_callback_lock);
>> +}
>> +
>> +/**
>> + * tipc_unregister_callbacks - unregister socket callbacks
>> + */
>> +static void tipc_unregister_callbacks(struct tipc_conn *con)
>> +{
>> +	struct sock *sk = con->sock->sk;
>> +
>> +	write_lock_bh(&sk->sk_callback_lock);
>> +	sk->sk_user_data = NULL;
>> +	write_unlock_bh(&sk->sk_callback_lock);
>> +}
>> +
>> +/**
>> + * tipc_close_conn - close connection
>> + */
>> +static void tipc_close_conn(struct tipc_conn *con)
>> +{
>> +	struct tipc_server *s = con->server;
>> +
>> +	if (test_and_clear_bit(CF_CONNECTED, &con->flags)) {
>> +		write_lock_bh(&s->conn_hlist_lock);
>> +		hlist_del(&con->list);
>> +		write_unlock_bh(&s->conn_hlist_lock);
>> +
>> +		tipc_unregister_callbacks(con);
>> +
>> +		/*
>> +		 * We shouldn't flush pending works as we may be in the
>> +		 * thread. In fact the races with pending rx/tx work structs
>> +		 * are harmless for us here as we have already deleted this
>> +		 * connection from server connection list and set
>> +		 * sk->sk_user_data to 0 before release connection object.
>>
>
> s/release/releasing/
>

OK.

>
>> +		 */
>> +		kernel_sock_shutdown(con->sock, SHUT_RDWR);
>> +
>> +		conn_put(con);
>> +	}
>> +}
>> +
>> +/**
>> + * get_conid - get unique connection ID
>> + */
>> +static unsigned int get_conid(struct tipc_server *s)
>> +{
>> +	struct hlist_node *h;
>> +	struct tipc_conn *con;
>> +	unsigned int r;
>> +	unsigned int i;
>> +
>> +	for (i = 0; i < 0xffffffff; i++) {
>>
>
> How often is this called? How far in the loop will it typically go
> before finding something? It is used within write_lock_bh scope.
>

I understand your concern that this could trigger a soft lockup or
something similar. The time consumed here depends on how many clients a
server has. In practice it is unlikely that the number of clients
exceeds 10, so 0xffffffff is only a theoretical upper bound.

>> +		r = conid_hash(s->conid);
>> +
>> +		if (hlist_empty(&s->conn_hlist[r]))
>> +			return s->conid;
>> +
>> +		hlist_for_each_entry(con, h, &s->conn_hlist[r], list) {
>> +			if (con->conid != s->conid)
>> +				return s->conid;
>> +		}
>> +		s->conid++;
>> +	}
>> +	panic("failed to allocate unique connection ID\n");
>>
>
> No. Don't do this. Linus consistently gives people trouble for using
> panic too much. It should only be used when things like memory
> corruption or similar catastrophe means the machine _really_ can not
> continue to run.
>

OK, I will change it.

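One possible shape for that change, sketched in userspace C rather than taken from the series: bound the search and return an error code instead of calling panic(), so the caller can simply refuse the new connection. struct conn and struct server are simplified stand-ins for struct tipc_conn and struct tipc_server; MAX_CONID_TRIES, conid_in_use() and conn_insert() are hypothetical names, the get_conid() signature is changed for the illustration, and locking is omitted. The sketch checks every entry in a bucket before accepting a candidate ID.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

#define CONN_HASH_SIZE 32	/* as in the quoted patch; a power of two */
#define MAX_CONID_TRIES 64	/* hypothetical bound on the search */

struct conn {			/* stand-in for struct tipc_conn */
	unsigned int conid;
	struct conn *next;
};

struct server {			/* stand-in for struct tipc_server */
	unsigned int conid;	/* next candidate ID */
	struct conn *bucket[CONN_HASH_SIZE];
};

static unsigned int conid_hash(unsigned int conid)
{
	return conid & (CONN_HASH_SIZE - 1);
}

/* Return 1 if conid is already used by a connection in its bucket. */
static int conid_in_use(const struct server *s, unsigned int conid)
{
	const struct conn *c;

	for (c = s->bucket[conid_hash(conid)]; c; c = c->next)
		if (c->conid == conid)
			return 1;
	return 0;
}

/*
 * Try at most MAX_CONID_TRIES candidates. On success store the free ID
 * in *conid and return 0; otherwise return -ENOSPC so that the caller
 * can reject the connection instead of bringing the machine down.
 */
static int get_conid(struct server *s, unsigned int *conid)
{
	unsigned int i;

	assert(s && conid);
	for (i = 0; i < MAX_CONID_TRIES; i++, s->conid++) {
		if (!conid_in_use(s, s->conid)) {
			*conid = s->conid;
			return 0;
		}
	}
	return -ENOSPC;
}

/* Link c into the table under conid (no locking in this sketch). */
static void conn_insert(struct server *s, struct conn *c, unsigned int conid)
{
	unsigned int r = conid_hash(conid);

	c->conid = conid;
	c->next = s->bucket[r];
	s->bucket[r] = c;
}
```

In the kernel version the bounded loop would still run under write_lock_bh(), so a small bound also limits how long the lock is held; the value 64 is arbitrary here.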
>> + return 0; >> +} >> + >> +/** >> + * alloc_tipc_conn - allocate connection object >> + */ >> +static struct tipc_conn *alloc_tipc_conn(struct tipc_server *s) >> +{ >> + struct tipc_conn *con; >> + int r; >> + >> + con = kzalloc(sizeof(struct tipc_conn), GFP_ATOMIC); >> + if (!con) >> + return NULL; >> + >> + write_lock_bh(&s->conn_hlist_lock); >> + r = conid_hash(get_conid(s)); >> + hlist_add_head(&con->list, &s->conn_hlist[r]); >> + con->conid = s->conid++; >> + write_unlock_bh(&s->conn_hlist_lock); >> + >> + set_bit(CF_CONNECTED, &con->flags); >> + con->server = s; >> + kref_init(&con->kref); >> + INIT_LIST_HEAD(&con->outqueue); >> + spin_lock_init(&con->outqueue_lock); >> + INIT_WORK(&con->swork, tipc_send_work); >> + INIT_WORK(&con->rwork, tipc_recv_work); >> + >> + return con; >> +} >> + >> +/** >> + * tipc_receive_from_sock - receive data from remote end >> + */ >> +static int tipc_receive_from_sock(struct tipc_conn *con) >> +{ >> + struct kvec iov; >> + void *buf; >> + int ret; >> + struct sockaddr_tipc addr; >> + struct msghdr msg = {}; >> + struct tipc_server *s = con->server; >> + >> + buf = kmem_cache_alloc(s->rcvbuf_cache, GFP_ATOMIC); >> + if (!buf) { >> + ret = -ENOMEM; >> + goto out_close; >> + } >> + >> + iov.iov_base = buf; >> + iov.iov_len = s->max_rcvbuf_size; >> + msg.msg_name = &addr; >> + ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, >> + MSG_DONTWAIT); >> + if (ret <= 0) { >> + kmem_cache_free(s->rcvbuf_cache, buf); >> + goto out_close; >> + } >> + >> + s->tipc_conn_recvmsg(con->conid, &addr, con->usr_data, buf, ret); >> + >> + kmem_cache_free(s->rcvbuf_cache, buf); >> + >> + return 0; >> + >> +out_close: >> + if (ret != -EWOULDBLOCK) >> + tipc_close_conn(con); >> + else if (ret == 0) >> + /* Don't return success if we really got EOF */ >> + ret = -EAGAIN; >> + >> + return ret; >> +} >> + >> +/** >> + * tipc_accept_from_sock - listen and accept a new connection >> + */ >> +static int tipc_accept_from_sock(struct tipc_conn 
*con) >> +{ >> + struct socket *newsock; >> + struct tipc_conn *newcon; >> + int ret; >> + struct tipc_server *s = con->server; >> + >> + ret = kernel_accept(con->sock, &newsock, O_NONBLOCK); >> + if (ret < 0) >> + goto exit; >> + >> + newcon = alloc_tipc_conn(con->server); >> + if (!newcon) { >> + ret = -ENOMEM; >> + sock_release(newsock); >> + goto exit; >> + } >> + >> + newcon->rx_action = tipc_receive_from_sock; >> + tipc_register_callbacks(newsock, newcon); >> + >> + /* Notify new connection is incoming */ >> + newcon->usr_data = s->tipc_conn_new(newcon->conid); >> + >> + module_put(newsock->ops->owner); >> + module_put(newsock->sk->sk_prot_creator->owner); >> + >> +exit: >> + return ret; >> +} >> + >> +/** >> + * tipc_create_listen_sock - create and listen server socket >> + */ >> +static struct socket *tipc_create_listen_sock(struct tipc_conn *con) >> +{ >> + int ret; >> + struct socket *sock = NULL; >> + struct tipc_server *s = con->server; >> + >> + ret = sock_create_kern(AF_TIPC, s->type, 0, &sock); >> + if (ret < 0) >> + return NULL; >> + ret = kernel_setsockopt(sock, SOL_TIPC, TIPC_IMPORTANCE, >> + (char *)&s->imp, sizeof(s->imp)); >> + if (ret < 0) >> + goto create_err; >> + ret = kernel_bind(sock, (struct sockaddr *)s->saddr, >> + sizeof(*s->saddr)); >> + if (ret < 0) >> + goto create_err; >> + >> + switch (s->type) { >> + case SOCK_STREAM: >> + case SOCK_SEQPACKET: >> + con->rx_action = tipc_accept_from_sock; >> + >> + ret = kernel_listen(sock, 0); >> + if (ret < 0) >> + goto create_err; >> + break; >> + case SOCK_DGRAM: >> + case SOCK_RDM: >> + con->rx_action = tipc_receive_from_sock; >> + break; >> + default: >> + pr_err("Unknown socket type %d\n", s->type); >> + goto create_err; >> + } >> + >> + /* >> + * Hack: we put TIPC module twice since its refcount has been >> + * increased two times when socket is created, otherwise, the >> + * module cannot be unloaded at all. 
But it's safe to descrease >> + * its refcount here because the lifetime of the socket is always >> + * same as TIPC module. Likely we must inscrease it when the >> + * socket is closed. >> + */ >> + module_put(sock->ops->owner); >> + module_put(sock->sk->sk_prot_creator->owner); >> > > This is very likely to create a red flag when(if?) it goes to netdev. > Maybe once you write more in the commit log about the whole strategy of > what you are trying to solve, then I can comment more on what to do? > > OK, I understood your meaning. >> + >> + return sock; >> + >> +create_err: >> + sock_release(sock); >> + con->sock = NULL; >> + return NULL; >> +} >> + >> +/** >> + * tipc_open_listening_sock - create server to listen connections on its socket >> + */ >> +static int tipc_open_listening_sock(struct tipc_server *server) >> +{ >> + struct socket *sock; >> + struct tipc_conn *con; >> + int ret = -EINVAL; >> + >> + con = alloc_tipc_conn(server); >> + if (!con) >> + return -ENOMEM; >> + >> + sock = tipc_create_listen_sock(con); >> + if (sock) { >> + tipc_register_callbacks(sock, con); >> + ret = 0; >> + } >> + >> + return ret; >> +} >> + >> +static struct outqueue_entry *alloc_entry(void *data, int len) >> +{ >> + struct outqueue_entry *entry; >> + void *buf; >> + >> + entry = kmalloc(sizeof(struct outqueue_entry), GFP_ATOMIC); >> + if (!entry) >> + return NULL; >> + >> + buf = kmalloc(len, GFP_ATOMIC); >> + if (!buf) { >> + kfree(entry); >> + return NULL; >> + } >> + >> + memcpy(buf, data, len); >> + entry->iov.iov_base = buf; >> + entry->iov.iov_len = len; >> + >> + return entry; >> +} >> + >> +static void free_entry(struct outqueue_entry *e) >> +{ >> + kfree(e->iov.iov_base); >> + kfree(e); >> +} >> > > I wouldn't do this, since it is only used twice. That, and the name > makes it sound "generic" -- meaning when I 1st saw it used, I went to > grep the linux/include dir for it (and eventually returned here). 
> Similar comments for anything else that sounds generic and not tipc > specific -- even if it is static, it can appear in a backtrace. > > See mainline commits starting at 7f9ab6ac2e79b9658eba7c8e3ad8a4392d308057 > and going back from there, as a cleanup requested by DaveM. > > I will change its function name. > I'd wait until Jon has added his comments about the overall > architecture, locking design and data flow strategy before resending. > Those kinds of issues are even more important to get right than the > kinds of things I've pointed out here. But detailing what the initial > plan was (like I asked for at the very top) may in fact help Jon, so > making a 1st pass at creating that new text might be good to start now. > (I know it would help me to better understand the goal here.) > > Before I send next review request, I am also looking forward to listening Jon's suggestions. Regards, Ying > Paul. > -- > > >> + >> +/** >> + * clean_outqueues - discard all entries on the outqueues >> + */ >> +static void clean_outqueues(struct tipc_conn *con) >> +{ >> + struct outqueue_entry *e, *safe; >> + >> + spin_lock_bh(&con->outqueue_lock); >> + list_for_each_entry_safe(e, safe, &con->outqueue, list) { >> + list_del(&e->list); >> + free_entry(e); >> + } >> + spin_unlock_bh(&con->outqueue_lock); >> +} >> + >> +/** >> + * tipc_conn_sendmsg - send message to server with connection ID >> + */ >> +int tipc_conn_sendmsg(struct tipc_server *s, unsigned int conid, >> + struct sockaddr_tipc *addr, void *data, size_t len) >> +{ >> + struct outqueue_entry *e; >> + struct tipc_conn *con; >> + >> + con = tipc_conn_lookup(s, conid); >> + if (!con) >> + return -1; >> + >> + e = alloc_entry(data, len); >> + if (!e) { >> + conn_put(con); >> + return -ENOMEM; >> + } >> + >> + if (addr) >> + memcpy(&e->dest, addr, sizeof(struct sockaddr_tipc)); >> + >> + spin_lock_bh(&con->outqueue_lock); >> + list_add_tail(&e->list, &con->outqueue); >> + spin_unlock_bh(&con->outqueue_lock); >> + >> + if 
(test_bit(CF_CONNECTED, &con->flags)) { >> + if (!queue_work(s->send_wq, &con->swork)) >> + conn_put(con); >> + } >> + >> + return 0; >> +} >> + >> +/** >> + * tipc_conn_terminate - terminate connection with server >> + */ >> +void tipc_conn_terminate(struct tipc_server *s, unsigned int conid) >> +{ >> + struct tipc_conn *con; >> + >> + con = tipc_conn_lookup(s, conid); >> + if (con) { >> + tipc_close_conn(con); >> + conn_put(con); >> + } >> +} >> + >> +/** >> + * tipc_send_to_sock - try its best to send out all messages in its outqueue >> + */ >> +static void tipc_send_to_sock(struct tipc_conn *con) >> +{ >> + int count = 0; >> + struct tipc_server *s = con->server; >> + struct outqueue_entry *e; >> + struct msghdr msg; >> + int ret; >> + >> + spin_lock_bh(&con->outqueue_lock); >> + while (1) { >> + e = list_entry(con->outqueue.next, struct outqueue_entry, >> + list); >> + if ((struct list_head *) e == &con->outqueue) >> + break; >> + spin_unlock_bh(&con->outqueue_lock); >> + >> + memset(&msg, 0, sizeof(msg)); >> + msg.msg_flags = MSG_DONTWAIT; >> + >> + if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) { >> + msg.msg_name = &e->dest; >> + msg.msg_namelen = sizeof(struct sockaddr_tipc); >> + } >> + ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1, >> + e->iov.iov_len); >> + if (ret == -EWOULDBLOCK || ret == 0) { >> + cond_resched(); >> + goto out; >> + } else if (ret < 0) { >> + goto send_err; >> + } >> + >> + /* Don't starve people filling buffers */ >> + if (++count >= MAX_SEND_MSG_COUNT) { >> + cond_resched(); >> + count = 0; >> + } >> + >> + spin_lock_bh(&con->outqueue_lock); >> + list_del(&e->list); >> + free_entry(e); >> + } >> + spin_unlock_bh(&con->outqueue_lock); >> +out: >> + return; >> + >> +send_err: >> + tipc_close_conn(con); >> +} >> + >> +/** >> + * tipc_recv_work - receive workqueue function >> + */ >> +static void tipc_recv_work(struct work_struct *work) >> +{ >> + struct tipc_conn *con = container_of(work, struct tipc_conn, rwork); >> + int err; 
>> + >> + do { >> + err = con->rx_action(con); >> + } while (!err); >> + >> + conn_put(con); >> +} >> + >> +/** >> + * tipc_send_work - send workqueue function >> + */ >> +static void tipc_send_work(struct work_struct *work) >> +{ >> + struct tipc_conn *con = container_of(work, struct tipc_conn, swork); >> + >> + tipc_send_to_sock(con); >> + conn_put(con); >> +} >> + >> +/** >> + * tipc_work_stop - destroy allocated workqueues >> + */ >> +static void tipc_work_stop(struct tipc_server *s) >> +{ >> + destroy_workqueue(s->rcv_wq); >> + destroy_workqueue(s->send_wq); >> +} >> + >> +/** >> + * tipc_work_start - allocate workqueues >> + */ >> +static int tipc_work_start(struct tipc_server *s) >> +{ >> + s->rcv_wq = alloc_workqueue("tipc_rcv", WQ_UNBOUND, 1); >> + if (!s->rcv_wq) { >> + pr_err("can't start tipc receive workqueue\n"); >> + return -ENOMEM; >> + } >> + >> + s->send_wq = alloc_workqueue("tipc_send", WQ_UNBOUND, 1); >> + if (!s->send_wq) { >> + pr_err("can't start tipc send workqueue\n"); >> + destroy_workqueue(s->rcv_wq); >> + return -ENOMEM; >> + } >> + >> + return 0; >> +} >> + >> +/** >> + * tipc_server_start - launch server >> + */ >> +int tipc_server_start(struct tipc_server *s) >> +{ >> + int ret; >> + int i; >> + >> + s->conid = 0; >> + rwlock_init(&s->conn_hlist_lock); >> + >> + for (i = 0; i < CONN_HASH_SIZE; i++) >> + INIT_HLIST_HEAD(&s->conn_hlist[i]); >> + >> + s->rcvbuf_cache = kmem_cache_create("tipc_rcvbuf", s->max_rcvbuf_size, >> + 0, SLAB_HWCACHE_ALIGN, NULL); >> + if (!s->rcvbuf_cache) >> + return -ENOMEM; >> + >> + ret = tipc_work_start(s); >> + if (ret < 0) { >> + kmem_cache_destroy(s->rcvbuf_cache); >> + return ret; >> + } >> + >> + ret = tipc_open_listening_sock(s); >> + return ret; >> +} >> + >> +/** >> + * tipc_server_stop - stop server and destroy all allocated resources >> + */ >> +void tipc_server_stop(struct tipc_server *s) >> +{ >> + struct hlist_node *h, *h1; >> + struct tipc_conn *con; >> + int i; >> + >> + for (i = 0; i < 
CONN_HASH_SIZE; i++) { >> + hlist_for_each_entry_safe(con, h, h1, &s->conn_hlist[i], list) >> + tipc_close_conn(con); >> + } >> + >> + tipc_work_stop(s); >> + kmem_cache_destroy(s->rcvbuf_cache); >> +} >> diff --git a/net/tipc/server.h b/net/tipc/server.h >> new file mode 100644 >> index 0000000..4b2c4cf >> --- /dev/null >> +++ b/net/tipc/server.h >> @@ -0,0 +1,103 @@ >> +/* >> + * net/tipc/server.h: Include file for TIPC server code >> + * >> + * Copyright (c) 2012, Wind River Systems >> + * All rights reserved. >> + * >> + * Redistribution and use in source and binary forms, with or without >> + * modification, are permitted provided that the following conditions are met: >> + * >> + * 1. Redistributions of source code must retain the above copyright >> + * notice, this list of conditions and the following disclaimer. >> + * 2. Redistributions in binary form must reproduce the above copyright >> + * notice, this list of conditions and the following disclaimer in the >> + * documentation and/or other materials provided with the distribution. >> + * 3. Neither the names of the copyright holders nor the names of its >> + * contributors may be used to endorse or promote products derived from >> + * this software without specific prior written permission. >> + * >> + * Alternatively, this software may be distributed under the terms of the >> + * GNU General Public License ("GPL") version 2 as published by the Free >> + * Software Foundation. >> + * >> + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" >> + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE >> + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE >> + * ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE >> + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR >> + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF >> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS >> + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN >> + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) >> + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE >> + * POSSIBILITY OF SUCH DAMAGE. >> + */ >> + >> +#ifndef _TIPC_SERVER_H >> +#define _TIPC_SERVER_H >> + >> +#include "core.h" >> + >> +#define TIPC_SERVER_NAME_LEN 32 >> +#define CONN_HASH_SIZE 32 >> + >> +/** >> + * struct tipc_server - TIPC server structure >> + * @conid: connection identifier associated with server >> + * @conn_hlist: links to all client connections connecting to server >> + * @conn_hlist_lock: conn_hlist list lock >> + * @rcvbuf_cache: memory cache of server receive buffer >> + * @rcv_wq: receive workqueue >> + * @send_wq: send workqueue >> + * @saddr: TIPC server address >> + * @imp: message importance >> + * @type: socket type >> + * @max_rcvbuf_size: maximum permitted receive message length >> + * @name: server name >> + * @tipc_conn_new: callback will be called when new connection is incoming >> + * @tipc_conn_shutdown: callback will be called when connection is shut down >> + * @tipc_conn_recvmsg: callback will be called when message arrives >> + */ >> +struct tipc_server { >> + unsigned int conid; >> + struct hlist_head conn_hlist[CONN_HASH_SIZE]; >> + rwlock_t conn_hlist_lock; >> + struct kmem_cache *rcvbuf_cache; >> + struct workqueue_struct *rcv_wq; >> + struct workqueue_struct *send_wq; >> + struct sockaddr_tipc *saddr; >> + int imp; >> + int type; >> + int max_rcvbuf_size; >> + const char name[TIPC_SERVER_NAME_LEN]; >> + void *(*tipc_conn_new) (unsigned int conid); >> + void (*tipc_conn_shutdown) (unsigned int 
conid, void *usr_data);
>> +	void (*tipc_conn_recvmsg) (unsigned int conid,
>> +				   struct sockaddr_tipc *addr,
>> +				   void *usr_data, void *buf, size_t len);
>> +};
>> +
>> +/**
>> + * tipc_conn_sendmsg - send message to connection
>> + */
>> +int tipc_conn_sendmsg(struct tipc_server *s, unsigned int conid,
>> +		      struct sockaddr_tipc *addr, void *data, size_t len);
>> +
>> +/**
>> + * tipc_conn_terminate - terminate connection with server
>> + *
>> + * Note: Must call it in process context since it might sleep
>> + */
>> +void tipc_conn_terminate(struct tipc_server *s, unsigned int conid);
>> +
>> +/**
>> + * tipc_server_start - start server
>> + */
>> +int tipc_server_start(struct tipc_server *s);
>> +
>> +/**
>> + * tipc_server_stop - stop server
>> + */
>> +void tipc_server_stop(struct tipc_server *s);
>> +
>> +#endif
>> --
>> 1.7.1
>>