[Getdata-commits] SF.net SVN: getdata:[684] trunk/defile
Scientific Database Format
Brought to you by:
ketiltrout
|
From: <ket...@us...> - 2012-03-22 23:01:58
|
Revision: 684
http://getdata.svn.sourceforge.net/getdata/?rev=684&view=rev
Author: ketiltrout
Date: 2012-03-22 23:01:51 +0000 (Thu, 22 Mar 2012)
Log Message:
-----------
Append mode. Also, be threadsafe.
Modified Paths:
--------------
trunk/defile/bin/cli.c
trunk/defile/input/Makefile.am
trunk/defile/input/ascii.c
trunk/defile/input/dirfile.c
trunk/defile/lib/defile.h
trunk/defile/lib/internal.h
trunk/defile/lib/libdefile.c
Modified: trunk/defile/bin/cli.c
===================================================================
--- trunk/defile/bin/cli.c 2012-03-22 01:39:48 UTC (rev 683)
+++ trunk/defile/bin/cli.c 2012-03-22 23:01:51 UTC (rev 684)
@@ -25,6 +25,7 @@
#include "internal.h"
+#include <errno.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
@@ -87,20 +88,23 @@
#define CLRSTRING(x,y) SETSTATE(x,y,free(df-> y),df ->y = NULL,"%s")
#define SETFLOAT(x,y) SETSIMPLE(x,y,atof(config->argument),,"%g")
#define SETLFLOAT(x,y,b) SETSIMPLE(x,y,b,,"%g" )
-#define SETINT(x,y) SETSIMPLE(x,y,strol(config->argument, NULL, 0),,"%i")
+#define SETINT(x,y) SETSIMPLE(x,y,strtol(config->argument, NULL, 0),,"%i")
#define SETLINT(x,y,b) SETSIMPLE(x,y,b,,"%i" )
#define SETULONG(x,y) SETSIMPLE(x,y,strtoul(config->argument, NULL, 0),,\
"%lu")
+#define SETULLONG(x,y) SETSIMPLE(x,y,strtoull(config->argument, NULL, 0),,\
+ "%llu")
-#define ABORT_DELAY 60000000 /* sixty seconds; this is less than the maximum TCP
- * round-trip timeout. Do we care? */
+#define ABORT_DELAY 10000000 /* ten seconds */
/* the very defile object */
static struct df_defile the_defile;
struct df_defile *const df = &the_defile;
-/* state mutex */
+/* mutexes */
pthread_mutex_t state_mx = PTHREAD_MUTEX_INITIALIZER;
+pthread_mutex_t config_mx = PTHREAD_MUTEX_INITIALIZER;
+pthread_mutex_t rate_mx = PTHREAD_MUTEX_INITIALIZER;
static void DF_SignalHandler(int sig)
{
@@ -264,7 +268,7 @@
result = 4;
lt_dlclose(lib);
}
- if (df->config.mode_flags & DF_MODE_DEBUG) {
+ if (df->mode_flags & DF_MODE_DEBUG) {
fprintf(stderr, "defile: probing %s; result: %s\n", filename,
(result) ? "fail" : "pass");
}
@@ -333,7 +337,7 @@
/* run through the search path looking for candidate libraries */
if (dlp) {
- if (df->config.mode_flags & DF_MODE_DEBUG)
+ if (df->mode_flags & DF_MODE_DEBUG)
fprintf(stderr, "defile: searching local paths: %s\n", dlp);
lt_dlforeachfile(dlp, DF_LTDLSearchFunc, l);
}
@@ -341,7 +345,7 @@
/* the default plugin dir if the above didn't work; or if we're compiling a
* list */
if (type == NULL || l->r != 0) {
- if (df->config.mode_flags & DF_MODE_DEBUG)
+ if (df->mode_flags & DF_MODE_DEBUG)
fprintf(stderr, "defile: searching default path: %s\n", DEFILE_MODULEDIR);
lt_dlforeachfile(DEFILE_MODULEDIR, DF_LTDLSearchFunc, l);
}
@@ -503,11 +507,13 @@
}
enum {
- DF_WOPT_ADD = 1, DF_WOPT_ASYNC, DF_WOPT_AUTOTYPE, DF_WOPT_BHEAD, DF_WOPT_BLEN,
- DF_WOPT_CLOBBER, DF_WOPT_DEBUG, DF_WOPT_DEL, DF_WOPT_DIR, DF_WOPT_GZIP,
- DF_WOPT_INPUT, DF_WOPT_LIBDIR, DF_WOPT_LIST, DF_WOPT_NOCLOBBER,
- DF_WOPT_NODEBUG, DF_WOPT_NOFOLLOW, DF_WOPT_OFLCLR, DF_WOPT_OUTPUT,
- DF_WOPT_FOLLOW, DF_WOPT_QUIET, DF_WOPT_SYNC, DF_WOPT_TYPE, DF_WOPT_VERBOSE
+ DF_WOPT_ADD = 1, DF_WOPT_APPEND, DF_WOPT_ASYNC, DF_WOPT_AUTOTYPE,
+ DF_WOPT_BHEAD, DF_WOPT_BLEN, DF_WOPT_CLOBBER, DF_WOPT_CUTDIR, DF_WOPT_DEBUG,
+ DF_WOPT_DEL, DF_WOPT_DIR, DF_WOPT_FOLLOW, DF_WOPT_GZIP, DF_WOPT_INPUT,
+ DF_WOPT_LIBDIR, DF_WOPT_LIST, DF_WOPT_NOAPPEND, DF_WOPT_NOCLOBBER,
+ DF_WOPT_NOCUTDIR, DF_WOPT_NODEBUG, DF_WOPT_NOFOLLOW, DF_WOPT_OFLCLR,
+ DF_WOPT_OUTPUT, DF_WOPT_QUIET, DF_WOPT_SKIP, DF_WOPT_SYNC, DF_WOPT_TYPE,
+ DF_WOPT_VERBOSE
};
static int DF_OutputOpt(struct df_defile *df, struct df_rc_config *config,
@@ -572,27 +578,35 @@
return 0;
+ SETBIT(APPEND, mode_flags, DF_MODE_APPEND);
+ CLRBIT(NOAPPEND, mode_flags, DF_MODE_APPEND);
+
SETULONG(BHEAD, bufhead);
- SETULONG(BLEN, config.buflen);
+ SETULONG(BLEN, buflen);
SETLINT(CLOBBER, clobber, 1);
SETLINT(NOCLOBBER, clobber, 0);
+ SETINT(CUTDIR, cutdir);
+ SETLINT(NOCUTDIR, cutdir, 0);
+
SETSTRING(DIR, dir);
SETSTRING(OUTPUT, output);
- SETBIT(FOLLOW, config.mode_flags, DF_MODE_FOLLOW);
- CLRBIT(NOFOLLOW, config.mode_flags, DF_MODE_FOLLOW);
+ SETBIT(FOLLOW, mode_flags, DF_MODE_FOLLOW);
+ CLRBIT(NOFOLLOW, mode_flags, DF_MODE_FOLLOW);
SETLINT(QUIET, quiet, 1);
SETLINT(VERBOSE, quiet, 0);
- SETBIT(DEBUG, config.mode_flags, DF_MODE_DEBUG);
- CLRBIT(NODEBUG, config.mode_flags, DF_MODE_DEBUG);
+ SETBIT(DEBUG, mode_flags, DF_MODE_DEBUG);
+ CLRBIT(NODEBUG, mode_flags, DF_MODE_DEBUG);
- SETBIT(ASYNC, config.mode_flags, DF_MODE_ASYNC);
- CLRBIT(SYNC, config.mode_flags, DF_MODE_ASYNC);
+ SETULLONG(SKIP, offset);
+ SETBIT(ASYNC, mode_flags, DF_MODE_ASYNC);
+ CLRBIT(SYNC, mode_flags, DF_MODE_ASYNC);
+
SETSTRING(TYPE, type);
CLRSTRING(AUTOTYPE, type);
}
@@ -612,15 +626,27 @@
} else
ptr = strdup(output);
} else if (output == NULL) {
- char *ptr2, *ptr3;
- ptr2 = strdup(input);
+ int cutleft = df->cutdir;
+ char *ptr_base, *ptr2, *ptr3;
+ ptr_base = strdup(input);
+
+ /* cut directories */
+ if (cutleft == 0)
+ ptr2 = ptr_base;
+ else {
+ cutleft = df->cutdir;
+ for (ptr2 = ptr_base + strlen(ptr_base); ptr2 > ptr_base; --ptr2)
+ if (*(ptr2 - 1) == '/')
+ if (--cutleft == 0)
+ break;
+ }
/* remove forbidden characters */
for (ptr3 = ptr2; *ptr3; ++ptr3)
if (*ptr3 == '/')
*ptr3 = '_';
ptr = malloc(strlen(dir) + strlen(ptr2) + 2);
sprintf(ptr, "%s/%s", dir, ptr2);
- free(ptr2);
+ free(ptr_base);
} else if (output[0] == '/') {
ptr = strdup(output);
} else {
@@ -658,29 +684,61 @@
return 0;
}
-static int DF_CreateOutput(unsigned long flags)
+static int DF_CreateOutput(int clobber, int append)
{
- int gderr;
+ int new = 1;
- df->flags = flags;
+ /* check whether it already exists */
+ df->D = gd_open(df->dirfile, GD_RDWR | GD_CREAT | GD_EXCL);
- df->D = gd_open(df->dirfile, GD_RDWR | GD_CREAT | ((flags & DF_CLOBBER) ?
- (GD_TRUNC | GD_TRUNCSUB) : GD_EXCL) | GD_VERBOSE);
+ /* abort on error */
+ if (df->D == NULL) {
+ fprintf(stderr, "defile: unexplained catastrophe attempting to create %s\n",
+ df->dirfile);
+ return 1;
+ } else if (gd_error(df->D) == GD_E_EXISTS && (clobber || append)) {
+ new = 0;
+ /* destroy invalid object and reopen */
+ gd_discard(df->D);
+ df->D = gd_open(df->dirfile, GD_RDWR |
+ (append ? 0 : GD_TRUNC | GD_TRUNCSUB));
- if ((gderr = gd_error(df->D))) {
- char *err = gd_error_string(df->D, NULL, 0);
- fprintf(stderr, "defile: getdata error: %s\n", err);
- free(err);
- if (gderr == GD_E_CREAT) { /* no clobber: allow the input to try again */
- pthread_mutex_lock(&state_mx);
- DF_OUTSTATE(INIT);
- pthread_mutex_unlock(&state_mx);
- } else
+ if (df->D == NULL) {
+ fprintf(stderr,
+ "defile: unexplained catastrophe attempting to create %s\n",
+ df->dirfile);
return 1;
+ }
+ } else if (gd_error(df->D) == GD_E_OK) {
+ pthread_mutex_lock(&config_mx);
+ df->mode_flags &= ~DF_MODE_APPEND;
+ pthread_mutex_unlock(&config_mx);
+ append = 0;
}
+ if (gd_error(df->D)) {
+ char *estring = gd_error_string(df->D, NULL, 0);
+ fprintf(stderr, "defile: libgetdata: %s\n", estring);
+ gd_discard(df->D);
+ free(estring);
+ return 1;
+ }
+
+ /* set up GetData's verbose mode */
+ gd_flags(df->D, GD_VERBOSE, 0);
+ gd_verbose_prefix(df->D, "defile: ");
+
+ if (append) {
+ unsigned long long nf = gd_nframes(df->D);
+ pthread_mutex_lock(&config_mx);
+ if (df->offset < nf)
+ df->offset = nf;
+ pthread_mutex_unlock(&config_mx);
+ }
+
if (!df->quiet)
- printf("defile: writing to %s\n\n", df->dirfile);
+ printf("defile: %s %s\n\n", append ? "appending to" : new ? "created" :
+ "overwriting", df->dirfile);
return 0;
}
@@ -731,7 +789,7 @@
/* wait while the input does something */
static inline void DF_WaitWhileInputStates(int state1, int state2, int quiet)
{
- if (!quiet && df->config.mode_flags & DF_MODE_DEBUG)
+ if (!quiet && df_mode() & DF_MODE_DEBUG)
fprintf(stderr, "defile: output waiting on input for end of %s and %s\n",
DF_STATENAME(state1), DF_STATENAME(state2));
while (df->input_state == state1 || df->input_state == state2)
@@ -741,7 +799,7 @@
/* wait while the input does something */
static inline void DF_WaitWhileInputState(int state, int quiet)
{
- if (!quiet && df->config.mode_flags & DF_MODE_DEBUG)
+ if (!quiet && df_mode() & DF_MODE_DEBUG)
fprintf(stderr, "defile: output waiting on input for end of %s\n",
DF_STATENAME(state));
while (df->input_state == state)
@@ -756,7 +814,7 @@
if (!df->dirfile)
return 1;
- if (DF_CreateOutput(df->clobber ? DF_CLOBBER : 0))
+ if (DF_CreateOutput(df->clobber, df_mode() & DF_MODE_APPEND))
return 1;
if (df->output_state == DF_ST_ABORT)
@@ -766,7 +824,7 @@
DF_OUTSTATE_MX(BUILD);
/* wait for the input to build the dirfile */
- DF_WaitWhileInputStates(DF_ST_BUILD, DF_ST_INIT, 0);
+ DF_WaitWhileInputState(DF_ST_BUILD, 0);
if (df->output_state == DF_ST_ABORT)
return 1;
@@ -783,10 +841,11 @@
/* output thread */
static void *DF_OutputThread(void)
{
- int i, j, last_pass = 0;
+ int i, j, last_pass = 0, wait = 1;
size_t nwrote;
struct df_raw *r;
- unsigned long in, out, nf;
+ unsigned long nf, out;
+ const int debug = df_mode() & DF_MODE_DEBUG;
/* wait for the input to initialise */
DF_WaitWhileInputState(DF_ST_START, 0);
@@ -803,6 +862,7 @@
/* loop through raws, writing ones that have full buffer heads */
while (!last_pass) {
+ wait = 1;
if (df->input_state != DF_ST_RUN)
last_pass = 1;
@@ -818,7 +878,8 @@
if (DF_OutputFini(delete)) {
DF_OUTSTATE_MX(ERROR);
return NULL;
- } else if (DF_MakeOutput()) {
+ }
+ if (DF_MakeOutput()) {
DF_OUTSTATE_MX(ERROR);
return NULL;
}
@@ -836,26 +897,45 @@
}
}
r = df->raw + i;
- in = r->in;
+
+ /* skip this field for now, if we can't lock it */
+ if (pthread_mutex_trylock(&r->mx) == EBUSY) {
+ if (debug)
+ fprintf(stderr, "\nlockmiss on %s\n", r->name);
+ continue;
+ }
+
out = r->out;
- nf = (in == out) ? df->config.buflen - 1 :
- ((in + r->buflen - out) % r->buflen) / r->framesize - 1;
- if (nf == 0 || (!last_pass && nf <= df->bufhead))
+ nf = (r->in == out) ? df->buflen - 1 :
+ ((r->in + r->buflen - out) % r->buflen) / r->framesize - 1;
+
+ if (nf == 0 || (!last_pass && nf <= df->bufhead)) {
+ pthread_mutex_unlock(&r->mx);
continue;
+ }
+
+ if (!(df_mode() & DF_MODE_ASYNC)) {
+ /* in synchronous mode, the reference field is special: we only write
+ * frames to it if all other fields have those frames committed */
+ if (i == df->ref_ind) {
+ /* adjust for shorter sticks */
+ for (j = 0; j < df->nraw; ++j)
+ if (j != df->ref_ind)
+ if (nf > df->raw[j].ahead)
+ nf = df->raw[j].ahead;
+ } else {
+ /* in synchronous mode, we never want to get more than one buflen
+ * ahead of the reference field */
+ if (r->ahead + nf >= df->buflen)
+ nf = df->buflen - r->ahead - 1;
+ }
- /* in synchronous mode, the reference field is special: we only write
- * frames to it if all other fields have those frames committed */
- if (i == df->ref_ind && !(df->config.mode_flags & DF_MODE_ASYNC)) {
- /* adjust for shorter sticks */
- for (j = 0; j < df->nraw; ++j)
- if (j != df->ref_ind)
- if (nf > df->raw[j].ahead)
- nf = df->raw[j].ahead;
-
/* check again */
- if (nf == 0 || (!last_pass && nf <= df->bufhead))
+ if (nf == 0 || (!last_pass && nf <= df->bufhead)) {
+ pthread_mutex_unlock(&r->mx);
continue;
+ }
}
/* point the local output pointer to the next datum to write */
@@ -876,6 +956,7 @@
/* bail on error */
if (gd_error(df->D)) {
DF_OUTSTATE_MX(ERROR);
+ pthread_mutex_unlock(&r->mx);
return NULL;
}
@@ -886,12 +967,13 @@
if (j != df->ref_ind) /* XXX */
df->raw[j].ahead -= nf;
} else
- r->ahead += nwrote / r->spf;
+ r->ahead += nf;
if (nwrote > 0) {
last_pass = 0;
- if (df->config.mode_flags & DF_MODE_DEBUG) {
- if (i != df->ref_ind && !(df->config.mode_flags & DF_MODE_ASYNC))
+ wait = 0;
+ if (debug) {
+ if (i != df->ref_ind && !(df_mode() & DF_MODE_ASYNC))
fprintf(stderr, "defile: output %lu %s of field %s "
"(lead is %lu %s)\n",
nf, (nf == 1) ? "frame" : "frames", r->name, r->ahead,
@@ -904,15 +986,19 @@
/* update write pointer */
r->out = (r->out + nwrote * GD_SIZE(r->type)) % r->buflen;
+ pthread_mutex_unlock(&r->mx);
/* update write counts */
+ pthread_mutex_lock(&rate_mx);
df->wpartial += nwrote * GD_SIZE(r->type);
while (df->wpartial >= df->wframesize) {
df->nwrote++;
df->wpartial -= df->wframesize;
}
+ pthread_mutex_unlock(&rate_mx);
}
- usleep(10000);
+ if (wait)
+ usleep(10000);
}
/* finished */
@@ -947,16 +1033,19 @@
static unsigned long long lw = 0;
static int init = 2;
- const char eol = (df->config.mode_flags & DF_MODE_DEBUG) ? '\n' : '\r';
+ const char eol = (df_mode() & DF_MODE_DEBUG) ? '\n' : '\r';
double delta, drate;
- unsigned long long nf;
- const unsigned long long nw = df->nwrote;
- const unsigned long long nr = df->nread;
+ unsigned long long nf, nw, nr;
char si;
struct timezone tz;
struct timeval now;
+ pthread_mutex_lock(&rate_mx);
+ nw = df->nwrote;
+ nr = df->nread;
+ pthread_mutex_unlock(&rate_mx);
+
if (df->quiet)
return rate;
@@ -1063,6 +1152,12 @@
struct sigaction action;
const struct df_optdef options[] = {
+ { DF_WOPT_APPEND, DF_OPT_NO_ARG, 'A', "append", "Append", 0, NULL,
+ "append data to an already existing output"
+ },
+ { DF_WOPT_NOAPPEND, DF_OPT_NO_ARG, 'A', "no-append", "NoAppend", 0, NULL,
+ "don't append data (default)"
+ },
{ DF_WOPT_ASYNC, DF_OPT_NO_ARG, 'a', "asyncrhonous", "Asynchronous", 0,
NULL, "operate in asynchronous mode"
},
@@ -1075,6 +1170,12 @@
{ DF_WOPT_NOCLOBBER, DF_OPT_NO_ARG, 0, "no-clobber", "NoClobber", 0, NULL,
"don't overwrite an existing dirfile (default)"
},
+ { DF_WOPT_CUTDIR, DF_OPT_ARG_RQ, '0', "cut-dirs", "CutDirs", 0, "NUM",
+ "remove all but the last NUM path elements from the output name"
+ },
+ { DF_WOPT_NOCUTDIR, DF_OPT_NO_ARG, '0', "no-cut-dirs", "NpCutDirs", 0, NULL,
+ "don't remove path elements from the output name (default)"
+ },
{ DF_WOPT_DIR, DF_OPT_ARG_RQ, 'D', "directory", "Directory", DF_OPT_EXPAND,
"DIR", "write the output dirfile as a subdirectory under DIR"
},
@@ -1084,6 +1185,14 @@
{ DF_WOPT_NODEBUG, DF_OPT_NO_ARG, 'd', "no-debug", "NoDebug", DF_OPT_PLUS,
NULL, "suppress printing debugging messages (default)"
},
+ { DF_WOPT_FOLLOW, DF_OPT_NO_ARG, 'f', "follow", "Follow", 0, NULL,
+ "keep monitoring the input and write data to the outpu as it becomes "
+ "available"
+ },
+ { DF_WOPT_NOFOLLOW, DF_OPT_NO_ARG, 'f', "no-follow", "NoFollow",
+ DF_OPT_PLUS, NULL,
+ "exit successfully upon reaching the end of the input (default)"
+ },
{ DF_WOPT_ADD, DF_OPT_ARG_RQ, 'F', "include", "IncludeField", 0, "FIELD",
"include FIELD in the output dirfile, if available"
},
@@ -1117,29 +1226,28 @@
{ DF_WOPT_OUTPUT, DF_OPT_ARG_RQ, 'o', "output", "Output", DF_OPT_EXPAND,
"PATH", "write data to a dirfile called PATH"
},
- { DF_WOPT_FOLLOW, DF_OPT_NO_ARG, 'f', "follow", "Follow", 0, NULL,
- "keep monitoring the input and write data to the outpu as it becomes "
- "available"
- },
- { DF_WOPT_NOFOLLOW, DF_OPT_NO_ARG, 'f', "no-follow", "NoFollow",
- DF_OPT_PLUS, NULL,
- "exit successfully upon reaching the end of the input (default)"
- },
{ DF_WOPT_QUIET, DF_OPT_NO_ARG, 'q', "quiet", "Quiet", 0, NULL,
"be less verbose"
},
{ DF_WOPT_VERBOSE, DF_OPT_NO_ARG, 'q', "verbose", "Verbose", DF_OPT_PLUS,
NULL, "be normally verbose"
},
+#if 0
+ { DF_WOPT_SKIP, DF_OPT_ARG_RQ, 's', "skip", "SkipFrames", 0, "NUM",
+ "skip the first NUM frames of the input (default: 0)"
+ },
+#endif
{ DF_WOPT_TYPE, DF_OPT_ARG_RQ, 't', "type", "InputType", 0, "TYPE",
"use TYPE as the input data type"
},
{ DF_WOPT_AUTOTYPE, DF_OPT_NO_ARG, 't', "autotype", "AutoType", DF_OPT_PLUS,
NULL, "attempt to automatically determine the data type of the input"
},
- // { DF_WOPT_GZIP, DF_OPT_NO_ARG, 'z', "gzip", "GZip", 0, NULL,
- // "gzip compress the output raw data"
- // },
+#if 0
+ { DF_WOPT_GZIP, DF_OPT_NO_ARG, 'z', "gzip", "GZip", 0, NULL,
+ "gzip compress the output raw data"
+ },
+#endif
DF_OPT_ENDOPT
};
@@ -1163,7 +1271,7 @@
df->iargv = malloc(sizeof(char*));
df->iargv[0] = argv[0];
df->iargc = 1;
- df->config.buflen = DF_BUFLEN;
+ df->buflen = DF_BUFLEN;
/* initialise LTDL and register its cleanup function */
lt_dlinit();
@@ -1172,10 +1280,10 @@
DF_Parse(argc, argv, &rcd, df);
/* sanitise */
- if (df->config.buflen < df->bufhead)
- df->config.buflen = df->bufhead;
- if (df->config.buflen < 2)
- df->config.buflen = 2;
+ if (df->buflen < df->bufhead)
+ df->buflen = df->bufhead;
+ if (df->buflen < 2)
+ df->buflen = 2;
if (!df->quiet)
puts(PACKAGE_STRING " " DEFILE_COPYRIGHT);
@@ -1251,7 +1359,7 @@
/* wait until the initialisation is done */
fflush(stdout);
- DF_WaitWhileInputStates(DF_ST_START, DF_ST_INIT, 1);
+ DF_WaitWhileInputState(DF_ST_START, 1);
DF_WaitWhileInputState(DF_ST_BUILD, 1);
/* record our "start" time */
Modified: trunk/defile/input/Makefile.am
===================================================================
--- trunk/defile/input/Makefile.am 2012-03-22 01:39:48 UTC (rev 683)
+++ trunk/defile/input/Makefile.am 2012-03-22 23:01:51 UTC (rev 684)
@@ -20,11 +20,10 @@
#
AUTOMAKE_OPTIONS = foreign
+LDFLAGS = -module -shared -release $(PACKAGE_VERSION)
module_LTLIBRARIES = libdefile-dirfile.la libdefile-ascii.la
libdefile_dirfile_la_SOURCES = dirfile.c
-libdefile_dirfile_la_LDFLAGS = -module
libdefile_dirfile_la_LIBADD = ../lib/libdefile.la
libdefile_ascii_la_SOURCES = ascii.c
-libdefile_ascii_la_LDFLAGS = -module
libdefile_ascii_la_LIBADD = ../lib/libdefile.la
Modified: trunk/defile/input/ascii.c
===================================================================
--- trunk/defile/input/ascii.c 2012-03-22 01:39:48 UTC (rev 683)
+++ trunk/defile/input/ascii.c 2012-03-22 23:01:51 UTC (rev 684)
@@ -216,7 +216,7 @@
}
/* in non-persistant mode, count lines */
- if (!(config->mode_flags & DF_MODE_FOLLOW)) {
+ if (!(df_mode() & DF_MODE_FOLLOW)) {
while ((n = fgetc(s)) != EOF)
if (ferror(s)) {
perror("defile-ascii: read error");
@@ -322,7 +322,7 @@
}
/* in persistant mode, wait a bit, and then go back and try again */
- if (config->mode_flags & DF_MODE_FOLLOW)
+ if (df_mode() & DF_MODE_FOLLOW)
usleep(100000);
else /* otherwise, we're done */
break;
Modified: trunk/defile/input/dirfile.c
===================================================================
--- trunk/defile/input/dirfile.c 2012-03-22 01:39:48 UTC (rev 683)
+++ trunk/defile/input/dirfile.c 2012-03-22 23:01:51 UTC (rev 684)
@@ -156,7 +156,7 @@
unsigned int r = 0;
ssize_t nw;
const char **field;
- const int debug = config->mode_flags & DF_MODE_DEBUG;
+ const int debug = df_mode() & DF_MODE_DEBUG;
void *buffer = NULL;
/* set the abort handler */
@@ -170,7 +170,7 @@
/* In non-persist mode, it's useful to tell the writer how many frames we
* have */
- if (!(config->mode_flags & DF_MODE_FOLLOW))
+ if (!(df_mode() & DF_MODE_FOLLOW))
nframes = (uint64_t)gd_nframes(D);
/* Initialise -- dirfiles don't contain rate information so we leave that
@@ -333,7 +333,7 @@
} while (have_data);
/* in persistant mode, wait a bit, and then go back and try again */
- if (config->mode_flags & DF_MODE_FOLLOW)
+ if (df_mode() & DF_MODE_FOLLOW)
usleep(100000);
else /* otherwise, we're done */
break;
Modified: trunk/defile/lib/defile.h
===================================================================
--- trunk/defile/lib/defile.h 2012-03-22 01:39:48 UTC (rev 683)
+++ trunk/defile/lib/defile.h 2012-03-22 23:01:51 UTC (rev 684)
@@ -77,14 +77,13 @@
};
/* mode flags */
-#define DF_MODE_ASYNC 0x1 /* asynchronous writing */
-#define DF_MODE_FOLLOW 0x2 /* follow mode */
+#define DF_MODE_APPEND 0x1 /* append mode */
+#define DF_MODE_ASYNC 0x2 /* asynchronous writing */
#define DF_MODE_DEBUG 0x4 /* debug mode */
+#define DF_MODE_FOLLOW 0x8 /* follow mode */
/* defile configuration */
struct df_config {
- unsigned int mode_flags;
- unsigned long buflen;
int n_config;
int *value;
const char **arg;
@@ -153,6 +152,8 @@
int df_check_abort(void);
int df_field_in_output(const char *name);
const char *df_input_name(int shell_expand);
+unsigned int df_mode(void);
+unsigned long long df_offset(void);
df_abort_func_t df_on_abort(df_abort_func_t func);
char *df_shell_expand(const char *s);
int df_strcmp(const void *needle, const void *candidate);
Modified: trunk/defile/lib/internal.h
===================================================================
--- trunk/defile/lib/internal.h 2012-03-22 01:39:48 UTC (rev 683)
+++ trunk/defile/lib/internal.h 2012-03-22 23:01:51 UTC (rev 684)
@@ -26,6 +26,7 @@
#include <getdata.h>
#include <syslog.h>
+#include <pthread.h>
#ifdef HAVE_LTDL_H
#include <ltdl.h>
@@ -41,7 +42,7 @@
/* maximum allowed length of a input type name */
#define DF_TYPE_MAXLEN 256
-/* Plugin status values */
+/* plugin state values */
#define DF_ST_ERROR -1 /* thread reported an error */
#define DF_ST_START 0 /* thread start */
#define DF_ST_BUILD 1 /* building the frame */
@@ -49,8 +50,7 @@
#define DF_ST_DONE 3 /* finished */
#define DF_ST_ABORT 4 /* thread must abort processing and exit */
-/* non-public (ie. output-only) states */
-#define DF_ST_INIT 5 /* the output couldn't initialise */
+/* special states */
#define DF_ST_SAVESTART 6 /* save the current output and restart (OUT only) */
#define DF_ST_DROPSTART 7 /* drop the current output and restart (OUT only) */
@@ -59,7 +59,7 @@
#define DF_SETSTATE(w,n,s) \
do { \
df-> w ## _state = DF_ST_ ## s; \
- if (df->config.mode_flags & DF_MODE_DEBUG) \
+ if (df->mode_flags & DF_MODE_DEBUG) \
fputs("defile: " n " state: " #s "\n", stderr); \
} while (0)
#define DF_SETSTATE_MX(w,n,s) \
@@ -80,7 +80,6 @@
((i) == DF_ST_DONE) ? "DONE" : \
((i) == DF_ST_ABORT) ? "ABORT" : \
((i) == DF_ST_ERROR) ? "ERROR" : \
- ((i) == DF_ST_INIT) ? "INIT" : \
((i) == DF_ST_DROPSTART) ? "DROPSTART" : \
((i) == DF_ST_SAVESTART) ? "SAVESTART" : \
"UNKNOWN")
@@ -97,6 +96,7 @@
unsigned long ahead; /* difference between the number of frames written to
this raw and the reference -- only used in synchronous
mode */
+ pthread_mutex_t mx; /* buffer mutex */
};
/* internal framedef */
@@ -115,14 +115,19 @@
};
struct df_defile {
- unsigned long flags;
+ /* the input plugin */
+ lt_dlhandle inhandle;
+ const struct df_input_framework *input_framework;
+
+ /* pathname handling */
char *input;
- lt_dlhandle inhandle;
char *dir;
char *type;
char *output;
char *dirfile;
+ int cutdir;
+ /* threadkeeping */
pthread_t input_id;
int input_state;
pthread_t output_id;
@@ -143,10 +148,15 @@
unsigned long long nwrote, nread;
unsigned long long nframes;
+ /* operations */
struct df_config config;
+ unsigned long long offset;
+ unsigned long buflen;
+ unsigned int mode_flags;
unsigned long bufhead;
+ /* output buffering */
int nraw;
struct df_raw *raw;
@@ -157,18 +167,18 @@
int ref_ind;
/* parser stuff */
- const struct df_input_framework *input_framework;
int mode_switch;
int quiet;
int clobber;
int iargc;
char **iargv;
- unsigned long long offset;
DIRFILE *D;
};
extern struct df_defile *const df;
extern pthread_mutex_t state_mx;
+extern pthread_mutex_t config_mx;
+extern pthread_mutex_t rate_mx;
#endif
Modified: trunk/defile/lib/libdefile.c
===================================================================
--- trunk/defile/lib/libdefile.c 2012-03-22 01:39:48 UTC (rev 683)
+++ trunk/defile/lib/libdefile.c 2012-03-22 23:01:51 UTC (rev 684)
@@ -147,28 +147,60 @@
/* skip RAW fields */
if (E->field_type == GD_RAW_ENTRY) {
- fputs("libdefile: Attempt to define RAW field via df_add_entry().\n",
+ fputs("libdefile: attempt to define RAW field via df_add_entry().\n",
stderr);
return DF_INPUT;
}
/* skip fields we've been asked to exclude */
- if (df_field_in_output(E->field))
- if (gd_add(df->D, E))
+ if (df_field_in_output(E->field)) {
+ if (df_mode() & DF_MODE_APPEND) {
+ /* check for existing field */
+ gd_entype_t type = gd_entry_type(df->D, E->field);
+ if (type == GD_NO_ENTRY)
+ return DF_INPUT;
+ else if (E->field_type != type) {
+ fprintf(stderr, "libdefile: incongruous specification of derived "
+ "field %s\n", E->field);
+ return DF_OUTPUT;
+ }
+ } else if (gd_add(df->D, E))
return DF_OUTPUT;
+ }
return DF_SUCCESS;
}
+#define DF_CHAR2ENTYPE(n) if (strcmp(t, #n) == 0) return GD_ ## n ## _ENTRY;
+const static df_type_from_token(const char *t)
+{
+ DF_CHAR2ENTYPE(BIT);
+ DF_CHAR2ENTYPE(CARRAY);
+ DF_CHAR2ENTYPE(CONST);
+ DF_CHAR2ENTYPE(DIVIDE);
+ DF_CHAR2ENTYPE(LINCOM);
+ DF_CHAR2ENTYPE(LINTERP);
+ DF_CHAR2ENTYPE(MPLEX);
+ DF_CHAR2ENTYPE(MULTIPLY);
+ DF_CHAR2ENTYPE(PHASE);
+ DF_CHAR2ENTYPE(POLYNOM);
+ DF_CHAR2ENTYPE(RECIP);
+ DF_CHAR2ENTYPE(SBIT);
+ DF_CHAR2ENTYPE(STRING);
+ DF_CHAR2ENTYPE(WINDOW);
+
+ return GD_NO_ENTRY;
+}
+
int df_add_spec(const char *spec, int fragment)
{
- char *ptr;
+ char *name, *type;
DF_CHECK_SEQUENCE(DF_ST_BUILD);
/* skip fields we've been asked to exclude, this requires isolating the
* field name; use GetData's parser to do this. */
- ptr = gd_tokenise(df->D, spec);
- if (ptr == NULL) {
+ name = gd_tokenise(df->D, spec);
+ if (name == NULL) {
if (gd_error(df->D) == GD_E_ALLOC)
return DF_OUTPUT; /* better to just fold this into gd_add_spec failures
than return DF_SYSTEM */
@@ -176,24 +208,42 @@
}
/* check OFL */
- if (!df_field_in_output(ptr)) {
- free(ptr);
+ if (!df_field_in_output(name)) {
+ free(name);
return DF_SUCCESS;
}
- free(ptr);
/* check the second token to see if it's a RAW field */
- ptr = gd_tokenise(df->D, NULL);
- if (ptr && strcmp(ptr, "RAW") == 0) {
+ type = gd_tokenise(df->D, NULL);
+ if (type && strcmp(type, "RAW") == 0) {
fputs("libdefile: Attempt to define RAW field via df_add_spec().\n",
stderr);
+ free(name);
+ free(type);
return DF_INPUT;
}
- free(ptr);
-
- /* add it */
- if (gd_add_spec(df->D, spec, fragment))
+
+ /* check for existing field in append mode */
+ if (df_mode() & DF_MODE_APPEND) {
+ gd_entype_t entype = gd_entry_type(df->D, name);
+ if (type == GD_NO_ENTRY) {
+ free(name);
+ free(type);
+ return DF_INPUT;
+ } else if (df_type_from_token(type) != entype) {
+ fprintf(stderr, "libdefile: incongruous specification of derived "
+ "field: %s\n", name);
+ free(name);
+ free(type);
+ return DF_OUTPUT;
+ }
+ } else if (gd_add_spec(df->D, spec, fragment)) {
+ free(name);
+ free(type);
return DF_OUTPUT;
+ }
+ free(name);
+ free(type);
return DF_SUCCESS;
}
@@ -252,6 +302,7 @@
static int df_set_raw(const struct df_fdef_field *f, int i, int count,
int fragment)
{
+ const int append = df_mode() & DF_MODE_APPEND;
int r, is_new = 0;
void *ptr;
@@ -270,10 +321,13 @@
perror("libdefile: strdup");
return DF_SYSTEM;
}
- /* try adding it to the dirfile */
- if (gd_add_raw(df->D, f->name, f->type, f->spf, fragment))
- return DF_OUTPUT;
+ if (!append) {
+ /* try adding it to the dirfile */
+ if (gd_add_raw(df->D, f->name, f->type, f->spf, fragment))
+ return DF_OUTPUT;
+ }
+
/* add a new record */
ptr = realloc(df->raw, sizeof(struct df_raw) * (df->nraw + 1));
if (ptr == NULL) {
@@ -297,16 +351,17 @@
df->raw[r].spf = f->spf * count;
df->raw[r].in = 0;
df->raw[r].b = NULL;
+ pthread_mutex_init(&df->raw[r].mx, NULL);
} else {
/* increase the samples per frame of this raw */
df->raw[r].spf += f->spf * count;
- if (gd_alter_raw(df->D, f->name, f->type, df->raw[r].spf, 0))
+ if (!append && gd_alter_raw(df->D, f->name, f->type, df->raw[r].spf, 0))
return DF_OUTPUT;
}
/* finalise buffer */
df->raw[r].framesize = GD_SIZE(f->type) * df->raw[r].spf;
- df->raw[r].buflen = df->raw[r].framesize * df->config.buflen;
+ df->raw[r].buflen = df->raw[r].framesize * df->buflen;
df->raw[r].out = df->raw[r].buflen - df->raw[r].framesize;
ptr = realloc(df->raw[r].b, df->raw[r].buflen);
if (ptr == NULL) {
@@ -418,7 +473,7 @@
df->rframesize += fd->framesize * count;
/* debug */
- if (df->config.mode_flags & DF_MODE_DEBUG) {
+ if (df_mode() & DF_MODE_DEBUG) {
fprintf(stderr, "defile: fdef #%i (x%i):\ndefile: size: %lli bytes\n"
"defile: [\n", df->nfd, count, (long long)df->fd[df->nfd].framesize);
for (i = 0; i < fd->n_fields; ++i) {
@@ -443,7 +498,7 @@
/* wait while the output does something */
static inline void df_wait_while_output_state(int state)
{
- if (df->config.mode_flags & DF_MODE_DEBUG)
+ if (df_mode() & DF_MODE_DEBUG)
fprintf(stderr, "defile: input waiting on output for end of %s\n",
DF_STATENAME(state));
while (df->output_state == state)
@@ -463,27 +518,26 @@
return DF_SYSTEM;
}
}
+
+ pthread_mutex_lock(&config_mx);
if (rate >= 0)
df->rate = rate;
- df->nframes = (uint64_t)nframes;
- df->offset = (uint64_t)offset;
+ df->nframes = nframes;
+ if (offset > df->offset)
+ df->offset = offset;
+ pthread_mutex_unlock(&config_mx);
/* wait for the output thread to create the dirfile */
- DF_INSTATE_MX(INIT);
+ DF_INSTATE_MX(BUILD);
df_wait_while_output_state(DF_ST_START);
- pthread_mutex_lock(&state_mx);
if (df->output_state != DF_ST_BUILD) {
- /* something went wrong: stay in start state */
- DF_INSTATE(START);
- pthread_mutex_unlock(&state_mx);
- return DF_OUTPUT;
+ /* something went wrong: time for aborting */
+ DF_INSTATE_MX(ABORT);
+ return df_input_abort();
}
/* good to go */
- DF_INSTATE(BUILD);
- pthread_mutex_unlock(&state_mx);
-
return DF_SUCCESS;
}
@@ -504,10 +558,55 @@
int df_ready(const char *reference)
{
- int missing = 0, set_reference = 1;
+ int i, missing = 0, set_reference = 1;
+ const int append = df_mode() & DF_MODE_APPEND;
+ unsigned long long offset;
DF_CHECK_SEQUENCE(DF_ST_BUILD);
+ /* in append mode, make sure the input has defined all the existing raw
+ * fields, and that all raw fields it DID define are in the dirfile
+ */
+ if (append) {
+ const char **fl;
+ gd_entry_t E;
+
+ /* check fields defined by the input */
+ for (i = 0; i < df->nraw; ++i) {
+ if (gd_entry(df->D, df->raw[i].name, &E))
+ return DF_OUTPUT;
+
+ if (E.field_type != GD_RAW_ENTRY) {
+ fprintf(stderr, "libdefile: existing raw field %s redefined as derived "
+ "type 0x%X\n", df->raw[i].name, E.field_type);
+ gd_free_entry_strings(&E);
+ return DF_OUTPUT;
+ } else if (E.spf != df->raw[i].spf) {
+ fprintf(stderr, "libdefile: existing raw field %s redefined with "
+ "different sample rate %u\n", df->raw[i].name, E.spf);
+ gd_free_entry_strings(&E);
+ return DF_OUTPUT;
+ } else if (E.data_type != df->raw[i].type) {
+ fprintf(stderr, "libdefile: existing raw field %s redefined with "
+ "different data type 0x%X\n", df->raw[i].name, E.data_type);
+ gd_free_entry_strings(&E);
+ return DF_OUTPUT;
+ }
+ gd_free_entry_strings(&E);
+ }
+
+ /* cross-check fields defined in the existing dirfile */
+ for (fl = gd_field_list_by_type(df->D, GD_RAW_ENTRY); *fl; ++fl) {
+ missing = 0;
+ df_rawsearch(df->raw, df->nraw, &missing, *fl);
+ if (missing) {
+ fprintf(stderr, "libdefile: missing definition of existing field %s",
+ *fl);
+ return DF_OUTPUT;
+ }
+ }
+ }
+
/* if there's no reference field given, or if the chosen reference field has
* been excluded by the OFL, just use whichever one GetData was going to use
* by default */
@@ -518,6 +617,7 @@
/* set the reference field, if possible */
if (reference) {
+ missing = 0;
df->ref_ind = df_rawsearch(df->raw, df->nraw, &missing, reference);
if (missing) {
fprintf(stderr, "libdefile: internal error in input plugin: bad "
@@ -528,6 +628,16 @@
gd_reference(df->D, reference);
}
+ /* seek to the start frame */
+ offset = df_offset();
+ if (offset > 0)
+ for (i = 0; i < df->nraw; ++i)
+ if (gd_seek(df->D, df->raw[i].name, offset, 0,
+ GD_SEEK_SET | GD_SEEK_WRITE) < 0)
+ {
+ return DF_OUTPUT;
+ }
+
DF_INSTATE_MX(RUN);
/* wait for the output thread to finish the metadata */
@@ -539,10 +649,9 @@
ssize_t df_nframes_allowed(int ifd)
{
int i;
- size_t nf = df->config.buflen, n;
+ size_t nf = df->buflen, n;
const struct df_infdef *fd = df->fd + ifd;
- const struct df_raw *r;
- unsigned long out;
+ struct df_raw *r;
DF_CHECK_SEQUENCE(DF_ST_RUN);
@@ -555,9 +664,10 @@
continue;
r = df->raw + fd->field[i].index;
- out = r->out;
- n = (r->in == out) ? 0 : (r->buflen -
- ((r->in + r->buflen - out) % r->buflen)) / fd->field[i].framesize;
+ pthread_mutex_lock(&r->mx);
+ n = (r->in == r->out) ? 0 : (r->buflen -
+ ((r->in + r->buflen - r->out) % r->buflen)) / fd->field[i].framesize;
+ pthread_mutex_unlock(&r->mx);
if (n < nf)
nf = n;
}
@@ -579,6 +689,7 @@
continue;
r = df->raw + f->index;
+ pthread_mutex_lock(&r->mx);
if (f->cadence == 0) {
/* contiguous data */
if (r->in + f->framesize > r->buflen) {
@@ -598,6 +709,7 @@
r->in = (r->in + GD_SIZE(r->type)) % r->buflen;
}
}
+ pthread_mutex_unlock(&r->mx);
}
}
@@ -637,17 +749,19 @@
usleep(10000);
}
- if (df->config.mode_flags & DF_MODE_DEBUG && nf_total > 0)
+ if (df_mode() & DF_MODE_DEBUG)
fprintf(stderr, "defile: buffered %zi of %zu %s offered of fdef #%i\n",
nf_total, nf_offered, (nf_offered == 1) ? "frame" : "frames", ifd);
/* update read counts */
+ pthread_mutex_lock(&rate_mx);
df->nread += (fd->framesize * nf_total) / df->rframesize;
df->rpartial += (fd->framesize * nf_total) % df->rframesize;
while (df->rpartial >= df->rframesize) {
df->nread++;
df->rpartial -= df->rframesize;
}
+ pthread_mutex_unlock(&rate_mx);
return nf_total;
}
@@ -655,9 +769,26 @@
int df_reinit(unsigned long long nframes, unsigned long long offset,
double rate, const char *name, int mode)
{
- DF_CHECK_SEQUENCE2(DF_ST_RUN, DF_ST_INIT);
+ DF_CHECK_SEQUENCE(DF_ST_RUN);
+ /* update input */
+ if (name) {
+ free(df->input);
+ df->input = strdup(name);
+ if (df->input == NULL) {
+ perror("libdefile: strdup");
+ return DF_SYSTEM;
+ }
+ }
+
+ /* restart */
+ pthread_mutex_lock(&config_mx);
+ df->offset = offset;
+ df->mode_flags &= ~DF_MODE_APPEND;
+ pthread_mutex_unlock(&config_mx);
+
/* trigger and wait for completion of the output */
+ DF_INSTATE_MX(BUILD);
if (mode == DF_REINIT_DROP) {
DF_OUTSTATE_MX(DROPSTART);
df_wait_while_output_state(DF_ST_DROPSTART);
@@ -667,36 +798,18 @@
}
/* after the restart of the output we should either be in BUILD state, or
- * else something's gone wrong. If something's gone wrong, either it's
- * catastrophic and the output has already exited (ERROR state which will
- * transition to ABORT the next time the main thread notices) or else there
- * was an error with the dirfile's name, in which case we'll be in the INIT
- * state
- */
- pthread_mutex_lock(&state_mx);
+ * else something's gone wrong. If something's gone wrong, the output has
+ * already exited (ERROR), so we might as well skip a step and just tell the
+ * input to abort now. */
if (df->output_state != DF_ST_BUILD) {
- /* half-finished? */
- DF_INSTATE(INIT);
+ DF_INSTATE_MX(ABORT);
pthread_mutex_unlock(&state_mx);
- return DF_OUTPUT;
+ return df_input_abort();
}
- /* restart */
- DF_INSTATE(BUILD);
- pthread_mutex_unlock(&state_mx);
-
if (rate >= 0)
df->rate = rate;
df->nframes = (uint64_t)nframes;
- df->offset = (uint64_t)offset;
- if (name) {
- free(df->input);
- df->input = strdup(name);
- if (df->input == NULL) {
- perror("libdefile: strdup");
- return DF_SYSTEM;
- }
- }
return DF_SUCCESS;
}
@@ -768,6 +881,7 @@
int df_add_fragment(const char *subdir, const char *name, unsigned int encoding,
unsigned long byte_sex, int parent)
{
+ const int append = df_mode() & DF_MODE_APPEND;
int ind;
struct stat stat_buf;
char *subdirpath;
@@ -786,6 +900,13 @@
if (S_ISDIR(stat_buf.st_mode))
goto DF_HAVE_DIR;
+ if (append) {
+ fprintf(stderr, "libdefile: unexpected request for creation of "
+ "non-existant directory: %s\n", subdir);
+ free(subdirpath);
+ return DF_INPUT;
+ }
+
if (mkdir(subdirpath, 0777) == 0)
goto DF_HAVE_DIR;
free(subdirpath);
@@ -805,14 +926,35 @@
perror("libdefile: strdup");
return DF_SYSTEM;
}
+ if (append)
+ return 0;
/* attempt to add the new fragment */
ind = gd_include(df->D, subdirpath, 0,
GD_CREAT | GD_EXCL | encoding | byte_sex);
free(subdirpath);
+
if (ind < 0)
return DF_OUTPUT;
return ind;
}
+
+unsigned int df_mode(void) /* return the current value of df->mode_flags */
+{
+ unsigned int mode;
+ pthread_mutex_lock(&config_mx); /* df_reinit() updates mode_flags under this same mutex */
+ mode = df->mode_flags; /* copy into a local so the lock can be dropped before returning */
+ pthread_mutex_unlock(&config_mx);
+ return mode;
+}
+
+unsigned long long df_offset(void) /* return the current value of df->offset */
+{
+ unsigned long long offset;
+ pthread_mutex_lock(&config_mx); /* df_reinit() assigns df->offset under this same mutex */
+ offset = df->offset; /* copy into a local so the lock can be dropped before returning */
+ pthread_mutex_unlock(&config_mx);
+ return offset;
+}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|