From: Michael M. <Mic...@cs...> - 2011-06-22 22:00:08
|
-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1

Hi,

Here's an improved version of my previous patch. It now adds a distinct new threading mode to FUSE while preserving the old options.

Under the new mode, kernel requests are received by a single thread. Each request is then dispatched to a thread selected from a pool of workers that is created at mount time. Requests for any particular inode are handled by a single worker (determined by a hash of the inode number), so that requests to each inode are processed serially and in their original order. Non-inode related requests are handed to workers in a round-robin fashion.

This has benefits over the current two options which may help some file system implementers:

- Requests arrive at the fs layer in the original order.
- A simple fs layer can benefit from threading without needing locks or synchronisation primitives, by storing file state in fuse_file_info.fh or in thread local storage.
- A fixed number of threads is used, and they are created once at mount time.

The default threading modes are unchanged, with both the -s and -m mount options operating as before, so there should be no impact on existing FUSE filesystems. The new threading mode is enabled by mounting with -m -osync_inode_read. Alternatively, fuse_conn_info.async_inode_read can be cleared by the filesystem's init() method.

The patch can be applied like this:

$ tar -xzf fuse.2.8.5.tar.gz
$ cd fuse-2.8.5
$ patch -p1 < ~/fuse-loop_it.patch

To make some benchmarks, I took an early version of aifffffs which had more overhead on seek operations. I ran a test in each of the FUSE threading modes to show the effect of this patch.

Since aifffffs is a transcoding filesystem, I created 10 flac files each of 51.2MB on a tmpfs as the basis for the tests. Swap and CPU frequency scaling were disabled on the test machine, which is a 2GHz Intel(R) Core(TM)2 Duo CPU T7250 with 4GB RAM.

The first batch of tests simply cats each file one at a time:

$ time find aiff/ -name "*.aiff" -print0 | xargs -0 -n 20 -P 1 cat > /dev/null

Each test was run 5 times and the wall clock times recorded:

Serial Mount (-s)
real 0m10.064s 0m10.116s 0m10.100s 0m10.103s 0m10.118s

Fully Threaded Mount (-m)
real 0m12.001s 0m12.060s 0m11.645s 0m12.025s 0m11.497s

Inode Serialised Mount (-m -osync_inode_read)
real 0m10.044s 0m10.161s 0m10.173s 0m10.117s 0m10.130s

The next batch of tests cats the files in parallel:

$ time find aiff/ -name "*.aiff" -print0 | xargs -0 -n 1 -P 20 cat > /dev/null

Again each test was run 5 times and the wall clock times recorded:

Serial Mount (-s)
real 0m10.654s 0m10.669s 0m10.650s 0m10.647s 0m10.653s

Fully Threaded Mount (-m)
real 0m9.779s 0m10.624s 0m7.826s 0m8.267s 0m10.077s

Inode Serialised Mount (-m -osync_inode_read)
real 0m5.720s 0m5.730s 0m5.792s 0m5.800s 0m5.760s

From this we can see:

- The '-s' and '-m -osync_inode_read' mounts are almost equal in speed when making serial access.
- The fully threaded mount is slowest when making serial access (as it incurs seek overhead).
- '-m -osync_inode_read' is fastest when making parallel access (threaded with no seek overhead).
- '-m' has the widest distribution of results in parallel mode, presumably due to non-deterministic thread scheduling incurring varying amounts of seek overhead per run under this heavily loaded scenario.

Changes since the original version of the patch are mainly that fuse_session_loop_mt() has been restored and the modified functionality placed in fuse_loop_it.c:fuse_session_loop_it(), where "it" = inode threaded. fuse_conn_info now has fields to allow inode threaded mode to be selected at mount time.

Since others have expressed interest in the earlier version of this patch and the threading model it offers, I would be pleased if the following patch can be considered for inclusion into FUSE.

Regards,

Mike

diff -rupN --exclude=.svn --exclude='*.in' --exclude=autom4te.cache --exclude=configure fuse-2.8.5.orig/include/fuse_common.h fuse-2.8.5/include/fuse_common.h
--- fuse-2.8.5.orig/include/fuse_common.h 2011-05-16 22:52:04.691712560 +0100
+++ fuse-2.8.5/include/fuse_common.h 2011-06-22 20:28:11.054756649 +0100
@@ -22,7 +22,7 @@
 #define FUSE_MAJOR_VERSION 2
 
 /** Minor version of FUSE library interface */
-#define FUSE_MINOR_VERSION 8
+#define FUSE_MINOR_VERSION 9
 
 #define FUSE_MAKE_VERSION(maj, min) ((maj) * 10 + (min))
 #define FUSE_VERSION FUSE_MAKE_VERSION(FUSE_MAJOR_VERSION, FUSE_MINOR_VERSION)
@@ -156,9 +156,16 @@ struct fuse_conn_info {
 	unsigned want;
 
 	/**
+	 * Should access be synchronous per inode (read-write)
+	 * Note this only takes effect if \a async_read is also set.
+	 * Default is set. Introduced in version 2.9.
+	 */
+	unsigned async_inode_read;
+
+	/**
 	 * For future use.
 	 */
-	unsigned reserved[25];
+	unsigned reserved[24];
 };
 
 struct fuse_session;
diff -rupN --exclude=.svn --exclude='*.in' --exclude=autom4te.cache --exclude=configure fuse-2.8.5.orig/include/fuse_lowlevel.h fuse-2.8.5/include/fuse_lowlevel.h
--- fuse-2.8.5.orig/include/fuse_lowlevel.h 2011-05-16 22:52:04.691712560 +0100
+++ fuse-2.8.5/include/fuse_lowlevel.h 2011-06-22 19:34:10.350756950 +0100
@@ -1429,6 +1429,15 @@ int fuse_session_loop(struct fuse_sessio
  */
 int fuse_session_loop_mt(struct fuse_session *se);
 
+/**
+ * Enter a multi-threaded event loop with serial access per inode.
+ *
+ * @param se the session
+ * @param threads number of worker threads to create
+ * @return 0 on success, -1 on error
+ */
+int fuse_session_loop_it(struct fuse_session *se, int threads);
+
 /* ----------------------------------------------------------- *
  * Channel interface *
  * ----------------------------------------------------------- */
diff -rupN --exclude=.svn --exclude='*.in' --exclude=autom4te.cache --exclude=configure fuse-2.8.5.orig/lib/bbuf.c fuse-2.8.5/lib/bbuf.c
--- fuse-2.8.5.orig/lib/bbuf.c 1970-01-01 01:00:00.000000000 +0100
+++ fuse-2.8.5/lib/bbuf.c 2011-06-20 23:27:09.028461438 +0100
@@ -0,0 +1,102 @@
+/*
+  bbuf: Generalised Bounded Buffer.
+  Copyright (C) 2011 Michael McTernan <Mic...@cs...>
+
+  This program can be distributed under the terms of the GNU LGPLv2.
+  See the file COPYING.LIB.
+*/
+
+#include "bbuf.h"
+
+#include <stdlib.h>
+
+static int next_head(struct bbuf *b)
+{
+	return (b->head + 1) % b->depth;
+}
+
+static int next_tail(struct bbuf *b)
+{
+	return (b->tail + 1) % b->depth;
+}
+
+struct bbuf *bbuf_init(int depth)
+{
+	struct bbuf *b = malloc(sizeof(struct bbuf) + sizeof(void *) * depth);
+
+	if (b) {
+		pthread_mutex_init(&b->lock, NULL);
+		pthread_cond_init(&b->notfull, NULL);
+		pthread_cond_init(&b->notempty, NULL);
+		b->head = b->tail = 0;
+		b->depth = depth;
+	}
+
+	return b;
+}
+
+void bbuf_put(struct bbuf *b, void *data)
+{
+	int nh;
+
+	pthread_mutex_lock(&b->lock);
+
+	while ((nh = next_head(b)) == b->tail) {
+		pthread_cond_wait(&b->notfull, &b->lock);
+	}
+
+	b->data[b->head] = data;
+	b->head = nh;
+
+	pthread_cond_signal(&b->notempty);
+	pthread_mutex_unlock(&b->lock);
+}
+
+void *bbuf_get(struct bbuf *b)
+{
+	void *data;
+
+	pthread_mutex_lock(&b->lock);
+
+	while (b->tail == b->head) {
+		pthread_cond_wait(&b->notempty, &b->lock);
+	}
+
+	data = b->data[b->tail];
+	b->tail = next_tail(b);
+
+	pthread_cond_signal(&b->notfull);
+	pthread_mutex_unlock(&b->lock);
+
+	return data;
+}
+
+int bbuf_isempty(struct bbuf *b)
+{
+	int r;
+
+	pthread_mutex_lock(&b->lock);
+	r = b->head == b->tail;
+	pthread_mutex_unlock(&b->lock);
+
+	return r;
+}
+
+int bbuf_isfull(struct bbuf *b)
+{
+	int r;
+
+	pthread_mutex_lock(&b->lock);
+	r = next_head(b) == b->tail;
+	pthread_mutex_unlock(&b->lock);
+
+	return r;
+}
+
+void bbuf_destroy(struct bbuf *b)
+{
+	pthread_mutex_destroy(&b->lock);
+	pthread_cond_destroy(&b->notfull);
+	pthread_cond_destroy(&b->notempty);
+	free(b);
+}
diff -rupN --exclude=.svn --exclude='*.in' --exclude=autom4te.cache --exclude=configure fuse-2.8.5.orig/lib/bbuf.h fuse-2.8.5/lib/bbuf.h
--- fuse-2.8.5.orig/lib/bbuf.h 1970-01-01 01:00:00.000000000 +0100
+++ fuse-2.8.5/lib/bbuf.h 2011-06-20 21:14:44.670710521 +0100
@@ -0,0 +1,25 @@
+/*
+  bbuf: Generalised Bounded Buffer.
+  Copyright (C) 2011 Michael McTernan <Mic...@cs...>
+
+  This program can be distributed under the terms of the GNU LGPLv2.
+  See the file COPYING.LIB.
+*/
+
+#include "config.h"
+#include <pthread.h>
+
+/* structure for the buffer; should only be accessed by bbuf_ functions */
+struct bbuf {
+	pthread_mutex_t lock;
+	pthread_cond_t notfull, notempty;
+	int depth, head, tail;
+	void *data[];
+};
+
+struct bbuf *bbuf_init(int depth);
+void bbuf_put(struct bbuf *b, void *data);
+void *bbuf_get(struct bbuf *b);
+void bbuf_destroy(struct bbuf *b);
+int bbuf_isempty(struct bbuf *b);
+int bbuf_isfull(struct bbuf *b);
diff -rupN --exclude=.svn --exclude='*.in' --exclude=autom4te.cache --exclude=configure fuse-2.8.5.orig/lib/fuse_loop_it.c fuse-2.8.5/lib/fuse_loop_it.c
--- fuse-2.8.5.orig/lib/fuse_loop_it.c 1970-01-01 01:00:00.000000000 +0100
+++ fuse-2.8.5/lib/fuse_loop_it.c 2011-06-22 20:15:56.610762899 +0100
@@ -0,0 +1,205 @@
+/*
+  FUSE: Filesystem in Userspace
+  Copyright (C) 2001-2007 Miklos Szeredi <mi...@sz...>
+  Copyright (C) 2011-2011 Michael McTernan <Mic...@cs...>
+
+  This program can be distributed under the terms of the GNU LGPLv2.
+  See the file COPYING.LIB.
+*/
+
+#include "fuse_lowlevel.h"
+#include "fuse_misc.h"
+#include "fuse_kernel.h"
+#include "bbuf.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <signal.h>
+#include <semaphore.h>
+#include <errno.h>
+#include <sys/time.h>
+
+/* Environment var controlling the thread stack size */
+#define ENVNAME_THREAD_STACK "FUSE_THREAD_STACK"
+
+#define MAX_QUEUE_DEPTH 16
+
+struct fuse_workitem
+{
+	size_t len;
+	char data[];
+};
+
+struct fuse_worker {
+	pthread_t thread_id;
+	struct fuse_mt *mt;
+	struct bbuf *bbuf;
+};
+
+struct fuse_mt {
+	struct fuse_session *se;
+	struct fuse_chan *prevch;
+	struct bbuf *membuf;
+};
+
+
+static void *fuse_do_work(void *data)
+{
+	struct fuse_worker *w = (struct fuse_worker *) data;
+	struct fuse_mt *mt = w->mt;
+	struct bbuf *bbuf = w->bbuf;
+	struct fuse_workitem *wi;
+
+	while ((wi = bbuf_get(bbuf)) != NULL) {
+		fuse_session_process(mt->se, wi->data, wi->len, mt->prevch);
+		bbuf_put(mt->membuf, wi);
+	}
+
+	return NULL;
+}
+
+static int fuse_start_thread(struct fuse_worker *w, struct fuse_mt *mt)
+{
+	sigset_t oldset;
+	sigset_t newset;
+	int res;
+	pthread_attr_t attr;
+	char *stack_size;
+
+	memset(w, 0, sizeof(struct fuse_worker));
+	w->mt = mt;
+
+	/* Create bounded-buffer for task handoff */
+	w->bbuf = bbuf_init(MAX_QUEUE_DEPTH);
+	if (!w->bbuf) {
+		fprintf(stderr, "fuse: error creating bounded buffer\n");
+		return -1;
+	}
+
+	/* Override default stack size */
+	pthread_attr_init(&attr);
+	stack_size = getenv(ENVNAME_THREAD_STACK);
+	if (stack_size && pthread_attr_setstacksize(&attr, atoi(stack_size)))
+		fprintf(stderr, "fuse: invalid stack size: %s\n", stack_size);
+
+	/* Disallow signal reception in worker threads */
+	sigemptyset(&newset);
+	sigaddset(&newset, SIGTERM);
+	sigaddset(&newset, SIGINT);
+	sigaddset(&newset, SIGHUP);
+	sigaddset(&newset, SIGQUIT);
+	pthread_sigmask(SIG_BLOCK, &newset, &oldset);
+	res = pthread_create(&w->thread_id, &attr, fuse_do_work, w);
+	pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+	pthread_attr_destroy(&attr);
+	if (res != 0) {
+		fprintf(stderr, "fuse: error creating thread: %s\n",
+			strerror(res));
+		return -1;
+	}
+
+	return 0;
+}
+
+int fuse_session_loop_it(struct fuse_session *se, const int threads)
+{
+	size_t bufsize;
+	int err, w;
+	struct fuse_mt mt;
+	struct fuse_worker wt[threads];
+
+	memset(&mt, 0, sizeof(struct fuse_mt));
+	mt.se = se;
+	mt.prevch = fuse_session_next_chan(se, NULL);
+
+	bufsize = fuse_chan_bufsize(mt.prevch);
+
+	/* Create bounded-buffer to pass back empty work items */
+	mt.membuf = bbuf_init(threads * MAX_QUEUE_DEPTH);
+	if(!mt.membuf) {
+		err = -ENOMEM;
+		goto out_membuf;
+	}
+
+	/* Pre-allocate work item structures */
+	while (!bbuf_isfull(mt.membuf)) {
+		struct fuse_workitem *wi = malloc(sizeof(struct fuse_workitem) + bufsize);
+		if(!wi) {
+			err = -ENOMEM;
+			goto out_membufwi;
+		}
+		bbuf_put(mt.membuf, wi);
+	}
+
+	/* Create worker threads */
+	for (w = 0; w < threads; w++) {
+		err = fuse_start_thread(&wt[w], &mt);
+		if (err)
+			goto out_workers;
+	}
+
+	w = 0;
+
+	/* Main processing loop */
+	while (!fuse_session_exited(mt.se)) {
+		struct fuse_workitem *wi = bbuf_get(mt.membuf);
+		struct fuse_in_header *in;
+		void *inarg;
+		int res;
+
+		res = wi->len = fuse_chan_recv(&mt.prevch, wi->data, bufsize);
+
+		if (res <= 0) {
+			if (res == -EINTR)
+				continue;
+			if (res < 0) {
+				fuse_session_exit(mt.se);
+			}
+			break;
+		}
+
+		/* Check if the request needs serialising */
+		in = (struct fuse_in_header *)wi->data;
+		inarg = wi->data + sizeof(struct fuse_in_header);
+		switch (in->opcode) {
+		case FUSE_READ:
+		case FUSE_WRITE:
+		case FUSE_FLUSH:
+		case FUSE_RELEASE:
+		case FUSE_FSYNC:
+			/* Serialised */
+			bbuf_put(wt[in->nodeid % threads].bbuf, wi);
+			break;
+
+		/* Could serialse readdir/releasedir/fsyncdir too */
+
+		default:
+			/* Round-robin */
+			bbuf_put(wt[w].bbuf, wi);
+			w = (w + 1) % threads;
+			break;
+		}
+	}
+
+out_workers:
+	/* Tidy up workers by sending NULL to cause the thread to exit once done */
+	for (w = 0; w < threads; w++) {
+		bbuf_put(wt[w].bbuf, NULL);
+		pthread_join(wt[w].thread_id, NULL);
+		bbuf_destroy(wt[w].bbuf);
+	}
+
+out_membufwi:
+	/* Free the memory buffers and bounded buffer */
+	while (!bbuf_isempty(mt.membuf))
+		free(bbuf_get(mt.membuf));
+
+out_membuf:
+	bbuf_destroy(mt.membuf);
+
+	fuse_session_reset(se);
+	return err;
+}
+
diff -rupN --exclude=.svn --exclude='*.in' --exclude=autom4te.cache --exclude=configure fuse-2.8.5.orig/lib/fuse_lowlevel.c fuse-2.8.5/lib/fuse_lowlevel.c
--- fuse-2.8.5.orig/lib/fuse_lowlevel.c 2011-05-16 22:52:04.911712222 +0100
+++ fuse-2.8.5/lib/fuse_lowlevel.c 2011-06-22 18:38:40.854756559 +0100
@@ -1531,6 +1531,8 @@ static struct fuse_opt fuse_ll_opts[] =
 	{ "max_readahead=%u", offsetof(struct fuse_ll, conn.max_readahead), 0 },
 	{ "async_read", offsetof(struct fuse_ll, conn.async_read), 1 },
 	{ "sync_read", offsetof(struct fuse_ll, conn.async_read), 0 },
+	{ "async_inode_read", offsetof(struct fuse_ll, conn.async_inode_read), 1 },
+	{ "sync_inode_read", offsetof(struct fuse_ll, conn.async_inode_read), 0 },
 	{ "atomic_o_trunc", offsetof(struct fuse_ll, atomic_o_trunc), 1},
 	{ "no_remote_lock", offsetof(struct fuse_ll, no_remote_lock), 1},
 	{ "big_writes", offsetof(struct fuse_ll, big_writes), 1},
@@ -1555,6 +1557,8 @@ static void fuse_ll_help(void)
 " -o max_readahead=N set maximum readahead\n"
 " -o async_read perform reads asynchronously (default)\n"
 " -o sync_read perform reads synchronously\n"
+" -o async_inode_read perform reads asynchronously per inode (default)\n"
+" -o sync_inode_read perform reads synchronously per inode\n"
 " -o atomic_o_trunc enable atomic open+truncate support\n"
 " -o big_writes enable larger than 4kB writes\n"
 " -o no_remote_lock disable remote file locking\n");
@@ -1628,6 +1632,7 @@ struct fuse_session *fuse_lowlevel_new_c
 	}
 
 	f->conn.async_read = 1;
+	f->conn.async_inode_read = 1;
 	f->conn.max_write = UINT_MAX;
 	f->conn.max_readahead = UINT_MAX;
 	f->atomic_o_trunc = 0;
diff -rupN --exclude=.svn --exclude='*.in' --exclude=autom4te.cache --exclude=configure fuse-2.8.5.orig/lib/fuse_mt.c fuse-2.8.5/lib/fuse_mt.c
--- fuse-2.8.5.orig/lib/fuse_mt.c 2011-05-16 22:52:04.909734252 +0100
+++ fuse-2.8.5/lib/fuse_mt.c 2011-06-22 20:20:13.489006159 +0100
@@ -12,10 +12,13 @@
 
 #include <stdio.h>
 #include <stdlib.h>
+#include <unistd.h>
 #include <string.h>
 #include <pthread.h>
 #include <assert.h>
 
+#define DEFAULT_ASYNC_INODE_THREADS 13
+
 struct procdata {
 	struct fuse *f;
 	struct fuse_chan *prevch;
@@ -107,10 +110,34 @@ int fuse_loop_mt_proc(struct fuse *f, fu
 
 int fuse_loop_mt(struct fuse *f)
 {
+	struct fuse_session *se;
+	struct fuse_ll *ll;
+
 	if (f == NULL)
 		return -1;
 
-	return fuse_session_loop_mt(fuse_get_session(f));
+	se = fuse_get_session(f);
+	ll = (struct fuse_ll *)fuse_session_data(se);
+
+	if (ll->conn.async_inode_read) {
+		if (ll->debug)
+			fprintf(stderr, "asynchronous\n");
+
+		return fuse_session_loop_mt(se);
+	} else {
+		int threads = DEFAULT_ASYNC_INODE_THREADS;
+
+#ifdef linux
+		/* Create 4x the number of online processor core threads */
+		threads = sysconf(_SC_NPROCESSORS_CONF) * 4;
+		if (threads == -1)
+			threads = DEFAULT_ASYNC_INODE_THREADS;
+#endif
+		if (ll->debug)
+			fprintf(stderr, "synchronous per inode (%d threads)\n", threads);
+
+		return fuse_session_loop_it(se, threads);
+	}
 }
 
 FUSE_SYMVER(".symver fuse_loop_mt_proc,__fuse_loop_mt@");
diff -rupN --exclude=.svn --exclude='*.in' --exclude=autom4te.cache --exclude=configure fuse-2.8.5.orig/lib/Makefile.am fuse-2.8.5/lib/Makefile.am
--- fuse-2.8.5.orig/lib/Makefile.am 2011-05-16 22:52:04.913711145 +0100
+++ fuse-2.8.5/lib/Makefile.am 2011-06-22 19:28:45.335757197 +0100
@@ -23,6 +23,7 @@ libfuse_la_SOURCES = \
 	fuse_kern_chan.c \
 	fuse_loop.c \
 	fuse_loop_mt.c \
+	fuse_loop_it.c \
 	fuse_lowlevel.c \
 	fuse_misc.h \
 	fuse_mt.c \
@@ -31,6 +32,8 @@ libfuse_la_SOURCES = \
 	fuse_signals.c \
 	cuse_lowlevel.c \
 	helper.c \
+	bbuf.c \
+	bbuf.h \
 	modules/subdir.c \
 	$(iconv_source) \
 	$(mount_source)
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.11 (GNU/Linux)
Comment: Using GnuPG with Fedora - http://enigmail.mozdev.org/

iQIcBAEBAgAGBQJOAmXcAAoJEE97MuPr96xS2j8P/1Ja+OZZyctcIHGEPwr2d8gX
5vH0hd8EmjTx20zcjye0RF1g/qwrXcnjEWrdnjeii9KkW2S9819ni0d/w2kyhkrd
oUdVdp428HQsmLw8DrBDwPGWFpUiMy0bFrP0rtC2BPYT9zoZCnvovgVLof2QdsZ2
OEf5yo2F4dIxSZ7Bah72b0QAtfBitiSDRuj9tydQIWv/EYjs1ylc7Sc3Vff1IdPB
VMnDe3RypiBsUQe1Lt/08r7mlkuljaeu24kCvjw14gD4v+Kejo8NVT7FsjshhaYN
ucLxqU5PbaAYVPBSCuj3F3FHCTJuH6KhwwtsSA2MWzsoGqVdp1n9B73jCkGoWPK3
aJX/SGt/Ho2+790v/MbomTXcDDyX7nb0GHYu8DYV9BrJjSKiiMoLHblPE3ZyncAh
CYCfZpCPrbUlFtRLJV21F0oLfdcWNM8BuXvnApOkhtVy7Yuq2+KThDcDGI9SRDjv
DhGd/5dqJ0mG3+Kw2QGUQUlK1IRXG/lENJAlFyhOELWkFL65fJYNX2Yheg5pvlKk
7SqRScZTXGSzhPd/LNuVogwPLJ5NK7bvQTYmVr2DwTq6ZgGbTorsYF5L3aT4I+bz
qXMFOVmjeK1c/kZzGB7Pc+odau7nsqV54PzSnZYkWuOvn5Fto7KW74rEu5sCeusb
C7WUCI8LT041CVhd3dQ7
=Rzu5
-----END PGP SIGNATURE-----
|
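The dispatch rule used by fuse_session_loop_it() in the patch above can be illustrated in isolation. What follows is only a sketch under stated assumptions, not code from the patch: the names req_kind, dispatcher, dispatcher_init and dispatcher_pick are hypothetical, and the two request classes stand in for the FUSE_* opcode switch. As in the patch, the "hash" is simply the node id modulo the number of workers.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-ins for the two request classes; the patch itself
 * switches on the FUSE_* opcodes from fuse_kernel.h. */
enum req_kind { REQ_PER_INODE, REQ_OTHER };

struct dispatcher {
	int threads; /* size of the worker pool, fixed at start-up */
	int next_rr; /* next worker for round-robin hand-off */
};

static void dispatcher_init(struct dispatcher *d, int threads)
{
	assert(threads > 0);
	d->threads = threads;
	d->next_rr = 0;
}

/* Return the index of the worker that should receive the request. */
static int dispatcher_pick(struct dispatcher *d, enum req_kind kind,
			   uint64_t nodeid)
{
	int w;

	/* Same inode -> same worker, so per-inode order is preserved */
	if (kind == REQ_PER_INODE)
		return (int)(nodeid % (uint64_t)d->threads);

	/* Everything else is spread evenly over the pool */
	w = d->next_rr;
	d->next_rr = (w + 1) % d->threads;
	return w;
}
```

With four workers, every per-inode request for node 10 lands on worker 2, while other requests cycle through workers 0, 1, 2, 3 regardless of node id. Ordering per inode then follows from each worker draining its own queue in FIFO order.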
From: Mike V. <mvitalo@Crossroads.com> - 2011-06-23 15:47:19
|
Hi Michael,

> -----Original Message-----
> From: Michael McTernan [mailto:Mic...@cs...]
> Sent: Wednesday, June 22, 2011 5:00 PM
> To: fus...@li...
> Subject: [fuse-devel] [Patch] sync_inode_read thread mode
>
> -----BEGIN PGP SIGNED MESSAGE-----

Thanks for posting your updated patch!

> Since others have expressed interest in the earlier version of this patch
> and the threading model it offers, I would be pleased if the following
> patch can be considered for inclusion into FUSE.

The changes you have made are really helpful to us, so I reckon we would be happy also if your patch were to be considered for inclusion into FUSE.

Thanks again and take care,

Mike v
|
From: Lucas C. V. R. <lu...@go...> - 2011-06-23 20:02:35
|
On Wed, Jun 22, 2011 at 6:59 PM, Michael McTernan <Mic...@cs...> wrote:
>
> -----BEGIN PGP SIGNED MESSAGE-----
> Hash: SHA1
>
> Hi,
>
> Here's an improved version of my previous patch. It now adds a distinct new threading mode to FUSE while preserving the old options.
>

Hi, Michael!

Thanks for this updated patch, it's looking much less intrusive now. The numbers you've posted are quite impressive, too. I'm sure many will find this useful.

I took some minutes to review the patch and I have some comments. Please find them below.

> +void bbuf_destroy(struct bbuf *b)
> +{
> +	pthread_mutex_destroy(&b->lock);
> +	pthread_cond_destroy(&b->notfull);
> +	pthread_cond_destroy(&b->notempty);
> +	free(b);
> +}

It's a good idea to check that "b" is not NULL before accessing its members. The first "goto" in fuse_session_loop_it() (taken when bbuf_init() fails) causes bbuf_destroy() to be called with a NULL argument.

> +static int fuse_start_thread(struct fuse_worker *w, struct fuse_mt *mt)
> +{
> +	sigset_t oldset;
> +	sigset_t newset;
> +	int res;
> +	pthread_attr_t attr;
> +	char *stack_size;
> +
> +	memset(w, 0, sizeof(struct fuse_worker));
> +	w->mt = mt;
> +
> +	/* Create bounded-buffer for task handoff */
> +	w->bbuf = bbuf_init(MAX_QUEUE_DEPTH);
> +	if (!w->bbuf) {
> +		fprintf(stderr, "fuse: error creating bounded buffer\n");
> +		return -1;
> +	}
> +
> +	/* Override default stack size */
> +	pthread_attr_init(&attr);
> +	stack_size = getenv(ENVNAME_THREAD_STACK);
> +	if (stack_size && pthread_attr_setstacksize(&attr, atoi(stack_size)))
> +		fprintf(stderr, "fuse: invalid stack size: %s\n", stack_size);
> +
> +	/* Disallow signal reception in worker threads */
> +	sigemptyset(&newset);
> +	sigaddset(&newset, SIGTERM);
> +	sigaddset(&newset, SIGINT);
> +	sigaddset(&newset, SIGHUP);
> +	sigaddset(&newset, SIGQUIT);
> +	pthread_sigmask(SIG_BLOCK, &newset, &oldset);
> +	res = pthread_create(&w->thread_id, &attr, fuse_do_work, w);
> +	pthread_sigmask(SIG_SETMASK, &oldset, NULL);
> +	pthread_attr_destroy(&attr);
> +	if (res != 0) {
> +		fprintf(stderr, "fuse: error creating thread: %s\n",
> +			strerror(res));
> +		return -1;

I believe you want to call bbuf_destroy(w->bbuf) before returning -1, otherwise the buffer will leak.

> +int fuse_session_loop_it(struct fuse_session *se, const int threads)
> +{
> +	size_t bufsize;
> +	int err, w;
> +	struct fuse_mt mt;
> +	struct fuse_worker wt[threads];
> +
> +	memset(&mt, 0, sizeof(struct fuse_mt));
> +	mt.se = se;
> +	mt.prevch = fuse_session_next_chan(se, NULL);
> +
> +	bufsize = fuse_chan_bufsize(mt.prevch);
> +
> +	/* Create bounded-buffer to pass back empty work items */
> +	mt.membuf = bbuf_init(threads * MAX_QUEUE_DEPTH);
> +	if(!mt.membuf) {
> +		err = -ENOMEM;
> +		goto out_membuf;

This goto will cause bbuf_destroy() to be called with a NULL argument, which will crash. You can either return -ENOMEM straight from this point or make the adjustment to bbuf_destroy() that I mentioned before.

> +	}
> +
> +	/* Pre-allocate work item structures */
> +	while (!bbuf_isfull(mt.membuf)) {
> +		struct fuse_workitem *wi = malloc(sizeof(struct fuse_workitem) + bufsize);
> +		if(!wi) {
> +			err = -ENOMEM;
> +			goto out_membufwi;
> +		}
> +		bbuf_put(mt.membuf, wi);
> +	}
> +
> +	/* Create worker threads */
> +	for (w = 0; w < threads; w++) {
> +		err = fuse_start_thread(&wt[w], &mt);
> +		if (err)
> +			goto out_workers;

The cleanup loop in out_workers assumes that "threads" workers have successfully started. One easy fix is to create an auxiliary variable such as "int total_workers" and use it instead of "w" as the counter for this loop. Then, in out_workers, iterate from w=0 to 'total_workers' rather than from w=0 to 'threads'.

> +out_workers:
> +	/* Tidy up workers by sending NULL to cause the thread to exit once done */
> +	for (w = 0; w < threads; w++) {

This is where 'threads' could be replaced by 'total_workers'.

> +		bbuf_put(wt[w].bbuf, NULL);
> +		pthread_join(wt[w].thread_id, NULL);
> +		bbuf_destroy(wt[w].bbuf);
> +	}
> +
> +out_membufwi:
> +	/* Free the memory buffers and bounded buffer */
> +	while (!bbuf_isempty(mt.membuf))
> +		free(bbuf_get(mt.membuf));
> +
> +out_membuf:
> +	bbuf_destroy(mt.membuf);

And here mt.membuf should be checked before calling bbuf_destroy() on it, or alternatively the check can be made inside bbuf_destroy() itself.

Thanks!

-- 
Lucas
"If you're looking for a reason I've a reason to give: pleasure, little treasure"
|
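The two cleanup points raised in this review (a destroy function that tolerates NULL, and tearing down only the workers that actually started) can be sketched in a self-contained form. This is a hypothetical illustration rather than a proposed replacement for the patch: struct res, res_create, res_destroy and start_all are invented stand-ins for a worker plus its bounded buffer, and the failure is simulated with a parameter.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for a worker and its queue */
struct res { int id; };

static int live_count;    /* resources currently allocated */
static int destroy_calls; /* number of real (non-NULL) destroys */

/* Create a resource, or return NULL when 'fail' simulates an error */
static struct res *res_create(int id, int fail)
{
	struct res *r = fail ? NULL : malloc(sizeof(*r));

	if (r) {
		r->id = id;
		live_count++;
	}
	return r;
}

/* NULL-safe, in the same way free() is */
static void res_destroy(struct res *r)
{
	if (!r)
		return;
	live_count--;
	destroy_calls++;
	free(r);
}

/* Start n resources (n > 0); fail_at is the index that fails, or -1.
 * Returns 0 on success and -1 on error, cleaning up in both cases. */
static int start_all(int n, int fail_at)
{
	struct res **wt = calloc((size_t)n, sizeof(*wt));
	int started = 0, err = 0, w;

	if (!wt)
		return -1;

	for (w = 0; w < n; w++) {
		wt[w] = res_create(w, w == fail_at);
		if (!wt[w]) {
			err = -1;
			break;
		}
		started++; /* count only what really exists */
	}

	/* ... the main loop would run here when err == 0 ... */

	/* Tear down exactly the resources that were started */
	for (w = 0; w < started; w++)
		res_destroy(wt[w]);
	free(wt);
	return err;
}
```

Here "started" plays the role of the suggested total_workers counter: a failure at index 2 of 4 results in exactly two destroy calls, and nothing is left allocated.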
From: Michael M. <Mic...@cs...> - 2011-06-23 20:49:08
|
On 23/06/11 21:02, Lucas C. Villa Real wrote:
> Thanks for this updated patch, it's looking much less intrusive now.
> The numbers you've posted are quite impressive, too. I'm sure many
> will find this useful.

Thanks Lucas! The numbers do of course depend on the cost of a seek - other file-systems will vary. You can see my early motivation though :)

> I took some minutes to review the patch and I've some comments. Please
> find them below.

This is excellent feedback - thank you very much.

I'll wait a while for any other comments, then fix these errors and audit the other error paths before re-issuing the patch at some point.

I'm still keen for other feedback too, particularly on whether there is any chance of this being included in FUSE, or if the form/concept needs further change.

Kind Regards,

Mike
|
From: Stef B. <st...@gm...> - 2011-06-24 19:12:53
|
Hi,

some errors I've corrected, and added a new xattr system.workspace_cache_dirs to get/set the caching of dirs.

The address is:

git clone git://gitorious.org/basefs/basefs.git basefs
cd basefs

I'll work on adding the special reading of aiff files, and maybe others.

Stef
|
From: Michael M. <Mic...@cs...> - 2011-06-24 20:13:36
|
On 24/06/11 20:12, Stef Bon wrote:
> Hi,
>
> some errors Ive corrected, and added a new xattr
> system.workspace_cache_dirs to get/set
> the caching of dirs.
>
> The address is:
>
> git clone git://gitorious.org/basefs/basefs.git basefs
> cd basefs
>
> I'll work on adding the special reading of aiff files, and maybe others.

This doesn't look like a review of my patch or related.

Could I request you put new thoughts in a new thread please.

Maybe you could start a new thread with an explanation or something about what you've created/contributed here.

Kind Regards,

Mike
|
From: Stef B. <st...@gm...> - 2011-06-25 08:50:20
|
Hi,

Why do you ask? Well, it's related. Not exactly a review of your patch, but still related to the long "debate" on the mailing list about how to serialize requests.

Did you read the comments?? I've put much effort into describing why and what.

Stef Bon

2011/6/24 Michael McTernan <Mic...@cs...>:
> On 24/06/11 20:12, Stef Bon wrote:
>> Hi,
>>
>> some errors Ive corrected, and added a new xattr
>> system.workspace_cache_dirs to get/set
>> the caching of dirs.
>>
>> The address is:
>>
>> git clone git://gitorious.org/basefs/basefs.git basefs
>> cd basefs
>>
>> I'll work on adding the special reading of aiff files, and maybe others.
>
> This doesn't look like a review of my patch or related.
>
> Could I request you put new thoughts in a new thread please.
>
> Maybe you could start a new thread with an explanation or something
> about what you've created/contributed here.
>
> Kind Regards,
>
> Mike
>
|